Merge branch 'AlexxIT:master' into wyze

This commit is contained in:
seydx
2026-01-13 21:50:10 +01:00
committed by GitHub
3 changed files with 192 additions and 67 deletions
+47
View File
@@ -0,0 +1,47 @@
package debug
import (
"bytes"
"math/rand"
"net"
)
type badConn struct {
net.Conn
delay int
buf []byte
}
func NewBadConn(conn net.Conn) net.Conn {
return &badConn{Conn: conn}
}
const (
missChance = 0.05
delayChance = 0.1
)
func (c *badConn) Read(b []byte) (n int, err error) {
if rand.Float32() < missChance {
if _, err = c.Conn.Read(b); err != nil {
return
}
//log.Printf("bad conn: miss")
}
if c.delay > 0 {
if c.delay--; c.delay == 0 {
n = copy(b, c.buf)
return
}
} else if rand.Float32() < delayChance {
if n, err = c.Conn.Read(b); err != nil {
return
}
c.delay = 1 + rand.Intn(5)
c.buf = bytes.Clone(b[:n])
//log.Printf("bad conn: delay %d", c.delay)
}
return c.Conn.Read(b)
}
+143 -66
View File
@@ -2,6 +2,7 @@ package cs2
import (
"bufio"
"bytes"
"encoding/binary"
"fmt"
"io"
@@ -20,10 +21,11 @@ func Dial(host, transport string) (*Conn, error) {
_, isTCP := conn.(*tcpConn)
c := &Conn{
conn: conn,
isTCP: isTCP,
rawCh0: make(chan []byte, 10),
rawCh2: make(chan []byte, 100),
conn: conn,
isTCP: isTCP,
channels: [4]*dataChannel{
newDataChannel(0, 10), nil, newDataChannel(250, 100), nil,
},
}
go c.worker()
return c, nil
@@ -36,8 +38,8 @@ type Conn struct {
err error
seqCh0 uint16
seqCh3 uint16
rawCh0 chan []byte
rawCh2 chan []byte
channels [4]*dataChannel
cmdMu sync.Mutex
cmdAck func()
@@ -46,6 +48,7 @@ type Conn struct {
const (
magic = 0xF1
magicDrw = 0xD1
magicTCP = 0x68
msgLanSearch = 0x30
msgPunchPkt = 0x41
msgP2PRdyUDP = 0x42
@@ -104,15 +107,13 @@ func handshake(host, transport string) (net.Conn, error) {
func (c *Conn) worker() {
defer func() {
close(c.rawCh0)
close(c.rawCh2)
c.channels[0].Close()
c.channels[2].Close()
}()
chAck := make([]uint16, 4) // only for UDP
var keepaliveTS time.Time // only for TCP
buf := make([]byte, 1200)
var ch2WaitSize int
var ch2WaitData []byte
var keepaliveTS time.Time
for {
n, err := c.conn.Read(buf)
@@ -121,9 +122,15 @@ func (c *Conn) worker() {
return
}
// 0 f1d0 magic
// 2 005d size = total size + 4
// 4 d1 magic
// 5 00 channel
// 6 0000 seq
switch buf[1] {
case msgDrw:
ch := buf[5]
channel := c.channels[ch]
if c.isTCP {
// For TCP we should send ping every second to keep connection alive.
@@ -132,66 +139,37 @@ func (c *Conn) worker() {
_, _ = c.conn.Write([]byte{magic, msgPing, 0, 0})
keepaliveTS = now.Add(time.Second)
}
err = channel.Push(buf[8:n])
} else {
// For UDP we should using ack.
seqHI := buf[6]
seqLO := buf[7]
var pushed int
if chAck[ch] != uint16(seqHI)<<8|uint16(seqLO) {
continue
seqHI, seqLO := buf[6], buf[7]
seq := uint16(seqHI)<<8 | uint16(seqLO)
pushed, err = channel.PushSeq(seq, buf[8:n])
if pushed >= 0 {
// For UDP we should send ACK.
ack := []byte{magic, msgDrwAck, 0, 6, magicDrw, ch, 0, 1, seqHI, seqLO}
_, _ = c.conn.Write(ack)
}
chAck[ch]++
ack := []byte{magic, msgDrwAck, 0, 6, magicDrw, ch, 0, 1, seqHI, seqLO}
_, _ = c.conn.Write(ack)
}
switch ch {
case 0:
select {
case c.rawCh0 <- buf[12:]:
default:
}
continue
case 2:
ch2WaitData = append(ch2WaitData, buf[8:n]...)
for len(ch2WaitData) > 4 {
if ch2WaitSize == 0 {
ch2WaitSize = int(binary.BigEndian.Uint32(ch2WaitData))
ch2WaitData = ch2WaitData[4:]
}
if ch2WaitSize <= len(ch2WaitData) {
select {
case c.rawCh2 <- ch2WaitData[:ch2WaitSize]:
default:
c.err = fmt.Errorf("%s: media queue is full", "cs2")
return
}
ch2WaitData = ch2WaitData[ch2WaitSize:]
ch2WaitSize = 0
} else {
break
}
}
continue
if err != nil {
c.err = fmt.Errorf("%s: %w", "cs2", err)
return
}
case msgPing:
_, _ = c.conn.Write([]byte{magic, msgPong, 0, 0})
continue
case msgPong, msgP2PRdyUDP, msgP2PRdyTCP, msgClose:
continue // skip it
case msgPong, msgP2PRdyUDP, msgP2PRdyTCP, msgClose: // skip it
case msgDrwAck: // only for UDP
if c.cmdAck != nil {
c.cmdAck()
}
continue
default:
fmt.Printf("%s: unknown msg: %x\n", "cs2", buf[:n])
}
fmt.Printf("%s: unknown msg: %x\n", "cs2", buf[:n])
}
}
@@ -222,7 +200,7 @@ func (c *Conn) Error() error {
}
func (c *Conn) ReadCommand() (cmd uint16, data []byte, err error) {
buf, ok := <-c.rawCh0
buf, ok := c.channels[0].Pop()
if !ok {
return 0, nil, c.Error()
}
@@ -270,7 +248,7 @@ func (c *Conn) WriteCommand(cmd uint16, data []byte) error {
}
func (c *Conn) ReadPacket() ([]byte, error) {
data, ok := <-c.rawCh2
data, ok := c.channels[2].Pop()
if !ok {
return nil, c.Error()
}
@@ -343,23 +321,24 @@ type udpConn struct {
addr *net.UDPAddr
}
func (c *udpConn) Read(p []byte) (n int, err error) {
func (c *udpConn) Read(b []byte) (n int, err error) {
var addr *net.UDPAddr
for {
n, addr, err = c.UDPConn.ReadFromUDP(p)
n, addr, err = c.UDPConn.ReadFromUDP(b)
if err != nil {
return 0, err
}
if string(addr.IP) == string(c.addr.IP) || n >= 8 {
//log.Printf("<- %x", b[:n])
return
}
}
}
func (c *udpConn) Write(req []byte) (n int, err error) {
//log.Printf("-> %x", req)
return c.UDPConn.WriteToUDP(req, c.addr)
func (c *udpConn) Write(b []byte) (n int, err error) {
//log.Printf("-> %x", b)
return c.UDPConn.WriteToUDP(b, c.addr)
}
func (c *udpConn) RemoteAddr() net.Addr {
@@ -425,9 +404,107 @@ func (c *tcpConn) Write(req []byte) (n int, err error) {
n = len(req)
buf := make([]byte, 8+n)
binary.BigEndian.PutUint16(buf, uint16(n))
buf[2] = 0x68
buf[2] = magicTCP
copy(buf[8:], req)
//log.Printf("-> %x", buf)
_, err = c.TCPConn.Write(buf)
return
}
func newDataChannel(pushSize, popSize int) *dataChannel {
c := &dataChannel{}
if pushSize > 0 {
c.pushBuf = make(map[uint16][]byte, pushSize)
c.pushSize = pushSize
}
if popSize >= 0 {
c.popBuf = make(chan []byte, popSize)
}
return c
}
type dataChannel struct {
waitSeq uint16
pushBuf map[uint16][]byte
pushSize int
waitData []byte
waitSize int
popBuf chan []byte
}
func (c *dataChannel) Push(b []byte) error {
c.waitData = append(c.waitData, b...)
for len(c.waitData) > 4 {
// Every new data starts with size. There can be several data inside one packet.
if c.waitSize == 0 {
c.waitSize = int(binary.BigEndian.Uint32(c.waitData))
c.waitData = c.waitData[4:]
}
if c.waitSize > len(c.waitData) {
break
}
select {
case c.popBuf <- c.waitData[:c.waitSize]:
default:
return fmt.Errorf("pop buffer is full")
}
c.waitData = c.waitData[c.waitSize:]
c.waitSize = 0
}
return nil
}
func (c *dataChannel) Pop() ([]byte, bool) {
data, ok := <-c.popBuf
return data, ok
}
func (c *dataChannel) Close() {
close(c.popBuf)
}
// PushSeq returns how many seq were processed.
// Returns 0 if seq was saved or processed earlier.
// Returns -1 if seq could not be saved (buffer full or disabled).
func (c *dataChannel) PushSeq(seq uint16, data []byte) (int, error) {
diff := int16(seq - c.waitSeq)
// Check if this is seq from the future.
if diff > 0 {
// Support disabled buffer.
if c.pushSize == 0 {
return -1, nil // couldn't save seq
}
// Check if we don't have this seq in the buffer.
if c.pushBuf[seq] == nil {
// Check if there is enough space in the buffer.
if len(c.pushBuf) == c.pushSize {
return -1, nil // couldn't save seq
}
c.pushBuf[seq] = bytes.Clone(data)
//log.Printf("push buf wait=%d seq=%d len=%d", c.waitSeq, seq, len(c.pushBuf))
}
return 0, nil
}
// Check if this is seq from the past.
if diff < 0 {
return 0, nil
}
for i := 1; ; i++ {
if err := c.Push(data); err != nil {
return i, err
}
c.waitSeq++
// Check if we have next seq in the buffer.
if data = c.pushBuf[c.waitSeq]; data != nil {
delete(c.pushBuf, c.waitSeq)
} else {
return i, nil
}
}
}
+2 -1
View File
@@ -140,7 +140,8 @@ func probe(client *miss.Client, channel, quality, audio uint8) ([]*core.Media, e
Codecs: []*core.Codec{acodec},
})
if client.Protocol() == "cs2+udp" {
switch client.Protocol() {
case "cs2+udp", "cs2+tcp":
medias = append(medias, &core.Media{
Kind: core.KindAudio,
Direction: core.DirectionSendonly,