diff --git a/cmd/streams/producer.go b/cmd/streams/producer.go index 95f2ac65..6eed311b 100644 --- a/cmd/streams/producer.go +++ b/cmd/streams/producer.go @@ -30,8 +30,6 @@ type Producer struct { receivers []*core.Receiver senders []*core.Receiver - lastErr error - state state mu sync.Mutex workerID int diff --git a/cmd/streams/stream.go b/cmd/streams/stream.go index 8c7be19d..468f8889 100644 --- a/cmd/streams/stream.go +++ b/cmd/streams/stream.go @@ -3,7 +3,6 @@ package streams import ( "encoding/json" "errors" - "fmt" "github.com/AlexxIT/go2rtc/pkg/core" "strings" "sync" @@ -50,9 +49,9 @@ func (s *Stream) AddConsumer(cons core.Consumer) (err error) { // support for multiple simultaneous requests from different consumers consN := atomic.AddInt32(&s.requests, 1) - 1 - var producers []*Producer // matched producers for consumer - - var codecs string + var statErrors []error + var statMedias []*core.Media + var statProds []*Producer // matched producers for consumer // Step 1. Get consumer medias for _, consMedia := range cons.GetMedias() { @@ -62,14 +61,14 @@ func (s *Stream) AddConsumer(cons core.Consumer) (err error) { for prodN, prod := range s.producers { if err = prod.Dial(); err != nil { log.Trace().Err(err).Msgf("[streams] skip prod=%s", prod.url) + statErrors = append(statErrors, err) continue } // Step 2. Get producer medias (not tracks yet) for _, prodMedia := range prod.GetMedias() { log.Trace().Msgf("[streams] check prod=%d media=%s", prodN, prodMedia) - - collectCodecs(prodMedia, &codecs) + statMedias = append(statMedias, prodMedia) // Step 3. Match consumer/producer codecs list prodCodec, consCodec := prodMedia.MatchMedia(consMedia) @@ -109,7 +108,7 @@ func (s *Stream) AddConsumer(cons core.Consumer) (err error) { } } - producers = append(producers, prod) + statProds = append(statProds, prod) if !consMedia.MatchAll() { break producers @@ -123,18 +122,8 @@ func (s *Stream) AddConsumer(cons core.Consumer) (err error) { s.stopProducers() } - if len(producers) == 0 { - if len(codecs) > 0 { - return errors.New("codecs not match: " + codecs) - } - - for i, producer := range s.producers { - if producer.lastErr != nil { - return fmt.Errorf("source %d error: %w", i, producer.lastErr) - } - } - - return fmt.Errorf("sources unavailable: %d", len(s.producers)) + if len(statProds) == 0 { + return formatError(statMedias, statErrors) } s.mu.Lock() @@ -142,7 +131,7 @@ func (s *Stream) AddConsumer(cons core.Consumer) (err error) { s.mu.Unlock() // there may be duplicates, but that's not a problem - for _, prod := range producers { + for _, prod := range statProds { prod.start() } @@ -219,22 +208,47 @@ func (s *Stream) MarshalJSON() ([]byte, error) { return json.Marshal(info) } -func collectCodecs(media *core.Media, codecs *string) { - if media.Direction == core.DirectionRecvonly { - return - } +func formatError(statMedias []*core.Media, statErrors []error) error { + var text string - for _, codec := range media.Codecs { - name := codec.Name - if name == core.CodecAAC { - name = "AAC" - } - if strings.Contains(*codecs, name) { + for _, media := range statMedias { + if media.Direction == core.DirectionRecvonly { continue } - if len(*codecs) > 0 { - *codecs += "," + + for _, codec := range media.Codecs { + name := codec.Name + if name == core.CodecAAC { + name = "AAC" + } + if strings.Contains(text, name) { + continue + } + if len(text) > 0 { + text += "," + } + text += name } - *codecs += name } + + if text != "" { + return errors.New(text) + } + + for _, err := range statErrors { + s := err.Error() + if strings.Contains(text, s) { + continue + } + if len(text) > 0 { + text += "," + } + text += s + } + + if text != "" { + return errors.New(text) + } + + return errors.New("unknown error") }