diff --git a/pkg/debug/conn.go b/pkg/debug/conn.go new file mode 100644 index 00000000..6261cb75 --- /dev/null +++ b/pkg/debug/conn.go @@ -0,0 +1,47 @@ +package debug + +import ( + "bytes" + "math/rand" + "net" +) + +type badConn struct { + net.Conn + delay int + buf []byte +} + +func NewBadConn(conn net.Conn) net.Conn { + return &badConn{Conn: conn} +} + +const ( + missChance = 0.05 + delayChance = 0.1 +) + +func (c *badConn) Read(b []byte) (n int, err error) { + if rand.Float32() < missChance { + if _, err = c.Conn.Read(b); err != nil { + return + } + //log.Printf("bad conn: miss") + } + + if c.delay > 0 { + if c.delay--; c.delay == 0 { + n = copy(b, c.buf) + return + } + } else if rand.Float32() < delayChance { + if n, err = c.Conn.Read(b); err != nil { + return + } + c.delay = 1 + rand.Intn(5) + c.buf = bytes.Clone(b[:n]) + //log.Printf("bad conn: delay %d", c.delay) + } + + return c.Conn.Read(b) +} diff --git a/pkg/xiaomi/cs2/conn.go b/pkg/xiaomi/cs2/conn.go index cde09ab5..ff5aead6 100644 --- a/pkg/xiaomi/cs2/conn.go +++ b/pkg/xiaomi/cs2/conn.go @@ -2,6 +2,7 @@ package cs2 import ( "bufio" + "bytes" "encoding/binary" "fmt" "io" @@ -20,10 +21,11 @@ func Dial(host, transport string) (*Conn, error) { _, isTCP := conn.(*tcpConn) c := &Conn{ - conn: conn, - isTCP: isTCP, - rawCh0: make(chan []byte, 10), - rawCh2: make(chan []byte, 100), + conn: conn, + isTCP: isTCP, + channels: [4]*dataChannel{ + newDataChannel(0, 10), nil, newDataChannel(250, 100), nil, + }, } go c.worker() return c, nil @@ -36,8 +38,8 @@ type Conn struct { err error seqCh0 uint16 seqCh3 uint16 - rawCh0 chan []byte - rawCh2 chan []byte + + channels [4]*dataChannel cmdMu sync.Mutex cmdAck func() @@ -46,6 +48,7 @@ type Conn struct { const ( magic = 0xF1 magicDrw = 0xD1 + magicTCP = 0x68 msgLanSearch = 0x30 msgPunchPkt = 0x41 msgP2PRdyUDP = 0x42 @@ -104,15 +107,13 @@ func handshake(host, transport string) (net.Conn, error) { func (c *Conn) worker() { defer func() { - close(c.rawCh0) - close(c.rawCh2) + c.channels[0].Close() + c.channels[2].Close() }() - chAck := make([]uint16, 4) // only for UDP + var keepaliveTS time.Time // only for TCP + buf := make([]byte, 1200) - var ch2WaitSize int - var ch2WaitData []byte - var keepaliveTS time.Time for { n, err := c.conn.Read(buf) @@ -121,9 +122,15 @@ func (c *Conn) worker() { return } + // 0 f1d0 magic + // 2 005d size = total size + 4 + // 4 d1 magic + // 5 00 channel + // 6 0000 seq switch buf[1] { case msgDrw: ch := buf[5] + channel := c.channels[ch] if c.isTCP { // For TCP we should send ping every second to keep connection alive. @@ -132,66 +139,37 @@ func (c *Conn) worker() { _, _ = c.conn.Write([]byte{magic, msgPing, 0, 0}) keepaliveTS = now.Add(time.Second) } + + err = channel.Push(buf[8:n]) } else { - // For UDP we should using ack. - seqHI := buf[6] - seqLO := buf[7] + var pushed int - if chAck[ch] != uint16(seqHI)<<8|uint16(seqLO) { - continue + seqHI, seqLO := buf[6], buf[7] + seq := uint16(seqHI)<<8 | uint16(seqLO) + pushed, err = channel.PushSeq(seq, buf[8:n]) + + if pushed >= 0 { + // For UDP we should send ACK. + ack := []byte{magic, msgDrwAck, 0, 6, magicDrw, ch, 0, 1, seqHI, seqLO} + _, _ = c.conn.Write(ack) } - chAck[ch]++ - - ack := []byte{magic, msgDrwAck, 0, 6, magicDrw, ch, 0, 1, seqHI, seqLO} - _, _ = c.conn.Write(ack) } - switch ch { - case 0: - select { - case c.rawCh0 <- buf[12:]: - default: - } - continue - - case 2: - ch2WaitData = append(ch2WaitData, buf[8:n]...) - - for len(ch2WaitData) > 4 { - if ch2WaitSize == 0 { - ch2WaitSize = int(binary.BigEndian.Uint32(ch2WaitData)) - ch2WaitData = ch2WaitData[4:] - } - if ch2WaitSize <= len(ch2WaitData) { - select { - case c.rawCh2 <- ch2WaitData[:ch2WaitSize]: - default: - c.err = fmt.Errorf("%s: media queue is full", "cs2") - return - } - - ch2WaitData = ch2WaitData[ch2WaitSize:] - ch2WaitSize = 0 - } else { - break - } - } - continue + if err != nil { + c.err = fmt.Errorf("%s: %w", "cs2", err) + return } case msgPing: _, _ = c.conn.Write([]byte{magic, msgPong, 0, 0}) - continue - case msgPong, msgP2PRdyUDP, msgP2PRdyTCP, msgClose: - continue // skip it + case msgPong, msgP2PRdyUDP, msgP2PRdyTCP, msgClose: // skip it case msgDrwAck: // only for UDP if c.cmdAck != nil { c.cmdAck() } - continue + default: + fmt.Printf("%s: unknown msg: %x\n", "cs2", buf[:n]) } - - fmt.Printf("%s: unknown msg: %x\n", "cs2", buf[:n]) } } @@ -222,7 +200,7 @@ func (c *Conn) Error() error { } func (c *Conn) ReadCommand() (cmd uint16, data []byte, err error) { - buf, ok := <-c.rawCh0 + buf, ok := c.channels[0].Pop() if !ok { return 0, nil, c.Error() } @@ -270,7 +248,7 @@ func (c *Conn) WriteCommand(cmd uint16, data []byte) error { } func (c *Conn) ReadPacket() ([]byte, error) { - data, ok := <-c.rawCh2 + data, ok := c.channels[2].Pop() if !ok { return nil, c.Error() } @@ -343,23 +321,24 @@ type udpConn struct { addr *net.UDPAddr } -func (c *udpConn) Read(p []byte) (n int, err error) { +func (c *udpConn) Read(b []byte) (n int, err error) { var addr *net.UDPAddr for { - n, addr, err = c.UDPConn.ReadFromUDP(p) + n, addr, err = c.UDPConn.ReadFromUDP(b) if err != nil { return 0, err } if string(addr.IP) == string(c.addr.IP) || n >= 8 { + //log.Printf("<- %x", b[:n]) return } } } -func (c *udpConn) Write(req []byte) (n int, err error) { - //log.Printf("-> %x", req) - return c.UDPConn.WriteToUDP(req, c.addr) +func (c *udpConn) Write(b []byte) (n int, err error) { + //log.Printf("-> %x", b) + return c.UDPConn.WriteToUDP(b, c.addr) } func (c *udpConn) RemoteAddr() net.Addr { @@ -425,9 +404,107 @@ func (c *tcpConn) Write(req []byte) (n int, err error) { n = len(req) buf := make([]byte, 8+n) binary.BigEndian.PutUint16(buf, uint16(n)) - buf[2] = 0x68 + buf[2] = magicTCP copy(buf[8:], req) //log.Printf("-> %x", buf) _, err = c.TCPConn.Write(buf) return } + +func newDataChannel(pushSize, popSize int) *dataChannel { + c := &dataChannel{} + if pushSize > 0 { + c.pushBuf = make(map[uint16][]byte, pushSize) + c.pushSize = pushSize + } + if popSize >= 0 { + c.popBuf = make(chan []byte, popSize) + } + return c +} + +type dataChannel struct { + waitSeq uint16 + pushBuf map[uint16][]byte + pushSize int + + waitData []byte + waitSize int + popBuf chan []byte +} + +func (c *dataChannel) Push(b []byte) error { + c.waitData = append(c.waitData, b...) + + for len(c.waitData) > 4 { + // Every new data starts with size. There can be several data inside one packet. + if c.waitSize == 0 { + c.waitSize = int(binary.BigEndian.Uint32(c.waitData)) + c.waitData = c.waitData[4:] + } + if c.waitSize > len(c.waitData) { + break + } + + select { + case c.popBuf <- c.waitData[:c.waitSize]: + default: + return fmt.Errorf("pop buffer is full") + } + + c.waitData = c.waitData[c.waitSize:] + c.waitSize = 0 + } + return nil +} + +func (c *dataChannel) Pop() ([]byte, bool) { + data, ok := <-c.popBuf + return data, ok +} + +func (c *dataChannel) Close() { + close(c.popBuf) +} + +// PushSeq returns how many seq were processed. +// Returns 0 if seq was saved or processed earlier. +// Returns -1 if seq could not be saved (buffer full or disabled). +func (c *dataChannel) PushSeq(seq uint16, data []byte) (int, error) { + diff := int16(seq - c.waitSeq) + // Check if this is seq from the future. + if diff > 0 { + // Support disabled buffer. + if c.pushSize == 0 { + return -1, nil // couldn't save seq + } + // Check if we don't have this seq in the buffer. + if c.pushBuf[seq] == nil { + // Check if there is enough space in the buffer. + if len(c.pushBuf) == c.pushSize { + return -1, nil // couldn't save seq + } + c.pushBuf[seq] = bytes.Clone(data) + //log.Printf("push buf wait=%d seq=%d len=%d", c.waitSeq, seq, len(c.pushBuf)) + } + return 0, nil + } + + // Check if this is seq from the past. + if diff < 0 { + return 0, nil + } + + for i := 1; ; i++ { + if err := c.Push(data); err != nil { + return i, err + } + c.waitSeq++ + // Check if we have next seq in the buffer. + if data = c.pushBuf[c.waitSeq]; data != nil { + delete(c.pushBuf, c.waitSeq) + } else { + return i, nil + } + } +} diff --git a/pkg/xiaomi/producer.go b/pkg/xiaomi/producer.go index dcd419b8..27955dba 100644 --- a/pkg/xiaomi/producer.go +++ b/pkg/xiaomi/producer.go @@ -140,7 +140,8 @@ func probe(client *miss.Client, channel, quality, audio uint8) ([]*core.Media, e Codecs: []*core.Codec{acodec}, }) - if client.Protocol() == "cs2+udp" { + switch client.Protocol() { + case "cs2+udp", "cs2+tcp": medias = append(medias, &core.Media{ Kind: core.KindAudio, Direction: core.DirectionSendonly,