Fix multiple requests from different consumers

This commit is contained in:
Alexey Khit
2023-01-13 18:02:03 +03:00
parent 6b24421722
commit 5407a3bc4b
+9 -10
View File
@@ -7,6 +7,7 @@ import (
"github.com/AlexxIT/go2rtc/pkg/streamer" "github.com/AlexxIT/go2rtc/pkg/streamer"
"strings" "strings"
"sync" "sync"
"sync/atomic"
) )
type Consumer struct { type Consumer struct {
@@ -18,7 +19,7 @@ type Stream struct {
producers []*Producer producers []*Producer
consumers []*Consumer consumers []*Consumer
mu sync.Mutex mu sync.Mutex
wg sync.WaitGroup requests int32
} }
func NewStream(source interface{}) *Stream { func NewStream(source interface{}) *Stream {
@@ -53,6 +54,9 @@ func (s *Stream) SetSource(source string) {
} }
func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) { func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) {
// support for multiple simultaneous requests from different consumers
atomic.AddInt32(&s.requests, 1)
ic := len(s.consumers) ic := len(s.consumers)
consumer := &Consumer{element: cons} consumer := &Consumer{element: cons}
@@ -60,9 +64,6 @@ func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) {
var codecs string var codecs string
// support for multiple simultaneous requests from different consumers
s.wg.Add(1)
// Step 1. Get consumer medias // Step 1. Get consumer medias
for icc, consMedia := range cons.GetMedias() { for icc, consMedia := range cons.GetMedias() {
log.Trace().Stringer("media", consMedia). log.Trace().Stringer("media", consMedia).
@@ -86,7 +87,7 @@ func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) {
// Step 4. Get producer track // Step 4. Get producer track
prodTrack := prod.GetTrack(prodMedia, prodCodec) prodTrack := prod.GetTrack(prodMedia, prodCodec)
if prodTrack == nil { if prodTrack == nil {
log.Warn().Str("url", prod.url).Msg("[stream] can't get track") log.Warn().Str("url", prod.url).Msg("[streams] can't get track")
continue continue
} }
@@ -101,13 +102,11 @@ func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) {
} }
} }
s.wg.Done() if atomic.AddInt32(&s.requests, -1) == 0 {
s.stopProducers()
}
if len(producers) == 0 { if len(producers) == 0 {
s.wg.Wait()
s.stopProducers()
if len(codecs) > 0 { if len(codecs) > 0 {
return errors.New("codecs not match: " + codecs) return errors.New("codecs not match: " + codecs)
} }