diff --git a/internal/streams/stream.go b/internal/streams/stream.go index 49c58e77..5dacf991 100644 --- a/internal/streams/stream.go +++ b/internal/streams/stream.go @@ -88,6 +88,11 @@ func (s *Stream) RemoveProducer(prod core.Producer) { } func (s *Stream) stopProducers() { + if s.pending.Load() > 0 { + log.Trace().Msg("[streams] skip stop pending producer") + return + } + s.mu.Lock() producers: for _, producer := range s.producers {