From e7044a93f62b2b4004f18b6de45b890d61032275 Mon Sep 17 00:00:00 2001 From: seydx Date: Sat, 17 May 2025 15:23:54 +0200 Subject: [PATCH] fix video/audio and minor improvements --- pkg/tuya/api.go | 73 ++++++++++++++++++++++++++++++++++++++++++++++ pkg/tuya/client.go | 43 ++++++++++++++++++++------- pkg/tuya/mqtt.go | 56 +++++++++++++++-------------------- 3 files changed, 128 insertions(+), 44 deletions(-) diff --git a/pkg/tuya/api.go b/pkg/tuya/api.go index d08a68dc..3842ed02 100644 --- a/pkg/tuya/api.go +++ b/pkg/tuya/api.go @@ -375,6 +375,79 @@ func (c *TuyaClient) Request(method string, url string, body any) ([]byte, error return res, nil } +func (c *TuyaClient) getVideoCodecs() []*core.Codec { + if len(c.skill.Videos) > 0 { + codecs := make([]*core.Codec, 0) + + for _, video := range c.skill.Videos { + name := core.CodecH264 + if c.isHEVC(video.StreamType) { + name = core.CodecH265 + } + + codec := &core.Codec{ + Name: name, + ClockRate: uint32(video.SampleRate), + } + + codecs = append(codecs, codec) + } + + if len(codecs) > 0 { + return codecs + } + } + + return nil +} + +func (c *TuyaClient) getAudioCodecs() []*core.Codec { + if len(c.skill.Audios) > 0 { + codecs := make([]*core.Codec, 0) + + for _, audio := range c.skill.Audios { + name := getAudioCodecName(&audio) + + codec := &core.Codec{ + Name: name, + ClockRate: uint32(audio.SampleRate), + Channels: uint8(audio.Channels), + } + codecs = append(codecs, codec) + } + + if len(codecs) > 0 { + return codecs + } + } + + return nil +} + +// https://protect-us.ismartlife.me/ +func getAudioCodecName(audioSkill *AudioSkill) string { + switch audioSkill.CodecType { + // case 100: + // return "ADPCM" + case 101: + return core.CodecPCML + case 102, 103, 104: + return core.CodecAAC + case 105: + return core.CodecPCMU + case 106: + return core.CodecPCMA + // case 107: + // return "G726-32" + // case 108: + // return "SPEEX" + case 109: + return core.CodecMP3 + default: + return core.CodecPCML + } +} + func (c *TuyaClient) getStreamType(streamRole string) int { // Default streamType if nothing is found defaultStreamType := 1 diff --git a/pkg/tuya/client.go b/pkg/tuya/client.go index ecf4d480..bebdacd5 100644 --- a/pkg/tuya/client.go +++ b/pkg/tuya/client.go @@ -129,13 +129,13 @@ func Dial(rawURL string) (core.Producer, error) { api, err := webrtc.NewAPI() if err != nil { - client.api.Close() + client.Stop() return nil, err } client.pc, err = api.NewPeerConnection(conf) if err != nil { - client.api.Close() + client.Stop() return nil, err } @@ -167,18 +167,27 @@ func Dial(rawURL string) (core.Producer, error) { } if err = client.conn.SetAnswer(answer.Sdp); err != nil { - client.Stop() + client.connected.Done(err) return } if client.isHEVC { - // Tuya answers always with H264 codec, replace with HEVC + // Tuya seems to answers always with H264 and PCMU/8000 and PCMA/8000 codecs, replace with real codecs + for _, media := range client.conn.Medias { if media.Kind == core.KindVideo { - for _, codec := range media.Codecs { - if codec.Name == core.CodecH264 { - codec.Name = core.CodecH265 - } + codecs := client.api.getVideoCodecs() + if codecs != nil { + media.Codecs = codecs + } + } + } + + for _, media := range client.conn.Medias { + if media.Kind == core.KindAudio { + codecs := client.api.getAudioCodecs() + if codecs != nil { + media.Codecs = codecs } } } @@ -197,6 +206,7 @@ func Dial(rawURL string) (core.Producer, error) { } client.api.mqtt.handleDisconnect = func() { + // fmt.Println("tuya: disconnect") client.Stop() } @@ -222,6 +232,7 @@ func Dial(rawURL string) (core.Producer, error) { } else { packet := &rtp.Packet{} if err := packet.Unmarshal(msg.Data); err != nil { + // skip return } @@ -260,7 +271,9 @@ func Dial(rawURL string) (core.Producer, error) { switch msg := msg.(type) { case *pion.ICECandidate: _ = sendOffer.Wait() - client.api.sendCandidate("a=" + msg.ToJSON().Candidate) + if err := client.api.sendCandidate("a=" + msg.ToJSON().Candidate); err != nil { + client.connected.Done(err) + } case pion.PeerConnectionState: switch msg { @@ -289,7 +302,7 @@ func Dial(rawURL string) (core.Producer, error) { // Create offer offer, err := client.conn.CreateOffer(medias) if err != nil { - client.api.Close() + client.Stop() return nil, err } @@ -299,7 +312,11 @@ func Dial(rawURL string) (core.Producer, error) { offer = re.ReplaceAllString(offer, "") // Send offer - client.api.sendOffer(offer, streamRole) + if err := client.api.sendOffer(offer, streamRole); err != nil { + client.Stop() + return nil, fmt.Errorf("tuya: %w", err) + } + sendOffer.Done(nil) // Wait for connection @@ -327,6 +344,8 @@ func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Rece return errors.New("webrtc: can't get track") } + _ = c.api.sendSpeaker(1) + payloadType := codec.PayloadType sender := core.NewSender(media, codec) @@ -376,6 +395,8 @@ func (c *Client) Stop() error { return nil } + c.closed = true + for ssrc := range c.handlers { delete(c.handlers, ssrc) } diff --git a/pkg/tuya/mqtt.go b/pkg/tuya/mqtt.go index ea69f60e..ebc27894 100644 --- a/pkg/tuya/mqtt.go +++ b/pkg/tuya/mqtt.go @@ -56,10 +56,10 @@ type CandidateFrame struct { Candidate string `json:"candidate"` } -type ResolutionFrame struct { - Mode string `json:"mode"` - Value int `json:"value"` // 0: HD, 1: SD -} +// type ResolutionFrame struct { +// Mode string `json:"mode"` +// Value int `json:"value"` // 0: HD, 1: SD +// } type SpeakerFrame struct { Mode string `json:"mode"` @@ -114,7 +114,7 @@ func (c *TuyaClient) StartMQTT() error { func (c *TuyaClient) StopMQTT() { if c.mqtt.client != nil { - c.sendDisconnect() + _ = c.sendDisconnect() c.mqtt.client.Disconnect(1000) } } @@ -202,7 +202,7 @@ func (c *TuyaMQTT) onError(err error) { } } -func (c *TuyaClient) sendOffer(sdp string, streamRole string) { +func (c *TuyaClient) sendOffer(sdp string, streamRole string) error { streamType := c.getStreamType(streamRole) isHEVC := c.isHEVC(streamType) @@ -215,53 +215,43 @@ func (c *TuyaClient) sendOffer(sdp string, streamRole string) { } } - err := c.sendMqttMessage("offer", 302, "", OfferFrame{ + return c.sendMqttMessage("offer", 302, "", OfferFrame{ Mode: "webrtc", Sdp: sdp, StreamType: streamType, Auth: c.auth, DatachannelEnable: isHEVC, }) - - if err != nil { - c.mqtt.onError(err) - return - } } -func (c *TuyaClient) sendCandidate(candidate string) { - err := c.sendMqttMessage("candidate", 302, "", CandidateFrame{ +func (c *TuyaClient) sendCandidate(candidate string) error { + return c.sendMqttMessage("candidate", 302, "", CandidateFrame{ Mode: "webrtc", Candidate: candidate, }) - - if err != nil { - c.mqtt.onError(err) - return - } } -func (c *TuyaClient) sendResolution(resolution int) { - isClaritySupperted := (c.skill.WebRTC & (1 << 5)) != 0 - if !isClaritySupperted { - return - } +// func (c *TuyaClient) sendResolution(resolution int) error { +// isClaritySupperted := (c.skill.WebRTC & (1 << 5)) != 0 +// if !isClaritySupperted { +// return nil +// } - c.sendMqttMessage("resolution", 302, "", ResolutionFrame{ - Mode: "webrtc", - Value: resolution, - }) -} +// return c.sendMqttMessage("resolution", 302, "", ResolutionFrame{ +// Mode: "webrtc", +// Value: resolution, +// }) +// } -func (c *TuyaClient) sendSpeaker(speaker int) { - c.sendMqttMessage("speaker", 302, "", SpeakerFrame{ +func (c *TuyaClient) sendSpeaker(speaker int) error { + return c.sendMqttMessage("speaker", 302, "", SpeakerFrame{ Mode: "webrtc", Value: speaker, }) } -func (c *TuyaClient) sendDisconnect() { - c.sendMqttMessage("disconnect", 302, "", DisconnectFrame{ +func (c *TuyaClient) sendDisconnect() error { + return c.sendMqttMessage("disconnect", 302, "", DisconnectFrame{ Mode: "webrtc", }) }