Fix simultaneous stream reconnect and start

This commit is contained in:
Alexey Khit
2023-01-13 18:03:17 +03:00
parent 5407a3bc4b
commit 7b326d4753
+29 -22
View File
@@ -29,7 +29,7 @@ type Producer struct {
state state state state
mu sync.Mutex mu sync.Mutex
restart *time.Timer workerID int
} }
func (p *Producer) SetSource(s string) { func (p *Producer) SetSource(s string) {
@@ -104,20 +104,32 @@ func (p *Producer) start() {
log.Debug().Msgf("[streams] start producer url=%s", p.url) log.Debug().Msgf("[streams] start producer url=%s", p.url)
p.state = stateStart p.state = stateStart
go func() { p.workerID++
// safe read element while mu locked
if err := p.element.Start(); err != nil { go p.worker(p.element, p.workerID)
log.Warn().Err(err).Str("url", p.url).Caller().Send()
}
p.reconnect()
}()
} }
func (p *Producer) reconnect() { func (p *Producer) worker(element streamer.Producer, workerID int) {
if err := element.Start(); err != nil {
p.mu.Lock()
closed := p.workerID != workerID
p.mu.Unlock()
if closed {
return
}
log.Warn().Err(err).Str("url", p.url).Caller().Send()
}
p.reconnect(workerID)
}
func (p *Producer) reconnect(workerID int) {
p.mu.Lock() p.mu.Lock()
defer p.mu.Unlock() defer p.mu.Unlock()
if p.state != stateStart { if p.workerID != workerID {
log.Trace().Msgf("[streams] stop reconnect url=%s", p.url) log.Trace().Msgf("[streams] stop reconnect url=%s", p.url)
return return
} }
@@ -126,9 +138,11 @@ func (p *Producer) reconnect() {
p.element, p.lastErr = GetProducer(p.url) p.element, p.lastErr = GetProducer(p.url)
if p.lastErr != nil || p.element == nil { if p.lastErr != nil || p.element == nil {
log.Debug().Err(p.lastErr).Caller().Send() log.Debug().Msgf("[streams] producer=%s", p.lastErr)
// TODO: dynamic timeout // TODO: dynamic timeout
p.restart = time.AfterFunc(30*time.Second, p.reconnect) time.AfterFunc(30*time.Second, func() {
p.reconnect(workerID)
})
return return
} }
@@ -152,12 +166,7 @@ func (p *Producer) reconnect() {
} }
} }
go func() { go p.worker(p.element, workerID)
if err := p.element.Start(); err != nil {
log.Debug().Err(err).Caller().Send()
}
p.reconnect()
}()
} }
func (p *Producer) stop() { func (p *Producer) stop() {
@@ -171,6 +180,8 @@ func (p *Producer) stop() {
case stateNone: case stateNone:
log.Debug().Msgf("[streams] can't stop none producer") log.Debug().Msgf("[streams] can't stop none producer")
return return
case stateStart:
p.workerID++
} }
log.Debug().Msgf("[streams] stop producer url=%s", p.url) log.Debug().Msgf("[streams] stop producer url=%s", p.url)
@@ -179,10 +190,6 @@ func (p *Producer) stop() {
_ = p.element.Stop() _ = p.element.Stop()
p.element = nil p.element = nil
} }
if p.restart != nil {
p.restart.Stop()
p.restart = nil
}
p.state = stateNone p.state = stateNone
p.tracks = nil p.tracks = nil