From a664e3b8383bcc2ac382c25ba17b12ce633055ba Mon Sep 17 00:00:00 2001 From: Alexey Khit Date: Wed, 23 Aug 2023 18:14:49 +0300 Subject: [PATCH] Code refactoring --- internal/webrtc/client.go | 206 ---------------------------------- internal/webrtc/kinesis.go | 220 +++++++++++++++++++++++++++++++++++++ 2 files changed, 220 insertions(+), 206 deletions(-) create mode 100644 internal/webrtc/kinesis.go diff --git a/internal/webrtc/client.go b/internal/webrtc/client.go index f9207eff..39c8bcc8 100644 --- a/internal/webrtc/client.go +++ b/internal/webrtc/client.go @@ -1,9 +1,7 @@ package webrtc import ( - "encoding/json" "errors" - "fmt" "io" "net/http" "net/url" @@ -214,207 +212,3 @@ func whepClient(url string) (core.Producer, error) { return prod, nil } - -type KinesisRequest struct { - Action string `json:"action"` - ClientID string `json:"recipientClientId"` - Payload []byte `json:"messagePayload"` -} - -func (k KinesisRequest) String() string { - return fmt.Sprintf("action=%s, payload=%s", k.Action, k.Payload) -} - -type KinesisResponse struct { - Payload []byte `json:"messagePayload"` - Type string `json:"messageType"` -} - -func (k KinesisResponse) String() string { - return fmt.Sprintf("type=%s, payload=%s", k.Type, k.Payload) -} - -func kinesisClient(rawURL string, query url.Values, desc string) (core.Producer, error) { - // 1. Connect to signalign server - conn, _, err := websocket.DefaultDialer.Dial(rawURL, nil) - if err != nil { - return nil, err - } - - // 2. Load ICEServers from query param (base64 json) - conf := pion.Configuration{} - - if s := query.Get("ice_servers"); s != "" { - conf.ICEServers, err = webrtc.UnmarshalICEServers([]byte(s)) - if err != nil { - log.Warn().Err(err).Caller().Send() - } - } - - // close websocket when we ready return Producer or connection error - defer conn.Close() - - // 3. Create Peer Connection - api, err := webrtc.NewAPI("") - if err != nil { - return nil, err - } - - pc, err := api.NewPeerConnection(conf) - if err != nil { - return nil, err - } - - // protect from sending ICE candidate before Offer - var sendOffer core.Waiter - - // protect from blocking on errors - defer sendOffer.Done(nil) - - // waiter will wait PC error or WS error or nil (connection OK) - var connState core.Waiter - - req := KinesisRequest{ - ClientID: query.Get("client_id"), - } - - prod := webrtc.NewConn(pc) - prod.Desc = desc - prod.Mode = core.ModeActiveProducer - prod.Listen(func(msg any) { - switch msg := msg.(type) { - case *pion.ICECandidate: - _ = sendOffer.Wait() - - req.Action = "ICE_CANDIDATE" - req.Payload, _ = json.Marshal(msg.ToJSON()) - if err = conn.WriteJSON(&req); err != nil { - connState.Done(err) - return - } - - log.Trace().Msgf("[webrtc] kinesis send: %s", req) - - case pion.PeerConnectionState: - switch msg { - case pion.PeerConnectionStateConnecting: - case pion.PeerConnectionStateConnected: - connState.Done(nil) - default: - connState.Done(errors.New("webrtc: " + msg.String())) - } - } - }) - - medias := []*core.Media{ - {Kind: core.KindVideo, Direction: core.DirectionRecvonly}, - {Kind: core.KindAudio, Direction: core.DirectionRecvonly}, - } - - // 4. Create offer - offer, err := prod.CreateOffer(medias) - if err != nil { - return nil, err - } - - // 5. Send offer - req.Action = "SDP_OFFER" - req.Payload, _ = json.Marshal(pion.SessionDescription{ - Type: pion.SDPTypeOffer, - SDP: offer, - }) - if err = conn.WriteJSON(req); err != nil { - return nil, err - } - - log.Trace().Msgf("[webrtc] kinesis send: %s", req) - - sendOffer.Done(nil) - - go func() { - var err error - - // will be closed when conn will be closed - for { - var res KinesisResponse - if err = conn.ReadJSON(&res); err != nil { - // some buggy messages from Amazon servers - if errors.Is(err, io.ErrUnexpectedEOF) { - continue - } - break - } - - log.Trace().Msgf("[webrtc] kinesis recv: %s", res) - - switch res.Type { - case "SDP_ANSWER": - // 6. Get answer - var sd pion.SessionDescription - if err = json.Unmarshal(res.Payload, &sd); err != nil { - break - } - - if err = prod.SetAnswer(sd.SDP); err != nil { - break - } - - case "ICE_CANDIDATE": - // 7. Continue to receiving candidates - var ci pion.ICECandidateInit - if err = json.Unmarshal(res.Payload, &ci); err != nil { - break - } - - if err = prod.AddCandidate(ci.Candidate); err != nil { - break - } - } - } - - connState.Done(err) - }() - - if err = connState.Wait(); err != nil { - return nil, err - } - - return prod, nil -} - -type WyzeKVS struct { - ClientId string `json:"ClientId"` - Cam string `json:"cam"` - Result string `json:"result"` - Servers json.RawMessage `json:"servers"` - URL string `json:"signalingUrl"` -} - -func wyzeClient(rawURL string) (core.Producer, error) { - client := http.Client{Timeout: 5 * time.Second} - res, err := client.Get(rawURL) - if err != nil { - return nil, err - } - - b, err := io.ReadAll(res.Body) - if err != nil { - return nil, err - } - - var kvs WyzeKVS - if err = json.Unmarshal(b, &kvs); err != nil { - return nil, err - } - - if kvs.Result != "ok" { - return nil, errors.New("wyse: wrong result: " + kvs.Result) - } - - query := url.Values{ - "client_id": []string{kvs.ClientId}, - "ice_servers": []string{string(kvs.Servers)}, - } - - return kinesisClient(kvs.URL, query, "WebRTC/Wyze") -} diff --git a/internal/webrtc/kinesis.go b/internal/webrtc/kinesis.go new file mode 100644 index 00000000..4ec733e1 --- /dev/null +++ b/internal/webrtc/kinesis.go @@ -0,0 +1,220 @@ +package webrtc + +import ( + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "net/url" + "time" + + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/webrtc" + "github.com/gorilla/websocket" + pion "github.com/pion/webrtc/v3" +) + +type kinesisRequest struct { + Action string `json:"action"` + ClientID string `json:"recipientClientId"` + Payload []byte `json:"messagePayload"` +} + +func (k kinesisRequest) String() string { + return fmt.Sprintf("action=%s, payload=%s", k.Action, k.Payload) +} + +type kinesisResponse struct { + Payload []byte `json:"messagePayload"` + Type string `json:"messageType"` +} + +func (k kinesisResponse) String() string { + return fmt.Sprintf("type=%s, payload=%s", k.Type, k.Payload) +} + +func kinesisClient(rawURL string, query url.Values, desc string) (core.Producer, error) { + // 1. Connect to signalign server + conn, _, err := websocket.DefaultDialer.Dial(rawURL, nil) + if err != nil { + return nil, err + } + + // 2. Load ICEServers from query param (base64 json) + conf := pion.Configuration{} + + if s := query.Get("ice_servers"); s != "" { + conf.ICEServers, err = webrtc.UnmarshalICEServers([]byte(s)) + if err != nil { + log.Warn().Err(err).Caller().Send() + } + } + + // close websocket when we ready return Producer or connection error + defer conn.Close() + + // 3. Create Peer Connection + api, err := webrtc.NewAPI("") + if err != nil { + return nil, err + } + + pc, err := api.NewPeerConnection(conf) + if err != nil { + return nil, err + } + + // protect from sending ICE candidate before Offer + var sendOffer core.Waiter + + // protect from blocking on errors + defer sendOffer.Done(nil) + + // waiter will wait PC error or WS error or nil (connection OK) + var connState core.Waiter + + req := kinesisRequest{ + ClientID: query.Get("client_id"), + } + + prod := webrtc.NewConn(pc) + prod.Desc = desc + prod.Mode = core.ModeActiveProducer + prod.Listen(func(msg any) { + switch msg := msg.(type) { + case *pion.ICECandidate: + _ = sendOffer.Wait() + + req.Action = "ICE_CANDIDATE" + req.Payload, _ = json.Marshal(msg.ToJSON()) + if err = conn.WriteJSON(&req); err != nil { + connState.Done(err) + return + } + + log.Trace().Msgf("[webrtc] kinesis send: %s", req) + + case pion.PeerConnectionState: + switch msg { + case pion.PeerConnectionStateConnecting: + case pion.PeerConnectionStateConnected: + connState.Done(nil) + default: + connState.Done(errors.New("webrtc: " + msg.String())) + } + } + }) + + medias := []*core.Media{ + {Kind: core.KindVideo, Direction: core.DirectionRecvonly}, + {Kind: core.KindAudio, Direction: core.DirectionRecvonly}, + } + + // 4. Create offer + offer, err := prod.CreateOffer(medias) + if err != nil { + return nil, err + } + + // 5. Send offer + req.Action = "SDP_OFFER" + req.Payload, _ = json.Marshal(pion.SessionDescription{ + Type: pion.SDPTypeOffer, + SDP: offer, + }) + if err = conn.WriteJSON(req); err != nil { + return nil, err + } + + log.Trace().Msgf("[webrtc] kinesis send: %s", req) + + sendOffer.Done(nil) + + go func() { + var err error + + // will be closed when conn will be closed + for { + var res kinesisResponse + if err = conn.ReadJSON(&res); err != nil { + // some buggy messages from Amazon servers + if errors.Is(err, io.ErrUnexpectedEOF) { + continue + } + break + } + + log.Trace().Msgf("[webrtc] kinesis recv: %s", res) + + switch res.Type { + case "SDP_ANSWER": + // 6. Get answer + var sd pion.SessionDescription + if err = json.Unmarshal(res.Payload, &sd); err != nil { + break + } + + if err = prod.SetAnswer(sd.SDP); err != nil { + break + } + + case "ICE_CANDIDATE": + // 7. Continue to receiving candidates + var ci pion.ICECandidateInit + if err = json.Unmarshal(res.Payload, &ci); err != nil { + break + } + + if err = prod.AddCandidate(ci.Candidate); err != nil { + break + } + } + } + + connState.Done(err) + }() + + if err = connState.Wait(); err != nil { + return nil, err + } + + return prod, nil +} + +type wyzeKVS struct { + ClientId string `json:"ClientId"` + Cam string `json:"cam"` + Result string `json:"result"` + Servers json.RawMessage `json:"servers"` + URL string `json:"signalingUrl"` +} + +func wyzeClient(rawURL string) (core.Producer, error) { + client := http.Client{Timeout: 5 * time.Second} + res, err := client.Get(rawURL) + if err != nil { + return nil, err + } + + b, err := io.ReadAll(res.Body) + if err != nil { + return nil, err + } + + var kvs wyzeKVS + if err = json.Unmarshal(b, &kvs); err != nil { + return nil, err + } + + if kvs.Result != "ok" { + return nil, errors.New("wyse: wrong result: " + kvs.Result) + } + + query := url.Values{ + "client_id": []string{kvs.ClientId}, + "ice_servers": []string{string(kvs.Servers)}, + } + + return kinesisClient(kvs.URL, query, "WebRTC/Wyze") +}