From 3d222136f95c4de1f219f4db1c7a95ba57720198 Mon Sep 17 00:00:00 2001 From: seydx Date: Thu, 15 May 2025 10:45:04 +0200 Subject: [PATCH] support h265 --- pkg/tuya/api.go | 70 ++++++----- pkg/tuya/client.go | 45 +++++-- pkg/tuya/dc.go | 253 ++++++++++++++++++++++++++++++++++++++++ pkg/tuya/frameBuffer.go | 86 ++++++++++++++ pkg/tuya/mqtt.go | 83 +++---------- 5 files changed, 428 insertions(+), 109 deletions(-) create mode 100644 pkg/tuya/dc.go create mode 100644 pkg/tuya/frameBuffer.go diff --git a/pkg/tuya/api.go b/pkg/tuya/api.go index 13e8265e..6ffa3dd7 100644 --- a/pkg/tuya/api.go +++ b/pkg/tuya/api.go @@ -67,22 +67,26 @@ type P2PConfig struct { Ices []OpenApiICE `json:"ices"` } +type AudioSkill struct { + Channels int `json:"channels"` + DataBit int `json:"dataBit"` + CodecType int `json:"codecType"` + SampleRate int `json:"sampleRate"` +} + +type VideoSkill struct { + StreamType int `json:"streamType"` // 2 = main stream, 4 = sub stream + ProfileId string `json:"profileId,omitempty"` + CodecType int `json:"codecType"` // 2 = H264, 4 = H265 + Width int `json:"width"` + Height int `json:"height"` + SampleRate int `json:"sampleRate"` +} + type Skill struct { - WebRTC int `json:"webrtc"` - Audios []struct { - Channels int `json:"channels"` - DataBit int `json:"dataBit"` - CodecType int `json:"codecType"` - SampleRate int `json:"sampleRate"` - } `json:"audios"` - Videos []struct { - StreamType int `json:"streamType"` // 2 = main stream, 4 = sub stream - ProfileId string `json:"profileId"` - Width int `json:"width"` - CodecType int `json:"codecType"` // 2 = H264, 4 = H265 - SampleRate int `json:"sampleRate"` - Height int `json:"height"` - } `json:"videos"` + WebRTC int `json:"webrtc"` + Audios []AudioSkill `json:"audios"` + Videos []VideoSkill `json:"videos"` } type WebRTConfig struct { @@ -299,20 +303,9 @@ func (c *TuyaClient) InitDevice() (err error) { c.auth = webRTCConfigResponse.Result.Auth c.skill = &Skill{ - Audios: []struct { - Channels int `json:"channels"` - DataBit int `json:"dataBit"` - CodecType int `json:"codecType"` - SampleRate int `json:"sampleRate"` - }{}, - Videos: []struct { - StreamType int `json:"streamType"` - ProfileId string `json:"profileId"` - Width int `json:"width"` - CodecType int `json:"codecType"` - SampleRate int `json:"sampleRate"` - Height int `json:"height"` - }{}, + WebRTC: 3, // basic webrtc + Audios: make([]AudioSkill, 0), + Videos: make([]VideoSkill, 0), } if webRTCConfigResponse.Result.Skill != "" { @@ -393,6 +386,11 @@ func (c *TuyaClient) InitDevice() (err error) { ClockRate: uint32(90000), PayloadType: 96, }, + { + Name: core.CodecH265, + ClockRate: uint32(90000), + PayloadType: 96, + }, }, }) } @@ -536,6 +534,20 @@ func getAudioCodec(codecType int) string { } } +func (c *TuyaClient) isHEVC(streamType int) bool { + for _, video := range c.skill.Videos { + if video.StreamType == streamType { + return video.CodecType == 4 + } + } + + return false +} + +func (c *TuyaClient) isClaritySupported(webrtcValue int) bool { + return (webrtcValue & (1 << 5)) != 0 +} + func (c *TuyaClient) calBusinessSign(ts int64) string { data := fmt.Sprintf("%s%s%s%d", c.clientId, c.accessToken, c.clientSecret, ts) val := md5.Sum([]byte(data)) diff --git a/pkg/tuya/client.go b/pkg/tuya/client.go index 3fb7bf07..37697310 100644 --- a/pkg/tuya/client.go +++ b/pkg/tuya/client.go @@ -14,9 +14,10 @@ import ( ) type Client struct { - api *TuyaClient - conn *webrtc.Conn - done chan struct{} + api *TuyaClient + conn *webrtc.Conn + dcConn *DCConn + done chan struct{} } const ( @@ -92,6 +93,8 @@ func Dial(rawURL string) (core.Producer, error) { } return streams.GetProducer(client.api.hlsURL) } else { + isHEVC := client.api.isHEVC(client.api.getStreamType(streamType)) + conf := pion.Configuration{ ICEServers: client.api.iceServers, ICETransportPolicy: pion.ICETransportPolicyAll, @@ -116,7 +119,7 @@ func Dial(rawURL string) (core.Producer, error) { // protect from blocking on errors defer sendOffer.Done(nil) - // waiter will wait PC error or WS error or nil (connection OK) + // waiter will wait PC error var connState core.Waiter client.conn = webrtc.NewConn(pc) @@ -167,6 +170,15 @@ func Dial(rawURL string) (core.Producer, error) { client.Stop() } + // Set up data channel for HEVC + if isHEVC { + client.dcConn, err = NewDCConn(pc, client) + if err != nil { + client.api.Close() + return nil, err + } + } + client.conn.Listen(func(msg any) { switch msg := msg.(type) { case *pion.ICECandidate: @@ -182,6 +194,7 @@ func Dial(rawURL string) (core.Producer, error) { case pion.PeerConnectionStateConnected: connState.Done(nil) default: + client.Stop() connState.Done(errors.New("webrtc: " + msg.String())) } } @@ -200,9 +213,18 @@ func Dial(rawURL string) (core.Producer, error) { offer = re.ReplaceAllString(offer, "") // Send offer - client.api.sendOffer(offer, tuyaAPI.getStreamType(streamType)) + client.api.sendOffer(offer, streamType) sendOffer.Done(nil) + if client.dcConn != nil { + if err = client.dcConn.connected.Wait(); err != nil { + client.Stop() + return nil, err + } + + return client.dcConn, nil + } + if err = connState.Wait(); err != nil { return nil, err } @@ -223,12 +245,7 @@ func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Rece // RepackG711 will not work, so add default logic without repacking payloadType := codec.PayloadType - localTrack := c.conn.GetSenderTrack(media.ID) - if localTrack == nil { - return errors.New("webrtc: can't get track") - } - sender := core.NewSender(media, codec) sender.Handler = func(packet *rtp.Packet) { c.conn.Send += packet.MarshalSize() @@ -243,6 +260,10 @@ func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Rece } func (c *Client) Start() error { + if c.dcConn != nil { + c.dcConn.Start() + } + return c.conn.Start() } @@ -258,6 +279,10 @@ func (c *Client) Stop() error { _ = c.conn.Stop() } + if c.dcConn != nil { + _ = c.dcConn.Stop() + } + if c.api != nil { c.api.Close() } diff --git a/pkg/tuya/dc.go b/pkg/tuya/dc.go new file mode 100644 index 00000000..2ccd1313 --- /dev/null +++ b/pkg/tuya/dc.go @@ -0,0 +1,253 @@ +package tuya + +import ( + "encoding/json" + "errors" + + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/mp4" + "github.com/pion/rtp" + pion "github.com/pion/webrtc/v4" +) + +type DCConn struct { + core.Connection + + client *Client + dc *pion.DataChannel + dem *mp4.Demuxer + queue *FrameBufferQueue + msgs chan pion.DataChannelMessage + connected core.Waiter + closed core.Waiter + initialized bool +} + +type DataChannelMessage struct { + Type string `json:"type"` + Msg string `json:"msg"` +} + +func NewDCConn(pc *pion.PeerConnection, c *Client) (*DCConn, error) { + maxRetransmits := uint16(5) + ordered := true + dc, err := pc.CreateDataChannel("fmp4Stream", &pion.DataChannelInit{ + MaxRetransmits: &maxRetransmits, + Ordered: &ordered, + }) + + if err != nil { + return nil, err + } + + conn := &DCConn{ + Connection: core.Connection{ + ID: core.NewID(), + FormatName: "webrtc/fmp4", + Transport: dc, + }, + client: c, + dc: dc, + dem: &mp4.Demuxer{}, + queue: NewFrameBufferQueue(), + msgs: make(chan pion.DataChannelMessage, 10), // Saw max 4 messages in a row, 10 should be enough + initialized: false, + } + + dc.OnMessage(func(msg pion.DataChannelMessage) { + conn.msgs <- msg + }) + + dc.OnError(func(err error) { + conn.connected.Done(err) + }) + + dc.OnClose(func() { + close(conn.msgs) + conn.connected.Done(errors.New("datachannel: closed")) + }) + + go conn.initializationLoop() + + return conn, nil +} + +func (c *DCConn) initializationLoop() { + for msg := range c.msgs { + if c.initialized { + return + } + + err := c.probe(msg) + if err != nil { + c.connected.Done(err) + return + } + + if c.initialized { + c.connected.Done(nil) + return + } + } +} + +func (c *DCConn) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { + if media.Direction == core.DirectionSendRecv || media.Direction == core.DirectionSendonly { + return c.client.GetTrack(media, codec) + } + + for _, receiver := range c.Receivers { + if receiver.Codec == codec { + return receiver, nil + } + } + receiver := core.NewReceiver(media, codec) + c.Receivers = append(c.Receivers, receiver) + return receiver, nil +} + +func (c *DCConn) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { + payloadType := codec.PayloadType + localTrack := c.client.conn.GetSenderTrack(media.ID) + sender := core.NewSender(media, codec) + sender.Handler = func(packet *rtp.Packet) { + c.Send += packet.MarshalSize() + //important to send with remote PayloadType + _ = localTrack.WriteRTP(payloadType, packet) + } + + sender.HandleRTP(track) + c.Senders = append(c.Senders, sender) + + return nil +} + +func (c *DCConn) Start() error { + receivers := make(map[uint32]*core.Receiver) + for _, receiver := range c.Receivers { + trackID := c.dem.GetTrackID(receiver.Codec) + receivers[trackID] = receiver + } + + ch := make(chan []byte, 10) + defer close(ch) + + go func() { + for data := range ch { + allTracks := c.dem.DemuxAll(data) + for _, trackData := range allTracks { + trackID := trackData.TrackID + packets := trackData.Packets + receiver := receivers[trackID] + if receiver == nil { + continue + } + + for _, packet := range packets { + receiver.WriteRTP(packet) + } + } + } + }() + + go func() { + for msg := range c.msgs { + if len(msg.Data) >= 4 { + segmentNum := int(msg.Data[1]) + fragmentCount := int(msg.Data[2]) + fragmentSeq := int(msg.Data[3]) + mp4Data := msg.Data[4:] + + c.queue.AddFragment(segmentNum, fragmentCount, fragmentSeq, mp4Data) + + if c.queue.IsSegmentComplete(segmentNum, fragmentCount) { + b := c.queue.GetCombinedBuffer(segmentNum) + c.Recv += len(b) + ch <- b + } + } + } + }() + + c.closed.Wait() + return nil +} + +func (c *DCConn) sendMessageToDataChannel(message string) error { + if c.dc != nil { + return c.dc.SendText(message) + } + + return nil +} + +func (c *DCConn) probe(msg pion.DataChannelMessage) (err error) { + if msg.IsString { + var message DataChannelMessage + if err = json.Unmarshal(msg.Data, &message); err != nil { + return err + } + + switch message.Type { + case "codec": + response, _ := json.Marshal(DataChannelMessage{ + Type: "start", + Msg: "fmp4", + }) + + err = c.sendMessageToDataChannel(string(response)) + if err != nil { + return err + } + + case "recv": + response, _ := json.Marshal(DataChannelMessage{ + Type: "complete", + Msg: "", + }) + + err = c.sendMessageToDataChannel(string(response)) + if err != nil { + return err + } + } + + } else { + if len(msg.Data) >= 4 { + messageType := msg.Data[0] + segmentNum := int(msg.Data[1]) + fragmentCount := int(msg.Data[2]) + fragmentSeq := int(msg.Data[3]) + mp4Data := msg.Data[4:] + + // initialization segment + if messageType == 0 && segmentNum == 1 && fragmentCount == 1 && fragmentSeq == 1 { + medias := c.dem.Probe(mp4Data) + c.Medias = append(c.Medias, medias...) + + // Add backchannel + webrtcMedias := c.client.GetMedias() + for _, media := range webrtcMedias { + if media.Kind == core.KindAudio { + if media.Direction == core.DirectionSendRecv || media.Direction == core.DirectionSendonly { + c.Medias = append(c.Medias, media) + } + } + } + + c.initialized = true + } + } + } + + return nil +} + +func (c *DCConn) Stop() error { + if c.dc != nil && c.dc.ReadyState() == pion.DataChannelStateOpen { + _ = c.dc.Close() + } + + c.closed.Done(nil) + return nil +} diff --git a/pkg/tuya/frameBuffer.go b/pkg/tuya/frameBuffer.go new file mode 100644 index 00000000..bbcb4ff5 --- /dev/null +++ b/pkg/tuya/frameBuffer.go @@ -0,0 +1,86 @@ +package tuya + +import ( + "sort" + "sync" +) + +type FrameBufferQueue struct { + segments map[int]map[int][]byte // segNum -> fragSeq -> data + mu sync.Mutex +} + +func NewFrameBufferQueue() *FrameBufferQueue { + return &FrameBufferQueue{ + segments: make(map[int]map[int][]byte), + } +} + +func (q *FrameBufferQueue) AddFragment(segmentNum, fragmentCount, fragmentSeq int, data []byte) { + q.mu.Lock() + defer q.mu.Unlock() + + if _, ok := q.segments[segmentNum]; !ok { + q.segments[segmentNum] = make(map[int][]byte) + } + + q.segments[segmentNum][fragmentSeq] = data +} + +func (q *FrameBufferQueue) IsSegmentComplete(segmentNum, fragmentCount int) bool { + q.mu.Lock() + defer q.mu.Unlock() + + if frags, ok := q.segments[segmentNum]; ok { + // Make sure we have the right number of fragments + if len(frags) != fragmentCount { + return false + } + + // Check if we have all sequences from 1 to fragmentCount + for i := 1; i <= fragmentCount; i++ { + if _, ok := frags[i]; !ok { + return false + } + } + + return true + } + + return false +} + +func (q *FrameBufferQueue) GetCombinedBuffer(segNum int) []byte { + q.mu.Lock() + defer q.mu.Unlock() + + if frags, ok := q.segments[segNum]; ok { + // Sort fragments by sequence number + var keys []int + for k := range frags { + keys = append(keys, k) + } + sort.Ints(keys) + + // Calculate total size for pre-allocation + totalSize := 0 + for _, k := range keys { + totalSize += len(frags[k]) + } + + // Pre-allocate buffer for better performance + combined := make([]byte, 0, totalSize) + + // Combine fragments in sequence order + for _, k := range keys { + combined = append(combined, frags[k]...) + } + + // Remove this segment to free memory + delete(q.segments, segNum) + + return combined + } + + return nil +} diff --git a/pkg/tuya/mqtt.go b/pkg/tuya/mqtt.go index 88f3a199..9eb25b49 100644 --- a/pkg/tuya/mqtt.go +++ b/pkg/tuya/mqtt.go @@ -208,68 +208,25 @@ func (c *TuyaMQTT) onError(err error) { } } -func (c *TuyaClient) sendOffer(sdp string, streamType int) { - // Note: - // H265 is currently not supported because Tuya does not send H265 data, and therefore also no audio over the normal WebRTC connection. - // The WebRTC connection is used only for sending audio back to the device (backchannel). - // Tuya expects a separate WebRTC DataChannel for H265 data and sends the H265 video and audio data packaged as fMP4 data back. - // These must then be processed separately (WIP - Work In Progress) +func (c *TuyaClient) sendOffer(sdp string, streamType string) { + fixedStreamType := c.getStreamType(streamType) + isHEVC := c.isHEVC(fixedStreamType) - // Note 2: - // Even if we don't receive any data, the peer connection is correctly established and connected - - // Note 3: - // It seems that if even one stream is HEVC, we also need to use the datachannel for the substream, even if that substream is using H264. - - // Example Answer (H265/PCMU with backchannel): - - /* - v=0 - o=- 1747174385 1 IN IP4 127.0.0.1 - s=- - t=0 0 - a=group:BUNDLE 0 1 - a=msid-semantic: WMS UMSklk - m=audio 9 UDP/TLS/RTP/SAVPF 0 - c=IN IP4 0.0.0.0 - a=rtcp:9 IN IP4 0.0.0.0 - a=ice-ufrag:zuRr - a=ice-pwd:EDeWXz847P810fyDyKxbmTdX - a=ice-options:trickle - a=fingerprint:sha-256 02:f5:44:8e:c6:5d:5c:59:49:50:a3:84:d5:e5:b9:35:bb:51:5a:0c:4d:a5:60:89:0f:e6:cb:0e:57:21:a0:14 - a=setup:active - a=mid:0 - a=sendrecv - a=msid:UMSklk NiNNboEn1rJWoQYtpguoKr1GBwpvPST - a=rtcp-mux - a=rtpmap:0 PCMU/8000 - a=ssrc:832759612 cname:bfa87264438073154dhdek - m=video 9 UDP/TLS/RTP/SAVPF 0 - c=IN IP4 0.0.0.0 - a=rtcp:9 IN IP4 0.0.0.0 - a=ice-ufrag:zuRr - a=ice-pwd:EDeWXz847P810fyDyKxbmTdX - a=ice-options:trickle - a=fingerprint:sha-256 02:f5:44:8e:c6:5d:5c:59:49:50:a3:84:d5:e5:b9:35:bb:51:5a:0c:4d:a5:60:89:0f:e6:cb:0e:57:21:a0:14 - a=setup:active - a=mid:1 - a=sendonly - a=msid:UMSklk l9o6icIVb7n7vDdp0KhocYnsijhd774 - a=rtcp-mux - a=rtpmap:0 /0 - a=rtcp-fb:0 ccm fir - a=rtcp-fb:0 nack - a=rtcp-fb:0 nack pli - a=fmtp:0 level-asymmetry-allowed=1;packetization-mode=1;profile-level-id= - a=ssrc:0 cname:bfa87264438073154dhdek - */ + if isHEVC { + // On HEVC we use streamType 0 for main stream and 1 for sub stream + if streamType == "main" { + fixedStreamType = 0 + } else { + fixedStreamType = 1 + } + } c.sendMqttMessage("offer", 302, "", OfferFrame{ Mode: "webrtc", Sdp: sdp, - StreamType: streamType, + StreamType: fixedStreamType, Auth: c.auth, - DatachannelEnable: c.isHEVC(streamType), + DatachannelEnable: isHEVC, }) } @@ -344,17 +301,3 @@ func (c *TuyaClient) sendMqttMessage(messageType string, protocol int, transacti c.mqtt.onError(fmt.Errorf("mqtt publish fail: %s, topic: %s", token.Error().Error(), c.mqtt.publishTopic)) } } - -func (c *TuyaClient) isHEVC(streamType int) bool { - for _, video := range c.skill.Videos { - if video.StreamType == streamType { - return video.CodecType == 4 - } - } - - return false -} - -func (c *TuyaClient) isClaritySupported(webrtcValue int) bool { - return (webrtcValue & (1 << 5)) != 0 -}