From b19c081642a2ce5247c7553846a0c102ee44b740 Mon Sep 17 00:00:00 2001 From: Alex X Date: Tue, 13 Jan 2026 12:13:58 +0300 Subject: [PATCH 1/4] Improve cs2+udp proto for xiaomi source --- pkg/xiaomi/cs2/conn.go | 39 ++++++++++++++++++++++++++------------- 1 file changed, 26 insertions(+), 13 deletions(-) diff --git a/pkg/xiaomi/cs2/conn.go b/pkg/xiaomi/cs2/conn.go index cde09ab5..43103321 100644 --- a/pkg/xiaomi/cs2/conn.go +++ b/pkg/xiaomi/cs2/conn.go @@ -2,6 +2,7 @@ package cs2 import ( "bufio" + "bytes" "encoding/binary" "fmt" "io" @@ -46,6 +47,7 @@ type Conn struct { const ( magic = 0xF1 magicDrw = 0xD1 + magicTCP = 0x68 msgLanSearch = 0x30 msgPunchPkt = 0x41 msgP2PRdyUDP = 0x42 @@ -121,6 +123,11 @@ func (c *Conn) worker() { return } + // 0 f1d0 magic + // 2 005d size = total size + 4 + // 4 d1 magic + // 5 00 channel + // 6 0000 seq switch buf[1] { case msgDrw: ch := buf[5] @@ -134,22 +141,27 @@ func (c *Conn) worker() { } } else { // For UDP we should using ack. - seqHI := buf[6] - seqLO := buf[7] + seq := binary.BigEndian.Uint16(buf[6:]) + diff := int16(seq - chAck[ch]) - if chAck[ch] != uint16(seqHI)<<8|uint16(seqLO) { - continue + if diff > 0 { + continue // new seq - skip before ack } - chAck[ch]++ - ack := []byte{magic, msgDrwAck, 0, 6, magicDrw, ch, 0, 1, seqHI, seqLO} + ack := []byte{magic, msgDrwAck, 0, 6, magicDrw, ch, 0, 1, buf[6], buf[7]} _, _ = c.conn.Write(ack) + + if diff < 0 { + continue // old seq - skip after ack + } + + chAck[ch]++ // expected seq - OK } switch ch { case 0: select { - case c.rawCh0 <- buf[12:]: + case c.rawCh0 <- bytes.Clone(buf[12:n]): default: } continue @@ -343,23 +355,24 @@ type udpConn struct { addr *net.UDPAddr } -func (c *udpConn) Read(p []byte) (n int, err error) { +func (c *udpConn) Read(b []byte) (n int, err error) { var addr *net.UDPAddr for { - n, addr, err = c.UDPConn.ReadFromUDP(p) + n, addr, err = c.UDPConn.ReadFromUDP(b) if err != nil { return 0, err } if string(addr.IP) == string(c.addr.IP) || n >= 8 { + //log.Printf("<- %x", b[:n]) return } } } -func (c *udpConn) Write(req []byte) (n int, err error) { - //log.Printf("-> %x", req) - return c.UDPConn.WriteToUDP(req, c.addr) +func (c *udpConn) Write(b []byte) (n int, err error) { + //log.Printf("-> %x", b) + return c.UDPConn.WriteToUDP(b, c.addr) } func (c *udpConn) RemoteAddr() net.Addr { @@ -425,7 +438,7 @@ func (c *tcpConn) Write(req []byte) (n int, err error) { n = len(req) buf := make([]byte, 8+n) binary.BigEndian.PutUint16(buf, uint16(n)) - buf[2] = 0x68 + buf[2] = magicTCP copy(buf[8:], req) //log.Printf("-> %x", buf) _, err = c.TCPConn.Write(buf) From fd6810794005acd2970c840b1ad528c634d46a7b Mon Sep 17 00:00:00 2001 From: Alex X Date: Tue, 13 Jan 2026 12:15:02 +0300 Subject: [PATCH 2/4] Add "bad conn" for debugging UDP --- pkg/debug/conn.go | 47 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) create mode 100644 pkg/debug/conn.go diff --git a/pkg/debug/conn.go b/pkg/debug/conn.go new file mode 100644 index 00000000..6261cb75 --- /dev/null +++ b/pkg/debug/conn.go @@ -0,0 +1,47 @@ +package debug + +import ( + "bytes" + "math/rand" + "net" +) + +type badConn struct { + net.Conn + delay int + buf []byte +} + +func NewBadConn(conn net.Conn) net.Conn { + return &badConn{Conn: conn} +} + +const ( + missChance = 0.05 + delayChance = 0.1 +) + +func (c *badConn) Read(b []byte) (n int, err error) { + if rand.Float32() < missChance { + if _, err = c.Conn.Read(b); err != nil { + return + } + //log.Printf("bad conn: miss") + } + + if c.delay > 0 { + if c.delay--; c.delay == 0 { + n = copy(b, c.buf) + return + } + } else if rand.Float32() < delayChance { + if n, err = c.Conn.Read(b); err != nil { + return + } + c.delay = 1 + rand.Intn(5) + c.buf = bytes.Clone(b[:n]) + //log.Printf("bad conn: delay %d", c.delay) + } + + return c.Conn.Read(b) +} From 59161fcef2f6b144145f81ba0adb6a38b9c45a83 Mon Sep 17 00:00:00 2001 From: Alex X Date: Tue, 13 Jan 2026 21:25:30 +0300 Subject: [PATCH 3/4] Improve cs2+udp proto for xiaomi source #2026 #2030 --- pkg/xiaomi/cs2/conn.go | 194 +++++++++++++++++++++++++++-------------- 1 file changed, 129 insertions(+), 65 deletions(-) diff --git a/pkg/xiaomi/cs2/conn.go b/pkg/xiaomi/cs2/conn.go index 43103321..ff5aead6 100644 --- a/pkg/xiaomi/cs2/conn.go +++ b/pkg/xiaomi/cs2/conn.go @@ -21,10 +21,11 @@ func Dial(host, transport string) (*Conn, error) { _, isTCP := conn.(*tcpConn) c := &Conn{ - conn: conn, - isTCP: isTCP, - rawCh0: make(chan []byte, 10), - rawCh2: make(chan []byte, 100), + conn: conn, + isTCP: isTCP, + channels: [4]*dataChannel{ + newDataChannel(0, 10), nil, newDataChannel(250, 100), nil, + }, } go c.worker() return c, nil @@ -37,8 +38,8 @@ type Conn struct { err error seqCh0 uint16 seqCh3 uint16 - rawCh0 chan []byte - rawCh2 chan []byte + + channels [4]*dataChannel cmdMu sync.Mutex cmdAck func() @@ -106,15 +107,13 @@ func handshake(host, transport string) (net.Conn, error) { func (c *Conn) worker() { defer func() { - close(c.rawCh0) - close(c.rawCh2) + c.channels[0].Close() + c.channels[2].Close() }() - chAck := make([]uint16, 4) // only for UDP + var keepaliveTS time.Time // only for TCP + buf := make([]byte, 1200) - var ch2WaitSize int - var ch2WaitData []byte - var keepaliveTS time.Time for { n, err := c.conn.Read(buf) @@ -131,6 +130,7 @@ func (c *Conn) worker() { switch buf[1] { case msgDrw: ch := buf[5] + channel := c.channels[ch] if c.isTCP { // For TCP we should send ping every second to keep connection alive. @@ -139,71 +139,37 @@ func (c *Conn) worker() { _, _ = c.conn.Write([]byte{magic, msgPing, 0, 0}) keepaliveTS = now.Add(time.Second) } + + err = channel.Push(buf[8:n]) } else { - // For UDP we should using ack. - seq := binary.BigEndian.Uint16(buf[6:]) - diff := int16(seq - chAck[ch]) + var pushed int - if diff > 0 { - continue // new seq - skip before ack + seqHI, seqLO := buf[6], buf[7] + seq := uint16(seqHI)<<8 | uint16(seqLO) + pushed, err = channel.PushSeq(seq, buf[8:n]) + + if pushed >= 0 { + // For UDP we should send ACK. + ack := []byte{magic, msgDrwAck, 0, 6, magicDrw, ch, 0, 1, seqHI, seqLO} + _, _ = c.conn.Write(ack) } - - ack := []byte{magic, msgDrwAck, 0, 6, magicDrw, ch, 0, 1, buf[6], buf[7]} - _, _ = c.conn.Write(ack) - - if diff < 0 { - continue // old seq - skip after ack - } - - chAck[ch]++ // expected seq - OK } - switch ch { - case 0: - select { - case c.rawCh0 <- bytes.Clone(buf[12:n]): - default: - } - continue - - case 2: - ch2WaitData = append(ch2WaitData, buf[8:n]...) - - for len(ch2WaitData) > 4 { - if ch2WaitSize == 0 { - ch2WaitSize = int(binary.BigEndian.Uint32(ch2WaitData)) - ch2WaitData = ch2WaitData[4:] - } - if ch2WaitSize <= len(ch2WaitData) { - select { - case c.rawCh2 <- ch2WaitData[:ch2WaitSize]: - default: - c.err = fmt.Errorf("%s: media queue is full", "cs2") - return - } - - ch2WaitData = ch2WaitData[ch2WaitSize:] - ch2WaitSize = 0 - } else { - break - } - } - continue + if err != nil { + c.err = fmt.Errorf("%s: %w", "cs2", err) + return } case msgPing: _, _ = c.conn.Write([]byte{magic, msgPong, 0, 0}) - continue - case msgPong, msgP2PRdyUDP, msgP2PRdyTCP, msgClose: - continue // skip it + case msgPong, msgP2PRdyUDP, msgP2PRdyTCP, msgClose: // skip it case msgDrwAck: // only for UDP if c.cmdAck != nil { c.cmdAck() } - continue + default: + fmt.Printf("%s: unknown msg: %x\n", "cs2", buf[:n]) } - - fmt.Printf("%s: unknown msg: %x\n", "cs2", buf[:n]) } } @@ -234,7 +200,7 @@ func (c *Conn) Error() error { } func (c *Conn) ReadCommand() (cmd uint16, data []byte, err error) { - buf, ok := <-c.rawCh0 + buf, ok := c.channels[0].Pop() if !ok { return 0, nil, c.Error() } @@ -282,7 +248,7 @@ func (c *Conn) WriteCommand(cmd uint16, data []byte) error { } func (c *Conn) ReadPacket() ([]byte, error) { - data, ok := <-c.rawCh2 + data, ok := c.channels[2].Pop() if !ok { return nil, c.Error() } @@ -444,3 +410,101 @@ func (c *tcpConn) Write(req []byte) (n int, err error) { _, err = c.TCPConn.Write(buf) return } + +func newDataChannel(pushSize, popSize int) *dataChannel { + c := &dataChannel{} + if pushSize > 0 { + c.pushBuf = make(map[uint16][]byte, pushSize) + c.pushSize = pushSize + } + if popSize >= 0 { + c.popBuf = make(chan []byte, popSize) + } + return c +} + +type dataChannel struct { + waitSeq uint16 + pushBuf map[uint16][]byte + pushSize int + + waitData []byte + waitSize int + popBuf chan []byte +} + +func (c *dataChannel) Push(b []byte) error { + c.waitData = append(c.waitData, b...) + + for len(c.waitData) > 4 { + // Every new data starts with size. There can be several data inside one packet. + if c.waitSize == 0 { + c.waitSize = int(binary.BigEndian.Uint32(c.waitData)) + c.waitData = c.waitData[4:] + } + if c.waitSize > len(c.waitData) { + break + } + + select { + case c.popBuf <- c.waitData[:c.waitSize]: + default: + return fmt.Errorf("pop buffer is full") + } + + c.waitData = c.waitData[c.waitSize:] + c.waitSize = 0 + } + return nil +} + +func (c *dataChannel) Pop() ([]byte, bool) { + data, ok := <-c.popBuf + return data, ok +} + +func (c *dataChannel) Close() { + close(c.popBuf) +} + +// PushSeq returns how many seq were processed. +// Returns 0 if seq was saved or processed earlier. +// Returns -1 if seq could not be saved (buffer full or disabled). +func (c *dataChannel) PushSeq(seq uint16, data []byte) (int, error) { + diff := int16(seq - c.waitSeq) + // Check if this is seq from the future. + if diff > 0 { + // Support disabled buffer. + if c.pushSize == 0 { + return -1, nil // couldn't save seq + } + // Check if we don't have this seq in the buffer. + if c.pushBuf[seq] == nil { + // Check if there is enough space in the buffer. + if len(c.pushBuf) == c.pushSize { + return -1, nil // couldn't save seq + } + c.pushBuf[seq] = bytes.Clone(data) + //log.Printf("push buf wait=%d seq=%d len=%d", c.waitSeq, seq, len(c.pushBuf)) + } + return 0, nil + } + + // Check if this is seq from the past. + if diff < 0 { + return 0, nil + } + + for i := 1; ; i++ { + if err := c.Push(data); err != nil { + return i, err + } + c.waitSeq++ + // Check if we have next seq in the buffer. + if data = c.pushBuf[c.waitSeq]; data != nil { + delete(c.pushBuf, c.waitSeq) + } else { + return i, nil + } + } +} From 2f43bfe5dc5b19d320d1435a7d5559353d8b1ded Mon Sep 17 00:00:00 2001 From: Alex X Date: Tue, 13 Jan 2026 21:26:01 +0300 Subject: [PATCH 4/4] Fix two-way audio for cs2+tcp proto for xiaomi source --- pkg/xiaomi/producer.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/xiaomi/producer.go b/pkg/xiaomi/producer.go index dcd419b8..27955dba 100644 --- a/pkg/xiaomi/producer.go +++ b/pkg/xiaomi/producer.go @@ -140,7 +140,8 @@ func probe(client *miss.Client, channel, quality, audio uint8) ([]*core.Media, e Codecs: []*core.Codec{acodec}, }) - if client.Protocol() == "cs2+udp" { + switch client.Protocol() { + case "cs2+udp", "cs2+tcp": medias = append(medias, &core.Media{ Kind: core.KindAudio, Direction: core.DirectionSendonly,