From 8b93c97e694bfd41aba7f2f6d473bbab8494114f Mon Sep 17 00:00:00 2001 From: Alexey Khit Date: Sun, 6 Nov 2022 22:44:48 +0300 Subject: [PATCH] Add support AAC for RTMP to MP4 --- cmd/mp4/mp4.go | 7 +- cmd/mp4/mse.go | 15 ++-- pkg/mp4/const.go | 32 +++++---- pkg/mp4/consumer.go | 73 +++++++++---------- pkg/mp4/muxer.go | 111 ++++++++++++++++++++--------- pkg/mp4f/consumer.go | 164 +++++++++++++++++++++++++++++++++++++++++++ www/mse.html | 6 +- 7 files changed, 315 insertions(+), 93 deletions(-) create mode 100644 pkg/mp4f/consumer.go diff --git a/cmd/mp4/mp4.go b/cmd/mp4/mp4.go index ac751f1d..57f5e0a3 100644 --- a/cmd/mp4/mp4.go +++ b/cmd/mp4/mp4.go @@ -84,9 +84,8 @@ func handlerMP4(w http.ResponseWriter, r *http.Request) { cons := &mp4.Consumer{} cons.Listen(func(msg interface{}) { - switch msg := msg.(type) { - case []byte: - if _, err := w.Write(msg); err != nil { + if data, ok := msg.([]byte); ok { + if _, err := w.Write(data); err != nil { exit <- struct{}{} } } @@ -112,6 +111,8 @@ func handlerMP4(w http.ResponseWriter, r *http.Request) { return } + cons.Start() + <-exit log.Trace().Msg("[api.mp4] close") diff --git a/cmd/mp4/mse.go b/cmd/mp4/mse.go index abbbc711..cbc202af 100644 --- a/cmd/mp4/mse.go +++ b/cmd/mp4/mse.go @@ -21,14 +21,13 @@ func handlerWS(ctx *api.Context, msg *streamer.Message) { cons.RemoteAddr = ctx.Request.RemoteAddr cons.Listen(func(msg interface{}) { - switch msg.(type) { - case *streamer.Message, []byte: - ctx.Write(msg) + if data, ok := msg.([]byte); ok { + ctx.Write(data) } }) if err := stream.AddConsumer(cons); err != nil { - log.Warn().Err(err).Msg("[api.mse] add consumer") + log.Warn().Err(err).Caller().Send() ctx.Error(err) return } @@ -37,16 +36,16 @@ func handlerWS(ctx *api.Context, msg *streamer.Message) { stream.RemoveConsumer(cons) }) - ctx.Write(&streamer.Message{ - Type: MsgTypeMSE, Value: cons.MimeType(), - }) + ctx.Write(&streamer.Message{Type: MsgTypeMSE, Value: cons.MimeType()}) data, err := cons.Init() if err != nil { - log.Warn().Err(err).Msg("[api.mse] init") + log.Warn().Err(err).Caller().Send() ctx.Error(err) return } ctx.Write(data) + + cons.Start() } diff --git a/pkg/mp4/const.go b/pkg/mp4/const.go index fb6eda86..db78d636 100644 --- a/pkg/mp4/const.go +++ b/pkg/mp4/const.go @@ -3,6 +3,8 @@ package mp4 import ( "encoding/binary" "github.com/deepch/vdk/format/mp4/mp4io" + "github.com/deepch/vdk/format/mp4f" + "github.com/deepch/vdk/format/mp4f/mp4fio" "time" ) @@ -37,25 +39,17 @@ func MOOV() *mp4io.Movie { SelectionDuration: time0, CurrentTime: time0, }, - MovieExtend: &mp4io.MovieExtend{ - Tracks: []*mp4io.TrackExtend{ - { - TrackId: 1, - DefaultSampleDescIdx: 1, - DefaultSampleDuration: 40, - }, - }, - }, + MovieExtend: &mp4io.MovieExtend{}, } } -func TRAK() *mp4io.Track { +func TRAK(id int) *mp4io.Track { return &mp4io.Track{ // trak > tkhd Header: &mp4io.TrackHeader{ - TrackId: int32(1), // change me - Flags: 0x0007, // 7 ENABLED IN-MOVIE IN-PREVIEW - Duration: 0, // OK + TrackId: int32(id), + Flags: 0x0007, // 7 ENABLED IN-MOVIE IN-PREVIEW + Duration: 0, // OK Matrix: matrix, CreateTime: time0, ModifyTime: time0, @@ -92,3 +86,15 @@ func TRAK() *mp4io.Track { }, } } + +func ESDS(conf []byte) *mp4f.FDummy { + esds := &mp4fio.ElemStreamDesc{DecConfig: conf} + + b := make([]byte, esds.Len()) + esds.Marshal(b) + + return &mp4f.FDummy{ + Data: b, + Tag_: mp4io.Tag(uint32(mp4io.ESDS)), + } +} diff --git a/pkg/mp4/consumer.go b/pkg/mp4/consumer.go index f6cda0f4..53915908 100644 --- a/pkg/mp4/consumer.go +++ b/pkg/mp4/consumer.go @@ -2,7 +2,6 @@ package mp4 import ( "encoding/json" - "fmt" "github.com/AlexxIT/go2rtc/pkg/h264" "github.com/AlexxIT/go2rtc/pkg/h265" "github.com/AlexxIT/go2rtc/pkg/streamer" @@ -28,44 +27,37 @@ func (c *Consumer) GetMedias() []*streamer.Media { Kind: streamer.KindVideo, Direction: streamer.DirectionRecvonly, Codecs: []*streamer.Codec{ - {Name: streamer.CodecH264, ClockRate: 90000}, - {Name: streamer.CodecH265, ClockRate: 90000}, + {Name: streamer.CodecH264}, + {Name: streamer.CodecH265}, + }, + }, + { + Kind: streamer.KindAudio, + Direction: streamer.DirectionRecvonly, + Codecs: []*streamer.Codec{ + {Name: streamer.CodecAAC}, }, }, - //{ - // Kind: streamer.KindAudio, - // Direction: streamer.DirectionRecvonly, - // Codecs: []*streamer.Codec{ - // {Name: streamer.CodecAAC, ClockRate: 16000}, - // }, - //}, } } func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *streamer.Track { + trackID := byte(len(c.codecs)) + c.codecs = append(c.codecs, track.Codec) + codec := track.Codec switch codec.Name { case streamer.CodecH264: - c.codecs = append(c.codecs, track.Codec) - push := func(packet *rtp.Packet) error { if packet.Version != h264.RTPPacketVersionAVC { return nil } - if c.muxer == nil { + if !c.start { return nil } - if !c.start { - if h264.IsKeyframe(packet.Payload) { - c.start = true - } else { - return nil - } - } - - buf := c.muxer.Marshal(packet) + buf := c.muxer.Marshal(trackID, packet) c.send += len(buf) c.Fire(buf) @@ -83,22 +75,16 @@ func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *strea return track.Bind(push) case streamer.CodecH265: - c.codecs = append(c.codecs, track.Codec) - push := func(packet *rtp.Packet) error { if packet.Version != h264.RTPPacketVersionAVC { return nil } if !c.start { - if h265.IsKeyframe(packet.Payload) { - c.start = true - } else { - return nil - } + return nil } - buf := c.muxer.Marshal(packet) + buf := c.muxer.Marshal(trackID, packet) c.send += len(buf) c.Fire(buf) @@ -110,12 +96,25 @@ func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *strea push = wrapper(push) } + return track.Bind(push) + + case streamer.CodecAAC: + push := func(packet *rtp.Packet) error { + if !c.start { + return nil + } + + buf := c.muxer.Marshal(trackID, packet) + c.send += len(buf) + c.Fire(buf) + + return nil + } + return track.Bind(push) } - fmt.Printf("[rtmp] unsupported codec: %+v\n", track.Codec) - - return nil + panic("unsupported codec") } func (c *Consumer) MimeType() string { @@ -123,12 +122,14 @@ func (c *Consumer) MimeType() string { } func (c *Consumer) Init() ([]byte, error) { - if c.muxer == nil { - c.muxer = &Muxer{} - } + c.muxer = &Muxer{} return c.muxer.GetInit(c.codecs) } +func (c *Consumer) Start() { + c.start = true +} + // func (c *Consumer) MarshalJSON() ([]byte, error) { diff --git a/pkg/mp4/muxer.go b/pkg/mp4/muxer.go index e4c9945b..5ec2928b 100644 --- a/pkg/mp4/muxer.go +++ b/pkg/mp4/muxer.go @@ -2,10 +2,13 @@ package mp4 import ( "encoding/binary" + "encoding/hex" "fmt" "github.com/AlexxIT/go2rtc/pkg/h264" "github.com/AlexxIT/go2rtc/pkg/h265" "github.com/AlexxIT/go2rtc/pkg/streamer" + "github.com/deepch/vdk/av" + "github.com/deepch/vdk/codec/aacparser" "github.com/deepch/vdk/codec/h264parser" "github.com/deepch/vdk/codec/h265parser" "github.com/deepch/vdk/format/fmp4/fmp4io" @@ -16,22 +19,28 @@ import ( type Muxer struct { fragIndex uint32 - dts uint64 - pts uint32 - data []byte - total int + dts []uint64 + pts []uint32 + //data []byte + //total int } func (m *Muxer) MimeType(codecs []*streamer.Codec) string { s := `video/mp4; codecs="` - for _, codec := range codecs { + for i, codec := range codecs { + if i > 0 { + s += "," + } + switch codec.Name { case streamer.CodecH264: s += "avc1." + h264.GetProfileLevelID(codec.FmtpLine) case streamer.CodecH265: // +Safari +Chrome +Edge -iOS15 -Android13 s += "hvc1.1.6.L93.B0" // hev1.1.6.L93.B0 + case streamer.CodecAAC: + s += "mp4a.40.2" } } @@ -41,7 +50,7 @@ func (m *Muxer) MimeType(codecs []*streamer.Codec) string { func (m *Muxer) GetInit(codecs []*streamer.Codec) ([]byte, error) { moov := MOOV() - for _, codec := range codecs { + for i, codec := range codecs { switch codec.Name { case streamer.CodecH264: sps, pps := h264.GetParameterSet(codec.FmtpLine) @@ -59,11 +68,14 @@ func (m *Muxer) GetInit(codecs []*streamer.Codec) ([]byte, error) { width := codecData.Width() height := codecData.Height() - trak := TRAK() - trak.Media.Header.TimeScale = int32(codec.ClockRate) + trak := TRAK(i + 1) trak.Header.TrackWidth = float64(width) trak.Header.TrackHeight = float64(height) - + trak.Media.Header.TimeScale = int32(codec.ClockRate) + trak.Media.Handler = &mp4io.HandlerRefer{ + SubType: [4]byte{'v', 'i', 'd', 'e'}, + Name: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 'm', 'a', 'i', 'n', 0}, + } trak.Media.Info.Video = &mp4io.VideoMediaInfo{ Flags: 0x000001, } @@ -81,11 +93,6 @@ func (m *Muxer) GetInit(codecs []*streamer.Codec) ([]byte, error) { }, } - trak.Media.Handler = &mp4io.HandlerRefer{ - SubType: [4]byte{'v', 'i', 'd', 'e'}, - Name: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 'm', 'a', 'i', 'n', 0}, - } - moov.Tracks = append(moov.Tracks, trak) case streamer.CodecH265: @@ -102,11 +109,14 @@ func (m *Muxer) GetInit(codecs []*streamer.Codec) ([]byte, error) { width := codecData.Width() height := codecData.Height() - trak := TRAK() - trak.Media.Header.TimeScale = int32(codec.ClockRate) + trak := TRAK(i + 1) trak.Header.TrackWidth = float64(width) trak.Header.TrackHeight = float64(height) - + trak.Media.Header.TimeScale = int32(codec.ClockRate) + trak.Media.Handler = &mp4io.HandlerRefer{ + SubType: [4]byte{'v', 'i', 'd', 'e'}, + Name: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 'm', 'a', 'i', 'n', 0}, + } trak.Media.Info.Video = &mp4io.VideoMediaInfo{ Flags: 0x000001, } @@ -124,13 +134,52 @@ func (m *Muxer) GetInit(codecs []*streamer.Codec) ([]byte, error) { }, } + moov.Tracks = append(moov.Tracks, trak) + + case streamer.CodecAAC: + s := streamer.Between(codec.FmtpLine, "config=", ";") + b, err := hex.DecodeString(s) + if err != nil { + return nil, err + } + + codecData, err := aacparser.ParseMPEG4AudioConfigBytes(b) + if err != nil { + return nil, err + } + + trak := TRAK(i + 1) + trak.Header.AlternateGroup = 1 + trak.Header.Duration = 0 + trak.Header.Volume = 1 + trak.Media.Header.TimeScale = int32(codec.ClockRate) + trak.Media.Handler = &mp4io.HandlerRefer{ - SubType: [4]byte{'v', 'i', 'd', 'e'}, + SubType: [4]byte{'s', 'o', 'u', 'n'}, Name: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 'm', 'a', 'i', 'n', 0}, } + trak.Media.Info.Sound = &mp4io.SoundMediaInfo{} + + trak.Media.Info.Sample.SampleDesc.MP4ADesc = &mp4io.MP4ADesc{ + DataRefIdx: 1, + NumberOfChannels: int16(codecData.ChannelLayout.Count()), + SampleSize: int16(av.FLTP.BytesPerSample() * 4), + SampleRate: float64(codecData.SampleRate), + Unknowns: []mp4io.Atom{ESDS(b)}, + } moov.Tracks = append(moov.Tracks, trak) } + + trex := &mp4io.TrackExtend{ + TrackId: uint32(i + 1), + DefaultSampleDescIdx: 1, + DefaultSampleDuration: 0, + } + moov.MovieExtend.Tracks = append(moov.MovieExtend.Tracks, trex) + + m.pts = append(m.pts, 0) + m.dts = append(m.dts, 0) } data := make([]byte, moov.Len()) @@ -139,14 +188,12 @@ func (m *Muxer) GetInit(codecs []*streamer.Codec) ([]byte, error) { return append(FTYP(), data...), nil } -func (m *Muxer) Rewind() { - m.dts = 0 - m.pts = 0 -} - -func (m *Muxer) Marshal(packet *rtp.Packet) []byte { - trackID := uint8(1) +//func (m *Muxer) Rewind() { +// m.dts = 0 +// m.pts = 0 +//} +func (m *Muxer) Marshal(trackID byte, packet *rtp.Packet) []byte { run := &mp4fio.TrackFragRun{ Flags: 0x000b05, FirstSampleFlags: uint32(fmp4io.SampleNoDependencies), @@ -161,12 +208,12 @@ func (m *Muxer) Marshal(packet *rtp.Packet) []byte { Tracks: []*mp4fio.TrackFrag{ { Header: &mp4fio.TrackFragHeader{ - Data: []byte{0x00, 0x02, 0x00, 0x20, 0x00, 0x00, 0x00, trackID, 0x01, 0x01, 0x00, 0x00}, + Data: []byte{0x00, 0x02, 0x00, 0x20, 0x00, 0x00, 0x00, trackID + 1, 0x01, 0x01, 0x00, 0x00}, }, DecodeTime: &mp4fio.TrackFragDecodeTime{ Version: 1, Flags: 0, - Time: m.dts, + Time: m.dts[trackID], }, Run: run, }, @@ -179,12 +226,12 @@ func (m *Muxer) Marshal(packet *rtp.Packet) []byte { } newTime := packet.Timestamp - if m.pts > 0 { + if m.pts[trackID] > 0 { //m.dts += uint64(newTime - m.pts) - entry.Duration = newTime - m.pts - m.dts += uint64(entry.Duration) + entry.Duration = newTime - m.pts[trackID] + m.dts[trackID] += uint64(entry.Duration) } - m.pts = newTime + m.pts[trackID] = newTime // important before moof.Len() run.Entries = append(run.Entries, entry) @@ -204,7 +251,7 @@ func (m *Muxer) Marshal(packet *rtp.Packet) []byte { m.fragIndex++ - m.total += moofLen + mdatLen + //m.total += moofLen + mdatLen return buf } diff --git a/pkg/mp4f/consumer.go b/pkg/mp4f/consumer.go new file mode 100644 index 00000000..de366bd8 --- /dev/null +++ b/pkg/mp4f/consumer.go @@ -0,0 +1,164 @@ +package mp4f + +import ( + "encoding/json" + "github.com/AlexxIT/go2rtc/pkg/h264" + "github.com/AlexxIT/go2rtc/pkg/streamer" + "github.com/deepch/vdk/av" + "github.com/deepch/vdk/codec/aacparser" + "github.com/deepch/vdk/codec/h264parser" + "github.com/deepch/vdk/format/mp4f" + "github.com/pion/rtp" + "time" +) + +type Consumer struct { + streamer.Element + + UserAgent string + RemoteAddr string + + muxer *mp4f.Muxer + streams []av.CodecData + mimeType string + start bool + + send int +} + +func (c *Consumer) GetMedias() []*streamer.Media { + return []*streamer.Media{ + { + Kind: streamer.KindVideo, + Direction: streamer.DirectionRecvonly, + Codecs: []*streamer.Codec{ + {Name: streamer.CodecH264, ClockRate: 90000}, + }, + }, + { + Kind: streamer.KindAudio, + Direction: streamer.DirectionRecvonly, + Codecs: []*streamer.Codec{ + {Name: streamer.CodecAAC, ClockRate: 16000}, + }, + }, + } +} + +func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *streamer.Track { + codec := track.Codec + trackID := int8(len(c.streams)) + + switch codec.Name { + case streamer.CodecH264: + sps, pps := h264.GetParameterSet(codec.FmtpLine) + stream, err := h264parser.NewCodecDataFromSPSAndPPS(sps, pps) + if err != nil { + return nil + } + + c.mimeType += "avc1." + h264.GetProfileLevelID(codec.FmtpLine) + c.streams = append(c.streams, stream) + + pkt := av.Packet{Idx: trackID, CompositionTime: time.Millisecond} + + ts2time := time.Second / time.Duration(codec.ClockRate) + + push := func(packet *rtp.Packet) error { + if packet.Version != h264.RTPPacketVersionAVC { + return nil + } + + if !c.start { + return nil + } + + pkt.Data = packet.Payload + newTime := time.Duration(packet.Timestamp) * ts2time + if pkt.Time > 0 { + pkt.Duration = newTime - pkt.Time + } + pkt.Time = newTime + + ready, buf, _ := c.muxer.WritePacket(pkt, false) + if ready { + c.send += len(buf) + c.Fire(buf) + } + + return nil + } + + if !h264.IsAVC(codec) { + wrapper := h264.RTPDepay(track) + push = wrapper(push) + } + + return track.Bind(push) + + case streamer.CodecAAC: + stream, _ := aacparser.NewCodecDataFromMPEG4AudioConfigBytes([]byte{20, 8}) + + c.mimeType += ",mp4a.40.2" + c.streams = append(c.streams, stream) + + pkt := av.Packet{Idx: trackID, CompositionTime: time.Millisecond} + + ts2time := time.Second / time.Duration(codec.ClockRate) + + push := func(packet *rtp.Packet) error { + if !c.start { + return nil + } + + pkt.Data = packet.Payload + newTime := time.Duration(packet.Timestamp) * ts2time + if pkt.Time > 0 { + pkt.Duration = newTime - pkt.Time + } + pkt.Time = newTime + + ready, buf, _ := c.muxer.WritePacket(pkt, false) + if ready { + c.send += len(buf) + c.Fire(buf) + } + + return nil + } + + return track.Bind(push) + } + + panic("unsupported codec") +} + +func (c *Consumer) MimeType() string { + return `video/mp4; codecs="` + c.mimeType + `"` +} + +func (c *Consumer) Init() ([]byte, error) { + c.muxer = mp4f.NewMuxer(nil) + if err := c.muxer.WriteHeader(c.streams); err != nil { + return nil, err + } + _, data := c.muxer.GetInit(c.streams) + return data, nil +} + +func (c *Consumer) Start() { + c.start = true +} + +// + +func (c *Consumer) MarshalJSON() ([]byte, error) { + v := map[string]interface{}{ + "type": "MSE server consumer", + "send": c.send, + "remote_addr": c.RemoteAddr, + "user_agent": c.UserAgent, + } + + return json.Marshal(v) +} diff --git a/www/mse.html b/www/mse.html index 49446238..b65215c0 100644 --- a/www/mse.html +++ b/www/mse.html @@ -74,7 +74,11 @@ if (sourceBuffer.updating) { queueBuffer.push(ev.data) } else { - sourceBuffer.appendBuffer(ev.data); + try { + sourceBuffer.appendBuffer(ev.data); + } catch (e) { + console.warn(e); + } } } }