update ack hangling to improve streaming

This commit is contained in:
seydx
2026-01-15 01:11:06 +01:00
parent 7241759fea
commit 0d035e5bce
+95 -52
View File
@@ -28,14 +28,22 @@ const (
)
type Conn struct {
conn *net.UDPConn
addr *net.UDPAddr
conn *net.UDPConn
addr *net.UDPAddr
frames *FrameHandler
err error
verbose bool
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
mu sync.RWMutex
// DTLS
clientConn *dtls.Conn
serverConn *dtls.Conn
clientBuf chan []byte
serverBuf chan []byte
rawCmd chan []byte
// Identity
uid string
@@ -59,23 +67,12 @@ type Conn struct {
audioSeq uint32
audioFrameNo uint32
// Channels
rawCmd chan []byte
// Frame assembly
frames *FrameHandler
ackFlags uint16
// State
err error
verbose bool
// Sync
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
mu sync.RWMutex
cmdAck func()
// Ack
ackFlags uint16
rxSeqStart uint16
rxSeqEnd uint16
rxSeqInit bool
cmdAck func()
}
func Dial(host string, port int, uid, authKey, enr, mac string, verbose bool) (*Conn, error) {
@@ -94,17 +91,19 @@ func Dial(host string, port int, uid, authKey, enr, mac string, verbose bool) (*
}
c := &Conn{
conn: udp,
addr: &net.UDPAddr{IP: net.ParseIP(host), Port: port},
rid: genRandomID(),
uid: uid,
authKey: authKey,
enr: enr,
mac: mac,
psk: psk,
verbose: verbose,
ctx: ctx,
cancel: cancel,
conn: udp,
addr: &net.UDPAddr{IP: net.ParseIP(host), Port: port},
rid: genRandomID(),
uid: uid,
authKey: authKey,
enr: enr,
mac: mac,
psk: psk,
verbose: verbose,
ctx: ctx,
cancel: cancel,
rxSeqStart: 0xffff, // Initialize RX seq for ACK
rxSeqEnd: 0xffff,
}
if err = c.discovery(); err != nil {
@@ -166,6 +165,25 @@ func (c *Conn) AVClientStart(timeout time.Duration) error {
ack := c.buildACK()
c.clientConn.Write(ack)
c.wg.Add(1)
go func() {
defer c.wg.Done()
ackTicker := time.NewTicker(100 * time.Millisecond)
defer ackTicker.Stop()
for {
select {
case <-c.ctx.Done():
return
case <-ackTicker.C:
if c.clientConn != nil {
ack := c.buildACK()
c.clientConn.Write(ack)
}
}
}
}()
return nil
}
case <-timer.C:
@@ -254,9 +272,9 @@ func (c *Conn) AVSendAudioData(codec uint16, payload []byte, timestampUS uint32,
}
func (c *Conn) Write(data []byte) error {
if c.verbose {
fmt.Printf("[UDP TX] to=%s, len=%d, data:\n%s", c.addr.String(), len(data), hexDump(data))
}
// if c.verbose {
// fmt.Printf("[UDP TX] to=%s, len=%d, data:\n%s", c.addr.String(), len(data), hexDump(data))
// }
if c.newProto {
_, err := c.conn.WriteToUDP(data, c.addr)
@@ -274,9 +292,9 @@ func (c *Conn) WriteDTLS(payload []byte, channel byte) error {
frame = c.buildTxData(payload, channel)
}
if c.verbose {
fmt.Printf("[DTLS TX] to=%s, len=%d, channel=%d, data:\n%s", c.addr.String(), len(frame), channel, hexDump(frame))
}
// if c.verbose {
// fmt.Printf("[DTLS TX] to=%s, len=%d, channel=%d, data:\n%s", c.addr.String(), len(frame), channel, hexDump(frame))
// }
return c.Write(frame)
}
@@ -517,9 +535,9 @@ func (c *Conn) worker() {
data := buf[:n]
magic := binary.LittleEndian.Uint16(data)
if c.verbose {
fmt.Printf("[DTLS RX] from=%s, len=%d, data:\n%s", c.addr.String(), n, hexDump(data))
}
// if c.verbose {
// fmt.Printf("[DTLS RX] from=%s, len=%d, data:\n%s", c.addr.String(), n, hexDump(data))
// }
switch magic {
case MagicAVLoginResp:
@@ -545,6 +563,29 @@ func (c *Conn) worker() {
}
}
case ProtoVersion:
if len(data) >= 8 {
// Extract seq number at byte 4-5 (uint16 of uint32 AVSeq)
seq := binary.LittleEndian.Uint16(data[4:])
if !c.rxSeqInit {
c.rxSeqInit = true
}
// Track highest received sequence
if seq > c.rxSeqEnd || c.rxSeqEnd == 0xffff {
c.rxSeqEnd = seq
}
// Check for HL command response
if len(data) >= 36 {
for i := 32; i+2 < len(data); i++ {
if data[i] == 'H' && data[i+1] == 'L' {
c.queue(c.rawCmd, data[i:])
break
}
}
}
}
case MagicACK:
c.mu.RLock()
ack := c.cmdAck
@@ -582,9 +623,9 @@ func (c *Conn) reader() {
return
}
if c.verbose {
fmt.Printf("[UDP RX] from=%s, len=%d, data:\n%s", addr.String(), n, hexDump(buf[:n]))
}
// if c.verbose {
// fmt.Printf("[UDP RX] from=%s, len=%d, data:\n%s", addr.String(), n, hexDump(buf[:n]))
// }
if !addr.IP.Equal(c.addr.IP) {
if c.verbose {
@@ -780,7 +821,7 @@ func (c *Conn) buildAVLoginPacket(magic uint16, size int, flags uint16, randomID
copy(b[20:], randomID[:4])
copy(b[24:], DefaultUser) // username
copy(b[280:], c.enr) // password/ENR
// binary.LittleEndian.PutUint32(b[536:], 1) // resend
// binary.LittleEndian.PutUint32(b[536:], 1) // resend enabled
binary.LittleEndian.PutUint32(b[540:], 4) // security_mode ?
binary.LittleEndian.PutUint32(b[552:], DefaultCaps) // capabilities
return b
@@ -880,19 +921,21 @@ func (c *Conn) buildNewTxData(payload []byte, channel byte) []byte {
}
func (c *Conn) buildACK() []byte {
if c.ackFlags == 0 {
c.ackFlags = 0x0001
} else if c.ackFlags < 0x0007 {
c.ackFlags++
}
c.ackFlags++
b := make([]byte, 24)
binary.LittleEndian.PutUint16(b[0:], MagicACK) // 0x0009
binary.LittleEndian.PutUint16(b[2:], ProtoVersion) // 0x000c
binary.LittleEndian.PutUint32(b[4:], c.avSeq) // tx seq
binary.LittleEndian.PutUint32(b[4:], c.avSeq) // TX seq
c.avSeq++
binary.LittleEndian.PutUint32(b[8:], 0xffffffff) // rx seq
binary.LittleEndian.PutUint16(b[12:], c.ackFlags) // ack flags
binary.LittleEndian.PutUint32(b[16:], uint32(c.ackFlags)<<16) // ack counter
binary.LittleEndian.PutUint16(b[8:], c.rxSeqStart) // RX start (last acked)
binary.LittleEndian.PutUint16(b[10:], c.rxSeqEnd) // RX end (highest received)
if c.rxSeqInit {
c.rxSeqStart = c.rxSeqEnd
}
binary.LittleEndian.PutUint16(b[12:], c.ackFlags) // AckFlags
binary.LittleEndian.PutUint32(b[16:], uint32(c.ackFlags)<<16) // AckCounter
ts := uint32(time.Now().UnixMilli() & 0xFFFF)
binary.LittleEndian.PutUint16(b[20:], uint16(ts)) // Timestamp
return b
}