Rewrite support MPEG-TS client

This commit is contained in:
Alexey Khit
2023-08-17 05:41:27 +03:00
parent 4a82eb3503
commit b3def6cfa2
11 changed files with 569 additions and 367 deletions
+5 -5
View File
@@ -1,10 +1,11 @@
package mpegts
import (
"net/http"
"github.com/AlexxIT/go2rtc/internal/api"
"github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/mpegts"
"net/http"
)
func Init() {
@@ -25,16 +26,15 @@ func apiHandle(w http.ResponseWriter, r *http.Request) {
}
res := &http.Response{Body: r.Body, Request: r}
client := mpegts.NewClient(res)
if err := client.Handle(); err != nil {
client, err := mpegts.Open(res.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
stream.AddProducer(client)
if err := client.Handle(); err != nil {
if err = client.Start(); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
+6 -3
View File
@@ -17,7 +17,10 @@ const (
// streamtype=5 - audio stream
const fmtp = "streamtype=5;profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config="
var sampleRates = []uint32{96000, 88200, 64000, 48000, 44100, 32000, 24000, 22050, 16000, 12000, 11025, 8000, 7350}
var sampleRates = []uint32{
96000, 88200, 64000, 48000, 44100, 32000, 24000, 22050, 16000, 12000, 11025, 8000, 7350,
0, 0, 0, // protection from request sampleRates[15]
}
func ConfigToCodec(conf []byte) *core.Codec {
// https://en.wikipedia.org/wiki/MPEG-4_Part_3#MPEG-4_Audio_Object_Types
@@ -40,9 +43,9 @@ func ConfigToCodec(conf []byte) *core.Codec {
codec.Name = fmt.Sprintf("AAC-%X", objType)
}
if sampleRateIdx := rd.ReadBits8(4); sampleRateIdx < 12 {
if sampleRateIdx := rd.ReadBits8(4); sampleRateIdx < 0x0F {
codec.ClockRate = sampleRates[sampleRateIdx]
} else if sampleRateIdx == 0x0F {
} else {
codec.ClockRate = rd.ReadBits(24)
}
+61
View File
@@ -0,0 +1,61 @@
package aac
import (
"encoding/hex"
"github.com/AlexxIT/go2rtc/pkg/bits"
"github.com/AlexxIT/go2rtc/pkg/core"
)
func IsADTS(b []byte) bool {
_ = b[1]
return len(b) > 7 && b[0] == 0xFF && b[1]&0xF0 == 0xF0
}
func ADTSToCodec(b []byte) *core.Codec {
// 1. Check ADTS header
if !IsADTS(b) {
return nil
}
// 2. Decode ADTS params
// https://wiki.multimedia.cx/index.php/ADTS
rd := bits.NewReader(b)
_ = rd.ReadBits(12) // Syncword, all bits must be set to 1
_ = rd.ReadBit() // MPEG Version, set to 0 for MPEG-4 and 1 for MPEG-2
_ = rd.ReadBits(2) // Layer, always set to 0
_ = rd.ReadBit() // Protection absence, set to 1 if there is no CRC and 0 if there is CRC
objType := rd.ReadBits8(2) + 1 // Profile, the MPEG-4 Audio Object Type minus 1
sampleRateIdx := rd.ReadBits8(4) // MPEG-4 Sampling Frequency Index
_ = rd.ReadBit() // Private bit, guaranteed never to be used by MPEG, set to 0 when encoding, ignore when decoding
channels := rd.ReadBits16(3) // MPEG-4 Channel Configuration
//_ = rd.ReadBit() // Originality, set to 1 to signal originality of the audio and 0 otherwise
//_ = rd.ReadBit() // Home, set to 1 to signal home usage of the audio and 0 otherwise
//_ = rd.ReadBit() // Copyright ID bit
//_ = rd.ReadBit() // Copyright ID start
//_ = rd.ReadBits(13) // Frame length
//_ = rd.ReadBits(11) // Buffer fullness
//_ = rd.ReadBits(2) // Number of AAC frames (Raw Data Blocks) in ADTS frame minus 1
//_ = rd.ReadBits(16) // CRC check
// 3. Encode RTP config
wr := bits.NewWriter()
wr.WriteBits8(objType, 5)
wr.WriteBits8(sampleRateIdx, 4)
wr.WriteBits16(channels, 4)
conf := wr.Bytes()
codec := &core.Codec{
Name: core.CodecAAC,
ClockRate: sampleRates[sampleRateIdx],
Channels: channels,
FmtpLine: fmtp + hex.EncodeToString(conf),
}
return codec
}
func ReadADTSSize(b []byte) uint16 {
// AAAAAAAA AAAABCCD EEFFFFGH HHIJKLMM MMMMMMMM MMMOOOOO OOOOOOPP (QQQQQQQQ QQQQQQQQ)
return uint16(b[3]&0x03)<<(8+3) | uint16(b[4])<<3 | uint16(b[5]>>5)
}
+22 -6
View File
@@ -2,11 +2,13 @@ package aac
import (
"encoding/binary"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/pion/rtp"
)
const RTPPacketVersionAAC = 0
const ADTSHeaderSize = 7
func RTPDepay(handler core.HandlerFunc) core.HandlerFunc {
var timestamp uint32
@@ -14,6 +16,7 @@ func RTPDepay(handler core.HandlerFunc) core.HandlerFunc {
return func(packet *rtp.Packet) {
// support ONLY 2 bytes header size!
// streamtype=5;profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config=1408
// https://datatracker.ietf.org/doc/html/rfc3640
headersSize := binary.BigEndian.Uint16(packet.Payload) >> 3
//log.Printf("[RTP/AAC] units: %d, size: %4d, ts: %10d, %t", headersSize/2, len(packet.Payload), packet.Timestamp, packet.Marker)
@@ -35,7 +38,7 @@ func RTPDepay(handler core.HandlerFunc) core.HandlerFunc {
clone.Version = RTPPacketVersionAAC
clone.Timestamp = timestamp
if IsADTS(unit) {
clone.Payload = unit[7:]
clone.Payload = unit[ADTSHeaderSize:]
} else {
clone.Payload = unit
}
@@ -54,11 +57,11 @@ func RTPPay(handler core.HandlerFunc) core.HandlerFunc {
}
// support ONLY one unit in payload
size := uint16(len(packet.Payload))
auSize := uint16(len(packet.Payload))
// 2 bytes header size + 2 bytes first payload size
payload := make([]byte, 2+2+size)
payload := make([]byte, 2+2+auSize)
payload[1] = 16 // header size in bits
binary.BigEndian.PutUint16(payload[2:], size<<3)
binary.BigEndian.PutUint16(payload[2:], auSize<<3)
copy(payload[4:], packet.Payload)
clone := rtp.Packet{
@@ -74,6 +77,19 @@ func RTPPay(handler core.HandlerFunc) core.HandlerFunc {
}
}
func IsADTS(b []byte) bool {
return len(b) > 7 && b[0] == 0xFF && b[1]&0xF0 == 0xF0
func ADTStoRTP(b []byte) []byte {
header := make([]byte, 2)
for i := 0; i < len(b); {
auSize := ReadADTSSize(b[i:])
header = append(header, byte(auSize>>5), byte(auSize<<3)) // size in bits
i += int(auSize)
}
hdrSize := uint16(len(header) - 2)
binary.BigEndian.PutUint16(header, hdrSize<<3) // size in bits
return append(header, b...)
}
func RTPToCodec(b []byte) *core.Codec {
hdrSize := binary.BigEndian.Uint16(b) / 8
return ADTSToCodec(b[2+hdrSize:])
}
+53
View File
@@ -0,0 +1,53 @@
package bits
type Writer struct {
buf []byte // total buf
byte byte // current byte
bits byte // bits left in byte
len int // current len of buf
}
func NewWriter() *Writer {
return &Writer{}
}
func (w *Writer) WriteBit(b byte) {
if w.bits == 0 {
if w.len != 0 {
w.buf = append(w.buf, w.byte)
}
w.byte = 0
w.bits = 7
w.len++
} else {
w.bits--
}
w.byte |= b << w.bits
}
func (w *Writer) WriteBits(v uint32, n byte) {
for i := n - 1; i != 255; i-- {
w.WriteBit(byte(v>>i) & 0b1)
}
}
func (w *Writer) WriteBits16(v uint16, n byte) {
for i := n - 1; i != 255; i-- {
w.WriteBit(byte(v>>i) & 0b1)
}
}
func (w *Writer) WriteBits8(v, n byte) {
for i := n - 1; i != 255; i-- {
w.WriteBit((v >> i) & 0b1)
}
}
func (w *Writer) Bytes() []byte {
if w.bits == 0 {
return w.buf
}
return append(w.buf, w.byte)
}
+1 -1
View File
@@ -33,7 +33,7 @@ func Open(r io.Reader) (core.Producer, error) {
return flv.Open(rd)
case b[0] == mpegts.SyncByte:
break // TODO
return mpegts.Open(rd)
}
return nil, errors.New("magic: unsupported header: " + hex.EncodeToString(b))
+3 -3
View File
@@ -1,6 +1,6 @@
package mpegts
var ieeeCrc32Tbl = []uint32{
var table = [256]uint32{
0x00000000, 0xB71DC104, 0x6E3B8209, 0xD926430D, 0xDC760413, 0x6B6BC517,
0xB24D861A, 0x0550471E, 0xB8ED0826, 0x0FF0C922, 0xD6D68A2F, 0x61CB4B2B,
0x649B0C35, 0xD386CD31, 0x0AA08E3C, 0xBDBD4F38, 0x70DB114C, 0xC7C6D048,
@@ -43,12 +43,12 @@ var ieeeCrc32Tbl = []uint32{
0xAFF023EA, 0x18EDE2EE, 0x1DBDA5F0, 0xAAA064F4, 0x738627F9, 0xC49BE6FD,
0x09FDB889, 0xBEE0798D, 0x67C63A80, 0xD0DBFB84, 0xD58BBC9A, 0x62967D9E,
0xBBB03E93, 0x0CADFF97, 0xB110B0AF, 0x060D71AB, 0xDF2B32A6, 0x6836F3A2,
0x6D66B4BC, 0xDA7B75B8, 0x035D36B5, 0xB440F7B1, 0x00000001,
0x6D66B4BC, 0xDA7B75B8, 0x035D36B5, 0xB440F7B1,
}
func calcCRC32(crc uint32, data []byte) uint32 {
for _, b := range data {
crc = ieeeCrc32Tbl[b^byte(crc)] ^ (crc >> 8)
crc = table[b^byte(crc)] ^ (crc >> 8)
}
return crc
}
+87 -40
View File
@@ -1,79 +1,126 @@
package mpegts
import (
"bytes"
"io"
"time"
"github.com/AlexxIT/go2rtc/pkg/aac"
"github.com/AlexxIT/go2rtc/pkg/core"
"net/http"
"github.com/AlexxIT/go2rtc/pkg/h264"
"github.com/AlexxIT/go2rtc/pkg/h265"
)
type Client struct {
core.Listener
URL string
rd *core.ReadSeeker
medias []*core.Media
receivers []*core.Receiver
res *http.Response
recv int
}
func NewClient(res *http.Response) *Client {
return &Client{res: res}
func Open(rd io.Reader) (*Client, error) {
client := &Client{rd: core.NewReadSeeker(rd)}
if err := client.describe(); err != nil {
return nil, err
}
return client, nil
}
func (c *Client) Handle() error {
reader := NewReader()
func (c *Client) describe() error {
c.rd.BufferSize = core.ProbeSize
defer c.rd.Rewind()
b := make([]byte, 1024*256) // 256K
rd := NewReader()
probe := core.NewProbe(c.medias == nil)
for probe == nil || probe.Active() {
n, err := c.res.Body.Read(b)
// Strategy:
// 1. Wait packet with metadata, init other packets for wait
// 2. Wait other packets
// 3. Stop after timeout
waitType := []byte{metadataType}
timeout := time.Now().Add(core.ProbeTimeout)
for len(waitType) != 0 && time.Now().Before(timeout) {
pkt, err := rd.ReadPacket(c.rd)
if err != nil {
return err
}
c.recv += n
// check if we wait this type
if i := bytes.IndexByte(waitType, pkt.PayloadType); i < 0 {
continue
} else {
waitType = append(waitType[:i], waitType[i+1:]...)
}
reader.AppendBuffer(b[:n])
reading:
for {
packet := reader.GetPacket()
if packet == nil {
break
}
for _, receiver := range c.receivers {
if receiver.ID == packet.PayloadType {
receiver.WriteRTP(packet)
continue reading
switch pkt.PayloadType {
case metadataType:
for _, streamType := range pkt.Payload {
switch streamType {
case StreamTypeH264, StreamTypeH265, StreamTypeAAC:
waitType = append(waitType, streamType)
}
}
// count track on probe state even if not support it
probe.Append(packet.PayloadType)
media := GetMedia(packet)
if media == nil {
continue // unsupported codec
case StreamTypeH264:
codec := h264.AVCCToCodec(pkt.Payload)
media := &core.Media{
Kind: core.KindVideo,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{codec},
}
c.medias = append(c.medias, media)
receiver := core.NewReceiver(media, media.Codecs[0])
receiver.ID = packet.PayloadType
c.receivers = append(c.receivers, receiver)
case StreamTypeH265:
codec := h265.AVCCToCodec(pkt.Payload)
media := &core.Media{
Kind: core.KindVideo,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{codec},
}
c.medias = append(c.medias, media)
receiver.WriteRTP(packet)
//log.Printf("[AVC] %v, len: %d, pts: %d ts: %10d", h264.Types(packet.Payload), len(packet.Payload), pkt.PTS, packet.Timestamp)
case StreamTypeAAC:
codec := aac.RTPToCodec(pkt.Payload)
media := &core.Media{
Kind: core.KindAudio,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{codec},
}
c.medias = append(c.medias, media)
}
}
return nil
}
func (c *Client) play() error {
rd := NewReader()
for {
pkt, err := rd.ReadPacket(c.rd)
if err != nil {
return err
}
//log.Printf("[mpegts] size: %6d, ts: %10d, pt: %2d", len(pkt.Payload), pkt.Timestamp, pkt.PayloadType)
for _, receiver := range c.receivers {
if receiver.ID == pkt.PayloadType {
pkt.Timestamp = PTSToTimestamp(pkt.Timestamp, receiver.Codec.ClockRate)
receiver.WriteRTP(pkt)
break
}
}
}
}
func (c *Client) Close() error {
_ = c.res.Body.Close()
if closer, ok := c.rd.Reader.(io.Closer); ok {
return closer.Close()
}
return nil
}
+56 -141
View File
@@ -1,21 +1,20 @@
package mpegts
import (
"time"
"github.com/AlexxIT/go2rtc/pkg/aac"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/h264"
"github.com/AlexxIT/go2rtc/pkg/h264/annexb"
"github.com/AlexxIT/go2rtc/pkg/h265"
"github.com/pion/rtp"
)
const (
PacketSize = 188
SyncByte = 0x47
SyncByte = 0x47 // Uppercase G
)
// https://en.wikipedia.org/wiki/Program-specific_information#Elementary_stream_types
const (
metadataType = 0
StreamTypePrivate = 0x06 // PCMU or PCMA or FLAC from FFmpeg
StreamTypeAAC = 0x0F
StreamTypeH264 = 0x1B
@@ -23,58 +22,23 @@ const (
StreamTypePCMATapo = 0x90
)
type Packet struct {
StreamType byte
PTS time.Duration
DTS time.Duration
Payload []byte
}
// PES - Packetized Elementary Stream
type PES struct {
StreamType byte
StreamID byte
Payload []byte
Mode byte
Size int
PTS uint32 // PTS always 90000Hz
Sequence uint16
Timestamp uint32
Sequence uint16
decodeStream func([]byte) ([]byte, int)
}
const (
ModeUnknown = iota
ModeSize
ModeStream
)
// parse Optional PES header
const minHeaderSize = 3
func (p *PES) SetBuffer(size uint16, b []byte) {
if size == 0 {
optSize := b[2] // optional fields
b = b[minHeaderSize+optSize:]
switch p.StreamType {
case StreamTypeH264:
p.Mode = ModeStream
p.decodeStream = h264.DecodeStream
case StreamTypeH265:
p.Mode = ModeStream
p.decodeStream = h265.DecodeStream
default:
println("WARNING: mpegts: unknown zero-size stream")
}
} else {
p.Mode = ModeSize
p.Size = int(size)
}
p.Payload = make([]byte, 0, size)
p.Payload = append(p.Payload, b...)
p.Size = int(size)
}
func (p *PES) AppendBuffer(b []byte) {
@@ -82,116 +46,67 @@ func (p *PES) AppendBuffer(b []byte) {
}
func (p *PES) GetPacket() (pkt *rtp.Packet) {
switch p.Mode {
case ModeSize:
left := p.Size - len(p.Payload)
if left > 0 {
return
}
if left < 0 {
println("WARNING: mpegts: buffer overflow")
p.Payload = nil
return
}
// fist byte also flags
flags := p.Payload[1]
optSize := p.Payload[2] // optional fields
payload := p.Payload[minHeaderSize+optSize:]
switch p.StreamType {
case StreamTypeH264, StreamTypeH265:
var ts uint32
const hasPTS = 0b1000_0000
if flags&hasPTS != 0 {
ts = ParseTime(p.Payload[minHeaderSize:])
}
pkt = &rtp.Packet{
Header: rtp.Header{
PayloadType: p.StreamType,
Timestamp: ts,
},
Payload: annexb.EncodeToAVCC(payload, false),
}
case StreamTypePCMATapo:
p.Sequence++
p.Timestamp += uint32(len(payload))
pkt = &rtp.Packet{
Header: rtp.Header{
Version: 2,
PayloadType: p.StreamType,
SequenceNumber: p.Sequence,
Timestamp: p.Timestamp,
},
Payload: payload,
}
}
p.Payload = nil
case ModeStream:
payload, i := p.decodeStream(p.Payload)
if payload == nil {
return
}
//log.Printf("[AVC] %v, len: %d", h264.Types(payload), len(payload))
p.Payload = p.Payload[i:]
switch p.StreamType {
case StreamTypeH264, StreamTypeH265:
pkt = &rtp.Packet{
Header: rtp.Header{
PayloadType: p.StreamType,
Timestamp: core.Now90000(),
Timestamp: p.PTS,
},
Payload: payload,
Payload: annexb.EncodeToAVCC(p.Payload, false),
}
default:
p.Payload = nil
case StreamTypeAAC:
p.Sequence++
pkt = &rtp.Packet{
Header: rtp.Header{
Version: 2,
PayloadType: p.StreamType,
SequenceNumber: p.Sequence,
Timestamp: p.PTS,
},
Payload: aac.ADTStoRTP(p.Payload),
}
case StreamTypePCMATapo:
p.Sequence++
p.PTS += uint32(len(p.Payload))
pkt = &rtp.Packet{
Header: rtp.Header{
Version: 2,
PayloadType: p.StreamType,
SequenceNumber: p.Sequence,
Timestamp: p.PTS,
},
Payload: p.Payload,
}
}
p.Payload = nil
return
}
func ParseTime(b []byte) uint32 {
return (uint32(b[0]&0x0E) << 29) | (uint32(b[1]) << 22) | (uint32(b[2]&0xFE) << 14) | (uint32(b[3]) << 7) | (uint32(b[4]) >> 1)
func StreamType(codec *core.Codec) uint8 {
switch codec.Name {
case core.CodecH264:
return StreamTypeH264
case core.CodecH265:
return StreamTypeH265
case core.CodecAAC:
return StreamTypeAAC
case core.CodecPCMA:
return StreamTypePCMATapo
}
return 0
}
func GetMedia(pkt *rtp.Packet) *core.Media {
var codec *core.Codec
var kind string
switch pkt.PayloadType {
case StreamTypeH264:
codec = &core.Codec{
Name: core.CodecH264,
ClockRate: 90000,
PayloadType: core.PayloadTypeRAW,
FmtpLine: h264.GetFmtpLine(pkt.Payload),
}
kind = core.KindVideo
case StreamTypePCMATapo:
codec = &core.Codec{
Name: core.CodecPCMA,
ClockRate: 8000,
}
kind = core.KindAudio
default:
return nil
}
return &core.Media{
Kind: kind,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{codec},
// PTSToTimestamp - convert PTS from 90000 to custom clock rate
func PTSToTimestamp(pts, clockRate uint32) uint32 {
if clockRate == 90000 {
return pts
}
return uint32(uint64(pts) * uint64(clockRate) / 90000)
}
+7 -3
View File
@@ -2,6 +2,7 @@ package mpegts
import (
"encoding/json"
"github.com/AlexxIT/go2rtc/pkg/core"
)
@@ -15,11 +16,14 @@ func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver,
return track, nil
}
}
return nil, core.ErrCantGetTrack
track := core.NewReceiver(media, codec)
track.ID = StreamType(codec)
c.receivers = append(c.receivers, track)
return track, nil
}
func (c *Client) Start() error {
return c.Handle()
return c.play()
}
func (c *Client) Stop() error {
@@ -32,7 +36,7 @@ func (c *Client) Stop() error {
func (c *Client) MarshalJSON() ([]byte, error) {
info := &core.Info{
Type: "MPEG-TS active producer",
URL: c.res.Request.URL.String(),
URL: c.URL,
Medias: c.medias,
Receivers: c.receivers,
Recv: c.recv,
+268 -165
View File
@@ -1,209 +1,312 @@
package mpegts
import "github.com/pion/rtp"
import (
"errors"
"io"
"github.com/pion/rtp"
)
type Reader struct {
b []byte // packets buffer
i byte // read position
s byte // end position
buf [PacketSize]byte // total buf
pmt uint16 // Program Map Table (PMT) PID
pes map[uint16]*PES
byte byte // current byte
bits byte // bits left in byte
pos byte // current pos in buf
end byte // end position
pmtID uint16 // Program Map Table (PMT) PID
pes map[uint16]*PES
}
func NewReader() *Reader {
return &Reader{}
}
func (r *Reader) SetBuffer(b []byte) {
r.b = b
r.i = 0
r.s = PacketSize
}
const skipRead = 0xFF
func (r *Reader) AppendBuffer(b []byte) {
r.b = append(r.b, b...)
}
func (r *Reader) GetPacket() *rtp.Packet {
for r.Sync() {
r.Skip(1) // Sync byte
pid := r.ReadUint16() & 0x1FFF // PID
flag := r.ReadByte() // flags...
const pidNullPacket = 0x1FFF
if pid == pidNullPacket {
continue
func (r *Reader) ReadPacket(rd io.Reader) (*rtp.Packet, error) {
for {
if r.pos != skipRead {
if _, err := io.ReadFull(rd, r.buf[:]); err != nil {
return nil, err
}
}
const hasAdaptionField = 0b0010_0000
if flag&hasAdaptionField != 0 {
adSize := r.ReadByte() // Adaptation field length
if adSize > PacketSize-6 {
println("WARNING: mpegts: wrong adaptation size")
continue
}
r.Skip(adSize)
}
// PAT: Program Association Table
const pidPAT = 0
if pid == pidPAT {
// already processed
if r.pmt != 0 {
continue
}
r.ReadPSIHeader()
const CRCSize = 4
for r.Left() > CRCSize {
pNum := r.ReadUint16()
pPID := r.ReadUint16() & 0x1FFF
if pNum != 0 {
r.pmt = pPID
}
}
r.Skip(4) // CRC32
continue
}
// PMT : Program Map Table
if pid == r.pmt {
// already processed
if r.pes != nil {
continue
}
r.ReadPSIHeader()
pesPID := r.ReadUint16() & 0x1FFF // ? PCR PID
pSize := r.ReadUint16() & 0x03FF // ? 0x0FFF
r.Skip(byte(pSize))
r.pes = map[uint16]*PES{}
const CRCSize = 4
for r.Left() > CRCSize {
streamType := r.ReadByte()
pesPID = r.ReadUint16() & 0x1FFF // Elementary PID
iSize := r.ReadUint16() & 0x03FF // ? 0x0FFF
r.Skip(byte(iSize))
r.pes[pesPID] = &PES{StreamType: streamType}
}
r.Skip(4) // ? CRC32
continue
pid, start, err := r.readPacketHeader()
if err != nil {
return nil, err
}
if r.pes == nil {
switch pid {
case 0: // PAT ID
r.readPAT() // PAT: Program Association Table
case r.pmtID:
r.readPMT() // PMT : Program Map Table
pkt := &rtp.Packet{
Payload: make([]byte, 0, len(r.pes)),
}
for _, pes := range r.pes {
pkt.Payload = append(pkt.Payload, pes.StreamType)
}
return pkt, nil
}
continue
}
pes := r.pes[pid]
if pes == nil {
continue // unknown PID
if pkt := r.readPES(pid, start); pkt != nil {
return pkt, nil
}
}
}
func (r *Reader) readPacketHeader() (pid uint16, start bool, err error) {
r.reset()
sb := r.readByte() // Sync byte
if sb != SyncByte {
return 0, false, errors.New("mpegts: wrong sync byte")
}
_ = r.readBit() // Transport error indicator (TEI)
pusi := r.readBit() // Payload unit start indicator (PUSI)
_ = r.readBit() // Transport priority
pid = r.readBits16(13) // PID
_ = r.readBits(2) // Transport scrambling control (TSC)
af := r.readBit() // Adaptation field
_ = r.readBit() // Payload
_ = r.readBits(4) // Continuity counter
if af != 0 {
adSize := r.readByte() // Adaptation field length
if adSize > PacketSize-6 {
return 0, false, errors.New("mpegts: wrong adaptation size")
}
r.skip(adSize)
}
return pid, pusi != 0, nil
}
func (r *Reader) skip(i byte) {
r.pos += i
}
func (r *Reader) readPSIHeader() {
// https://en.wikipedia.org/wiki/Program-specific_information#Table_Sections
pointer := r.readByte() // Pointer field
r.skip(pointer) // Pointer filler bytes
_ = r.readByte() // Table ID
_ = r.readBit() // Section syntax indicator
_ = r.readBit() // Private bit
_ = r.readBits(2) // Reserved bits
_ = r.readBits(2) // Section length unused bits
size := r.readBits(10) // Section length
r.setSize(byte(size))
_ = r.readBits(16) // Table ID extension
_ = r.readBits(2) // Reserved bits
_ = r.readBits(5) // Version number
_ = r.readBit() // Current/next indicator
_ = r.readByte() // Section number
_ = r.readByte() // Last section number
}
// ReadPAT (Program Association Table)
func (r *Reader) readPAT() {
// https://en.wikipedia.org/wiki/Program-specific_information#PAT_(Program_Association_Table)
r.readPSIHeader()
const CRCSize = 4
for r.left() > CRCSize {
num := r.readBits(16) // Program num
_ = r.readBits(3) // Reserved bits
pid := r.readBits16(13) // Program map PID
if num != 0 {
r.pmtID = pid
}
}
r.skip(4) // CRC32
}
// ReadPMT (Program map specific data)
func (r *Reader) readPMT() {
// https://en.wikipedia.org/wiki/Program-specific_information#PMT_(Program_map_specific_data)
r.readPSIHeader()
_ = r.readBits(3) // Reserved bits
_ = r.readBits(13) // PCR PID
_ = r.readBits(4) // Reserved bits
_ = r.readBits(2) // Program info length unused bits
size := r.readBits(10) // Program info length
r.skip(byte(size))
r.pes = map[uint16]*PES{}
const CRCSize = 4
for r.left() > CRCSize {
streamType := r.readByte() // Stream type
_ = r.readBits(3) // Reserved bits
pid := r.readBits16(13) // Elementary PID
_ = r.readBits(4) // Reserved bits
_ = r.readBits(2) // ES Info length unused bits
size = r.readBits(10) // ES Info length
r.skip(byte(size))
r.pes[pid] = &PES{StreamType: streamType}
}
r.skip(4) // CRC32
}
func (r *Reader) readPES(pid uint16, start bool) *rtp.Packet {
pes := r.pes[pid]
if pes == nil {
return nil
}
// if new payload beging
if start {
if pes.Payload != nil {
r.pos = skipRead
return pes.GetPacket() // finish previous packet
}
if pes.Payload == nil {
// PES Packet start code prefix
if r.ReadByte() != 0 || r.ReadByte() != 0 || r.ReadByte() != 1 {
continue
}
// read stream ID and total payload size
pes.StreamID = r.ReadByte()
pes.SetBuffer(r.ReadUint16(), r.Bytes())
} else {
pes.AppendBuffer(r.Bytes())
// https://en.wikipedia.org/wiki/Packetized_elementary_stream
// Packet start code prefix
if r.readByte() != 0 || r.readByte() != 0 || r.readByte() != 1 {
return nil
}
if pkt := pes.GetPacket(); pkt != nil {
return pkt
pes.StreamID = r.readByte() // Stream id
packetSize := r.readBits16(16) // PES Packet length
_ = r.readBits(2) // Marker bits
_ = r.readBits(2) // Scrambling control
_ = r.readBit() // Priority
_ = r.readBit() // Data alignment indicator
_ = r.readBit() // Copyright
_ = r.readBit() // Original or Copy
pts := r.readBit() // PTS indicator
_ = r.readBit() // DTS indicator
_ = r.readBit() // ESCR flag
_ = r.readBit() // ES rate flag
_ = r.readBit() // DSM trick mode flag
_ = r.readBit() // Additional copy info flag
_ = r.readBit() // CRC flag
_ = r.readBit() // extension flag
headerSize := r.readByte() // PES header length
//log.Printf("[mpegts] pes=%d size=%d header=%d", pes.StreamID, packetSize, headerSize)
if packetSize != 0 {
packetSize -= uint16(3 + headerSize)
}
if pts != 0 {
pes.PTS = r.readTime()
headerSize -= 5
}
r.skip(headerSize)
pes.SetBuffer(packetSize, r.bytes())
} else {
pes.AppendBuffer(r.bytes())
}
if pes.Size != 0 && len(pes.Payload) >= pes.Size {
return pes.GetPacket() // finish current packet
}
return nil
}
func (r *Reader) GetStreamTypes() []byte {
types := make([]byte, 0, len(r.pes))
for _, pes := range r.pes {
types = append(types, pes.StreamType)
}
return types
func (r *Reader) reset() {
r.pos = 0
r.end = PacketSize
r.bits = 0
}
// Sync - search sync byte
func (r *Reader) Sync() bool {
// drop previous readed packet
if r.i != 0 {
r.b = r.b[PacketSize:]
r.i = 0
r.s = PacketSize
//goland:noinspection GoStandardMethods
func (r *Reader) readByte() byte {
if r.bits != 0 {
return byte(r.readBits(8))
}
// if packet available
if len(r.b) < PacketSize {
return false
}
// if data starts from sync byte
if r.b[0] == SyncByte {
return true
}
for len(r.b) >= PacketSize {
if r.b[0] == SyncByte {
return true
}
r.b = r.b[1:]
}
return false
}
func (r *Reader) ReadPSIHeader() {
pointer := r.ReadByte() // Pointer field
r.Skip(pointer) // Pointer filler bytes
r.Skip(1) // Table ID
size := r.ReadUint16() & 0x03FF // Section length
r.SetSize(byte(size))
r.Skip(2) // Table ID extension
r.Skip(1) // flags...
r.Skip(1) // Section number
r.Skip(1) // Last section number
}
func (r *Reader) Skip(i byte) {
r.i += i
}
func (r *Reader) ReadByte() byte {
b := r.b[r.i]
r.i++
b := r.buf[r.pos]
r.pos++
return b
}
func (r *Reader) ReadUint16() uint16 {
i := (uint16(r.b[r.i]) << 8) | uint16(r.b[r.i+1])
r.i += 2
return i
func (r *Reader) readBit() byte {
if r.bits == 0 {
r.byte = r.readByte()
r.bits = 7
} else {
r.bits--
}
return (r.byte >> r.bits) & 0b1
}
func (r *Reader) Bytes() []byte {
return r.b[r.i:PacketSize]
func (r *Reader) readBits(n byte) (res uint32) {
for i := n - 1; i != 255; i-- {
res |= uint32(r.readBit()) << i
}
return
}
func (r *Reader) Left() byte {
return r.s - r.i
func (r *Reader) readBits16(n byte) (res uint16) {
for i := n - 1; i != 255; i-- {
res |= uint16(r.readBit()) << i
}
return
}
func (r *Reader) SetSize(size byte) {
r.s = r.i + size
func (r *Reader) readTime() uint32 {
// https://en.wikipedia.org/wiki/Packetized_elementary_stream
// xxxxAAAx BBBBBBBB BBBBBBBx CCCCCCCC CCCCCCCx
_ = r.readBits(4) // 0010b or 0011b or 0001b
ts := r.readBits(3) << 30
_ = r.readBits(1) // 1b
ts |= r.readBits(15) << 15
_ = r.readBits(1) // 1b
ts |= r.readBits(15)
_ = r.readBits(1) // 1b
return ts
}
func (r *Reader) bytes() []byte {
return r.buf[r.pos:PacketSize]
}
func (r *Reader) left() byte {
return r.end - r.pos
}
func (r *Reader) setSize(size byte) {
r.end = r.pos + size
}
// Deprecated:
func (r *Reader) SetBuffer(b []byte) {
}
// Deprecated:
func (r *Reader) GetPacket() *rtp.Packet {
panic("")
}
// Deprecated:
func (r *Reader) AppendBuffer(sniff []byte) {
}