support h265

This commit is contained in:
seydx
2025-05-15 10:45:04 +02:00
parent 524cdb7176
commit 3d222136f9
5 changed files with 428 additions and 109 deletions
+41 -29
View File
@@ -67,22 +67,26 @@ type P2PConfig struct {
Ices []OpenApiICE `json:"ices"`
}
type AudioSkill struct {
Channels int `json:"channels"`
DataBit int `json:"dataBit"`
CodecType int `json:"codecType"`
SampleRate int `json:"sampleRate"`
}
type VideoSkill struct {
StreamType int `json:"streamType"` // 2 = main stream, 4 = sub stream
ProfileId string `json:"profileId,omitempty"`
CodecType int `json:"codecType"` // 2 = H264, 4 = H265
Width int `json:"width"`
Height int `json:"height"`
SampleRate int `json:"sampleRate"`
}
type Skill struct {
WebRTC int `json:"webrtc"`
Audios []struct {
Channels int `json:"channels"`
DataBit int `json:"dataBit"`
CodecType int `json:"codecType"`
SampleRate int `json:"sampleRate"`
} `json:"audios"`
Videos []struct {
StreamType int `json:"streamType"` // 2 = main stream, 4 = sub stream
ProfileId string `json:"profileId"`
Width int `json:"width"`
CodecType int `json:"codecType"` // 2 = H264, 4 = H265
SampleRate int `json:"sampleRate"`
Height int `json:"height"`
} `json:"videos"`
WebRTC int `json:"webrtc"`
Audios []AudioSkill `json:"audios"`
Videos []VideoSkill `json:"videos"`
}
type WebRTConfig struct {
@@ -299,20 +303,9 @@ func (c *TuyaClient) InitDevice() (err error) {
c.auth = webRTCConfigResponse.Result.Auth
c.skill = &Skill{
Audios: []struct {
Channels int `json:"channels"`
DataBit int `json:"dataBit"`
CodecType int `json:"codecType"`
SampleRate int `json:"sampleRate"`
}{},
Videos: []struct {
StreamType int `json:"streamType"`
ProfileId string `json:"profileId"`
Width int `json:"width"`
CodecType int `json:"codecType"`
SampleRate int `json:"sampleRate"`
Height int `json:"height"`
}{},
WebRTC: 3, // basic webrtc
Audios: make([]AudioSkill, 0),
Videos: make([]VideoSkill, 0),
}
if webRTCConfigResponse.Result.Skill != "" {
@@ -393,6 +386,11 @@ func (c *TuyaClient) InitDevice() (err error) {
ClockRate: uint32(90000),
PayloadType: 96,
},
{
Name: core.CodecH265,
ClockRate: uint32(90000),
PayloadType: 96,
},
},
})
}
@@ -536,6 +534,20 @@ func getAudioCodec(codecType int) string {
}
}
func (c *TuyaClient) isHEVC(streamType int) bool {
for _, video := range c.skill.Videos {
if video.StreamType == streamType {
return video.CodecType == 4
}
}
return false
}
func (c *TuyaClient) isClaritySupported(webrtcValue int) bool {
return (webrtcValue & (1 << 5)) != 0
}
func (c *TuyaClient) calBusinessSign(ts int64) string {
data := fmt.Sprintf("%s%s%s%d", c.clientId, c.accessToken, c.clientSecret, ts)
val := md5.Sum([]byte(data))
+35 -10
View File
@@ -14,9 +14,10 @@ import (
)
type Client struct {
api *TuyaClient
conn *webrtc.Conn
done chan struct{}
api *TuyaClient
conn *webrtc.Conn
dcConn *DCConn
done chan struct{}
}
const (
@@ -92,6 +93,8 @@ func Dial(rawURL string) (core.Producer, error) {
}
return streams.GetProducer(client.api.hlsURL)
} else {
isHEVC := client.api.isHEVC(client.api.getStreamType(streamType))
conf := pion.Configuration{
ICEServers: client.api.iceServers,
ICETransportPolicy: pion.ICETransportPolicyAll,
@@ -116,7 +119,7 @@ func Dial(rawURL string) (core.Producer, error) {
// protect from blocking on errors
defer sendOffer.Done(nil)
// waiter will wait PC error or WS error or nil (connection OK)
// waiter will wait PC error
var connState core.Waiter
client.conn = webrtc.NewConn(pc)
@@ -167,6 +170,15 @@ func Dial(rawURL string) (core.Producer, error) {
client.Stop()
}
// Set up data channel for HEVC
if isHEVC {
client.dcConn, err = NewDCConn(pc, client)
if err != nil {
client.api.Close()
return nil, err
}
}
client.conn.Listen(func(msg any) {
switch msg := msg.(type) {
case *pion.ICECandidate:
@@ -182,6 +194,7 @@ func Dial(rawURL string) (core.Producer, error) {
case pion.PeerConnectionStateConnected:
connState.Done(nil)
default:
client.Stop()
connState.Done(errors.New("webrtc: " + msg.String()))
}
}
@@ -200,9 +213,18 @@ func Dial(rawURL string) (core.Producer, error) {
offer = re.ReplaceAllString(offer, "")
// Send offer
client.api.sendOffer(offer, tuyaAPI.getStreamType(streamType))
client.api.sendOffer(offer, streamType)
sendOffer.Done(nil)
if client.dcConn != nil {
if err = client.dcConn.connected.Wait(); err != nil {
client.Stop()
return nil, err
}
return client.dcConn, nil
}
if err = connState.Wait(); err != nil {
return nil, err
}
@@ -223,12 +245,7 @@ func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Rece
// RepackG711 will not work, so add default logic without repacking
payloadType := codec.PayloadType
localTrack := c.conn.GetSenderTrack(media.ID)
if localTrack == nil {
return errors.New("webrtc: can't get track")
}
sender := core.NewSender(media, codec)
sender.Handler = func(packet *rtp.Packet) {
c.conn.Send += packet.MarshalSize()
@@ -243,6 +260,10 @@ func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Rece
}
func (c *Client) Start() error {
if c.dcConn != nil {
c.dcConn.Start()
}
return c.conn.Start()
}
@@ -258,6 +279,10 @@ func (c *Client) Stop() error {
_ = c.conn.Stop()
}
if c.dcConn != nil {
_ = c.dcConn.Stop()
}
if c.api != nil {
c.api.Close()
}
+253
View File
@@ -0,0 +1,253 @@
package tuya
import (
"encoding/json"
"errors"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/mp4"
"github.com/pion/rtp"
pion "github.com/pion/webrtc/v4"
)
type DCConn struct {
core.Connection
client *Client
dc *pion.DataChannel
dem *mp4.Demuxer
queue *FrameBufferQueue
msgs chan pion.DataChannelMessage
connected core.Waiter
closed core.Waiter
initialized bool
}
type DataChannelMessage struct {
Type string `json:"type"`
Msg string `json:"msg"`
}
func NewDCConn(pc *pion.PeerConnection, c *Client) (*DCConn, error) {
maxRetransmits := uint16(5)
ordered := true
dc, err := pc.CreateDataChannel("fmp4Stream", &pion.DataChannelInit{
MaxRetransmits: &maxRetransmits,
Ordered: &ordered,
})
if err != nil {
return nil, err
}
conn := &DCConn{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "webrtc/fmp4",
Transport: dc,
},
client: c,
dc: dc,
dem: &mp4.Demuxer{},
queue: NewFrameBufferQueue(),
msgs: make(chan pion.DataChannelMessage, 10), // Saw max 4 messages in a row, 10 should be enough
initialized: false,
}
dc.OnMessage(func(msg pion.DataChannelMessage) {
conn.msgs <- msg
})
dc.OnError(func(err error) {
conn.connected.Done(err)
})
dc.OnClose(func() {
close(conn.msgs)
conn.connected.Done(errors.New("datachannel: closed"))
})
go conn.initializationLoop()
return conn, nil
}
func (c *DCConn) initializationLoop() {
for msg := range c.msgs {
if c.initialized {
return
}
err := c.probe(msg)
if err != nil {
c.connected.Done(err)
return
}
if c.initialized {
c.connected.Done(nil)
return
}
}
}
func (c *DCConn) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
if media.Direction == core.DirectionSendRecv || media.Direction == core.DirectionSendonly {
return c.client.GetTrack(media, codec)
}
for _, receiver := range c.Receivers {
if receiver.Codec == codec {
return receiver, nil
}
}
receiver := core.NewReceiver(media, codec)
c.Receivers = append(c.Receivers, receiver)
return receiver, nil
}
func (c *DCConn) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error {
payloadType := codec.PayloadType
localTrack := c.client.conn.GetSenderTrack(media.ID)
sender := core.NewSender(media, codec)
sender.Handler = func(packet *rtp.Packet) {
c.Send += packet.MarshalSize()
//important to send with remote PayloadType
_ = localTrack.WriteRTP(payloadType, packet)
}
sender.HandleRTP(track)
c.Senders = append(c.Senders, sender)
return nil
}
func (c *DCConn) Start() error {
receivers := make(map[uint32]*core.Receiver)
for _, receiver := range c.Receivers {
trackID := c.dem.GetTrackID(receiver.Codec)
receivers[trackID] = receiver
}
ch := make(chan []byte, 10)
defer close(ch)
go func() {
for data := range ch {
allTracks := c.dem.DemuxAll(data)
for _, trackData := range allTracks {
trackID := trackData.TrackID
packets := trackData.Packets
receiver := receivers[trackID]
if receiver == nil {
continue
}
for _, packet := range packets {
receiver.WriteRTP(packet)
}
}
}
}()
go func() {
for msg := range c.msgs {
if len(msg.Data) >= 4 {
segmentNum := int(msg.Data[1])
fragmentCount := int(msg.Data[2])
fragmentSeq := int(msg.Data[3])
mp4Data := msg.Data[4:]
c.queue.AddFragment(segmentNum, fragmentCount, fragmentSeq, mp4Data)
if c.queue.IsSegmentComplete(segmentNum, fragmentCount) {
b := c.queue.GetCombinedBuffer(segmentNum)
c.Recv += len(b)
ch <- b
}
}
}
}()
c.closed.Wait()
return nil
}
func (c *DCConn) sendMessageToDataChannel(message string) error {
if c.dc != nil {
return c.dc.SendText(message)
}
return nil
}
func (c *DCConn) probe(msg pion.DataChannelMessage) (err error) {
if msg.IsString {
var message DataChannelMessage
if err = json.Unmarshal(msg.Data, &message); err != nil {
return err
}
switch message.Type {
case "codec":
response, _ := json.Marshal(DataChannelMessage{
Type: "start",
Msg: "fmp4",
})
err = c.sendMessageToDataChannel(string(response))
if err != nil {
return err
}
case "recv":
response, _ := json.Marshal(DataChannelMessage{
Type: "complete",
Msg: "",
})
err = c.sendMessageToDataChannel(string(response))
if err != nil {
return err
}
}
} else {
if len(msg.Data) >= 4 {
messageType := msg.Data[0]
segmentNum := int(msg.Data[1])
fragmentCount := int(msg.Data[2])
fragmentSeq := int(msg.Data[3])
mp4Data := msg.Data[4:]
// initialization segment
if messageType == 0 && segmentNum == 1 && fragmentCount == 1 && fragmentSeq == 1 {
medias := c.dem.Probe(mp4Data)
c.Medias = append(c.Medias, medias...)
// Add backchannel
webrtcMedias := c.client.GetMedias()
for _, media := range webrtcMedias {
if media.Kind == core.KindAudio {
if media.Direction == core.DirectionSendRecv || media.Direction == core.DirectionSendonly {
c.Medias = append(c.Medias, media)
}
}
}
c.initialized = true
}
}
}
return nil
}
func (c *DCConn) Stop() error {
if c.dc != nil && c.dc.ReadyState() == pion.DataChannelStateOpen {
_ = c.dc.Close()
}
c.closed.Done(nil)
return nil
}
+86
View File
@@ -0,0 +1,86 @@
package tuya
import (
"sort"
"sync"
)
type FrameBufferQueue struct {
segments map[int]map[int][]byte // segNum -> fragSeq -> data
mu sync.Mutex
}
func NewFrameBufferQueue() *FrameBufferQueue {
return &FrameBufferQueue{
segments: make(map[int]map[int][]byte),
}
}
func (q *FrameBufferQueue) AddFragment(segmentNum, fragmentCount, fragmentSeq int, data []byte) {
q.mu.Lock()
defer q.mu.Unlock()
if _, ok := q.segments[segmentNum]; !ok {
q.segments[segmentNum] = make(map[int][]byte)
}
q.segments[segmentNum][fragmentSeq] = data
}
func (q *FrameBufferQueue) IsSegmentComplete(segmentNum, fragmentCount int) bool {
q.mu.Lock()
defer q.mu.Unlock()
if frags, ok := q.segments[segmentNum]; ok {
// Make sure we have the right number of fragments
if len(frags) != fragmentCount {
return false
}
// Check if we have all sequences from 1 to fragmentCount
for i := 1; i <= fragmentCount; i++ {
if _, ok := frags[i]; !ok {
return false
}
}
return true
}
return false
}
func (q *FrameBufferQueue) GetCombinedBuffer(segNum int) []byte {
q.mu.Lock()
defer q.mu.Unlock()
if frags, ok := q.segments[segNum]; ok {
// Sort fragments by sequence number
var keys []int
for k := range frags {
keys = append(keys, k)
}
sort.Ints(keys)
// Calculate total size for pre-allocation
totalSize := 0
for _, k := range keys {
totalSize += len(frags[k])
}
// Pre-allocate buffer for better performance
combined := make([]byte, 0, totalSize)
// Combine fragments in sequence order
for _, k := range keys {
combined = append(combined, frags[k]...)
}
// Remove this segment to free memory
delete(q.segments, segNum)
return combined
}
return nil
}
+13 -70
View File
@@ -208,68 +208,25 @@ func (c *TuyaMQTT) onError(err error) {
}
}
func (c *TuyaClient) sendOffer(sdp string, streamType int) {
// Note:
// H265 is currently not supported because Tuya does not send H265 data, and therefore also no audio over the normal WebRTC connection.
// The WebRTC connection is used only for sending audio back to the device (backchannel).
// Tuya expects a separate WebRTC DataChannel for H265 data and sends the H265 video and audio data packaged as fMP4 data back.
// These must then be processed separately (WIP - Work In Progress)
func (c *TuyaClient) sendOffer(sdp string, streamType string) {
fixedStreamType := c.getStreamType(streamType)
isHEVC := c.isHEVC(fixedStreamType)
// Note 2:
// Even if we don't receive any data, the peer connection is correctly established and connected
// Note 3:
// It seems that if even one stream is HEVC, we also need to use the datachannel for the substream, even if that substream is using H264.
// Example Answer (H265/PCMU with backchannel):
/*
v=0
o=- 1747174385 1 IN IP4 127.0.0.1
s=-
t=0 0
a=group:BUNDLE 0 1
a=msid-semantic: WMS UMSklk
m=audio 9 UDP/TLS/RTP/SAVPF 0
c=IN IP4 0.0.0.0
a=rtcp:9 IN IP4 0.0.0.0
a=ice-ufrag:zuRr
a=ice-pwd:EDeWXz847P810fyDyKxbmTdX
a=ice-options:trickle
a=fingerprint:sha-256 02:f5:44:8e:c6:5d:5c:59:49:50:a3:84:d5:e5:b9:35:bb:51:5a:0c:4d:a5:60:89:0f:e6:cb:0e:57:21:a0:14
a=setup:active
a=mid:0
a=sendrecv
a=msid:UMSklk NiNNboEn1rJWoQYtpguoKr1GBwpvPST
a=rtcp-mux
a=rtpmap:0 PCMU/8000
a=ssrc:832759612 cname:bfa87264438073154dhdek
m=video 9 UDP/TLS/RTP/SAVPF 0
c=IN IP4 0.0.0.0
a=rtcp:9 IN IP4 0.0.0.0
a=ice-ufrag:zuRr
a=ice-pwd:EDeWXz847P810fyDyKxbmTdX
a=ice-options:trickle
a=fingerprint:sha-256 02:f5:44:8e:c6:5d:5c:59:49:50:a3:84:d5:e5:b9:35:bb:51:5a:0c:4d:a5:60:89:0f:e6:cb:0e:57:21:a0:14
a=setup:active
a=mid:1
a=sendonly
a=msid:UMSklk l9o6icIVb7n7vDdp0KhocYnsijhd774
a=rtcp-mux
a=rtpmap:0 /0
a=rtcp-fb:0 ccm fir
a=rtcp-fb:0 nack
a=rtcp-fb:0 nack pli
a=fmtp:0 level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=
a=ssrc:0 cname:bfa87264438073154dhdek
*/
if isHEVC {
// On HEVC we use streamType 0 for main stream and 1 for sub stream
if streamType == "main" {
fixedStreamType = 0
} else {
fixedStreamType = 1
}
}
c.sendMqttMessage("offer", 302, "", OfferFrame{
Mode: "webrtc",
Sdp: sdp,
StreamType: streamType,
StreamType: fixedStreamType,
Auth: c.auth,
DatachannelEnable: c.isHEVC(streamType),
DatachannelEnable: isHEVC,
})
}
@@ -344,17 +301,3 @@ func (c *TuyaClient) sendMqttMessage(messageType string, protocol int, transacti
c.mqtt.onError(fmt.Errorf("mqtt publish fail: %s, topic: %s", token.Error().Error(), c.mqtt.publishTopic))
}
}
func (c *TuyaClient) isHEVC(streamType int) bool {
for _, video := range c.skill.Videos {
if video.StreamType == streamType {
return video.CodecType == 4
}
}
return false
}
func (c *TuyaClient) isClaritySupported(webrtcValue int) bool {
return (webrtcValue & (1 << 5)) != 0
}