Add mutex to stream

This commit is contained in:
Alexey Khit
2022-11-04 22:20:52 +03:00
parent e287b52808
commit f4f588d2c6
+25 -13
View File
@@ -4,6 +4,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"github.com/AlexxIT/go2rtc/pkg/streamer" "github.com/AlexxIT/go2rtc/pkg/streamer"
"sync"
) )
type Consumer struct { type Consumer struct {
@@ -14,6 +15,7 @@ type Consumer struct {
type Stream struct { type Stream struct {
producers []*Producer producers []*Producer
consumers []*Consumer consumers []*Consumer
mu sync.Mutex
} }
func NewStream(source interface{}) *Stream { func NewStream(source interface{}) *Stream {
@@ -93,7 +95,9 @@ func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) {
return errors.New("couldn't find the matching tracks") return errors.New("couldn't find the matching tracks")
} }
s.mu.Lock()
s.consumers = append(s.consumers, consumer) s.consumers = append(s.consumers, consumer)
s.mu.Unlock()
// there may be duplicates, but that's not a problem // there may be duplicates, but that's not a problem
for _, prod := range producers { for _, prod := range producers {
@@ -104,6 +108,7 @@ func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) {
} }
func (s *Stream) RemoveConsumer(cons streamer.Consumer) { func (s *Stream) RemoveConsumer(cons streamer.Consumer) {
s.mu.Lock()
for i, consumer := range s.consumers { for i, consumer := range s.consumers {
if consumer == nil { if consumer == nil {
log.Warn().Msgf("empty consumer: %+v\n", s) log.Warn().Msgf("empty consumer: %+v\n", s)
@@ -137,38 +142,44 @@ func (s *Stream) RemoveConsumer(cons streamer.Consumer) {
producer.stop() producer.stop()
} }
} }
s.mu.Unlock()
} }
func (s *Stream) AddProducer(prod streamer.Producer) { func (s *Stream) AddProducer(prod streamer.Producer) {
producer := &Producer{element: prod, state: stateTracks} producer := &Producer{element: prod, state: stateTracks}
s.mu.Lock()
s.producers = append(s.producers, producer) s.producers = append(s.producers, producer)
s.mu.Unlock()
} }
func (s *Stream) RemoveProducer(prod streamer.Producer) { func (s *Stream) RemoveProducer(prod streamer.Producer) {
s.mu.Lock()
for i, producer := range s.producers { for i, producer := range s.producers {
if producer.element == prod { if producer.element == prod {
s.removeProducer(i) s.removeProducer(i)
break break
} }
} }
s.mu.Unlock()
} }
func (s *Stream) Active() bool { //func (s *Stream) Active() bool {
if len(s.consumers) > 0 { // if len(s.consumers) > 0 {
return true // return true
} // }
//
for _, prod := range s.producers { // for _, prod := range s.producers {
if prod.element != nil { // if prod.element != nil {
return true // return true
} // }
} // }
//
return false // return false
} //}
func (s *Stream) MarshalJSON() ([]byte, error) { func (s *Stream) MarshalJSON() ([]byte, error) {
var v []interface{} var v []interface{}
s.mu.Lock()
for _, prod := range s.producers { for _, prod := range s.producers {
if prod.element != nil { if prod.element != nil {
v = append(v, prod.element) v = append(v, prod.element)
@@ -178,6 +189,7 @@ func (s *Stream) MarshalJSON() ([]byte, error) {
// cons.element always not nil // cons.element always not nil
v = append(v, cons.element) v = append(v, cons.element)
} }
s.mu.Unlock()
if len(v) == 0 { if len(v) == 0 {
v = nil v = nil
} }