From 0d035e5bcea34f7a541f81b41386af173fbd7166 Mon Sep 17 00:00:00 2001 From: seydx Date: Thu, 15 Jan 2026 01:11:06 +0100 Subject: [PATCH] update ack hangling to improve streaming --- pkg/wyze/tutk/conn.go | 147 +++++++++++++++++++++++++++--------------- 1 file changed, 95 insertions(+), 52 deletions(-) diff --git a/pkg/wyze/tutk/conn.go b/pkg/wyze/tutk/conn.go index 6aaa0ad2..22b72afd 100644 --- a/pkg/wyze/tutk/conn.go +++ b/pkg/wyze/tutk/conn.go @@ -28,14 +28,22 @@ const ( ) type Conn struct { - conn *net.UDPConn - addr *net.UDPAddr + conn *net.UDPConn + addr *net.UDPAddr + frames *FrameHandler + err error + verbose bool + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + mu sync.RWMutex // DTLS clientConn *dtls.Conn serverConn *dtls.Conn clientBuf chan []byte serverBuf chan []byte + rawCmd chan []byte // Identity uid string @@ -59,23 +67,12 @@ type Conn struct { audioSeq uint32 audioFrameNo uint32 - // Channels - rawCmd chan []byte - - // Frame assembly - frames *FrameHandler - ackFlags uint16 - - // State - err error - verbose bool - - // Sync - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup - mu sync.RWMutex - cmdAck func() + // Ack + ackFlags uint16 + rxSeqStart uint16 + rxSeqEnd uint16 + rxSeqInit bool + cmdAck func() } func Dial(host string, port int, uid, authKey, enr, mac string, verbose bool) (*Conn, error) { @@ -94,17 +91,19 @@ func Dial(host string, port int, uid, authKey, enr, mac string, verbose bool) (* } c := &Conn{ - conn: udp, - addr: &net.UDPAddr{IP: net.ParseIP(host), Port: port}, - rid: genRandomID(), - uid: uid, - authKey: authKey, - enr: enr, - mac: mac, - psk: psk, - verbose: verbose, - ctx: ctx, - cancel: cancel, + conn: udp, + addr: &net.UDPAddr{IP: net.ParseIP(host), Port: port}, + rid: genRandomID(), + uid: uid, + authKey: authKey, + enr: enr, + mac: mac, + psk: psk, + verbose: verbose, + ctx: ctx, + cancel: cancel, + rxSeqStart: 0xffff, // Initialize RX seq for ACK + rxSeqEnd: 0xffff, } if err = c.discovery(); err != nil { @@ -166,6 +165,25 @@ func (c *Conn) AVClientStart(timeout time.Duration) error { ack := c.buildACK() c.clientConn.Write(ack) + c.wg.Add(1) + go func() { + defer c.wg.Done() + ackTicker := time.NewTicker(100 * time.Millisecond) + defer ackTicker.Stop() + + for { + select { + case <-c.ctx.Done(): + return + case <-ackTicker.C: + if c.clientConn != nil { + ack := c.buildACK() + c.clientConn.Write(ack) + } + } + } + }() + return nil } case <-timer.C: @@ -254,9 +272,9 @@ func (c *Conn) AVSendAudioData(codec uint16, payload []byte, timestampUS uint32, } func (c *Conn) Write(data []byte) error { - if c.verbose { - fmt.Printf("[UDP TX] to=%s, len=%d, data:\n%s", c.addr.String(), len(data), hexDump(data)) - } + // if c.verbose { + // fmt.Printf("[UDP TX] to=%s, len=%d, data:\n%s", c.addr.String(), len(data), hexDump(data)) + // } if c.newProto { _, err := c.conn.WriteToUDP(data, c.addr) @@ -274,9 +292,9 @@ func (c *Conn) WriteDTLS(payload []byte, channel byte) error { frame = c.buildTxData(payload, channel) } - if c.verbose { - fmt.Printf("[DTLS TX] to=%s, len=%d, channel=%d, data:\n%s", c.addr.String(), len(frame), channel, hexDump(frame)) - } + // if c.verbose { + // fmt.Printf("[DTLS TX] to=%s, len=%d, channel=%d, data:\n%s", c.addr.String(), len(frame), channel, hexDump(frame)) + // } return c.Write(frame) } @@ -517,9 +535,9 @@ func (c *Conn) worker() { data := buf[:n] magic := binary.LittleEndian.Uint16(data) - if c.verbose { - fmt.Printf("[DTLS RX] from=%s, len=%d, data:\n%s", c.addr.String(), n, hexDump(data)) - } + // if c.verbose { + // fmt.Printf("[DTLS RX] from=%s, len=%d, data:\n%s", c.addr.String(), n, hexDump(data)) + // } switch magic { case MagicAVLoginResp: @@ -545,6 +563,29 @@ func (c *Conn) worker() { } } + case ProtoVersion: + if len(data) >= 8 { + // Extract seq number at byte 4-5 (uint16 of uint32 AVSeq) + seq := binary.LittleEndian.Uint16(data[4:]) + if !c.rxSeqInit { + c.rxSeqInit = true + } + // Track highest received sequence + if seq > c.rxSeqEnd || c.rxSeqEnd == 0xffff { + c.rxSeqEnd = seq + } + + // Check for HL command response + if len(data) >= 36 { + for i := 32; i+2 < len(data); i++ { + if data[i] == 'H' && data[i+1] == 'L' { + c.queue(c.rawCmd, data[i:]) + break + } + } + } + } + case MagicACK: c.mu.RLock() ack := c.cmdAck @@ -582,9 +623,9 @@ func (c *Conn) reader() { return } - if c.verbose { - fmt.Printf("[UDP RX] from=%s, len=%d, data:\n%s", addr.String(), n, hexDump(buf[:n])) - } + // if c.verbose { + // fmt.Printf("[UDP RX] from=%s, len=%d, data:\n%s", addr.String(), n, hexDump(buf[:n])) + // } if !addr.IP.Equal(c.addr.IP) { if c.verbose { @@ -780,7 +821,7 @@ func (c *Conn) buildAVLoginPacket(magic uint16, size int, flags uint16, randomID copy(b[20:], randomID[:4]) copy(b[24:], DefaultUser) // username copy(b[280:], c.enr) // password/ENR - // binary.LittleEndian.PutUint32(b[536:], 1) // resend + // binary.LittleEndian.PutUint32(b[536:], 1) // resend enabled binary.LittleEndian.PutUint32(b[540:], 4) // security_mode ? binary.LittleEndian.PutUint32(b[552:], DefaultCaps) // capabilities return b @@ -880,19 +921,21 @@ func (c *Conn) buildNewTxData(payload []byte, channel byte) []byte { } func (c *Conn) buildACK() []byte { - if c.ackFlags == 0 { - c.ackFlags = 0x0001 - } else if c.ackFlags < 0x0007 { - c.ackFlags++ - } + c.ackFlags++ b := make([]byte, 24) binary.LittleEndian.PutUint16(b[0:], MagicACK) // 0x0009 binary.LittleEndian.PutUint16(b[2:], ProtoVersion) // 0x000c - binary.LittleEndian.PutUint32(b[4:], c.avSeq) // tx seq + binary.LittleEndian.PutUint32(b[4:], c.avSeq) // TX seq c.avSeq++ - binary.LittleEndian.PutUint32(b[8:], 0xffffffff) // rx seq - binary.LittleEndian.PutUint16(b[12:], c.ackFlags) // ack flags - binary.LittleEndian.PutUint32(b[16:], uint32(c.ackFlags)<<16) // ack counter + binary.LittleEndian.PutUint16(b[8:], c.rxSeqStart) // RX start (last acked) + binary.LittleEndian.PutUint16(b[10:], c.rxSeqEnd) // RX end (highest received) + if c.rxSeqInit { + c.rxSeqStart = c.rxSeqEnd + } + binary.LittleEndian.PutUint16(b[12:], c.ackFlags) // AckFlags + binary.LittleEndian.PutUint32(b[16:], uint32(c.ackFlags)<<16) // AckCounter + ts := uint32(time.Now().UnixMilli() & 0xFFFF) + binary.LittleEndian.PutUint16(b[20:], uint16(ts)) // Timestamp return b }