diff --git a/internal/webrtc/client.go b/internal/webrtc/client.go index a246c5fa..6cb3bfe5 100644 --- a/internal/webrtc/client.go +++ b/internal/webrtc/client.go @@ -1,44 +1,74 @@ package webrtc import ( + "encoding/json" "errors" + "fmt" + "io" + "net/http" + "net/url" + "strings" + "time" + "github.com/AlexxIT/go2rtc/internal/api/ws" + "github.com/AlexxIT/go2rtc/internal/streams" "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/webrtc" "github.com/gorilla/websocket" pion "github.com/pion/webrtc/v3" - "io" - "net/http" - "strings" - "time" ) -func streamsHandler(url string) (core.Producer, error) { - url = url[7:] - if i := strings.Index(url, "://"); i > 0 { - switch url[:i] { +// streamsHandler supports: +// 1. WHEP: webrtc:http://192.168.1.123:1984/api/webrtc?src=camera1 +// 2. go2rtc: webrtc:ws://192.168.1.123:1984/api/ws?src=camera1 +// 3. Wyze: webrtc:http://192.168.1.123:5000/signaling/camera1?kvs#format=wyze +// 4. Kinesis: webrtc:wss://...amazonaws.com/?...#format=kinesis#client_id=...#ice_servers=[{...},{...}] +func streamsHandler(rawURL string) (core.Producer, error) { + var query url.Values + if i := strings.IndexByte(rawURL, '#'); i > 0 { + query = streams.ParseQuery(rawURL[i+1:]) + rawURL = rawURL[:i] + } + + rawURL = rawURL[7:] // remove webrtc: + if i := strings.IndexByte(rawURL, ':'); i > 0 { + scheme := rawURL[:i] + format := query.Get("format") + + switch scheme { case "ws", "wss": - return asyncClient(url) + if format == "kinesis" { + // https://aws.amazon.com/kinesis/video-streams/ + // https://docs.aws.amazon.com/kinesisvideostreams-webrtc-dg/latest/devguide/what-is-kvswebrtc.html + // https://github.com/orgs/awslabs/repositories?q=kinesis+webrtc + return kinesisClient(rawURL, query, "WebRTC/Kinesis") + } else { + return go2rtcClient(rawURL) + } + case "http", "https": - return syncClient(url) + if format == "wyze" { + // https://github.com/mrlt8/docker-wyze-bridge + return wyzeClient(rawURL) + } else { + return whepClient(rawURL) + } } } - return nil, errors.New("unsupported url: " + url) + return nil, errors.New("unsupported url: " + rawURL) } -// asyncClient can connect only to go2rtc server +// go2rtcClient can connect only to go2rtc server // ex: ws://localhost:1984/api/ws?src=camera1 -func asyncClient(url string) (core.Producer, error) { +func go2rtcClient(url string) (core.Producer, error) { // 1. Connect to signalign server conn, _, err := websocket.DefaultDialer.Dial(url, nil) if err != nil { return nil, err } - defer func() { - if err != nil { - _ = conn.Close() - } - }() + + // close websocket when we ready return Producer or connection error + defer conn.Close() // 2. Create PeerConnection pc, err := PeerConnection(true) @@ -47,22 +77,27 @@ func asyncClient(url string) (core.Producer, error) { return nil, err } - var sendOffer core.Waiter + // waiter will wait PC error or WS error or nil (connection OK) + var connState core.Waiter prod := webrtc.NewConn(pc) prod.Desc = "WebRTC/WebSocket async" prod.Mode = core.ModeActiveProducer prod.Listen(func(msg any) { switch msg := msg.(type) { - case pion.PeerConnectionState: - _ = conn.Close() - case *pion.ICECandidate: - sendOffer.Wait() - s := msg.ToJSON().Candidate log.Trace().Str("candidate", s).Msg("[webrtc] local") _ = conn.WriteJSON(&ws.Message{Type: "webrtc/candidate", Value: s}) + + case pion.PeerConnectionState: + switch msg { + case pion.PeerConnectionStateConnecting: + case pion.PeerConnectionStateConnected: + connState.Done(nil) + default: + connState.Done(errors.New("webrtc: " + msg.String())) + } } }) @@ -84,8 +119,6 @@ func asyncClient(url string) (core.Producer, error) { return nil, err } - sendOffer.Done() - // 5. Get answer if err = conn.ReadJSON(msg); err != nil { return nil, err @@ -102,13 +135,12 @@ func asyncClient(url string) (core.Producer, error) { // 6. Continue to receiving candidates go func() { + var err error + for { // receive data from remote - msg := new(ws.Message) - if err = conn.ReadJSON(msg); err != nil { - if cerr, ok := err.(*websocket.CloseError); ok { - log.Trace().Err(err).Caller().Msgf("[webrtc] ws code=%d", cerr.Code) - } + var msg ws.Message + if err = conn.ReadJSON(&msg); err != nil { break } @@ -120,15 +152,19 @@ func asyncClient(url string) (core.Producer, error) { } } - _ = conn.Close() + connState.Done(err) }() + if err = connState.Wait(); err != nil { + return nil, err + } + return prod, nil } -// syncClient - support WebRTC-HTTP Egress Protocol (WHEP) +// whepClient - support WebRTC-HTTP Egress Protocol (WHEP) // ex: http://localhost:1984/api/webrtc?src=camera1 -func syncClient(url string) (core.Producer, error) { +func whepClient(url string) (core.Producer, error) { // 2. Create PeerConnection pc, err := PeerConnection(true) if err != nil { @@ -176,3 +212,207 @@ func syncClient(url string) (core.Producer, error) { return prod, nil } + +type KinesisRequest struct { + Action string `json:"action"` + ClientID string `json:"recipientClientId"` + Payload []byte `json:"messagePayload"` +} + +func (k KinesisRequest) String() string { + return fmt.Sprintf("action=%s, payload=%s", k.Action, k.Payload) +} + +type KinesisResponse struct { + Payload []byte `json:"messagePayload"` + Type string `json:"messageType"` +} + +func (k KinesisResponse) String() string { + return fmt.Sprintf("type=%s, payload=%s", k.Type, k.Payload) +} + +func kinesisClient(rawURL string, query url.Values, desc string) (core.Producer, error) { + // 1. Connect to signalign server + conn, _, err := websocket.DefaultDialer.Dial(rawURL, nil) + if err != nil { + return nil, err + } + + // 2. Load ICEServers from query param (base64 json) + conf := pion.Configuration{} + + if s := query.Get("ice_servers"); s != "" { + conf.ICEServers, err = webrtc.UnmarshalICEServers([]byte(s)) + if err != nil { + log.Warn().Err(err).Caller().Send() + } + } + + // close websocket when we ready return Producer or connection error + defer conn.Close() + + // 3. Create Peer Connection + api, err := webrtc.NewAPI("") + if err != nil { + return nil, err + } + + pc, err := api.NewPeerConnection(conf) + if err != nil { + return nil, err + } + + // protect from sending ICE candidate before Offer + var sendOffer core.Waiter + + // protect from blocking on errors + defer sendOffer.Done(nil) + + // waiter will wait PC error or WS error or nil (connection OK) + var connState core.Waiter + + req := KinesisRequest{ + ClientID: query.Get("client_id"), + } + + prod := webrtc.NewConn(pc) + prod.Desc = desc + prod.Mode = core.ModeActiveProducer + prod.Listen(func(msg any) { + switch msg := msg.(type) { + case *pion.ICECandidate: + _ = sendOffer.Wait() + + req.Action = "ICE_CANDIDATE" + req.Payload, _ = json.Marshal(msg.ToJSON()) + if err = conn.WriteJSON(&req); err != nil { + connState.Done(err) + return + } + + log.Trace().Msgf("[webrtc] kinesis send: %s", req) + + case pion.PeerConnectionState: + switch msg { + case pion.PeerConnectionStateConnecting: + case pion.PeerConnectionStateConnected: + connState.Done(nil) + default: + connState.Done(errors.New("webrtc: " + msg.String())) + } + } + }) + + medias := []*core.Media{ + {Kind: core.KindVideo, Direction: core.DirectionRecvonly}, + {Kind: core.KindAudio, Direction: core.DirectionRecvonly}, + } + + // 4. Create offer + offer, err := prod.CreateOffer(medias) + if err != nil { + return nil, err + } + + // 5. Send offer + req.Action = "SDP_OFFER" + req.Payload, _ = json.Marshal(pion.SessionDescription{ + Type: pion.SDPTypeOffer, + SDP: offer, + }) + if err = conn.WriteJSON(req); err != nil { + return nil, err + } + + log.Trace().Msgf("[webrtc] kinesis send: %s", req) + + sendOffer.Done(nil) + + go func() { + var err error + + // will be closed when conn will be closed + for { + var res KinesisResponse + if err = conn.ReadJSON(&res); err != nil { + // some buggy messages from Amazon servers + if errors.Is(err, io.ErrUnexpectedEOF) { + continue + } + break + } + + log.Trace().Msgf("[webrtc] kinesis recv: %s", res) + + switch res.Type { + case "SDP_ANSWER": + // 6. Get answer + var sd pion.SessionDescription + if err = json.Unmarshal(res.Payload, &sd); err != nil { + break + } + + if err = prod.SetAnswer(sd.SDP); err != nil { + break + } + + case "ICE_CANDIDATE": + // 7. Continue to receiving candidates + var ci pion.ICECandidateInit + if err = json.Unmarshal(res.Payload, &ci); err != nil { + break + } + + if err = prod.AddCandidate(ci.Candidate); err != nil { + break + } + } + } + + connState.Done(err) + }() + + if err = connState.Wait(); err != nil { + return nil, err + } + + return prod, nil +} + +type WyzeKVS struct { + ClientId string `json:"ClientId"` + Cam string `json:"cam"` + Result string `json:"result"` + Servers json.RawMessage `json:"servers"` + URL string `json:"signalingUrl"` +} + +func wyzeClient(rawURL string) (core.Producer, error) { + client := http.Client{Timeout: 5 * time.Second} + res, err := client.Get(rawURL) + if err != nil { + return nil, err + } + + b, err := io.ReadAll(res.Body) + if err != nil { + return nil, err + } + + var kvs WyzeKVS + if err = json.Unmarshal(b, &kvs); err != nil { + return nil, err + } + + if kvs.Result != "ok" { + return nil, errors.New("wyse: wrong result: " + kvs.Result) + } + + query := url.Values{ + "client_id": []string{kvs.ClientId}, + "ice_servers": []string{string(kvs.Servers)}, + } + + return kinesisClient(kvs.URL, query, "WebRTC/Wyze") +} diff --git a/internal/webrtc/webrtc.go b/internal/webrtc/webrtc.go index df92f398..ed74ebbb 100644 --- a/internal/webrtc/webrtc.go +++ b/internal/webrtc/webrtc.go @@ -114,6 +114,9 @@ func asyncHandler(tr *ws.Transport, msg *ws.Message) error { var sendAnswer core.Waiter + // protect from blocking on errors + defer sendAnswer.Done(nil) + conn := webrtc.NewConn(pc) conn.Desc = "WebRTC/WebSocket async" conn.Mode = mode @@ -132,7 +135,7 @@ func asyncHandler(tr *ws.Transport, msg *ws.Message) error { } case *pion.ICECandidate: - sendAnswer.Wait() + _ = sendAnswer.Wait() s := msg.ToJSON().Candidate log.Trace().Str("candidate", s).Msg("[webrtc] local") @@ -186,7 +189,7 @@ func asyncHandler(tr *ws.Transport, msg *ws.Message) error { tr.Write(&ws.Message{Type: "webrtc/answer", Value: answer}) } - sendAnswer.Done() + sendAnswer.Done(nil) asyncCandidates(tr, conn) diff --git a/pkg/core/waiter.go b/pkg/core/waiter.go index eed265da..c61e3be4 100644 --- a/pkg/core/waiter.go +++ b/pkg/core/waiter.go @@ -12,6 +12,7 @@ type Waiter struct { sync.WaitGroup mu sync.Mutex state int // state < 0 means finish + err error } func (w *Waiter) Add(delta int) { @@ -23,7 +24,7 @@ func (w *Waiter) Add(delta int) { w.mu.Unlock() } -func (w *Waiter) Wait() { +func (w *Waiter) Wait() error { w.mu.Lock() // first wait auto start waiter if w.state == 0 { @@ -33,9 +34,11 @@ func (w *Waiter) Wait() { w.mu.Unlock() w.WaitGroup.Wait() + + return w.err } -func (w *Waiter) Done() { +func (w *Waiter) Done(err error) { w.mu.Lock() // safe run Done only when have tasks @@ -47,21 +50,21 @@ func (w *Waiter) Done() { // block waiter for any operations after last done if w.state == 0 { w.state = -1 + w.err = err } w.mu.Unlock() } -func (w *Waiter) WaitChan() <-chan struct{} { - var ch chan struct{} +func (w *Waiter) WaitChan() <-chan error { + var ch chan error w.mu.Lock() if w.state >= 0 { - ch = make(chan struct{}) + ch = make(chan error) go func() { - w.Wait() - ch <- struct{}{} + ch <- w.Wait() }() } diff --git a/pkg/webrtc/client_test.go b/pkg/webrtc/client_test.go index 3dce0372..82b52b25 100644 --- a/pkg/webrtc/client_test.go +++ b/pkg/webrtc/client_test.go @@ -1,11 +1,12 @@ package webrtc import ( + "testing" + "github.com/AlexxIT/go2rtc/pkg/core" "github.com/pion/webrtc/v3" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "testing" ) func TestClient(t *testing.T) { @@ -100,3 +101,10 @@ a=recvonly err = sender.ReplaceTrack(track) require.Nil(t, err) } + +func TestUnmarshalICEServers(t *testing.T) { + s := `[{"credential":"xxx","urls":"xxx","username":"xxx"},{"credential":null,"urls":"xxx","username":null}]` + servers, err := UnmarshalICEServers([]byte(s)) + require.Nil(t, err) + require.Len(t, servers, 2) +} diff --git a/pkg/webrtc/conn.go b/pkg/webrtc/conn.go index e3b1c960..64835353 100644 --- a/pkg/webrtc/conn.go +++ b/pkg/webrtc/conn.go @@ -1,11 +1,12 @@ package webrtc import ( + "time" + "github.com/AlexxIT/go2rtc/pkg/core" "github.com/pion/rtcp" "github.com/pion/rtp" "github.com/pion/webrtc/v3" - "time" ) type Conn struct { @@ -130,7 +131,7 @@ func NewConn(pc *webrtc.PeerConnection) *Conn { } func (c *Conn) Close() error { - c.closed.Done() + c.closed.Done(nil) return c.pc.Close() } diff --git a/pkg/webrtc/helpers.go b/pkg/webrtc/helpers.go index ab3f83b3..ed1daf1e 100644 --- a/pkg/webrtc/helpers.go +++ b/pkg/webrtc/helpers.go @@ -1,6 +1,7 @@ package webrtc import ( + "encoding/json" "errors" "fmt" "hash/crc32" @@ -293,3 +294,35 @@ func CandidateManualHostTCPPassive(address string, port int) string { foundation, priority, address, port, ) } + +func UnmarshalICEServers(b []byte) ([]webrtc.ICEServer, error) { + type ICEServer struct { + URLs any `json:"urls"` + Username string `json:"username,omitempty"` + Credential string `json:"credential,omitempty"` + } + + var src []ICEServer + if err := json.Unmarshal(b, &src); err != nil { + return nil, err + } + + var dst []webrtc.ICEServer + for i := range src { + srv := webrtc.ICEServer{ + Username: src[i].Username, + Credential: src[i].Credential, + } + + switch v := src[i].URLs.(type) { + case []string: + srv.URLs = v + case string: + srv.URLs = []string{v} + } + + dst = append(dst, srv) + } + + return dst, nil +}