Fix lock on mp4 restarts

This commit is contained in:
Alexey Khit
2022-11-07 13:32:27 +03:00
parent e1bc30fab3
commit 3a07e9fa03
2 changed files with 20 additions and 19 deletions
+15 -14
View File
@@ -37,14 +37,14 @@ func handlerKeyframe(w http.ResponseWriter, r *http.Request) {
cons := &mp4.Consumer{} cons := &mp4.Consumer{}
cons.Listen(func(msg interface{}) { cons.Listen(func(msg interface{}) {
switch msg := msg.(type) { if data, ok := msg.([]byte); ok && exit != nil {
case []byte: exit <- data
exit <- msg exit = nil
} }
}) })
if err := stream.AddConsumer(cons); err != nil { if err := stream.AddConsumer(cons); err != nil {
log.Error().Err(err).Msg("[api.keyframe] add consumer") log.Error().Err(err).Caller().Send()
return return
} }
@@ -54,7 +54,7 @@ func handlerKeyframe(w http.ResponseWriter, r *http.Request) {
data, err := cons.Init() data, err := cons.Init()
if err != nil { if err != nil {
log.Error().Err(err).Msg("[api.keyframe] init") log.Error().Err(err).Caller().Send()
return return
} }
data = append(data, <-exit...) data = append(data, <-exit...)
@@ -63,7 +63,7 @@ func handlerKeyframe(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Length", strconv.Itoa(len(data))) w.Header().Set("Content-Length", strconv.Itoa(len(data)))
if _, err := w.Write(data); err != nil { if _, err := w.Write(data); err != nil {
log.Error().Err(err).Msg("[api.keyframe] add consumer") log.Error().Err(err).Caller().Send()
} }
} }
@@ -80,19 +80,20 @@ func handlerMP4(w http.ResponseWriter, r *http.Request) {
return return
} }
exit := make(chan struct{}) exit := make(chan error)
cons := &mp4.Consumer{} cons := &mp4.Consumer{}
cons.Listen(func(msg interface{}) { cons.Listen(func(msg interface{}) {
if data, ok := msg.([]byte); ok { if data, ok := msg.([]byte); ok {
if _, err := w.Write(data); err != nil { if _, err := w.Write(data); err != nil && exit != nil {
exit <- struct{}{} exit <- err
exit = nil
} }
} }
}) })
if err := stream.AddConsumer(cons); err != nil { if err := stream.AddConsumer(cons); err != nil {
log.Error().Err(err).Msg("[api.mp4] add consumer") log.Error().Err(err).Caller().Send()
return return
} }
@@ -102,20 +103,20 @@ func handlerMP4(w http.ResponseWriter, r *http.Request) {
data, err := cons.Init() data, err := cons.Init()
if err != nil { if err != nil {
log.Error().Err(err).Msg("[api.mp4] init") log.Error().Err(err).Caller().Send()
return return
} }
if _, err = w.Write(data); err != nil { if _, err = w.Write(data); err != nil {
log.Error().Err(err).Msg("[api.mp4] write") log.Error().Err(err).Caller().Send()
return return
} }
cons.Start() cons.Start()
<-exit err = <-exit
log.Trace().Msg("[api.mp4] close") log.Trace().Err(err).Caller().Send()
} }
func isChromeFirst(w http.ResponseWriter, r *http.Request) bool { func isChromeFirst(w http.ResponseWriter, r *http.Request) bool {
+5 -5
View File
@@ -13,7 +13,7 @@ type Track struct {
Codec *Codec Codec *Codec
Direction string Direction string
sink map[*Track]WriterFunc sink map[*Track]WriterFunc
sinkMu sync.Mutex sinkMu sync.RWMutex
} }
func (t *Track) String() string { func (t *Track) String() string {
@@ -23,11 +23,11 @@ func (t *Track) String() string {
} }
func (t *Track) WriteRTP(p *rtp.Packet) error { func (t *Track) WriteRTP(p *rtp.Packet) error {
t.sinkMu.Lock() t.sinkMu.RLock()
for _, f := range t.sink { for _, f := range t.sink {
_ = f(p) _ = f(p)
} }
t.sinkMu.Unlock() t.sinkMu.RUnlock()
return nil return nil
} }
@@ -59,7 +59,7 @@ func (t *Track) GetSink(from *Track) {
} }
func (t *Track) HasSink() bool { func (t *Track) HasSink() bool {
t.sinkMu.Lock() t.sinkMu.RLock()
defer t.sinkMu.Unlock() defer t.sinkMu.RUnlock()
return len(t.sink) > 0 return len(t.sink) > 0
} }