From 59161fcef2f6b144145f81ba0adb6a38b9c45a83 Mon Sep 17 00:00:00 2001 From: Alex X Date: Tue, 13 Jan 2026 21:25:30 +0300 Subject: [PATCH] Improve cs2+udp proto for xiaomi source #2026 #2030 --- pkg/xiaomi/cs2/conn.go | 194 +++++++++++++++++++++++++++-------------- 1 file changed, 129 insertions(+), 65 deletions(-) diff --git a/pkg/xiaomi/cs2/conn.go b/pkg/xiaomi/cs2/conn.go index 43103321..ff5aead6 100644 --- a/pkg/xiaomi/cs2/conn.go +++ b/pkg/xiaomi/cs2/conn.go @@ -21,10 +21,11 @@ func Dial(host, transport string) (*Conn, error) { _, isTCP := conn.(*tcpConn) c := &Conn{ - conn: conn, - isTCP: isTCP, - rawCh0: make(chan []byte, 10), - rawCh2: make(chan []byte, 100), + conn: conn, + isTCP: isTCP, + channels: [4]*dataChannel{ + newDataChannel(0, 10), nil, newDataChannel(250, 100), nil, + }, } go c.worker() return c, nil @@ -37,8 +38,8 @@ type Conn struct { err error seqCh0 uint16 seqCh3 uint16 - rawCh0 chan []byte - rawCh2 chan []byte + + channels [4]*dataChannel cmdMu sync.Mutex cmdAck func() @@ -106,15 +107,13 @@ func handshake(host, transport string) (net.Conn, error) { func (c *Conn) worker() { defer func() { - close(c.rawCh0) - close(c.rawCh2) + c.channels[0].Close() + c.channels[2].Close() }() - chAck := make([]uint16, 4) // only for UDP + var keepaliveTS time.Time // only for TCP + buf := make([]byte, 1200) - var ch2WaitSize int - var ch2WaitData []byte - var keepaliveTS time.Time for { n, err := c.conn.Read(buf) @@ -131,6 +130,7 @@ func (c *Conn) worker() { switch buf[1] { case msgDrw: ch := buf[5] + channel := c.channels[ch] if c.isTCP { // For TCP we should send ping every second to keep connection alive. @@ -139,71 +139,37 @@ func (c *Conn) worker() { _, _ = c.conn.Write([]byte{magic, msgPing, 0, 0}) keepaliveTS = now.Add(time.Second) } + + err = channel.Push(buf[8:n]) } else { - // For UDP we should using ack. - seq := binary.BigEndian.Uint16(buf[6:]) - diff := int16(seq - chAck[ch]) + var pushed int - if diff > 0 { - continue // new seq - skip before ack + seqHI, seqLO := buf[6], buf[7] + seq := uint16(seqHI)<<8 | uint16(seqLO) + pushed, err = channel.PushSeq(seq, buf[8:n]) + + if pushed >= 0 { + // For UDP we should send ACK. + ack := []byte{magic, msgDrwAck, 0, 6, magicDrw, ch, 0, 1, seqHI, seqLO} + _, _ = c.conn.Write(ack) } - - ack := []byte{magic, msgDrwAck, 0, 6, magicDrw, ch, 0, 1, buf[6], buf[7]} - _, _ = c.conn.Write(ack) - - if diff < 0 { - continue // old seq - skip after ack - } - - chAck[ch]++ // expected seq - OK } - switch ch { - case 0: - select { - case c.rawCh0 <- bytes.Clone(buf[12:n]): - default: - } - continue - - case 2: - ch2WaitData = append(ch2WaitData, buf[8:n]...) - - for len(ch2WaitData) > 4 { - if ch2WaitSize == 0 { - ch2WaitSize = int(binary.BigEndian.Uint32(ch2WaitData)) - ch2WaitData = ch2WaitData[4:] - } - if ch2WaitSize <= len(ch2WaitData) { - select { - case c.rawCh2 <- ch2WaitData[:ch2WaitSize]: - default: - c.err = fmt.Errorf("%s: media queue is full", "cs2") - return - } - - ch2WaitData = ch2WaitData[ch2WaitSize:] - ch2WaitSize = 0 - } else { - break - } - } - continue + if err != nil { + c.err = fmt.Errorf("%s: %w", "cs2", err) + return } case msgPing: _, _ = c.conn.Write([]byte{magic, msgPong, 0, 0}) - continue - case msgPong, msgP2PRdyUDP, msgP2PRdyTCP, msgClose: - continue // skip it + case msgPong, msgP2PRdyUDP, msgP2PRdyTCP, msgClose: // skip it case msgDrwAck: // only for UDP if c.cmdAck != nil { c.cmdAck() } - continue + default: + fmt.Printf("%s: unknown msg: %x\n", "cs2", buf[:n]) } - - fmt.Printf("%s: unknown msg: %x\n", "cs2", buf[:n]) } } @@ -234,7 +200,7 @@ func (c *Conn) Error() error { } func (c *Conn) ReadCommand() (cmd uint16, data []byte, err error) { - buf, ok := <-c.rawCh0 + buf, ok := c.channels[0].Pop() if !ok { return 0, nil, c.Error() } @@ -282,7 +248,7 @@ func (c *Conn) WriteCommand(cmd uint16, data []byte) error { } func (c *Conn) ReadPacket() ([]byte, error) { - data, ok := <-c.rawCh2 + data, ok := c.channels[2].Pop() if !ok { return nil, c.Error() } @@ -444,3 +410,101 @@ func (c *tcpConn) Write(req []byte) (n int, err error) { _, err = c.TCPConn.Write(buf) return } + +func newDataChannel(pushSize, popSize int) *dataChannel { + c := &dataChannel{} + if pushSize > 0 { + c.pushBuf = make(map[uint16][]byte, pushSize) + c.pushSize = pushSize + } + if popSize >= 0 { + c.popBuf = make(chan []byte, popSize) + } + return c +} + +type dataChannel struct { + waitSeq uint16 + pushBuf map[uint16][]byte + pushSize int + + waitData []byte + waitSize int + popBuf chan []byte +} + +func (c *dataChannel) Push(b []byte) error { + c.waitData = append(c.waitData, b...) + + for len(c.waitData) > 4 { + // Every new data starts with size. There can be several data inside one packet. + if c.waitSize == 0 { + c.waitSize = int(binary.BigEndian.Uint32(c.waitData)) + c.waitData = c.waitData[4:] + } + if c.waitSize > len(c.waitData) { + break + } + + select { + case c.popBuf <- c.waitData[:c.waitSize]: + default: + return fmt.Errorf("pop buffer is full") + } + + c.waitData = c.waitData[c.waitSize:] + c.waitSize = 0 + } + return nil +} + +func (c *dataChannel) Pop() ([]byte, bool) { + data, ok := <-c.popBuf + return data, ok +} + +func (c *dataChannel) Close() { + close(c.popBuf) +} + +// PushSeq returns how many seq were processed. +// Returns 0 if seq was saved or processed earlier. +// Returns -1 if seq could not be saved (buffer full or disabled). +func (c *dataChannel) PushSeq(seq uint16, data []byte) (int, error) { + diff := int16(seq - c.waitSeq) + // Check if this is seq from the future. + if diff > 0 { + // Support disabled buffer. + if c.pushSize == 0 { + return -1, nil // couldn't save seq + } + // Check if we don't have this seq in the buffer. + if c.pushBuf[seq] == nil { + // Check if there is enough space in the buffer. + if len(c.pushBuf) == c.pushSize { + return -1, nil // couldn't save seq + } + c.pushBuf[seq] = bytes.Clone(data) + //log.Printf("push buf wait=%d seq=%d len=%d", c.waitSeq, seq, len(c.pushBuf)) + } + return 0, nil + } + + // Check if this is seq from the past. + if diff < 0 { + return 0, nil + } + + for i := 1; ; i++ { + if err := c.Push(data); err != nil { + return i, err + } + c.waitSeq++ + // Check if we have next seq in the buffer. + if data = c.pushBuf[c.waitSeq]; data != nil { + delete(c.pushBuf, c.waitSeq) + } else { + return i, nil + } + } +}