refactor frame handling

This commit is contained in:
seydx
2026-01-15 01:11:16 +01:00
parent 0d035e5bce
commit dbd04cb972
+203 -96
View File
@@ -2,6 +2,7 @@ package tutk
import ( import (
"encoding/binary" "encoding/binary"
"encoding/hex"
"fmt" "fmt"
"github.com/AlexxIT/go2rtc/pkg/aac" "github.com/AlexxIT/go2rtc/pkg/aac"
@@ -23,29 +24,48 @@ const (
ChannelPVideo uint8 = 0x07 ChannelPVideo uint8 = 0x07
) )
// Resolution constants
const ( const (
ResolutionUnknown = 0 ResTierLow uint8 = 1 // 360P/SD
ResolutionSD = 1 ResTierHigh uint8 = 4 // HD/2K
Resolution360P = 2 )
Resolution2K = 4
const (
Bitrate360P uint8 = 30
BitrateHD uint8 = 100
Bitrate2K uint8 = 200
) )
const FrameInfoSize = 40 const FrameInfoSize = 40
// FrameInfo - Wyze extended FRAMEINFO (40 bytes at end of packet) // FrameInfo - Wyze extended FRAMEINFO (40 bytes at end of packet)
// Video: 40 bytes, Audio: 16 bytes (uses same struct, fields 16+ are zero)
//
// Offset Size Field
// 0-1 2 CodecID - 0x4E=H264, 0x7B=H265, 0x90=AAC_WYZE
// 2 1 Flags - Video: 1=Keyframe, 0=P-frame | Audio: sample rate/bits/channels
// 3 1 CamIndex - Camera index
// 4 1 OnlineNum - Online number
// 5 1 FPS - Framerate (e.g. 20)
// 6 1 ResTier - Video: 1=Low(360P), 4=High(HD/2K) | Audio: 0
// 7 1 Bitrate - Video: 30=360P, 100=HD, 200=2K | Audio: 1
// 8-11 4 Timestamp - Timestamp (increases ~50000/frame for 20fps video)
// 12-15 4 SessionID - Session marker (constant per stream)
// 16-19 4 PayloadSize - Frame payload size in bytes
// 20-23 4 FrameNo - Global frame number
// 24-35 12 DeviceID - MAC address (ASCII) - video only
// 36-39 4 Padding - Always 0 - video only
type FrameInfo struct { type FrameInfo struct {
CodecID uint16 CodecID uint16 // 0-1
Flags uint8 Flags uint8 // 2
CamIndex uint8 CamIndex uint8 // 3
OnlineNum uint8 OnlineNum uint8 // 4
Framerate uint8 FPS uint8 // 5: Framerate
FrameSize uint8 ResTier uint8 // 6: Resolution tier (1=Low, 4=High)
Bitrate uint8 Bitrate uint8 // 7: Bitrate index (30=360P, 100=HD, 200=2K)
TimestampUS uint32 Timestamp uint32 // 8-11: Timestamp
Timestamp uint32 SessionID uint32 // 12-15: Session marker (constant)
PayloadSize uint32 PayloadSize uint32 // 16-19: Payload size
FrameNo uint32 FrameNo uint32 // 20-23: Frame number
} }
func (fi *FrameInfo) IsKeyframe() bool { func (fi *FrameInfo) IsKeyframe() bool {
@@ -53,12 +73,12 @@ func (fi *FrameInfo) IsKeyframe() bool {
} }
func (fi *FrameInfo) Resolution() string { func (fi *FrameInfo) Resolution() string {
switch fi.FrameSize { switch fi.Bitrate {
case ResolutionSD: case Bitrate360P:
return "SD"
case Resolution360P:
return "360P" return "360P"
case Resolution2K: case BitrateHD:
return "HD"
case Bitrate2K:
return "2K" return "2K"
default: default:
return "unknown" return "unknown"
@@ -98,11 +118,11 @@ func ParseFrameInfo(data []byte) *FrameInfo {
Flags: fi[2], Flags: fi[2],
CamIndex: fi[3], CamIndex: fi[3],
OnlineNum: fi[4], OnlineNum: fi[4],
Framerate: fi[5], FPS: fi[5],
FrameSize: fi[6], ResTier: fi[6],
Bitrate: fi[7], Bitrate: fi[7],
TimestampUS: binary.LittleEndian.Uint32(fi[8:]), Timestamp: binary.LittleEndian.Uint32(fi[8:]),
Timestamp: binary.LittleEndian.Uint32(fi[12:]), SessionID: binary.LittleEndian.Uint32(fi[12:]),
PayloadSize: binary.LittleEndian.Uint32(fi[16:]), PayloadSize: binary.LittleEndian.Uint32(fi[16:]),
FrameNo: binary.LittleEndian.Uint32(fi[20:]), FrameNo: binary.LittleEndian.Uint32(fi[20:]),
} }
@@ -166,7 +186,7 @@ func ParsePacketHeader(data []byte) *PacketHeader {
hdr.PayloadSize = binary.LittleEndian.Uint16(data[16:]) hdr.PayloadSize = binary.LittleEndian.Uint16(data[16:])
hdr.FrameNo = binary.LittleEndian.Uint32(data[24:]) hdr.FrameNo = binary.LittleEndian.Uint32(data[24:])
if IsEndFrame(frameType) && pktIdxOrMarker == 0x0028 { if pktIdxOrMarker == 0x0028 && (IsEndFrame(frameType) || hdr.PktTotal == 1) {
hdr.HasFrameInfo = true hdr.HasFrameInfo = true
if hdr.PktTotal > 0 { if hdr.PktTotal > 0 {
hdr.PktIdx = hdr.PktTotal - 1 hdr.PktIdx = hdr.PktTotal - 1
@@ -180,7 +200,7 @@ func ParsePacketHeader(data []byte) *PacketHeader {
hdr.PayloadSize = binary.LittleEndian.Uint16(data[24:]) hdr.PayloadSize = binary.LittleEndian.Uint16(data[24:])
hdr.FrameNo = binary.LittleEndian.Uint32(data[32:]) hdr.FrameNo = binary.LittleEndian.Uint32(data[32:])
if IsEndFrame(frameType) && pktIdxOrMarker == 0x0028 { if pktIdxOrMarker == 0x0028 && (IsEndFrame(frameType) || hdr.PktTotal == 1) {
hdr.HasFrameInfo = true hdr.HasFrameInfo = true
if hdr.PktTotal > 0 { if hdr.PktTotal > 0 {
hdr.PktIdx = hdr.PktTotal - 1 hdr.PktIdx = hdr.PktTotal - 1
@@ -207,11 +227,24 @@ func IsContinuationFrame(frameType uint8) bool {
return frameType == FrameTypeCont || frameType == FrameTypeContAlt return frameType == FrameTypeCont || frameType == FrameTypeContAlt
} }
type FrameAssembler struct { type channelState struct {
FrameNo uint32 frameNo uint32 // current frame being assembled
PktTotal uint16 pktTotal uint16 // expected total packets
Packets map[uint16][]byte waitSeq uint16 // next expected packet index (0, 1, 2, ...)
FrameInfo *FrameInfo waitData []byte // accumulated payload data
frameInfo *FrameInfo // frame info (from end packet)
hasStarted bool // received first packet of frame
lastPktIdx uint16 // last received packet index (for OOO detection)
}
func (cs *channelState) reset() {
cs.frameNo = 0
cs.pktTotal = 0
cs.waitSeq = 0
cs.waitData = cs.waitData[:0]
cs.frameInfo = nil
cs.hasStarted = false
cs.lastPktIdx = 0
} }
func ParseAudioParams(payload []byte, fi *FrameInfo) (sampleRate uint32, channels uint8) { func ParseAudioParams(payload []byte, fi *FrameInfo) (sampleRate uint32, channels uint8) {
@@ -229,18 +262,22 @@ func ParseAudioParams(payload []byte, fi *FrameInfo) (sampleRate uint32, channel
return 16000, 1 return 16000, 1
} }
const tsWrapPeriod uint32 = 1000000
type FrameHandler struct { type FrameHandler struct {
assemblers map[byte]*FrameAssembler channels map[byte]*channelState
baseTS uint64 lastRawTS uint32
output chan *Packet accumUS uint64
verbose bool firstTS bool
output chan *Packet
verbose bool
} }
func NewFrameHandler(verbose bool) *FrameHandler { func NewFrameHandler(verbose bool) *FrameHandler {
return &FrameHandler{ return &FrameHandler{
assemblers: make(map[byte]*FrameAssembler), channels: make(map[byte]*channelState),
output: make(chan *Packet, 128), output: make(chan *Packet, 128),
verbose: verbose, verbose: verbose,
} }
} }
@@ -252,6 +289,27 @@ func (h *FrameHandler) Close() {
close(h.output) close(h.output)
} }
func (h *FrameHandler) updateTimestamp(rawTS uint32) uint64 {
if !h.firstTS {
h.firstTS = true
h.lastRawTS = rawTS
return 0
}
var delta uint32
if rawTS >= h.lastRawTS {
delta = rawTS - h.lastRawTS
} else {
// Wrapped: delta = (wrap - last) + new
delta = (tsWrapPeriod - h.lastRawTS) + rawTS
}
h.accumUS += uint64(delta)
h.lastRawTS = rawTS
return h.accumUS
}
func (h *FrameHandler) Handle(data []byte) { func (h *FrameHandler) Handle(data []byte) {
hdr := ParsePacketHeader(data) hdr := ParsePacketHeader(data)
if hdr == nil { if hdr == nil {
@@ -263,6 +321,16 @@ func (h *FrameHandler) Handle(data []byte) {
return return
} }
if h.verbose {
fiStr := ""
if hdr.HasFrameInfo {
fiStr = " +FI"
}
fmt.Printf("[RX] ch=0x%02x type=0x%02x #%d pkt=%d/%d data=%dB%s\n",
hdr.Channel, hdr.FrameType,
hdr.FrameNo, hdr.PktIdx, hdr.PktTotal, len(payload), fiStr)
}
switch hdr.Channel { switch hdr.Channel {
case ChannelAudio: case ChannelAudio:
h.handleAudio(payload, fi) h.handleAudio(payload, fi)
@@ -335,71 +403,73 @@ func (h *FrameHandler) extractPayload(data []byte, channel byte) ([]byte, *Frame
} }
func (h *FrameHandler) handleVideo(channel byte, hdr *PacketHeader, payload []byte, fi *FrameInfo) { func (h *FrameHandler) handleVideo(channel byte, hdr *PacketHeader, payload []byte, fi *FrameInfo) {
asm := h.assemblers[channel] cs := h.channels[channel]
if cs == nil {
// Frame transition: new frame number = previous frame complete cs = &channelState{}
if asm != nil && hdr.FrameNo != asm.FrameNo { h.channels[channel] = cs
gotAll := uint16(len(asm.Packets)) == asm.PktTotal
if gotAll && asm.FrameInfo != nil {
h.assembleAndQueue(channel, asm)
}
asm = nil
} }
// Create new assembler if needed // New frame number - reset and start fresh
if asm == nil { if hdr.FrameNo != cs.frameNo {
asm = &FrameAssembler{ // Check if previous frame was incomplete
FrameNo: hdr.FrameNo, if cs.hasStarted && cs.waitSeq < cs.pktTotal {
PktTotal: hdr.PktTotal, fmt.Printf("[DROP] ch=0x%02x #%d INCOMPLETE: got %d/%d pkts\n",
Packets: make(map[uint16][]byte, hdr.PktTotal), channel, cs.frameNo, cs.waitSeq, cs.pktTotal)
} }
h.assemblers[channel] = asm cs.reset()
cs.frameNo = hdr.FrameNo
cs.pktTotal = hdr.PktTotal
} }
// Store packet (copy payload - buffer is reused by worker) // Sequential check: if packet index doesn't match expected, reset (data loss)
payloadCopy := make([]byte, len(payload)) if hdr.PktIdx != cs.waitSeq {
copy(payloadCopy, payload) fmt.Printf("[OOO] ch=0x%02x #%d frameType=0x%02x pktTotal=%d expected pkt %d, got %d - reset\n",
asm.Packets[hdr.PktIdx] = payloadCopy channel, hdr.FrameNo, hdr.FrameType, hdr.PktTotal, cs.waitSeq, hdr.PktIdx)
cs.reset()
return
}
// First packet - mark as started
if cs.waitSeq == 0 {
cs.hasStarted = true
}
// Append payload (simple sequential accumulation)
cs.waitData = append(cs.waitData, payload...)
cs.waitSeq++
// Store frame info if present
if fi != nil { if fi != nil {
asm.FrameInfo = fi cs.frameInfo = fi
} }
// Check if frame is complete // Check if frame is complete
if uint16(len(asm.Packets)) == asm.PktTotal && asm.FrameInfo != nil { if cs.waitSeq == cs.pktTotal && cs.frameInfo != nil {
h.assembleAndQueue(channel, asm) h.emitVideo(channel, cs)
delete(h.assemblers, channel) cs.reset()
} }
} }
func (h *FrameHandler) assembleAndQueue(channel byte, asm *FrameAssembler) { func (h *FrameHandler) emitVideo(channel byte, cs *channelState) {
fi := asm.FrameInfo fi := cs.frameInfo
// Assemble packets in correct order
var payload []byte
for i := uint16(0); i < asm.PktTotal; i++ {
if pkt, ok := asm.Packets[i]; ok {
payload = append(payload, pkt...)
}
}
// Size validation // Size validation
if fi.PayloadSize > 0 && len(payload) != int(fi.PayloadSize) { if fi.PayloadSize > 0 && uint32(len(cs.waitData)) != fi.PayloadSize {
fmt.Printf("[SIZE] ch=0x%02x #%d mismatch: expected %d, got %d\n",
channel, cs.frameNo, fi.PayloadSize, len(cs.waitData))
return return
} }
if len(payload) == 0 { if len(cs.waitData) == 0 {
return return
} }
// Calculate RTP timestamp (90kHz for video) using relative timestamps accumUS := h.updateTimestamp(fi.Timestamp)
absoluteTS := uint64(fi.Timestamp)*1000000 + uint64(fi.TimestampUS) rtpTS := uint32(accumUS * 90000 / 1000000)
if h.baseTS == 0 {
h.baseTS = absoluteTS // Copy payload (buffer will be reused)
} payload := make([]byte, len(cs.waitData))
relativeUS := absoluteTS - h.baseTS copy(payload, cs.waitData)
const clockRate uint64 = 90000
rtpTS := uint32(relativeUS * clockRate / 1000000)
pkt := &Packet{ pkt := &Packet{
Channel: channel, Channel: channel,
@@ -413,10 +483,18 @@ func (h *FrameHandler) assembleAndQueue(channel byte, asm *FrameAssembler) {
if h.verbose { if h.verbose {
frameType := "P" frameType := "P"
if fi.IsKeyframe() { if fi.IsKeyframe() {
frameType = "I" frameType = "KEY"
} }
fmt.Printf("[VIDEO] #%d %s %s size=%d rtp=%d\n", fmt.Printf("[OK] ch=0x%02x #%d %s %s size=%d\n",
fi.FrameNo, CodecName(fi.CodecID), frameType, len(payload), rtpTS) channel, fi.FrameNo, CodecName(fi.CodecID), frameType, len(payload))
fmt.Printf(" [0-1]codec=0x%x(%s) [2]flags=0x%x [3]=%d [4]=%d\n",
fi.CodecID, CodecName(fi.CodecID), fi.Flags, fi.CamIndex, fi.OnlineNum)
fmt.Printf(" [5]=%d [6]=%d [7]=%d [8-11]ts=%d\n",
fi.FPS, fi.ResTier, fi.Bitrate, fi.Timestamp)
fmt.Printf(" [12-15]=0x%x [16-19]payload=%d [20-23]frameNo=%d\n",
fi.SessionID, fi.PayloadSize, fi.FrameNo)
fmt.Printf(" rtp_ts=%d accum_us=%d\n", rtpTS, accumUS)
fmt.Printf(" hex: %s\n", dumpHex(fi))
} }
h.queue(pkt) h.queue(pkt)
@@ -438,14 +516,8 @@ func (h *FrameHandler) handleAudio(payload []byte, fi *FrameInfo) {
channels = fi.Channels() channels = fi.Channels()
} }
// Calculate RTP timestamp using relative timestamps (shared baseTS for A/V sync) accumUS := h.updateTimestamp(fi.Timestamp)
absoluteTS := uint64(fi.Timestamp)*1000000 + uint64(fi.TimestampUS) rtpTS := uint32(accumUS * uint64(sampleRate) / 1000000)
if h.baseTS == 0 {
h.baseTS = absoluteTS
}
relativeUS := absoluteTS - h.baseTS
clockRate := uint64(sampleRate)
rtpTS := uint32(relativeUS * clockRate / 1000000)
pkt := &Packet{ pkt := &Packet{
Channel: ChannelAudio, Channel: ChannelAudio,
@@ -458,8 +530,17 @@ func (h *FrameHandler) handleAudio(payload []byte, fi *FrameInfo) {
} }
if h.verbose { if h.verbose {
fmt.Printf("[AUDIO] #%d %s size=%d rate=%d ch=%d rtp=%d\n", bits := 8
fi.FrameNo, AudioCodecName(fi.CodecID), len(payload), sampleRate, channels, rtpTS) if fi.Flags&0x02 != 0 {
bits = 16
}
fmt.Printf("[OK] Audio #%d %s size=%d\n",
fi.FrameNo, AudioCodecName(fi.CodecID), len(payload))
fmt.Printf(" [0-1]codec=0x%x(%s) [2]flags=0x%x(%dHz/%dbit/%dch)\n",
fi.CodecID, AudioCodecName(fi.CodecID), fi.Flags, sampleRate, bits, channels)
fmt.Printf(" [8-11]ts=%d [12-15]=0x%x rtp_ts=%d\n",
fi.Timestamp, fi.SessionID, rtpTS)
fmt.Printf(" hex: %s\n", dumpHex(fi))
} }
h.queue(pkt) h.queue(pkt)
@@ -477,3 +558,29 @@ func (h *FrameHandler) queue(pkt *Packet) {
h.output <- pkt h.output <- pkt
} }
} }
func dumpHex(fi *FrameInfo) string {
b := make([]byte, FrameInfoSize)
binary.LittleEndian.PutUint16(b[0:], fi.CodecID)
b[2] = fi.Flags
b[3] = fi.CamIndex
b[4] = fi.OnlineNum
b[5] = fi.FPS
b[6] = fi.ResTier
b[7] = fi.Bitrate
binary.LittleEndian.PutUint32(b[8:], fi.Timestamp)
binary.LittleEndian.PutUint32(b[12:], fi.SessionID)
binary.LittleEndian.PutUint32(b[16:], fi.PayloadSize)
binary.LittleEndian.PutUint32(b[20:], fi.FrameNo)
// Bytes 24-39 are DeviceID and Padding (not stored in struct)
hexStr := hex.EncodeToString(b)
formatted := ""
for i := 0; i < len(hexStr); i += 2 {
if i > 0 {
formatted += " "
}
formatted += hexStr[i : i+2]
}
return formatted
}