Refactor discovery and session setup logic

This commit is contained in:
seydx
2026-01-02 02:22:14 +01:00
parent f923487546
commit 4cff72c9a3
+177 -148
View File
@@ -9,7 +9,6 @@ import (
"fmt"
"io"
"net"
"os"
"sync"
"time"
@@ -23,6 +22,11 @@ const (
DefaultPort = 32761 // TUTK discovery port
MaxPacketSize = 2048 // Max single packet size
ReadBufferSize = 2 * 1024 * 1024 // 2MB for video streams
DiscoTimeout = 5000 * time.Millisecond // Total timeout for discovery
DiscoInterval = 100 * time.Millisecond // Interval between discovery packets
SessionTimeout = 5000 * time.Millisecond // Total timeout for session setup
ReadWaitInterval = 50 * time.Millisecond // Read wait interval per iteration
)
type FrameAssembler struct {
@@ -33,16 +37,16 @@ type FrameAssembler struct {
}
type Conn struct {
udpConn *net.UDPConn
addr *net.UDPAddr
broadcastAddr *net.UDPAddr
randomID []byte
uid string
authKey string
enr string
psk []byte
iotcTxSeq uint16
avLoginResp *AVLoginResponse
udpConn *net.UDPConn
addr *net.UDPAddr
broadcastAddrs []*net.UDPAddr
randomID []byte
uid string
authKey string
enr string
psk []byte
iotcTxSeq uint16
avLoginResp *AVLoginResponse
// DTLS - Main Channel (we = Client)
mainConn *dtls.Conn
@@ -93,21 +97,19 @@ func Dial(host, uid, authKey, enr string, verbose bool) (*Conn, error) {
psk := hash[:]
c := &Conn{
udpConn: conn,
addr: &net.UDPAddr{IP: net.ParseIP(host), Port: DefaultPort},
broadcastAddr: &net.UDPAddr{IP: net.IPv4(255, 255, 255, 255), Port: DefaultPort},
randomID: genRandomID(),
uid: uid,
authKey: authKey,
enr: enr,
psk: psk,
verbose: verbose,
ctx: ctx,
cancel: cancel,
// DTLS channel buffers
udpConn: conn,
addr: &net.UDPAddr{IP: net.ParseIP(host), Port: DefaultPort},
broadcastAddrs: getBroadcastAddrs(DefaultPort, verbose),
randomID: genRandomID(),
uid: uid,
authKey: authKey,
enr: enr,
psk: psk,
verbose: verbose,
ctx: ctx,
cancel: cancel,
mainBuf: make(chan []byte, 64),
speakerBuf: make(chan []byte, 64),
// Packet delivery (SDK-style FIFO)
packetQueue: make(chan *Packet, 128),
done: make(chan struct{}),
ioctrl: make(chan []byte, 16),
@@ -400,18 +402,26 @@ func (c *Conn) discoStage1() error {
encrypted := crypto.TransCodeBlob(pkt)
if c.verbose {
fmt.Printf("[IOTC] Disco Stage 1: broadcast + direct to %s\n", c.addr)
fmt.Printf("[IOTC] Disco Stage 1: timeout=%v interval=%v broadcasts=%d\n",
DiscoTimeout, DiscoInterval, len(c.broadcastAddrs))
}
for range 10 {
_, _ = c.udpConn.WriteToUDP(encrypted, c.broadcastAddr)
deadline := time.Now().Add(DiscoTimeout)
lastSend := time.Time{}
buf := make([]byte, MaxPacketSize)
if _, err := c.udpConn.WriteToUDP(encrypted, c.addr); err != nil {
return err
for time.Now().Before(deadline) {
if time.Since(lastSend) >= DiscoInterval {
for _, bcast := range c.broadcastAddrs {
c.udpConn.WriteToUDP(encrypted, bcast)
if c.verbose {
fmt.Printf("[IOTC] Disco Stage 1: sent to %s\n", bcast)
}
}
lastSend = time.Now()
}
buf := make([]byte, MaxPacketSize)
c.udpConn.SetReadDeadline(time.Now().Add(500 * time.Millisecond))
c.udpConn.SetReadDeadline(time.Now().Add(ReadWaitInterval))
n, addr, err := c.udpConn.ReadFromUDP(buf)
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
@@ -439,7 +449,7 @@ func (c *Conn) discoStage1() error {
}
}
return fmt.Errorf("timeout")
return fmt.Errorf("timeout after %v", DiscoTimeout)
}
func (c *Conn) discoStage2() {
@@ -453,28 +463,22 @@ func (c *Conn) sessionSetup() error {
pkt := c.buildSession()
if c.verbose {
fmt.Printf("[IOTC] Session setup: sending to %s\n", c.addr)
fmt.Printf("[IOTC] Session setup: target=%s\n", c.addr)
}
// Send request
if _, err := c.sendEncrypted(pkt); err != nil {
return err
}
for retry := range 10 {
buf := make([]byte, MaxPacketSize)
c.udpConn.SetReadDeadline(time.Now().Add(500 * time.Millisecond))
// Wait for response
buf := make([]byte, MaxPacketSize)
c.udpConn.SetReadDeadline(time.Now().Add(SessionTimeout))
for {
n, addr, err := c.udpConn.ReadFromUDP(buf)
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
if retry%3 == 2 {
if c.verbose {
fmt.Printf("[IOTC] Session setup: resending (retry %d)\n", retry)
}
_, _ = c.sendEncrypted(pkt)
}
continue
}
return err
return fmt.Errorf("timeout: %w", err)
}
data := crypto.ReverseTransCodeBlob(buf[:n])
@@ -495,8 +499,6 @@ func (c *Conn) sessionSetup() error {
return nil
}
}
return fmt.Errorf("timeout")
}
func (c *Conn) connect() error {
@@ -1209,28 +1211,30 @@ func (c *Conn) buildDisco(stage byte) []byte {
const frameSize = 16 + bodySize
frame := make([]byte, frameSize)
frame[0] = 0x04
frame[1] = 0x02
frame[2] = 0x1a
frame[3] = 0x02
binary.LittleEndian.PutUint16(frame[4:6], bodySize)
binary.LittleEndian.PutUint16(frame[8:10], CmdDiscoReq)
binary.LittleEndian.PutUint16(frame[10:12], 0x0021)
// IOTC Frame Header [0-15]
frame[0] = 0x04 // [0] Marker1
frame[1] = 0x02 // [1] Marker2
frame[2] = 0x1a // [2] Marker3
frame[3] = 0x02 // [3] Mode = Disco
binary.LittleEndian.PutUint16(frame[4:6], bodySize) // [4-5] BodySize
binary.LittleEndian.PutUint16(frame[8:10], CmdDiscoReq) // [8-9] Command = 0x0601
binary.LittleEndian.PutUint16(frame[10:12], 0x0021) // [10-11] Flags
// Body [16-87]
body := frame[16:]
copy(body[0:], c.uid)
copy(body[0:20], c.uid) // [0-19] UID (20 bytes)
body[36] = 0x01
body[37] = 0x01
body[38] = 0x02
body[39] = 0x04
body[36] = 0x01 // [36] Unknown1
body[37] = 0x01 // [37] Unknown2
body[38] = 0x02 // [38] Unknown3
body[39] = 0x04 // [39] Unknown4
copy(body[40:48], c.randomID)
body[48] = stage
copy(body[40:48], c.randomID) // [40-47] RandomID
body[48] = stage // [48] Stage (1=broadcast, 2=direct)
if stage == 1 && len(c.authKey) > 0 {
copy(body[58:], c.authKey)
copy(body[58:], c.authKey) // [58-65] AuthKey
}
return frame
@@ -1241,41 +1245,28 @@ func (c *Conn) buildSession() []byte {
const frameSize = 16 + bodySize
frame := make([]byte, frameSize)
frame[0] = 0x04
frame[1] = 0x02
frame[2] = 0x1a
frame[3] = 0x02
binary.LittleEndian.PutUint16(frame[4:6], bodySize)
binary.LittleEndian.PutUint16(frame[8:10], CmdSessionReq)
binary.LittleEndian.PutUint16(frame[10:12], 0x0033)
// IOTC Frame Header [0-15]
frame[0] = 0x04 // [0] Marker1
frame[1] = 0x02 // [1] Marker2
frame[2] = 0x1a // [2] Marker3
frame[3] = 0x02 // [3] Mode
binary.LittleEndian.PutUint16(frame[4:6], bodySize) // [4-5] BodySize
binary.LittleEndian.PutUint16(frame[8:10], CmdSessionReq) // [8-9] Command = 0x0402
binary.LittleEndian.PutUint16(frame[10:12], 0x0033) // [10-11] Flags
// Body [16-51]
body := frame[16:]
copy(body[0:], c.uid)
copy(body[20:28], c.randomID)
copy(body[0:20], c.uid) // [0-19] UID (20 bytes)
copy(body[20:28], c.randomID) // [20-27] RandomID
ts := uint32(time.Now().Unix())
binary.LittleEndian.PutUint32(body[32:36], ts)
binary.LittleEndian.PutUint32(body[32:36], ts) // [32-35] Timestamp
return frame
}
func (c *Conn) buildDTLSConfig(isServer bool) *dtls.Config {
var keyLogWriter io.Writer
if c.verbose {
keyLogPath := os.Getenv("SSLKEYLOGFILE")
if keyLogPath != "" {
f, err := os.OpenFile(keyLogPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
if err == nil {
keyLogWriter = f
if c.verbose {
fmt.Printf("[DTLS] Key Log: %s\n", keyLogPath)
}
}
}
}
config := &dtls.Config{
PSK: func(hint []byte) ([]byte, error) {
if c.verbose {
@@ -1289,7 +1280,6 @@ func (c *Conn) buildDTLSConfig(isServer bool) *dtls.Config {
MTU: 1200,
FlightInterval: 300 * time.Millisecond,
ExtendedMasterSecret: dtls.DisableExtendedMasterSecret,
KeyLogWriter: keyLogWriter,
}
// Use custom cipher suites for client, standard for server
@@ -1308,32 +1298,31 @@ func (c *Conn) buildDataTXChannel(payload []byte, channel byte) []byte {
frameSize := 16 + bodySize
frame := make([]byte, frameSize)
frame[0] = 0x04
frame[1] = 0x02
frame[2] = 0x1a
frame[3] = 0x0b
binary.LittleEndian.PutUint16(frame[4:6], uint16(bodySize))
binary.LittleEndian.PutUint16(frame[6:8], c.iotcTxSeq)
// IOTC Frame Header [0-15]
frame[0] = 0x04 // [0] Marker1
frame[1] = 0x02 // [1] Marker2
frame[2] = 0x1a // [2] Marker3
frame[3] = 0x0b // [3] Mode = Data
binary.LittleEndian.PutUint16(frame[4:6], uint16(bodySize)) // [4-5] BodySize
binary.LittleEndian.PutUint16(frame[6:8], c.iotcTxSeq) // [6-7] Sequence
c.iotcTxSeq++
binary.LittleEndian.PutUint16(frame[8:10], CmdDataTX)
binary.LittleEndian.PutUint16(frame[10:12], 0x0021)
binary.LittleEndian.PutUint16(frame[8:10], CmdDataTX) // [8-9] Command = 0x0407
binary.LittleEndian.PutUint16(frame[10:12], 0x0021) // [10-11] Flags
copy(frame[12:14], c.randomID[:2]) // [12-13] RandomID[0:2]
frame[14] = channel // [14] Channel (0=Main, 1=Back)
frame[15] = 0x01 // [15] Marker
copy(frame[12:14], c.randomID[:2])
frame[14] = channel // Channel byte: 0 = Main, 1 = Backchannel
frame[15] = 0x01
binary.LittleEndian.PutUint32(frame[16:20], 0x0000000c)
copy(frame[20:28], c.randomID[:8])
// Sub-Header [16-27]
binary.LittleEndian.PutUint32(frame[16:20], 0x0000000c) // [16-19] Const
copy(frame[20:28], c.randomID[:8]) // [20-27] RandomID
// Payload [28+]
copy(frame[28:], payload)
return frame
}
func (c *Conn) buildACK() []byte {
// c.ackFlags++
if c.ackFlags == 0 {
c.ackFlags = 0x0001
} else if c.ackFlags < 0x0007 {
@@ -1341,13 +1330,13 @@ func (c *Conn) buildACK() []byte {
}
ack := make([]byte, 24)
binary.LittleEndian.PutUint16(ack[0:2], MagicACK) // Magic
binary.LittleEndian.PutUint16(ack[2:4], ProtocolVersion) // Version
binary.LittleEndian.PutUint32(ack[4:8], c.avTxSeq) // TxSeq
binary.LittleEndian.PutUint16(ack[0:2], MagicACK) // [0-1] Magic = 0x0009
binary.LittleEndian.PutUint16(ack[2:4], ProtocolVersion) // [2-3] Version = 0x000C
binary.LittleEndian.PutUint32(ack[4:8], c.avTxSeq) // [4-7] TxSeq
c.avTxSeq++
binary.LittleEndian.PutUint32(ack[8:12], 0xffffffff) // RxSeq
binary.LittleEndian.PutUint16(ack[12:14], c.ackFlags) // Flags
binary.LittleEndian.PutUint32(ack[16:20], uint32(c.ackFlags)<<16) // SDK uses ackFlags<<16, not avTxSeq
binary.LittleEndian.PutUint32(ack[8:12], 0xffffffff) // [8-11] RxSeq (not used)
binary.LittleEndian.PutUint16(ack[12:14], c.ackFlags) // [12-13] AckFlags
binary.LittleEndian.PutUint32(ack[16:20], uint32(c.ackFlags)<<16) // [16-19] AckCounter
return ack
}
@@ -1355,17 +1344,18 @@ func (c *Conn) buildACK() []byte {
func (c *Conn) buildKeepaliveResponse(incomingPayload []byte) []byte {
frame := make([]byte, 24)
frame[0] = 0x04
frame[1] = 0x02
frame[2] = 0x1a
frame[3] = 0x0a
binary.LittleEndian.PutUint16(frame[4:6], 8)
binary.LittleEndian.PutUint16(frame[8:10], CmdKeepaliveReq)
binary.LittleEndian.PutUint16(frame[10:12], 0x0021)
// IOTC Frame Header [0-15]
frame[0] = 0x04 // [0] Marker1
frame[1] = 0x02 // [1] Marker2
frame[2] = 0x1a // [2] Marker3
frame[3] = 0x0a // [3] Mode
binary.LittleEndian.PutUint16(frame[4:6], 8) // [4-5] BodySize = 8
binary.LittleEndian.PutUint16(frame[8:10], CmdKeepaliveReq) // [8-9] Command = 0x0427
binary.LittleEndian.PutUint16(frame[10:12], 0x0021) // [10-11] Flags
// Body [16-23]: Echo back incoming payload
if len(incomingPayload) >= 8 {
copy(frame[16:24], incomingPayload[:8])
copy(frame[16:24], incomingPayload[:8]) // [16-23] EchoPayload
}
return frame
@@ -1403,33 +1393,6 @@ func (c *Conn) buildAVLoginPacket(magic uint16, size int, flags uint16, randomID
}
func (c *Conn) buildAVLoginResponse(checksum uint32) []byte {
// SDK sends 60-byte AV Login response
// Captured from SDK: 00 21 0c 00 10 00 00 00 00 00 00 00 00 00 00 00
// 24 00 00 00 cd ac ca 40 00 00 00 00 00 01 00 01
// 00 00 00 00 04 00 00 00 fb 07 1f 00 00 00 00 00
// 00 00 00 00 00 00 03 00 02 00 00 00
//
// Structure:
// [0-1] Magic: 0x2100 (Login Response)
// [2-3] Protocol Version: 0x000c
// [4] Response Type: 0x10 (success)
// [5-15] Reserved: zeros
// [16-19] Payload Size: 0x24 = 36
// [20-23] Checksum: MUST echo from request!
// [24-27] Reserved: zeros
// [28] Flag1: 0x00
// [29] EnableFlag: 0x01
// [30] Flag2: 0x00
// [31] TwoWayStreaming: 0x01
// [32-35] Reserved: zeros
// [36-39] BufferConfig: 0x04
// [40-43] Capabilities: 0x001f07fb
// [44-51] Reserved: zeros
// [52-53] Reserved: zeros
// [54-55] ChannelInfo1: 0x0003
// [56-57] ChannelInfo2: 0x0002
// [58-59] Reserved: zeros
resp := make([]byte, 60)
// Header
@@ -1553,3 +1516,69 @@ func genRandomID() []byte {
_, _ = rand.Read(b)
return b
}
func getBroadcastAddrs(port int, verbose bool) []*net.UDPAddr {
var addrs []*net.UDPAddr
ifaces, err := net.Interfaces()
if err != nil {
if verbose {
fmt.Printf("[IOTC] Failed to get interfaces: %v\n", err)
}
// Fallback to limited broadcast
return []*net.UDPAddr{{IP: net.IPv4(255, 255, 255, 255), Port: port}}
}
for _, iface := range ifaces {
// Skip loopback and down interfaces
if iface.Flags&net.FlagLoopback != 0 || iface.Flags&net.FlagUp == 0 {
continue
}
ifAddrs, err := iface.Addrs()
if err != nil {
continue
}
for _, addr := range ifAddrs {
ipNet, ok := addr.(*net.IPNet)
if !ok {
continue
}
// Only IPv4
ip4 := ipNet.IP.To4()
if ip4 == nil {
continue
}
// Calculate broadcast address: IP | ~mask
mask := ipNet.Mask
if len(mask) != 4 {
continue
}
broadcast := make(net.IP, 4)
for i := 0; i < 4; i++ {
broadcast[i] = ip4[i] | ^mask[i]
}
bcastAddr := &net.UDPAddr{IP: broadcast, Port: port}
addrs = append(addrs, bcastAddr)
if verbose {
fmt.Printf("[IOTC] Found broadcast address: %s (iface: %s)\n", bcastAddr, iface.Name)
}
}
}
if len(addrs) == 0 {
// Fallback to limited broadcast
if verbose {
fmt.Printf("[IOTC] No broadcast addresses found, using 255.255.255.255\n")
}
return []*net.UDPAddr{{IP: net.IPv4(255, 255, 255, 255), Port: port}}
}
return addrs
}