This commit is contained in:
seydx
2025-05-12 22:54:40 +02:00
parent 43b7a662c1
commit 05c12b34e5
3 changed files with 339 additions and 260 deletions
+166 -88
View File
@@ -22,18 +22,20 @@ type TuyaClient struct {
apiURL string apiURL string
rtspURL string rtspURL string
hlsURL string hlsURL string
sessionID string sessionId string
clientID string clientId string
deviceID string clientSecret string
deviceId string
accessToken string accessToken string
refreshToken string refreshToken string
secret string
expireTime int64 expireTime int64
uid string uid string
motoID string motoId string
auth string auth string
skill *Skill
iceServers []pionWebrtc.ICEServer iceServers []pionWebrtc.ICEServer
medias []*core.Media medias []*core.Media
hasBackchannel bool
} }
type Token struct { type Token struct {
@@ -43,17 +45,6 @@ type Token struct {
ExpireTime int64 `json:"expire_time"` ExpireTime int64 `json:"expire_time"`
} }
type AllocateRequest struct {
Type string `json:"type"`
}
type AllocateResponse struct {
Success bool `json:"success"`
Result struct {
URL string `json:"url"`
} `json:"result"`
}
type AudioAttributes struct { type AudioAttributes struct {
CallMode []int `json:"call_mode"` // 1 = one way, 2 = two way CallMode []int `json:"call_mode"` // 1 = one way, 2 = two way
HardwareCapability []int `json:"hardware_capability"` // 1 = mic, 2 = speaker HardwareCapability []int `json:"hardware_capability"` // 1 = mic, 2 = speaker
@@ -85,7 +76,7 @@ type Skill struct {
SampleRate int `json:"sampleRate"` SampleRate int `json:"sampleRate"`
} `json:"audios"` } `json:"audios"`
Videos []struct { Videos []struct {
StreamType int `json:"streamType"` // streamType = 2 => H265 and streamType = 4 => H264 StreamType int `json:"streamType"` // streamType = 2 => main stream - streamType = 4 => sub stream
ProfileId string `json:"profileId"` ProfileId string `json:"profileId"`
Width int `json:"width"` Width int `json:"width"`
CodecType int `json:"codecType"` CodecType int `json:"codecType"`
@@ -100,17 +91,54 @@ type WebRTConfig struct {
ID string `json:"id"` ID string `json:"id"`
MotoID string `json:"moto_id"` MotoID string `json:"moto_id"`
P2PConfig P2PConfig `json:"p2p_config"` P2PConfig P2PConfig `json:"p2p_config"`
ProtocolVersion string `json:"protocol_version"`
Skill string `json:"skill"` Skill string `json:"skill"`
SupportsWebRTCRecord bool `json:"supports_webrtc_record"`
SupportsWebRTC bool `json:"supports_webrtc"` SupportsWebRTC bool `json:"supports_webrtc"`
VedioClaritiy int `json:"vedio_clarity"`
VideoClaritiy int `json:"video_clarity"` VideoClaritiy int `json:"video_clarity"`
VideoClarities []int `json:"video_clarities"`
}
type OpenIoTHubConfig struct {
Url string `json:"url"`
ClientID string `json:"client_id"`
Username string `json:"username"`
Password string `json:"password"`
SinkTopic struct {
IPC string `json:"ipc"`
} `json:"sink_topic"`
SourceSink struct {
IPC string `json:"ipc"`
} `json:"source_topic"`
ExpireTime int `json:"expire_time"`
} }
type WebRTCConfigResponse struct { type WebRTCConfigResponse struct {
Success bool `json:"success"`
Result WebRTConfig `json:"result"` Result WebRTConfig `json:"result"`
Msg string `json:"msg,omitempty"`
Code int `json:"code,omitempty"`
} }
type TokenResponse struct { type TokenResponse struct {
Success bool `json:"success"`
Result Token `json:"result"` Result Token `json:"result"`
Msg string `json:"msg,omitempty"`
Code int `json:"code,omitempty"`
}
type AllocateRequest struct {
Type string `json:"type"`
}
type AllocateResponse struct {
Success bool `json:"success"`
Result struct {
URL string `json:"url"`
} `json:"result"`
Msg string `json:"msg,omitempty"`
Code int `json:"code,omitempty"`
} }
type OpenIoTHubConfigRequest struct { type OpenIoTHubConfigRequest struct {
@@ -123,50 +151,36 @@ type OpenIoTHubConfigRequest struct {
type OpenIoTHubConfigResponse struct { type OpenIoTHubConfigResponse struct {
Success bool `json:"success"` Success bool `json:"success"`
Result OpenIoTHubConfig `json:"result"` Result OpenIoTHubConfig `json:"result"`
} Msg string `json:"msg,omitempty"`
Code int `json:"code,omitempty"`
type OpenIoTHubConfig struct {
Url string `json:"url"`
ClientID string `json:"client_id"`
Username string `json:"username"`
Password string `json:"password"`
SinkTopic struct {
IPC string `json:"ipc"`
} `json:"sink_topic"`
SourceSink struct {
IPC string `json:"ipc"`
} `json:"source_topic"`
ExpireTime int `json:"expire_time"`
} }
const ( const (
defaultTimeout = 5 * time.Second defaultTimeout = 5 * time.Second
) )
func NewTuyaClient(openAPIURL string, deviceID string, uid string, clientID string, secret string, streamType string) (*TuyaClient, error) { func NewTuyaClient(openAPIURL string, deviceId string, uid string, clientId string, clientSecret string, streamMode string) (*TuyaClient, error) {
client := &TuyaClient{ client := &TuyaClient{
httpClient: &http.Client{Timeout: defaultTimeout}, httpClient: &http.Client{Timeout: defaultTimeout},
mqtt: &TuyaMQTT{waiter: core.Waiter{}}, mqtt: &TuyaMQTT{waiter: core.Waiter{}},
apiURL: openAPIURL, apiURL: openAPIURL,
sessionID: core.RandString(6, 62), sessionId: core.RandString(6, 62),
clientID: clientID, clientId: clientId,
deviceID: deviceID, deviceId: deviceId,
secret: secret, clientSecret: clientSecret,
uid: uid, uid: uid,
hasBackchannel: false,
} }
if err := client.InitToken(); err != nil { if err := client.InitToken(); err != nil {
return nil, fmt.Errorf("failed to initialize token: %w", err) return nil, fmt.Errorf("failed to initialize token: %w", err)
} }
if streamType == "rtsp" { if streamMode == "rtsp" {
if err := client.GetStreamUrl("rtsp"); err != nil { if err := client.GetStreamUrl("rtsp"); err != nil {
return nil, fmt.Errorf("failed to get RTSP URL: %w", err) return nil, fmt.Errorf("failed to get RTSP URL: %w", err)
} }
} else if streamType == "hls" { } else if streamMode == "hls" {
if err := client.GetStreamUrl("hls"); err != nil { if err := client.GetStreamUrl("hls"); err != nil {
return nil, fmt.Errorf("failed to get HLS URL: %w", err) return nil, fmt.Errorf("failed to get HLS URL: %w", err)
} }
@@ -193,14 +207,14 @@ func(c *TuyaClient) Request(method string, url string, body any) ([]byte, error)
if body != nil { if body != nil {
jsonBody, err := json.Marshal(body) jsonBody, err := json.Marshal(body)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to marshal request body: %w", err) return nil, err
} }
bodyReader = bytes.NewReader(jsonBody) bodyReader = bytes.NewReader(jsonBody)
} }
req, err := http.NewRequest(method, url, bodyReader) req, err := http.NewRequest(method, url, bodyReader)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create request: %w", err) return nil, err
} }
ts := time.Now().UnixNano() / 1000000 ts := time.Now().UnixNano() / 1000000
@@ -212,24 +226,24 @@ func(c *TuyaClient) Request(method string, url string, body any) ([]byte, error)
req.Header.Set("Access-Control-Allow-Methods", "*") req.Header.Set("Access-Control-Allow-Methods", "*")
req.Header.Set("Access-Control-Allow-Headers", "*") req.Header.Set("Access-Control-Allow-Headers", "*")
req.Header.Set("mode", "no-cors") req.Header.Set("mode", "no-cors")
req.Header.Set("client_id", c.clientID) req.Header.Set("client_id", c.clientId)
req.Header.Set("access_token", c.accessToken) req.Header.Set("access_token", c.accessToken)
req.Header.Set("sign", sign) req.Header.Set("sign", sign)
req.Header.Set("t", strconv.FormatInt(ts, 10)) req.Header.Set("t", strconv.FormatInt(ts, 10))
response, err := c.httpClient.Do(req) response, err := c.httpClient.Do(req)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to send request: %w", err) return nil, err
} }
defer response.Body.Close() defer response.Body.Close()
res, err := io.ReadAll(response.Body) res, err := io.ReadAll(response.Body)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to read response body: %w", err) return nil, err
} }
if response.StatusCode != http.StatusOK { if response.StatusCode != http.StatusOK {
return nil, fmt.Errorf("request failed with status code %d: %s", response.StatusCode, string(res)) return nil, err
} }
return res, nil return res, nil
@@ -243,13 +257,17 @@ func(c *TuyaClient) InitToken() (err error) {
body, err := c.Request("GET", url, nil) body, err := c.Request("GET", url, nil)
if err != nil { if err != nil {
return fmt.Errorf("failed to get token: %w", err) return err
} }
var tokenResponse TokenResponse var tokenResponse TokenResponse
err = json.Unmarshal(body, &tokenResponse) err = json.Unmarshal(body, &tokenResponse)
if err != nil { if err != nil {
return fmt.Errorf("failed to unmarshal token response: %w", err) return err
}
if !tokenResponse.Success {
return fmt.Errorf("error: %s", tokenResponse.Msg)
} }
c.accessToken = tokenResponse.Result.AccessToken c.accessToken = tokenResponse.Result.AccessToken
@@ -260,38 +278,63 @@ func(c *TuyaClient) InitToken() (err error) {
} }
func(c *TuyaClient) InitDevice() (err error) { func(c *TuyaClient) InitDevice() (err error) {
url := fmt.Sprintf("https://%s/v1.0/users/%s/devices/%s/webrtc-configs", c.apiURL, c.uid, c.deviceID) url := fmt.Sprintf("https://%s/v1.0/users/%s/devices/%s/webrtc-configs", c.apiURL, c.uid, c.deviceId)
body, err := c.Request("GET", url, nil) body, err := c.Request("GET", url, nil)
if err != nil { if err != nil {
return fmt.Errorf("failed to get webrtc-configs: %w", err) return err
} }
var webRTCConfigResponse WebRTCConfigResponse var webRTCConfigResponse WebRTCConfigResponse
err = json.Unmarshal(body, &webRTCConfigResponse) err = json.Unmarshal(body, &webRTCConfigResponse)
if err != nil { if err != nil {
return fmt.Errorf("failed to unmarshal webrtc-configs response: %w", err) return err
} }
c.motoID = webRTCConfigResponse.Result.MotoID if !webRTCConfigResponse.Success {
return fmt.Errorf("error: %s", webRTCConfigResponse.Msg)
}
c.motoId = webRTCConfigResponse.Result.MotoID
c.auth = webRTCConfigResponse.Result.Auth c.auth = webRTCConfigResponse.Result.Auth
var skill Skill c.skill = &Skill{
err = json.Unmarshal([]byte(webRTCConfigResponse.Result.Skill), &skill) Audios: []struct {
if err != nil { Channels int `json:"channels"`
return fmt.Errorf("failed to unmarshal skill: %w", err) DataBit int `json:"dataBit"`
CodecType int `json:"codecType"`
SampleRate int `json:"sampleRate"`
}{},
Videos: []struct {
StreamType int `json:"streamType"`
ProfileId string `json:"profileId"`
Width int `json:"width"`
CodecType int `json:"codecType"`
SampleRate int `json:"sampleRate"`
Height int `json:"height"`
}{},
}
if webRTCConfigResponse.Result.Skill != "" {
_ = json.Unmarshal([]byte(webRTCConfigResponse.Result.Skill), c.skill)
} }
var audioDirection string var audioDirection string
if contains(webRTCConfigResponse.Result.AudioAttributes.CallMode, 2) && contains(webRTCConfigResponse.Result.AudioAttributes.HardwareCapability, 1) { if contains(webRTCConfigResponse.Result.AudioAttributes.CallMode, 2) &&
contains(webRTCConfigResponse.Result.AudioAttributes.HardwareCapability, 1) {
audioDirection = core.DirectionSendRecv audioDirection = core.DirectionSendRecv
c.hasBackchannel = true
} else { } else {
audioDirection = core.DirectionRecvonly audioDirection = core.DirectionRecvonly
c.hasBackchannel = false
} }
c.medias = make([]*core.Media, 0) c.medias = make([]*core.Media, 0)
if len(skill.Audios) > 0 {
for _, audio := range skill.Audios { if len(c.skill.Audios) > 0 {
// Use the first Audio-Codec
audio := c.skill.Audios[0]
c.medias = append(c.medias, &core.Media{ c.medias = append(c.medias, &core.Media{
Kind: core.KindAudio, Kind: core.KindAudio,
Direction: audioDirection, Direction: audioDirection,
@@ -303,8 +346,8 @@ func(c *TuyaClient) InitDevice() (err error) {
}, },
}, },
}) })
}
} else { } else {
// Use default values for Audio
c.medias = append(c.medias, &core.Media{ c.medias = append(c.medias, &core.Media{
Kind: core.KindAudio, Kind: core.KindAudio,
Direction: core.DirectionRecvonly, Direction: core.DirectionRecvonly,
@@ -318,32 +361,28 @@ func(c *TuyaClient) InitDevice() (err error) {
}) })
} }
if len(skill.Videos) > 0 { if len(c.skill.Videos) > 0 {
// take only the first video codec // Use the first Video-Codec
video := skill.Videos[0] video := c.skill.Videos[0]
var name string
switch video.CodecType {
case 4:
name = core.CodecH265
case 2:
name = core.CodecH264
default:
name = core.CodecH264
}
c.medias = append(c.medias, &core.Media{ c.medias = append(c.medias, &core.Media{
Kind: core.KindVideo, Kind: core.KindVideo,
Direction: core.DirectionRecvonly, Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{ Codecs: []*core.Codec{
{ {
Name: name, Name: core.CodecH265,
ClockRate: uint32(video.SampleRate),
PayloadType: 96,
},
{
Name: core.CodecH264,
ClockRate: uint32(video.SampleRate), ClockRate: uint32(video.SampleRate),
PayloadType: 96, PayloadType: 96,
}, },
}, },
}) })
} else { } else {
// Use default values for Video
c.medias = append(c.medias, &core.Media{ c.medias = append(c.medias, &core.Media{
Kind: core.KindVideo, Kind: core.KindVideo,
Direction: core.DirectionRecvonly, Direction: core.DirectionRecvonly,
@@ -359,20 +398,19 @@ func(c *TuyaClient) InitDevice() (err error) {
iceServersBytes, err := json.Marshal(&webRTCConfigResponse.Result.P2PConfig.Ices) iceServersBytes, err := json.Marshal(&webRTCConfigResponse.Result.P2PConfig.Ices)
if err != nil { if err != nil {
return fmt.Errorf("failed to marshal ICE servers: %w", err) return err
} }
c.iceServers, err = webrtc.UnmarshalICEServers([]byte(iceServersBytes)) c.iceServers, err = webrtc.UnmarshalICEServers([]byte(iceServersBytes))
if err != nil { if err != nil {
return fmt.Errorf("failed to unmarshal ICE servers: %w", err) return err
} }
return nil return nil
} }
func(c *TuyaClient) GetStreamUrl(streamType string) (err error) { func(c *TuyaClient) GetStreamUrl(streamType string) (err error) {
url := fmt.Sprintf("https://%s/v1.0/devices/%s/stream/actions/allocate", c.apiURL, c.deviceID) url := fmt.Sprintf("https://%s/v1.0/devices/%s/stream/actions/allocate", c.apiURL, c.deviceId)
request := &AllocateRequest{ request := &AllocateRequest{
Type: streamType, Type: streamType,
@@ -380,26 +418,24 @@ func(c *TuyaClient) GetStreamUrl(streamType string) (err error) {
body, err := c.Request("POST", url, request) body, err := c.Request("POST", url, request)
if err != nil { if err != nil {
return fmt.Errorf("failed to get rtsp url: %w", err) return err
} }
var allosResponse AllocateResponse var allosResponse AllocateResponse
err = json.Unmarshal(body, &allosResponse) err = json.Unmarshal(body, &allosResponse)
if err != nil { if err != nil {
return fmt.Errorf("failed to unmarshal stream response: %w", err) return err
} }
if !allosResponse.Success { if !allosResponse.Success {
return fmt.Errorf("failed to get stream url: %s", string(body)) return fmt.Errorf("error: %s", allosResponse.Msg)
} }
switch streamType { switch streamType {
case "rtsp": case "rtsp":
c.rtspURL = allosResponse.Result.URL c.rtspURL = allosResponse.Result.URL
fmt.Printf("RTSP URL: %s\n", c.rtspURL)
case "hls": case "hls":
c.hlsURL = allosResponse.Result.URL c.hlsURL = allosResponse.Result.URL
fmt.Printf("HLS URL: %s\n", c.hlsURL)
default: default:
return fmt.Errorf("unsupported stream type: %s", streamType) return fmt.Errorf("unsupported stream type: %s", streamType)
} }
@@ -419,24 +455,66 @@ func(c *TuyaClient) LoadHubConfig() (config *OpenIoTHubConfig, err error) {
body, err := c.Request("POST", url, request) body, err := c.Request("POST", url, request)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to get OpenIoTHub config: %w", err) return nil, err
} }
var openIoTHubConfigResponse OpenIoTHubConfigResponse var openIoTHubConfigResponse OpenIoTHubConfigResponse
err = json.Unmarshal(body, &openIoTHubConfigResponse) err = json.Unmarshal(body, &openIoTHubConfigResponse)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to unmarshal OpenIoTHub config response: %w", err) return nil, err
} }
if !openIoTHubConfigResponse.Success { if !openIoTHubConfigResponse.Success {
return nil, fmt.Errorf("failed to get OpenIoTHub config: %s", string(body)) return nil, fmt.Errorf("error: %s", openIoTHubConfigResponse.Msg)
} }
return &openIoTHubConfigResponse.Result, nil return &openIoTHubConfigResponse.Result, nil
} }
// Search the streamType based on the selection "main" or "sub"
func (c *TuyaClient) getStreamType(streamChoice string) uint32 {
// Default streamType if nothing is found
defaultStreamType := uint32(1)
if c.skill == nil || len(c.skill.Videos) == 0 {
return defaultStreamType
}
// Find the highest and lowest resolution
var highestResType uint32 = defaultStreamType
var highestRes int = 0
var lowestResType uint32 = defaultStreamType
var lowestRes int = 0
for _, video := range c.skill.Videos {
res := video.Width * video.Height
// Highest Resolution
if res > highestRes {
highestRes = res
highestResType = uint32(video.StreamType)
}
// Lower Resolution (or first if not set yet)
if lowestRes == 0 || res < lowestRes {
lowestRes = res
lowestResType = uint32(video.StreamType)
}
}
// Return the streamType based on the selection
switch streamChoice {
case "main":
return highestResType
case "sub":
return lowestResType
default:
return defaultStreamType
}
}
func(c *TuyaClient) calBusinessSign(ts int64) string { func(c *TuyaClient) calBusinessSign(ts int64) string {
data := fmt.Sprintf("%s%s%s%d", c.clientID, c.accessToken, c.secret, ts) data := fmt.Sprintf("%s%s%s%d", c.clientId, c.accessToken, c.clientSecret, ts)
val := md5.Sum([]byte(data)) val := md5.Sum([]byte(data))
res := fmt.Sprintf("%X", val) res := fmt.Sprintf("%X", val)
return res return res
+23 -22
View File
@@ -5,7 +5,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"net/url" "net/url"
"strconv" "regexp"
"github.com/AlexxIT/go2rtc/internal/streams" "github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/core"
@@ -38,13 +38,13 @@ func Dial(rawURL string) (core.Producer, error) {
query := u.Query() query := u.Query()
deviceID := query.Get("device_id") deviceID := query.Get("device_id")
uid := query.Get("uid") uid := query.Get("uid")
clientID := query.Get("client_id") clientId := query.Get("client_id")
secret := query.Get("secret") clientSecret := query.Get("client_secret")
resolution := query.Get("resolution")
streamType := query.Get("type") streamType := query.Get("type")
useRTSP := streamType == "rtsp" streamMode := query.Get("mode")
useHLS := streamType == "hls" useRTSP := streamMode == "rtsp"
useWebRTC := streamType == "webrtc" || streamType == "" useHLS := streamMode == "hls"
useWebRTC := streamMode == "webrtc" || streamMode == ""
// check if host is correct // check if host is correct
switch u.Hostname() { switch u.Hostname() {
@@ -58,8 +58,12 @@ func Dial(rawURL string) (core.Producer, error) {
return nil, fmt.Errorf("tuya: wrong host %s", u.Hostname()) return nil, fmt.Errorf("tuya: wrong host %s", u.Hostname())
} }
if deviceID == "" || uid == "" || clientID == "" || secret == "" { if deviceID == "" || clientId == "" || clientSecret == "" {
return nil, errors.New("tuya: wrong query") return nil, errors.New("tuya: no device_id, client_id or client_secret")
}
if useWebRTC && uid == "" {
return nil, errors.New("tuya: no uid")
} }
if !useRTSP && !useHLS && !useWebRTC { if !useRTSP && !useHLS && !useWebRTC {
@@ -67,7 +71,7 @@ func Dial(rawURL string) (core.Producer, error) {
} }
// Initialize Tuya API client // Initialize Tuya API client
tuyaAPI, err := NewTuyaClient(u.Hostname(), deviceID, uid, clientID, secret, streamType) tuyaAPI, err := NewTuyaClient(u.Hostname(), deviceID, uid, clientId, clientSecret, streamType)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -157,7 +161,7 @@ func Dial(rawURL string) (core.Producer, error) {
} }
client.api.mqtt.handleError = func(err error) { client.api.mqtt.handleError = func(err error) {
fmt.Printf("Tuya error: %s\n", err.Error()) // fmt.Printf("tuya: error: %s\n", err.Error())
client.Stop() client.Stop()
} }
@@ -188,21 +192,18 @@ func Dial(rawURL string) (core.Producer, error) {
return nil, err return nil, err
} }
// horter sdp, remove a=extmap... line, device ONLY allow 8KB json payload
re := regexp.MustCompile(`\r\na=extmap[^\r\n]*`)
offer = re.ReplaceAllString(offer, "")
// Send offer // Send offer
client.api.sendOffer(offer) client.api.sendOffer(offer, tuyaAPI.getStreamType(streamType))
sendOffer.Done(nil) sendOffer.Done(nil)
if err = connState.Wait(); err != nil { if err = connState.Wait(); err != nil {
return nil, err return nil, err
} }
if resolution != "" {
value, err := strconv.Atoi(resolution)
if err == nil {
client.api.sendResolution(value)
}
}
return client, nil return client, nil
} }
} }
@@ -216,11 +217,11 @@ func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver,
} }
func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error {
if webrtcProd, ok := c.prod.(*webrtc.Conn); ok { if prod, ok := c.prod.(*webrtc.Conn); ok {
return webrtcProd.AddTrack(media, codec, track) return prod.AddTrack(media, codec, track)
} }
return fmt.Errorf("add track not supported") return nil
} }
func (c *Client) Start() error { func (c *Client) Start() error {
+10 -10
View File
@@ -74,14 +74,14 @@ type MqttMessage struct {
func(c *TuyaClient) StartMQTT() error { func(c *TuyaClient) StartMQTT() error {
hubConfig, err := c.LoadHubConfig() hubConfig, err := c.LoadHubConfig()
if err != nil { if err != nil {
return fmt.Errorf("failed to load hub config: %w", err) return err
} }
c.mqtt.publishTopic = hubConfig.SinkTopic.IPC c.mqtt.publishTopic = hubConfig.SinkTopic.IPC
c.mqtt.subscribeTopic = hubConfig.SourceSink.IPC c.mqtt.subscribeTopic = hubConfig.SourceSink.IPC
c.mqtt.publishTopic = strings.Replace(c.mqtt.publishTopic, "moto_id", c.motoID, 1) c.mqtt.publishTopic = strings.Replace(c.mqtt.publishTopic, "moto_id", c.motoId, 1)
c.mqtt.publishTopic = strings.Replace(c.mqtt.publishTopic, "{device_id}", c.deviceID, 1) c.mqtt.publishTopic = strings.Replace(c.mqtt.publishTopic, "{device_id}", c.deviceId, 1)
parts := strings.Split(c.mqtt.subscribeTopic, "/") parts := strings.Split(c.mqtt.subscribeTopic, "/")
c.mqtt.uid = parts[3] c.mqtt.uid = parts[3]
@@ -96,7 +96,7 @@ func(c *TuyaClient) StartMQTT() error {
c.mqtt.client = mqtt.NewClient(opts) c.mqtt.client = mqtt.NewClient(opts)
if token := c.mqtt.client.Connect(); token.Wait() && token.Error() != nil { if token := c.mqtt.client.Connect(); token.Wait() && token.Error() != nil {
return fmt.Errorf("failed to connect to MQTT broker: %w", token.Error()) return token.Error()
} }
if err := c.mqtt.waiter.Wait(); err != nil { if err := c.mqtt.waiter.Wait(); err != nil {
@@ -129,7 +129,7 @@ func(c *TuyaClient) consume(client mqtt.Client, msg mqtt.Message) {
return return
} }
if rmqtt.Data.Header.SessionID != c.sessionID { if rmqtt.Data.Header.SessionID != c.sessionId {
return return
} }
@@ -202,11 +202,11 @@ func(c *TuyaMQTT) onError(err error) {
} }
} }
func (c *TuyaClient) sendOffer(sdp string) { func (c *TuyaClient) sendOffer(sdp string, streamType uint32) {
c.sendMqttMessage("offer", 302, "", OfferFrame{ c.sendMqttMessage("offer", 302, "", OfferFrame{
Mode: "webrtc", Mode: "webrtc",
Sdp: sdp, Sdp: sdp,
StreamType: 1, StreamType: streamType,
Auth: c.auth, Auth: c.auth,
}) })
} }
@@ -251,9 +251,9 @@ func (c *TuyaClient) sendMqttMessage(messageType string, protocol int, transacti
Header: MqttFrameHeader{ Header: MqttFrameHeader{
Type: messageType, Type: messageType,
From: c.mqtt.uid, From: c.mqtt.uid,
To: c.deviceID, To: c.deviceId,
SessionID: c.sessionID, SessionID: c.sessionId,
MotoID: c.motoID, MotoID: c.motoId,
TransactionID: transactionID, TransactionID: transactionID,
}, },
Message: jsonMessage, Message: jsonMessage,