diff --git a/cmd/http/http.go b/cmd/http/http.go index 11b9d417..e7d9d4a8 100644 --- a/cmd/http/http.go +++ b/cmd/http/http.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/AlexxIT/go2rtc/cmd/streams" "github.com/AlexxIT/go2rtc/pkg/mjpeg" + "github.com/AlexxIT/go2rtc/pkg/mpegts" "github.com/AlexxIT/go2rtc/pkg/rtmp" "github.com/AlexxIT/go2rtc/pkg/streamer" "github.com/AlexxIT/go2rtc/pkg/tcp" @@ -41,6 +42,7 @@ func handle(url string) (streamer.Producer, error) { switch ct { case "image/jpeg", "multipart/x-mixed-replace": return mjpeg.NewClient(res), nil + case "video/x-flv": var conn *rtmp.Client if conn, err = rtmp.Accept(res); err != nil { @@ -50,6 +52,13 @@ func handle(url string) (streamer.Producer, error) { return nil, err } return conn, nil + + case "video/mpeg": + client := mpegts.NewClient(res) + if err = client.Handle(); err != nil { + return nil, err + } + return client, nil } return nil, fmt.Errorf("unsupported Content-Type: %s", ct) diff --git a/cmd/mpegts/mpegts.go b/cmd/mpegts/mpegts.go new file mode 100644 index 00000000..7f76da92 --- /dev/null +++ b/cmd/mpegts/mpegts.go @@ -0,0 +1,43 @@ +package mpegts + +import ( + "github.com/AlexxIT/go2rtc/cmd/api" + "github.com/AlexxIT/go2rtc/cmd/streams" + "github.com/AlexxIT/go2rtc/pkg/mpegts" + "net/http" +) + +func Init() { + api.HandleFunc("api/stream.ts", apiHandle) +} + +func apiHandle(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + http.Error(w, "", http.StatusMethodNotAllowed) + return + } + + dst := r.URL.Query().Get("dst") + stream := streams.Get(dst) + if stream == nil { + http.Error(w, api.StreamNotFound, http.StatusNotFound) + return + } + + res := &http.Response{Body: r.Body, Request: r} + client := mpegts.NewClient(res) + + if err := client.Handle(); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + stream.AddProducer(client) + + if err := client.Handle(); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + stream.RemoveProducer(client) +} diff --git a/main.go b/main.go index 33fed57a..4c4335c5 100644 --- a/main.go +++ b/main.go @@ -15,6 +15,7 @@ import ( "github.com/AlexxIT/go2rtc/cmd/ivideon" "github.com/AlexxIT/go2rtc/cmd/mjpeg" "github.com/AlexxIT/go2rtc/cmd/mp4" + "github.com/AlexxIT/go2rtc/cmd/mpegts" "github.com/AlexxIT/go2rtc/cmd/ngrok" "github.com/AlexxIT/go2rtc/cmd/rtmp" "github.com/AlexxIT/go2rtc/cmd/rtsp" @@ -42,6 +43,7 @@ func main() { http.Init() dvrip.Init() tapo.Init() + mpegts.Init() srtp.Init() homekit.Init() diff --git a/pkg/mpegts/client.go b/pkg/mpegts/client.go new file mode 100644 index 00000000..0b515a43 --- /dev/null +++ b/pkg/mpegts/client.go @@ -0,0 +1,73 @@ +package mpegts + +import ( + "github.com/AlexxIT/go2rtc/pkg/streamer" + "net/http" +) + +type Client struct { + streamer.Element + + medias []*streamer.Media + tracks map[byte]*streamer.Track + + res *http.Response +} + +func NewClient(res *http.Response) *Client { + return &Client{res: res} +} + +func (c *Client) Handle() error { + if c.tracks == nil { + c.tracks = map[byte]*streamer.Track{} + } + + reader := NewReader() + + b := make([]byte, 1024*1024*256) // 256K + + probe := streamer.NewProbe(c.medias == nil) + for probe == nil || probe.Active() { + n, err := c.res.Body.Read(b) + if err != nil { + return err + } + + reader.AppendBuffer(b[:n]) + + for { + packet := reader.GetPacket() + if packet == nil { + break + } + + track := c.tracks[packet.PayloadType] + if track == nil { + // count track on probe state even if not support it + probe.Append(packet.PayloadType) + + media := GetMedia(packet) + if media == nil { + continue // unsupported codec + } + + track = streamer.NewTrack2(media, nil) + + c.medias = append(c.medias, media) + c.tracks[packet.PayloadType] = track + } + + _ = track.WriteRTP(packet) + + //log.Printf("[AVC] %v, len: %d, pts: %d ts: %10d", h264.Types(packet.Payload), len(packet.Payload), pkt.PTS, packet.Timestamp) + } + } + + return nil +} + +func (c *Client) Close() error { + _ = c.res.Body.Close() + return nil +} diff --git a/pkg/mpegts/helpers.go b/pkg/mpegts/helpers.go index bdac1dee..7d706628 100644 --- a/pkg/mpegts/helpers.go +++ b/pkg/mpegts/helpers.go @@ -1,8 +1,10 @@ package mpegts import ( + "bytes" "github.com/AlexxIT/go2rtc/pkg/h264" "github.com/AlexxIT/go2rtc/pkg/streamer" + "github.com/pion/rtp" "time" ) @@ -12,8 +14,9 @@ const ( ) const ( - StreamTypeH264 = 0x1B - StreamTypePCMA = 0x90 + StreamTypeAAC = 0x0F + StreamTypeH264 = 0x1B + StreamTypePCMATapo = 0x90 ) type Packet struct { @@ -28,31 +31,128 @@ type PES struct { StreamType byte StreamID byte Payload []byte + Mode byte + Size int + + Sequence uint16 + Timestamp uint32 } -func (p *PES) Packet() *Packet { - // parse Optional PES header - const minHeaderSize = 3 +const ( + ModeUnknown = iota + ModeSize + ModeStream +) - pkt := &Packet{StreamType: p.StreamType} +// parse Optional PES header +const minHeaderSize = 3 - // fist byte also flags - flags := p.Payload[1] - hSize := p.Payload[2] // optional fields +func (p *PES) SetBuffer(size uint16, b []byte) { + if size == 0 { + optSize := b[2] // optional fields + b = b[minHeaderSize+optSize:] - const hasPTS = 0b1000_0000 - if flags&hasPTS != 0 { - pkt.PTS = ParseTime(p.Payload[minHeaderSize:]) - - const hasDTS = 0b0100_0000 - if flags&hasDTS != 0 { - pkt.DTS = ParseTime(p.Payload[minHeaderSize+5:]) + if p.StreamType == StreamTypeH264 { + if bytes.HasPrefix(b, []byte{0, 0, 0, 1, h264.NALUTypeAUD}) { + p.Mode = ModeStream + b = b[5:] + } } + + if p.Mode == ModeUnknown { + println("WARNING: mpegts: unknown zero-size stream") + } + } else { + p.Mode = ModeSize + p.Size = int(size) } - pkt.Payload = p.Payload[minHeaderSize+hSize:] + p.Payload = make([]byte, 0, size) + p.Payload = append(p.Payload, b...) +} - return pkt +func (p *PES) AppendBuffer(b []byte) { + p.Payload = append(p.Payload, b...) +} + +func (p *PES) GetPacket() (pkt *rtp.Packet) { + switch p.Mode { + case ModeSize: + left := p.Size - len(p.Payload) + if left > 0 { + return + } + + if left < 0 { + println("WARNING: mpegts: buffer overflow") + p.Payload = nil + return + } + + // fist byte also flags + flags := p.Payload[1] + optSize := p.Payload[2] // optional fields + + p.Payload = p.Payload[minHeaderSize+optSize:] + + switch p.StreamType { + case StreamTypeH264: + var ts uint32 + + const hasPTS = 0b1000_0000 + if flags&hasPTS != 0 { + ts = uint32(ParseTime(p.Payload[minHeaderSize:])) + } + + pkt = &rtp.Packet{ + Header: rtp.Header{ + PayloadType: p.StreamType, + Timestamp: ts, + }, + Payload: h264.AnnexB2AVC(p.Payload), + } + + case StreamTypePCMATapo: + p.Sequence++ + p.Timestamp += uint32(len(p.Payload)) + + pkt = &rtp.Packet{ + Header: rtp.Header{ + Version: 2, + PayloadType: p.StreamType, + SequenceNumber: p.Sequence, + Timestamp: p.Timestamp, + }, + Payload: p.Payload, + } + } + + p.Payload = nil + + case ModeStream: + i := bytes.Index(p.Payload, []byte{0, 0, 0, 1, h264.NALUTypeAUD}) + if i < 0 { + return + } + if i2 := IndexFrom(p.Payload, []byte{0, 0, 1}, i); i2 < 0 && i2 > 9 { + return + } + + pkt = &rtp.Packet{ + Header: rtp.Header{ + PayloadType: p.StreamType, + Timestamp: uint32(time.Duration(time.Now().UnixNano()) * 90000 / time.Second), + }, + Payload: DecodeAnnex3B(p.Payload[:i]), + } + + p.Payload = p.Payload[i+5:] + + default: + p.Payload = nil + } + + return } func ParseTime(b []byte) time.Duration { @@ -60,11 +160,11 @@ func ParseTime(b []byte) time.Duration { return time.Duration(ts) } -func GetMedia(pkt *Packet) *streamer.Media { +func GetMedia(pkt *rtp.Packet) *streamer.Media { var codec *streamer.Codec var kind string - switch pkt.StreamType { + switch pkt.PayloadType { case StreamTypeH264: codec = &streamer.Codec{ Name: streamer.CodecH264, @@ -74,7 +174,7 @@ func GetMedia(pkt *Packet) *streamer.Media { } kind = streamer.KindVideo - case StreamTypePCMA: + case StreamTypePCMATapo: codec = &streamer.Codec{ Name: streamer.CodecPCMA, ClockRate: 8000, @@ -91,3 +191,61 @@ func GetMedia(pkt *Packet) *streamer.Media { Codecs: []*streamer.Codec{codec}, } } + +func DecodeAnnex3B(annexb []byte) (avc []byte) { + // depends on AU delimeter size + i0 := bytes.Index(annexb, []byte{0, 0, 1}) + if i0 < 0 || i0 > 9 { + return nil + } + + annexb = annexb[i0+3:] // skip first separator + i0 = 0 + + for { + // search next separato + iN := IndexFrom(annexb, []byte{0, 0, 1}, i0) + if iN < 0 { + break + } + + // move i0 to next AU + if i0 = iN + 3; i0 >= len(annexb) { + break + } + + // check if AU type valid + octet := annexb[i0] + const forbiddenZeroBit = 0x80 + if octet&forbiddenZeroBit == 0 { + const nalUnitType = 0x1F + switch octet & nalUnitType { + case h264.NALUTypePFrame, h264.NALUTypeIFrame, h264.NALUTypeSPS, h264.NALUTypePPS: + // add AU in AVC format + avc = append(avc, byte(iN>>24), byte(iN>>16), byte(iN>>8), byte(iN)) + avc = append(avc, annexb[:iN]...) + + // cut search to next AU start + annexb = annexb[i0:] + i0 = 0 + } + } + } + + size := len(annexb) + avc = append(avc, byte(size>>24), byte(size>>16), byte(size>>8), byte(size)) + return append(avc, annexb...) +} + +func IndexFrom(b []byte, sep []byte, from int) int { + if from > 0 { + if from < len(b) { + if i := bytes.Index(b[from:], sep); i >= 0 { + return from + i + } + } + return -1 + } + + return bytes.Index(b, sep) +} diff --git a/pkg/mpegts/producer.go b/pkg/mpegts/producer.go new file mode 100644 index 00000000..03e39684 --- /dev/null +++ b/pkg/mpegts/producer.go @@ -0,0 +1,26 @@ +package mpegts + +import ( + "github.com/AlexxIT/go2rtc/pkg/streamer" +) + +func (c *Client) GetMedias() []*streamer.Media { + return c.medias +} + +func (c *Client) GetTrack(media *streamer.Media, codec *streamer.Codec) *streamer.Track { + for _, track := range c.tracks { + if track.Codec == codec { + return track + } + } + return nil +} + +func (c *Client) Start() error { + return c.Handle() +} + +func (c *Client) Stop() error { + return c.Close() +} diff --git a/pkg/mpegts/reader.go b/pkg/mpegts/reader.go index 6e3f7fc4..66607992 100644 --- a/pkg/mpegts/reader.go +++ b/pkg/mpegts/reader.go @@ -1,8 +1,11 @@ package mpegts +import "github.com/pion/rtp" + type Reader struct { b []byte // packets buffer i byte // read position + s byte // end position pmt uint16 // Program Map Table (PMT) PID pes map[uint16]*PES @@ -15,20 +18,26 @@ func NewReader() *Reader { func (r *Reader) SetBuffer(b []byte) { r.b = b r.i = 0 + r.s = PacketSize } func (r *Reader) AppendBuffer(b []byte) { r.b = append(r.b, b...) } -func (r *Reader) GetPacket() *Packet { +func (r *Reader) GetPacket() *rtp.Packet { for r.Sync() { r.Skip(1) // Sync byte pid := r.ReadUint16() & 0x1FFF // PID flag := r.ReadByte() // flags... - const hasAdaptionField = 0x20 + const pidNullPacket = 0x1FFF + if pid == pidNullPacket { + continue + } + + const hasAdaptionField = 0b0010_0000 if flag&hasAdaptionField != 0 { adSize := r.ReadByte() // Adaptation field length if adSize > PacketSize-6 { @@ -39,17 +48,14 @@ func (r *Reader) GetPacket() *Packet { } // PAT: Program Association Table - const PAT = 0 - if pid == PAT { + const pidPAT = 0 + if pid == pidPAT { // already processed if r.pmt != 0 { continue } - if size := r.ReadPSIHeader(); size <= 4 { - println("WARNING: mpegts: wrong PAT") - continue - } + r.ReadPSIHeader() const CRCSize = 4 for r.Left() > CRCSize { @@ -71,26 +77,25 @@ func (r *Reader) GetPacket() *Packet { continue } - if size := r.ReadPSIHeader(); size == 0 { - println("WARNING: mpegts: wrong PMT") - continue - } + r.ReadPSIHeader() - pesPID := r.ReadUint16() & 0x1FFF - pSize := r.ReadUint16() & 0x03FF + pesPID := r.ReadUint16() & 0x1FFF // ? PCR PID + pSize := r.ReadUint16() & 0x03FF // ? 0x0FFF r.Skip(byte(pSize)) r.pes = map[uint16]*PES{} - const minItemSize = 5 - for r.Left() > minItemSize { + const CRCSize = 4 + for r.Left() > CRCSize { streamType := r.ReadByte() - pesPID = r.ReadUint16() & 0x1FFF - iSize := r.ReadUint16() & 0x03FF + pesPID = r.ReadUint16() & 0x1FFF // Elementary PID + iSize := r.ReadUint16() & 0x03FF // ? 0x0FFF r.Skip(byte(iSize)) r.pes[pesPID] = &PES{StreamType: streamType} } + + r.Skip(4) // ? CRC32 continue } @@ -103,37 +108,22 @@ func (r *Reader) GetPacket() *Packet { continue // unknown PID } - if pes.Payload != nil { - // how many bytes left to collect - left := cap(pes.Payload) - len(pes.Payload) - int(r.Left()) - - // buffer overflow - if left < 0 { - println("WARNING: mpegts: buffer overflow") - pes.Payload = nil + if pes.Payload == nil { + // PES Packet start code prefix + if r.ReadByte() != 0 || r.ReadByte() != 0 || r.ReadByte() != 1 { continue } - pes.Payload = append(pes.Payload, r.Bytes()...) - - if left == 0 { - pkt := pes.Packet() - pes.Payload = nil - return pkt - } - - continue + // read stream ID and total payload size + pes.StreamID = r.ReadByte() + pes.SetBuffer(r.ReadUint16(), r.Bytes()) + } else { + pes.AppendBuffer(r.Bytes()) } - // PES Packet start code prefix - if r.ReadByte() != 0 || r.ReadByte() != 0 || r.ReadByte() != 1 { - continue + if pkt := pes.GetPacket(); pkt != nil { + return pkt } - - // read stream ID and total payload size - pes.StreamID = r.ReadByte() - pes.Payload = make([]byte, 0, r.ReadUint16()) - pes.Payload = append(pes.Payload, r.Bytes()...) } return nil @@ -145,6 +135,7 @@ func (r *Reader) Sync() bool { if r.i != 0 { r.b = r.b[PacketSize:] r.i = 0 + r.s = PacketSize } // if packet available @@ -167,23 +158,18 @@ func (r *Reader) Sync() bool { return false } -func (r *Reader) ReadPSIHeader() uint16 { +func (r *Reader) ReadPSIHeader() { pointer := r.ReadByte() // Pointer field r.Skip(pointer) // Pointer filler bytes r.Skip(1) // Table ID size := r.ReadUint16() & 0x03FF // Section length - - if uint16(r.i)+size != uint16(PacketSize) { - return 0 - } + r.SetSize(byte(size)) r.Skip(2) // Table ID extension r.Skip(1) // flags... r.Skip(1) // Section number r.Skip(1) // Last section number - - return size - 5 } func (r *Reader) Skip(i byte) { @@ -207,5 +193,9 @@ func (r *Reader) Bytes() []byte { } func (r *Reader) Left() byte { - return PacketSize - r.i + return r.s - r.i +} + +func (r *Reader) SetSize(size byte) { + r.s = r.i + size } diff --git a/pkg/tapo/client.go b/pkg/tapo/client.go index d0e67d3d..16f4064d 100644 --- a/pkg/tapo/client.go +++ b/pkg/tapo/client.go @@ -6,11 +6,9 @@ import ( "crypto/md5" "errors" "fmt" - "github.com/AlexxIT/go2rtc/pkg/h264" "github.com/AlexxIT/go2rtc/pkg/mpegts" "github.com/AlexxIT/go2rtc/pkg/streamer" "github.com/AlexxIT/go2rtc/pkg/tcp" - "github.com/pion/rtp" "mime/multipart" "net" "net/http" @@ -138,9 +136,6 @@ func (c *Client) Handle() error { c.tracks = map[byte]*streamer.Track{} } - var audioSeq uint16 - var audioTS uint32 - reader := mpegts.NewReader() probe := streamer.NewProbe(c.medias == nil) @@ -182,10 +177,10 @@ func (c *Client) Handle() error { break } - track := c.tracks[pkt.StreamType] + track := c.tracks[pkt.PayloadType] if track == nil { // count track on probe state even if not support it - probe.Append(pkt.StreamType) + probe.Append(pkt.PayloadType) media := mpegts.GetMedia(pkt) if media == nil { @@ -195,35 +190,10 @@ func (c *Client) Handle() error { track = streamer.NewTrack2(media, nil) c.medias = append(c.medias, media) - c.tracks[pkt.StreamType] = track - } - - switch track.Codec.Name { - case streamer.CodecH264: - packet := &rtp.Packet{ - Header: rtp.Header{Timestamp: uint32(pkt.PTS)}, - Payload: h264.AnnexB2AVC(pkt.Payload), - } - _ = track.WriteRTP(packet) - - //log.Printf("[AVC] %v, len: %d, pts: %d ts: %10d", h264.Types(packet.Payload), len(packet.Payload), pkt.PTS, packet.Timestamp) - - case streamer.CodecPCMA: - audioSeq++ - audioTS += uint32(len(pkt.Payload)) - - packet := &rtp.Packet{ - Header: rtp.Header{ - Version: 2, - Timestamp: audioTS, - SequenceNumber: audioSeq, - }, - Payload: pkt.Payload, - } - _ = track.WriteRTP(packet) - //log.Printf("[PCM]len: %d, pts: %d ts: %10d, buf: %x", len(packet.Payload), pkt.PTS, packet.Timestamp, packet.Payload[:32]) + c.tracks[pkt.PayloadType] = track } + _ = track.WriteRTP(pkt) } }