diff --git a/internal/xiaomi/xiaomi.go b/internal/xiaomi/xiaomi.go index 55cb30c1..f5f9b5bd 100644 --- a/internal/xiaomi/xiaomi.go +++ b/internal/xiaomi/xiaomi.go @@ -92,7 +92,10 @@ func getCameraURL(url *url.URL) (string, error) { var v struct { Vendor struct { - VendorID byte `json:"vendor"` + ID byte `json:"vendor"` + Params struct { + UID string `json:"p2p_id"` + } `json:"vendor_params"` } `json:"vendor"` PublicKey string `json:"public_key"` Sign string `json:"sign"` @@ -105,7 +108,11 @@ func getCameraURL(url *url.URL) (string, error) { query.Set("client_private", hex.EncodeToString(clientPrivate)) query.Set("device_public", v.PublicKey) query.Set("sign", v.Sign) - query.Set("vendor", getVendorName(v.Vendor.VendorID)) + query.Set("vendor", getVendorName(v.Vendor.ID)) + + if v.Vendor.ID == 1 { + query.Set("uid", v.Vendor.Params.UID) + } url.RawQuery = query.Encode() return url.String(), nil diff --git a/pkg/xiaomi/cs2/conn.go b/pkg/xiaomi/cs2/conn.go new file mode 100644 index 00000000..45c9e704 --- /dev/null +++ b/pkg/xiaomi/cs2/conn.go @@ -0,0 +1,315 @@ +package cs2 + +import ( + "encoding/binary" + "fmt" + "io" + "net" + "sync" + "sync/atomic" + "time" +) + +func Dial(host string) (*Conn, error) { + conn, err := net.ListenUDP("udp", nil) + if err != nil { + return nil, err + } + + c := &Conn{ + conn: conn, + addr: &net.UDPAddr{IP: net.ParseIP(host), Port: 32108}, + } + + if err = c.handshake(); err != nil { + _ = conn.Close() + return nil, err + } + + c.rawCh0 = make(chan []byte, 10) + c.rawCh2 = make(chan []byte, 100) + + go c.worker() + + return c, nil +} + +type Conn struct { + conn *net.UDPConn + addr *net.UDPAddr + + err error + seqCh0 uint16 + seqCh3 uint16 + rawCh0 chan []byte + rawCh2 chan []byte + + cmdMu sync.Mutex + cmdAck func() +} + +const ( + magic = 0xF1 + magicDrw = 0xD1 + msgLanSearch = 0x30 + msgPunchPkt = 0x41 + msgP2PRdy = 0x42 + msgDrw = 0xD0 + msgDrwAck = 0xD1 + msgAlive = 0xE0 +) + +func (c *Conn) handshake() error { + _ = c.SetDeadline(time.Now().Add(5 * time.Second)) + + buf, err := c.WriteAndWait([]byte{magic, msgLanSearch, 0, 0}, msgPunchPkt) + if err != nil { + return fmt.Errorf("%s: read punch: %w", "cs2", err) + } + + _, err = c.WriteAndWait(buf, msgP2PRdy) + if err != nil { + return fmt.Errorf("%s: read ready: %w", "cs2", err) + } + + _ = c.Write([]byte{magic, msgAlive, 0, 0}) + + _ = c.SetDeadline(time.Time{}) + + return nil +} + +func (c *Conn) worker() { + defer func() { + close(c.rawCh0) + close(c.rawCh2) + }() + + chAck := make([]uint16, 4) + buf := make([]byte, 1200) + var ch2WaitSize int + var ch2WaitData []byte + + for { + n, addr, err := c.conn.ReadFromUDP(buf) + if err != nil { + c.err = fmt.Errorf("%s: %w", "cs2", err) + return + } + + if string(addr.IP) != string(c.addr.IP) || n < 8 || buf[0] != magic { + continue // skip messages from another IP + } + + //log.Printf("<- %x", buf[:n]) + + switch buf[1] { + case msgDrw: + ch := buf[5] + seqHI := buf[6] + seqLO := buf[7] + + if chAck[ch] != uint16(seqHI)<<8|uint16(seqLO) { + continue + } + chAck[ch]++ + + ack := []byte{magic, msgDrwAck, 0, 6, magicDrw, ch, 0, 1, seqHI, seqLO} + if _, err = c.conn.WriteToUDP(ack, c.addr); err != nil { + return + } + + switch ch { + case 0: + select { + case c.rawCh0 <- buf[12:]: + default: + } + continue + + case 2: + ch2WaitData = append(ch2WaitData, buf[8:n]...) + + for len(ch2WaitData) > 4 { + if ch2WaitSize == 0 { + ch2WaitSize = int(binary.BigEndian.Uint32(ch2WaitData)) + ch2WaitData = ch2WaitData[4:] + } + if ch2WaitSize <= len(ch2WaitData) { + select { + case c.rawCh2 <- ch2WaitData[:ch2WaitSize]: + default: + c.err = fmt.Errorf("%s: media queue is full", "cs2") + return + } + + ch2WaitData = ch2WaitData[ch2WaitSize:] + ch2WaitSize = 0 + } else { + break + } + } + continue + } + + case msgP2PRdy: // skip it + continue + case msgDrwAck: + if c.cmdAck != nil { + c.cmdAck() + } + continue + } + + fmt.Printf("%s: unknown msg: %x\n", "cs2", buf[:n]) + } +} + +func (c *Conn) Write(req []byte) error { + //log.Printf("-> %x", req) + _, err := c.conn.WriteToUDP(req, c.addr) + return err +} + +func (c *Conn) WriteAndWait(req []byte, waitMsg uint8) ([]byte, error) { + var t *time.Timer + t = time.AfterFunc(1, func() { + if err := c.Write(req); err == nil && t != nil { + t.Reset(time.Second) + } + }) + defer t.Stop() + + buf := make([]byte, 1200) + + for { + n, addr, err := c.conn.ReadFromUDP(buf) + if err != nil { + return nil, err + } + + if string(addr.IP) != string(c.addr.IP) || n < 16 { + continue // skip messages from another IP + } + + if buf[0] == magic && buf[1] == waitMsg { + c.addr.Port = addr.Port + return buf[:n], nil + } + } +} + +func (c *Conn) RemoteAddr() net.Addr { + return c.addr +} + +func (c *Conn) SetDeadline(t time.Time) error { + return c.conn.SetDeadline(t) +} + +func (c *Conn) Close() error { + return c.conn.Close() +} + +func (c *Conn) Error() error { + if c.err != nil { + return c.err + } + return io.EOF +} + +func (c *Conn) ReadCommand() (cmd uint16, data []byte, err error) { + buf, ok := <-c.rawCh0 + if !ok { + return 0, nil, c.Error() + } + cmd = binary.LittleEndian.Uint16(buf[:2]) + data = buf[4:] + return +} + +func (c *Conn) WriteCommand(cmd uint16, data []byte) error { + c.cmdMu.Lock() + defer c.cmdMu.Unlock() + + var repeat atomic.Int32 + repeat.Store(5) + + timeout := time.NewTicker(time.Second) + defer timeout.Stop() + + c.cmdAck = func() { + repeat.Store(0) + timeout.Reset(1) + } + + req := marshalCmd(0, c.seqCh0, uint32(cmd), data) + c.seqCh0++ + + for { + if err := c.Write(req); err != nil { + return err + } + <-timeout.C + r := repeat.Add(-1) + if r < 0 { + return nil + } + if r == 0 { + return fmt.Errorf("%s: can't send command %d", "cs2", cmd) + } + } +} + +func (c *Conn) ReadPacket() ([]byte, error) { + data, ok := <-c.rawCh2 + if !ok { + return nil, c.Error() + } + return data, nil +} + +func (c *Conn) WritePacket(data []byte) error { + const offset = 12 + + n := uint32(len(data)) + req := make([]byte, n+offset) + req[0] = magic + req[1] = msgDrw + binary.BigEndian.PutUint16(req[2:], uint16(n+8)) + + req[4] = magicDrw + req[5] = 3 // channel + binary.BigEndian.PutUint16(req[6:], c.seqCh3) + c.seqCh3++ + binary.BigEndian.PutUint32(req[8:], n) + copy(req[offset:], data) + + return c.Write(req) +} + +func marshalCmd(channel byte, seq uint16, cmd uint32, payload []byte) []byte { + size := len(payload) + req := make([]byte, 4+4+4+4+size) + + // 1. message header (4 bytes) + req[0] = magic + req[1] = msgDrw + binary.BigEndian.PutUint16(req[2:], uint16(4+4+4+size)) + + // 2. drw? header (4 bytes) + req[4] = magicDrw + req[5] = channel + binary.BigEndian.PutUint16(req[6:], seq) + + // 3. payload size (4 bytes) + binary.BigEndian.PutUint32(req[8:], uint32(4+size)) + + // 4. payload command (4 bytes) + binary.BigEndian.PutUint32(req[12:], cmd) + + // 5. payload + copy(req[16:], payload) + + return req +} diff --git a/pkg/xiaomi/miss/client.go b/pkg/xiaomi/miss/client.go index a1e3ded9..a335968d 100644 --- a/pkg/xiaomi/miss/client.go +++ b/pkg/xiaomi/miss/client.go @@ -1,17 +1,17 @@ package miss import ( + "bytes" "crypto/rand" "encoding/binary" "encoding/hex" "fmt" - "log" "net" "net/url" - "strings" "time" - "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/xiaomi/cs2" + "github.com/AlexxIT/go2rtc/pkg/xiaomi/tutk" "golang.org/x/crypto/chacha20" "golang.org/x/crypto/nacl/box" ) @@ -23,43 +23,34 @@ func Dial(rawURL string) (*Client, error) { } query := u.Query() - if s := query.Get("vendor"); s != "cs2" { + + c := &Client{} + + c.key, err = calcSharedKey(query.Get("device_public"), query.Get("client_private")) + if err != nil { + return nil, err + } + + switch s := query.Get("vendor"); s { + case "cs2": + c.conn, err = cs2.Dial(u.Host) + case "tutk": + c.conn, err = tutk.Dial(u.Host, query.Get("uid")) + default: return nil, fmt.Errorf("miss: unsupported vendor %s", s) } - clientPrivate := query.Get("client_private") - devicePublic := query.Get("device_public") - - key, err := calcSharedKey(devicePublic, clientPrivate) if err != nil { return nil, err } - conn, err := net.ListenUDP("udp", nil) + err = c.login(query.Get("client_public"), query.Get("sign")) if err != nil { + _ = c.conn.Close() return nil, err } - client := &Client{ - conn: conn, - addr: &net.UDPAddr{IP: net.ParseIP(u.Host), Port: 32108}, - buf: make([]byte, 1500), - key: key, - } - - clientPublic := query.Get("client_public") - sign := query.Get("sign") - - if err = client.login(clientPublic, sign); err != nil { - _ = conn.Close() - return nil, err - } - - client.chSeq0 = 1 - client.chRaw2 = make(chan []byte, 100) - go client.worker() - - return client, nil + return c, nil } const ( @@ -71,19 +62,23 @@ const ( CodecOPUS = 1032 ) -type Client struct { - conn *net.UDPConn - addr *net.UDPAddr - buf []byte - key []byte // shared key - - chSeq0 uint16 - chSeq3 uint16 - chRaw2 chan []byte +type Conn interface { + ReadCommand() (cmd uint16, data []byte, err error) + WriteCommand(cmd uint16, data []byte) error + ReadPacket() ([]byte, error) + WritePacket(data []byte) error + RemoteAddr() net.Addr + SetDeadline(t time.Time) error + Close() error } -func (c *Client) RemoteAddr() *net.UDPAddr { - return c.addr +type Client struct { + conn Conn + key []byte +} + +func (c *Client) RemoteAddr() net.Addr { + return c.conn.RemoteAddr() } func (c *Client) SetDeadline(t time.Time) error { @@ -94,16 +89,17 @@ func (c *Client) Close() error { return c.conn.Close() } -const ( - magic = 0xF1 - magicDrw = 0xD1 - msgLanSearch = 0x30 - msgPunchPkt = 0x41 - msgP2PRdy = 0x42 - msgDrw = 0xD0 - msgDrwAck = 0xD1 - msgAlive = 0xE0 +func (c *Client) Protocol() string { + switch c.conn.(type) { + case *cs2.Conn: + return "cs2+udp" + case *tutk.Conn: + return "tutk+udp" + } + return "" +} +const ( cmdAuthReq = 0x100 cmdAuthRes = 0x101 cmdVideoStart = 0x102 @@ -127,98 +123,57 @@ const ( ) func (c *Client) login(clientPublic, sign string) error { - _ = c.conn.SetDeadline(time.Now().Add(core.ConnDialTimeout)) - - buf, err := c.writeAndWait([]byte{magic, msgLanSearch, 0, 0}, msgPunchPkt) - if err != nil { - return fmt.Errorf("miss: read punch: %w", err) - } - - _, err = c.writeAndWait(buf, msgP2PRdy) - if err != nil { - return fmt.Errorf("miss: read ready: %w", err) - } - - _, _ = c.conn.WriteToUDP([]byte{magic, msgAlive, 0, 0}, c.addr) - s := fmt.Sprintf(`{"public_key":"%s","sign":"%s","uuid":"","support_encrypt":0}`, clientPublic, sign) - buf, err = c.writeAndWait(marshalCmd(0, 0, cmdAuthReq, []byte(s)), msgDrw) + if err := c.conn.WriteCommand(cmdAuthReq, []byte(s)); err != nil { + return err + } + + _, data, err := c.conn.ReadCommand() if err != nil { - return fmt.Errorf("miss: read auth: %w", err) + return err } - if !strings.Contains(string(buf[16:]), `"result":"success"`) { - return fmt.Errorf("miss: read auth: %s", buf[16:]) + if !bytes.Contains(data, []byte(`"result":"success"`)) { + return fmt.Errorf("miss: auth: %s", data) } - _, _ = c.conn.WriteToUDP([]byte{magic, msgDrwAck, 0, 6, magicDrw, 0, 0, 1, 0, 0}, c.addr) - - _ = c.conn.SetDeadline(time.Time{}) - return nil } -func (c *Client) writeAndWait(b []byte, waitMsg uint8) ([]byte, error) { - if _, err := c.conn.WriteToUDP(b, c.addr); err != nil { - return nil, err - } - - for { - n, addr, err := c.conn.ReadFromUDP(c.buf) - if err != nil { - return nil, err - } - - if string(addr.IP) != string(c.addr.IP) { - continue // skip messages from another IP - } - - if n >= 16 && c.buf[0] == magic && c.buf[1] == waitMsg { - if waitMsg == msgPunchPkt { - c.addr.Port = addr.Port - } - return c.buf[:n], nil - } +func (c *Client) WriteCommand(data []byte) error { + data, err := encode(c.key, data) + if err != nil { + return err } + return c.conn.WriteCommand(cmdEncoded, data) } func (c *Client) VideoStart(channel, quality, audio uint8) error { - buf := binary.BigEndian.AppendUint32(nil, cmdVideoStart) + data := binary.BigEndian.AppendUint32(nil, cmdVideoStart) if channel == 0 { - buf = fmt.Appendf(buf, `{"videoquality":%d,"enableaudio":%d}`, quality, audio) + data = fmt.Appendf(data, `{"videoquality":%d,"enableaudio":%d}`, quality, audio) } else { - buf = fmt.Appendf(buf, `{"videoquality":-1,"videoquality2":%d,"enableaudio":%d}`, quality, audio) + data = fmt.Appendf(data, `{"videoquality":-1,"videoquality2":%d,"enableaudio":%d}`, quality, audio) } - buf, err := encode(c.key, buf) - if err != nil { - return err - } - buf = marshalCmd(0, c.chSeq0, cmdEncoded, buf) - c.chSeq0++ + return c.WriteCommand(data) +} - _, err = c.conn.WriteToUDP(buf, c.addr) - return err +func (c *Client) AudioStart() error { + data := binary.BigEndian.AppendUint32(nil, cmdAudioStart) + return c.WriteCommand(data) } func (c *Client) SpeakerStart() error { - buf := binary.BigEndian.AppendUint32(nil, cmdSpeakerStartReq) - buf, err := encode(c.key, buf) - if err != nil { - return err - } - buf = marshalCmd(0, c.chSeq0, cmdEncoded, buf) - c.chSeq0++ - - _, err = c.conn.WriteToUDP(buf, c.addr) - return err + data := binary.BigEndian.AppendUint32(nil, cmdSpeakerStartReq) + return c.WriteCommand(data) } func (c *Client) ReadPacket() (*Packet, error) { - b, ok := <-c.chRaw2 - if !ok { - return nil, fmt.Errorf("miss: read raw: i/o timeout") + data, err := c.conn.ReadPacket() + if err != nil { + return nil, fmt.Errorf("miss: read media: %w", err) } - return unmarshalPacket(c.key, b) + return unmarshalPacket(c.key, data) } func unmarshalPacket(key, b []byte) (*Packet, error) { @@ -247,141 +202,20 @@ func unmarshalPacket(key, b []byte) (*Packet, error) { } func (c *Client) WriteAudio(codecID uint32, payload []byte) error { - payload, err := encode(c.key, payload) + payload, err := encode(c.key, payload) // new payload will have new size! if err != nil { return err } + const hdrSize = 32 n := uint32(len(payload)) - const hdrOffset = 12 - const hdrSize = 32 - - buf := make([]byte, n+hdrOffset+hdrSize) - buf[0] = magic - buf[1] = msgDrw - binary.BigEndian.PutUint16(buf[2:], uint16(n+8+hdrSize)) - - buf[4] = magicDrw - buf[5] = 3 // channel - binary.BigEndian.PutUint16(buf[6:], c.chSeq3) - - binary.BigEndian.PutUint32(buf[8:], n+hdrSize) - - binary.LittleEndian.PutUint32(buf[hdrOffset:], n) - binary.LittleEndian.PutUint32(buf[hdrOffset+4:], codecID) - binary.LittleEndian.PutUint64(buf[hdrOffset+16:], uint64(time.Now().UnixMilli())) - copy(buf[hdrOffset+hdrSize:], payload) - - c.chSeq3++ - - _, err = c.conn.WriteToUDP(buf, c.addr) - return err -} - -func (c *Client) worker() { - defer close(c.chRaw2) - - chAck := []uint16{1, 0, 0, 0} - - var ch2WaitSize int - var ch2WaitData []byte - - for { - n, addr, err := c.conn.ReadFromUDP(c.buf) - if err != nil { - return - } - - //log.Printf("<- %.20x...", c.buf[:n]) - - if string(addr.IP) != string(c.addr.IP) || n < 8 || c.buf[0] != magic { - //log.Printf("unknown msg: %x", c.buf[:n]) - continue // skip messages from another IP - } - - switch c.buf[1] { - case msgDrw: - ch := c.buf[5] - seqHI := c.buf[6] - seqLO := c.buf[7] - - if chAck[ch] != uint16(seqHI)<<8|uint16(seqLO) { - continue - } - chAck[ch]++ - - //log.Printf("%.40x", c.buf) - - ack := []byte{magic, msgDrwAck, 0, 6, magicDrw, ch, 0, 1, seqHI, seqLO} - if _, err = c.conn.WriteToUDP(ack, c.addr); err != nil { - return - } - - switch ch { - case 0: - //log.Printf("data ch0 %x", c.buf[:n]) - //size := binary.BigEndian.Uint32(c.buf[8:]) - //if binary.BigEndian.Uint32(c.buf[12:]) == cmdEncoded { - // raw, _ := decode(c.key, c.buf[16:12+size]) - // log.Printf("cmd enc %x", raw) - //} else { - // log.Printf("cmd raw %x", c.buf[12:12+size]) - //} - - case 2: - ch2WaitData = append(ch2WaitData, c.buf[8:n]...) - - for len(ch2WaitData) > 4 { - if ch2WaitSize == 0 { - ch2WaitSize = int(binary.BigEndian.Uint32(ch2WaitData)) - ch2WaitData = ch2WaitData[4:] - } - if ch2WaitSize <= len(ch2WaitData) { - c.chRaw2 <- ch2WaitData[:ch2WaitSize] - ch2WaitData = ch2WaitData[ch2WaitSize:] - ch2WaitSize = 0 - } else { - break - } - } - - default: - log.Printf("!!! unknown chanel: %x", c.buf[:n]) - } - - case msgDrwAck: // skip it - - default: - log.Printf("!!! unknown msg type: %x", c.buf[:n]) - } - } -} - -func marshalCmd(channel byte, seq uint16, cmd uint32, payload []byte) []byte { - size := len(payload) - buf := make([]byte, 4+4+4+4+size) - - // 1. message header (4 bytes) - buf[0] = magic - buf[1] = msgDrw - binary.BigEndian.PutUint16(buf[2:], uint16(4+4+4+size)) - - // 2. drw? header (4 bytes) - buf[4] = magicDrw - buf[5] = channel - binary.BigEndian.PutUint16(buf[6:], seq) - - // 3. payload size (4 bytes) - binary.BigEndian.PutUint32(buf[8:], uint32(4+size)) - - // 4. payload command (4 bytes) - binary.BigEndian.PutUint32(buf[12:], cmd) - - // 5. payload - copy(buf[16:], payload) - - return buf + data := make([]byte, hdrSize+n) + binary.LittleEndian.PutUint32(data, n) + binary.LittleEndian.PutUint32(data[4:], codecID) + binary.LittleEndian.PutUint64(data[16:], uint64(time.Now().UnixMilli())) // not really necessary + copy(data[hdrSize:], payload) + return c.conn.WritePacket(data) } func calcSharedKey(devicePublic, clientPrivate string) ([]byte, error) { diff --git a/pkg/xiaomi/producer.go b/pkg/xiaomi/producer.go index f9164d0b..09ba7360 100644 --- a/pkg/xiaomi/producer.go +++ b/pkg/xiaomi/producer.go @@ -44,7 +44,16 @@ func Dial(rawURL string) (core.Producer, error) { quality = core.ParseByte(s) } - medias, err := probe(client, channel, quality) + // 0 - disabled, 1 - enabled, 2 - enabled (another API) + var audio byte + switch s := query.Get("audio"); s { + case "", "1": + audio = 1 + default: + audio = core.ParseByte(s) + } + + medias, err := probe(client, channel, quality, audio) if err != nil { _ = client.Close() return nil, err @@ -54,7 +63,7 @@ func Dial(rawURL string) (core.Producer, error) { Connection: core.Connection{ ID: core.NewID(), FormatName: "xiaomi", - Protocol: "cs2+udp", + Protocol: client.Protocol(), RemoteAddr: client.RemoteAddr().String(), Source: rawURL, Medias: medias, @@ -65,14 +74,18 @@ func Dial(rawURL string) (core.Producer, error) { }, nil } -func probe(client *miss.Client, channel, quality uint8) ([]*core.Media, error) { +func probe(client *miss.Client, channel, quality, audio uint8) ([]*core.Media, error) { _ = client.SetDeadline(time.Now().Add(core.ProbeTimeout)) - if err := client.VideoStart(channel, quality, 1); err != nil { + if err := client.VideoStart(channel, quality, audio&1); err != nil { return nil, err } - var video, audio *core.Codec + if audio > 1 { + _ = client.AudioStart() + } + + var vcodec, acodec *core.Codec for { pkt, err := client.ReadPacket() @@ -82,53 +95,61 @@ func probe(client *miss.Client, channel, quality uint8) ([]*core.Media, error) { switch pkt.CodecID { case miss.CodecH264: - if video == nil { + if vcodec == nil { buf := annexb.EncodeToAVCC(pkt.Payload) if h264.NALUType(buf) == h264.NALUTypeSPS { - video = h264.AVCCToCodec(buf) + vcodec = h264.AVCCToCodec(buf) } } case miss.CodecH265: - if video == nil { + if vcodec == nil { buf := annexb.EncodeToAVCC(pkt.Payload) if h265.NALUType(buf) == h265.NALUTypeVPS { - video = h265.AVCCToCodec(buf) + vcodec = h265.AVCCToCodec(buf) } } case miss.CodecPCMA: - if audio == nil { - audio = &core.Codec{Name: core.CodecPCMA, ClockRate: 8000} + if acodec == nil { + acodec = &core.Codec{Name: core.CodecPCMA, ClockRate: 8000} } case miss.CodecOPUS: - if audio == nil { - audio = &core.Codec{Name: core.CodecOpus, ClockRate: 48000, Channels: 2} + if acodec == nil { + acodec = &core.Codec{Name: core.CodecOpus, ClockRate: 48000, Channels: 2} } } - if video != nil && audio != nil { + if vcodec != nil && (acodec != nil || audio == 0) { break } } _ = client.SetDeadline(time.Time{}) - return []*core.Media{ + medias := []*core.Media{ { Kind: core.KindVideo, Direction: core.DirectionRecvonly, - Codecs: []*core.Codec{video}, + Codecs: []*core.Codec{vcodec}, }, - { + } + + if acodec != nil { + medias = append(medias, &core.Media{ Kind: core.KindAudio, Direction: core.DirectionRecvonly, - Codecs: []*core.Codec{audio}, - }, - { - Kind: core.KindAudio, - Direction: core.DirectionSendonly, - Codecs: []*core.Codec{audio.Clone()}, - }, - }, nil + Codecs: []*core.Codec{acodec}, + }) + + if client.Protocol() == "cs2+udp" { + medias = append(medias, &core.Media{ + Kind: core.KindAudio, + Direction: core.DirectionSendonly, + Codecs: []*core.Codec{acodec.Clone()}, + }) + } + } + + return medias, nil } const timestamp40ms = 48000 * 0.040 diff --git a/pkg/xiaomi/tutk/conn.go b/pkg/xiaomi/tutk/conn.go new file mode 100644 index 00000000..bce4a795 --- /dev/null +++ b/pkg/xiaomi/tutk/conn.go @@ -0,0 +1,457 @@ +package tutk + +import ( + "bytes" + "crypto/rand" + "encoding/binary" + "fmt" + "io" + "net" + "sync" + "sync/atomic" + "time" +) + +func Dial(host, uid string) (*Conn, error) { + conn, err := net.ListenUDP("udp", nil) + if err != nil { + return nil, err + } + + c := &Conn{ + conn: conn, + addr: &net.UDPAddr{IP: net.ParseIP(host), Port: 32761}, + sid: genSID(), + } + + if err = c.handshake([]byte(uid)); err != nil { + _ = c.Close() + return nil, err + } + + c.rawCmd = make(chan []byte, 10) + c.rawPkt = make(chan []byte, 100) + + go c.worker() + + return c, nil +} + +type Conn struct { + conn *net.UDPConn + addr *net.UDPAddr + sid []byte + + err error + seqCh0 uint16 + seqCmd uint16 + rawCmd chan []byte + rawPkt chan []byte + + cmdMu sync.Mutex + cmdAck func() +} + +func (c *Conn) handshake(uid []byte) (err error) { + _ = c.SetDeadline(time.Now().Add(5 * time.Second)) + + if _, err = c.WriteAndWait( + c.msgLanSearch(uid, 1), // 01062100 + func(_, res []byte) bool { + return bytes.Index(res, uid) == 16 // 02061200 + }, + ); err != nil { + return err + } + + if err = c.Write(c.msgLanSearch(uid, 2)); err != nil { + return err + } + + if _, err = c.WriteAndWait( + c.msgAvClientStartReq(), // 07042100 + 00000b00 + func(req, res []byte) bool { + mid := req[48:52] + return bytes.Index(res, mid) == 48 // 08041200 + 00140800 + }, + ); err != nil { + return err + } + + _ = c.SetDeadline(time.Time{}) + + return nil +} + +func (c *Conn) worker() { + defer func() { + close(c.rawCmd) + close(c.rawPkt) + }() + + buf := make([]byte, 1200) + var waitSeq uint16 + var waitSize uint32 + var waitData []byte + + for { + n, addr, err := c.conn.ReadFromUDP(buf) + if err != nil { + c.err = fmt.Errorf("%s: %w", "tutk", err) + return + } + + if string(addr.IP) != string(c.addr.IP) || n < 16 { + continue // skip messages from another IP + } + + b := ReverseTransCodePartial(buf[:n]) + //log.Printf("<- %x", b) + + if b[0] != 0x04 || b[1] != 0x02 { + continue + } + + if len(b) == 24 { + _ = c.Write(msgAckPing(b)) + continue + } + + switch b[14] { + case 0: + switch string(b[28:30]) { + case "\x00\x12": + _ = c.Write(c.msgAckCh0Req0012(b)) + continue + + case "\x00\x70": + _ = c.Write(c.msgAckCh0Req0070(b)) + select { + case c.rawCmd <- b[52:]: + default: + } + continue + + case "\x00\x71": + if c.cmdAck != nil { + c.cmdAck() + } + continue + + case "\x01\x03": + seq := binary.LittleEndian.Uint16(b[40:]) + if seq != waitSeq { + waitSeq = 0 // data loss + continue + } + if seq == 0 { + waitSize = binary.LittleEndian.Uint32(b[36:]) + 32 + } + + waitData = append(waitData, b[52:]...) + if n := uint32(len(waitData)); n < waitSize { + waitSeq++ + continue + } else if n > waitSize { + waitSeq = 0 // data loss + continue + } + + // create a buffer for the header and collected data + packetData := make([]byte, waitSize) + // there's a header at the end - let's move it to the beginning + copy(packetData, waitData[waitSize-32:]) + copy(packetData[32:], waitData) + + select { + case c.rawPkt <- packetData: + default: + c.err = fmt.Errorf("%s: media queue is full", "tutk") + return + } + + waitSeq = 0 + waitData = waitData[:0] + continue + + case "\x01\x04": + waitSize2 := binary.LittleEndian.Uint32(b[36:]) + waitData2 := b[52:] + + if uint32(len(waitData2)) != waitSize2 { + continue // shouldn't happened for audio + } + + packetData := make([]byte, waitSize2) + copy(packetData, waitData2) + + select { + case c.rawPkt <- packetData: + default: + c.err = fmt.Errorf("%s: media queue is full", "tutk") + return + } + continue + } + case 1: + switch string(b[28:30]) { + case "\x00\x00": + _ = c.Write(msgAckCh1Req0000(b)) + continue + case "\x00\x07": + _ = c.Write(msgAckCh1Req0007(b)) + continue + } + case 5: + if len(b) == 48 { + _ = c.Write(msgAckCh5(b)) + continue + } + } + + fmt.Printf("%s: unknown msg: %x\n", "tutk", buf[:n]) + } +} + +func (c *Conn) Write(req []byte) error { + //log.Printf("-> %x", req) + _, err := c.conn.WriteToUDP(TransCodePartial(req), c.addr) + return err +} + +func (c *Conn) WriteAndWait(req []byte, ok func(req, res []byte) bool) ([]byte, error) { + var t *time.Timer + t = time.AfterFunc(1, func() { + if err := c.Write(req); err == nil && t != nil { + t.Reset(time.Second) + } + }) + defer t.Stop() + + buf := make([]byte, 1200) + + for { + n, addr, err := c.conn.ReadFromUDP(buf) + if err != nil { + return nil, err + } + + if string(addr.IP) != string(c.addr.IP) || n < 16 { + continue // skip messages from another IP + } + + res := ReverseTransCodePartial(buf[:n]) + //log.Printf("<- %x", b) + if ok(req, res) { + c.addr.Port = addr.Port + return res, nil + } + } +} + +func (c *Conn) RemoteAddr() net.Addr { + return c.addr +} + +func (c *Conn) SetDeadline(t time.Time) error { + return c.conn.SetDeadline(t) +} + +func (c *Conn) Close() error { + return c.conn.Close() +} + +func (c *Conn) Error() error { + if c.err != nil { + return c.err + } + return io.EOF +} + +func (c *Conn) ReadCommand() (cmd uint16, data []byte, err error) { + buf, ok := <-c.rawCmd + if !ok { + return 0, nil, c.Error() + } + cmd = binary.LittleEndian.Uint16(buf[:2]) + data = buf[4:] + return +} + +// WriteCommand will send a command every second five times +func (c *Conn) WriteCommand(cmd uint16, data []byte) error { + c.cmdMu.Lock() + defer c.cmdMu.Unlock() + + var repeat atomic.Int32 + repeat.Store(5) + + timeout := time.NewTicker(time.Second) + defer timeout.Stop() + + c.cmdAck = func() { + repeat.Store(0) + timeout.Reset(1) + } + + req := c.msgAvSendIOCtrl(cmd, data) + + for { + if err := c.Write(req); err != nil { + return err + } + <-timeout.C + r := repeat.Add(-1) + if r < 0 { + return nil + } + if r == 0 { + return fmt.Errorf("%s: can't send command %d", "tutk", cmd) + } + } +} + +func (c *Conn) ReadPacket() ([]byte, error) { + buf, ok := <-c.rawPkt + if !ok { + return nil, c.Error() + } + return buf, nil +} + +func (c *Conn) WritePacket(data []byte) error { + panic("not implemented") +} + +func genSID() []byte { + b := make([]byte, 16) + _, _ = rand.Read(b[8:]) + copy(b, b[8:10]) + b[4] = 0x0c + return b +} + +func (c *Conn) msgLanSearch(uid []byte, i byte) []byte { + const size = 68 // or 52 or 68 or 88 + b := make([]byte, size) + copy(b, "\x04\x02\x0f\x02") + b[4] = size - 16 + copy(b[8:], "\x01\x06\x21\x00") + copy(b[16:], uid) + copy(b[52:], "\x00\x03\x01\x02") // or 07000303 or 01010204 + copy(b[56:], c.sid[8:]) + b[64] = i + return b +} + +func (c *Conn) msg(size uint16) []byte { + b := make([]byte, size) + copy(b, "\x04\x02\x19\x0a") + binary.LittleEndian.PutUint16(b[4:], size-16) + binary.LittleEndian.PutUint16(b[6:], c.seqCh0) + c.seqCh0++ // start from 0 + copy(b[8:], "\x07\x04\x21\x00") + return b +} + +func (c *Conn) msgAvClientStartReq() []byte { + const size = 586 // or 586 or 598 + b := c.msg(size) + copy(b[12:], c.sid) + copy(b[28:], "\x00\x00\x08\x00") // or 00000400 or 00000b00 + binary.LittleEndian.PutUint16(b[44:], size-52) + binary.LittleEndian.PutUint32(b[48:], uint32(time.Now().UnixMilli())) + copy(b[size-16:], "\x04\x00\x00\x00\xfb\x07\x1f\x00") + return b +} + +func (c *Conn) msgAvSendIOCtrl(cmd uint16, msg []byte) []byte { + size := 52 + 4 + uint16(len(msg)) + b := c.msg(size) + copy(b[12:], c.sid) + copy(b[28:], "\x00\x70\x08\x00") // or 00700400 or 00700b00 + c.seqCmd++ // start from 1 + binary.LittleEndian.PutUint16(b[32:], c.seqCmd) + binary.LittleEndian.PutUint16(b[44:], size-52) + //_, _ = rand.Read(b[48:52]) // mid + binary.LittleEndian.PutUint32(b[48:], uint32(time.Now().UnixMilli())) + binary.LittleEndian.PutUint16(b[52:], cmd) + copy(b[56:], msg) + return b +} + +const version = 0x19 + +func msgAckPing(req []byte) []byte { + // <- [24] 0402120a 08000000 28041200 000000005b0d4202070aa8c0 + // -> [24] 04021a0a 08000000 27042100 000000005b0d4202070aa8c0 + req[2] = version + req[8] = 0x27 + req[10] = 0x21 + return req +} + +func msgAck(req []byte, size byte) []byte { + // xxxx??xx ??00xxxx 07xx21xx ... + req[2] = version + req[4] = size - 16 + req[5] = 0x00 + req[8] = 0x07 + req[10] = 0x21 + return req[:size] +} + +func (c *Conn) msgAckCh0Req0012(req []byte) []byte { + // <- [64] 0402120a 30000000 08041200 e6e8 0000 0c000000e6e839da66b0dc14 00120800000000000000000000000000 0c00 000000000000 020000000100000001000000 + // -> [72] 0402190a 38000300 07042100 e6e8 0000 0c000000e6e839da66b0dc14 00130b00000000000000000000000000 1400 000000000000 0200000001000000010000000000000000000000 + const size = 72 + req = append(req, 0, 0, 0, 0, 0, 0, 0, 0) + binary.LittleEndian.PutUint16(req[6:], c.seqCh0) // channel sequence + c.seqCh0++ + req[28] = 0x00 // command + req[29] = 0x13 + req[44] = size - 52 // data size + req[45] = 0x00 + return msgAck(req, size) +} + +func (c *Conn) msgAckCh0Req0070(req []byte) []byte { + // <- [104] 0402120a 58000300 08041200 e6e8 0000 0c000000e6e839da66b0dc14 00700800010000000000000000000000 3400 00007625a02f ... + // -> [ 52] 0402190a 24000400 07042100 e6e8 0000 0c000000e6e839da66b0dc14 00710800010000000000000000000000 0000 00007625a02f + binary.LittleEndian.PutUint16(req[6:], c.seqCh0) // channel sequence + c.seqCh0++ + req[28] = 0x00 // command + req[29] = 0x71 + req[44] = 0x00 // data size + req[45] = 0x00 + return msgAck(req, 52) +} + +func msgAckCh1Req0000(req []byte) []byte { + // <- [590] 0402120a 3e020100 08041200 e6e8 0100 0c000000e6e839da66b0dc14 00000800000000000000000000000000 1a02 0000d9c0001b ... + // -> [ 84] 0402190a 44000000 07042100 e6e8 0100 0c000000e6e839da66b0dc14 00140b00000000000000000000000000 2000 0000d9c0001b ... + const size = 84 + req[28] = 0x00 // command + req[29] = 0x14 + req[44] = size - 52 // data size + req[45] = 0x00 + copy(req[52:], req[len(req)-32:]) // size + return msgAck(req, size) +} + +func msgAckCh1Req0007(req []byte) []byte { + // <- [64] 0402120a 30000300 08041200 e6e8 0100 0c000000e6e839da66b0dc14 00070800000000000000000000000000 0c00 000001000000 000000006f1ea02f00000000 + // -> [56] 0402190a 28000200 07042100 e6e8 0100 0c000000e6e839da66b0dc14 010a0b00000000000000000000000000 0000 000001000000 00000000 + req[28] = 0x01 // command + req[29] = 0x0a + req[44] = 0x00 // data size + req[45] = 0x00 + return msgAck(req, 56) +} + +func msgAckCh5(req []byte) []byte { + // <- [48] 0402120a 20000200 08041200 e6e8 0500 0c000000e6e839da66b0dc14 5a97c2f1010500000000000000000000 00a0 0000 + // -> [48] 0402190a 20000200 07042100 e6e8 0500 0c000000e6e839da66b0dc14 5a97c2f1410500000000000000000000 00a0 0000 + req[32] = 0x41 + return msgAck(req, 48) +} diff --git a/pkg/xiaomi/tutk/crypto.go b/pkg/xiaomi/tutk/crypto.go new file mode 100644 index 00000000..c98fc092 --- /dev/null +++ b/pkg/xiaomi/tutk/crypto.go @@ -0,0 +1,138 @@ +package tutk + +import ( + "bytes" + "encoding/binary" + "math/bits" +) + +// I'd like to say hello to Charlie. Your name is forever etched into the history of streaming software. +const charlie = "Charlie is the designer of P2P!!" + +func ReverseTransCodePartial(src []byte) []byte { + n := len(src) + tmp := make([]byte, n) + dst := bytes.Clone(src) + + src16 := src + tmp16 := tmp + dst16 := dst + + for ; n >= 16; n -= 16 { + for i := 0; i != 16; i += 4 { + x := binary.LittleEndian.Uint32(src16[i:]) + binary.LittleEndian.PutUint32(tmp16[i:], bits.RotateLeft32(x, i+3)) + } + + swap(tmp16, dst16, 16) + + for i := 0; i != 16; i++ { + tmp16[i] = dst16[i] ^ charlie[i] + } + + for i := 0; i != 16; i += 4 { + x := binary.LittleEndian.Uint32(tmp16[i:]) + binary.LittleEndian.PutUint32(dst16[i:], bits.RotateLeft32(x, i+1)) + } + + tmp16 = tmp16[16:] + dst16 = dst16[16:] + src16 = src16[16:] + } + + swap(src16, tmp16, n) + + for i := 0; i < n; i++ { + dst16[i] = tmp16[i] ^ charlie[i] + } + + return dst +} + +func TransCodePartial(src []byte) []byte { + n := len(src) + tmp := make([]byte, n) + dst := bytes.Clone(src) + + src16 := src + tmp16 := tmp + dst16 := dst + + for ; n >= 16; n -= 16 { + for i := 0; i != 16; i += 4 { + x := binary.LittleEndian.Uint32(src16[i:]) + binary.LittleEndian.PutUint32(tmp16[i:], bits.RotateLeft32(x, -i-1)) + } + + for i := 0; i != 16; i++ { + dst16[i] = tmp16[i] ^ charlie[i] + } + + swap(dst16, tmp16, 16) + + for i := 0; i != 16; i += 4 { + x := binary.LittleEndian.Uint32(tmp16[i:]) + binary.LittleEndian.PutUint32(dst16[i:], bits.RotateLeft32(x, -i-3)) + } + + tmp16 = tmp16[16:] + dst16 = dst16[16:] + src16 = src16[16:] + } + + for i := 0; i < n; i++ { + tmp16[i] = src16[i] ^ charlie[i] + } + + swap(tmp16, dst16, n) + + return dst +} + +func swap(src, dst []byte, n int) { + switch n { + case 2: + _, _ = src[1], dst[1] + dst[0] = src[1] + dst[1] = src[0] + return + case 4: + _, _ = src[3], dst[3] + dst[0] = src[2] + dst[1] = src[3] + dst[2] = src[0] + dst[3] = src[1] + return + case 8: + _, _ = src[7], dst[7] + dst[0] = src[7] + dst[1] = src[4] + dst[2] = src[3] + dst[3] = src[2] + dst[4] = src[1] + dst[5] = src[6] + dst[6] = src[5] + dst[7] = src[0] + return + case 16: + _, _ = src[15], dst[15] + dst[0] = src[11] + dst[1] = src[9] + dst[2] = src[8] + dst[3] = src[15] + dst[4] = src[13] + dst[5] = src[10] + dst[6] = src[12] + dst[7] = src[14] + dst[8] = src[2] + dst[9] = src[1] + dst[10] = src[5] + dst[11] = src[0] + dst[12] = src[6] + dst[13] = src[4] + dst[14] = src[7] + dst[15] = src[3] + return + } + copy(dst, src[:n]) +}