From b3def6cfa27b229f5213d48f86ed10663df50247 Mon Sep 17 00:00:00 2001 From: Alexey Khit Date: Thu, 17 Aug 2023 05:41:27 +0300 Subject: [PATCH] Rewrite support MPEG-TS client --- internal/mpegts/mpegts.go | 10 +- pkg/aac/aac.go | 9 +- pkg/aac/adts.go | 61 ++++++ pkg/aac/rtp.go | 28 ++- pkg/bits/writer.go | 53 +++++ pkg/magic/magic.go | 2 +- pkg/mpegts/checksum.go | 6 +- pkg/mpegts/client.go | 127 +++++++---- pkg/mpegts/helpers.go | 197 +++++------------ pkg/mpegts/producer.go | 10 +- pkg/mpegts/reader.go | 433 +++++++++++++++++++++++--------------- 11 files changed, 569 insertions(+), 367 deletions(-) create mode 100644 pkg/aac/adts.go create mode 100644 pkg/bits/writer.go diff --git a/internal/mpegts/mpegts.go b/internal/mpegts/mpegts.go index fad0f11e..c359d204 100644 --- a/internal/mpegts/mpegts.go +++ b/internal/mpegts/mpegts.go @@ -1,10 +1,11 @@ package mpegts import ( + "net/http" + "github.com/AlexxIT/go2rtc/internal/api" "github.com/AlexxIT/go2rtc/internal/streams" "github.com/AlexxIT/go2rtc/pkg/mpegts" - "net/http" ) func Init() { @@ -25,16 +26,15 @@ func apiHandle(w http.ResponseWriter, r *http.Request) { } res := &http.Response{Body: r.Body, Request: r} - client := mpegts.NewClient(res) - - if err := client.Handle(); err != nil { + client, err := mpegts.Open(res.Body) + if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } stream.AddProducer(client) - if err := client.Handle(); err != nil { + if err = client.Start(); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } diff --git a/pkg/aac/aac.go b/pkg/aac/aac.go index b4aebeac..7fc143c1 100644 --- a/pkg/aac/aac.go +++ b/pkg/aac/aac.go @@ -17,7 +17,10 @@ const ( // streamtype=5 - audio stream const fmtp = "streamtype=5;profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config=" -var sampleRates = []uint32{96000, 88200, 64000, 48000, 44100, 32000, 24000, 22050, 16000, 12000, 11025, 8000, 7350} +var sampleRates = []uint32{ + 96000, 88200, 64000, 48000, 44100, 32000, 24000, 22050, 16000, 12000, 11025, 8000, 7350, + 0, 0, 0, // protection from request sampleRates[15] +} func ConfigToCodec(conf []byte) *core.Codec { // https://en.wikipedia.org/wiki/MPEG-4_Part_3#MPEG-4_Audio_Object_Types @@ -40,9 +43,9 @@ func ConfigToCodec(conf []byte) *core.Codec { codec.Name = fmt.Sprintf("AAC-%X", objType) } - if sampleRateIdx := rd.ReadBits8(4); sampleRateIdx < 12 { + if sampleRateIdx := rd.ReadBits8(4); sampleRateIdx < 0x0F { codec.ClockRate = sampleRates[sampleRateIdx] - } else if sampleRateIdx == 0x0F { + } else { codec.ClockRate = rd.ReadBits(24) } diff --git a/pkg/aac/adts.go b/pkg/aac/adts.go new file mode 100644 index 00000000..45d3167e --- /dev/null +++ b/pkg/aac/adts.go @@ -0,0 +1,61 @@ +package aac + +import ( + "encoding/hex" + + "github.com/AlexxIT/go2rtc/pkg/bits" + "github.com/AlexxIT/go2rtc/pkg/core" +) + +func IsADTS(b []byte) bool { + _ = b[1] + return len(b) > 7 && b[0] == 0xFF && b[1]&0xF0 == 0xF0 +} + +func ADTSToCodec(b []byte) *core.Codec { + // 1. Check ADTS header + if !IsADTS(b) { + return nil + } + + // 2. Decode ADTS params + // https://wiki.multimedia.cx/index.php/ADTS + rd := bits.NewReader(b) + _ = rd.ReadBits(12) // Syncword, all bits must be set to 1 + _ = rd.ReadBit() // MPEG Version, set to 0 for MPEG-4 and 1 for MPEG-2 + _ = rd.ReadBits(2) // Layer, always set to 0 + _ = rd.ReadBit() // Protection absence, set to 1 if there is no CRC and 0 if there is CRC + objType := rd.ReadBits8(2) + 1 // Profile, the MPEG-4 Audio Object Type minus 1 + sampleRateIdx := rd.ReadBits8(4) // MPEG-4 Sampling Frequency Index + _ = rd.ReadBit() // Private bit, guaranteed never to be used by MPEG, set to 0 when encoding, ignore when decoding + channels := rd.ReadBits16(3) // MPEG-4 Channel Configuration + + //_ = rd.ReadBit() // Originality, set to 1 to signal originality of the audio and 0 otherwise + //_ = rd.ReadBit() // Home, set to 1 to signal home usage of the audio and 0 otherwise + //_ = rd.ReadBit() // Copyright ID bit + //_ = rd.ReadBit() // Copyright ID start + //_ = rd.ReadBits(13) // Frame length + //_ = rd.ReadBits(11) // Buffer fullness + //_ = rd.ReadBits(2) // Number of AAC frames (Raw Data Blocks) in ADTS frame minus 1 + //_ = rd.ReadBits(16) // CRC check + + // 3. Encode RTP config + wr := bits.NewWriter() + wr.WriteBits8(objType, 5) + wr.WriteBits8(sampleRateIdx, 4) + wr.WriteBits16(channels, 4) + conf := wr.Bytes() + + codec := &core.Codec{ + Name: core.CodecAAC, + ClockRate: sampleRates[sampleRateIdx], + Channels: channels, + FmtpLine: fmtp + hex.EncodeToString(conf), + } + return codec +} + +func ReadADTSSize(b []byte) uint16 { + // AAAAAAAA AAAABCCD EEFFFFGH HHIJKLMM MMMMMMMM MMMOOOOO OOOOOOPP (QQQQQQQQ QQQQQQQQ) + return uint16(b[3]&0x03)<<(8+3) | uint16(b[4])<<3 | uint16(b[5]>>5) +} diff --git a/pkg/aac/rtp.go b/pkg/aac/rtp.go index ec92ebad..50f713f6 100644 --- a/pkg/aac/rtp.go +++ b/pkg/aac/rtp.go @@ -2,11 +2,13 @@ package aac import ( "encoding/binary" + "github.com/AlexxIT/go2rtc/pkg/core" "github.com/pion/rtp" ) const RTPPacketVersionAAC = 0 +const ADTSHeaderSize = 7 func RTPDepay(handler core.HandlerFunc) core.HandlerFunc { var timestamp uint32 @@ -14,6 +16,7 @@ func RTPDepay(handler core.HandlerFunc) core.HandlerFunc { return func(packet *rtp.Packet) { // support ONLY 2 bytes header size! // streamtype=5;profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config=1408 + // https://datatracker.ietf.org/doc/html/rfc3640 headersSize := binary.BigEndian.Uint16(packet.Payload) >> 3 //log.Printf("[RTP/AAC] units: %d, size: %4d, ts: %10d, %t", headersSize/2, len(packet.Payload), packet.Timestamp, packet.Marker) @@ -35,7 +38,7 @@ func RTPDepay(handler core.HandlerFunc) core.HandlerFunc { clone.Version = RTPPacketVersionAAC clone.Timestamp = timestamp if IsADTS(unit) { - clone.Payload = unit[7:] + clone.Payload = unit[ADTSHeaderSize:] } else { clone.Payload = unit } @@ -54,11 +57,11 @@ func RTPPay(handler core.HandlerFunc) core.HandlerFunc { } // support ONLY one unit in payload - size := uint16(len(packet.Payload)) + auSize := uint16(len(packet.Payload)) // 2 bytes header size + 2 bytes first payload size - payload := make([]byte, 2+2+size) + payload := make([]byte, 2+2+auSize) payload[1] = 16 // header size in bits - binary.BigEndian.PutUint16(payload[2:], size<<3) + binary.BigEndian.PutUint16(payload[2:], auSize<<3) copy(payload[4:], packet.Payload) clone := rtp.Packet{ @@ -74,6 +77,19 @@ func RTPPay(handler core.HandlerFunc) core.HandlerFunc { } } -func IsADTS(b []byte) bool { - return len(b) > 7 && b[0] == 0xFF && b[1]&0xF0 == 0xF0 +func ADTStoRTP(b []byte) []byte { + header := make([]byte, 2) + for i := 0; i < len(b); { + auSize := ReadADTSSize(b[i:]) + header = append(header, byte(auSize>>5), byte(auSize<<3)) // size in bits + i += int(auSize) + } + hdrSize := uint16(len(header) - 2) + binary.BigEndian.PutUint16(header, hdrSize<<3) // size in bits + return append(header, b...) +} + +func RTPToCodec(b []byte) *core.Codec { + hdrSize := binary.BigEndian.Uint16(b) / 8 + return ADTSToCodec(b[2+hdrSize:]) } diff --git a/pkg/bits/writer.go b/pkg/bits/writer.go new file mode 100644 index 00000000..887da3ae --- /dev/null +++ b/pkg/bits/writer.go @@ -0,0 +1,53 @@ +package bits + +type Writer struct { + buf []byte // total buf + byte byte // current byte + bits byte // bits left in byte + len int // current len of buf +} + +func NewWriter() *Writer { + return &Writer{} +} + +func (w *Writer) WriteBit(b byte) { + if w.bits == 0 { + if w.len != 0 { + w.buf = append(w.buf, w.byte) + } + + w.byte = 0 + w.bits = 7 + w.len++ + } else { + w.bits-- + } + + w.byte |= b << w.bits +} + +func (w *Writer) WriteBits(v uint32, n byte) { + for i := n - 1; i != 255; i-- { + w.WriteBit(byte(v>>i) & 0b1) + } +} + +func (w *Writer) WriteBits16(v uint16, n byte) { + for i := n - 1; i != 255; i-- { + w.WriteBit(byte(v>>i) & 0b1) + } +} + +func (w *Writer) WriteBits8(v, n byte) { + for i := n - 1; i != 255; i-- { + w.WriteBit((v >> i) & 0b1) + } +} + +func (w *Writer) Bytes() []byte { + if w.bits == 0 { + return w.buf + } + return append(w.buf, w.byte) +} diff --git a/pkg/magic/magic.go b/pkg/magic/magic.go index 88b9eda0..4224171d 100644 --- a/pkg/magic/magic.go +++ b/pkg/magic/magic.go @@ -33,7 +33,7 @@ func Open(r io.Reader) (core.Producer, error) { return flv.Open(rd) case b[0] == mpegts.SyncByte: - break // TODO + return mpegts.Open(rd) } return nil, errors.New("magic: unsupported header: " + hex.EncodeToString(b)) diff --git a/pkg/mpegts/checksum.go b/pkg/mpegts/checksum.go index eafc8c6d..82f5357a 100644 --- a/pkg/mpegts/checksum.go +++ b/pkg/mpegts/checksum.go @@ -1,6 +1,6 @@ package mpegts -var ieeeCrc32Tbl = []uint32{ +var table = [256]uint32{ 0x00000000, 0xB71DC104, 0x6E3B8209, 0xD926430D, 0xDC760413, 0x6B6BC517, 0xB24D861A, 0x0550471E, 0xB8ED0826, 0x0FF0C922, 0xD6D68A2F, 0x61CB4B2B, 0x649B0C35, 0xD386CD31, 0x0AA08E3C, 0xBDBD4F38, 0x70DB114C, 0xC7C6D048, @@ -43,12 +43,12 @@ var ieeeCrc32Tbl = []uint32{ 0xAFF023EA, 0x18EDE2EE, 0x1DBDA5F0, 0xAAA064F4, 0x738627F9, 0xC49BE6FD, 0x09FDB889, 0xBEE0798D, 0x67C63A80, 0xD0DBFB84, 0xD58BBC9A, 0x62967D9E, 0xBBB03E93, 0x0CADFF97, 0xB110B0AF, 0x060D71AB, 0xDF2B32A6, 0x6836F3A2, - 0x6D66B4BC, 0xDA7B75B8, 0x035D36B5, 0xB440F7B1, 0x00000001, + 0x6D66B4BC, 0xDA7B75B8, 0x035D36B5, 0xB440F7B1, } func calcCRC32(crc uint32, data []byte) uint32 { for _, b := range data { - crc = ieeeCrc32Tbl[b^byte(crc)] ^ (crc >> 8) + crc = table[b^byte(crc)] ^ (crc >> 8) } return crc } diff --git a/pkg/mpegts/client.go b/pkg/mpegts/client.go index 16bc5420..520c69ca 100644 --- a/pkg/mpegts/client.go +++ b/pkg/mpegts/client.go @@ -1,79 +1,126 @@ package mpegts import ( + "bytes" + "io" + "time" + + "github.com/AlexxIT/go2rtc/pkg/aac" "github.com/AlexxIT/go2rtc/pkg/core" - "net/http" + "github.com/AlexxIT/go2rtc/pkg/h264" + "github.com/AlexxIT/go2rtc/pkg/h265" ) type Client struct { - core.Listener + URL string + + rd *core.ReadSeeker medias []*core.Media receivers []*core.Receiver - res *http.Response - recv int } -func NewClient(res *http.Response) *Client { - return &Client{res: res} +func Open(rd io.Reader) (*Client, error) { + client := &Client{rd: core.NewReadSeeker(rd)} + if err := client.describe(); err != nil { + return nil, err + } + return client, nil } -func (c *Client) Handle() error { - reader := NewReader() +func (c *Client) describe() error { + c.rd.BufferSize = core.ProbeSize + defer c.rd.Rewind() - b := make([]byte, 1024*256) // 256K + rd := NewReader() - probe := core.NewProbe(c.medias == nil) - for probe == nil || probe.Active() { - n, err := c.res.Body.Read(b) + // Strategy: + // 1. Wait packet with metadata, init other packets for wait + // 2. Wait other packets + // 3. Stop after timeout + waitType := []byte{metadataType} + timeout := time.Now().Add(core.ProbeTimeout) + + for len(waitType) != 0 && time.Now().Before(timeout) { + pkt, err := rd.ReadPacket(c.rd) if err != nil { return err } - c.recv += n + // check if we wait this type + if i := bytes.IndexByte(waitType, pkt.PayloadType); i < 0 { + continue + } else { + waitType = append(waitType[:i], waitType[i+1:]...) + } - reader.AppendBuffer(b[:n]) - - reading: - for { - packet := reader.GetPacket() - if packet == nil { - break - } - - for _, receiver := range c.receivers { - if receiver.ID == packet.PayloadType { - receiver.WriteRTP(packet) - continue reading + switch pkt.PayloadType { + case metadataType: + for _, streamType := range pkt.Payload { + switch streamType { + case StreamTypeH264, StreamTypeH265, StreamTypeAAC: + waitType = append(waitType, streamType) } } - // count track on probe state even if not support it - probe.Append(packet.PayloadType) - - media := GetMedia(packet) - if media == nil { - continue // unsupported codec + case StreamTypeH264: + codec := h264.AVCCToCodec(pkt.Payload) + media := &core.Media{ + Kind: core.KindVideo, + Direction: core.DirectionRecvonly, + Codecs: []*core.Codec{codec}, } - c.medias = append(c.medias, media) - receiver := core.NewReceiver(media, media.Codecs[0]) - receiver.ID = packet.PayloadType - c.receivers = append(c.receivers, receiver) + case StreamTypeH265: + codec := h265.AVCCToCodec(pkt.Payload) + media := &core.Media{ + Kind: core.KindVideo, + Direction: core.DirectionRecvonly, + Codecs: []*core.Codec{codec}, + } + c.medias = append(c.medias, media) - receiver.WriteRTP(packet) - - //log.Printf("[AVC] %v, len: %d, pts: %d ts: %10d", h264.Types(packet.Payload), len(packet.Payload), pkt.PTS, packet.Timestamp) + case StreamTypeAAC: + codec := aac.RTPToCodec(pkt.Payload) + media := &core.Media{ + Kind: core.KindAudio, + Direction: core.DirectionRecvonly, + Codecs: []*core.Codec{codec}, + } + c.medias = append(c.medias, media) } } return nil } +func (c *Client) play() error { + rd := NewReader() + + for { + pkt, err := rd.ReadPacket(c.rd) + if err != nil { + return err + } + + //log.Printf("[mpegts] size: %6d, ts: %10d, pt: %2d", len(pkt.Payload), pkt.Timestamp, pkt.PayloadType) + + for _, receiver := range c.receivers { + if receiver.ID == pkt.PayloadType { + pkt.Timestamp = PTSToTimestamp(pkt.Timestamp, receiver.Codec.ClockRate) + receiver.WriteRTP(pkt) + break + } + } + } +} + func (c *Client) Close() error { - _ = c.res.Body.Close() + if closer, ok := c.rd.Reader.(io.Closer); ok { + return closer.Close() + } return nil } diff --git a/pkg/mpegts/helpers.go b/pkg/mpegts/helpers.go index 7beb7f8e..31c481f5 100644 --- a/pkg/mpegts/helpers.go +++ b/pkg/mpegts/helpers.go @@ -1,21 +1,20 @@ package mpegts import ( - "time" - + "github.com/AlexxIT/go2rtc/pkg/aac" "github.com/AlexxIT/go2rtc/pkg/core" - "github.com/AlexxIT/go2rtc/pkg/h264" "github.com/AlexxIT/go2rtc/pkg/h264/annexb" - "github.com/AlexxIT/go2rtc/pkg/h265" "github.com/pion/rtp" ) const ( PacketSize = 188 - SyncByte = 0x47 + SyncByte = 0x47 // Uppercase G ) +// https://en.wikipedia.org/wiki/Program-specific_information#Elementary_stream_types const ( + metadataType = 0 StreamTypePrivate = 0x06 // PCMU or PCMA or FLAC from FFmpeg StreamTypeAAC = 0x0F StreamTypeH264 = 0x1B @@ -23,58 +22,23 @@ const ( StreamTypePCMATapo = 0x90 ) -type Packet struct { - StreamType byte - PTS time.Duration - DTS time.Duration - Payload []byte -} - // PES - Packetized Elementary Stream type PES struct { StreamType byte StreamID byte Payload []byte - Mode byte Size int + PTS uint32 // PTS always 90000Hz - Sequence uint16 - Timestamp uint32 + Sequence uint16 decodeStream func([]byte) ([]byte, int) } -const ( - ModeUnknown = iota - ModeSize - ModeStream -) - -// parse Optional PES header -const minHeaderSize = 3 - func (p *PES) SetBuffer(size uint16, b []byte) { - if size == 0 { - optSize := b[2] // optional fields - b = b[minHeaderSize+optSize:] - - switch p.StreamType { - case StreamTypeH264: - p.Mode = ModeStream - p.decodeStream = h264.DecodeStream - case StreamTypeH265: - p.Mode = ModeStream - p.decodeStream = h265.DecodeStream - default: - println("WARNING: mpegts: unknown zero-size stream") - } - } else { - p.Mode = ModeSize - p.Size = int(size) - } - p.Payload = make([]byte, 0, size) p.Payload = append(p.Payload, b...) + p.Size = int(size) } func (p *PES) AppendBuffer(b []byte) { @@ -82,116 +46,67 @@ func (p *PES) AppendBuffer(b []byte) { } func (p *PES) GetPacket() (pkt *rtp.Packet) { - switch p.Mode { - case ModeSize: - left := p.Size - len(p.Payload) - if left > 0 { - return - } - - if left < 0 { - println("WARNING: mpegts: buffer overflow") - p.Payload = nil - return - } - - // fist byte also flags - flags := p.Payload[1] - optSize := p.Payload[2] // optional fields - - payload := p.Payload[minHeaderSize+optSize:] - - switch p.StreamType { - case StreamTypeH264, StreamTypeH265: - var ts uint32 - - const hasPTS = 0b1000_0000 - if flags&hasPTS != 0 { - ts = ParseTime(p.Payload[minHeaderSize:]) - } - - pkt = &rtp.Packet{ - Header: rtp.Header{ - PayloadType: p.StreamType, - Timestamp: ts, - }, - Payload: annexb.EncodeToAVCC(payload, false), - } - - case StreamTypePCMATapo: - p.Sequence++ - p.Timestamp += uint32(len(payload)) - - pkt = &rtp.Packet{ - Header: rtp.Header{ - Version: 2, - PayloadType: p.StreamType, - SequenceNumber: p.Sequence, - Timestamp: p.Timestamp, - }, - Payload: payload, - } - } - - p.Payload = nil - - case ModeStream: - payload, i := p.decodeStream(p.Payload) - if payload == nil { - return - } - - //log.Printf("[AVC] %v, len: %d", h264.Types(payload), len(payload)) - - p.Payload = p.Payload[i:] - + switch p.StreamType { + case StreamTypeH264, StreamTypeH265: pkt = &rtp.Packet{ Header: rtp.Header{ PayloadType: p.StreamType, - Timestamp: core.Now90000(), + Timestamp: p.PTS, }, - Payload: payload, + Payload: annexb.EncodeToAVCC(p.Payload, false), } - default: - p.Payload = nil + case StreamTypeAAC: + p.Sequence++ + + pkt = &rtp.Packet{ + Header: rtp.Header{ + Version: 2, + PayloadType: p.StreamType, + SequenceNumber: p.Sequence, + Timestamp: p.PTS, + }, + Payload: aac.ADTStoRTP(p.Payload), + } + + case StreamTypePCMATapo: + p.Sequence++ + p.PTS += uint32(len(p.Payload)) + + pkt = &rtp.Packet{ + Header: rtp.Header{ + Version: 2, + PayloadType: p.StreamType, + SequenceNumber: p.Sequence, + Timestamp: p.PTS, + }, + Payload: p.Payload, + } } + p.Payload = nil + return } -func ParseTime(b []byte) uint32 { - return (uint32(b[0]&0x0E) << 29) | (uint32(b[1]) << 22) | (uint32(b[2]&0xFE) << 14) | (uint32(b[3]) << 7) | (uint32(b[4]) >> 1) +func StreamType(codec *core.Codec) uint8 { + switch codec.Name { + case core.CodecH264: + return StreamTypeH264 + case core.CodecH265: + return StreamTypeH265 + case core.CodecAAC: + return StreamTypeAAC + case core.CodecPCMA: + return StreamTypePCMATapo + } + return 0 } -func GetMedia(pkt *rtp.Packet) *core.Media { - var codec *core.Codec - var kind string - - switch pkt.PayloadType { - case StreamTypeH264: - codec = &core.Codec{ - Name: core.CodecH264, - ClockRate: 90000, - PayloadType: core.PayloadTypeRAW, - FmtpLine: h264.GetFmtpLine(pkt.Payload), - } - kind = core.KindVideo - - case StreamTypePCMATapo: - codec = &core.Codec{ - Name: core.CodecPCMA, - ClockRate: 8000, - } - kind = core.KindAudio - - default: - return nil - } - - return &core.Media{ - Kind: kind, - Direction: core.DirectionRecvonly, - Codecs: []*core.Codec{codec}, +// PTSToTimestamp - convert PTS from 90000 to custom clock rate +func PTSToTimestamp(pts, clockRate uint32) uint32 { + if clockRate == 90000 { + return pts } + return uint32(uint64(pts) * uint64(clockRate) / 90000) } diff --git a/pkg/mpegts/producer.go b/pkg/mpegts/producer.go index d75d7a21..2a7dc6c6 100644 --- a/pkg/mpegts/producer.go +++ b/pkg/mpegts/producer.go @@ -2,6 +2,7 @@ package mpegts import ( "encoding/json" + "github.com/AlexxIT/go2rtc/pkg/core" ) @@ -15,11 +16,14 @@ func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, return track, nil } } - return nil, core.ErrCantGetTrack + track := core.NewReceiver(media, codec) + track.ID = StreamType(codec) + c.receivers = append(c.receivers, track) + return track, nil } func (c *Client) Start() error { - return c.Handle() + return c.play() } func (c *Client) Stop() error { @@ -32,7 +36,7 @@ func (c *Client) Stop() error { func (c *Client) MarshalJSON() ([]byte, error) { info := &core.Info{ Type: "MPEG-TS active producer", - URL: c.res.Request.URL.String(), + URL: c.URL, Medias: c.medias, Receivers: c.receivers, Recv: c.recv, diff --git a/pkg/mpegts/reader.go b/pkg/mpegts/reader.go index c38b35dd..b3af17d1 100644 --- a/pkg/mpegts/reader.go +++ b/pkg/mpegts/reader.go @@ -1,209 +1,312 @@ package mpegts -import "github.com/pion/rtp" +import ( + "errors" + "io" + + "github.com/pion/rtp" +) type Reader struct { - b []byte // packets buffer - i byte // read position - s byte // end position + buf [PacketSize]byte // total buf - pmt uint16 // Program Map Table (PMT) PID - pes map[uint16]*PES + byte byte // current byte + bits byte // bits left in byte + pos byte // current pos in buf + end byte // end position + + pmtID uint16 // Program Map Table (PMT) PID + pes map[uint16]*PES } func NewReader() *Reader { return &Reader{} } -func (r *Reader) SetBuffer(b []byte) { - r.b = b - r.i = 0 - r.s = PacketSize -} +const skipRead = 0xFF -func (r *Reader) AppendBuffer(b []byte) { - r.b = append(r.b, b...) -} - -func (r *Reader) GetPacket() *rtp.Packet { - for r.Sync() { - r.Skip(1) // Sync byte - - pid := r.ReadUint16() & 0x1FFF // PID - flag := r.ReadByte() // flags... - - const pidNullPacket = 0x1FFF - if pid == pidNullPacket { - continue +func (r *Reader) ReadPacket(rd io.Reader) (*rtp.Packet, error) { + for { + if r.pos != skipRead { + if _, err := io.ReadFull(rd, r.buf[:]); err != nil { + return nil, err + } } - const hasAdaptionField = 0b0010_0000 - if flag&hasAdaptionField != 0 { - adSize := r.ReadByte() // Adaptation field length - if adSize > PacketSize-6 { - println("WARNING: mpegts: wrong adaptation size") - continue - } - r.Skip(adSize) - } - - // PAT: Program Association Table - const pidPAT = 0 - if pid == pidPAT { - // already processed - if r.pmt != 0 { - continue - } - - r.ReadPSIHeader() - - const CRCSize = 4 - for r.Left() > CRCSize { - pNum := r.ReadUint16() - pPID := r.ReadUint16() & 0x1FFF - if pNum != 0 { - r.pmt = pPID - } - } - - r.Skip(4) // CRC32 - continue - } - - // PMT : Program Map Table - if pid == r.pmt { - // already processed - if r.pes != nil { - continue - } - - r.ReadPSIHeader() - - pesPID := r.ReadUint16() & 0x1FFF // ? PCR PID - pSize := r.ReadUint16() & 0x03FF // ? 0x0FFF - r.Skip(byte(pSize)) - - r.pes = map[uint16]*PES{} - - const CRCSize = 4 - for r.Left() > CRCSize { - streamType := r.ReadByte() - pesPID = r.ReadUint16() & 0x1FFF // Elementary PID - iSize := r.ReadUint16() & 0x03FF // ? 0x0FFF - r.Skip(byte(iSize)) - - r.pes[pesPID] = &PES{StreamType: streamType} - } - - r.Skip(4) // ? CRC32 - continue + pid, start, err := r.readPacketHeader() + if err != nil { + return nil, err } if r.pes == nil { + switch pid { + case 0: // PAT ID + r.readPAT() // PAT: Program Association Table + case r.pmtID: + r.readPMT() // PMT : Program Map Table + + pkt := &rtp.Packet{ + Payload: make([]byte, 0, len(r.pes)), + } + for _, pes := range r.pes { + pkt.Payload = append(pkt.Payload, pes.StreamType) + } + return pkt, nil + } continue } - pes := r.pes[pid] - if pes == nil { - continue // unknown PID + if pkt := r.readPES(pid, start); pkt != nil { + return pkt, nil + } + } +} + +func (r *Reader) readPacketHeader() (pid uint16, start bool, err error) { + r.reset() + + sb := r.readByte() // Sync byte + if sb != SyncByte { + return 0, false, errors.New("mpegts: wrong sync byte") + } + + _ = r.readBit() // Transport error indicator (TEI) + pusi := r.readBit() // Payload unit start indicator (PUSI) + _ = r.readBit() // Transport priority + pid = r.readBits16(13) // PID + + _ = r.readBits(2) // Transport scrambling control (TSC) + af := r.readBit() // Adaptation field + _ = r.readBit() // Payload + _ = r.readBits(4) // Continuity counter + + if af != 0 { + adSize := r.readByte() // Adaptation field length + if adSize > PacketSize-6 { + return 0, false, errors.New("mpegts: wrong adaptation size") + } + r.skip(adSize) + } + + return pid, pusi != 0, nil +} + +func (r *Reader) skip(i byte) { + r.pos += i +} + +func (r *Reader) readPSIHeader() { + // https://en.wikipedia.org/wiki/Program-specific_information#Table_Sections + pointer := r.readByte() // Pointer field + r.skip(pointer) // Pointer filler bytes + + _ = r.readByte() // Table ID + _ = r.readBit() // Section syntax indicator + _ = r.readBit() // Private bit + _ = r.readBits(2) // Reserved bits + _ = r.readBits(2) // Section length unused bits + size := r.readBits(10) // Section length + r.setSize(byte(size)) + + _ = r.readBits(16) // Table ID extension + _ = r.readBits(2) // Reserved bits + _ = r.readBits(5) // Version number + _ = r.readBit() // Current/next indicator + _ = r.readByte() // Section number + _ = r.readByte() // Last section number +} + +// ReadPAT (Program Association Table) +func (r *Reader) readPAT() { + // https://en.wikipedia.org/wiki/Program-specific_information#PAT_(Program_Association_Table) + r.readPSIHeader() + + const CRCSize = 4 + for r.left() > CRCSize { + num := r.readBits(16) // Program num + _ = r.readBits(3) // Reserved bits + pid := r.readBits16(13) // Program map PID + if num != 0 { + r.pmtID = pid + } + } + + r.skip(4) // CRC32 +} + +// ReadPMT (Program map specific data) +func (r *Reader) readPMT() { + // https://en.wikipedia.org/wiki/Program-specific_information#PMT_(Program_map_specific_data) + r.readPSIHeader() + + _ = r.readBits(3) // Reserved bits + _ = r.readBits(13) // PCR PID + _ = r.readBits(4) // Reserved bits + _ = r.readBits(2) // Program info length unused bits + size := r.readBits(10) // Program info length + r.skip(byte(size)) + + r.pes = map[uint16]*PES{} + + const CRCSize = 4 + for r.left() > CRCSize { + streamType := r.readByte() // Stream type + _ = r.readBits(3) // Reserved bits + pid := r.readBits16(13) // Elementary PID + _ = r.readBits(4) // Reserved bits + _ = r.readBits(2) // ES Info length unused bits + size = r.readBits(10) // ES Info length + r.skip(byte(size)) + + r.pes[pid] = &PES{StreamType: streamType} + } + + r.skip(4) // CRC32 +} + +func (r *Reader) readPES(pid uint16, start bool) *rtp.Packet { + pes := r.pes[pid] + if pes == nil { + return nil + } + + // if new payload beging + if start { + if pes.Payload != nil { + r.pos = skipRead + return pes.GetPacket() // finish previous packet } - if pes.Payload == nil { - // PES Packet start code prefix - if r.ReadByte() != 0 || r.ReadByte() != 0 || r.ReadByte() != 1 { - continue - } - - // read stream ID and total payload size - pes.StreamID = r.ReadByte() - pes.SetBuffer(r.ReadUint16(), r.Bytes()) - } else { - pes.AppendBuffer(r.Bytes()) + // https://en.wikipedia.org/wiki/Packetized_elementary_stream + // Packet start code prefix + if r.readByte() != 0 || r.readByte() != 0 || r.readByte() != 1 { + return nil } - if pkt := pes.GetPacket(); pkt != nil { - return pkt + pes.StreamID = r.readByte() // Stream id + packetSize := r.readBits16(16) // PES Packet length + + _ = r.readBits(2) // Marker bits + _ = r.readBits(2) // Scrambling control + _ = r.readBit() // Priority + _ = r.readBit() // Data alignment indicator + _ = r.readBit() // Copyright + _ = r.readBit() // Original or Copy + + pts := r.readBit() // PTS indicator + _ = r.readBit() // DTS indicator + _ = r.readBit() // ESCR flag + _ = r.readBit() // ES rate flag + _ = r.readBit() // DSM trick mode flag + _ = r.readBit() // Additional copy info flag + _ = r.readBit() // CRC flag + _ = r.readBit() // extension flag + + headerSize := r.readByte() // PES header length + + //log.Printf("[mpegts] pes=%d size=%d header=%d", pes.StreamID, packetSize, headerSize) + + if packetSize != 0 { + packetSize -= uint16(3 + headerSize) } + + if pts != 0 { + pes.PTS = r.readTime() + headerSize -= 5 + } + + r.skip(headerSize) + + pes.SetBuffer(packetSize, r.bytes()) + } else { + pes.AppendBuffer(r.bytes()) + } + + if pes.Size != 0 && len(pes.Payload) >= pes.Size { + return pes.GetPacket() // finish current packet } return nil } -func (r *Reader) GetStreamTypes() []byte { - types := make([]byte, 0, len(r.pes)) - for _, pes := range r.pes { - types = append(types, pes.StreamType) - } - return types +func (r *Reader) reset() { + r.pos = 0 + r.end = PacketSize + r.bits = 0 } -// Sync - search sync byte -func (r *Reader) Sync() bool { - // drop previous readed packet - if r.i != 0 { - r.b = r.b[PacketSize:] - r.i = 0 - r.s = PacketSize +//goland:noinspection GoStandardMethods +func (r *Reader) readByte() byte { + if r.bits != 0 { + return byte(r.readBits(8)) } - // if packet available - if len(r.b) < PacketSize { - return false - } - - // if data starts from sync byte - if r.b[0] == SyncByte { - return true - } - - for len(r.b) >= PacketSize { - if r.b[0] == SyncByte { - return true - } - r.b = r.b[1:] - } - - return false -} - -func (r *Reader) ReadPSIHeader() { - pointer := r.ReadByte() // Pointer field - r.Skip(pointer) // Pointer filler bytes - - r.Skip(1) // Table ID - size := r.ReadUint16() & 0x03FF // Section length - r.SetSize(byte(size)) - - r.Skip(2) // Table ID extension - r.Skip(1) // flags... - r.Skip(1) // Section number - r.Skip(1) // Last section number -} - -func (r *Reader) Skip(i byte) { - r.i += i -} - -func (r *Reader) ReadByte() byte { - b := r.b[r.i] - r.i++ + b := r.buf[r.pos] + r.pos++ return b } -func (r *Reader) ReadUint16() uint16 { - i := (uint16(r.b[r.i]) << 8) | uint16(r.b[r.i+1]) - r.i += 2 - return i +func (r *Reader) readBit() byte { + if r.bits == 0 { + r.byte = r.readByte() + r.bits = 7 + } else { + r.bits-- + } + + return (r.byte >> r.bits) & 0b1 } -func (r *Reader) Bytes() []byte { - return r.b[r.i:PacketSize] +func (r *Reader) readBits(n byte) (res uint32) { + for i := n - 1; i != 255; i-- { + res |= uint32(r.readBit()) << i + } + return } -func (r *Reader) Left() byte { - return r.s - r.i +func (r *Reader) readBits16(n byte) (res uint16) { + for i := n - 1; i != 255; i-- { + res |= uint16(r.readBit()) << i + } + return } -func (r *Reader) SetSize(size byte) { - r.s = r.i + size +func (r *Reader) readTime() uint32 { + // https://en.wikipedia.org/wiki/Packetized_elementary_stream + // xxxxAAAx BBBBBBBB BBBBBBBx CCCCCCCC CCCCCCCx + _ = r.readBits(4) // 0010b or 0011b or 0001b + ts := r.readBits(3) << 30 + _ = r.readBits(1) // 1b + ts |= r.readBits(15) << 15 + _ = r.readBits(1) // 1b + ts |= r.readBits(15) + _ = r.readBits(1) // 1b + return ts +} + +func (r *Reader) bytes() []byte { + return r.buf[r.pos:PacketSize] +} + +func (r *Reader) left() byte { + return r.end - r.pos +} + +func (r *Reader) setSize(size byte) { + r.end = r.pos + size +} + +// Deprecated: +func (r *Reader) SetBuffer(b []byte) { + +} + +// Deprecated: +func (r *Reader) GetPacket() *rtp.Packet { + panic("") +} + +// Deprecated: +func (r *Reader) AppendBuffer(sniff []byte) { + }