Add basic support MPEG TS source

This commit is contained in:
Alexey Khit
2023-02-14 11:17:05 +03:00
parent 665545903c
commit 9fd783793e
8 changed files with 377 additions and 106 deletions
+9
View File
@@ -5,6 +5,7 @@ import (
"fmt"
"github.com/AlexxIT/go2rtc/cmd/streams"
"github.com/AlexxIT/go2rtc/pkg/mjpeg"
"github.com/AlexxIT/go2rtc/pkg/mpegts"
"github.com/AlexxIT/go2rtc/pkg/rtmp"
"github.com/AlexxIT/go2rtc/pkg/streamer"
"github.com/AlexxIT/go2rtc/pkg/tcp"
@@ -41,6 +42,7 @@ func handle(url string) (streamer.Producer, error) {
switch ct {
case "image/jpeg", "multipart/x-mixed-replace":
return mjpeg.NewClient(res), nil
case "video/x-flv":
var conn *rtmp.Client
if conn, err = rtmp.Accept(res); err != nil {
@@ -50,6 +52,13 @@ func handle(url string) (streamer.Producer, error) {
return nil, err
}
return conn, nil
case "video/mpeg":
client := mpegts.NewClient(res)
if err = client.Handle(); err != nil {
return nil, err
}
return client, nil
}
return nil, fmt.Errorf("unsupported Content-Type: %s", ct)
+43
View File
@@ -0,0 +1,43 @@
package mpegts
import (
"github.com/AlexxIT/go2rtc/cmd/api"
"github.com/AlexxIT/go2rtc/cmd/streams"
"github.com/AlexxIT/go2rtc/pkg/mpegts"
"net/http"
)
func Init() {
api.HandleFunc("api/stream.ts", apiHandle)
}
func apiHandle(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
http.Error(w, "", http.StatusMethodNotAllowed)
return
}
dst := r.URL.Query().Get("dst")
stream := streams.Get(dst)
if stream == nil {
http.Error(w, api.StreamNotFound, http.StatusNotFound)
return
}
res := &http.Response{Body: r.Body, Request: r}
client := mpegts.NewClient(res)
if err := client.Handle(); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
stream.AddProducer(client)
if err := client.Handle(); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
stream.RemoveProducer(client)
}
+2
View File
@@ -15,6 +15,7 @@ import (
"github.com/AlexxIT/go2rtc/cmd/ivideon"
"github.com/AlexxIT/go2rtc/cmd/mjpeg"
"github.com/AlexxIT/go2rtc/cmd/mp4"
"github.com/AlexxIT/go2rtc/cmd/mpegts"
"github.com/AlexxIT/go2rtc/cmd/ngrok"
"github.com/AlexxIT/go2rtc/cmd/rtmp"
"github.com/AlexxIT/go2rtc/cmd/rtsp"
@@ -42,6 +43,7 @@ func main() {
http.Init()
dvrip.Init()
tapo.Init()
mpegts.Init()
srtp.Init()
homekit.Init()
+73
View File
@@ -0,0 +1,73 @@
package mpegts
import (
"github.com/AlexxIT/go2rtc/pkg/streamer"
"net/http"
)
type Client struct {
streamer.Element
medias []*streamer.Media
tracks map[byte]*streamer.Track
res *http.Response
}
func NewClient(res *http.Response) *Client {
return &Client{res: res}
}
func (c *Client) Handle() error {
if c.tracks == nil {
c.tracks = map[byte]*streamer.Track{}
}
reader := NewReader()
b := make([]byte, 1024*1024*256) // 256K
probe := streamer.NewProbe(c.medias == nil)
for probe == nil || probe.Active() {
n, err := c.res.Body.Read(b)
if err != nil {
return err
}
reader.AppendBuffer(b[:n])
for {
packet := reader.GetPacket()
if packet == nil {
break
}
track := c.tracks[packet.PayloadType]
if track == nil {
// count track on probe state even if not support it
probe.Append(packet.PayloadType)
media := GetMedia(packet)
if media == nil {
continue // unsupported codec
}
track = streamer.NewTrack2(media, nil)
c.medias = append(c.medias, media)
c.tracks[packet.PayloadType] = track
}
_ = track.WriteRTP(packet)
//log.Printf("[AVC] %v, len: %d, pts: %d ts: %10d", h264.Types(packet.Payload), len(packet.Payload), pkt.PTS, packet.Timestamp)
}
}
return nil
}
func (c *Client) Close() error {
_ = c.res.Body.Close()
return nil
}
+179 -21
View File
@@ -1,8 +1,10 @@
package mpegts
import (
"bytes"
"github.com/AlexxIT/go2rtc/pkg/h264"
"github.com/AlexxIT/go2rtc/pkg/streamer"
"github.com/pion/rtp"
"time"
)
@@ -12,8 +14,9 @@ const (
)
const (
StreamTypeH264 = 0x1B
StreamTypePCMA = 0x90
StreamTypeAAC = 0x0F
StreamTypeH264 = 0x1B
StreamTypePCMATapo = 0x90
)
type Packet struct {
@@ -28,31 +31,128 @@ type PES struct {
StreamType byte
StreamID byte
Payload []byte
Mode byte
Size int
Sequence uint16
Timestamp uint32
}
func (p *PES) Packet() *Packet {
// parse Optional PES header
const minHeaderSize = 3
const (
ModeUnknown = iota
ModeSize
ModeStream
)
pkt := &Packet{StreamType: p.StreamType}
// parse Optional PES header
const minHeaderSize = 3
// fist byte also flags
flags := p.Payload[1]
hSize := p.Payload[2] // optional fields
func (p *PES) SetBuffer(size uint16, b []byte) {
if size == 0 {
optSize := b[2] // optional fields
b = b[minHeaderSize+optSize:]
const hasPTS = 0b1000_0000
if flags&hasPTS != 0 {
pkt.PTS = ParseTime(p.Payload[minHeaderSize:])
const hasDTS = 0b0100_0000
if flags&hasDTS != 0 {
pkt.DTS = ParseTime(p.Payload[minHeaderSize+5:])
if p.StreamType == StreamTypeH264 {
if bytes.HasPrefix(b, []byte{0, 0, 0, 1, h264.NALUTypeAUD}) {
p.Mode = ModeStream
b = b[5:]
}
}
if p.Mode == ModeUnknown {
println("WARNING: mpegts: unknown zero-size stream")
}
} else {
p.Mode = ModeSize
p.Size = int(size)
}
pkt.Payload = p.Payload[minHeaderSize+hSize:]
p.Payload = make([]byte, 0, size)
p.Payload = append(p.Payload, b...)
}
return pkt
func (p *PES) AppendBuffer(b []byte) {
p.Payload = append(p.Payload, b...)
}
func (p *PES) GetPacket() (pkt *rtp.Packet) {
switch p.Mode {
case ModeSize:
left := p.Size - len(p.Payload)
if left > 0 {
return
}
if left < 0 {
println("WARNING: mpegts: buffer overflow")
p.Payload = nil
return
}
// fist byte also flags
flags := p.Payload[1]
optSize := p.Payload[2] // optional fields
p.Payload = p.Payload[minHeaderSize+optSize:]
switch p.StreamType {
case StreamTypeH264:
var ts uint32
const hasPTS = 0b1000_0000
if flags&hasPTS != 0 {
ts = uint32(ParseTime(p.Payload[minHeaderSize:]))
}
pkt = &rtp.Packet{
Header: rtp.Header{
PayloadType: p.StreamType,
Timestamp: ts,
},
Payload: h264.AnnexB2AVC(p.Payload),
}
case StreamTypePCMATapo:
p.Sequence++
p.Timestamp += uint32(len(p.Payload))
pkt = &rtp.Packet{
Header: rtp.Header{
Version: 2,
PayloadType: p.StreamType,
SequenceNumber: p.Sequence,
Timestamp: p.Timestamp,
},
Payload: p.Payload,
}
}
p.Payload = nil
case ModeStream:
i := bytes.Index(p.Payload, []byte{0, 0, 0, 1, h264.NALUTypeAUD})
if i < 0 {
return
}
if i2 := IndexFrom(p.Payload, []byte{0, 0, 1}, i); i2 < 0 && i2 > 9 {
return
}
pkt = &rtp.Packet{
Header: rtp.Header{
PayloadType: p.StreamType,
Timestamp: uint32(time.Duration(time.Now().UnixNano()) * 90000 / time.Second),
},
Payload: DecodeAnnex3B(p.Payload[:i]),
}
p.Payload = p.Payload[i+5:]
default:
p.Payload = nil
}
return
}
func ParseTime(b []byte) time.Duration {
@@ -60,11 +160,11 @@ func ParseTime(b []byte) time.Duration {
return time.Duration(ts)
}
func GetMedia(pkt *Packet) *streamer.Media {
func GetMedia(pkt *rtp.Packet) *streamer.Media {
var codec *streamer.Codec
var kind string
switch pkt.StreamType {
switch pkt.PayloadType {
case StreamTypeH264:
codec = &streamer.Codec{
Name: streamer.CodecH264,
@@ -74,7 +174,7 @@ func GetMedia(pkt *Packet) *streamer.Media {
}
kind = streamer.KindVideo
case StreamTypePCMA:
case StreamTypePCMATapo:
codec = &streamer.Codec{
Name: streamer.CodecPCMA,
ClockRate: 8000,
@@ -91,3 +191,61 @@ func GetMedia(pkt *Packet) *streamer.Media {
Codecs: []*streamer.Codec{codec},
}
}
func DecodeAnnex3B(annexb []byte) (avc []byte) {
// depends on AU delimeter size
i0 := bytes.Index(annexb, []byte{0, 0, 1})
if i0 < 0 || i0 > 9 {
return nil
}
annexb = annexb[i0+3:] // skip first separator
i0 = 0
for {
// search next separato
iN := IndexFrom(annexb, []byte{0, 0, 1}, i0)
if iN < 0 {
break
}
// move i0 to next AU
if i0 = iN + 3; i0 >= len(annexb) {
break
}
// check if AU type valid
octet := annexb[i0]
const forbiddenZeroBit = 0x80
if octet&forbiddenZeroBit == 0 {
const nalUnitType = 0x1F
switch octet & nalUnitType {
case h264.NALUTypePFrame, h264.NALUTypeIFrame, h264.NALUTypeSPS, h264.NALUTypePPS:
// add AU in AVC format
avc = append(avc, byte(iN>>24), byte(iN>>16), byte(iN>>8), byte(iN))
avc = append(avc, annexb[:iN]...)
// cut search to next AU start
annexb = annexb[i0:]
i0 = 0
}
}
}
size := len(annexb)
avc = append(avc, byte(size>>24), byte(size>>16), byte(size>>8), byte(size))
return append(avc, annexb...)
}
func IndexFrom(b []byte, sep []byte, from int) int {
if from > 0 {
if from < len(b) {
if i := bytes.Index(b[from:], sep); i >= 0 {
return from + i
}
}
return -1
}
return bytes.Index(b, sep)
}
+26
View File
@@ -0,0 +1,26 @@
package mpegts
import (
"github.com/AlexxIT/go2rtc/pkg/streamer"
)
func (c *Client) GetMedias() []*streamer.Media {
return c.medias
}
func (c *Client) GetTrack(media *streamer.Media, codec *streamer.Codec) *streamer.Track {
for _, track := range c.tracks {
if track.Codec == codec {
return track
}
}
return nil
}
func (c *Client) Start() error {
return c.Handle()
}
func (c *Client) Stop() error {
return c.Close()
}
+41 -51
View File
@@ -1,8 +1,11 @@
package mpegts
import "github.com/pion/rtp"
type Reader struct {
b []byte // packets buffer
i byte // read position
s byte // end position
pmt uint16 // Program Map Table (PMT) PID
pes map[uint16]*PES
@@ -15,20 +18,26 @@ func NewReader() *Reader {
func (r *Reader) SetBuffer(b []byte) {
r.b = b
r.i = 0
r.s = PacketSize
}
func (r *Reader) AppendBuffer(b []byte) {
r.b = append(r.b, b...)
}
func (r *Reader) GetPacket() *Packet {
func (r *Reader) GetPacket() *rtp.Packet {
for r.Sync() {
r.Skip(1) // Sync byte
pid := r.ReadUint16() & 0x1FFF // PID
flag := r.ReadByte() // flags...
const hasAdaptionField = 0x20
const pidNullPacket = 0x1FFF
if pid == pidNullPacket {
continue
}
const hasAdaptionField = 0b0010_0000
if flag&hasAdaptionField != 0 {
adSize := r.ReadByte() // Adaptation field length
if adSize > PacketSize-6 {
@@ -39,17 +48,14 @@ func (r *Reader) GetPacket() *Packet {
}
// PAT: Program Association Table
const PAT = 0
if pid == PAT {
const pidPAT = 0
if pid == pidPAT {
// already processed
if r.pmt != 0 {
continue
}
if size := r.ReadPSIHeader(); size <= 4 {
println("WARNING: mpegts: wrong PAT")
continue
}
r.ReadPSIHeader()
const CRCSize = 4
for r.Left() > CRCSize {
@@ -71,26 +77,25 @@ func (r *Reader) GetPacket() *Packet {
continue
}
if size := r.ReadPSIHeader(); size == 0 {
println("WARNING: mpegts: wrong PMT")
continue
}
r.ReadPSIHeader()
pesPID := r.ReadUint16() & 0x1FFF
pSize := r.ReadUint16() & 0x03FF
pesPID := r.ReadUint16() & 0x1FFF // ? PCR PID
pSize := r.ReadUint16() & 0x03FF // ? 0x0FFF
r.Skip(byte(pSize))
r.pes = map[uint16]*PES{}
const minItemSize = 5
for r.Left() > minItemSize {
const CRCSize = 4
for r.Left() > CRCSize {
streamType := r.ReadByte()
pesPID = r.ReadUint16() & 0x1FFF
iSize := r.ReadUint16() & 0x03FF
pesPID = r.ReadUint16() & 0x1FFF // Elementary PID
iSize := r.ReadUint16() & 0x03FF // ? 0x0FFF
r.Skip(byte(iSize))
r.pes[pesPID] = &PES{StreamType: streamType}
}
r.Skip(4) // ? CRC32
continue
}
@@ -103,37 +108,22 @@ func (r *Reader) GetPacket() *Packet {
continue // unknown PID
}
if pes.Payload != nil {
// how many bytes left to collect
left := cap(pes.Payload) - len(pes.Payload) - int(r.Left())
// buffer overflow
if left < 0 {
println("WARNING: mpegts: buffer overflow")
pes.Payload = nil
if pes.Payload == nil {
// PES Packet start code prefix
if r.ReadByte() != 0 || r.ReadByte() != 0 || r.ReadByte() != 1 {
continue
}
pes.Payload = append(pes.Payload, r.Bytes()...)
if left == 0 {
pkt := pes.Packet()
pes.Payload = nil
return pkt
}
continue
// read stream ID and total payload size
pes.StreamID = r.ReadByte()
pes.SetBuffer(r.ReadUint16(), r.Bytes())
} else {
pes.AppendBuffer(r.Bytes())
}
// PES Packet start code prefix
if r.ReadByte() != 0 || r.ReadByte() != 0 || r.ReadByte() != 1 {
continue
if pkt := pes.GetPacket(); pkt != nil {
return pkt
}
// read stream ID and total payload size
pes.StreamID = r.ReadByte()
pes.Payload = make([]byte, 0, r.ReadUint16())
pes.Payload = append(pes.Payload, r.Bytes()...)
}
return nil
@@ -145,6 +135,7 @@ func (r *Reader) Sync() bool {
if r.i != 0 {
r.b = r.b[PacketSize:]
r.i = 0
r.s = PacketSize
}
// if packet available
@@ -167,23 +158,18 @@ func (r *Reader) Sync() bool {
return false
}
func (r *Reader) ReadPSIHeader() uint16 {
func (r *Reader) ReadPSIHeader() {
pointer := r.ReadByte() // Pointer field
r.Skip(pointer) // Pointer filler bytes
r.Skip(1) // Table ID
size := r.ReadUint16() & 0x03FF // Section length
if uint16(r.i)+size != uint16(PacketSize) {
return 0
}
r.SetSize(byte(size))
r.Skip(2) // Table ID extension
r.Skip(1) // flags...
r.Skip(1) // Section number
r.Skip(1) // Last section number
return size - 5
}
func (r *Reader) Skip(i byte) {
@@ -207,5 +193,9 @@ func (r *Reader) Bytes() []byte {
}
func (r *Reader) Left() byte {
return PacketSize - r.i
return r.s - r.i
}
func (r *Reader) SetSize(size byte) {
r.s = r.i + size
}
+4 -34
View File
@@ -6,11 +6,9 @@ import (
"crypto/md5"
"errors"
"fmt"
"github.com/AlexxIT/go2rtc/pkg/h264"
"github.com/AlexxIT/go2rtc/pkg/mpegts"
"github.com/AlexxIT/go2rtc/pkg/streamer"
"github.com/AlexxIT/go2rtc/pkg/tcp"
"github.com/pion/rtp"
"mime/multipart"
"net"
"net/http"
@@ -138,9 +136,6 @@ func (c *Client) Handle() error {
c.tracks = map[byte]*streamer.Track{}
}
var audioSeq uint16
var audioTS uint32
reader := mpegts.NewReader()
probe := streamer.NewProbe(c.medias == nil)
@@ -182,10 +177,10 @@ func (c *Client) Handle() error {
break
}
track := c.tracks[pkt.StreamType]
track := c.tracks[pkt.PayloadType]
if track == nil {
// count track on probe state even if not support it
probe.Append(pkt.StreamType)
probe.Append(pkt.PayloadType)
media := mpegts.GetMedia(pkt)
if media == nil {
@@ -195,35 +190,10 @@ func (c *Client) Handle() error {
track = streamer.NewTrack2(media, nil)
c.medias = append(c.medias, media)
c.tracks[pkt.StreamType] = track
}
switch track.Codec.Name {
case streamer.CodecH264:
packet := &rtp.Packet{
Header: rtp.Header{Timestamp: uint32(pkt.PTS)},
Payload: h264.AnnexB2AVC(pkt.Payload),
}
_ = track.WriteRTP(packet)
//log.Printf("[AVC] %v, len: %d, pts: %d ts: %10d", h264.Types(packet.Payload), len(packet.Payload), pkt.PTS, packet.Timestamp)
case streamer.CodecPCMA:
audioSeq++
audioTS += uint32(len(pkt.Payload))
packet := &rtp.Packet{
Header: rtp.Header{
Version: 2,
Timestamp: audioTS,
SequenceNumber: audioSeq,
},
Payload: pkt.Payload,
}
_ = track.WriteRTP(packet)
//log.Printf("[PCM]len: %d, pts: %d ts: %10d, buf: %x", len(packet.Payload), pkt.PTS, packet.Timestamp, packet.Payload[:32])
c.tracks[pkt.PayloadType] = track
}
_ = track.WriteRTP(pkt)
}
}