diff --git a/go.sum b/go.sum index 7e1b0cee..4dfebcf6 100644 --- a/go.sum +++ b/go.sum @@ -7,9 +7,10 @@ github.com/asticode/go-astits v1.13.0 h1:XOgkaadfZODnyZRR5Y0/DWkA9vrkLLPLeeOvDwf github.com/asticode/go-astits v1.13.0/go.mod h1:QSHmknZ51pf6KJdHKZHJTLlMegIrhega3LPWz3ND/iI= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eclipse/paho.mqtt.golang v1.5.0 h1:EH+bUVJNgttidWFkLLVKaQPGmkTUfQQqjOsyvMGvD6o= +github.com/eclipse/paho.mqtt.golang v1.5.0/go.mod h1:du/2qNQVqJf/Sqs4MEL77kR8QTqANF7XU7Fk0aOTAgk= github.com/expr-lang/expr v1.17.2 h1:o0A99O/Px+/DTjEnQiodAgOIK9PPxL8DtXhBRKC+Iso= github.com/expr-lang/expr v1.17.2/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4= github.com/expr-lang/expr v1.17.5 h1:i1WrMvcdLF249nSNlpQZN1S6NXuW9WaOfF5tPi3aw3k= @@ -86,7 +87,6 @@ github.com/pion/webrtc/v4 v4.1.3 h1:YZ67Boj9X/hk190jJZ8+HFGQ6DqSZ/fYP3sLAZv7c3c= github.com/pion/webrtc/v4 v4.1.3/go.mod h1:rsq+zQ82ryfR9vbb0L1umPJ6Ogq7zm8mcn9fcGnxomM= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pkg/profile v1.4.0/go.mod h1:NWz/XGvpEW1FyYQ7fCx4dqYBLlfTcE+A9FLAkNKqjFE= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= @@ -98,8 +98,6 @@ github.com/sigurn/crc16 v0.0.0-20240131213347-83fcde1e29d1 h1:NVK+OqnavpyFmUiKfU github.com/sigurn/crc16 v0.0.0-20240131213347-83fcde1e29d1/go.mod h1:9/etS5gpQq9BJsJMWg1wpLbfuSnkm8dPF6FdW2JXVhA= github.com/sigurn/crc8 v0.0.0-20220107193325-2243fe600f9f h1:1R9KdKjCNSd7F8iGTxIpoID9prlYH8nuNYKt0XvweHA= github.com/sigurn/crc8 v0.0.0-20220107193325-2243fe600f9f/go.mod h1:vQhwQ4meQEDfahT5kd61wLAF5AAeh5ZPLVI4JJ/tYo8= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/tadglines/go-pkgs v0.0.0-20210623144937-b983b20f54f9 h1:aeN+ghOV0b2VCmKKO3gqnDQ8mLbpABZgRR2FVYx4ouI= @@ -135,6 +133,5 @@ golang.org/x/tools v0.34.0 h1:qIpSLOxeCYGg9TrcJokLBG4KFA6d795g0xkBkiESGlo= golang.org/x/tools v0.34.0/go.mod h1:pAP9OwEaY1CAW3HOmg3hLZC5Z0CCmzjAF2UQMSqNARg= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= -gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/tuya/tuya.go b/internal/tuya/tuya.go new file mode 100644 index 00000000..c3b34e4a --- /dev/null +++ b/internal/tuya/tuya.go @@ -0,0 +1,13 @@ +package tuya + +import ( + "github.com/AlexxIT/go2rtc/internal/streams" + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/tuya" +) + +func Init() { + streams.HandleFunc("tuya", func(source string) (core.Producer, error) { + return tuya.Dial(source) + }) +} diff --git a/main.go b/main.go index e85c5900..3bbf632f 100644 --- a/main.go +++ b/main.go @@ -35,6 +35,7 @@ import ( "github.com/AlexxIT/go2rtc/internal/srtp" "github.com/AlexxIT/go2rtc/internal/streams" "github.com/AlexxIT/go2rtc/internal/tapo" + "github.com/AlexxIT/go2rtc/internal/tuya" "github.com/AlexxIT/go2rtc/internal/v4l2" "github.com/AlexxIT/go2rtc/internal/webrtc" "github.com/AlexxIT/go2rtc/internal/webtorrent" @@ -88,6 +89,7 @@ func main() { roborock.Init() // roborock source homekit.Init() // homekit source ring.Init() // ring source + tuya.Init() // tuya source nest.Init() // nest source bubble.Init() // bubble source expr.Init() // expr source diff --git a/pkg/tuya/api.go b/pkg/tuya/api.go new file mode 100644 index 00000000..4ea609ca --- /dev/null +++ b/pkg/tuya/api.go @@ -0,0 +1,285 @@ +package tuya + +import ( + "bytes" + "crypto/md5" + "encoding/json" + "fmt" + "io" + "net/http" + "strconv" + "time" + + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/webrtc" + "github.com/google/uuid" + pionWebrtc "github.com/pion/webrtc/v4" +) + +type TuyaClient struct { + httpClient *http.Client + mqtt *TuyaMQTT + apiURL string + sessionID string + clientID string + deviceID string + accessToken string + refreshToken string + secret string + expireTime int64 + uid string + motoID string + auth string + iceServers []pionWebrtc.ICEServer +} + +type Token struct { + UID string `json:"uid"` + AccessToken string `json:"access_token"` + RefreshToken string `json:"refresh_token"` + ExpireTime int64 `json:"expire_time"` +} + +type AudioAttributes struct { + CallMode []int `json:"call_mode"` + HardwareCapability []int `json:"hardware_capability"` +} + +type OpenApiICE struct { + Urls string `json:"urls"` + Username string `json:"username"` + Credential string `json:"credential"` + TTL int `json:"ttl"` +} + +type WebICE struct { + Urls string `json:"urls"` + Username string `json:"username,omitempty"` + Credential string `json:"credential,omitempty"` +} + +type P2PConfig struct { + Ices []OpenApiICE `json:"ices"` +} + +type WebRTConfig struct { + AudioAttributes AudioAttributes `json:"audio_attributes"` + Auth string `json:"auth"` + ID string `json:"id"` + MotoID string `json:"moto_id"` + P2PConfig P2PConfig `json:"p2p_config"` + Skill string `json:"skill"` + SupportsWebRTC bool `json:"supports_webrtc"` + VideoClaritiy int `json:"video_clarity"` +} + +type TokenResponse struct { + Result Token `json:"result"` +} + +type WebRTCConfigResponse struct { + Result WebRTConfig `json:"result"` +} + +type OpenIoTHubConfigRequest struct { + UID string `json:"uid"` + UniqueID string `json:"unique_id"` + LinkType string `json:"link_type"` + Topics string `json:"topics"` +} + +type OpenIoTHubConfigResponse struct { + Success bool `json:"success"` + Result OpenIoTHubConfig `json:"result"` +} + +type OpenIoTHubConfig struct { + Url string `json:"url"` + ClientID string `json:"client_id"` + Username string `json:"username"` + Password string `json:"password"` + + SinkTopic struct { + IPC string `json:"ipc"` + } `json:"sink_topic"` + + SourceSink struct { + IPC string `json:"ipc"` + } `json:"source_topic"` + + ExpireTime int `json:"expire_time"` +} + +const ( + defaultTimeout = 5 * time.Second +) + +func NewTuyaClient(openAPIURL string, deviceID string, uid string, clientID string, secret string) (*TuyaClient, error) { + client := &TuyaClient{ + httpClient: &http.Client{Timeout: defaultTimeout}, + mqtt: &TuyaMQTT{waiter: core.Waiter{}}, + apiURL: openAPIURL, + sessionID: core.RandString(6, 62), + clientID: clientID, + deviceID: deviceID, + secret: secret, + uid: uid, + } + + if err := client.InitToken(); err != nil { + return nil, fmt.Errorf("failed to initialize token: %w", err) + } + + if err := client.InitDevice(); err != nil { + return nil, fmt.Errorf("failed to initialize device: %w", err) + } + + if err := client.StartMQTT(); err != nil { + return nil, fmt.Errorf("failed to start MQTT: %w", err) + } + + return client, nil +} + +func (c *TuyaClient) Close() { + c.StopMQTT() + c.httpClient.CloseIdleConnections() +} + +func(c *TuyaClient) Request(method string, url string, body any) ([]byte, error) { + var bodyReader io.Reader + if body != nil { + jsonBody, err := json.Marshal(body) + if err != nil { + return nil, fmt.Errorf("failed to marshal request body: %w", err) + } + bodyReader = bytes.NewReader(jsonBody) + } + + req, err := http.NewRequest(method, url, bodyReader) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + ts := time.Now().UnixNano() / 1000000 + sign := c.calBusinessSign(ts) + + req.Header.Set("Accept", "*") + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Access-Control-Allow-Origin", "*") + req.Header.Set("Access-Control-Allow-Methods", "*") + req.Header.Set("Access-Control-Allow-Headers", "*") + req.Header.Set("mode", "no-cors") + req.Header.Set("client_id", c.clientID) + req.Header.Set("access_token", c.accessToken) + req.Header.Set("sign", sign) + req.Header.Set("t", strconv.FormatInt(ts, 10)) + + response, err := c.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to send request: %w", err) + } + defer response.Body.Close() + + res, err := io.ReadAll(response.Body) + if err != nil { + return nil, fmt.Errorf("failed to read response body: %w", err) + } + + if response.StatusCode != http.StatusOK { + return nil, fmt.Errorf("request failed with status code %d: %s", response.StatusCode, string(res)) + } + + return res, nil +} + +func(c *TuyaClient) InitToken() (err error) { + url := fmt.Sprintf("https://%s/v1.0/token?grant_type=1", c.apiURL) + + c.accessToken = "" + c.refreshToken = "" + + body, err := c.Request("GET", url, nil) + if err != nil { + return fmt.Errorf("failed to get token: %w", err) + } + + var tokenResponse TokenResponse + err = json.Unmarshal(body, &tokenResponse) + if err != nil { + return fmt.Errorf("failed to unmarshal token response: %w", err) + } + + c.accessToken = tokenResponse.Result.AccessToken + c.refreshToken = tokenResponse.Result.RefreshToken + c.expireTime = tokenResponse.Result.ExpireTime + + return nil +} + +func(c *TuyaClient) InitDevice() (err error) { + url := fmt.Sprintf("https://%s/v1.0/users/%s/devices/%s/webrtc-configs", c.apiURL, c.uid, c.deviceID) + + body, err := c.Request("GET", url, nil) + if err != nil { + return fmt.Errorf("failed to get webrtc-configs: %w", err) + } + + var webRTCConfigResponse WebRTCConfigResponse + err = json.Unmarshal(body, &webRTCConfigResponse) + if err != nil { + return fmt.Errorf("failed to unmarshal webrtc-configs response: %w", err) + } + + c.motoID = webRTCConfigResponse.Result.MotoID + c.auth = webRTCConfigResponse.Result.Auth + + iceServersBytes, err := json.Marshal(&webRTCConfigResponse.Result.P2PConfig.Ices) + if err != nil { + return fmt.Errorf("failed to marshal ICE servers: %w", err) + } + + + c.iceServers, err = webrtc.UnmarshalICEServers([]byte(iceServersBytes)) + if err != nil { + return fmt.Errorf("failed to unmarshal ICE servers: %w", err) + } + + return nil +} + + +func(c *TuyaClient) LoadHubConfig() (config *OpenIoTHubConfig, err error) { + url := fmt.Sprintf("https://%s/v2.0/open-iot-hub/access/config", c.apiURL) + + request := &OpenIoTHubConfigRequest{ + UID: c.uid, + UniqueID: uuid.New().String(), + LinkType: "mqtt", + Topics: "ipc", + } + + var openIoTHubConfigResponse OpenIoTHubConfigResponse + body, err := c.Request("POST", url, request) + if err != nil { + return nil, fmt.Errorf("failed to get OpenIoTHub config: %w", err) + } + + err = json.Unmarshal(body, &openIoTHubConfigResponse) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal OpenIoTHub config response: %w", err) + } + + if !openIoTHubConfigResponse.Success { + return nil, fmt.Errorf("failed to get OpenIoTHub config: %s", string(body)) + } + + return &openIoTHubConfigResponse.Result, nil +} + +func(c *TuyaClient) calBusinessSign(ts int64) string { + data := fmt.Sprintf("%s%s%s%d", c.clientID, c.accessToken, c.secret, ts) + val := md5.Sum([]byte(data)) + res := fmt.Sprintf("%X", val) + return res +} \ No newline at end of file diff --git a/pkg/tuya/client.go b/pkg/tuya/client.go new file mode 100644 index 00000000..c36cd502 --- /dev/null +++ b/pkg/tuya/client.go @@ -0,0 +1,246 @@ +package tuya + +import ( + "encoding/json" + "errors" + "fmt" + "net/url" + "strconv" + + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/webrtc" + pion "github.com/pion/webrtc/v4" +) + +type Client struct { + api *TuyaClient + prod core.Producer + done chan struct{} +} + +const ( + DefaultCnURL = "openapi.tuyacn.com" + DefaultWestUsURL = "openapi.tuyaus.com" + DefaultEastUsURL = "openapi-ueaz.tuyaus.com" + DefaultCentralEuURL = "openapi.tuyaeu.com" + DefaultWestEuURL = "openapi-weaz.tuyaeu.com" + DefaultInURL = "openapi.tuyain.com" +) + +func Dial(rawURL string) (*Client, error) { + // Parse URL and validate basic params + u, err := url.Parse(rawURL) + if err != nil { + return nil, err + } + + query := u.Query() + deviceID := query.Get("device_id") + uid := query.Get("uid") + clientID := query.Get("client_id") + secret := query.Get("secret") + resolution := query.Get("resolution") + + if deviceID == "" || uid == "" || clientID == "" || secret == "" { + return nil, errors.New("tuya: wrong query") + } + + // Initialize Tuya API client + tuyaAPI, err := NewTuyaClient(u.Hostname(), deviceID, uid, clientID, secret) + if err != nil { + return nil, err + } + + client := &Client{ + api: tuyaAPI, + done: make(chan struct{}), + } + + conf := pion.Configuration{ + ICEServers: client.api.iceServers, + // ICETransportPolicy: pion.ICETransportPolicyAll, + // BundlePolicy: pion.BundlePolicyMaxBundle, + } + + api, err := webrtc.NewAPI() + if err != nil { + client.api.Close() + return nil, err + } + + pc, err := api.NewPeerConnection(conf) + if err != nil { + client.api.Close() + return nil, err + } + + // protect from sending ICE candidate before Offer + var sendOffer core.Waiter + + // protect from blocking on errors + defer sendOffer.Done(nil) + + // waiter will wait PC error or WS error or nil (connection OK) + var connState core.Waiter + + prod := webrtc.NewConn(pc) + prod.FormatName = "tuya/webrtc" + prod.Mode = core.ModeActiveProducer + prod.Protocol = "mqtt" + prod.URL = rawURL + + client.prod = prod + + // Set up MQTT handlers + client.api.mqtt.handleAnswer = func(answer AnswerFrame) { + desc := pion.SessionDescription{ + Type: pion.SDPTypePranswer, + SDP: answer.Sdp, + } + + if err = pc.SetRemoteDescription(desc); err != nil { + return + } + + prod.SetAnswer(answer.Sdp) + if err != nil { + client.Stop() + } + + prod.SDP = answer.Sdp + } + + client.api.mqtt.handleCandidate = func(candidate CandidateFrame) { + if candidate.Candidate != "" { + prod.AddCandidate(candidate.Candidate) + if err != nil { + client.Stop() + } + } + } + + client.api.mqtt.handleDisconnect = func() { + client.Stop() + } + + client.api.mqtt.handleError = func(err error) { + fmt.Printf("Tuya error: %s\n", err.Error()) + client.Stop() + } + + prod.Listen(func(msg any) { + switch msg := msg.(type) { + case *pion.ICECandidate: + _ = sendOffer.Wait() + client.api.sendCandidate("a=" + msg.ToJSON().Candidate) + + case pion.PeerConnectionState: + switch msg { + case pion.PeerConnectionStateNew: + break + case pion.PeerConnectionStateConnecting: + break + case pion.PeerConnectionStateConnected: + connState.Done(nil) + default: + connState.Done(errors.New("webrtc: " + msg.String())) + } + } + }) + + medias := []*core.Media{ + { + Kind: core.KindAudio, + Direction: core.DirectionSendRecv, + Codecs: []*core.Codec{ + { + Name: "PCMU", + ClockRate: 8000, + Channels: 1, + }, + }, + }, + { + Kind: core.KindVideo, + Direction: core.DirectionRecvonly, + Codecs: []*core.Codec{ + { + Name: "H264", + ClockRate: 90000, + }, + }, + }, + } + + // Create offer + offer, err := prod.CreateOffer(medias) + if err != nil { + client.api.Close() + return nil, err + } + + // Send offer + client.api.sendOffer(offer) + sendOffer.Done(nil) + + if err = connState.Wait(); err != nil { + return nil, err + } + + if resolution != "" { + value, err := strconv.Atoi(resolution) + if err == nil { + client.api.sendResolution(value) + } + } + + return client, nil +} + +func (c *Client) GetMedias() []*core.Media { + return c.prod.GetMedias() +} + +func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { + return c.prod.GetTrack(media, codec) +} + +func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { + if webrtcProd, ok := c.prod.(*webrtc.Conn); ok { + return webrtcProd.AddTrack(media, codec, track) + } + + return fmt.Errorf("add track not supported") +} + +func (c *Client) Start() error { + return c.prod.Start() +} + +func (c *Client) Stop() error { + select { + case <-c.done: + return nil + default: + close(c.done) + } + + if c.prod != nil { + _ = c.prod.Stop() + } + + if c.api != nil { + c.api.Close() + c.api = nil + } + + return nil +} + +func (c *Client) MarshalJSON() ([]byte, error) { + if webrtcProd, ok := c.prod.(*webrtc.Conn); ok { + return webrtcProd.MarshalJSON() + } + + return json.Marshal(c.prod) +} diff --git a/pkg/tuya/mqtt.go b/pkg/tuya/mqtt.go new file mode 100644 index 00000000..eff9d8e7 --- /dev/null +++ b/pkg/tuya/mqtt.go @@ -0,0 +1,274 @@ +package tuya + +import ( + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/AlexxIT/go2rtc/pkg/core" + mqtt "github.com/eclipse/paho.mqtt.golang" +) + +type TuyaMQTT struct { + client mqtt.Client + waiter core.Waiter + publishTopic string + subscribeTopic string + uid string + closed bool + handleAnswer func(answer AnswerFrame) + handleCandidate func(candidate CandidateFrame) + handleDisconnect func() + handleError func(err error) +} + +type MqttFrameHeader struct { + Type string `json:"type"` + From string `json:"from"` + To string `json:"to"` + SubDevID string `json:"sub_dev_id"` + SessionID string `json:"sessionid"` + MotoID string `json:"moto_id"` + TransactionID string `json:"tid"` +} + +type MqttFrame struct { + Header MqttFrameHeader `json:"header"` + Message json.RawMessage `json:"msg"` +} + +type OfferFrame struct { + Mode string `json:"mode"` + Sdp string `json:"sdp"` + StreamType uint32 `json:"stream_type"` + Auth string `json:"auth"` +} + +type AnswerFrame struct { + Mode string `json:"mode"` + Sdp string `json:"sdp"` +} + +type CandidateFrame struct { + Mode string `json:"mode"` + Candidate string `json:"candidate"` +} + +type ResolutionFrame struct { + Mode string `json:"mode"` + Value int `json:"value"` +} + +type DisconnectFrame struct { + Mode string `json:"mode"` +} + +type MqttMessage struct { + Protocol int `json:"protocol"` + Pv string `json:"pv"` + T int64 `json:"t"` + Data MqttFrame `json:"data"` +} + +func(c *TuyaClient) StartMQTT() error { + hubConfig, err := c.LoadHubConfig() + if err != nil { + return fmt.Errorf("failed to load hub config: %w", err) + } + + c.mqtt.publishTopic = hubConfig.SinkTopic.IPC + c.mqtt.subscribeTopic = hubConfig.SourceSink.IPC + + c.mqtt.publishTopic = strings.Replace(c.mqtt.publishTopic, "moto_id", c.motoID, 1) + c.mqtt.publishTopic = strings.Replace(c.mqtt.publishTopic, "{device_id}", c.deviceID, 1) + + parts := strings.Split(c.mqtt.subscribeTopic, "/") + c.mqtt.uid = parts[3] + + opts := mqtt.NewClientOptions().AddBroker(hubConfig.Url). + SetClientID(hubConfig.ClientID). + SetUsername(hubConfig.Username). + SetPassword(hubConfig.Password). + SetOnConnectHandler(c.onConnect). + SetConnectTimeout(10 * time.Second) + + c.mqtt.client = mqtt.NewClient(opts) + + if token := c.mqtt.client.Connect(); token.Wait() && token.Error() != nil { + return fmt.Errorf("failed to connect to MQTT broker: %w", token.Error()) + } + + if err := c.mqtt.waiter.Wait(); err != nil { + return err + } + + return nil +} + +func(c *TuyaClient) StopMQTT() { + c.sendDisconnect() + + if c.mqtt.client != nil { + c.mqtt.client.Disconnect(1000) + } +} + +func(c *TuyaClient) onConnect(client mqtt.Client) { + if token := client.Subscribe(c.mqtt.subscribeTopic, 1, c.consume); token.Wait() && token.Error() != nil { + c.mqtt.waiter.Done(token.Error()) + return + } + + c.mqtt.waiter.Done(nil) +} + +func(c *TuyaClient) consume(client mqtt.Client, msg mqtt.Message) { + var rmqtt MqttMessage + if err := json.Unmarshal(msg.Payload(), &rmqtt); err != nil { + c.mqtt.onError(fmt.Errorf("unmarshal mqtt message fail: %s, payload: %s", err.Error(), string(msg.Payload()))) + return + } + + if rmqtt.Data.Header.SessionID != c.sessionID { + return + } + + switch rmqtt.Data.Header.Type { + case "answer": + c.mqtt.onMqttAnswer(&rmqtt) + case "candidate": + c.mqtt.onMqttCandidate(&rmqtt) + case "disconnect": + c.mqtt.onMqttDisconnect() + } +} + +func(c *TuyaMQTT) onMqttAnswer(msg *MqttMessage) { + var answerFrame AnswerFrame + if err := json.Unmarshal(msg.Data.Message, &answerFrame); err != nil { + c.onError(fmt.Errorf("unmarshal mqtt answer frame fail: %s, session: %s, frame: %s", + err.Error(), + msg.Data.Header.SessionID, + string(msg.Data.Message))) + return + } + + c.onAnswer(answerFrame) +} + +func(c *TuyaMQTT) onMqttCandidate(msg *MqttMessage) { + var candidateFrame CandidateFrame + if err := json.Unmarshal(msg.Data.Message, &candidateFrame); err != nil { + c.onError(fmt.Errorf("unmarshal mqtt candidate frame fail: %s, session: %s, frame: %s", + err.Error(), + msg.Data.Header.SessionID, + string(msg.Data.Message))) + return + } + + // candidate from device start with "a=", end with "\r\n", which are not needed by Chrome webRTC + candidateFrame.Candidate = strings.TrimPrefix(candidateFrame.Candidate, "a=") + candidateFrame.Candidate = strings.TrimSuffix(candidateFrame.Candidate, "\r\n") + + c.onCandidate(candidateFrame) +} + +func(c *TuyaMQTT) onMqttDisconnect() { + c.closed = true + c.onDisconnect() +} + +func(c *TuyaMQTT) onAnswer(answer AnswerFrame) { + if c.handleAnswer != nil { + c.handleAnswer(answer) + } +} + +func(c *TuyaMQTT) onCandidate(candidate CandidateFrame) { + if c.handleCandidate != nil { + c.handleCandidate(candidate) + } +} + +func(c *TuyaMQTT) onDisconnect() { + if c.handleDisconnect != nil { + c.handleDisconnect() + } +} + +func(c *TuyaMQTT) onError(err error) { + if c.handleError != nil { + c.handleError(err) + } +} + +func (c *TuyaClient) sendOffer(sdp string) { + c.sendMqttMessage("offer", 302, "", OfferFrame{ + Mode: "webrtc", + Sdp: sdp, + StreamType: 1, + Auth: c.auth, + }) +} + +func (c *TuyaClient) sendCandidate(candidate string) { + c.sendMqttMessage("candidate", 302, "", CandidateFrame{ + Mode: "webrtc", + Candidate: candidate, + }) +} + +func (c *TuyaClient) sendResolution(resolution int) { + c.sendMqttMessage("resolution", 302, "", ResolutionFrame{ + Mode: "webrtc", + Value: resolution, + }) +} + +func(c *TuyaClient) sendDisconnect() { + c.sendMqttMessage("disconnect", 302, "", DisconnectFrame{ + Mode: "webrtc", + }) +} + +func (c *TuyaClient) sendMqttMessage(messageType string, protocol int, transactionID string, data interface{}) { + if c.mqtt.closed { + c.mqtt.onError(fmt.Errorf("mqtt client is closed, send mqtt message fail")) + return + } + + jsonMessage, err := json.Marshal(data) + if err != nil { + c.mqtt.onError(fmt.Errorf("marshal mqtt message fail: %s", err.Error())) + return + } + + msg := &MqttMessage{ + Protocol: protocol, + Pv: "2.2", + T: time.Now().Unix(), + Data: MqttFrame{ + Header: MqttFrameHeader{ + Type: messageType, + From: c.mqtt.uid, + To: c.deviceID, + SessionID: c.sessionID, + MotoID: c.motoID, + TransactionID: transactionID, + }, + Message: jsonMessage, + }, + } + + payload, err := json.Marshal(msg) + if err != nil { + c.mqtt.onError(fmt.Errorf("marshal mqtt message fail: %s", err.Error())) + return + } + + token := c.mqtt.client.Publish(c.mqtt.publishTopic, 1, false, payload) + if token.Wait() && token.Error() != nil { + c.mqtt.onError(fmt.Errorf("mqtt publish fail: %s, topic: %s", token.Error().Error(), c.mqtt.publishTopic)) + } +}