From 0651a09a3c0f19250dcc2ff0845195450b6c8684 Mon Sep 17 00:00:00 2001 From: seydx Date: Fri, 24 Jan 2025 22:35:04 +0100 Subject: [PATCH] add snapshot producer --- internal/ring/init.go | 8 + pkg/ring/client.go | 587 ++++++++++++++++++++++-------------------- pkg/ring/snapshot.go | 64 +++++ 3 files changed, 376 insertions(+), 283 deletions(-) create mode 100644 pkg/ring/snapshot.go diff --git a/internal/ring/init.go b/internal/ring/init.go index 521c137a..bc49178b 100644 --- a/internal/ring/init.go +++ b/internal/ring/init.go @@ -84,10 +84,18 @@ func apiRing(w http.ResponseWriter, r *http.Request) { var items []*api.Source for _, camera := range devices.AllCameras { cleanQuery.Set("device_id", camera.DeviceID) + + // Stream source items = append(items, &api.Source{ Name: camera.Description, URL: "ring:?" + cleanQuery.Encode(), }) + + // Snapshot source + items = append(items, &api.Source{ + Name: camera.Description + " Snapshot", + URL: "ring:?" + cleanQuery.Encode() + "&snapshot", + }) } api.ResponseSources(w, items) diff --git a/pkg/ring/client.go b/pkg/ring/client.go index b48727d9..c432ecf9 100644 --- a/pkg/ring/client.go +++ b/pkg/ring/client.go @@ -18,7 +18,7 @@ import ( type Client struct { api *RingRestClient ws *websocket.Conn - prod *webrtc.Conn + prod core.Producer camera *CameraData dialogID string sessionID string @@ -101,322 +101,337 @@ const ( ) func Dial(rawURL string) (*Client, error) { - // 1. Create Ring Rest API client - u, err := url.Parse(rawURL) - if err != nil { - return nil, err - } + // 1. Parse URL and validate basic params + u, err := url.Parse(rawURL) + if err != nil { + return nil, err + } - query := u.Query() - encodedToken := query.Get("refresh_token") - deviceID := query.Get("device_id") + query := u.Query() + encodedToken := query.Get("refresh_token") + deviceID := query.Get("device_id") + _, isSnapshot := query["snapshot"] - if encodedToken == "" || deviceID == "" { - return nil, errors.New("ring: wrong query") - } + if encodedToken == "" || deviceID == "" { + return nil, errors.New("ring: wrong query") + } - // URL-decode the refresh token - refreshToken, err := url.QueryUnescape(encodedToken) - if err != nil { - return nil, fmt.Errorf("ring: invalid refresh token encoding: %w", err) - } + // URL-decode the refresh token + refreshToken, err := url.QueryUnescape(encodedToken) + if err != nil { + return nil, fmt.Errorf("ring: invalid refresh token encoding: %w", err) + } - // Initialize Ring API client - ringAPI, err := NewRingRestClient(RefreshTokenAuth{RefreshToken: refreshToken}, nil) - if err != nil { - return nil, err - } + // Initialize Ring API client + ringAPI, err := NewRingRestClient(RefreshTokenAuth{RefreshToken: refreshToken}, nil) + if err != nil { + return nil, err + } - // Get camera details - devices, err := ringAPI.FetchRingDevices() - if err != nil { - return nil, err - } + // Get camera details + devices, err := ringAPI.FetchRingDevices() + if err != nil { + return nil, err + } - var camera *CameraData - for _, cam := range devices.AllCameras { - if fmt.Sprint(cam.DeviceID) == deviceID { - camera = &cam - break - } - } - if camera == nil { - return nil, errors.New("ring: camera not found") - } + var camera *CameraData + for _, cam := range devices.AllCameras { + if fmt.Sprint(cam.DeviceID) == deviceID { + camera = &cam + break + } + } + if camera == nil { + return nil, errors.New("ring: camera not found") + } - // 2. Connect to signaling server - ticket, err := ringAPI.GetSocketTicket() - if err != nil { - return nil, err - } + // Create base client + client := &Client{ + api: ringAPI, + camera: camera, + dialogID: uuid.NewString(), + done: make(chan struct{}), + } - // Create WebSocket connection - wsURL := fmt.Sprintf("wss://api.prod.signalling.ring.devices.a2z.com/ws?api_version=4.0&auth_type=ring_solutions&client_id=ring_site-%s&token=%s", - uuid.NewString(), url.QueryEscape(ticket.Ticket)) + // Check if snapshot request + if isSnapshot { + client.prod = NewSnapshotProducer(ringAPI, camera) + return client, nil + } - ws, _, err := websocket.DefaultDialer.Dial(wsURL, map[string][]string{ - "User-Agent": {"android:com.ringapp"}, - }) - if err != nil { - return nil, err - } + // If not snapshot, continue with WebRTC setup + ticket, err := ringAPI.GetSocketTicket() + if err != nil { + return nil, err + } - // 3. Create Peer Connection - conf := pion.Configuration{ - ICEServers: []pion.ICEServer{ - {URLs: []string{ - "stun:stun.kinesisvideo.us-east-1.amazonaws.com:443", - "stun:stun.kinesisvideo.us-east-2.amazonaws.com:443", - "stun:stun.kinesisvideo.us-west-2.amazonaws.com:443", - "stun:stun.l.google.com:19302", - "stun:stun1.l.google.com:19302", - "stun:stun2.l.google.com:19302", - "stun:stun3.l.google.com:19302", - "stun:stun4.l.google.com:19302", - }}, - }, - ICETransportPolicy: pion.ICETransportPolicyAll, + // Create WebSocket connection + wsURL := fmt.Sprintf("wss://api.prod.signalling.ring.devices.a2z.com/ws?api_version=4.0&auth_type=ring_solutions&client_id=ring_site-%s&token=%s", + uuid.NewString(), url.QueryEscape(ticket.Ticket)) + + client.ws, _, err = websocket.DefaultDialer.Dial(wsURL, map[string][]string{ + "User-Agent": {"android:com.ringapp"}, + }) + if err != nil { + return nil, err + } + + // Create Peer Connection + conf := pion.Configuration{ + ICEServers: []pion.ICEServer{ + {URLs: []string{ + "stun:stun.kinesisvideo.us-east-1.amazonaws.com:443", + "stun:stun.kinesisvideo.us-east-2.amazonaws.com:443", + "stun:stun.kinesisvideo.us-west-2.amazonaws.com:443", + "stun:stun.l.google.com:19302", + "stun:stun1.l.google.com:19302", + "stun:stun2.l.google.com:19302", + "stun:stun3.l.google.com:19302", + "stun:stun4.l.google.com:19302", + }}, + }, + ICETransportPolicy: pion.ICETransportPolicyAll, BundlePolicy: pion.BundlePolicyBalanced, - } + } - api, err := webrtc.NewAPI() - if err != nil { - ws.Close() - return nil, err - } + api, err := webrtc.NewAPI() + if err != nil { + client.ws.Close() + return nil, err + } - pc, err := api.NewPeerConnection(conf) - if err != nil { - ws.Close() - return nil, err - } + pc, err := api.NewPeerConnection(conf) + if err != nil { + client.ws.Close() + return nil, err + } - // protect from sending ICE candidate before Offer - var sendOffer core.Waiter + // protect from sending ICE candidate before Offer + var sendOffer core.Waiter - // protect from blocking on errors - defer sendOffer.Done(nil) + // protect from blocking on errors + defer sendOffer.Done(nil) - // waiter will wait PC error or WS error or nil (connection OK) - var connState core.Waiter + // waiter will wait PC error or WS error or nil (connection OK) + var connState core.Waiter - prod := webrtc.NewConn(pc) - prod.FormatName = "ring/webrtc" - prod.Mode = core.ModeActiveProducer - prod.Protocol = "ws" - prod.URL = rawURL + prod := webrtc.NewConn(pc) + prod.FormatName = "ring/webrtc" + prod.Mode = core.ModeActiveProducer + prod.Protocol = "ws" + prod.URL = rawURL - client := &Client{ - api: ringAPI, - ws: ws, - prod: prod, - camera: camera, - dialogID: uuid.NewString(), - done: make(chan struct{}), - } + client.prod = prod - prod.Listen(func(msg any) { - switch msg := msg.(type) { - case *pion.ICECandidate: - _ = sendOffer.Wait() + prod.Listen(func(msg any) { + switch msg := msg.(type) { + case *pion.ICECandidate: + _ = sendOffer.Wait() - iceCandidate := msg.ToJSON() + iceCandidate := msg.ToJSON() - // skip empty ICE candidates - if iceCandidate.Candidate == "" { - return - } + // skip empty ICE candidates + if iceCandidate.Candidate == "" { + return + } - icePayload := map[string]interface{}{ - "ice": iceCandidate.Candidate, - "mlineindex": iceCandidate.SDPMLineIndex, - } - - if err = client.sendSessionMessage("ice", icePayload); err != nil { - connState.Done(err) - return - } + icePayload := map[string]interface{}{ + "ice": iceCandidate.Candidate, + "mlineindex": iceCandidate.SDPMLineIndex, + } + + if err = client.sendSessionMessage("ice", icePayload); err != nil { + connState.Done(err) + return + } - case pion.PeerConnectionState: - switch msg { - case pion.PeerConnectionStateConnecting: - case pion.PeerConnectionStateConnected: - connState.Done(nil) - default: - connState.Done(errors.New("ring: " + msg.String())) - } - } - }) + case pion.PeerConnectionState: + switch msg { + case pion.PeerConnectionStateConnecting: + case pion.PeerConnectionStateConnected: + connState.Done(nil) + default: + connState.Done(errors.New("ring: " + msg.String())) + } + } + }) - // Setup media configuration - medias := []*core.Media{ - { - Kind: core.KindAudio, - Direction: core.DirectionSendRecv, - Codecs: []*core.Codec{ - { - Name: "opus", - ClockRate: 48000, - Channels: 2, - }, - }, - }, - { - Kind: core.KindVideo, - Direction: core.DirectionRecvonly, - Codecs: []*core.Codec{ - { - Name: "H264", - ClockRate: 90000, - }, - }, - }, - } + // Setup media configuration + medias := []*core.Media{ + { + Kind: core.KindAudio, + Direction: core.DirectionSendRecv, + Codecs: []*core.Codec{ + { + Name: "opus", + ClockRate: 48000, + Channels: 2, + }, + }, + }, + { + Kind: core.KindVideo, + Direction: core.DirectionRecvonly, + Codecs: []*core.Codec{ + { + Name: "H264", + ClockRate: 90000, + }, + }, + }, + } - // 4. Create offer - offer, err := prod.CreateOffer(medias) - if err != nil { - client.Stop() - return nil, err - } + // Create offer + offer, err := prod.CreateOffer(medias) + if err != nil { + client.Stop() + return nil, err + } - // 5. Send offer - offerPayload := map[string]interface{}{ - "stream_options": map[string]bool{ - "audio_enabled": true, - "video_enabled": true, - }, - "sdp": offer, - } + // Send offer + offerPayload := map[string]interface{}{ + "stream_options": map[string]bool{ + "audio_enabled": true, + "video_enabled": true, + }, + "sdp": offer, + } - if err = client.sendSessionMessage("live_view", offerPayload); err != nil { - client.Stop() - return nil, err - } + if err = client.sendSessionMessage("live_view", offerPayload); err != nil { + client.Stop() + return nil, err + } - sendOffer.Done(nil) + sendOffer.Done(nil) - // Ring expects a ping message every 5 seconds - go func() { - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() + // Ring expects a ping message every 5 seconds + go client.startPingLoop(pc) + go client.startMessageLoop(&connState) - for { - select { - case <-client.done: - return - case <-ticker.C: - if pc.ConnectionState() == pion.PeerConnectionStateConnected { - if err := client.sendSessionMessage("ping", nil); err != nil { - return - } - } - } - } - }() - - go func() { - var err error + if err = connState.Wait(); err != nil { + return nil, err + } - // will be closed when conn will be closed - defer func() { - connState.Done(err) - }() + return client, nil +} - for { - select { - case <-client.done: - return - default: - var res BaseMessage - if err = ws.ReadJSON(&res); err != nil { - select { - case <-client.done: - return - default: - } +func (c *Client) startPingLoop(pc *pion.PeerConnection) { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() - client.Stop() - return - } + for { + select { + case <-c.done: + return + case <-ticker.C: + if pc.ConnectionState() == pion.PeerConnectionStateConnected { + if err := c.sendSessionMessage("ping", nil); err != nil { + return + } + } + } + } +} - // check if "doorbot_id" is present - if _, ok := res.Body["doorbot_id"]; !ok { - continue - } - - // check if the message is from the correct doorbot - doorbotID := res.Body["doorbot_id"].(float64) - if doorbotID != float64(client.camera.ID) { - continue - } +func (c *Client) startMessageLoop(connState *core.Waiter) { + var err error - // check if the message is from the correct session - if res.Method == "session_created" || res.Method == "session_started" { - if _, ok := res.Body["session_id"]; ok && client.sessionID == "" { - client.sessionID = res.Body["session_id"].(string) - } - } + // will be closed when conn will be closed + defer func() { + connState.Done(err) + }() - if _, ok := res.Body["session_id"]; ok { - if res.Body["session_id"].(string) != client.sessionID { - continue - } - } + for { + select { + case <-c.done: + return + default: + var res BaseMessage + if err = c.ws.ReadJSON(&res); err != nil { + select { + case <-c.done: + return + default: + } - rawMsg, _ := json.Marshal(res) + c.Stop() + return + } - switch res.Method { - case "sdp": - // 6. Get answer - var msg AnswerMessage - if err = json.Unmarshal(rawMsg, &msg); err != nil { - client.Stop() - return - } - if err = prod.SetAnswer(msg.Body.SDP); err != nil { - client.Stop() - return - } - if err = client.activateSession(); err != nil { - client.Stop() - return - } - - case "ice": - // 7. Continue to receiving candidates - var msg IceCandidateMessage - if err = json.Unmarshal(rawMsg, &msg); err != nil { - break - } + // check if "doorbot_id" is present + if _, ok := res.Body["doorbot_id"]; !ok { + continue + } + + // check if the message is from the correct doorbot + doorbotID := res.Body["doorbot_id"].(float64) + if doorbotID != float64(c.camera.ID) { + continue + } - // check for empty ICE candidate - if msg.Body.Ice == "" { - break - } + // check if the message is from the correct session + if res.Method == "session_created" || res.Method == "session_started" { + if _, ok := res.Body["session_id"]; ok && c.sessionID == "" { + c.sessionID = res.Body["session_id"].(string) + } + } - if err = prod.AddCandidate(msg.Body.Ice); err != nil { - client.Stop() - return - } + if _, ok := res.Body["session_id"]; ok { + if res.Body["session_id"].(string) != c.sessionID { + continue + } + } - case "close": - client.Stop() - return + rawMsg, _ := json.Marshal(res) - case "pong": - // Ignore - continue - } - } - } - }() + switch res.Method { + case "sdp": + if prod, ok := c.prod.(*webrtc.Conn); ok { + // Get answer + var msg AnswerMessage + if err = json.Unmarshal(rawMsg, &msg); err != nil { + c.Stop() + return + } + if err = prod.SetAnswer(msg.Body.SDP); err != nil { + c.Stop() + return + } + if err = c.activateSession(); err != nil { + c.Stop() + return + } + } + + case "ice": + if prod, ok := c.prod.(*webrtc.Conn); ok { + // Continue to receiving candidates + var msg IceCandidateMessage + if err = json.Unmarshal(rawMsg, &msg); err != nil { + break + } - if err = connState.Wait(); err != nil { - return nil, err - } + // check for empty ICE candidate + if msg.Body.Ice == "" { + break + } - return client, nil + if err = prod.AddCandidate(msg.Body.Ice); err != nil { + c.Stop() + return + } + } + + case "close": + c.Stop() + return + + case "pong": + // Ignore + continue + } + } + } } func (c *Client) activateSession() error { @@ -471,16 +486,18 @@ func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, } func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { - if media.Kind == core.KindAudio { - // Enable speaker - speakerPayload := map[string]interface{}{ - "stealth_mode": false, - } + if webrtcProd, ok := c.prod.(*webrtc.Conn); ok { + if media.Kind == core.KindAudio { + // Enable speaker + speakerPayload := map[string]interface{}{ + "stealth_mode": false, + } + _ = c.sendSessionMessage("camera_options", speakerPayload) + } + return webrtcProd.AddTrack(media, codec, track) + } - _ = c.sendSessionMessage("camera_options", speakerPayload); - } - - return c.prod.AddTrack(media, codec, track) + return fmt.Errorf("add track not supported for snapshot") } func (c *Client) Start() error { @@ -517,5 +534,9 @@ func (c *Client) Stop() error { } func (c *Client) MarshalJSON() ([]byte, error) { - return c.prod.MarshalJSON() + if webrtcProd, ok := c.prod.(*webrtc.Conn); ok { + return webrtcProd.MarshalJSON() + } + + return nil, errors.New("ring: can't marshal") } \ No newline at end of file diff --git a/pkg/ring/snapshot.go b/pkg/ring/snapshot.go new file mode 100644 index 00000000..bbf86e28 --- /dev/null +++ b/pkg/ring/snapshot.go @@ -0,0 +1,64 @@ +package ring + +import ( + "fmt" + + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/pion/rtp" +) + +type SnapshotProducer struct { + core.Connection + + client *RingRestClient + camera *CameraData +} + +func NewSnapshotProducer(client *RingRestClient, camera *CameraData) *SnapshotProducer { + return &SnapshotProducer{ + Connection: core.Connection{ + ID: core.NewID(), + FormatName: "ring/snapshot", + Protocol: "https", + Medias: []*core.Media{ + { + Kind: core.KindVideo, + Direction: core.DirectionRecvonly, + Codecs: []*core.Codec{ + { + Name: core.CodecJPEG, + ClockRate: 90000, + PayloadType: core.PayloadTypeRAW, + }, + }, + }, + }, + }, + client: client, + camera: camera, + } +} + +func (p *SnapshotProducer) Start() error { + // Fetch snapshot + response, err := p.client.Request("GET", fmt.Sprintf("https://app-snaps.ring.com/snapshots/next/%d", int(p.camera.ID)), nil) + if err != nil { + return fmt.Errorf("failed to get snapshot: %w", err) + } + + pkt := &rtp.Packet{ + Header: rtp.Header{Timestamp: core.Now90000()}, + Payload: response, + } + + // Send to all receivers + for _, receiver := range p.Receivers { + receiver.WriteRTP(pkt) + } + + return nil +} + +func (p *SnapshotProducer) Stop() error { + return p.Connection.Stop() +} \ No newline at end of file