diff --git a/cmd/streams/stream.go b/cmd/streams/stream.go index 2c175913..859e4f5b 100644 --- a/cmd/streams/stream.go +++ b/cmd/streams/stream.go @@ -7,6 +7,7 @@ import ( "github.com/AlexxIT/go2rtc/pkg/streamer" "strings" "sync" + "sync/atomic" ) type Consumer struct { @@ -18,7 +19,7 @@ type Stream struct { producers []*Producer consumers []*Consumer mu sync.Mutex - wg sync.WaitGroup + requests int32 } func NewStream(source interface{}) *Stream { @@ -53,6 +54,9 @@ func (s *Stream) SetSource(source string) { } func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) { + // support for multiple simultaneous requests from different consumers + atomic.AddInt32(&s.requests, 1) + ic := len(s.consumers) consumer := &Consumer{element: cons} @@ -60,9 +64,6 @@ func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) { var codecs string - // support for multiple simultaneous requests from different consumers - s.wg.Add(1) - // Step 1. Get consumer medias for icc, consMedia := range cons.GetMedias() { log.Trace().Stringer("media", consMedia). @@ -86,7 +87,7 @@ func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) { // Step 4. Get producer track prodTrack := prod.GetTrack(prodMedia, prodCodec) if prodTrack == nil { - log.Warn().Str("url", prod.url).Msg("[stream] can't get track") + log.Warn().Str("url", prod.url).Msg("[streams] can't get track") continue } @@ -101,13 +102,11 @@ func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) { } } - s.wg.Done() + if atomic.AddInt32(&s.requests, -1) == 0 { + s.stopProducers() + } if len(producers) == 0 { - s.wg.Wait() - - s.stopProducers() - if len(codecs) > 0 { return errors.New("codecs not match: " + codecs) }