diff --git a/cmd/streams/producer.go b/cmd/streams/producer.go index 8a845f2c..083bb143 100644 --- a/cmd/streams/producer.go +++ b/cmd/streams/producer.go @@ -27,9 +27,9 @@ type Producer struct { lastErr error tracks []*streamer.Track - state state - mu sync.Mutex - restart *time.Timer + state state + mu sync.Mutex + workerID int } func (p *Producer) SetSource(s string) { @@ -104,20 +104,32 @@ func (p *Producer) start() { log.Debug().Msgf("[streams] start producer url=%s", p.url) p.state = stateStart - go func() { - // safe read element while mu locked - if err := p.element.Start(); err != nil { - log.Warn().Err(err).Str("url", p.url).Caller().Send() - } - p.reconnect() - }() + p.workerID++ + + go p.worker(p.element, p.workerID) } -func (p *Producer) reconnect() { +func (p *Producer) worker(element streamer.Producer, workerID int) { + if err := element.Start(); err != nil { + p.mu.Lock() + closed := p.workerID != workerID + p.mu.Unlock() + + if closed { + return + } + + log.Warn().Err(err).Str("url", p.url).Caller().Send() + } + + p.reconnect(workerID) +} + +func (p *Producer) reconnect(workerID int) { p.mu.Lock() defer p.mu.Unlock() - if p.state != stateStart { + if p.workerID != workerID { log.Trace().Msgf("[streams] stop reconnect url=%s", p.url) return } @@ -126,9 +138,11 @@ func (p *Producer) reconnect() { p.element, p.lastErr = GetProducer(p.url) if p.lastErr != nil || p.element == nil { - log.Debug().Err(p.lastErr).Caller().Send() + log.Debug().Msgf("[streams] producer=%s", p.lastErr) // TODO: dynamic timeout - p.restart = time.AfterFunc(30*time.Second, p.reconnect) + time.AfterFunc(30*time.Second, func() { + p.reconnect(workerID) + }) return } @@ -152,12 +166,7 @@ func (p *Producer) reconnect() { } } - go func() { - if err := p.element.Start(); err != nil { - log.Debug().Err(err).Caller().Send() - } - p.reconnect() - }() + go p.worker(p.element, workerID) } func (p *Producer) stop() { @@ -171,6 +180,8 @@ func (p *Producer) stop() { case stateNone: log.Debug().Msgf("[streams] can't stop none producer") return + case stateStart: + p.workerID++ } log.Debug().Msgf("[streams] stop producer url=%s", p.url) @@ -179,10 +190,6 @@ func (p *Producer) stop() { _ = p.element.Stop() p.element = nil } - if p.restart != nil { - p.restart.Stop() - p.restart = nil - } p.state = stateNone p.tracks = nil