diff --git a/pkg/tuya/api.go b/pkg/tuya/api.go index 6ffa3dd7..d08a68dc 100644 --- a/pkg/tuya/api.go +++ b/pkg/tuya/api.go @@ -17,25 +17,23 @@ import ( ) type TuyaClient struct { - httpClient *http.Client - mqtt *TuyaMQTT - apiURL string - rtspURL string - hlsURL string - sessionId string - clientId string - clientSecret string - deviceId string - accessToken string - refreshToken string - expireTime int64 - uid string - motoId string - auth string - skill *Skill - iceServers []pionWebrtc.ICEServer - medias []*core.Media - hasBackchannel bool + httpClient *http.Client + mqtt *TuyaMQTT + apiURL string + rtspURL string + hlsURL string + sessionId string + clientId string + clientSecret string + deviceId string + accessToken string + refreshToken string + expireTime int64 + uid string + motoId string + auth string + skill *Skill + iceServers []pionWebrtc.ICEServer } type Token struct { @@ -159,21 +157,16 @@ type OpenIoTHubConfigResponse struct { Code int `json:"code,omitempty"` } -const ( - defaultTimeout = 5 * time.Second -) - -func NewTuyaClient(openAPIURL string, deviceId string, uid string, clientId string, clientSecret string, streamMode string) (*TuyaClient, error) { +func NewTuyaClient(openAPIURL string, deviceId string, uid string, clientId string, clientSecret string, streamMode string, streamRole string) (*TuyaClient, error) { client := &TuyaClient{ - httpClient: &http.Client{Timeout: defaultTimeout}, - mqtt: &TuyaMQTT{waiter: core.Waiter{}}, - apiURL: openAPIURL, - sessionId: core.RandString(6, 62), - clientId: clientId, - deviceId: deviceId, - clientSecret: clientSecret, - uid: uid, - hasBackchannel: false, + httpClient: &http.Client{Timeout: 5 * time.Second}, + mqtt: &TuyaMQTT{waiter: core.Waiter{}}, + apiURL: openAPIURL, + sessionId: core.RandString(6, 62), + clientId: clientId, + deviceId: deviceId, + clientSecret: clientSecret, + uid: uid, } if err := client.InitToken(); err != nil { @@ -189,7 +182,7 @@ func NewTuyaClient(openAPIURL string, deviceId string, uid string, clientId stri return nil, fmt.Errorf("failed to get HLS URL: %w", err) } } else { - if err := client.InitDevice(); err != nil { + if err := client.InitDevice(streamRole); err != nil { return nil, fmt.Errorf("failed to initialize device: %w", err) } @@ -206,6 +199,135 @@ func (c *TuyaClient) Close() { c.httpClient.CloseIdleConnections() } +func (c *TuyaClient) InitToken() (err error) { + url := fmt.Sprintf("https://%s/v1.0/token?grant_type=1", c.apiURL) + + c.accessToken = "" + c.refreshToken = "" + + body, err := c.Request("GET", url, nil) + if err != nil { + return err + } + + var tokenResponse TokenResponse + err = json.Unmarshal(body, &tokenResponse) + if err != nil { + return err + } + + if !tokenResponse.Success { + return fmt.Errorf(tokenResponse.Msg) + } + + c.accessToken = tokenResponse.Result.AccessToken + c.refreshToken = tokenResponse.Result.RefreshToken + c.expireTime = tokenResponse.Result.ExpireTime + + return nil +} + +func (c *TuyaClient) InitDevice(streamRole string) (err error) { + url := fmt.Sprintf("https://%s/v1.0/users/%s/devices/%s/webrtc-configs", c.apiURL, c.uid, c.deviceId) + + body, err := c.Request("GET", url, nil) + if err != nil { + return err + } + + var webRTCConfigResponse WebRTCConfigResponse + err = json.Unmarshal(body, &webRTCConfigResponse) + if err != nil { + return err + } + + if !webRTCConfigResponse.Success { + return fmt.Errorf(webRTCConfigResponse.Msg) + } + + err = json.Unmarshal([]byte(webRTCConfigResponse.Result.Skill), &c.skill) + if err != nil { + return err + } + + iceServers, err := json.Marshal(&webRTCConfigResponse.Result.P2PConfig.Ices) + if err != nil { + return err + } + + c.iceServers, err = webrtc.UnmarshalICEServers(iceServers) + if err != nil { + return err + } + + c.motoId = webRTCConfigResponse.Result.MotoID + c.auth = webRTCConfigResponse.Result.Auth + + return nil +} + +func (c *TuyaClient) GetStreamUrl(streamType string) (err error) { + url := fmt.Sprintf("https://%s/v1.0/devices/%s/stream/actions/allocate", c.apiURL, c.deviceId) + + request := &AllocateRequest{ + Type: streamType, + } + + body, err := c.Request("POST", url, request) + if err != nil { + return err + } + + var allosResponse AllocateResponse + err = json.Unmarshal(body, &allosResponse) + if err != nil { + return err + } + + if !allosResponse.Success { + return fmt.Errorf(allosResponse.Msg) + } + + switch streamType { + case "rtsp": + c.rtspURL = allosResponse.Result.URL + case "hls": + c.hlsURL = allosResponse.Result.URL + default: + return fmt.Errorf("unsupported stream type: %s", streamType) + } + + return nil +} + +func (c *TuyaClient) LoadHubConfig() (config *OpenIoTHubConfig, err error) { + url := fmt.Sprintf("https://%s/v2.0/open-iot-hub/access/config", c.apiURL) + + request := &OpenIoTHubConfigRequest{ + UID: c.uid, + UniqueID: uuid.New().String(), + LinkType: "mqtt", + Topics: "ipc", + } + + body, err := c.Request("POST", url, request) + if err != nil { + return nil, err + } + + var openIoTHubConfigResponse OpenIoTHubConfigResponse + err = json.Unmarshal(body, &openIoTHubConfigResponse) + if err != nil { + return nil, err + } + + if !openIoTHubConfigResponse.Success { + return nil, fmt.Errorf(openIoTHubConfigResponse.Msg) + } + + return &openIoTHubConfigResponse.Result, nil +} + func (c *TuyaClient) Request(method string, url string, body any) ([]byte, error) { var bodyReader io.Reader if body != nil { @@ -253,224 +375,7 @@ func (c *TuyaClient) Request(method string, url string, body any) ([]byte, error return res, nil } -func (c *TuyaClient) InitToken() (err error) { - url := fmt.Sprintf("https://%s/v1.0/token?grant_type=1", c.apiURL) - - c.accessToken = "" - c.refreshToken = "" - - body, err := c.Request("GET", url, nil) - if err != nil { - return err - } - - var tokenResponse TokenResponse - err = json.Unmarshal(body, &tokenResponse) - if err != nil { - return err - } - - if !tokenResponse.Success { - return fmt.Errorf("error: %s", tokenResponse.Msg) - } - - c.accessToken = tokenResponse.Result.AccessToken - c.refreshToken = tokenResponse.Result.RefreshToken - c.expireTime = tokenResponse.Result.ExpireTime - - return nil -} - -func (c *TuyaClient) InitDevice() (err error) { - url := fmt.Sprintf("https://%s/v1.0/users/%s/devices/%s/webrtc-configs", c.apiURL, c.uid, c.deviceId) - - body, err := c.Request("GET", url, nil) - if err != nil { - return err - } - - var webRTCConfigResponse WebRTCConfigResponse - err = json.Unmarshal(body, &webRTCConfigResponse) - if err != nil { - return err - } - - if !webRTCConfigResponse.Success { - return fmt.Errorf("error: %s", webRTCConfigResponse.Msg) - } - - c.motoId = webRTCConfigResponse.Result.MotoID - c.auth = webRTCConfigResponse.Result.Auth - - c.skill = &Skill{ - WebRTC: 3, // basic webrtc - Audios: make([]AudioSkill, 0), - Videos: make([]VideoSkill, 0), - } - - if webRTCConfigResponse.Result.Skill != "" { - _ = json.Unmarshal([]byte(webRTCConfigResponse.Result.Skill), c.skill) - } - - c.hasBackchannel = contains(webRTCConfigResponse.Result.AudioAttributes.CallMode, 2) && - contains(webRTCConfigResponse.Result.AudioAttributes.HardwareCapability, 1) - - c.medias = make([]*core.Media, 0) - - if len(c.skill.Audios) > 0 { - direction := core.DirectionRecvonly - if c.hasBackchannel { - direction = core.DirectionSendRecv - } - - codecs := make([]*core.Codec, 0) - for _, audio := range c.skill.Audios { - codecs = append(codecs, &core.Codec{ - Name: getAudioCodec(audio.CodecType), - ClockRate: uint32(audio.SampleRate), - Channels: uint8(audio.Channels), - }) - } - - c.medias = append(c.medias, &core.Media{ - Kind: core.KindAudio, - Direction: direction, - Codecs: codecs, - }) - } else { - // Use default values for Audio - c.medias = append(c.medias, &core.Media{ - Kind: core.KindAudio, - Direction: core.DirectionRecvonly, - Codecs: []*core.Codec{ - { - Name: core.CodecPCMU, - ClockRate: uint32(8000), - Channels: uint8(1), - }, - }, - }) - } - - if len(c.skill.Videos) > 0 { - codecs := make([]*core.Codec, 0) - for _, video := range c.skill.Videos { - if video.CodecType == 2 { - codecs = append(codecs, &core.Codec{ - Name: core.CodecH264, - ClockRate: uint32(video.SampleRate), - PayloadType: 96, - }) - } else if video.CodecType == 4 { - codecs = append(codecs, &core.Codec{ - Name: core.CodecH265, - ClockRate: uint32(video.SampleRate), - PayloadType: 96, - }) - } - } - - c.medias = append(c.medias, &core.Media{ - Kind: core.KindVideo, - Direction: core.DirectionRecvonly, - Codecs: codecs, - }) - } else { - // Use default values for Video - c.medias = append(c.medias, &core.Media{ - Kind: core.KindVideo, - Direction: core.DirectionRecvonly, - Codecs: []*core.Codec{ - { - Name: core.CodecH264, - ClockRate: uint32(90000), - PayloadType: 96, - }, - { - Name: core.CodecH265, - ClockRate: uint32(90000), - PayloadType: 96, - }, - }, - }) - } - - iceServersBytes, err := json.Marshal(&webRTCConfigResponse.Result.P2PConfig.Ices) - if err != nil { - return err - } - - c.iceServers, err = webrtc.UnmarshalICEServers([]byte(iceServersBytes)) - if err != nil { - return err - } - - return nil -} - -func (c *TuyaClient) GetStreamUrl(streamType string) (err error) { - url := fmt.Sprintf("https://%s/v1.0/devices/%s/stream/actions/allocate", c.apiURL, c.deviceId) - - request := &AllocateRequest{ - Type: streamType, - } - - body, err := c.Request("POST", url, request) - if err != nil { - return err - } - - var allosResponse AllocateResponse - err = json.Unmarshal(body, &allosResponse) - if err != nil { - return err - } - - if !allosResponse.Success { - return fmt.Errorf("error: %s", allosResponse.Msg) - } - - switch streamType { - case "rtsp": - c.rtspURL = allosResponse.Result.URL - case "hls": - c.hlsURL = allosResponse.Result.URL - default: - return fmt.Errorf("unsupported stream type: %s", streamType) - } - - return nil -} - -func (c *TuyaClient) LoadHubConfig() (config *OpenIoTHubConfig, err error) { - url := fmt.Sprintf("https://%s/v2.0/open-iot-hub/access/config", c.apiURL) - - request := &OpenIoTHubConfigRequest{ - UID: c.uid, - UniqueID: uuid.New().String(), - LinkType: "mqtt", - Topics: "ipc", - } - - body, err := c.Request("POST", url, request) - if err != nil { - return nil, err - } - - var openIoTHubConfigResponse OpenIoTHubConfigResponse - err = json.Unmarshal(body, &openIoTHubConfigResponse) - if err != nil { - return nil, err - } - - if !openIoTHubConfigResponse.Success { - return nil, fmt.Errorf("error: %s", openIoTHubConfigResponse.Msg) - } - - return &openIoTHubConfigResponse.Result, nil -} - -func (c *TuyaClient) getStreamType(streamChoice string) int { +func (c *TuyaClient) getStreamType(streamRole string) int { // Default streamType if nothing is found defaultStreamType := 1 @@ -501,7 +406,7 @@ func (c *TuyaClient) getStreamType(streamChoice string) int { } // Return the streamType based on the selection - switch streamChoice { + switch streamRole { case "main": return highestResType case "sub": @@ -511,29 +416,6 @@ func (c *TuyaClient) getStreamType(streamChoice string) int { } } -func getAudioCodec(codecType int) string { - switch codecType { - // case 100: - // return "ADPCM" - case 101: - return core.CodecPCM - case 102, 103, 104: - return core.CodecAAC - case 105: - return core.CodecPCMU - case 106: - return core.CodecPCMA - // case 107: - // return "G726-32" - // case 108: - // return "SPEEX" - case 109: - return core.CodecMP3 - default: - return core.CodecPCMU - } -} - func (c *TuyaClient) isHEVC(streamType int) bool { for _, video := range c.skill.Videos { if video.StreamType == streamType { @@ -544,22 +426,9 @@ func (c *TuyaClient) isHEVC(streamType int) bool { return false } -func (c *TuyaClient) isClaritySupported(webrtcValue int) bool { - return (webrtcValue & (1 << 5)) != 0 -} - func (c *TuyaClient) calBusinessSign(ts int64) string { data := fmt.Sprintf("%s%s%s%d", c.clientId, c.accessToken, c.clientSecret, ts) val := md5.Sum([]byte(data)) res := fmt.Sprintf("%X", val) return res } - -func contains(slice []int, val int) bool { - for _, item := range slice { - if item == val { - return true - } - } - return false -} diff --git a/pkg/tuya/client.go b/pkg/tuya/client.go index 37697310..ecf4d480 100644 --- a/pkg/tuya/client.go +++ b/pkg/tuya/client.go @@ -1,6 +1,7 @@ package tuya import ( + "encoding/json" "errors" "fmt" "net/url" @@ -14,10 +15,30 @@ import ( ) type Client struct { - api *TuyaClient - conn *webrtc.Conn - dcConn *DCConn - done chan struct{} + api *TuyaClient + conn *webrtc.Conn + pc *pion.PeerConnection + dc *pion.DataChannel + videoSSRC uint32 + audioSSRC uint32 + isHEVC bool + connected core.Waiter + closed bool + handlers map[uint32]func(*rtp.Packet) +} + +type DataChannelMessage struct { + Type string `json:"type"` + Msg string `json:"msg"` +} + +type RecvMessage struct { + Video struct { + SSRC uint32 `json:"ssrc"` + } `json:"video"` + Audio struct { + SSRC uint32 `json:"ssrc"` + } `json:"audio"` } const ( @@ -30,7 +51,6 @@ const ( ) func Dial(rawURL string) (core.Producer, error) { - // Parse URL and validate basic params u, err := url.Parse(rawURL) if err != nil { return nil, err @@ -41,8 +61,13 @@ func Dial(rawURL string) (core.Producer, error) { uid := query.Get("uid") clientId := query.Get("client_id") clientSecret := query.Get("client_secret") - streamType := query.Get("type") + streamRole := query.Get("role") streamMode := query.Get("mode") + + if streamRole == "" || (streamRole != "main" && streamRole != "sub") { + streamRole = "main" + } + useRTSP := streamMode == "rtsp" useHLS := streamMode == "hls" useWebRTC := streamMode == "webrtc" || streamMode == "" @@ -72,14 +97,14 @@ func Dial(rawURL string) (core.Producer, error) { } // Initialize Tuya API client - tuyaAPI, err := NewTuyaClient(u.Hostname(), deviceID, uid, clientId, clientSecret, streamMode) + tuyaAPI, err := NewTuyaClient(u.Hostname(), deviceID, uid, clientId, clientSecret, streamMode, streamRole) if err != nil { - return nil, err + return nil, fmt.Errorf("tuya: %w", err) } client := &Client{ - api: tuyaAPI, - done: make(chan struct{}), + api: tuyaAPI, + handlers: make(map[uint32]func(*rtp.Packet)), } if useRTSP { @@ -93,8 +118,9 @@ func Dial(rawURL string) (core.Producer, error) { } return streams.GetProducer(client.api.hlsURL) } else { - isHEVC := client.api.isHEVC(client.api.getStreamType(streamType)) + client.isHEVC = client.api.isHEVC(client.api.getStreamType(streamRole)) + // Create a new PeerConnection conf := pion.Configuration{ ICEServers: client.api.iceServers, ICETransportPolicy: pion.ICETransportPolicyAll, @@ -107,7 +133,7 @@ func Dial(rawURL string) (core.Producer, error) { return nil, err } - pc, err := api.NewPeerConnection(conf) + client.pc, err = api.NewPeerConnection(conf) if err != nil { client.api.Close() return nil, err @@ -119,10 +145,8 @@ func Dial(rawURL string) (core.Producer, error) { // protect from blocking on errors defer sendOffer.Done(nil) - // waiter will wait PC error - var connState core.Waiter - - client.conn = webrtc.NewConn(pc) + // Create new WebRTC connection + client.conn = webrtc.NewConn(client.pc) client.conn.FormatName = "tuya/webrtc" client.conn.Mode = core.ModeActiveProducer client.conn.Protocol = "mqtt" @@ -137,8 +161,8 @@ func Dial(rawURL string) (core.Producer, error) { SDP: answer.Sdp, } - if err = pc.SetRemoteDescription(desc); err != nil { - client.Stop() + if err = client.pc.SetRemoteDescription(desc); err != nil { + client.connected.Done(err) return } @@ -147,7 +171,18 @@ func Dial(rawURL string) (core.Producer, error) { return } - client.conn.SDP = answer.Sdp + if client.isHEVC { + // Tuya answers always with H264 codec, replace with HEVC + for _, media := range client.conn.Medias { + if media.Kind == core.KindVideo { + for _, codec := range media.Codecs { + if codec.Name == core.CodecH264 { + codec.Name = core.CodecH265 + } + } + } + } + } } client.api.mqtt.handleCandidate = func(candidate CandidateFrame) { @@ -170,15 +205,57 @@ func Dial(rawURL string) (core.Producer, error) { client.Stop() } - // Set up data channel for HEVC - if isHEVC { - client.dcConn, err = NewDCConn(pc, client) - if err != nil { - client.api.Close() - return nil, err - } + // On HEVC, use DataChannel to receive video/audio + if client.isHEVC { + // Create a new DataChannel + maxRetransmits := uint16(5) + ordered := true + client.dc, err = client.pc.CreateDataChannel("fmp4Stream", &pion.DataChannelInit{ + MaxRetransmits: &maxRetransmits, + Ordered: &ordered, + }) + + // Set up data channel handler + client.dc.OnMessage(func(msg pion.DataChannelMessage) { + if msg.IsString { + client.probe(msg) + } else { + packet := &rtp.Packet{} + if err := packet.Unmarshal(msg.Data); err != nil { + return + } + + if handler, ok := client.handlers[packet.SSRC]; ok { + handler(packet) + } + } + }) + + client.dc.OnError(func(err error) { + // fmt.Printf("tuya: datachannel error: %s\n", err.Error()) + client.connected.Done(err) + }) + + client.dc.OnClose(func() { + // fmt.Println("tuya: datachannel closed") + client.connected.Done(errors.New("datachannel: closed")) + }) + + client.dc.OnOpen(func() { + // fmt.Println("tuya: datachannel opened") + + codecRequest, _ := json.Marshal(DataChannelMessage{ + Type: "codec", + Msg: "", + }) + + if err := client.sendMessageToDataChannel(codecRequest); err != nil { + client.connected.Done(fmt.Errorf("failed to send codec request: %w", err)) + } + }) } + // Set up pc handler client.conn.Listen(func(msg any) { switch msg := msg.(type) { case *pion.ICECandidate: @@ -192,16 +269,25 @@ func Dial(rawURL string) (core.Producer, error) { case pion.PeerConnectionStateConnecting: break case pion.PeerConnectionStateConnected: - connState.Done(nil) + // On HEVC, wait for DataChannel to be opened and camera to send codec info + if !client.isHEVC { + client.connected.Done(nil) + } default: client.Stop() - connState.Done(errors.New("webrtc: " + msg.String())) + client.connected.Done(errors.New("webrtc: " + msg.String())) } } }) + // Audio first, otherwise tuya will send corrupt sdp + medias := []*core.Media{ + {Kind: core.KindAudio, Direction: core.DirectionSendRecv}, + {Kind: core.KindVideo, Direction: core.DirectionRecvonly}, + } + // Create offer - offer, err := client.conn.CreateOffer(client.api.medias) + offer, err := client.conn.CreateOffer(medias) if err != nil { client.api.Close() return nil, err @@ -213,20 +299,12 @@ func Dial(rawURL string) (core.Producer, error) { offer = re.ReplaceAllString(offer, "") // Send offer - client.api.sendOffer(offer, streamType) + client.api.sendOffer(offer, streamRole) sendOffer.Done(nil) - if client.dcConn != nil { - if err = client.dcConn.connected.Wait(); err != nil { - client.Stop() - return nil, err - } - - return client.dcConn, nil - } - - if err = connState.Wait(); err != nil { - return nil, err + // Wait for connection + if err = client.connected.Wait(); err != nil { + return nil, fmt.Errorf("tuya: %w", err) } return client, nil @@ -242,10 +320,15 @@ func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, } func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { - // RepackG711 will not work, so add default logic without repacking + // Manually handle backchannel, because repacking audio through go2rtc does not work + + localTrack := c.getSender() + if localTrack == nil { + return errors.New("webrtc: can't get track") + } payloadType := codec.PayloadType - localTrack := c.conn.GetSenderTrack(media.ID) + sender := core.NewSender(media, codec) sender.Handler = func(packet *rtp.Packet) { c.conn.Send += packet.MarshalSize() @@ -260,29 +343,47 @@ func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Rece } func (c *Client) Start() error { - if c.dcConn != nil { - c.dcConn.Start() + if len(c.conn.Receivers) == 0 { + return errors.New("tuya: no receivers") + } + + var video, audio *core.Receiver + for _, receiver := range c.conn.Receivers { + if receiver.Codec.IsVideo() { + video = receiver + } else if receiver.Codec.IsAudio() { + audio = receiver + } + } + + c.handlers[c.videoSSRC] = func(packet *rtp.Packet) { + if video != nil { + video.WriteRTP(packet) + } + } + + c.handlers[c.audioSSRC] = func(packet *rtp.Packet) { + if audio != nil { + audio.WriteRTP(packet) + } } return c.conn.Start() } func (c *Client) Stop() error { - select { - case <-c.done: + if c.closed { return nil - default: - close(c.done) + } + + for ssrc := range c.handlers { + delete(c.handlers, ssrc) } if c.conn != nil { _ = c.conn.Stop() } - if c.dcConn != nil { - _ = c.dcConn.Stop() - } - if c.api != nil { c.api.Close() } @@ -293,3 +394,73 @@ func (c *Client) Stop() error { func (c *Client) MarshalJSON() ([]byte, error) { return c.conn.MarshalJSON() } + +func (c *Client) probe(msg pion.DataChannelMessage) { + // fmt.Printf("[tuya] Received string message: %s\n", string(msg.Data)) + + var message DataChannelMessage + if err := json.Unmarshal([]byte(msg.Data), &message); err != nil { + c.connected.Done(fmt.Errorf("failed to parse datachannel message: %w", err)) + } + + switch message.Type { + case "codec": + // fmt.Printf("[tuya] Codec info from camera: %s\n", message.Msg) + + frameRequest, _ := json.Marshal(DataChannelMessage{ + Type: "start", + Msg: "frame", + }) + + err := c.sendMessageToDataChannel(frameRequest) + if err != nil { + c.connected.Done(fmt.Errorf("failed to send frame request: %w", err)) + } + + case "recv": + var recvMessage RecvMessage + if err := json.Unmarshal([]byte(message.Msg), &recvMessage); err != nil { + c.connected.Done(fmt.Errorf("failed to parse recv message: %w", err)) + return + } + + c.videoSSRC = recvMessage.Video.SSRC + c.audioSSRC = recvMessage.Audio.SSRC + + completeMsg, _ := json.Marshal(DataChannelMessage{ + Type: "complete", + Msg: "", + }) + + err := c.sendMessageToDataChannel(completeMsg) + if err != nil { + c.connected.Done(fmt.Errorf("failed to send complete message: %w", err)) + } + + c.connected.Done(nil) + } +} + +func (c *Client) sendMessageToDataChannel(message []byte) error { + if c.dc != nil { + // fmt.Printf("[tuya] sending message to data channel: %s\n", message) + return c.dc.Send(message) + } + + return nil +} + +func (c *Client) getSender() *webrtc.Track { + for _, tr := range c.pc.GetTransceivers() { + if tr.Kind() == pion.RTPCodecTypeAudio { + if tr.Kind() == pion.RTPCodecType(pion.RTPTransceiverDirectionSendonly) || tr.Kind() == pion.RTPCodecType(pion.RTPTransceiverDirectionSendrecv) { + if s := tr.Sender(); s != nil { + if t := s.Track().(*webrtc.Track); t != nil { + return t + } + } + } + } + } + return nil +} diff --git a/pkg/tuya/dc.go b/pkg/tuya/dc.go deleted file mode 100644 index 2ccd1313..00000000 --- a/pkg/tuya/dc.go +++ /dev/null @@ -1,253 +0,0 @@ -package tuya - -import ( - "encoding/json" - "errors" - - "github.com/AlexxIT/go2rtc/pkg/core" - "github.com/AlexxIT/go2rtc/pkg/mp4" - "github.com/pion/rtp" - pion "github.com/pion/webrtc/v4" -) - -type DCConn struct { - core.Connection - - client *Client - dc *pion.DataChannel - dem *mp4.Demuxer - queue *FrameBufferQueue - msgs chan pion.DataChannelMessage - connected core.Waiter - closed core.Waiter - initialized bool -} - -type DataChannelMessage struct { - Type string `json:"type"` - Msg string `json:"msg"` -} - -func NewDCConn(pc *pion.PeerConnection, c *Client) (*DCConn, error) { - maxRetransmits := uint16(5) - ordered := true - dc, err := pc.CreateDataChannel("fmp4Stream", &pion.DataChannelInit{ - MaxRetransmits: &maxRetransmits, - Ordered: &ordered, - }) - - if err != nil { - return nil, err - } - - conn := &DCConn{ - Connection: core.Connection{ - ID: core.NewID(), - FormatName: "webrtc/fmp4", - Transport: dc, - }, - client: c, - dc: dc, - dem: &mp4.Demuxer{}, - queue: NewFrameBufferQueue(), - msgs: make(chan pion.DataChannelMessage, 10), // Saw max 4 messages in a row, 10 should be enough - initialized: false, - } - - dc.OnMessage(func(msg pion.DataChannelMessage) { - conn.msgs <- msg - }) - - dc.OnError(func(err error) { - conn.connected.Done(err) - }) - - dc.OnClose(func() { - close(conn.msgs) - conn.connected.Done(errors.New("datachannel: closed")) - }) - - go conn.initializationLoop() - - return conn, nil -} - -func (c *DCConn) initializationLoop() { - for msg := range c.msgs { - if c.initialized { - return - } - - err := c.probe(msg) - if err != nil { - c.connected.Done(err) - return - } - - if c.initialized { - c.connected.Done(nil) - return - } - } -} - -func (c *DCConn) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { - if media.Direction == core.DirectionSendRecv || media.Direction == core.DirectionSendonly { - return c.client.GetTrack(media, codec) - } - - for _, receiver := range c.Receivers { - if receiver.Codec == codec { - return receiver, nil - } - } - receiver := core.NewReceiver(media, codec) - c.Receivers = append(c.Receivers, receiver) - return receiver, nil -} - -func (c *DCConn) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { - payloadType := codec.PayloadType - localTrack := c.client.conn.GetSenderTrack(media.ID) - sender := core.NewSender(media, codec) - sender.Handler = func(packet *rtp.Packet) { - c.Send += packet.MarshalSize() - //important to send with remote PayloadType - _ = localTrack.WriteRTP(payloadType, packet) - } - - sender.HandleRTP(track) - c.Senders = append(c.Senders, sender) - - return nil -} - -func (c *DCConn) Start() error { - receivers := make(map[uint32]*core.Receiver) - for _, receiver := range c.Receivers { - trackID := c.dem.GetTrackID(receiver.Codec) - receivers[trackID] = receiver - } - - ch := make(chan []byte, 10) - defer close(ch) - - go func() { - for data := range ch { - allTracks := c.dem.DemuxAll(data) - for _, trackData := range allTracks { - trackID := trackData.TrackID - packets := trackData.Packets - receiver := receivers[trackID] - if receiver == nil { - continue - } - - for _, packet := range packets { - receiver.WriteRTP(packet) - } - } - } - }() - - go func() { - for msg := range c.msgs { - if len(msg.Data) >= 4 { - segmentNum := int(msg.Data[1]) - fragmentCount := int(msg.Data[2]) - fragmentSeq := int(msg.Data[3]) - mp4Data := msg.Data[4:] - - c.queue.AddFragment(segmentNum, fragmentCount, fragmentSeq, mp4Data) - - if c.queue.IsSegmentComplete(segmentNum, fragmentCount) { - b := c.queue.GetCombinedBuffer(segmentNum) - c.Recv += len(b) - ch <- b - } - } - } - }() - - c.closed.Wait() - return nil -} - -func (c *DCConn) sendMessageToDataChannel(message string) error { - if c.dc != nil { - return c.dc.SendText(message) - } - - return nil -} - -func (c *DCConn) probe(msg pion.DataChannelMessage) (err error) { - if msg.IsString { - var message DataChannelMessage - if err = json.Unmarshal(msg.Data, &message); err != nil { - return err - } - - switch message.Type { - case "codec": - response, _ := json.Marshal(DataChannelMessage{ - Type: "start", - Msg: "fmp4", - }) - - err = c.sendMessageToDataChannel(string(response)) - if err != nil { - return err - } - - case "recv": - response, _ := json.Marshal(DataChannelMessage{ - Type: "complete", - Msg: "", - }) - - err = c.sendMessageToDataChannel(string(response)) - if err != nil { - return err - } - } - - } else { - if len(msg.Data) >= 4 { - messageType := msg.Data[0] - segmentNum := int(msg.Data[1]) - fragmentCount := int(msg.Data[2]) - fragmentSeq := int(msg.Data[3]) - mp4Data := msg.Data[4:] - - // initialization segment - if messageType == 0 && segmentNum == 1 && fragmentCount == 1 && fragmentSeq == 1 { - medias := c.dem.Probe(mp4Data) - c.Medias = append(c.Medias, medias...) - - // Add backchannel - webrtcMedias := c.client.GetMedias() - for _, media := range webrtcMedias { - if media.Kind == core.KindAudio { - if media.Direction == core.DirectionSendRecv || media.Direction == core.DirectionSendonly { - c.Medias = append(c.Medias, media) - } - } - } - - c.initialized = true - } - } - } - - return nil -} - -func (c *DCConn) Stop() error { - if c.dc != nil && c.dc.ReadyState() == pion.DataChannelStateOpen { - _ = c.dc.Close() - } - - c.closed.Done(nil) - return nil -} diff --git a/pkg/tuya/frameBuffer.go b/pkg/tuya/frameBuffer.go deleted file mode 100644 index bbcb4ff5..00000000 --- a/pkg/tuya/frameBuffer.go +++ /dev/null @@ -1,86 +0,0 @@ -package tuya - -import ( - "sort" - "sync" -) - -type FrameBufferQueue struct { - segments map[int]map[int][]byte // segNum -> fragSeq -> data - mu sync.Mutex -} - -func NewFrameBufferQueue() *FrameBufferQueue { - return &FrameBufferQueue{ - segments: make(map[int]map[int][]byte), - } -} - -func (q *FrameBufferQueue) AddFragment(segmentNum, fragmentCount, fragmentSeq int, data []byte) { - q.mu.Lock() - defer q.mu.Unlock() - - if _, ok := q.segments[segmentNum]; !ok { - q.segments[segmentNum] = make(map[int][]byte) - } - - q.segments[segmentNum][fragmentSeq] = data -} - -func (q *FrameBufferQueue) IsSegmentComplete(segmentNum, fragmentCount int) bool { - q.mu.Lock() - defer q.mu.Unlock() - - if frags, ok := q.segments[segmentNum]; ok { - // Make sure we have the right number of fragments - if len(frags) != fragmentCount { - return false - } - - // Check if we have all sequences from 1 to fragmentCount - for i := 1; i <= fragmentCount; i++ { - if _, ok := frags[i]; !ok { - return false - } - } - - return true - } - - return false -} - -func (q *FrameBufferQueue) GetCombinedBuffer(segNum int) []byte { - q.mu.Lock() - defer q.mu.Unlock() - - if frags, ok := q.segments[segNum]; ok { - // Sort fragments by sequence number - var keys []int - for k := range frags { - keys = append(keys, k) - } - sort.Ints(keys) - - // Calculate total size for pre-allocation - totalSize := 0 - for _, k := range keys { - totalSize += len(frags[k]) - } - - // Pre-allocate buffer for better performance - combined := make([]byte, 0, totalSize) - - // Combine fragments in sequence order - for _, k := range keys { - combined = append(combined, frags[k]...) - } - - // Remove this segment to free memory - delete(q.segments, segNum) - - return combined - } - - return nil -} diff --git a/pkg/tuya/mqtt.go b/pkg/tuya/mqtt.go index 9eb25b49..ea69f60e 100644 --- a/pkg/tuya/mqtt.go +++ b/pkg/tuya/mqtt.go @@ -131,7 +131,7 @@ func (c *TuyaClient) onConnect(client mqtt.Client) { func (c *TuyaClient) consume(client mqtt.Client, msg mqtt.Message) { var rmqtt MqttMessage if err := json.Unmarshal(msg.Payload(), &rmqtt); err != nil { - c.mqtt.onError(fmt.Errorf("unmarshal mqtt message fail: %s, payload: %s", err.Error(), string(msg.Payload()))) + c.mqtt.onError(err) return } @@ -152,10 +152,7 @@ func (c *TuyaClient) consume(client mqtt.Client, msg mqtt.Message) { func (c *TuyaMQTT) onMqttAnswer(msg *MqttMessage) { var answerFrame AnswerFrame if err := json.Unmarshal(msg.Data.Message, &answerFrame); err != nil { - c.onError(fmt.Errorf("unmarshal mqtt answer frame fail: %s, session: %s, frame: %s", - err.Error(), - msg.Data.Header.SessionID, - string(msg.Data.Message))) + c.onError(err) return } @@ -165,10 +162,7 @@ func (c *TuyaMQTT) onMqttAnswer(msg *MqttMessage) { func (c *TuyaMQTT) onMqttCandidate(msg *MqttMessage) { var candidateFrame CandidateFrame if err := json.Unmarshal(msg.Data.Message, &candidateFrame); err != nil { - c.onError(fmt.Errorf("unmarshal mqtt candidate frame fail: %s, session: %s, frame: %s", - err.Error(), - msg.Data.Header.SessionID, - string(msg.Data.Message))) + c.onError(err) return } @@ -208,37 +202,48 @@ func (c *TuyaMQTT) onError(err error) { } } -func (c *TuyaClient) sendOffer(sdp string, streamType string) { - fixedStreamType := c.getStreamType(streamType) - isHEVC := c.isHEVC(fixedStreamType) +func (c *TuyaClient) sendOffer(sdp string, streamRole string) { + streamType := c.getStreamType(streamRole) + isHEVC := c.isHEVC(streamType) if isHEVC { // On HEVC we use streamType 0 for main stream and 1 for sub stream - if streamType == "main" { - fixedStreamType = 0 + if streamRole == "main" { + streamType = 0 } else { - fixedStreamType = 1 + streamType = 1 } } - c.sendMqttMessage("offer", 302, "", OfferFrame{ + err := c.sendMqttMessage("offer", 302, "", OfferFrame{ Mode: "webrtc", Sdp: sdp, - StreamType: fixedStreamType, + StreamType: streamType, Auth: c.auth, DatachannelEnable: isHEVC, }) + + if err != nil { + c.mqtt.onError(err) + return + } } func (c *TuyaClient) sendCandidate(candidate string) { - c.sendMqttMessage("candidate", 302, "", CandidateFrame{ + err := c.sendMqttMessage("candidate", 302, "", CandidateFrame{ Mode: "webrtc", Candidate: candidate, }) + + if err != nil { + c.mqtt.onError(err) + return + } } func (c *TuyaClient) sendResolution(resolution int) { - if !c.isClaritySupported(resolution) { + isClaritySupperted := (c.skill.WebRTC & (1 << 5)) != 0 + if !isClaritySupperted { return } @@ -261,16 +266,14 @@ func (c *TuyaClient) sendDisconnect() { }) } -func (c *TuyaClient) sendMqttMessage(messageType string, protocol int, transactionID string, data interface{}) { +func (c *TuyaClient) sendMqttMessage(messageType string, protocol int, transactionID string, data interface{}) error { if c.mqtt.closed { - c.mqtt.onError(fmt.Errorf("mqtt client is closed, send mqtt message fail")) - return + return fmt.Errorf("mqtt client is closed, send mqtt message fail") } jsonMessage, err := json.Marshal(data) if err != nil { - c.mqtt.onError(fmt.Errorf("marshal mqtt message fail: %s", err.Error())) - return + return err } msg := &MqttMessage{ @@ -292,12 +295,13 @@ func (c *TuyaClient) sendMqttMessage(messageType string, protocol int, transacti payload, err := json.Marshal(msg) if err != nil { - c.mqtt.onError(fmt.Errorf("marshal mqtt message fail: %s", err.Error())) - return + return err } token := c.mqtt.client.Publish(c.mqtt.publishTopic, 1, false, payload) if token.Wait() && token.Error() != nil { - c.mqtt.onError(fmt.Errorf("mqtt publish fail: %s, topic: %s", token.Error().Error(), c.mqtt.publishTopic)) + return token.Error() } + + return nil } diff --git a/pkg/webrtc/conn.go b/pkg/webrtc/conn.go index 063831f0..092b05c8 100644 --- a/pkg/webrtc/conn.go +++ b/pkg/webrtc/conn.go @@ -217,4 +217,4 @@ func sanitizeIP6(host string) string { return "[" + host + "]" } return host -} \ No newline at end of file +} diff --git a/pkg/webrtc/consumer.go b/pkg/webrtc/consumer.go index 1efb1507..ebc3a008 100644 --- a/pkg/webrtc/consumer.go +++ b/pkg/webrtc/consumer.go @@ -87,4 +87,4 @@ func (c *Conn) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiv c.Senders = append(c.Senders, sender) return nil -} \ No newline at end of file +}