From a7e76db464a82bc277ca116b91e3e442cae7580b Mon Sep 17 00:00:00 2001 From: seydx Date: Mon, 12 May 2025 13:31:45 +0200 Subject: [PATCH] change hls url and query and add more checks --- pkg/tuya/api.go | 2 +- pkg/tuya/client.go | 246 ++++++++++++++++++++++++--------------------- 2 files changed, 130 insertions(+), 118 deletions(-) diff --git a/pkg/tuya/api.go b/pkg/tuya/api.go index 4ba4f1eb..cef83af1 100644 --- a/pkg/tuya/api.go +++ b/pkg/tuya/api.go @@ -398,7 +398,7 @@ func(c *TuyaClient) GetStreamUrl(streamType string) (err error) { c.rtspURL = allosResponse.Result.URL fmt.Printf("RTSP URL: %s\n", c.rtspURL) case "hls": - c.hlsURL = "ffmpeg:" + allosResponse.Result.URL + "#video=copy" + c.hlsURL = allosResponse.Result.URL fmt.Printf("HLS URL: %s\n", c.hlsURL) default: return fmt.Errorf("unsupported stream type: %s", streamType) diff --git a/pkg/tuya/client.go b/pkg/tuya/client.go index 9019e931..3a11b595 100644 --- a/pkg/tuya/client.go +++ b/pkg/tuya/client.go @@ -41,13 +41,31 @@ func Dial(rawURL string) (core.Producer, error) { clientID := query.Get("client_id") secret := query.Get("secret") resolution := query.Get("resolution") - useRTSP := query.Get("rtsp") == "1" - useHLS := query.Get("hls") == "1" + streamType := query.Get("type") + useRTSP := streamType == "rtsp" + useHLS := streamType == "hls" + useWebRTC := streamType == "webrtc" || streamType == "" + + // check if host is correct + switch u.Hostname() { + case DefaultCnURL: + case DefaultWestUsURL: + case DefaultEastUsURL: + case DefaultCentralEuURL: + case DefaultWestEuURL: + case DefaultInURL: + default: + return nil, fmt.Errorf("tuya: wrong host %s", u.Hostname()) + } if deviceID == "" || uid == "" || clientID == "" || secret == "" { return nil, errors.New("tuya: wrong query") } + if !useRTSP && !useHLS && !useWebRTC { + return nil, errors.New("tuya: wrong stream type") + } + // Initialize Tuya API client tuyaAPI, err := NewTuyaClient(u.Hostname(), deviceID, uid, clientID, secret, useRTSP, useHLS) if err != nil { @@ -59,140 +77,134 @@ func Dial(rawURL string) (core.Producer, error) { done: make(chan struct{}), } - // RTSP if useRTSP { if client.api.rtspURL == "" { return nil, errors.New("tuya: no rtsp url") } - return streams.GetProducer(client.api.rtspURL) - } - - // HLS - if useHLS { + } else if useHLS { if client.api.hlsURL == "" { return nil, errors.New("tuya: no hls url") } return streams.GetProducer(client.api.hlsURL) - } - - // Default to WebRTC - conf := pion.Configuration{ - ICEServers: client.api.iceServers, - ICETransportPolicy: pion.ICETransportPolicyAll, - BundlePolicy: pion.BundlePolicyMaxBundle, - } - - api, err := webrtc.NewAPI() - if err != nil { - client.api.Close() - return nil, err - } - - pc, err := api.NewPeerConnection(conf) - if err != nil { - client.api.Close() - return nil, err - } - - // protect from sending ICE candidate before Offer - var sendOffer core.Waiter - - // protect from blocking on errors - defer sendOffer.Done(nil) - - // waiter will wait PC error or WS error or nil (connection OK) - var connState core.Waiter - - prod := webrtc.NewConn(pc) - prod.FormatName = "tuya/webrtc" - prod.Mode = core.ModeActiveProducer - prod.Protocol = "mqtt" - prod.URL = rawURL - - client.prod = prod - - // Set up MQTT handlers - client.api.mqtt.handleAnswer = func(answer AnswerFrame) { - desc := pion.SessionDescription{ - Type: pion.SDPTypePranswer, - SDP: answer.Sdp, + } else { + conf := pion.Configuration{ + ICEServers: client.api.iceServers, + ICETransportPolicy: pion.ICETransportPolicyAll, + BundlePolicy: pion.BundlePolicyMaxBundle, } - if err = pc.SetRemoteDescription(desc); err != nil { - client.Stop() - return + api, err := webrtc.NewAPI() + if err != nil { + client.api.Close() + return nil, err } - - if err = prod.SetAnswer(answer.Sdp); err != nil { + + pc, err := api.NewPeerConnection(conf) + if err != nil { + client.api.Close() + return nil, err + } + + // protect from sending ICE candidate before Offer + var sendOffer core.Waiter + + // protect from blocking on errors + defer sendOffer.Done(nil) + + // waiter will wait PC error or WS error or nil (connection OK) + var connState core.Waiter + + prod := webrtc.NewConn(pc) + prod.FormatName = "tuya/webrtc" + prod.Mode = core.ModeActiveProducer + prod.Protocol = "mqtt" + prod.URL = rawURL + + client.prod = prod + + // Set up MQTT handlers + client.api.mqtt.handleAnswer = func(answer AnswerFrame) { + desc := pion.SessionDescription{ + Type: pion.SDPTypePranswer, + SDP: answer.Sdp, + } + + if err = pc.SetRemoteDescription(desc); err != nil { + client.Stop() + return + } + + if err = prod.SetAnswer(answer.Sdp); err != nil { + client.Stop() + return + } + + prod.SDP = answer.Sdp + } + + client.api.mqtt.handleCandidate = func(candidate CandidateFrame) { + if candidate.Candidate != "" { + prod.AddCandidate(candidate.Candidate) + if err != nil { + client.Stop() + } + } + } + + client.api.mqtt.handleDisconnect = func() { client.Stop() - return } - prod.SDP = answer.Sdp - } + client.api.mqtt.handleError = func(err error) { + fmt.Printf("Tuya error: %s\n", err.Error()) + client.Stop() + } - client.api.mqtt.handleCandidate = func(candidate CandidateFrame) { - if candidate.Candidate != "" { - prod.AddCandidate(candidate.Candidate) - if err != nil { - client.Stop() + prod.Listen(func(msg any) { + switch msg := msg.(type) { + case *pion.ICECandidate: + _ = sendOffer.Wait() + client.api.sendCandidate("a=" + msg.ToJSON().Candidate) + + case pion.PeerConnectionState: + switch msg { + case pion.PeerConnectionStateNew: + break + case pion.PeerConnectionStateConnecting: + break + case pion.PeerConnectionStateConnected: + connState.Done(nil) + default: + connState.Done(errors.New("webrtc: " + msg.String())) + } + } + }) + + // Create offer + offer, err := prod.CreateOffer(client.api.medias) + if err != nil { + client.api.Close() + return nil, err + } + + // Send offer + client.api.sendOffer(offer) + sendOffer.Done(nil) + + if err = connState.Wait(); err != nil { + return nil, err + } + + if resolution != "" { + value, err := strconv.Atoi(resolution) + if err == nil { + client.api.sendResolution(value) } } + + return client, nil } - - client.api.mqtt.handleDisconnect = func() { - client.Stop() - } - - client.api.mqtt.handleError = func(err error) { - fmt.Printf("Tuya error: %s\n", err.Error()) - client.Stop() - } - - prod.Listen(func(msg any) { - switch msg := msg.(type) { - case *pion.ICECandidate: - _ = sendOffer.Wait() - client.api.sendCandidate("a=" + msg.ToJSON().Candidate) - - case pion.PeerConnectionState: - switch msg { - case pion.PeerConnectionStateNew: - break - case pion.PeerConnectionStateConnecting: - break - case pion.PeerConnectionStateConnected: - connState.Done(nil) - default: - connState.Done(errors.New("webrtc: " + msg.String())) - } - } - }) - - // Create offer - offer, err := prod.CreateOffer(client.api.medias) - if err != nil { - client.api.Close() - return nil, err - } - - // Send offer - client.api.sendOffer(offer) - sendOffer.Done(nil) - - if err = connState.Wait(); err != nil { - return nil, err - } - - if resolution != "" { - value, err := strconv.Atoi(resolution) - if err == nil { - client.api.sendResolution(value) - } - } - - return client, nil } func (c *Client) GetMedias() []*core.Media {