+129
-65
@@ -21,10 +21,11 @@ func Dial(host, transport string) (*Conn, error) {
|
||||
_, isTCP := conn.(*tcpConn)
|
||||
|
||||
c := &Conn{
|
||||
conn: conn,
|
||||
isTCP: isTCP,
|
||||
rawCh0: make(chan []byte, 10),
|
||||
rawCh2: make(chan []byte, 100),
|
||||
conn: conn,
|
||||
isTCP: isTCP,
|
||||
channels: [4]*dataChannel{
|
||||
newDataChannel(0, 10), nil, newDataChannel(250, 100), nil,
|
||||
},
|
||||
}
|
||||
go c.worker()
|
||||
return c, nil
|
||||
@@ -37,8 +38,8 @@ type Conn struct {
|
||||
err error
|
||||
seqCh0 uint16
|
||||
seqCh3 uint16
|
||||
rawCh0 chan []byte
|
||||
rawCh2 chan []byte
|
||||
|
||||
channels [4]*dataChannel
|
||||
|
||||
cmdMu sync.Mutex
|
||||
cmdAck func()
|
||||
@@ -106,15 +107,13 @@ func handshake(host, transport string) (net.Conn, error) {
|
||||
|
||||
func (c *Conn) worker() {
|
||||
defer func() {
|
||||
close(c.rawCh0)
|
||||
close(c.rawCh2)
|
||||
c.channels[0].Close()
|
||||
c.channels[2].Close()
|
||||
}()
|
||||
|
||||
chAck := make([]uint16, 4) // only for UDP
|
||||
var keepaliveTS time.Time // only for TCP
|
||||
|
||||
buf := make([]byte, 1200)
|
||||
var ch2WaitSize int
|
||||
var ch2WaitData []byte
|
||||
var keepaliveTS time.Time
|
||||
|
||||
for {
|
||||
n, err := c.conn.Read(buf)
|
||||
@@ -131,6 +130,7 @@ func (c *Conn) worker() {
|
||||
switch buf[1] {
|
||||
case msgDrw:
|
||||
ch := buf[5]
|
||||
channel := c.channels[ch]
|
||||
|
||||
if c.isTCP {
|
||||
// For TCP we should send ping every second to keep connection alive.
|
||||
@@ -139,71 +139,37 @@ func (c *Conn) worker() {
|
||||
_, _ = c.conn.Write([]byte{magic, msgPing, 0, 0})
|
||||
keepaliveTS = now.Add(time.Second)
|
||||
}
|
||||
|
||||
err = channel.Push(buf[8:n])
|
||||
} else {
|
||||
// For UDP we should using ack.
|
||||
seq := binary.BigEndian.Uint16(buf[6:])
|
||||
diff := int16(seq - chAck[ch])
|
||||
var pushed int
|
||||
|
||||
if diff > 0 {
|
||||
continue // new seq - skip before ack
|
||||
seqHI, seqLO := buf[6], buf[7]
|
||||
seq := uint16(seqHI)<<8 | uint16(seqLO)
|
||||
pushed, err = channel.PushSeq(seq, buf[8:n])
|
||||
|
||||
if pushed >= 0 {
|
||||
// For UDP we should send ACK.
|
||||
ack := []byte{magic, msgDrwAck, 0, 6, magicDrw, ch, 0, 1, seqHI, seqLO}
|
||||
_, _ = c.conn.Write(ack)
|
||||
}
|
||||
|
||||
ack := []byte{magic, msgDrwAck, 0, 6, magicDrw, ch, 0, 1, buf[6], buf[7]}
|
||||
_, _ = c.conn.Write(ack)
|
||||
|
||||
if diff < 0 {
|
||||
continue // old seq - skip after ack
|
||||
}
|
||||
|
||||
chAck[ch]++ // expected seq - OK
|
||||
}
|
||||
|
||||
switch ch {
|
||||
case 0:
|
||||
select {
|
||||
case c.rawCh0 <- bytes.Clone(buf[12:n]):
|
||||
default:
|
||||
}
|
||||
continue
|
||||
|
||||
case 2:
|
||||
ch2WaitData = append(ch2WaitData, buf[8:n]...)
|
||||
|
||||
for len(ch2WaitData) > 4 {
|
||||
if ch2WaitSize == 0 {
|
||||
ch2WaitSize = int(binary.BigEndian.Uint32(ch2WaitData))
|
||||
ch2WaitData = ch2WaitData[4:]
|
||||
}
|
||||
if ch2WaitSize <= len(ch2WaitData) {
|
||||
select {
|
||||
case c.rawCh2 <- ch2WaitData[:ch2WaitSize]:
|
||||
default:
|
||||
c.err = fmt.Errorf("%s: media queue is full", "cs2")
|
||||
return
|
||||
}
|
||||
|
||||
ch2WaitData = ch2WaitData[ch2WaitSize:]
|
||||
ch2WaitSize = 0
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
continue
|
||||
if err != nil {
|
||||
c.err = fmt.Errorf("%s: %w", "cs2", err)
|
||||
return
|
||||
}
|
||||
|
||||
case msgPing:
|
||||
_, _ = c.conn.Write([]byte{magic, msgPong, 0, 0})
|
||||
continue
|
||||
case msgPong, msgP2PRdyUDP, msgP2PRdyTCP, msgClose:
|
||||
continue // skip it
|
||||
case msgPong, msgP2PRdyUDP, msgP2PRdyTCP, msgClose: // skip it
|
||||
case msgDrwAck: // only for UDP
|
||||
if c.cmdAck != nil {
|
||||
c.cmdAck()
|
||||
}
|
||||
continue
|
||||
default:
|
||||
fmt.Printf("%s: unknown msg: %x\n", "cs2", buf[:n])
|
||||
}
|
||||
|
||||
fmt.Printf("%s: unknown msg: %x\n", "cs2", buf[:n])
|
||||
}
|
||||
}
|
||||
|
||||
@@ -234,7 +200,7 @@ func (c *Conn) Error() error {
|
||||
}
|
||||
|
||||
func (c *Conn) ReadCommand() (cmd uint16, data []byte, err error) {
|
||||
buf, ok := <-c.rawCh0
|
||||
buf, ok := c.channels[0].Pop()
|
||||
if !ok {
|
||||
return 0, nil, c.Error()
|
||||
}
|
||||
@@ -282,7 +248,7 @@ func (c *Conn) WriteCommand(cmd uint16, data []byte) error {
|
||||
}
|
||||
|
||||
func (c *Conn) ReadPacket() ([]byte, error) {
|
||||
data, ok := <-c.rawCh2
|
||||
data, ok := c.channels[2].Pop()
|
||||
if !ok {
|
||||
return nil, c.Error()
|
||||
}
|
||||
@@ -444,3 +410,101 @@ func (c *tcpConn) Write(req []byte) (n int, err error) {
|
||||
_, err = c.TCPConn.Write(buf)
|
||||
return
|
||||
}
|
||||
|
||||
func newDataChannel(pushSize, popSize int) *dataChannel {
|
||||
c := &dataChannel{}
|
||||
if pushSize > 0 {
|
||||
c.pushBuf = make(map[uint16][]byte, pushSize)
|
||||
c.pushSize = pushSize
|
||||
}
|
||||
if popSize >= 0 {
|
||||
c.popBuf = make(chan []byte, popSize)
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
||||
type dataChannel struct {
|
||||
waitSeq uint16
|
||||
pushBuf map[uint16][]byte
|
||||
pushSize int
|
||||
|
||||
waitData []byte
|
||||
waitSize int
|
||||
popBuf chan []byte
|
||||
}
|
||||
|
||||
func (c *dataChannel) Push(b []byte) error {
|
||||
c.waitData = append(c.waitData, b...)
|
||||
|
||||
for len(c.waitData) > 4 {
|
||||
// Every new data starts with size. There can be several data inside one packet.
|
||||
if c.waitSize == 0 {
|
||||
c.waitSize = int(binary.BigEndian.Uint32(c.waitData))
|
||||
c.waitData = c.waitData[4:]
|
||||
}
|
||||
if c.waitSize > len(c.waitData) {
|
||||
break
|
||||
}
|
||||
|
||||
select {
|
||||
case c.popBuf <- c.waitData[:c.waitSize]:
|
||||
default:
|
||||
return fmt.Errorf("pop buffer is full")
|
||||
}
|
||||
|
||||
c.waitData = c.waitData[c.waitSize:]
|
||||
c.waitSize = 0
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *dataChannel) Pop() ([]byte, bool) {
|
||||
data, ok := <-c.popBuf
|
||||
return data, ok
|
||||
}
|
||||
|
||||
func (c *dataChannel) Close() {
|
||||
close(c.popBuf)
|
||||
}
|
||||
|
||||
// PushSeq returns how many seq were processed.
|
||||
// Returns 0 if seq was saved or processed earlier.
|
||||
// Returns -1 if seq could not be saved (buffer full or disabled).
|
||||
func (c *dataChannel) PushSeq(seq uint16, data []byte) (int, error) {
|
||||
diff := int16(seq - c.waitSeq)
|
||||
// Check if this is seq from the future.
|
||||
if diff > 0 {
|
||||
// Support disabled buffer.
|
||||
if c.pushSize == 0 {
|
||||
return -1, nil // couldn't save seq
|
||||
}
|
||||
// Check if we don't have this seq in the buffer.
|
||||
if c.pushBuf[seq] == nil {
|
||||
// Check if there is enough space in the buffer.
|
||||
if len(c.pushBuf) == c.pushSize {
|
||||
return -1, nil // couldn't save seq
|
||||
}
|
||||
c.pushBuf[seq] = bytes.Clone(data)
|
||||
//log.Printf("push buf wait=%d seq=%d len=%d", c.waitSeq, seq, len(c.pushBuf))
|
||||
}
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// Check if this is seq from the past.
|
||||
if diff < 0 {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
for i := 1; ; i++ {
|
||||
if err := c.Push(data); err != nil {
|
||||
return i, err
|
||||
}
|
||||
c.waitSeq++
|
||||
// Check if we have next seq in the buffer.
|
||||
if data = c.pushBuf[c.waitSeq]; data != nil {
|
||||
delete(c.pushBuf, c.waitSeq)
|
||||
} else {
|
||||
return i, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user