diff --git a/pkg/mpegts/README.md b/pkg/mpegts/README.md new file mode 100644 index 00000000..1b00017a --- /dev/null +++ b/pkg/mpegts/README.md @@ -0,0 +1,5 @@ +## Useful links + +- https://github.com/theREDspace/video-onboarding/blob/main/MPEGTS%20Knowledge.md +- https://en.wikipedia.org/wiki/MPEG_transport_stream +- https://en.wikipedia.org/wiki/Program-specific_information diff --git a/pkg/mpegts/helpers.go b/pkg/mpegts/helpers.go new file mode 100644 index 00000000..fbe232f7 --- /dev/null +++ b/pkg/mpegts/helpers.go @@ -0,0 +1,59 @@ +package mpegts + +import ( + "time" +) + +const ( + PacketSize = 188 + SyncByte = 0x47 +) + +const ( + StreamTypeH264 = 0x1B + StreamTypePCMA = 0x90 +) + +type Packet struct { + StreamType byte + PTS time.Duration + DTS time.Duration + Payload []byte +} + +// PES - Packetized Elementary Stream +type PES struct { + StreamType byte + StreamID byte + Payload []byte +} + +func (p *PES) Packet() *Packet { + // parse Optional PES header + const minHeaderSize = 3 + + pkt := &Packet{StreamType: p.StreamType} + + // fist byte also flags + flags := p.Payload[1] + hSize := p.Payload[2] // optional fields + + const hasPTS = 0b1000_0000 + if flags&hasPTS != 0 { + pkt.PTS = ParseTime(p.Payload[minHeaderSize:]) + + const hasDTS = 0b0100_0000 + if flags&hasDTS != 0 { + pkt.DTS = ParseTime(p.Payload[minHeaderSize+5:]) + } + } + + pkt.Payload = p.Payload[minHeaderSize+hSize:] + + return pkt +} + +func ParseTime(b []byte) time.Duration { + ts := (uint64(b[0]) >> 1 & 0x7 << 30) | (uint64(b[1]) << 22) | (uint64(b[2]) >> 1 & 0x7F << 15) | (uint64(b[3]) << 7) | (uint64(b[4]) >> 1 & 0x7F) + return time.Duration(ts) +} diff --git a/pkg/mpegts/reader.go b/pkg/mpegts/reader.go new file mode 100644 index 00000000..6e3f7fc4 --- /dev/null +++ b/pkg/mpegts/reader.go @@ -0,0 +1,211 @@ +package mpegts + +type Reader struct { + b []byte // packets buffer + i byte // read position + + pmt uint16 // Program Map Table (PMT) PID + pes map[uint16]*PES +} + +func NewReader() *Reader { + return &Reader{} +} + +func (r *Reader) SetBuffer(b []byte) { + r.b = b + r.i = 0 +} + +func (r *Reader) AppendBuffer(b []byte) { + r.b = append(r.b, b...) +} + +func (r *Reader) GetPacket() *Packet { + for r.Sync() { + r.Skip(1) // Sync byte + + pid := r.ReadUint16() & 0x1FFF // PID + flag := r.ReadByte() // flags... + + const hasAdaptionField = 0x20 + if flag&hasAdaptionField != 0 { + adSize := r.ReadByte() // Adaptation field length + if adSize > PacketSize-6 { + println("WARNING: mpegts: wrong adaptation size") + continue + } + r.Skip(adSize) + } + + // PAT: Program Association Table + const PAT = 0 + if pid == PAT { + // already processed + if r.pmt != 0 { + continue + } + + if size := r.ReadPSIHeader(); size <= 4 { + println("WARNING: mpegts: wrong PAT") + continue + } + + const CRCSize = 4 + for r.Left() > CRCSize { + pNum := r.ReadUint16() + pPID := r.ReadUint16() & 0x1FFF + if pNum != 0 { + r.pmt = pPID + } + } + + r.Skip(4) // CRC32 + continue + } + + // PMT : Program Map Table + if pid == r.pmt { + // already processed + if r.pes != nil { + continue + } + + if size := r.ReadPSIHeader(); size == 0 { + println("WARNING: mpegts: wrong PMT") + continue + } + + pesPID := r.ReadUint16() & 0x1FFF + pSize := r.ReadUint16() & 0x03FF + r.Skip(byte(pSize)) + + r.pes = map[uint16]*PES{} + + const minItemSize = 5 + for r.Left() > minItemSize { + streamType := r.ReadByte() + pesPID = r.ReadUint16() & 0x1FFF + iSize := r.ReadUint16() & 0x03FF + r.Skip(byte(iSize)) + + r.pes[pesPID] = &PES{StreamType: streamType} + } + continue + } + + if r.pes == nil { + continue + } + + pes := r.pes[pid] + if pes == nil { + continue // unknown PID + } + + if pes.Payload != nil { + // how many bytes left to collect + left := cap(pes.Payload) - len(pes.Payload) - int(r.Left()) + + // buffer overflow + if left < 0 { + println("WARNING: mpegts: buffer overflow") + pes.Payload = nil + continue + } + + pes.Payload = append(pes.Payload, r.Bytes()...) + + if left == 0 { + pkt := pes.Packet() + pes.Payload = nil + return pkt + } + + continue + } + + // PES Packet start code prefix + if r.ReadByte() != 0 || r.ReadByte() != 0 || r.ReadByte() != 1 { + continue + } + + // read stream ID and total payload size + pes.StreamID = r.ReadByte() + pes.Payload = make([]byte, 0, r.ReadUint16()) + pes.Payload = append(pes.Payload, r.Bytes()...) + } + + return nil +} + +// Sync - search sync byte +func (r *Reader) Sync() bool { + // drop previous readed packet + if r.i != 0 { + r.b = r.b[PacketSize:] + r.i = 0 + } + + // if packet available + if len(r.b) < PacketSize { + return false + } + + // if data starts from sync byte + if r.b[0] == SyncByte { + return true + } + + for len(r.b) >= PacketSize { + if r.b[0] == SyncByte { + return true + } + r.b = r.b[1:] + } + + return false +} + +func (r *Reader) ReadPSIHeader() uint16 { + pointer := r.ReadByte() // Pointer field + r.Skip(pointer) // Pointer filler bytes + + r.Skip(1) // Table ID + size := r.ReadUint16() & 0x03FF // Section length + + if uint16(r.i)+size != uint16(PacketSize) { + return 0 + } + + r.Skip(2) // Table ID extension + r.Skip(1) // flags... + r.Skip(1) // Section number + r.Skip(1) // Last section number + + return size - 5 +} + +func (r *Reader) Skip(i byte) { + r.i += i +} + +func (r *Reader) ReadByte() byte { + b := r.b[r.i] + r.i++ + return b +} + +func (r *Reader) ReadUint16() uint16 { + i := (uint16(r.b[r.i]) << 8) | uint16(r.b[r.i+1]) + r.i += 2 + return i +} + +func (r *Reader) Bytes() []byte { + return r.b[r.i:PacketSize] +} + +func (r *Reader) Left() byte { + return PacketSize - r.i +}