From 5f9788209d0ad50d3183893e43821ec1ec9ab67a Mon Sep 17 00:00:00 2001 From: Alexey Khit Date: Thu, 20 Apr 2023 13:20:52 +0300 Subject: [PATCH] Move MP4 mutex from HTTP to Muxer --- cmd/mp4/mp4.go | 8 +------- pkg/mp4/consumer.go | 15 +++++++++++---- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/cmd/mp4/mp4.go b/cmd/mp4/mp4.go index c0fc343f..825cb647 100644 --- a/cmd/mp4/mp4.go +++ b/cmd/mp4/mp4.go @@ -4,7 +4,6 @@ import ( "net/http" "strconv" "strings" - "sync" "time" "github.com/AlexxIT/go2rtc/cmd/api" @@ -110,14 +109,9 @@ func handlerMP4(w http.ResponseWriter, r *http.Request) { Medias: core.ParseQuery(r.URL.Query()), } - var mu sync.Mutex cons.Listen(func(msg any) { if data, ok := msg.([]byte); ok { - mu.Lock() - _, err := w.Write(data) - mu.Unlock() - - if err != nil && exit != nil { + if _, err := w.Write(data); err != nil && exit != nil { select { case exit <- err: default: diff --git a/pkg/mp4/consumer.go b/pkg/mp4/consumer.go index 47555540..a720c8ac 100644 --- a/pkg/mp4/consumer.go +++ b/pkg/mp4/consumer.go @@ -7,6 +7,7 @@ import ( "github.com/AlexxIT/go2rtc/pkg/h264" "github.com/AlexxIT/go2rtc/pkg/h265" "github.com/pion/rtp" + "sync" ) type Consumer struct { @@ -19,6 +20,7 @@ type Consumer struct { senders []*core.Sender muxer *Muxer + mu sync.Mutex wait byte send int @@ -70,10 +72,12 @@ func (c *Consumer) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiv c.wait = waitNone } + // important to use Mutex because right fragment order + c.mu.Lock() buf := c.muxer.Marshal(trackID, packet) c.Fire(buf) - c.send += len(buf) + c.mu.Unlock() } if track.Codec.IsRTP() { @@ -97,10 +101,11 @@ func (c *Consumer) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiv c.wait = waitNone } + c.mu.Lock() buf := c.muxer.Marshal(trackID, packet) c.Fire(buf) - c.send += len(buf) + c.mu.Unlock() } if track.Codec.IsRTP() { @@ -113,10 +118,11 @@ func (c *Consumer) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiv return } + c.mu.Lock() buf := c.muxer.Marshal(trackID, packet) c.Fire(buf) - c.send += len(buf) + c.mu.Unlock() } if track.Codec.IsRTP() { @@ -129,10 +135,11 @@ func (c *Consumer) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiv return } + c.mu.Lock() buf := c.muxer.Marshal(trackID, packet) c.Fire(buf) - c.send += len(buf) + c.mu.Unlock() } default: