add tuya source

This commit is contained in:
seydx
2025-05-10 18:34:02 +02:00
parent 34b103bbcb
commit e74fc6f198
6 changed files with 822 additions and 5 deletions
+2 -5
View File
@@ -7,9 +7,10 @@ github.com/asticode/go-astits v1.13.0 h1:XOgkaadfZODnyZRR5Y0/DWkA9vrkLLPLeeOvDwf
github.com/asticode/go-astits v1.13.0/go.mod h1:QSHmknZ51pf6KJdHKZHJTLlMegIrhega3LPWz3ND/iI=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eclipse/paho.mqtt.golang v1.5.0 h1:EH+bUVJNgttidWFkLLVKaQPGmkTUfQQqjOsyvMGvD6o=
github.com/eclipse/paho.mqtt.golang v1.5.0/go.mod h1:du/2qNQVqJf/Sqs4MEL77kR8QTqANF7XU7Fk0aOTAgk=
github.com/expr-lang/expr v1.17.2 h1:o0A99O/Px+/DTjEnQiodAgOIK9PPxL8DtXhBRKC+Iso=
github.com/expr-lang/expr v1.17.2/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4=
github.com/expr-lang/expr v1.17.5 h1:i1WrMvcdLF249nSNlpQZN1S6NXuW9WaOfF5tPi3aw3k=
@@ -86,7 +87,6 @@ github.com/pion/webrtc/v4 v4.1.3 h1:YZ67Boj9X/hk190jJZ8+HFGQ6DqSZ/fYP3sLAZv7c3c=
github.com/pion/webrtc/v4 v4.1.3/go.mod h1:rsq+zQ82ryfR9vbb0L1umPJ6Ogq7zm8mcn9fcGnxomM=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/profile v1.4.0/go.mod h1:NWz/XGvpEW1FyYQ7fCx4dqYBLlfTcE+A9FLAkNKqjFE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
@@ -98,8 +98,6 @@ github.com/sigurn/crc16 v0.0.0-20240131213347-83fcde1e29d1 h1:NVK+OqnavpyFmUiKfU
github.com/sigurn/crc16 v0.0.0-20240131213347-83fcde1e29d1/go.mod h1:9/etS5gpQq9BJsJMWg1wpLbfuSnkm8dPF6FdW2JXVhA=
github.com/sigurn/crc8 v0.0.0-20220107193325-2243fe600f9f h1:1R9KdKjCNSd7F8iGTxIpoID9prlYH8nuNYKt0XvweHA=
github.com/sigurn/crc8 v0.0.0-20220107193325-2243fe600f9f/go.mod h1:vQhwQ4meQEDfahT5kd61wLAF5AAeh5ZPLVI4JJ/tYo8=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tadglines/go-pkgs v0.0.0-20210623144937-b983b20f54f9 h1:aeN+ghOV0b2VCmKKO3gqnDQ8mLbpABZgRR2FVYx4ouI=
@@ -135,6 +133,5 @@ golang.org/x/tools v0.34.0 h1:qIpSLOxeCYGg9TrcJokLBG4KFA6d795g0xkBkiESGlo=
golang.org/x/tools v0.34.0/go.mod h1:pAP9OwEaY1CAW3HOmg3hLZC5Z0CCmzjAF2UQMSqNARg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+13
View File
@@ -0,0 +1,13 @@
package tuya
import (
"github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/tuya"
)
func Init() {
streams.HandleFunc("tuya", func(source string) (core.Producer, error) {
return tuya.Dial(source)
})
}
+2
View File
@@ -35,6 +35,7 @@ import (
"github.com/AlexxIT/go2rtc/internal/srtp"
"github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/internal/tapo"
"github.com/AlexxIT/go2rtc/internal/tuya"
"github.com/AlexxIT/go2rtc/internal/v4l2"
"github.com/AlexxIT/go2rtc/internal/webrtc"
"github.com/AlexxIT/go2rtc/internal/webtorrent"
@@ -88,6 +89,7 @@ func main() {
roborock.Init() // roborock source
homekit.Init() // homekit source
ring.Init() // ring source
tuya.Init() // tuya source
nest.Init() // nest source
bubble.Init() // bubble source
expr.Init() // expr source
+285
View File
@@ -0,0 +1,285 @@
package tuya
import (
"bytes"
"crypto/md5"
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"
"time"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/webrtc"
"github.com/google/uuid"
pionWebrtc "github.com/pion/webrtc/v4"
)
type TuyaClient struct {
httpClient *http.Client
mqtt *TuyaMQTT
apiURL string
sessionID string
clientID string
deviceID string
accessToken string
refreshToken string
secret string
expireTime int64
uid string
motoID string
auth string
iceServers []pionWebrtc.ICEServer
}
type Token struct {
UID string `json:"uid"`
AccessToken string `json:"access_token"`
RefreshToken string `json:"refresh_token"`
ExpireTime int64 `json:"expire_time"`
}
type AudioAttributes struct {
CallMode []int `json:"call_mode"`
HardwareCapability []int `json:"hardware_capability"`
}
type OpenApiICE struct {
Urls string `json:"urls"`
Username string `json:"username"`
Credential string `json:"credential"`
TTL int `json:"ttl"`
}
type WebICE struct {
Urls string `json:"urls"`
Username string `json:"username,omitempty"`
Credential string `json:"credential,omitempty"`
}
type P2PConfig struct {
Ices []OpenApiICE `json:"ices"`
}
type WebRTConfig struct {
AudioAttributes AudioAttributes `json:"audio_attributes"`
Auth string `json:"auth"`
ID string `json:"id"`
MotoID string `json:"moto_id"`
P2PConfig P2PConfig `json:"p2p_config"`
Skill string `json:"skill"`
SupportsWebRTC bool `json:"supports_webrtc"`
VideoClaritiy int `json:"video_clarity"`
}
type TokenResponse struct {
Result Token `json:"result"`
}
type WebRTCConfigResponse struct {
Result WebRTConfig `json:"result"`
}
type OpenIoTHubConfigRequest struct {
UID string `json:"uid"`
UniqueID string `json:"unique_id"`
LinkType string `json:"link_type"`
Topics string `json:"topics"`
}
type OpenIoTHubConfigResponse struct {
Success bool `json:"success"`
Result OpenIoTHubConfig `json:"result"`
}
type OpenIoTHubConfig struct {
Url string `json:"url"`
ClientID string `json:"client_id"`
Username string `json:"username"`
Password string `json:"password"`
SinkTopic struct {
IPC string `json:"ipc"`
} `json:"sink_topic"`
SourceSink struct {
IPC string `json:"ipc"`
} `json:"source_topic"`
ExpireTime int `json:"expire_time"`
}
const (
defaultTimeout = 5 * time.Second
)
func NewTuyaClient(openAPIURL string, deviceID string, uid string, clientID string, secret string) (*TuyaClient, error) {
client := &TuyaClient{
httpClient: &http.Client{Timeout: defaultTimeout},
mqtt: &TuyaMQTT{waiter: core.Waiter{}},
apiURL: openAPIURL,
sessionID: core.RandString(6, 62),
clientID: clientID,
deviceID: deviceID,
secret: secret,
uid: uid,
}
if err := client.InitToken(); err != nil {
return nil, fmt.Errorf("failed to initialize token: %w", err)
}
if err := client.InitDevice(); err != nil {
return nil, fmt.Errorf("failed to initialize device: %w", err)
}
if err := client.StartMQTT(); err != nil {
return nil, fmt.Errorf("failed to start MQTT: %w", err)
}
return client, nil
}
func (c *TuyaClient) Close() {
c.StopMQTT()
c.httpClient.CloseIdleConnections()
}
func(c *TuyaClient) Request(method string, url string, body any) ([]byte, error) {
var bodyReader io.Reader
if body != nil {
jsonBody, err := json.Marshal(body)
if err != nil {
return nil, fmt.Errorf("failed to marshal request body: %w", err)
}
bodyReader = bytes.NewReader(jsonBody)
}
req, err := http.NewRequest(method, url, bodyReader)
if err != nil {
return nil, fmt.Errorf("failed to create request: %w", err)
}
ts := time.Now().UnixNano() / 1000000
sign := c.calBusinessSign(ts)
req.Header.Set("Accept", "*")
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Access-Control-Allow-Origin", "*")
req.Header.Set("Access-Control-Allow-Methods", "*")
req.Header.Set("Access-Control-Allow-Headers", "*")
req.Header.Set("mode", "no-cors")
req.Header.Set("client_id", c.clientID)
req.Header.Set("access_token", c.accessToken)
req.Header.Set("sign", sign)
req.Header.Set("t", strconv.FormatInt(ts, 10))
response, err := c.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to send request: %w", err)
}
defer response.Body.Close()
res, err := io.ReadAll(response.Body)
if err != nil {
return nil, fmt.Errorf("failed to read response body: %w", err)
}
if response.StatusCode != http.StatusOK {
return nil, fmt.Errorf("request failed with status code %d: %s", response.StatusCode, string(res))
}
return res, nil
}
func(c *TuyaClient) InitToken() (err error) {
url := fmt.Sprintf("https://%s/v1.0/token?grant_type=1", c.apiURL)
c.accessToken = ""
c.refreshToken = ""
body, err := c.Request("GET", url, nil)
if err != nil {
return fmt.Errorf("failed to get token: %w", err)
}
var tokenResponse TokenResponse
err = json.Unmarshal(body, &tokenResponse)
if err != nil {
return fmt.Errorf("failed to unmarshal token response: %w", err)
}
c.accessToken = tokenResponse.Result.AccessToken
c.refreshToken = tokenResponse.Result.RefreshToken
c.expireTime = tokenResponse.Result.ExpireTime
return nil
}
func(c *TuyaClient) InitDevice() (err error) {
url := fmt.Sprintf("https://%s/v1.0/users/%s/devices/%s/webrtc-configs", c.apiURL, c.uid, c.deviceID)
body, err := c.Request("GET", url, nil)
if err != nil {
return fmt.Errorf("failed to get webrtc-configs: %w", err)
}
var webRTCConfigResponse WebRTCConfigResponse
err = json.Unmarshal(body, &webRTCConfigResponse)
if err != nil {
return fmt.Errorf("failed to unmarshal webrtc-configs response: %w", err)
}
c.motoID = webRTCConfigResponse.Result.MotoID
c.auth = webRTCConfigResponse.Result.Auth
iceServersBytes, err := json.Marshal(&webRTCConfigResponse.Result.P2PConfig.Ices)
if err != nil {
return fmt.Errorf("failed to marshal ICE servers: %w", err)
}
c.iceServers, err = webrtc.UnmarshalICEServers([]byte(iceServersBytes))
if err != nil {
return fmt.Errorf("failed to unmarshal ICE servers: %w", err)
}
return nil
}
func(c *TuyaClient) LoadHubConfig() (config *OpenIoTHubConfig, err error) {
url := fmt.Sprintf("https://%s/v2.0/open-iot-hub/access/config", c.apiURL)
request := &OpenIoTHubConfigRequest{
UID: c.uid,
UniqueID: uuid.New().String(),
LinkType: "mqtt",
Topics: "ipc",
}
var openIoTHubConfigResponse OpenIoTHubConfigResponse
body, err := c.Request("POST", url, request)
if err != nil {
return nil, fmt.Errorf("failed to get OpenIoTHub config: %w", err)
}
err = json.Unmarshal(body, &openIoTHubConfigResponse)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal OpenIoTHub config response: %w", err)
}
if !openIoTHubConfigResponse.Success {
return nil, fmt.Errorf("failed to get OpenIoTHub config: %s", string(body))
}
return &openIoTHubConfigResponse.Result, nil
}
func(c *TuyaClient) calBusinessSign(ts int64) string {
data := fmt.Sprintf("%s%s%s%d", c.clientID, c.accessToken, c.secret, ts)
val := md5.Sum([]byte(data))
res := fmt.Sprintf("%X", val)
return res
}
+246
View File
@@ -0,0 +1,246 @@
package tuya
import (
"encoding/json"
"errors"
"fmt"
"net/url"
"strconv"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/webrtc"
pion "github.com/pion/webrtc/v4"
)
type Client struct {
api *TuyaClient
prod core.Producer
done chan struct{}
}
const (
DefaultCnURL = "openapi.tuyacn.com"
DefaultWestUsURL = "openapi.tuyaus.com"
DefaultEastUsURL = "openapi-ueaz.tuyaus.com"
DefaultCentralEuURL = "openapi.tuyaeu.com"
DefaultWestEuURL = "openapi-weaz.tuyaeu.com"
DefaultInURL = "openapi.tuyain.com"
)
func Dial(rawURL string) (*Client, error) {
// Parse URL and validate basic params
u, err := url.Parse(rawURL)
if err != nil {
return nil, err
}
query := u.Query()
deviceID := query.Get("device_id")
uid := query.Get("uid")
clientID := query.Get("client_id")
secret := query.Get("secret")
resolution := query.Get("resolution")
if deviceID == "" || uid == "" || clientID == "" || secret == "" {
return nil, errors.New("tuya: wrong query")
}
// Initialize Tuya API client
tuyaAPI, err := NewTuyaClient(u.Hostname(), deviceID, uid, clientID, secret)
if err != nil {
return nil, err
}
client := &Client{
api: tuyaAPI,
done: make(chan struct{}),
}
conf := pion.Configuration{
ICEServers: client.api.iceServers,
// ICETransportPolicy: pion.ICETransportPolicyAll,
// BundlePolicy: pion.BundlePolicyMaxBundle,
}
api, err := webrtc.NewAPI()
if err != nil {
client.api.Close()
return nil, err
}
pc, err := api.NewPeerConnection(conf)
if err != nil {
client.api.Close()
return nil, err
}
// protect from sending ICE candidate before Offer
var sendOffer core.Waiter
// protect from blocking on errors
defer sendOffer.Done(nil)
// waiter will wait PC error or WS error or nil (connection OK)
var connState core.Waiter
prod := webrtc.NewConn(pc)
prod.FormatName = "tuya/webrtc"
prod.Mode = core.ModeActiveProducer
prod.Protocol = "mqtt"
prod.URL = rawURL
client.prod = prod
// Set up MQTT handlers
client.api.mqtt.handleAnswer = func(answer AnswerFrame) {
desc := pion.SessionDescription{
Type: pion.SDPTypePranswer,
SDP: answer.Sdp,
}
if err = pc.SetRemoteDescription(desc); err != nil {
return
}
prod.SetAnswer(answer.Sdp)
if err != nil {
client.Stop()
}
prod.SDP = answer.Sdp
}
client.api.mqtt.handleCandidate = func(candidate CandidateFrame) {
if candidate.Candidate != "" {
prod.AddCandidate(candidate.Candidate)
if err != nil {
client.Stop()
}
}
}
client.api.mqtt.handleDisconnect = func() {
client.Stop()
}
client.api.mqtt.handleError = func(err error) {
fmt.Printf("Tuya error: %s\n", err.Error())
client.Stop()
}
prod.Listen(func(msg any) {
switch msg := msg.(type) {
case *pion.ICECandidate:
_ = sendOffer.Wait()
client.api.sendCandidate("a=" + msg.ToJSON().Candidate)
case pion.PeerConnectionState:
switch msg {
case pion.PeerConnectionStateNew:
break
case pion.PeerConnectionStateConnecting:
break
case pion.PeerConnectionStateConnected:
connState.Done(nil)
default:
connState.Done(errors.New("webrtc: " + msg.String()))
}
}
})
medias := []*core.Media{
{
Kind: core.KindAudio,
Direction: core.DirectionSendRecv,
Codecs: []*core.Codec{
{
Name: "PCMU",
ClockRate: 8000,
Channels: 1,
},
},
},
{
Kind: core.KindVideo,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{
{
Name: "H264",
ClockRate: 90000,
},
},
},
}
// Create offer
offer, err := prod.CreateOffer(medias)
if err != nil {
client.api.Close()
return nil, err
}
// Send offer
client.api.sendOffer(offer)
sendOffer.Done(nil)
if err = connState.Wait(); err != nil {
return nil, err
}
if resolution != "" {
value, err := strconv.Atoi(resolution)
if err == nil {
client.api.sendResolution(value)
}
}
return client, nil
}
func (c *Client) GetMedias() []*core.Media {
return c.prod.GetMedias()
}
func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
return c.prod.GetTrack(media, codec)
}
func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error {
if webrtcProd, ok := c.prod.(*webrtc.Conn); ok {
return webrtcProd.AddTrack(media, codec, track)
}
return fmt.Errorf("add track not supported")
}
func (c *Client) Start() error {
return c.prod.Start()
}
func (c *Client) Stop() error {
select {
case <-c.done:
return nil
default:
close(c.done)
}
if c.prod != nil {
_ = c.prod.Stop()
}
if c.api != nil {
c.api.Close()
c.api = nil
}
return nil
}
func (c *Client) MarshalJSON() ([]byte, error) {
if webrtcProd, ok := c.prod.(*webrtc.Conn); ok {
return webrtcProd.MarshalJSON()
}
return json.Marshal(c.prod)
}
+274
View File
@@ -0,0 +1,274 @@
package tuya
import (
"encoding/json"
"fmt"
"strings"
"time"
"github.com/AlexxIT/go2rtc/pkg/core"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
type TuyaMQTT struct {
client mqtt.Client
waiter core.Waiter
publishTopic string
subscribeTopic string
uid string
closed bool
handleAnswer func(answer AnswerFrame)
handleCandidate func(candidate CandidateFrame)
handleDisconnect func()
handleError func(err error)
}
type MqttFrameHeader struct {
Type string `json:"type"`
From string `json:"from"`
To string `json:"to"`
SubDevID string `json:"sub_dev_id"`
SessionID string `json:"sessionid"`
MotoID string `json:"moto_id"`
TransactionID string `json:"tid"`
}
type MqttFrame struct {
Header MqttFrameHeader `json:"header"`
Message json.RawMessage `json:"msg"`
}
type OfferFrame struct {
Mode string `json:"mode"`
Sdp string `json:"sdp"`
StreamType uint32 `json:"stream_type"`
Auth string `json:"auth"`
}
type AnswerFrame struct {
Mode string `json:"mode"`
Sdp string `json:"sdp"`
}
type CandidateFrame struct {
Mode string `json:"mode"`
Candidate string `json:"candidate"`
}
type ResolutionFrame struct {
Mode string `json:"mode"`
Value int `json:"value"`
}
type DisconnectFrame struct {
Mode string `json:"mode"`
}
type MqttMessage struct {
Protocol int `json:"protocol"`
Pv string `json:"pv"`
T int64 `json:"t"`
Data MqttFrame `json:"data"`
}
func(c *TuyaClient) StartMQTT() error {
hubConfig, err := c.LoadHubConfig()
if err != nil {
return fmt.Errorf("failed to load hub config: %w", err)
}
c.mqtt.publishTopic = hubConfig.SinkTopic.IPC
c.mqtt.subscribeTopic = hubConfig.SourceSink.IPC
c.mqtt.publishTopic = strings.Replace(c.mqtt.publishTopic, "moto_id", c.motoID, 1)
c.mqtt.publishTopic = strings.Replace(c.mqtt.publishTopic, "{device_id}", c.deviceID, 1)
parts := strings.Split(c.mqtt.subscribeTopic, "/")
c.mqtt.uid = parts[3]
opts := mqtt.NewClientOptions().AddBroker(hubConfig.Url).
SetClientID(hubConfig.ClientID).
SetUsername(hubConfig.Username).
SetPassword(hubConfig.Password).
SetOnConnectHandler(c.onConnect).
SetConnectTimeout(10 * time.Second)
c.mqtt.client = mqtt.NewClient(opts)
if token := c.mqtt.client.Connect(); token.Wait() && token.Error() != nil {
return fmt.Errorf("failed to connect to MQTT broker: %w", token.Error())
}
if err := c.mqtt.waiter.Wait(); err != nil {
return err
}
return nil
}
func(c *TuyaClient) StopMQTT() {
c.sendDisconnect()
if c.mqtt.client != nil {
c.mqtt.client.Disconnect(1000)
}
}
func(c *TuyaClient) onConnect(client mqtt.Client) {
if token := client.Subscribe(c.mqtt.subscribeTopic, 1, c.consume); token.Wait() && token.Error() != nil {
c.mqtt.waiter.Done(token.Error())
return
}
c.mqtt.waiter.Done(nil)
}
func(c *TuyaClient) consume(client mqtt.Client, msg mqtt.Message) {
var rmqtt MqttMessage
if err := json.Unmarshal(msg.Payload(), &rmqtt); err != nil {
c.mqtt.onError(fmt.Errorf("unmarshal mqtt message fail: %s, payload: %s", err.Error(), string(msg.Payload())))
return
}
if rmqtt.Data.Header.SessionID != c.sessionID {
return
}
switch rmqtt.Data.Header.Type {
case "answer":
c.mqtt.onMqttAnswer(&rmqtt)
case "candidate":
c.mqtt.onMqttCandidate(&rmqtt)
case "disconnect":
c.mqtt.onMqttDisconnect()
}
}
func(c *TuyaMQTT) onMqttAnswer(msg *MqttMessage) {
var answerFrame AnswerFrame
if err := json.Unmarshal(msg.Data.Message, &answerFrame); err != nil {
c.onError(fmt.Errorf("unmarshal mqtt answer frame fail: %s, session: %s, frame: %s",
err.Error(),
msg.Data.Header.SessionID,
string(msg.Data.Message)))
return
}
c.onAnswer(answerFrame)
}
func(c *TuyaMQTT) onMqttCandidate(msg *MqttMessage) {
var candidateFrame CandidateFrame
if err := json.Unmarshal(msg.Data.Message, &candidateFrame); err != nil {
c.onError(fmt.Errorf("unmarshal mqtt candidate frame fail: %s, session: %s, frame: %s",
err.Error(),
msg.Data.Header.SessionID,
string(msg.Data.Message)))
return
}
// candidate from device start with "a=", end with "\r\n", which are not needed by Chrome webRTC
candidateFrame.Candidate = strings.TrimPrefix(candidateFrame.Candidate, "a=")
candidateFrame.Candidate = strings.TrimSuffix(candidateFrame.Candidate, "\r\n")
c.onCandidate(candidateFrame)
}
func(c *TuyaMQTT) onMqttDisconnect() {
c.closed = true
c.onDisconnect()
}
func(c *TuyaMQTT) onAnswer(answer AnswerFrame) {
if c.handleAnswer != nil {
c.handleAnswer(answer)
}
}
func(c *TuyaMQTT) onCandidate(candidate CandidateFrame) {
if c.handleCandidate != nil {
c.handleCandidate(candidate)
}
}
func(c *TuyaMQTT) onDisconnect() {
if c.handleDisconnect != nil {
c.handleDisconnect()
}
}
func(c *TuyaMQTT) onError(err error) {
if c.handleError != nil {
c.handleError(err)
}
}
func (c *TuyaClient) sendOffer(sdp string) {
c.sendMqttMessage("offer", 302, "", OfferFrame{
Mode: "webrtc",
Sdp: sdp,
StreamType: 1,
Auth: c.auth,
})
}
func (c *TuyaClient) sendCandidate(candidate string) {
c.sendMqttMessage("candidate", 302, "", CandidateFrame{
Mode: "webrtc",
Candidate: candidate,
})
}
func (c *TuyaClient) sendResolution(resolution int) {
c.sendMqttMessage("resolution", 302, "", ResolutionFrame{
Mode: "webrtc",
Value: resolution,
})
}
func(c *TuyaClient) sendDisconnect() {
c.sendMqttMessage("disconnect", 302, "", DisconnectFrame{
Mode: "webrtc",
})
}
func (c *TuyaClient) sendMqttMessage(messageType string, protocol int, transactionID string, data interface{}) {
if c.mqtt.closed {
c.mqtt.onError(fmt.Errorf("mqtt client is closed, send mqtt message fail"))
return
}
jsonMessage, err := json.Marshal(data)
if err != nil {
c.mqtt.onError(fmt.Errorf("marshal mqtt message fail: %s", err.Error()))
return
}
msg := &MqttMessage{
Protocol: protocol,
Pv: "2.2",
T: time.Now().Unix(),
Data: MqttFrame{
Header: MqttFrameHeader{
Type: messageType,
From: c.mqtt.uid,
To: c.deviceID,
SessionID: c.sessionID,
MotoID: c.motoID,
TransactionID: transactionID,
},
Message: jsonMessage,
},
}
payload, err := json.Marshal(msg)
if err != nil {
c.mqtt.onError(fmt.Errorf("marshal mqtt message fail: %s", err.Error()))
return
}
token := c.mqtt.client.Publish(c.mqtt.publishTopic, 1, false, payload)
if token.Wait() && token.Error() != nil {
c.mqtt.onError(fmt.Errorf("mqtt publish fail: %s, topic: %s", token.Error().Error(), c.mqtt.publishTopic))
}
}