add snapshot producer

This commit is contained in:
seydx
2025-01-24 22:35:04 +01:00
parent 2c5f1e0417
commit 0651a09a3c
3 changed files with 376 additions and 283 deletions
+8
View File
@@ -84,10 +84,18 @@ func apiRing(w http.ResponseWriter, r *http.Request) {
var items []*api.Source var items []*api.Source
for _, camera := range devices.AllCameras { for _, camera := range devices.AllCameras {
cleanQuery.Set("device_id", camera.DeviceID) cleanQuery.Set("device_id", camera.DeviceID)
// Stream source
items = append(items, &api.Source{ items = append(items, &api.Source{
Name: camera.Description, Name: camera.Description,
URL: "ring:?" + cleanQuery.Encode(), URL: "ring:?" + cleanQuery.Encode(),
}) })
// Snapshot source
items = append(items, &api.Source{
Name: camera.Description + " Snapshot",
URL: "ring:?" + cleanQuery.Encode() + "&snapshot",
})
} }
api.ResponseSources(w, items) api.ResponseSources(w, items)
+304 -283
View File
@@ -18,7 +18,7 @@ import (
type Client struct { type Client struct {
api *RingRestClient api *RingRestClient
ws *websocket.Conn ws *websocket.Conn
prod *webrtc.Conn prod core.Producer
camera *CameraData camera *CameraData
dialogID string dialogID string
sessionID string sessionID string
@@ -101,322 +101,337 @@ const (
) )
func Dial(rawURL string) (*Client, error) { func Dial(rawURL string) (*Client, error) {
// 1. Create Ring Rest API client // 1. Parse URL and validate basic params
u, err := url.Parse(rawURL) u, err := url.Parse(rawURL)
if err != nil { if err != nil {
return nil, err return nil, err
} }
query := u.Query() query := u.Query()
encodedToken := query.Get("refresh_token") encodedToken := query.Get("refresh_token")
deviceID := query.Get("device_id") deviceID := query.Get("device_id")
_, isSnapshot := query["snapshot"]
if encodedToken == "" || deviceID == "" { if encodedToken == "" || deviceID == "" {
return nil, errors.New("ring: wrong query") return nil, errors.New("ring: wrong query")
} }
// URL-decode the refresh token // URL-decode the refresh token
refreshToken, err := url.QueryUnescape(encodedToken) refreshToken, err := url.QueryUnescape(encodedToken)
if err != nil { if err != nil {
return nil, fmt.Errorf("ring: invalid refresh token encoding: %w", err) return nil, fmt.Errorf("ring: invalid refresh token encoding: %w", err)
} }
// Initialize Ring API client // Initialize Ring API client
ringAPI, err := NewRingRestClient(RefreshTokenAuth{RefreshToken: refreshToken}, nil) ringAPI, err := NewRingRestClient(RefreshTokenAuth{RefreshToken: refreshToken}, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// Get camera details // Get camera details
devices, err := ringAPI.FetchRingDevices() devices, err := ringAPI.FetchRingDevices()
if err != nil { if err != nil {
return nil, err return nil, err
} }
var camera *CameraData var camera *CameraData
for _, cam := range devices.AllCameras { for _, cam := range devices.AllCameras {
if fmt.Sprint(cam.DeviceID) == deviceID { if fmt.Sprint(cam.DeviceID) == deviceID {
camera = &cam camera = &cam
break break
} }
} }
if camera == nil { if camera == nil {
return nil, errors.New("ring: camera not found") return nil, errors.New("ring: camera not found")
} }
// 2. Connect to signaling server // Create base client
ticket, err := ringAPI.GetSocketTicket() client := &Client{
if err != nil { api: ringAPI,
return nil, err camera: camera,
} dialogID: uuid.NewString(),
done: make(chan struct{}),
}
// Create WebSocket connection // Check if snapshot request
wsURL := fmt.Sprintf("wss://api.prod.signalling.ring.devices.a2z.com/ws?api_version=4.0&auth_type=ring_solutions&client_id=ring_site-%s&token=%s", if isSnapshot {
uuid.NewString(), url.QueryEscape(ticket.Ticket)) client.prod = NewSnapshotProducer(ringAPI, camera)
return client, nil
}
ws, _, err := websocket.DefaultDialer.Dial(wsURL, map[string][]string{ // If not snapshot, continue with WebRTC setup
"User-Agent": {"android:com.ringapp"}, ticket, err := ringAPI.GetSocketTicket()
}) if err != nil {
if err != nil { return nil, err
return nil, err }
}
// 3. Create Peer Connection // Create WebSocket connection
conf := pion.Configuration{ wsURL := fmt.Sprintf("wss://api.prod.signalling.ring.devices.a2z.com/ws?api_version=4.0&auth_type=ring_solutions&client_id=ring_site-%s&token=%s",
ICEServers: []pion.ICEServer{ uuid.NewString(), url.QueryEscape(ticket.Ticket))
{URLs: []string{
"stun:stun.kinesisvideo.us-east-1.amazonaws.com:443", client.ws, _, err = websocket.DefaultDialer.Dial(wsURL, map[string][]string{
"stun:stun.kinesisvideo.us-east-2.amazonaws.com:443", "User-Agent": {"android:com.ringapp"},
"stun:stun.kinesisvideo.us-west-2.amazonaws.com:443", })
"stun:stun.l.google.com:19302", if err != nil {
"stun:stun1.l.google.com:19302", return nil, err
"stun:stun2.l.google.com:19302", }
"stun:stun3.l.google.com:19302",
"stun:stun4.l.google.com:19302", // Create Peer Connection
}}, conf := pion.Configuration{
}, ICEServers: []pion.ICEServer{
ICETransportPolicy: pion.ICETransportPolicyAll, {URLs: []string{
"stun:stun.kinesisvideo.us-east-1.amazonaws.com:443",
"stun:stun.kinesisvideo.us-east-2.amazonaws.com:443",
"stun:stun.kinesisvideo.us-west-2.amazonaws.com:443",
"stun:stun.l.google.com:19302",
"stun:stun1.l.google.com:19302",
"stun:stun2.l.google.com:19302",
"stun:stun3.l.google.com:19302",
"stun:stun4.l.google.com:19302",
}},
},
ICETransportPolicy: pion.ICETransportPolicyAll,
BundlePolicy: pion.BundlePolicyBalanced, BundlePolicy: pion.BundlePolicyBalanced,
} }
api, err := webrtc.NewAPI() api, err := webrtc.NewAPI()
if err != nil { if err != nil {
ws.Close() client.ws.Close()
return nil, err return nil, err
} }
pc, err := api.NewPeerConnection(conf) pc, err := api.NewPeerConnection(conf)
if err != nil { if err != nil {
ws.Close() client.ws.Close()
return nil, err return nil, err
} }
// protect from sending ICE candidate before Offer // protect from sending ICE candidate before Offer
var sendOffer core.Waiter var sendOffer core.Waiter
// protect from blocking on errors // protect from blocking on errors
defer sendOffer.Done(nil) defer sendOffer.Done(nil)
// waiter will wait PC error or WS error or nil (connection OK) // waiter will wait PC error or WS error or nil (connection OK)
var connState core.Waiter var connState core.Waiter
prod := webrtc.NewConn(pc) prod := webrtc.NewConn(pc)
prod.FormatName = "ring/webrtc" prod.FormatName = "ring/webrtc"
prod.Mode = core.ModeActiveProducer prod.Mode = core.ModeActiveProducer
prod.Protocol = "ws" prod.Protocol = "ws"
prod.URL = rawURL prod.URL = rawURL
client := &Client{ client.prod = prod
api: ringAPI,
ws: ws,
prod: prod,
camera: camera,
dialogID: uuid.NewString(),
done: make(chan struct{}),
}
prod.Listen(func(msg any) { prod.Listen(func(msg any) {
switch msg := msg.(type) { switch msg := msg.(type) {
case *pion.ICECandidate: case *pion.ICECandidate:
_ = sendOffer.Wait() _ = sendOffer.Wait()
iceCandidate := msg.ToJSON() iceCandidate := msg.ToJSON()
// skip empty ICE candidates // skip empty ICE candidates
if iceCandidate.Candidate == "" { if iceCandidate.Candidate == "" {
return return
} }
icePayload := map[string]interface{}{ icePayload := map[string]interface{}{
"ice": iceCandidate.Candidate, "ice": iceCandidate.Candidate,
"mlineindex": iceCandidate.SDPMLineIndex, "mlineindex": iceCandidate.SDPMLineIndex,
} }
if err = client.sendSessionMessage("ice", icePayload); err != nil { if err = client.sendSessionMessage("ice", icePayload); err != nil {
connState.Done(err) connState.Done(err)
return return
} }
case pion.PeerConnectionState: case pion.PeerConnectionState:
switch msg { switch msg {
case pion.PeerConnectionStateConnecting: case pion.PeerConnectionStateConnecting:
case pion.PeerConnectionStateConnected: case pion.PeerConnectionStateConnected:
connState.Done(nil) connState.Done(nil)
default: default:
connState.Done(errors.New("ring: " + msg.String())) connState.Done(errors.New("ring: " + msg.String()))
} }
} }
}) })
// Setup media configuration // Setup media configuration
medias := []*core.Media{ medias := []*core.Media{
{ {
Kind: core.KindAudio, Kind: core.KindAudio,
Direction: core.DirectionSendRecv, Direction: core.DirectionSendRecv,
Codecs: []*core.Codec{ Codecs: []*core.Codec{
{ {
Name: "opus", Name: "opus",
ClockRate: 48000, ClockRate: 48000,
Channels: 2, Channels: 2,
}, },
}, },
}, },
{ {
Kind: core.KindVideo, Kind: core.KindVideo,
Direction: core.DirectionRecvonly, Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{ Codecs: []*core.Codec{
{ {
Name: "H264", Name: "H264",
ClockRate: 90000, ClockRate: 90000,
}, },
}, },
}, },
} }
// 4. Create offer // Create offer
offer, err := prod.CreateOffer(medias) offer, err := prod.CreateOffer(medias)
if err != nil { if err != nil {
client.Stop() client.Stop()
return nil, err return nil, err
} }
// 5. Send offer // Send offer
offerPayload := map[string]interface{}{ offerPayload := map[string]interface{}{
"stream_options": map[string]bool{ "stream_options": map[string]bool{
"audio_enabled": true, "audio_enabled": true,
"video_enabled": true, "video_enabled": true,
}, },
"sdp": offer, "sdp": offer,
} }
if err = client.sendSessionMessage("live_view", offerPayload); err != nil { if err = client.sendSessionMessage("live_view", offerPayload); err != nil {
client.Stop() client.Stop()
return nil, err return nil, err
} }
sendOffer.Done(nil) sendOffer.Done(nil)
// Ring expects a ping message every 5 seconds // Ring expects a ping message every 5 seconds
go func() { go client.startPingLoop(pc)
ticker := time.NewTicker(5 * time.Second) go client.startMessageLoop(&connState)
defer ticker.Stop()
for { if err = connState.Wait(); err != nil {
select { return nil, err
case <-client.done: }
return
case <-ticker.C:
if pc.ConnectionState() == pion.PeerConnectionStateConnected {
if err := client.sendSessionMessage("ping", nil); err != nil {
return
}
}
}
}
}()
go func() {
var err error
// will be closed when conn will be closed return client, nil
defer func() { }
connState.Done(err)
}()
for { func (c *Client) startPingLoop(pc *pion.PeerConnection) {
select { ticker := time.NewTicker(5 * time.Second)
case <-client.done: defer ticker.Stop()
return
default:
var res BaseMessage
if err = ws.ReadJSON(&res); err != nil {
select {
case <-client.done:
return
default:
}
client.Stop() for {
return select {
} case <-c.done:
return
case <-ticker.C:
if pc.ConnectionState() == pion.PeerConnectionStateConnected {
if err := c.sendSessionMessage("ping", nil); err != nil {
return
}
}
}
}
}
// check if "doorbot_id" is present func (c *Client) startMessageLoop(connState *core.Waiter) {
if _, ok := res.Body["doorbot_id"]; !ok { var err error
continue
}
// check if the message is from the correct doorbot
doorbotID := res.Body["doorbot_id"].(float64)
if doorbotID != float64(client.camera.ID) {
continue
}
// check if the message is from the correct session // will be closed when conn will be closed
if res.Method == "session_created" || res.Method == "session_started" { defer func() {
if _, ok := res.Body["session_id"]; ok && client.sessionID == "" { connState.Done(err)
client.sessionID = res.Body["session_id"].(string) }()
}
}
if _, ok := res.Body["session_id"]; ok { for {
if res.Body["session_id"].(string) != client.sessionID { select {
continue case <-c.done:
} return
} default:
var res BaseMessage
if err = c.ws.ReadJSON(&res); err != nil {
select {
case <-c.done:
return
default:
}
rawMsg, _ := json.Marshal(res) c.Stop()
return
}
switch res.Method { // check if "doorbot_id" is present
case "sdp": if _, ok := res.Body["doorbot_id"]; !ok {
// 6. Get answer continue
var msg AnswerMessage }
if err = json.Unmarshal(rawMsg, &msg); err != nil {
client.Stop() // check if the message is from the correct doorbot
return doorbotID := res.Body["doorbot_id"].(float64)
} if doorbotID != float64(c.camera.ID) {
if err = prod.SetAnswer(msg.Body.SDP); err != nil { continue
client.Stop() }
return
}
if err = client.activateSession(); err != nil {
client.Stop()
return
}
case "ice":
// 7. Continue to receiving candidates
var msg IceCandidateMessage
if err = json.Unmarshal(rawMsg, &msg); err != nil {
break
}
// check for empty ICE candidate // check if the message is from the correct session
if msg.Body.Ice == "" { if res.Method == "session_created" || res.Method == "session_started" {
break if _, ok := res.Body["session_id"]; ok && c.sessionID == "" {
} c.sessionID = res.Body["session_id"].(string)
}
}
if err = prod.AddCandidate(msg.Body.Ice); err != nil { if _, ok := res.Body["session_id"]; ok {
client.Stop() if res.Body["session_id"].(string) != c.sessionID {
return continue
} }
}
case "close": rawMsg, _ := json.Marshal(res)
client.Stop()
return
case "pong": switch res.Method {
// Ignore case "sdp":
continue if prod, ok := c.prod.(*webrtc.Conn); ok {
} // Get answer
} var msg AnswerMessage
} if err = json.Unmarshal(rawMsg, &msg); err != nil {
}() c.Stop()
return
}
if err = prod.SetAnswer(msg.Body.SDP); err != nil {
c.Stop()
return
}
if err = c.activateSession(); err != nil {
c.Stop()
return
}
}
case "ice":
if prod, ok := c.prod.(*webrtc.Conn); ok {
// Continue to receiving candidates
var msg IceCandidateMessage
if err = json.Unmarshal(rawMsg, &msg); err != nil {
break
}
if err = connState.Wait(); err != nil { // check for empty ICE candidate
return nil, err if msg.Body.Ice == "" {
} break
}
return client, nil if err = prod.AddCandidate(msg.Body.Ice); err != nil {
c.Stop()
return
}
}
case "close":
c.Stop()
return
case "pong":
// Ignore
continue
}
}
}
} }
func (c *Client) activateSession() error { func (c *Client) activateSession() error {
@@ -471,16 +486,18 @@ func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver,
} }
func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error {
if media.Kind == core.KindAudio { if webrtcProd, ok := c.prod.(*webrtc.Conn); ok {
// Enable speaker if media.Kind == core.KindAudio {
speakerPayload := map[string]interface{}{ // Enable speaker
"stealth_mode": false, speakerPayload := map[string]interface{}{
} "stealth_mode": false,
}
_ = c.sendSessionMessage("camera_options", speakerPayload)
}
return webrtcProd.AddTrack(media, codec, track)
}
_ = c.sendSessionMessage("camera_options", speakerPayload); return fmt.Errorf("add track not supported for snapshot")
}
return c.prod.AddTrack(media, codec, track)
} }
func (c *Client) Start() error { func (c *Client) Start() error {
@@ -517,5 +534,9 @@ func (c *Client) Stop() error {
} }
func (c *Client) MarshalJSON() ([]byte, error) { func (c *Client) MarshalJSON() ([]byte, error) {
return c.prod.MarshalJSON() if webrtcProd, ok := c.prod.(*webrtc.Conn); ok {
return webrtcProd.MarshalJSON()
}
return nil, errors.New("ring: can't marshal")
} }
+64
View File
@@ -0,0 +1,64 @@
package ring
import (
"fmt"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/pion/rtp"
)
type SnapshotProducer struct {
core.Connection
client *RingRestClient
camera *CameraData
}
func NewSnapshotProducer(client *RingRestClient, camera *CameraData) *SnapshotProducer {
return &SnapshotProducer{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "ring/snapshot",
Protocol: "https",
Medias: []*core.Media{
{
Kind: core.KindVideo,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{
{
Name: core.CodecJPEG,
ClockRate: 90000,
PayloadType: core.PayloadTypeRAW,
},
},
},
},
},
client: client,
camera: camera,
}
}
func (p *SnapshotProducer) Start() error {
// Fetch snapshot
response, err := p.client.Request("GET", fmt.Sprintf("https://app-snaps.ring.com/snapshots/next/%d", int(p.camera.ID)), nil)
if err != nil {
return fmt.Errorf("failed to get snapshot: %w", err)
}
pkt := &rtp.Packet{
Header: rtp.Header{Timestamp: core.Now90000()},
Payload: response,
}
// Send to all receivers
for _, receiver := range p.Receivers {
receiver.WriteRTP(pkt)
}
return nil
}
func (p *SnapshotProducer) Stop() error {
return p.Connection.Stop()
}