This commit is contained in:
seydx
2025-05-17 14:25:18 +02:00
parent 16a812c8b8
commit a9bcb46f38
7 changed files with 415 additions and 710 deletions
+158 -289
View File
@@ -17,25 +17,23 @@ import (
) )
type TuyaClient struct { type TuyaClient struct {
httpClient *http.Client httpClient *http.Client
mqtt *TuyaMQTT mqtt *TuyaMQTT
apiURL string apiURL string
rtspURL string rtspURL string
hlsURL string hlsURL string
sessionId string sessionId string
clientId string clientId string
clientSecret string clientSecret string
deviceId string deviceId string
accessToken string accessToken string
refreshToken string refreshToken string
expireTime int64 expireTime int64
uid string uid string
motoId string motoId string
auth string auth string
skill *Skill skill *Skill
iceServers []pionWebrtc.ICEServer iceServers []pionWebrtc.ICEServer
medias []*core.Media
hasBackchannel bool
} }
type Token struct { type Token struct {
@@ -159,21 +157,16 @@ type OpenIoTHubConfigResponse struct {
Code int `json:"code,omitempty"` Code int `json:"code,omitempty"`
} }
const ( func NewTuyaClient(openAPIURL string, deviceId string, uid string, clientId string, clientSecret string, streamMode string, streamRole string) (*TuyaClient, error) {
defaultTimeout = 5 * time.Second
)
func NewTuyaClient(openAPIURL string, deviceId string, uid string, clientId string, clientSecret string, streamMode string) (*TuyaClient, error) {
client := &TuyaClient{ client := &TuyaClient{
httpClient: &http.Client{Timeout: defaultTimeout}, httpClient: &http.Client{Timeout: 5 * time.Second},
mqtt: &TuyaMQTT{waiter: core.Waiter{}}, mqtt: &TuyaMQTT{waiter: core.Waiter{}},
apiURL: openAPIURL, apiURL: openAPIURL,
sessionId: core.RandString(6, 62), sessionId: core.RandString(6, 62),
clientId: clientId, clientId: clientId,
deviceId: deviceId, deviceId: deviceId,
clientSecret: clientSecret, clientSecret: clientSecret,
uid: uid, uid: uid,
hasBackchannel: false,
} }
if err := client.InitToken(); err != nil { if err := client.InitToken(); err != nil {
@@ -189,7 +182,7 @@ func NewTuyaClient(openAPIURL string, deviceId string, uid string, clientId stri
return nil, fmt.Errorf("failed to get HLS URL: %w", err) return nil, fmt.Errorf("failed to get HLS URL: %w", err)
} }
} else { } else {
if err := client.InitDevice(); err != nil { if err := client.InitDevice(streamRole); err != nil {
return nil, fmt.Errorf("failed to initialize device: %w", err) return nil, fmt.Errorf("failed to initialize device: %w", err)
} }
@@ -206,6 +199,135 @@ func (c *TuyaClient) Close() {
c.httpClient.CloseIdleConnections() c.httpClient.CloseIdleConnections()
} }
func (c *TuyaClient) InitToken() (err error) {
url := fmt.Sprintf("https://%s/v1.0/token?grant_type=1", c.apiURL)
c.accessToken = ""
c.refreshToken = ""
body, err := c.Request("GET", url, nil)
if err != nil {
return err
}
var tokenResponse TokenResponse
err = json.Unmarshal(body, &tokenResponse)
if err != nil {
return err
}
if !tokenResponse.Success {
return fmt.Errorf(tokenResponse.Msg)
}
c.accessToken = tokenResponse.Result.AccessToken
c.refreshToken = tokenResponse.Result.RefreshToken
c.expireTime = tokenResponse.Result.ExpireTime
return nil
}
func (c *TuyaClient) InitDevice(streamRole string) (err error) {
url := fmt.Sprintf("https://%s/v1.0/users/%s/devices/%s/webrtc-configs", c.apiURL, c.uid, c.deviceId)
body, err := c.Request("GET", url, nil)
if err != nil {
return err
}
var webRTCConfigResponse WebRTCConfigResponse
err = json.Unmarshal(body, &webRTCConfigResponse)
if err != nil {
return err
}
if !webRTCConfigResponse.Success {
return fmt.Errorf(webRTCConfigResponse.Msg)
}
err = json.Unmarshal([]byte(webRTCConfigResponse.Result.Skill), &c.skill)
if err != nil {
return err
}
iceServers, err := json.Marshal(&webRTCConfigResponse.Result.P2PConfig.Ices)
if err != nil {
return err
}
c.iceServers, err = webrtc.UnmarshalICEServers(iceServers)
if err != nil {
return err
}
c.motoId = webRTCConfigResponse.Result.MotoID
c.auth = webRTCConfigResponse.Result.Auth
return nil
}
func (c *TuyaClient) GetStreamUrl(streamType string) (err error) {
url := fmt.Sprintf("https://%s/v1.0/devices/%s/stream/actions/allocate", c.apiURL, c.deviceId)
request := &AllocateRequest{
Type: streamType,
}
body, err := c.Request("POST", url, request)
if err != nil {
return err
}
var allosResponse AllocateResponse
err = json.Unmarshal(body, &allosResponse)
if err != nil {
return err
}
if !allosResponse.Success {
return fmt.Errorf(allosResponse.Msg)
}
switch streamType {
case "rtsp":
c.rtspURL = allosResponse.Result.URL
case "hls":
c.hlsURL = allosResponse.Result.URL
default:
return fmt.Errorf("unsupported stream type: %s", streamType)
}
return nil
}
func (c *TuyaClient) LoadHubConfig() (config *OpenIoTHubConfig, err error) {
url := fmt.Sprintf("https://%s/v2.0/open-iot-hub/access/config", c.apiURL)
request := &OpenIoTHubConfigRequest{
UID: c.uid,
UniqueID: uuid.New().String(),
LinkType: "mqtt",
Topics: "ipc",
}
body, err := c.Request("POST", url, request)
if err != nil {
return nil, err
}
var openIoTHubConfigResponse OpenIoTHubConfigResponse
err = json.Unmarshal(body, &openIoTHubConfigResponse)
if err != nil {
return nil, err
}
if !openIoTHubConfigResponse.Success {
return nil, fmt.Errorf(openIoTHubConfigResponse.Msg)
}
return &openIoTHubConfigResponse.Result, nil
}
func (c *TuyaClient) Request(method string, url string, body any) ([]byte, error) { func (c *TuyaClient) Request(method string, url string, body any) ([]byte, error) {
var bodyReader io.Reader var bodyReader io.Reader
if body != nil { if body != nil {
@@ -253,224 +375,7 @@ func (c *TuyaClient) Request(method string, url string, body any) ([]byte, error
return res, nil return res, nil
} }
func (c *TuyaClient) InitToken() (err error) { func (c *TuyaClient) getStreamType(streamRole string) int {
url := fmt.Sprintf("https://%s/v1.0/token?grant_type=1", c.apiURL)
c.accessToken = ""
c.refreshToken = ""
body, err := c.Request("GET", url, nil)
if err != nil {
return err
}
var tokenResponse TokenResponse
err = json.Unmarshal(body, &tokenResponse)
if err != nil {
return err
}
if !tokenResponse.Success {
return fmt.Errorf("error: %s", tokenResponse.Msg)
}
c.accessToken = tokenResponse.Result.AccessToken
c.refreshToken = tokenResponse.Result.RefreshToken
c.expireTime = tokenResponse.Result.ExpireTime
return nil
}
func (c *TuyaClient) InitDevice() (err error) {
url := fmt.Sprintf("https://%s/v1.0/users/%s/devices/%s/webrtc-configs", c.apiURL, c.uid, c.deviceId)
body, err := c.Request("GET", url, nil)
if err != nil {
return err
}
var webRTCConfigResponse WebRTCConfigResponse
err = json.Unmarshal(body, &webRTCConfigResponse)
if err != nil {
return err
}
if !webRTCConfigResponse.Success {
return fmt.Errorf("error: %s", webRTCConfigResponse.Msg)
}
c.motoId = webRTCConfigResponse.Result.MotoID
c.auth = webRTCConfigResponse.Result.Auth
c.skill = &Skill{
WebRTC: 3, // basic webrtc
Audios: make([]AudioSkill, 0),
Videos: make([]VideoSkill, 0),
}
if webRTCConfigResponse.Result.Skill != "" {
_ = json.Unmarshal([]byte(webRTCConfigResponse.Result.Skill), c.skill)
}
c.hasBackchannel = contains(webRTCConfigResponse.Result.AudioAttributes.CallMode, 2) &&
contains(webRTCConfigResponse.Result.AudioAttributes.HardwareCapability, 1)
c.medias = make([]*core.Media, 0)
if len(c.skill.Audios) > 0 {
direction := core.DirectionRecvonly
if c.hasBackchannel {
direction = core.DirectionSendRecv
}
codecs := make([]*core.Codec, 0)
for _, audio := range c.skill.Audios {
codecs = append(codecs, &core.Codec{
Name: getAudioCodec(audio.CodecType),
ClockRate: uint32(audio.SampleRate),
Channels: uint8(audio.Channels),
})
}
c.medias = append(c.medias, &core.Media{
Kind: core.KindAudio,
Direction: direction,
Codecs: codecs,
})
} else {
// Use default values for Audio
c.medias = append(c.medias, &core.Media{
Kind: core.KindAudio,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{
{
Name: core.CodecPCMU,
ClockRate: uint32(8000),
Channels: uint8(1),
},
},
})
}
if len(c.skill.Videos) > 0 {
codecs := make([]*core.Codec, 0)
for _, video := range c.skill.Videos {
if video.CodecType == 2 {
codecs = append(codecs, &core.Codec{
Name: core.CodecH264,
ClockRate: uint32(video.SampleRate),
PayloadType: 96,
})
} else if video.CodecType == 4 {
codecs = append(codecs, &core.Codec{
Name: core.CodecH265,
ClockRate: uint32(video.SampleRate),
PayloadType: 96,
})
}
}
c.medias = append(c.medias, &core.Media{
Kind: core.KindVideo,
Direction: core.DirectionRecvonly,
Codecs: codecs,
})
} else {
// Use default values for Video
c.medias = append(c.medias, &core.Media{
Kind: core.KindVideo,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{
{
Name: core.CodecH264,
ClockRate: uint32(90000),
PayloadType: 96,
},
{
Name: core.CodecH265,
ClockRate: uint32(90000),
PayloadType: 96,
},
},
})
}
iceServersBytes, err := json.Marshal(&webRTCConfigResponse.Result.P2PConfig.Ices)
if err != nil {
return err
}
c.iceServers, err = webrtc.UnmarshalICEServers([]byte(iceServersBytes))
if err != nil {
return err
}
return nil
}
func (c *TuyaClient) GetStreamUrl(streamType string) (err error) {
url := fmt.Sprintf("https://%s/v1.0/devices/%s/stream/actions/allocate", c.apiURL, c.deviceId)
request := &AllocateRequest{
Type: streamType,
}
body, err := c.Request("POST", url, request)
if err != nil {
return err
}
var allosResponse AllocateResponse
err = json.Unmarshal(body, &allosResponse)
if err != nil {
return err
}
if !allosResponse.Success {
return fmt.Errorf("error: %s", allosResponse.Msg)
}
switch streamType {
case "rtsp":
c.rtspURL = allosResponse.Result.URL
case "hls":
c.hlsURL = allosResponse.Result.URL
default:
return fmt.Errorf("unsupported stream type: %s", streamType)
}
return nil
}
func (c *TuyaClient) LoadHubConfig() (config *OpenIoTHubConfig, err error) {
url := fmt.Sprintf("https://%s/v2.0/open-iot-hub/access/config", c.apiURL)
request := &OpenIoTHubConfigRequest{
UID: c.uid,
UniqueID: uuid.New().String(),
LinkType: "mqtt",
Topics: "ipc",
}
body, err := c.Request("POST", url, request)
if err != nil {
return nil, err
}
var openIoTHubConfigResponse OpenIoTHubConfigResponse
err = json.Unmarshal(body, &openIoTHubConfigResponse)
if err != nil {
return nil, err
}
if !openIoTHubConfigResponse.Success {
return nil, fmt.Errorf("error: %s", openIoTHubConfigResponse.Msg)
}
return &openIoTHubConfigResponse.Result, nil
}
func (c *TuyaClient) getStreamType(streamChoice string) int {
// Default streamType if nothing is found // Default streamType if nothing is found
defaultStreamType := 1 defaultStreamType := 1
@@ -501,7 +406,7 @@ func (c *TuyaClient) getStreamType(streamChoice string) int {
} }
// Return the streamType based on the selection // Return the streamType based on the selection
switch streamChoice { switch streamRole {
case "main": case "main":
return highestResType return highestResType
case "sub": case "sub":
@@ -511,29 +416,6 @@ func (c *TuyaClient) getStreamType(streamChoice string) int {
} }
} }
func getAudioCodec(codecType int) string {
switch codecType {
// case 100:
// return "ADPCM"
case 101:
return core.CodecPCM
case 102, 103, 104:
return core.CodecAAC
case 105:
return core.CodecPCMU
case 106:
return core.CodecPCMA
// case 107:
// return "G726-32"
// case 108:
// return "SPEEX"
case 109:
return core.CodecMP3
default:
return core.CodecPCMU
}
}
func (c *TuyaClient) isHEVC(streamType int) bool { func (c *TuyaClient) isHEVC(streamType int) bool {
for _, video := range c.skill.Videos { for _, video := range c.skill.Videos {
if video.StreamType == streamType { if video.StreamType == streamType {
@@ -544,22 +426,9 @@ func (c *TuyaClient) isHEVC(streamType int) bool {
return false return false
} }
func (c *TuyaClient) isClaritySupported(webrtcValue int) bool {
return (webrtcValue & (1 << 5)) != 0
}
func (c *TuyaClient) calBusinessSign(ts int64) string { func (c *TuyaClient) calBusinessSign(ts int64) string {
data := fmt.Sprintf("%s%s%s%d", c.clientId, c.accessToken, c.clientSecret, ts) data := fmt.Sprintf("%s%s%s%d", c.clientId, c.accessToken, c.clientSecret, ts)
val := md5.Sum([]byte(data)) val := md5.Sum([]byte(data))
res := fmt.Sprintf("%X", val) res := fmt.Sprintf("%X", val)
return res return res
} }
func contains(slice []int, val int) bool {
for _, item := range slice {
if item == val {
return true
}
}
return false
}
+224 -53
View File
@@ -1,6 +1,7 @@
package tuya package tuya
import ( import (
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"net/url" "net/url"
@@ -14,10 +15,30 @@ import (
) )
type Client struct { type Client struct {
api *TuyaClient api *TuyaClient
conn *webrtc.Conn conn *webrtc.Conn
dcConn *DCConn pc *pion.PeerConnection
done chan struct{} dc *pion.DataChannel
videoSSRC uint32
audioSSRC uint32
isHEVC bool
connected core.Waiter
closed bool
handlers map[uint32]func(*rtp.Packet)
}
type DataChannelMessage struct {
Type string `json:"type"`
Msg string `json:"msg"`
}
type RecvMessage struct {
Video struct {
SSRC uint32 `json:"ssrc"`
} `json:"video"`
Audio struct {
SSRC uint32 `json:"ssrc"`
} `json:"audio"`
} }
const ( const (
@@ -30,7 +51,6 @@ const (
) )
func Dial(rawURL string) (core.Producer, error) { func Dial(rawURL string) (core.Producer, error) {
// Parse URL and validate basic params
u, err := url.Parse(rawURL) u, err := url.Parse(rawURL)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -41,8 +61,13 @@ func Dial(rawURL string) (core.Producer, error) {
uid := query.Get("uid") uid := query.Get("uid")
clientId := query.Get("client_id") clientId := query.Get("client_id")
clientSecret := query.Get("client_secret") clientSecret := query.Get("client_secret")
streamType := query.Get("type") streamRole := query.Get("role")
streamMode := query.Get("mode") streamMode := query.Get("mode")
if streamRole == "" || (streamRole != "main" && streamRole != "sub") {
streamRole = "main"
}
useRTSP := streamMode == "rtsp" useRTSP := streamMode == "rtsp"
useHLS := streamMode == "hls" useHLS := streamMode == "hls"
useWebRTC := streamMode == "webrtc" || streamMode == "" useWebRTC := streamMode == "webrtc" || streamMode == ""
@@ -72,14 +97,14 @@ func Dial(rawURL string) (core.Producer, error) {
} }
// Initialize Tuya API client // Initialize Tuya API client
tuyaAPI, err := NewTuyaClient(u.Hostname(), deviceID, uid, clientId, clientSecret, streamMode) tuyaAPI, err := NewTuyaClient(u.Hostname(), deviceID, uid, clientId, clientSecret, streamMode, streamRole)
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("tuya: %w", err)
} }
client := &Client{ client := &Client{
api: tuyaAPI, api: tuyaAPI,
done: make(chan struct{}), handlers: make(map[uint32]func(*rtp.Packet)),
} }
if useRTSP { if useRTSP {
@@ -93,8 +118,9 @@ func Dial(rawURL string) (core.Producer, error) {
} }
return streams.GetProducer(client.api.hlsURL) return streams.GetProducer(client.api.hlsURL)
} else { } else {
isHEVC := client.api.isHEVC(client.api.getStreamType(streamType)) client.isHEVC = client.api.isHEVC(client.api.getStreamType(streamRole))
// Create a new PeerConnection
conf := pion.Configuration{ conf := pion.Configuration{
ICEServers: client.api.iceServers, ICEServers: client.api.iceServers,
ICETransportPolicy: pion.ICETransportPolicyAll, ICETransportPolicy: pion.ICETransportPolicyAll,
@@ -107,7 +133,7 @@ func Dial(rawURL string) (core.Producer, error) {
return nil, err return nil, err
} }
pc, err := api.NewPeerConnection(conf) client.pc, err = api.NewPeerConnection(conf)
if err != nil { if err != nil {
client.api.Close() client.api.Close()
return nil, err return nil, err
@@ -119,10 +145,8 @@ func Dial(rawURL string) (core.Producer, error) {
// protect from blocking on errors // protect from blocking on errors
defer sendOffer.Done(nil) defer sendOffer.Done(nil)
// waiter will wait PC error // Create new WebRTC connection
var connState core.Waiter client.conn = webrtc.NewConn(client.pc)
client.conn = webrtc.NewConn(pc)
client.conn.FormatName = "tuya/webrtc" client.conn.FormatName = "tuya/webrtc"
client.conn.Mode = core.ModeActiveProducer client.conn.Mode = core.ModeActiveProducer
client.conn.Protocol = "mqtt" client.conn.Protocol = "mqtt"
@@ -137,8 +161,8 @@ func Dial(rawURL string) (core.Producer, error) {
SDP: answer.Sdp, SDP: answer.Sdp,
} }
if err = pc.SetRemoteDescription(desc); err != nil { if err = client.pc.SetRemoteDescription(desc); err != nil {
client.Stop() client.connected.Done(err)
return return
} }
@@ -147,7 +171,18 @@ func Dial(rawURL string) (core.Producer, error) {
return return
} }
client.conn.SDP = answer.Sdp if client.isHEVC {
// Tuya answers always with H264 codec, replace with HEVC
for _, media := range client.conn.Medias {
if media.Kind == core.KindVideo {
for _, codec := range media.Codecs {
if codec.Name == core.CodecH264 {
codec.Name = core.CodecH265
}
}
}
}
}
} }
client.api.mqtt.handleCandidate = func(candidate CandidateFrame) { client.api.mqtt.handleCandidate = func(candidate CandidateFrame) {
@@ -170,15 +205,57 @@ func Dial(rawURL string) (core.Producer, error) {
client.Stop() client.Stop()
} }
// Set up data channel for HEVC // On HEVC, use DataChannel to receive video/audio
if isHEVC { if client.isHEVC {
client.dcConn, err = NewDCConn(pc, client) // Create a new DataChannel
if err != nil { maxRetransmits := uint16(5)
client.api.Close() ordered := true
return nil, err client.dc, err = client.pc.CreateDataChannel("fmp4Stream", &pion.DataChannelInit{
} MaxRetransmits: &maxRetransmits,
Ordered: &ordered,
})
// Set up data channel handler
client.dc.OnMessage(func(msg pion.DataChannelMessage) {
if msg.IsString {
client.probe(msg)
} else {
packet := &rtp.Packet{}
if err := packet.Unmarshal(msg.Data); err != nil {
return
}
if handler, ok := client.handlers[packet.SSRC]; ok {
handler(packet)
}
}
})
client.dc.OnError(func(err error) {
// fmt.Printf("tuya: datachannel error: %s\n", err.Error())
client.connected.Done(err)
})
client.dc.OnClose(func() {
// fmt.Println("tuya: datachannel closed")
client.connected.Done(errors.New("datachannel: closed"))
})
client.dc.OnOpen(func() {
// fmt.Println("tuya: datachannel opened")
codecRequest, _ := json.Marshal(DataChannelMessage{
Type: "codec",
Msg: "",
})
if err := client.sendMessageToDataChannel(codecRequest); err != nil {
client.connected.Done(fmt.Errorf("failed to send codec request: %w", err))
}
})
} }
// Set up pc handler
client.conn.Listen(func(msg any) { client.conn.Listen(func(msg any) {
switch msg := msg.(type) { switch msg := msg.(type) {
case *pion.ICECandidate: case *pion.ICECandidate:
@@ -192,16 +269,25 @@ func Dial(rawURL string) (core.Producer, error) {
case pion.PeerConnectionStateConnecting: case pion.PeerConnectionStateConnecting:
break break
case pion.PeerConnectionStateConnected: case pion.PeerConnectionStateConnected:
connState.Done(nil) // On HEVC, wait for DataChannel to be opened and camera to send codec info
if !client.isHEVC {
client.connected.Done(nil)
}
default: default:
client.Stop() client.Stop()
connState.Done(errors.New("webrtc: " + msg.String())) client.connected.Done(errors.New("webrtc: " + msg.String()))
} }
} }
}) })
// Audio first, otherwise tuya will send corrupt sdp
medias := []*core.Media{
{Kind: core.KindAudio, Direction: core.DirectionSendRecv},
{Kind: core.KindVideo, Direction: core.DirectionRecvonly},
}
// Create offer // Create offer
offer, err := client.conn.CreateOffer(client.api.medias) offer, err := client.conn.CreateOffer(medias)
if err != nil { if err != nil {
client.api.Close() client.api.Close()
return nil, err return nil, err
@@ -213,20 +299,12 @@ func Dial(rawURL string) (core.Producer, error) {
offer = re.ReplaceAllString(offer, "") offer = re.ReplaceAllString(offer, "")
// Send offer // Send offer
client.api.sendOffer(offer, streamType) client.api.sendOffer(offer, streamRole)
sendOffer.Done(nil) sendOffer.Done(nil)
if client.dcConn != nil { // Wait for connection
if err = client.dcConn.connected.Wait(); err != nil { if err = client.connected.Wait(); err != nil {
client.Stop() return nil, fmt.Errorf("tuya: %w", err)
return nil, err
}
return client.dcConn, nil
}
if err = connState.Wait(); err != nil {
return nil, err
} }
return client, nil return client, nil
@@ -242,10 +320,15 @@ func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver,
} }
func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error {
// RepackG711 will not work, so add default logic without repacking // Manually handle backchannel, because repacking audio through go2rtc does not work
localTrack := c.getSender()
if localTrack == nil {
return errors.New("webrtc: can't get track")
}
payloadType := codec.PayloadType payloadType := codec.PayloadType
localTrack := c.conn.GetSenderTrack(media.ID)
sender := core.NewSender(media, codec) sender := core.NewSender(media, codec)
sender.Handler = func(packet *rtp.Packet) { sender.Handler = func(packet *rtp.Packet) {
c.conn.Send += packet.MarshalSize() c.conn.Send += packet.MarshalSize()
@@ -260,29 +343,47 @@ func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Rece
} }
func (c *Client) Start() error { func (c *Client) Start() error {
if c.dcConn != nil { if len(c.conn.Receivers) == 0 {
c.dcConn.Start() return errors.New("tuya: no receivers")
}
var video, audio *core.Receiver
for _, receiver := range c.conn.Receivers {
if receiver.Codec.IsVideo() {
video = receiver
} else if receiver.Codec.IsAudio() {
audio = receiver
}
}
c.handlers[c.videoSSRC] = func(packet *rtp.Packet) {
if video != nil {
video.WriteRTP(packet)
}
}
c.handlers[c.audioSSRC] = func(packet *rtp.Packet) {
if audio != nil {
audio.WriteRTP(packet)
}
} }
return c.conn.Start() return c.conn.Start()
} }
func (c *Client) Stop() error { func (c *Client) Stop() error {
select { if c.closed {
case <-c.done:
return nil return nil
default: }
close(c.done)
for ssrc := range c.handlers {
delete(c.handlers, ssrc)
} }
if c.conn != nil { if c.conn != nil {
_ = c.conn.Stop() _ = c.conn.Stop()
} }
if c.dcConn != nil {
_ = c.dcConn.Stop()
}
if c.api != nil { if c.api != nil {
c.api.Close() c.api.Close()
} }
@@ -293,3 +394,73 @@ func (c *Client) Stop() error {
func (c *Client) MarshalJSON() ([]byte, error) { func (c *Client) MarshalJSON() ([]byte, error) {
return c.conn.MarshalJSON() return c.conn.MarshalJSON()
} }
func (c *Client) probe(msg pion.DataChannelMessage) {
// fmt.Printf("[tuya] Received string message: %s\n", string(msg.Data))
var message DataChannelMessage
if err := json.Unmarshal([]byte(msg.Data), &message); err != nil {
c.connected.Done(fmt.Errorf("failed to parse datachannel message: %w", err))
}
switch message.Type {
case "codec":
// fmt.Printf("[tuya] Codec info from camera: %s\n", message.Msg)
frameRequest, _ := json.Marshal(DataChannelMessage{
Type: "start",
Msg: "frame",
})
err := c.sendMessageToDataChannel(frameRequest)
if err != nil {
c.connected.Done(fmt.Errorf("failed to send frame request: %w", err))
}
case "recv":
var recvMessage RecvMessage
if err := json.Unmarshal([]byte(message.Msg), &recvMessage); err != nil {
c.connected.Done(fmt.Errorf("failed to parse recv message: %w", err))
return
}
c.videoSSRC = recvMessage.Video.SSRC
c.audioSSRC = recvMessage.Audio.SSRC
completeMsg, _ := json.Marshal(DataChannelMessage{
Type: "complete",
Msg: "",
})
err := c.sendMessageToDataChannel(completeMsg)
if err != nil {
c.connected.Done(fmt.Errorf("failed to send complete message: %w", err))
}
c.connected.Done(nil)
}
}
func (c *Client) sendMessageToDataChannel(message []byte) error {
if c.dc != nil {
// fmt.Printf("[tuya] sending message to data channel: %s\n", message)
return c.dc.Send(message)
}
return nil
}
func (c *Client) getSender() *webrtc.Track {
for _, tr := range c.pc.GetTransceivers() {
if tr.Kind() == pion.RTPCodecTypeAudio {
if tr.Kind() == pion.RTPCodecType(pion.RTPTransceiverDirectionSendonly) || tr.Kind() == pion.RTPCodecType(pion.RTPTransceiverDirectionSendrecv) {
if s := tr.Sender(); s != nil {
if t := s.Track().(*webrtc.Track); t != nil {
return t
}
}
}
}
}
return nil
}
-253
View File
@@ -1,253 +0,0 @@
package tuya
import (
"encoding/json"
"errors"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/mp4"
"github.com/pion/rtp"
pion "github.com/pion/webrtc/v4"
)
type DCConn struct {
core.Connection
client *Client
dc *pion.DataChannel
dem *mp4.Demuxer
queue *FrameBufferQueue
msgs chan pion.DataChannelMessage
connected core.Waiter
closed core.Waiter
initialized bool
}
type DataChannelMessage struct {
Type string `json:"type"`
Msg string `json:"msg"`
}
func NewDCConn(pc *pion.PeerConnection, c *Client) (*DCConn, error) {
maxRetransmits := uint16(5)
ordered := true
dc, err := pc.CreateDataChannel("fmp4Stream", &pion.DataChannelInit{
MaxRetransmits: &maxRetransmits,
Ordered: &ordered,
})
if err != nil {
return nil, err
}
conn := &DCConn{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "webrtc/fmp4",
Transport: dc,
},
client: c,
dc: dc,
dem: &mp4.Demuxer{},
queue: NewFrameBufferQueue(),
msgs: make(chan pion.DataChannelMessage, 10), // Saw max 4 messages in a row, 10 should be enough
initialized: false,
}
dc.OnMessage(func(msg pion.DataChannelMessage) {
conn.msgs <- msg
})
dc.OnError(func(err error) {
conn.connected.Done(err)
})
dc.OnClose(func() {
close(conn.msgs)
conn.connected.Done(errors.New("datachannel: closed"))
})
go conn.initializationLoop()
return conn, nil
}
func (c *DCConn) initializationLoop() {
for msg := range c.msgs {
if c.initialized {
return
}
err := c.probe(msg)
if err != nil {
c.connected.Done(err)
return
}
if c.initialized {
c.connected.Done(nil)
return
}
}
}
func (c *DCConn) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
if media.Direction == core.DirectionSendRecv || media.Direction == core.DirectionSendonly {
return c.client.GetTrack(media, codec)
}
for _, receiver := range c.Receivers {
if receiver.Codec == codec {
return receiver, nil
}
}
receiver := core.NewReceiver(media, codec)
c.Receivers = append(c.Receivers, receiver)
return receiver, nil
}
func (c *DCConn) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error {
payloadType := codec.PayloadType
localTrack := c.client.conn.GetSenderTrack(media.ID)
sender := core.NewSender(media, codec)
sender.Handler = func(packet *rtp.Packet) {
c.Send += packet.MarshalSize()
//important to send with remote PayloadType
_ = localTrack.WriteRTP(payloadType, packet)
}
sender.HandleRTP(track)
c.Senders = append(c.Senders, sender)
return nil
}
func (c *DCConn) Start() error {
receivers := make(map[uint32]*core.Receiver)
for _, receiver := range c.Receivers {
trackID := c.dem.GetTrackID(receiver.Codec)
receivers[trackID] = receiver
}
ch := make(chan []byte, 10)
defer close(ch)
go func() {
for data := range ch {
allTracks := c.dem.DemuxAll(data)
for _, trackData := range allTracks {
trackID := trackData.TrackID
packets := trackData.Packets
receiver := receivers[trackID]
if receiver == nil {
continue
}
for _, packet := range packets {
receiver.WriteRTP(packet)
}
}
}
}()
go func() {
for msg := range c.msgs {
if len(msg.Data) >= 4 {
segmentNum := int(msg.Data[1])
fragmentCount := int(msg.Data[2])
fragmentSeq := int(msg.Data[3])
mp4Data := msg.Data[4:]
c.queue.AddFragment(segmentNum, fragmentCount, fragmentSeq, mp4Data)
if c.queue.IsSegmentComplete(segmentNum, fragmentCount) {
b := c.queue.GetCombinedBuffer(segmentNum)
c.Recv += len(b)
ch <- b
}
}
}
}()
c.closed.Wait()
return nil
}
func (c *DCConn) sendMessageToDataChannel(message string) error {
if c.dc != nil {
return c.dc.SendText(message)
}
return nil
}
func (c *DCConn) probe(msg pion.DataChannelMessage) (err error) {
if msg.IsString {
var message DataChannelMessage
if err = json.Unmarshal(msg.Data, &message); err != nil {
return err
}
switch message.Type {
case "codec":
response, _ := json.Marshal(DataChannelMessage{
Type: "start",
Msg: "fmp4",
})
err = c.sendMessageToDataChannel(string(response))
if err != nil {
return err
}
case "recv":
response, _ := json.Marshal(DataChannelMessage{
Type: "complete",
Msg: "",
})
err = c.sendMessageToDataChannel(string(response))
if err != nil {
return err
}
}
} else {
if len(msg.Data) >= 4 {
messageType := msg.Data[0]
segmentNum := int(msg.Data[1])
fragmentCount := int(msg.Data[2])
fragmentSeq := int(msg.Data[3])
mp4Data := msg.Data[4:]
// initialization segment
if messageType == 0 && segmentNum == 1 && fragmentCount == 1 && fragmentSeq == 1 {
medias := c.dem.Probe(mp4Data)
c.Medias = append(c.Medias, medias...)
// Add backchannel
webrtcMedias := c.client.GetMedias()
for _, media := range webrtcMedias {
if media.Kind == core.KindAudio {
if media.Direction == core.DirectionSendRecv || media.Direction == core.DirectionSendonly {
c.Medias = append(c.Medias, media)
}
}
}
c.initialized = true
}
}
}
return nil
}
func (c *DCConn) Stop() error {
if c.dc != nil && c.dc.ReadyState() == pion.DataChannelStateOpen {
_ = c.dc.Close()
}
c.closed.Done(nil)
return nil
}
-86
View File
@@ -1,86 +0,0 @@
package tuya
import (
"sort"
"sync"
)
type FrameBufferQueue struct {
segments map[int]map[int][]byte // segNum -> fragSeq -> data
mu sync.Mutex
}
func NewFrameBufferQueue() *FrameBufferQueue {
return &FrameBufferQueue{
segments: make(map[int]map[int][]byte),
}
}
func (q *FrameBufferQueue) AddFragment(segmentNum, fragmentCount, fragmentSeq int, data []byte) {
q.mu.Lock()
defer q.mu.Unlock()
if _, ok := q.segments[segmentNum]; !ok {
q.segments[segmentNum] = make(map[int][]byte)
}
q.segments[segmentNum][fragmentSeq] = data
}
func (q *FrameBufferQueue) IsSegmentComplete(segmentNum, fragmentCount int) bool {
q.mu.Lock()
defer q.mu.Unlock()
if frags, ok := q.segments[segmentNum]; ok {
// Make sure we have the right number of fragments
if len(frags) != fragmentCount {
return false
}
// Check if we have all sequences from 1 to fragmentCount
for i := 1; i <= fragmentCount; i++ {
if _, ok := frags[i]; !ok {
return false
}
}
return true
}
return false
}
func (q *FrameBufferQueue) GetCombinedBuffer(segNum int) []byte {
q.mu.Lock()
defer q.mu.Unlock()
if frags, ok := q.segments[segNum]; ok {
// Sort fragments by sequence number
var keys []int
for k := range frags {
keys = append(keys, k)
}
sort.Ints(keys)
// Calculate total size for pre-allocation
totalSize := 0
for _, k := range keys {
totalSize += len(frags[k])
}
// Pre-allocate buffer for better performance
combined := make([]byte, 0, totalSize)
// Combine fragments in sequence order
for _, k := range keys {
combined = append(combined, frags[k]...)
}
// Remove this segment to free memory
delete(q.segments, segNum)
return combined
}
return nil
}
+31 -27
View File
@@ -131,7 +131,7 @@ func (c *TuyaClient) onConnect(client mqtt.Client) {
func (c *TuyaClient) consume(client mqtt.Client, msg mqtt.Message) { func (c *TuyaClient) consume(client mqtt.Client, msg mqtt.Message) {
var rmqtt MqttMessage var rmqtt MqttMessage
if err := json.Unmarshal(msg.Payload(), &rmqtt); err != nil { if err := json.Unmarshal(msg.Payload(), &rmqtt); err != nil {
c.mqtt.onError(fmt.Errorf("unmarshal mqtt message fail: %s, payload: %s", err.Error(), string(msg.Payload()))) c.mqtt.onError(err)
return return
} }
@@ -152,10 +152,7 @@ func (c *TuyaClient) consume(client mqtt.Client, msg mqtt.Message) {
func (c *TuyaMQTT) onMqttAnswer(msg *MqttMessage) { func (c *TuyaMQTT) onMqttAnswer(msg *MqttMessage) {
var answerFrame AnswerFrame var answerFrame AnswerFrame
if err := json.Unmarshal(msg.Data.Message, &answerFrame); err != nil { if err := json.Unmarshal(msg.Data.Message, &answerFrame); err != nil {
c.onError(fmt.Errorf("unmarshal mqtt answer frame fail: %s, session: %s, frame: %s", c.onError(err)
err.Error(),
msg.Data.Header.SessionID,
string(msg.Data.Message)))
return return
} }
@@ -165,10 +162,7 @@ func (c *TuyaMQTT) onMqttAnswer(msg *MqttMessage) {
func (c *TuyaMQTT) onMqttCandidate(msg *MqttMessage) { func (c *TuyaMQTT) onMqttCandidate(msg *MqttMessage) {
var candidateFrame CandidateFrame var candidateFrame CandidateFrame
if err := json.Unmarshal(msg.Data.Message, &candidateFrame); err != nil { if err := json.Unmarshal(msg.Data.Message, &candidateFrame); err != nil {
c.onError(fmt.Errorf("unmarshal mqtt candidate frame fail: %s, session: %s, frame: %s", c.onError(err)
err.Error(),
msg.Data.Header.SessionID,
string(msg.Data.Message)))
return return
} }
@@ -208,37 +202,48 @@ func (c *TuyaMQTT) onError(err error) {
} }
} }
func (c *TuyaClient) sendOffer(sdp string, streamType string) { func (c *TuyaClient) sendOffer(sdp string, streamRole string) {
fixedStreamType := c.getStreamType(streamType) streamType := c.getStreamType(streamRole)
isHEVC := c.isHEVC(fixedStreamType) isHEVC := c.isHEVC(streamType)
if isHEVC { if isHEVC {
// On HEVC we use streamType 0 for main stream and 1 for sub stream // On HEVC we use streamType 0 for main stream and 1 for sub stream
if streamType == "main" { if streamRole == "main" {
fixedStreamType = 0 streamType = 0
} else { } else {
fixedStreamType = 1 streamType = 1
} }
} }
c.sendMqttMessage("offer", 302, "", OfferFrame{ err := c.sendMqttMessage("offer", 302, "", OfferFrame{
Mode: "webrtc", Mode: "webrtc",
Sdp: sdp, Sdp: sdp,
StreamType: fixedStreamType, StreamType: streamType,
Auth: c.auth, Auth: c.auth,
DatachannelEnable: isHEVC, DatachannelEnable: isHEVC,
}) })
if err != nil {
c.mqtt.onError(err)
return
}
} }
func (c *TuyaClient) sendCandidate(candidate string) { func (c *TuyaClient) sendCandidate(candidate string) {
c.sendMqttMessage("candidate", 302, "", CandidateFrame{ err := c.sendMqttMessage("candidate", 302, "", CandidateFrame{
Mode: "webrtc", Mode: "webrtc",
Candidate: candidate, Candidate: candidate,
}) })
if err != nil {
c.mqtt.onError(err)
return
}
} }
func (c *TuyaClient) sendResolution(resolution int) { func (c *TuyaClient) sendResolution(resolution int) {
if !c.isClaritySupported(resolution) { isClaritySupperted := (c.skill.WebRTC & (1 << 5)) != 0
if !isClaritySupperted {
return return
} }
@@ -261,16 +266,14 @@ func (c *TuyaClient) sendDisconnect() {
}) })
} }
func (c *TuyaClient) sendMqttMessage(messageType string, protocol int, transactionID string, data interface{}) { func (c *TuyaClient) sendMqttMessage(messageType string, protocol int, transactionID string, data interface{}) error {
if c.mqtt.closed { if c.mqtt.closed {
c.mqtt.onError(fmt.Errorf("mqtt client is closed, send mqtt message fail")) return fmt.Errorf("mqtt client is closed, send mqtt message fail")
return
} }
jsonMessage, err := json.Marshal(data) jsonMessage, err := json.Marshal(data)
if err != nil { if err != nil {
c.mqtt.onError(fmt.Errorf("marshal mqtt message fail: %s", err.Error())) return err
return
} }
msg := &MqttMessage{ msg := &MqttMessage{
@@ -292,12 +295,13 @@ func (c *TuyaClient) sendMqttMessage(messageType string, protocol int, transacti
payload, err := json.Marshal(msg) payload, err := json.Marshal(msg)
if err != nil { if err != nil {
c.mqtt.onError(fmt.Errorf("marshal mqtt message fail: %s", err.Error())) return err
return
} }
token := c.mqtt.client.Publish(c.mqtt.publishTopic, 1, false, payload) token := c.mqtt.client.Publish(c.mqtt.publishTopic, 1, false, payload)
if token.Wait() && token.Error() != nil { if token.Wait() && token.Error() != nil {
c.mqtt.onError(fmt.Errorf("mqtt publish fail: %s, topic: %s", token.Error().Error(), c.mqtt.publishTopic)) return token.Error()
} }
return nil
} }
+1 -1
View File
@@ -217,4 +217,4 @@ func sanitizeIP6(host string) string {
return "[" + host + "]" return "[" + host + "]"
} }
return host return host
} }
+1 -1
View File
@@ -87,4 +87,4 @@ func (c *Conn) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiv
c.Senders = append(c.Senders, sender) c.Senders = append(c.Senders, sender)
return nil return nil
} }