- support adding cameras via interface
- support qr code auth - support resolution change - support h265 - refactor code
This commit is contained in:
+139
-128
@@ -10,13 +10,18 @@ import (
|
||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||
)
|
||||
|
||||
type TuyaMQTT struct {
|
||||
type TuyaMqttClient struct {
|
||||
client mqtt.Client
|
||||
waiter core.Waiter
|
||||
publishTopic string
|
||||
subscribeTopic string
|
||||
auth string
|
||||
uid string
|
||||
motoId string
|
||||
deviceId string
|
||||
sessionId string
|
||||
closed bool
|
||||
webrtcVersion int
|
||||
handleAnswer func(answer AnswerFrame)
|
||||
handleCandidate func(candidate CandidateFrame)
|
||||
handleDisconnect func()
|
||||
@@ -56,14 +61,14 @@ type CandidateFrame struct {
|
||||
Candidate string `json:"candidate"`
|
||||
}
|
||||
|
||||
// type ResolutionFrame struct {
|
||||
// Mode string `json:"mode"`
|
||||
// Value int `json:"value"` // 0: HD, 1: SD
|
||||
// }
|
||||
type ResolutionFrame struct {
|
||||
Mode string `json:"mode"`
|
||||
Value int `json:"cmdValue"` // 0: HD, 1: SD
|
||||
}
|
||||
|
||||
type SpeakerFrame struct {
|
||||
Mode string `json:"mode"`
|
||||
Value int `json:"value"` // 0: off, 1: on
|
||||
Value int `json:"cmdValue"` // 0: off, 1: on
|
||||
}
|
||||
|
||||
type DisconnectFrame struct {
|
||||
@@ -77,20 +82,27 @@ type MqttMessage struct {
|
||||
Data MqttFrame `json:"data"`
|
||||
}
|
||||
|
||||
func (c *TuyaClient) StartMQTT() error {
|
||||
hubConfig, err := c.LoadHubConfig()
|
||||
if err != nil {
|
||||
return err
|
||||
func NewTuyaMqttClient(deviceId string) *TuyaMqttClient {
|
||||
return &TuyaMqttClient{
|
||||
deviceId: deviceId,
|
||||
sessionId: core.RandString(6, 62),
|
||||
waiter: core.Waiter{},
|
||||
}
|
||||
}
|
||||
|
||||
c.mqtt.publishTopic = hubConfig.SinkTopic.IPC
|
||||
c.mqtt.subscribeTopic = hubConfig.SourceSink.IPC
|
||||
func (c *TuyaMqttClient) Start(hubConfig *MQTTConfig, webrtcConfig *WebRTCConfig, webrtcVersion int) error {
|
||||
c.webrtcVersion = webrtcVersion
|
||||
c.motoId = webrtcConfig.MotoID
|
||||
c.auth = webrtcConfig.Auth
|
||||
|
||||
c.mqtt.publishTopic = strings.Replace(c.mqtt.publishTopic, "moto_id", c.motoId, 1)
|
||||
c.mqtt.publishTopic = strings.Replace(c.mqtt.publishTopic, "{device_id}", c.deviceId, 1)
|
||||
c.publishTopic = hubConfig.PublishTopic
|
||||
c.subscribeTopic = hubConfig.SubscribeTopic
|
||||
|
||||
parts := strings.Split(c.mqtt.subscribeTopic, "/")
|
||||
c.mqtt.uid = parts[3]
|
||||
c.publishTopic = strings.Replace(c.publishTopic, "moto_id", c.motoId, 1)
|
||||
c.publishTopic = strings.Replace(c.publishTopic, "{device_id}", c.deviceId, 1)
|
||||
|
||||
parts := strings.Split(c.subscribeTopic, "/")
|
||||
c.uid = parts[3]
|
||||
|
||||
opts := mqtt.NewClientOptions().AddBroker(hubConfig.Url).
|
||||
SetClientID(hubConfig.ClientID).
|
||||
@@ -99,113 +111,27 @@ func (c *TuyaClient) StartMQTT() error {
|
||||
SetOnConnectHandler(c.onConnect).
|
||||
SetConnectTimeout(10 * time.Second)
|
||||
|
||||
c.mqtt.client = mqtt.NewClient(opts)
|
||||
c.client = mqtt.NewClient(opts)
|
||||
|
||||
if token := c.mqtt.client.Connect(); token.Wait() && token.Error() != nil {
|
||||
if token := c.client.Connect(); token.Wait() && token.Error() != nil {
|
||||
return token.Error()
|
||||
}
|
||||
|
||||
if err := c.mqtt.waiter.Wait(); err != nil {
|
||||
if err := c.waiter.Wait(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *TuyaClient) StopMQTT() {
|
||||
if c.mqtt.client != nil {
|
||||
_ = c.sendDisconnect()
|
||||
c.mqtt.client.Disconnect(1000)
|
||||
func (c *TuyaMqttClient) Stop() {
|
||||
if c.client != nil {
|
||||
_ = c.SendDisconnect()
|
||||
c.client.Disconnect(1000)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *TuyaClient) onConnect(client mqtt.Client) {
|
||||
if token := client.Subscribe(c.mqtt.subscribeTopic, 1, c.consume); token.Wait() && token.Error() != nil {
|
||||
c.mqtt.waiter.Done(token.Error())
|
||||
return
|
||||
}
|
||||
|
||||
c.mqtt.waiter.Done(nil)
|
||||
}
|
||||
|
||||
func (c *TuyaClient) consume(client mqtt.Client, msg mqtt.Message) {
|
||||
var rmqtt MqttMessage
|
||||
if err := json.Unmarshal(msg.Payload(), &rmqtt); err != nil {
|
||||
c.mqtt.onError(err)
|
||||
return
|
||||
}
|
||||
|
||||
if rmqtt.Data.Header.SessionID != c.sessionId {
|
||||
return
|
||||
}
|
||||
|
||||
switch rmqtt.Data.Header.Type {
|
||||
case "answer":
|
||||
c.mqtt.onMqttAnswer(&rmqtt)
|
||||
case "candidate":
|
||||
c.mqtt.onMqttCandidate(&rmqtt)
|
||||
case "disconnect":
|
||||
c.mqtt.onMqttDisconnect()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *TuyaMQTT) onMqttAnswer(msg *MqttMessage) {
|
||||
var answerFrame AnswerFrame
|
||||
if err := json.Unmarshal(msg.Data.Message, &answerFrame); err != nil {
|
||||
c.onError(err)
|
||||
return
|
||||
}
|
||||
|
||||
c.onAnswer(answerFrame)
|
||||
}
|
||||
|
||||
func (c *TuyaMQTT) onMqttCandidate(msg *MqttMessage) {
|
||||
var candidateFrame CandidateFrame
|
||||
if err := json.Unmarshal(msg.Data.Message, &candidateFrame); err != nil {
|
||||
c.onError(err)
|
||||
return
|
||||
}
|
||||
|
||||
// candidate from device start with "a=", end with "\r\n", which are not needed by Chrome webRTC
|
||||
candidateFrame.Candidate = strings.TrimPrefix(candidateFrame.Candidate, "a=")
|
||||
candidateFrame.Candidate = strings.TrimSuffix(candidateFrame.Candidate, "\r\n")
|
||||
|
||||
c.onCandidate(candidateFrame)
|
||||
}
|
||||
|
||||
func (c *TuyaMQTT) onMqttDisconnect() {
|
||||
c.closed = true
|
||||
c.onDisconnect()
|
||||
}
|
||||
|
||||
func (c *TuyaMQTT) onAnswer(answer AnswerFrame) {
|
||||
if c.handleAnswer != nil {
|
||||
c.handleAnswer(answer)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *TuyaMQTT) onCandidate(candidate CandidateFrame) {
|
||||
if c.handleCandidate != nil {
|
||||
c.handleCandidate(candidate)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *TuyaMQTT) onDisconnect() {
|
||||
if c.handleDisconnect != nil {
|
||||
c.handleDisconnect()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *TuyaMQTT) onError(err error) {
|
||||
if c.handleError != nil {
|
||||
c.handleError(err)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *TuyaClient) sendOffer(sdp string, streamResolution string) error {
|
||||
streamType := c.getStreamType(streamResolution)
|
||||
isHEVC := c.isHEVC(streamType)
|
||||
|
||||
func (c *TuyaMqttClient) SendOffer(sdp string, streamResolution string, streamType int, isHEVC bool) error {
|
||||
if isHEVC {
|
||||
// On HEVC we use streamType 0 for main stream (hd) and 1 for sub stream (sd)
|
||||
if streamResolution == "hd" {
|
||||
@@ -224,40 +150,125 @@ func (c *TuyaClient) sendOffer(sdp string, streamResolution string) error {
|
||||
})
|
||||
}
|
||||
|
||||
func (c *TuyaClient) sendCandidate(candidate string) error {
|
||||
func (c *TuyaMqttClient) SendCandidate(candidate string) error {
|
||||
return c.sendMqttMessage("candidate", 302, "", CandidateFrame{
|
||||
Mode: "webrtc",
|
||||
Candidate: candidate,
|
||||
})
|
||||
}
|
||||
|
||||
// func (c *TuyaClient) sendResolution(resolution int) error {
|
||||
// isClaritySupperted := (c.skill.WebRTC & (1 << 5)) != 0
|
||||
// if !isClaritySupperted {
|
||||
// return nil
|
||||
// }
|
||||
func (c *TuyaMqttClient) SendResolution(resolution int) error {
|
||||
// isClaritySupperted := (c.webrtcVersion & (1 << 5)) != 0
|
||||
// if !isClaritySupperted {
|
||||
// return nil
|
||||
// }
|
||||
|
||||
// return c.sendMqttMessage("resolution", 302, "", ResolutionFrame{
|
||||
// Mode: "webrtc",
|
||||
// Value: resolution,
|
||||
// })
|
||||
// }
|
||||
// Protocol 312 is used for clarity
|
||||
return c.sendMqttMessage("resolution", 312, "", ResolutionFrame{
|
||||
Mode: "webrtc",
|
||||
Value: resolution,
|
||||
})
|
||||
}
|
||||
|
||||
func (c *TuyaClient) sendSpeaker(speaker int) error {
|
||||
return c.sendMqttMessage("speaker", 302, "", SpeakerFrame{
|
||||
func (c *TuyaMqttClient) SendSpeaker(speaker int) error {
|
||||
// Protocol 312 is used for speaker
|
||||
return c.sendMqttMessage("speaker", 312, "", SpeakerFrame{
|
||||
Mode: "webrtc",
|
||||
Value: speaker,
|
||||
})
|
||||
}
|
||||
|
||||
func (c *TuyaClient) sendDisconnect() error {
|
||||
func (c *TuyaMqttClient) SendDisconnect() error {
|
||||
return c.sendMqttMessage("disconnect", 302, "", DisconnectFrame{
|
||||
Mode: "webrtc",
|
||||
})
|
||||
}
|
||||
|
||||
func (c *TuyaClient) sendMqttMessage(messageType string, protocol int, transactionID string, data interface{}) error {
|
||||
if c.mqtt.closed {
|
||||
func (c *TuyaMqttClient) onConnect(client mqtt.Client) {
|
||||
if token := client.Subscribe(c.subscribeTopic, 1, c.consume); token.Wait() && token.Error() != nil {
|
||||
c.waiter.Done(token.Error())
|
||||
return
|
||||
}
|
||||
|
||||
c.waiter.Done(nil)
|
||||
}
|
||||
|
||||
func (c *TuyaMqttClient) consume(client mqtt.Client, msg mqtt.Message) {
|
||||
var rmqtt MqttMessage
|
||||
if err := json.Unmarshal(msg.Payload(), &rmqtt); err != nil {
|
||||
c.onError(err)
|
||||
return
|
||||
}
|
||||
|
||||
if rmqtt.Data.Header.SessionID != c.sessionId {
|
||||
return
|
||||
}
|
||||
|
||||
switch rmqtt.Data.Header.Type {
|
||||
case "answer":
|
||||
c.onMqttAnswer(&rmqtt)
|
||||
case "candidate":
|
||||
c.onMqttCandidate(&rmqtt)
|
||||
case "disconnect":
|
||||
c.onMqttDisconnect()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *TuyaMqttClient) onMqttAnswer(msg *MqttMessage) {
|
||||
var answerFrame AnswerFrame
|
||||
if err := json.Unmarshal(msg.Data.Message, &answerFrame); err != nil {
|
||||
c.onError(err)
|
||||
return
|
||||
}
|
||||
|
||||
c.onAnswer(answerFrame)
|
||||
}
|
||||
|
||||
func (c *TuyaMqttClient) onMqttCandidate(msg *MqttMessage) {
|
||||
var candidateFrame CandidateFrame
|
||||
if err := json.Unmarshal(msg.Data.Message, &candidateFrame); err != nil {
|
||||
c.onError(err)
|
||||
return
|
||||
}
|
||||
|
||||
// candidate from device start with "a=", end with "\r\n", which are not needed by Chrome webRTC
|
||||
candidateFrame.Candidate = strings.TrimPrefix(candidateFrame.Candidate, "a=")
|
||||
candidateFrame.Candidate = strings.TrimSuffix(candidateFrame.Candidate, "\r\n")
|
||||
|
||||
c.onCandidate(candidateFrame)
|
||||
}
|
||||
|
||||
func (c *TuyaMqttClient) onMqttDisconnect() {
|
||||
c.closed = true
|
||||
c.onDisconnect()
|
||||
}
|
||||
|
||||
func (c *TuyaMqttClient) onAnswer(answer AnswerFrame) {
|
||||
if c.handleAnswer != nil {
|
||||
c.handleAnswer(answer)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *TuyaMqttClient) onCandidate(candidate CandidateFrame) {
|
||||
if c.handleCandidate != nil {
|
||||
c.handleCandidate(candidate)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *TuyaMqttClient) onDisconnect() {
|
||||
if c.handleDisconnect != nil {
|
||||
c.handleDisconnect()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *TuyaMqttClient) onError(err error) {
|
||||
if c.handleError != nil {
|
||||
c.handleError(err)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *TuyaMqttClient) sendMqttMessage(messageType string, protocol int, transactionID string, data interface{}) error {
|
||||
if c.closed {
|
||||
return fmt.Errorf("mqtt client is closed, send mqtt message fail")
|
||||
}
|
||||
|
||||
@@ -273,7 +284,7 @@ func (c *TuyaClient) sendMqttMessage(messageType string, protocol int, transacti
|
||||
Data: MqttFrame{
|
||||
Header: MqttFrameHeader{
|
||||
Type: messageType,
|
||||
From: c.mqtt.uid,
|
||||
From: c.uid,
|
||||
To: c.deviceId,
|
||||
SessionID: c.sessionId,
|
||||
MotoID: c.motoId,
|
||||
@@ -288,7 +299,7 @@ func (c *TuyaClient) sendMqttMessage(messageType string, protocol int, transacti
|
||||
return err
|
||||
}
|
||||
|
||||
token := c.mqtt.client.Publish(c.mqtt.publishTopic, 1, false, payload)
|
||||
token := c.client.Publish(c.publishTopic, 1, false, payload)
|
||||
if token.Wait() && token.Error() != nil {
|
||||
return token.Error()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user