From 4c0929d854d7b811809e9bb1147d1453222444b2 Mon Sep 17 00:00:00 2001 From: Alexey Khit Date: Fri, 2 Dec 2022 21:32:18 +0300 Subject: [PATCH] Support MP4 over WebSocket --- cmd/mp4/mp4.go | 5 +-- cmd/mp4/{mse.go => ws.go} | 39 ++++++++++++++++---- pkg/mp4/consumer.go | 32 +++++++++++++---- pkg/mp4/muxer.go | 18 +++++----- pkg/mp4/{keyframe.go => segment.go} | 56 ++++++++++++++++++++++++----- 5 files changed, 118 insertions(+), 32 deletions(-) rename cmd/mp4/{mse.go => ws.go} (71%) rename pkg/mp4/{keyframe.go => segment.go} (53%) diff --git a/cmd/mp4/mp4.go b/cmd/mp4/mp4.go index 2160e38d..5779d0ff 100644 --- a/cmd/mp4/mp4.go +++ b/cmd/mp4/mp4.go @@ -15,7 +15,8 @@ import ( func Init() { log = app.GetLogger("mp4") - api.HandleWS(MsgTypeMSE, handlerWS) + api.HandleWS("mse", handlerWS) + api.HandleWS("mp4", handlerWS4) api.HandleFunc("api/frame.mp4", handlerKeyframe) api.HandleFunc("api/stream.mp4", handlerMP4) @@ -36,7 +37,7 @@ func handlerKeyframe(w http.ResponseWriter, r *http.Request) { exit := make(chan []byte) - cons := &mp4.Keyframe{} + cons := &mp4.Segment{OnlyKeyframe: true} cons.Listen(func(msg interface{}) { if data, ok := msg.([]byte); ok && exit != nil { exit <- data diff --git a/cmd/mp4/mse.go b/cmd/mp4/ws.go similarity index 71% rename from cmd/mp4/mse.go rename to cmd/mp4/ws.go index 3590fcea..d1268507 100644 --- a/cmd/mp4/mse.go +++ b/cmd/mp4/ws.go @@ -8,8 +8,6 @@ import ( "strings" ) -const MsgTypeMSE = "mse" // fMP4 - const packetSize = 8192 func handlerWS(ctx *api.Context, msg *streamer.Message) { @@ -24,7 +22,7 @@ func handlerWS(ctx *api.Context, msg *streamer.Message) { cons.RemoteAddr = ctx.Request.RemoteAddr if codecs, ok := msg.Value.(string); ok { - cons.Medias = parseMedias(codecs) + cons.Medias = parseMedias(codecs, true) } cons.Listen(func(msg interface{}) { @@ -47,7 +45,7 @@ func handlerWS(ctx *api.Context, msg *streamer.Message) { stream.RemoveConsumer(cons) }) - ctx.Write(&streamer.Message{Type: MsgTypeMSE, Value: cons.MimeType()}) + ctx.Write(&streamer.Message{Type: "mse", Value: cons.MimeType()}) data, err := cons.Init() if err != nil { @@ -61,7 +59,36 @@ func handlerWS(ctx *api.Context, msg *streamer.Message) { cons.Start() } -func parseMedias(codecs string) (medias []*streamer.Media) { +func handlerWS4(ctx *api.Context, msg *streamer.Message) { + src := ctx.Request.URL.Query().Get("src") + stream := streams.GetOrNew(src) + if stream == nil { + return + } + + cons := &mp4.Segment{} + + if codecs, ok := msg.Value.(string); ok { + cons.Medias = parseMedias(codecs, false) + } + + cons.Listen(func(msg interface{}) { + if data, ok := msg.([]byte); ok { + ctx.Write(data) + } + }) + + if err := stream.AddConsumer(cons); err != nil { + log.Error().Err(err).Caller().Send() + return + } + + ctx.OnClose(func() { + stream.RemoveConsumer(cons) + }) +} + +func parseMedias(codecs string, parseAudio bool) (medias []*streamer.Media) { var videos []*streamer.Codec var audios []*streamer.Codec @@ -88,7 +115,7 @@ func parseMedias(codecs string) (medias []*streamer.Media) { medias = append(medias, media) } - if audios != nil { + if audios != nil && parseAudio { media := &streamer.Media{ Kind: streamer.KindAudio, Direction: streamer.DirectionRecvonly, diff --git a/pkg/mp4/consumer.go b/pkg/mp4/consumer.go index bbfbe40c..342f4dca 100644 --- a/pkg/mp4/consumer.go +++ b/pkg/mp4/consumer.go @@ -18,11 +18,17 @@ type Consumer struct { muxer *Muxer codecs []*streamer.Codec - start bool + wait byte send int } +const ( + waitNone byte = iota + waitKeyframe + waitInit +) + func (c *Consumer) GetMedias() []*streamer.Media { if c.Medias != nil { return c.Medias @@ -55,13 +61,18 @@ func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *strea codec := track.Codec switch codec.Name { case streamer.CodecH264: + c.wait = waitInit + push := func(packet *rtp.Packet) error { if packet.Version != h264.RTPPacketVersionAVC { return nil } - if !c.start { - return nil + if c.wait != waitNone { + if c.wait == waitInit || !h264.IsKeyframe(packet.Payload) { + return nil + } + c.wait = waitNone } buf := c.muxer.Marshal(trackID, packet) @@ -82,13 +93,18 @@ func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *strea return track.Bind(push) case streamer.CodecH265: + c.wait = waitInit + push := func(packet *rtp.Packet) error { if packet.Version != h264.RTPPacketVersionAVC { return nil } - if !c.start { - return nil + if c.wait != waitNone { + if c.wait == waitInit || !h265.IsKeyframe(packet.Payload) { + return nil + } + c.wait = waitNone } buf := c.muxer.Marshal(trackID, packet) @@ -107,7 +123,7 @@ func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *strea case streamer.CodecAAC: push := func(packet *rtp.Packet) error { - if !c.start { + if c.wait != waitNone { return nil } @@ -139,7 +155,9 @@ func (c *Consumer) Init() ([]byte, error) { } func (c *Consumer) Start() { - c.start = true + if c.wait == waitInit { + c.wait = waitKeyframe + } } // diff --git a/pkg/mp4/muxer.go b/pkg/mp4/muxer.go index ed99bb74..efd4b912 100644 --- a/pkg/mp4/muxer.go +++ b/pkg/mp4/muxer.go @@ -19,8 +19,6 @@ type Muxer struct { fragIndex uint32 dts []uint64 pts []uint32 - //data []byte - //total int } func (m *Muxer) MimeType(codecs []*streamer.Codec) string { @@ -185,10 +183,13 @@ func (m *Muxer) GetInit(codecs []*streamer.Codec) ([]byte, error) { return append(FTYP(), data...), nil } -//func (m *Muxer) Rewind() { -// m.dts = 0 -// m.pts = 0 -//} +func (m *Muxer) Reset() { + m.fragIndex = 0 + for i := range m.dts { + m.dts[i] = 0 + m.pts[i] = 0 + } +} func (m *Muxer) Marshal(trackID byte, packet *rtp.Packet) []byte { run := &mp4fio.TrackFragRun{ @@ -218,15 +219,16 @@ func (m *Muxer) Marshal(trackID byte, packet *rtp.Packet) []byte { } entry := mp4io.TrackFragRunEntry{ - //Duration: 90000, Size: uint32(len(packet.Payload)), } newTime := packet.Timestamp if m.pts[trackID] > 0 { - //m.dts += uint64(newTime - m.pts) entry.Duration = newTime - m.pts[trackID] m.dts[trackID] += uint64(entry.Duration) + } else { + // important, or Safari will fail with first frame + entry.Duration = 1 } m.pts[trackID] = newTime diff --git a/pkg/mp4/keyframe.go b/pkg/mp4/segment.go similarity index 53% rename from pkg/mp4/keyframe.go rename to pkg/mp4/segment.go index 058d98b1..7411701d 100644 --- a/pkg/mp4/keyframe.go +++ b/pkg/mp4/segment.go @@ -7,13 +7,20 @@ import ( "github.com/pion/rtp" ) -type Keyframe struct { +type Segment struct { streamer.Element - MimeType string + Medias []*streamer.Media + MimeType string + OnlyKeyframe bool } -func (c *Keyframe) GetMedias() []*streamer.Media { +func (c *Segment) GetMedias() []*streamer.Media { + if c.Medias != nil { + return c.Medias + } + + // default medias return []*streamer.Media{ { Kind: streamer.KindVideo, @@ -26,7 +33,7 @@ func (c *Keyframe) GetMedias() []*streamer.Media { } } -func (c *Keyframe) AddTrack(media *streamer.Media, track *streamer.Track) *streamer.Track { +func (c *Segment) AddTrack(media *streamer.Media, track *streamer.Track) *streamer.Track { muxer := &Muxer{} codecs := []*streamer.Codec{track.Codec} @@ -40,15 +47,46 @@ func (c *Keyframe) AddTrack(media *streamer.Media, track *streamer.Track) *strea switch track.Codec.Name { case streamer.CodecH264: - push := func(packet *rtp.Packet) error { - if !h264.IsKeyframe(packet.Payload) { + var push streamer.WriterFunc + + if c.OnlyKeyframe { + push = func(packet *rtp.Packet) error { + if !h264.IsKeyframe(packet.Payload) { + return nil + } + + buf := muxer.Marshal(0, packet) + c.Fire(append(init, buf...)) + return nil } + } else { + var buf []byte - buf := muxer.Marshal(0, packet) - c.Fire(append(init, buf...)) + push = func(packet *rtp.Packet) error { + if h264.IsKeyframe(packet.Payload) { + // fist frame - send only IFrame + // other frames - send IFrame and all PFrames + if buf == nil { + buf = append(buf, init...) + b := muxer.Marshal(0, packet) + buf = append(buf, b...) + } - return nil + c.Fire(buf) + + buf = buf[:0] + buf = append(buf, init...) + muxer.Reset() + } + + if buf != nil { + b := muxer.Marshal(0, packet) + buf = append(buf, b...) + } + + return nil + } } var wrapper streamer.WrapperFunc