From 7016289f140f97ccf28af4234c0facce9b1a2db1 Mon Sep 17 00:00:00 2001 From: Alexey Khit Date: Sat, 25 Mar 2023 11:39:29 +0300 Subject: [PATCH] Adds dynamic timeouts on reconnect --- cmd/streams/producer.go | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/cmd/streams/producer.go b/cmd/streams/producer.go index 1af588fe..810703d2 100644 --- a/cmd/streams/producer.go +++ b/cmd/streams/producer.go @@ -157,10 +157,10 @@ func (p *Producer) worker(conn core.Producer, workerID int) { log.Warn().Err(err).Str("url", p.url).Caller().Send() } - p.reconnect(workerID) + p.reconnect(workerID, 0) } -func (p *Producer) reconnect(workerID int) { +func (p *Producer) reconnect(workerID, retry int) { p.mu.Lock() defer p.mu.Unlock() @@ -169,14 +169,23 @@ func (p *Producer) reconnect(workerID int) { return } - log.Debug().Msgf("[streams] reconnect to url=%s", p.url) + log.Debug().Msgf("[streams] retry=%d to url=%s", retry, p.url) var err error if p.conn, err = GetProducer(p.url); err != nil { log.Debug().Msgf("[streams] producer=%s", err) - // TODO: dynamic timeout - time.AfterFunc(30*time.Second, func() { - p.reconnect(workerID) + + timeout := time.Minute + if retry < 5 { + timeout = time.Second + } else if retry < 10 { + timeout = time.Second * 5 + } else if retry < 20 { + timeout = time.Second * 10 + } + + time.AfterFunc(timeout, func() { + p.reconnect(workerID, retry+1) }) return }