From 25e7ac531eba942c0b5c6f88ceb3fdbe5d31813f Mon Sep 17 00:00:00 2001 From: seydx Date: Tue, 28 Oct 2025 11:12:44 +0100 Subject: [PATCH] Cleanup and update comments --- pkg/tuya/client.go | 39 +++++++++++++++++++++++++----------- pkg/tuya/interface.go | 26 +++++++++++++++++------- pkg/tuya/mqtt.go | 46 +++++++++++++++++++++++++------------------ 3 files changed, 73 insertions(+), 38 deletions(-) diff --git a/pkg/tuya/client.go b/pkg/tuya/client.go index a7e43061..19b926d8 100644 --- a/pkg/tuya/client.go +++ b/pkg/tuya/client.go @@ -17,25 +17,28 @@ import ( ) type Client struct { - api TuyaAPI - conn *webrtc.Conn - pc *pion.PeerConnection + api TuyaAPI + conn *webrtc.Conn + pc *pion.PeerConnection + connected core.Waiter + closed bool + + // HEVC only: dc *pion.DataChannel videoSSRC *uint32 audioSSRC *uint32 streamType int isHEVC bool - connected core.Waiter - closed bool handlersMu sync.RWMutex handlers map[uint32]func(*rtp.Packet) } type DataChannelMessage struct { - Type string `json:"type"` + Type string `json:"type"` // "codec", "start", "recv", "complete" Msg string `json:"msg"` } +// RecvMessage contains SSRC values for video/audio streams type RecvMessage struct { Video struct { SSRC uint32 `json:"ssrc"` @@ -159,7 +162,8 @@ func Dial(rawURL string) (core.Producer, error) { } if client.isHEVC { - // Tuya seems to answers always with H264 and PCMU/8000 and PCMA/8000 codecs, replace with real codecs + // We need to replace the SDP codecs with the real ones from Skill. + // The actual media comes via DataChannel, not RTP tracks. for _, media := range client.conn.Medias { if media.Kind == core.KindVideo { @@ -202,9 +206,7 @@ func Dial(rawURL string) (core.Producer, error) { client.Close(err) } - // On HEVC, use DataChannel to receive video/audio if client.isHEVC { - // Create a new DataChannel maxRetransmits := uint16(5) ordered := true client.dc, err = client.pc.CreateDataChannel("fmp4Stream", &pion.DataChannelInit{ @@ -212,18 +214,22 @@ func Dial(rawURL string) (core.Producer, error) { Ordered: &ordered, }) - // Set up data channel handler + // DataChannel receives two types of messages: + // 1. String messages: Control messages (codec, recv) + // 2. Binary messages: RTP packets with video/audio client.dc.OnMessage(func(msg pion.DataChannelMessage) { if msg.IsString { + // Handle control messages (codec, recv, etc.) if connected, err := client.probe(msg); err != nil { client.Close(err) } else if connected { client.connected.Done(nil) } } else { + // Handle RTP packets - Route by SSRC retrieved from "recv" message packet := &rtp.Packet{} if err := packet.Unmarshal(msg.Data); err != nil { - // skip + // Skip invalid packets return } @@ -339,6 +345,9 @@ func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Rece return errors.New("webrtc: can't get track") } + // DISABLED: Speaker Protocol 312 command + // JavaScript client doesn't send this on first call either + // Only subsequent calls (when speakerChloron is set) send Protocol 312 // mqttClient := c.api.GetMqtt() // if mqttClient != nil { // _ = mqttClient.SendSpeaker(1) @@ -352,14 +361,16 @@ func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Rece _ = localTrack.WriteRTP(payloadType, packet) } + // Tuya cameras require specific frame sizes + // See: https://developer.tuya.com/en/docs/iot-device-dev/tuyaos-package-ipc-device?id=Kcn1px33iptn2#title-29 switch track.Codec.Name { case core.CodecPCMA, core.CodecPCMU, core.CodecPCM, core.CodecPCML: - // https://developer.tuya.com/en/docs/iot-device-dev/tuyaos-package-ipc-device?id=Kcn1px33iptn2#title-29-Why%20can%E2%80%99t%20WebRTC%20play%20audio%3F frameSize := 240 if track.Codec.Name == core.CodecPCM { frameSize = 560 } + // Repack to required frame size sender.Handler = pcm.RepackG711(false, frameSize, sender.Handler) sender.Handler = pcm.TranscodeHandler(codec, track.Codec, sender.Handler) } @@ -463,6 +474,7 @@ func (c *Client) probe(msg pion.DataChannelMessage) (bool, error) { switch message.Type { case "codec": + // Camera responded to our codec request - now request frame start frameRequest, _ := json.Marshal(DataChannelMessage{ Type: "start", Msg: "frame", @@ -474,6 +486,8 @@ func (c *Client) probe(msg pion.DataChannelMessage) (bool, error) { } case "recv": + // Camera sends SSRC values for video/audio streams + // We need these to route incoming RTP packets correctly var recvMessage RecvMessage if err := json.Unmarshal([]byte(message.Msg), &recvMessage); err != nil { return false, err @@ -484,6 +498,7 @@ func (c *Client) probe(msg pion.DataChannelMessage) (bool, error) { c.videoSSRC = &videoSSRC c.audioSSRC = &audioSSRC + // Send "complete" to tell camera we're ready to receive RTP packets completeMsg, _ := json.Marshal(DataChannelMessage{ Type: "complete", Msg: "", diff --git a/pkg/tuya/interface.go b/pkg/tuya/interface.go index 74fcd585..25ba0ddd 100644 --- a/pkg/tuya/interface.go +++ b/pkg/tuya/interface.go @@ -66,17 +66,17 @@ type AudioSkill struct { } type VideoSkill struct { - StreamType int `json:"streamType"` // 2 = main stream (hd), 4 = sub stream (sd) - ProfileId string `json:"profileId,omitempty"` - CodecType int `json:"codecType"` // 2 = H264, 4 = H265 + StreamType int `json:"streamType"` // 2 = main stream (HD), 4 = sub stream (SD) + CodecType int `json:"codecType"` // 2 = H264, 4 = H265 (HEVC) Width int `json:"width"` Height int `json:"height"` SampleRate int `json:"sampleRate"` + ProfileId string `json:"profileId,omitempty"` } type Skill struct { - WebRTC int `json:"webrtc"` - LowPower int `json:"lowPower,omitempty"` + WebRTC int `json:"webrtc"` // Bit flags: bit 4=speaker, bit 5=clarity, bit 6=record + LowPower int `json:"lowPower,omitempty"` // 1 = battery-powered camera Audios []AudioSkill `json:"audios"` Videos []VideoSkill `json:"videos"` } @@ -128,6 +128,14 @@ func (c *TuyaClient) GetMqtt() *TuyaMqttClient { return c.mqtt } +// GetStreamType returns the Skill StreamType for the requested resolution +// Returns Skill values (2 or 4), not MQTT values (0 or 1) +// - "hd" → highest resolution streamType (usually 2 = mainStream) +// - "sd" → lowest resolution streamType (usually 4 = substream) +// +// These values must be mapped before sending to MQTT: +// - streamType 2 → MQTT stream_type 0 +// - streamType 4 → MQTT stream_type 1 func (c *TuyaClient) GetStreamType(streamResolution string) int { // Default streamType if nothing is found defaultStreamType := 1 @@ -136,7 +144,7 @@ func (c *TuyaClient) GetStreamType(streamResolution string) int { return defaultStreamType } - // Find the highest and lowest resolution + // Find the highest and lowest resolution based on pixel count var highestResType = defaultStreamType var highestRes = 0 var lowestResType = defaultStreamType @@ -169,10 +177,14 @@ func (c *TuyaClient) GetStreamType(streamResolution string) int { } } +// IsHEVC checks if the given streamType uses H265 (HEVC) codec +// HEVC cameras use DataChannel, H264 cameras use RTP tracks +// - codecType 4 = H265 (HEVC) → DataChannel mode +// - codecType 2 = H264 → Normal RTP mode func (c *TuyaClient) IsHEVC(streamType int) bool { for _, video := range c.skill.Videos { if video.StreamType == streamType { - return video.CodecType == 4 + return video.CodecType == 4 // 4 = H265/HEVC } } diff --git a/pkg/tuya/mqtt.go b/pkg/tuya/mqtt.go index 157c441a..5f64ef48 100644 --- a/pkg/tuya/mqtt.go +++ b/pkg/tuya/mqtt.go @@ -52,9 +52,9 @@ type MqttFrame struct { type OfferFrame struct { Mode string `json:"mode"` Sdp string `json:"sdp"` - StreamType int `json:"stream_type"` + StreamType int `json:"stream_type"` // 0: mainStream(HD), 1: substream(SD) Auth string `json:"auth"` - DatachannelEnable bool `json:"datachannel_enable"` + DatachannelEnable bool `json:"datachannel_enable"` // true for HEVC, false for H264 Token []ICEServer `json:"token"` } @@ -165,8 +165,11 @@ func (c *TuyaMqttClient) Stop() { c.closed = true } +// WakeUp sends a wake-up signal to battery-powered cameras (LowPower mode). +// The camera wakes up and starts responding immediately - we don't wait for dps[149]. +// Note: LowPower cameras sleep after ~3 minutes of inactivity. func (c *TuyaMqttClient) WakeUp(localKey string) error { - // Calculate CRC32 of localKey + // Calculate CRC32 of localKey as wake-up payload crc := crc32.ChecksumIEEE([]byte(localKey)) // Convert to hex string @@ -189,7 +192,8 @@ func (c *TuyaMqttClient) WakeUp(localKey string) error { return fmt.Errorf("failed to publish wake-up message: %w", token.Error()) } - // Subscribe to lowPower topic: smart/decrypt/in/{deviceId} + // Subscribe to lowPower topic to receive dps[149] status updates + // (we don't wait for this signal - camera responds immediately) lowPowerTopic := fmt.Sprintf("smart/decrypt/in/%s", c.deviceId) if token := c.client.Subscribe(lowPowerTopic, 1, c.onLowPowerMessage); token.Wait() && token.Error() != nil { return fmt.Errorf("failed to subscribe to lowPower topic: %w", token.Error()) @@ -199,6 +203,7 @@ func (c *TuyaMqttClient) WakeUp(localKey string) error { } func (c *TuyaMqttClient) SendOffer(sdp string, streamResolution string, streamType int, isHEVC bool) error { + // Map Skill StreamType to MQTT stream_type values // streamType comes from GetStreamType() and uses Skill StreamType values: // - mainStream = 2 (HD) // - substream = 4 (SD) @@ -220,7 +225,7 @@ func (c *TuyaMqttClient) SendOffer(sdp string, streamResolution string, streamTy Sdp: sdp, StreamType: mqttStreamType, Auth: c.auth, - DatachannelEnable: isHEVC, + DatachannelEnable: isHEVC, // must be true for HEVC Token: c.iceServers, }) } @@ -233,30 +238,32 @@ func (c *TuyaMqttClient) SendCandidate(candidate string) error { } func (c *TuyaMqttClient) SendResolution(resolution int) error { - // isClaritySupperted := (c.webrtcVersion & (1 << 5)) != 0 - // if !isClaritySupperted { - // return nil - // } + // Check if camera supports clarity switching + isClaritySupported := (c.webrtcVersion & (1 << 5)) != 0 + if !isClaritySupported { + return nil + } - // Protocol 312 is used for clarity return c.sendMqttMessage("resolution", 312, "", ResolutionFrame{ Mode: "webrtc", - Value: resolution, + Value: resolution, // 0: HD, 1: SD }) } func (c *TuyaMqttClient) SendSpeaker(speaker int) error { - // Protocol 312 is used for speaker - return c.sendMqttMessage("speaker", 312, "", SpeakerFrame{ + if err := c.sendMqttMessage("speaker", 312, "", SpeakerFrame{ Mode: "webrtc", - Value: speaker, - }) + Value: speaker, // 0: off, 1: on + }); err != nil { + return err + } - // if err := c.speakerWaiter.Wait(); err != nil { - // return fmt.Errorf("speaker wait failed: %w", err) - // } + // Wait for camera response + if err := c.speakerWaiter.Wait(); err != nil { + return fmt.Errorf("speaker wait failed: %w", err) + } - // return nil + return nil } func (c *TuyaMqttClient) SendDisconnect() error { @@ -281,6 +288,7 @@ func (c *TuyaMqttClient) onMessage(client mqtt.Client, msg mqtt.Message) { return } + // Filter by session ID to prevent processing messages from other sessions if rmqtt.Data.Header.SessionID != c.sessionId { return }