From 863174839cd1135922c2c0a2c36fcb6d914cf524 Mon Sep 17 00:00:00 2001 From: seydx Date: Sun, 26 Oct 2025 16:39:59 +0100 Subject: [PATCH] Fix video/audio ssrc and low power cameras --- pkg/tuya/client.go | 18 ++++---- pkg/tuya/cloud_api.go | 7 +++ pkg/tuya/interface.go | 9 ++-- pkg/tuya/mqtt.go | 105 ++++++++++++++++++++++++++++++++++++------ pkg/tuya/smart_api.go | 7 +++ 5 files changed, 122 insertions(+), 24 deletions(-) diff --git a/pkg/tuya/client.go b/pkg/tuya/client.go index c6a34d6e..d1a549a8 100644 --- a/pkg/tuya/client.go +++ b/pkg/tuya/client.go @@ -20,8 +20,8 @@ type Client struct { conn *webrtc.Conn pc *pion.PeerConnection dc *pion.DataChannel - videoSSRC uint32 - audioSSRC uint32 + videoSSRC *uint32 + audioSSRC *uint32 streamType int isHEVC bool connected core.Waiter @@ -374,16 +374,16 @@ func (c *Client) Start() error { } } - if c.videoSSRC != 0 { - c.setHandler(c.videoSSRC, func(packet *rtp.Packet) { + if c.videoSSRC != nil { + c.setHandler(*c.videoSSRC, func(packet *rtp.Packet) { if video != nil { video.WriteRTP(packet) } }) } - if c.audioSSRC != 0 { - c.setHandler(c.audioSSRC, func(packet *rtp.Packet) { + if c.audioSSRC != nil { + c.setHandler(*c.audioSSRC, func(packet *rtp.Packet) { if audio != nil { audio.WriteRTP(packet) } @@ -469,8 +469,10 @@ func (c *Client) probe(msg pion.DataChannelMessage) (bool, error) { return false, err } - c.videoSSRC = recvMessage.Video.SSRC - c.audioSSRC = recvMessage.Audio.SSRC + videoSSRC := recvMessage.Video.SSRC + audioSSRC := recvMessage.Audio.SSRC + c.videoSSRC = &videoSSRC + c.audioSSRC = &audioSSRC completeMsg, _ := json.Marshal(DataChannelMessage{ Type: "complete", diff --git a/pkg/tuya/cloud_api.go b/pkg/tuya/cloud_api.go index 4d25c2be..c34d0fe4 100644 --- a/pkg/tuya/cloud_api.go +++ b/pkg/tuya/cloud_api.go @@ -117,6 +117,10 @@ func (c *TuyaCloudApiClient) Init() error { return fmt.Errorf("failed to start MQTT: %w", err) } + if c.skill.LowPower > 0 { + _ = c.mqtt.WakeUp(c.localKey) + } + return nil } @@ -212,6 +216,9 @@ func (c *TuyaCloudApiClient) loadWebrtcConfig() (*WebRTCConfig, error) { return nil, err } + // Store LocalKey (not sure if cloud api provides this, but we need it for low power cameras) + c.localKey = webRTCConfigResponse.Result.LocalKey + iceServers, err := json.Marshal(&webRTCConfigResponse.Result.P2PConfig.Ices) if err != nil { return nil, err diff --git a/pkg/tuya/interface.go b/pkg/tuya/interface.go index f4f530aa..74fcd585 100644 --- a/pkg/tuya/interface.go +++ b/pkg/tuya/interface.go @@ -31,6 +31,7 @@ type TuyaClient struct { baseUrl string expireTime int64 deviceId string + localKey string skill *Skill iceServers []pionWebrtc.ICEServer } @@ -74,15 +75,17 @@ type VideoSkill struct { } type Skill struct { - WebRTC int `json:"webrtc"` - Audios []AudioSkill `json:"audios"` - Videos []VideoSkill `json:"videos"` + WebRTC int `json:"webrtc"` + LowPower int `json:"lowPower,omitempty"` + Audios []AudioSkill `json:"audios"` + Videos []VideoSkill `json:"videos"` } type WebRTCConfig struct { AudioAttributes AudioAttributes `json:"audio_attributes"` Auth string `json:"auth"` ID string `json:"id"` + LocalKey string `json:"local_key,omitempty"` MotoID string `json:"moto_id"` P2PConfig P2PConfig `json:"p2p_config"` ProtocolVersion string `json:"protocol_version"` diff --git a/pkg/tuya/mqtt.go b/pkg/tuya/mqtt.go index 0559e8bc..79a30102 100644 --- a/pkg/tuya/mqtt.go +++ b/pkg/tuya/mqtt.go @@ -1,8 +1,11 @@ package tuya import ( + "encoding/hex" "encoding/json" + "errors" "fmt" + "hash/crc32" "strings" "time" @@ -13,6 +16,7 @@ import ( type TuyaMqttClient struct { client mqtt.Client waiter core.Waiter + wakeupWaiter core.Waiter publishTopic string subscribeTopic string auth string @@ -75,6 +79,15 @@ type DisconnectFrame struct { Mode string `json:"mode"` } +// {"protocol":4,"t":1761487814,"data":{"dps":{"152":"0","160":1,"170":false}}} +type DPSMessage struct { + Protocol int `json:"protocol"` + T int `json:"t"` + Data struct { + Dps map[string]interface{} `json:"dps"` + } `json:"data"` +} + type MqttMessage struct { Protocol int `json:"protocol"` Pv string `json:"pv"` @@ -84,9 +97,10 @@ type MqttMessage struct { func NewTuyaMqttClient(deviceId string) *TuyaMqttClient { return &TuyaMqttClient{ - deviceId: deviceId, - sessionId: core.RandString(6, 62), - waiter: core.Waiter{}, + deviceId: deviceId, + sessionId: core.RandString(6, 62), + waiter: core.Waiter{}, + wakeupWaiter: core.Waiter{}, } } @@ -129,26 +143,70 @@ func (c *TuyaMqttClient) Start(hubConfig *MQTTConfig, webrtcConfig *WebRTCConfig } func (c *TuyaMqttClient) Stop() { + c.closed = true + c.waiter.Done(errors.New("mqtt: stopped")) + c.wakeupWaiter.Done(errors.New("mqtt: stopped")) + if c.client != nil { _ = c.SendDisconnect() c.client.Disconnect(1000) } } -func (c *TuyaMqttClient) SendOffer(sdp string, streamResolution string, streamType int, isHEVC bool) error { - if isHEVC { - // On HEVC we use streamType 0 for main stream (hd) and 1 for sub stream (sd) - if streamResolution == "hd" { - streamType = 0 - } else { - streamType = 1 +func (c *TuyaMqttClient) WakeUp(localKey string) error { + // Calculate CRC32 of localKey + crc := crc32.ChecksumIEEE([]byte(localKey)) + + // Convert to hex string + hexStr := fmt.Sprintf("%08x", crc) + + // Convert hex string to byte array (2 chars at a time) + payload := make([]byte, len(hexStr)/2) + for i := 0; i < len(hexStr); i += 2 { + b, err := hex.DecodeString(hexStr[i : i+2]) + if err != nil { + return fmt.Errorf("failed to decode hex: %w", err) } + payload[i/2] = b[0] + } + + // Publish to wake-up topic: m/w/{deviceId} + wakeUpTopic := fmt.Sprintf("m/w/%s", c.deviceId) + token := c.client.Publish(wakeUpTopic, 1, false, payload) + if token.Wait() && token.Error() != nil { + return fmt.Errorf("failed to publish wake-up message: %w", token.Error()) + } + + // Subscribe to lowPower topic: smart/decrypt/in/{deviceId} + lowPowerTopic := fmt.Sprintf("smart/decrypt/in/%s", c.deviceId) + if token := c.client.Subscribe(lowPowerTopic, 1, c.onLowPowerMessage); token.Wait() && token.Error() != nil { + return fmt.Errorf("failed to subscribe to lowPower topic: %w", token.Error()) + } + + return nil +} + +func (c *TuyaMqttClient) SendOffer(sdp string, streamResolution string, streamType int, isHEVC bool) error { + // streamType comes from GetStreamType() and uses Skill StreamType values: + // - mainStream = 2 (HD) + // - substream = 4 (SD) + // + // But MQTT expects mapped stream_type values: + // - mainStream (2) → stream_type: 0 + // - substream (4) → stream_type: 1 + + mqttStreamType := streamType + switch streamType { + case 2: + mqttStreamType = 0 // mainStream (HD) + case 4: + mqttStreamType = 1 // substream (SD) } return c.sendMqttMessage("offer", 302, "", OfferFrame{ Mode: "webrtc", Sdp: sdp, - StreamType: streamType, + StreamType: mqttStreamType, Auth: c.auth, DatachannelEnable: isHEVC, }) @@ -189,7 +247,7 @@ func (c *TuyaMqttClient) SendDisconnect() error { } func (c *TuyaMqttClient) onConnect(client mqtt.Client) { - if token := client.Subscribe(c.subscribeTopic, 1, c.consume); token.Wait() && token.Error() != nil { + if token := client.Subscribe(c.subscribeTopic, 1, c.onMessage); token.Wait() && token.Error() != nil { c.waiter.Done(token.Error()) return } @@ -197,7 +255,7 @@ func (c *TuyaMqttClient) onConnect(client mqtt.Client) { c.waiter.Done(nil) } -func (c *TuyaMqttClient) consume(client mqtt.Client, msg mqtt.Message) { +func (c *TuyaMqttClient) onMessage(client mqtt.Client, msg mqtt.Message) { var rmqtt MqttMessage if err := json.Unmarshal(msg.Payload(), &rmqtt); err != nil { c.onError(err) @@ -218,6 +276,27 @@ func (c *TuyaMqttClient) consume(client mqtt.Client, msg mqtt.Message) { } } +func (c *TuyaMqttClient) onLowPowerMessage(client mqtt.Client, msg mqtt.Message) { + var message DPSMessage + if err := json.Unmarshal(msg.Payload(), &message); err != nil { + return + } + + // Check if protocol is 4 and dps[149] is true (motion detection = camera ready) + if message.Protocol == 4 { + if val, ok := message.Data.Dps["149"]; ok { + if ready, ok := val.(bool); ok && ready { + // Camera is now ready after wake-up (dps[149]:true received). + // However, we don't wait for this signal (like ismartlife.me doesn't either). + // The camera starts responding immediately after WakeUp() is called, + // so we proceed with the connection without blocking. + // This waiter is kept for potential future use or debugging. + c.wakeupWaiter.Done(nil) + } + } + } +} + func (c *TuyaMqttClient) onMqttAnswer(msg *MqttMessage) { var answerFrame AnswerFrame if err := json.Unmarshal(msg.Data.Message, &answerFrame); err != nil { diff --git a/pkg/tuya/smart_api.go b/pkg/tuya/smart_api.go index 5f4e74f9..3c96fe98 100644 --- a/pkg/tuya/smart_api.go +++ b/pkg/tuya/smart_api.go @@ -314,6 +314,10 @@ func (c *TuyaSmartApiClient) Init() error { return fmt.Errorf("failed to start MQTT: %w", err) } + if c.skill.LowPower > 0 { + _ = c.mqtt.WakeUp(c.localKey) + } + return nil } @@ -498,6 +502,9 @@ func (c *TuyaSmartApiClient) loadWebrtcConfig() (*WebRTCConfig, error) { return nil, err } + // Store LocalKey + c.localKey = webRTCConfigResponse.Result.LocalKey + iceServers, err := json.Marshal(&webRTCConfigResponse.Result.P2PConfig.Ices) if err != nil { return nil, err