Update error msg for stream start

This commit is contained in:
Alexey Khit
2023-04-17 15:04:45 +03:00
parent 116319f876
commit edb4e6eaad
2 changed files with 47 additions and 35 deletions
-2
View File
@@ -30,8 +30,6 @@ type Producer struct {
receivers []*core.Receiver receivers []*core.Receiver
senders []*core.Receiver senders []*core.Receiver
lastErr error
state state state state
mu sync.Mutex mu sync.Mutex
workerID int workerID int
+47 -33
View File
@@ -3,7 +3,6 @@ package streams
import ( import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt"
"github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/core"
"strings" "strings"
"sync" "sync"
@@ -50,9 +49,9 @@ func (s *Stream) AddConsumer(cons core.Consumer) (err error) {
// support for multiple simultaneous requests from different consumers // support for multiple simultaneous requests from different consumers
consN := atomic.AddInt32(&s.requests, 1) - 1 consN := atomic.AddInt32(&s.requests, 1) - 1
var producers []*Producer // matched producers for consumer var statErrors []error
var statMedias []*core.Media
var codecs string var statProds []*Producer // matched producers for consumer
// Step 1. Get consumer medias // Step 1. Get consumer medias
for _, consMedia := range cons.GetMedias() { for _, consMedia := range cons.GetMedias() {
@@ -62,14 +61,14 @@ func (s *Stream) AddConsumer(cons core.Consumer) (err error) {
for prodN, prod := range s.producers { for prodN, prod := range s.producers {
if err = prod.Dial(); err != nil { if err = prod.Dial(); err != nil {
log.Trace().Err(err).Msgf("[streams] skip prod=%s", prod.url) log.Trace().Err(err).Msgf("[streams] skip prod=%s", prod.url)
statErrors = append(statErrors, err)
continue continue
} }
// Step 2. Get producer medias (not tracks yet) // Step 2. Get producer medias (not tracks yet)
for _, prodMedia := range prod.GetMedias() { for _, prodMedia := range prod.GetMedias() {
log.Trace().Msgf("[streams] check prod=%d media=%s", prodN, prodMedia) log.Trace().Msgf("[streams] check prod=%d media=%s", prodN, prodMedia)
statMedias = append(statMedias, prodMedia)
collectCodecs(prodMedia, &codecs)
// Step 3. Match consumer/producer codecs list // Step 3. Match consumer/producer codecs list
prodCodec, consCodec := prodMedia.MatchMedia(consMedia) prodCodec, consCodec := prodMedia.MatchMedia(consMedia)
@@ -109,7 +108,7 @@ func (s *Stream) AddConsumer(cons core.Consumer) (err error) {
} }
} }
producers = append(producers, prod) statProds = append(statProds, prod)
if !consMedia.MatchAll() { if !consMedia.MatchAll() {
break producers break producers
@@ -123,18 +122,8 @@ func (s *Stream) AddConsumer(cons core.Consumer) (err error) {
s.stopProducers() s.stopProducers()
} }
if len(producers) == 0 { if len(statProds) == 0 {
if len(codecs) > 0 { return formatError(statMedias, statErrors)
return errors.New("codecs not match: " + codecs)
}
for i, producer := range s.producers {
if producer.lastErr != nil {
return fmt.Errorf("source %d error: %w", i, producer.lastErr)
}
}
return fmt.Errorf("sources unavailable: %d", len(s.producers))
} }
s.mu.Lock() s.mu.Lock()
@@ -142,7 +131,7 @@ func (s *Stream) AddConsumer(cons core.Consumer) (err error) {
s.mu.Unlock() s.mu.Unlock()
// there may be duplicates, but that's not a problem // there may be duplicates, but that's not a problem
for _, prod := range producers { for _, prod := range statProds {
prod.start() prod.start()
} }
@@ -219,22 +208,47 @@ func (s *Stream) MarshalJSON() ([]byte, error) {
return json.Marshal(info) return json.Marshal(info)
} }
func collectCodecs(media *core.Media, codecs *string) { func formatError(statMedias []*core.Media, statErrors []error) error {
if media.Direction == core.DirectionRecvonly { var text string
return
}
for _, codec := range media.Codecs { for _, media := range statMedias {
name := codec.Name if media.Direction == core.DirectionRecvonly {
if name == core.CodecAAC {
name = "AAC"
}
if strings.Contains(*codecs, name) {
continue continue
} }
if len(*codecs) > 0 {
*codecs += "," for _, codec := range media.Codecs {
name := codec.Name
if name == core.CodecAAC {
name = "AAC"
}
if strings.Contains(text, name) {
continue
}
if len(text) > 0 {
text += ","
}
text += name
} }
*codecs += name
} }
if text != "" {
return errors.New(text)
}
for _, err := range statErrors {
s := err.Error()
if strings.Contains(text, s) {
continue
}
if len(text) > 0 {
text += ","
}
text += s
}
if text != "" {
return errors.New(text)
}
return errors.New("unknown error")
} }