Adds dynamic timeouts on reconnect
This commit is contained in:
+15
-6
@@ -157,10 +157,10 @@ func (p *Producer) worker(conn core.Producer, workerID int) {
|
|||||||
log.Warn().Err(err).Str("url", p.url).Caller().Send()
|
log.Warn().Err(err).Str("url", p.url).Caller().Send()
|
||||||
}
|
}
|
||||||
|
|
||||||
p.reconnect(workerID)
|
p.reconnect(workerID, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Producer) reconnect(workerID int) {
|
func (p *Producer) reconnect(workerID, retry int) {
|
||||||
p.mu.Lock()
|
p.mu.Lock()
|
||||||
defer p.mu.Unlock()
|
defer p.mu.Unlock()
|
||||||
|
|
||||||
@@ -169,14 +169,23 @@ func (p *Producer) reconnect(workerID int) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debug().Msgf("[streams] reconnect to url=%s", p.url)
|
log.Debug().Msgf("[streams] retry=%d to url=%s", retry, p.url)
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
if p.conn, err = GetProducer(p.url); err != nil {
|
if p.conn, err = GetProducer(p.url); err != nil {
|
||||||
log.Debug().Msgf("[streams] producer=%s", err)
|
log.Debug().Msgf("[streams] producer=%s", err)
|
||||||
// TODO: dynamic timeout
|
|
||||||
time.AfterFunc(30*time.Second, func() {
|
timeout := time.Minute
|
||||||
p.reconnect(workerID)
|
if retry < 5 {
|
||||||
|
timeout = time.Second
|
||||||
|
} else if retry < 10 {
|
||||||
|
timeout = time.Second * 5
|
||||||
|
} else if retry < 20 {
|
||||||
|
timeout = time.Second * 10
|
||||||
|
}
|
||||||
|
|
||||||
|
time.AfterFunc(timeout, func() {
|
||||||
|
p.reconnect(workerID, retry+1)
|
||||||
})
|
})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user