From 3a07e9fa03df570b05e9bd4f2a3097a867896b68 Mon Sep 17 00:00:00 2001 From: Alexey Khit Date: Mon, 7 Nov 2022 13:32:27 +0300 Subject: [PATCH] Fix lock on mp4 restarts --- cmd/mp4/mp4.go | 29 +++++++++++++++-------------- pkg/streamer/track.go | 10 +++++----- 2 files changed, 20 insertions(+), 19 deletions(-) diff --git a/cmd/mp4/mp4.go b/cmd/mp4/mp4.go index 57f5e0a3..43272463 100644 --- a/cmd/mp4/mp4.go +++ b/cmd/mp4/mp4.go @@ -37,14 +37,14 @@ func handlerKeyframe(w http.ResponseWriter, r *http.Request) { cons := &mp4.Consumer{} cons.Listen(func(msg interface{}) { - switch msg := msg.(type) { - case []byte: - exit <- msg + if data, ok := msg.([]byte); ok && exit != nil { + exit <- data + exit = nil } }) if err := stream.AddConsumer(cons); err != nil { - log.Error().Err(err).Msg("[api.keyframe] add consumer") + log.Error().Err(err).Caller().Send() return } @@ -54,7 +54,7 @@ func handlerKeyframe(w http.ResponseWriter, r *http.Request) { data, err := cons.Init() if err != nil { - log.Error().Err(err).Msg("[api.keyframe] init") + log.Error().Err(err).Caller().Send() return } data = append(data, <-exit...) @@ -63,7 +63,7 @@ func handlerKeyframe(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Length", strconv.Itoa(len(data))) if _, err := w.Write(data); err != nil { - log.Error().Err(err).Msg("[api.keyframe] add consumer") + log.Error().Err(err).Caller().Send() } } @@ -80,19 +80,20 @@ func handlerMP4(w http.ResponseWriter, r *http.Request) { return } - exit := make(chan struct{}) + exit := make(chan error) cons := &mp4.Consumer{} cons.Listen(func(msg interface{}) { if data, ok := msg.([]byte); ok { - if _, err := w.Write(data); err != nil { - exit <- struct{}{} + if _, err := w.Write(data); err != nil && exit != nil { + exit <- err + exit = nil } } }) if err := stream.AddConsumer(cons); err != nil { - log.Error().Err(err).Msg("[api.mp4] add consumer") + log.Error().Err(err).Caller().Send() return } @@ -102,20 +103,20 @@ func handlerMP4(w http.ResponseWriter, r *http.Request) { data, err := cons.Init() if err != nil { - log.Error().Err(err).Msg("[api.mp4] init") + log.Error().Err(err).Caller().Send() return } if _, err = w.Write(data); err != nil { - log.Error().Err(err).Msg("[api.mp4] write") + log.Error().Err(err).Caller().Send() return } cons.Start() - <-exit + err = <-exit - log.Trace().Msg("[api.mp4] close") + log.Trace().Err(err).Caller().Send() } func isChromeFirst(w http.ResponseWriter, r *http.Request) bool { diff --git a/pkg/streamer/track.go b/pkg/streamer/track.go index 7892c91c..b3c2e5d2 100644 --- a/pkg/streamer/track.go +++ b/pkg/streamer/track.go @@ -13,7 +13,7 @@ type Track struct { Codec *Codec Direction string sink map[*Track]WriterFunc - sinkMu sync.Mutex + sinkMu sync.RWMutex } func (t *Track) String() string { @@ -23,11 +23,11 @@ func (t *Track) String() string { } func (t *Track) WriteRTP(p *rtp.Packet) error { - t.sinkMu.Lock() + t.sinkMu.RLock() for _, f := range t.sink { _ = f(p) } - t.sinkMu.Unlock() + t.sinkMu.RUnlock() return nil } @@ -59,7 +59,7 @@ func (t *Track) GetSink(from *Track) { } func (t *Track) HasSink() bool { - t.sinkMu.Lock() - defer t.sinkMu.Unlock() + t.sinkMu.RLock() + defer t.sinkMu.RUnlock() return len(t.sink) > 0 }