From f4f588d2c6db10a862eb8d498310221f480b5037 Mon Sep 17 00:00:00 2001 From: Alexey Khit Date: Fri, 4 Nov 2022 22:20:52 +0300 Subject: [PATCH] Add mutex to stream --- cmd/streams/stream.go | 38 +++++++++++++++++++++++++------------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/cmd/streams/stream.go b/cmd/streams/stream.go index 451c6b4a..6c1d0d53 100644 --- a/cmd/streams/stream.go +++ b/cmd/streams/stream.go @@ -4,6 +4,7 @@ import ( "encoding/json" "errors" "github.com/AlexxIT/go2rtc/pkg/streamer" + "sync" ) type Consumer struct { @@ -14,6 +15,7 @@ type Consumer struct { type Stream struct { producers []*Producer consumers []*Consumer + mu sync.Mutex } func NewStream(source interface{}) *Stream { @@ -93,7 +95,9 @@ func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) { return errors.New("couldn't find the matching tracks") } + s.mu.Lock() s.consumers = append(s.consumers, consumer) + s.mu.Unlock() // there may be duplicates, but that's not a problem for _, prod := range producers { @@ -104,6 +108,7 @@ func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) { } func (s *Stream) RemoveConsumer(cons streamer.Consumer) { + s.mu.Lock() for i, consumer := range s.consumers { if consumer == nil { log.Warn().Msgf("empty consumer: %+v\n", s) @@ -137,38 +142,44 @@ func (s *Stream) RemoveConsumer(cons streamer.Consumer) { producer.stop() } } + s.mu.Unlock() } func (s *Stream) AddProducer(prod streamer.Producer) { producer := &Producer{element: prod, state: stateTracks} + s.mu.Lock() s.producers = append(s.producers, producer) + s.mu.Unlock() } func (s *Stream) RemoveProducer(prod streamer.Producer) { + s.mu.Lock() for i, producer := range s.producers { if producer.element == prod { s.removeProducer(i) break } } + s.mu.Unlock() } -func (s *Stream) Active() bool { - if len(s.consumers) > 0 { - return true - } - - for _, prod := range s.producers { - if prod.element != nil { - return true - } - } - - return false -} +//func (s *Stream) Active() bool { +// if len(s.consumers) > 0 { +// return true +// } +// +// for _, prod := range s.producers { +// if prod.element != nil { +// return true +// } +// } +// +// return false +//} func (s *Stream) MarshalJSON() ([]byte, error) { var v []interface{} + s.mu.Lock() for _, prod := range s.producers { if prod.element != nil { v = append(v, prod.element) @@ -178,6 +189,7 @@ func (s *Stream) MarshalJSON() ([]byte, error) { // cons.element always not nil v = append(v, cons.element) } + s.mu.Unlock() if len(v) == 0 { v = nil }