From 4815ce1baf382f7d5c81ad3c9753fc0c6095b5a8 Mon Sep 17 00:00:00 2001 From: Alexey Khit Date: Sun, 13 Nov 2022 22:24:37 +0300 Subject: [PATCH] Fix stop ffmpeg without matching tracks --- cmd/streams/producer.go | 6 +++++- cmd/streams/stream.go | 39 +++++++++++++++++---------------------- pkg/rtsp/conn.go | 5 +---- 3 files changed, 23 insertions(+), 27 deletions(-) diff --git a/cmd/streams/producer.go b/cmd/streams/producer.go index e67f891b..84235069 100644 --- a/cmd/streams/producer.go +++ b/cmd/streams/producer.go @@ -160,9 +160,13 @@ func (p *Producer) stop() { p.mu.Lock() defer p.mu.Unlock() - if p.state == stateExternal { + switch p.state { + case stateExternal: log.Debug().Msgf("[streams] can't stop external producer") return + case stateNone: + log.Debug().Msgf("[streams] can't stop none producer") + return } log.Debug().Msgf("[streams] stop producer url=%s", p.url) diff --git a/cmd/streams/stream.go b/cmd/streams/stream.go index 2f6f3ab6..b819f6e1 100644 --- a/cmd/streams/stream.go +++ b/cmd/streams/stream.go @@ -92,6 +92,7 @@ func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) { } if len(producers) == 0 { + s.stopProducers() return errors.New("couldn't find the matching tracks") } @@ -110,11 +111,6 @@ func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) { func (s *Stream) RemoveConsumer(cons streamer.Consumer) { s.mu.Lock() for i, consumer := range s.consumers { - if consumer == nil { - log.Warn().Msgf("empty consumer: %+v\n", s) - continue - } - if consumer.element == cons { // remove consumer pads from all producers for _, track := range consumer.tracks { @@ -125,24 +121,9 @@ func (s *Stream) RemoveConsumer(cons streamer.Consumer) { break } } - - for _, producer := range s.producers { - if producer == nil { - log.Warn().Msgf("empty producer: %+v\n", s) - continue - } - - var sink bool - for _, track := range producer.tracks { - if track.HasSink() { - sink = true - } - } - if !sink { - producer.stop() - } - } s.mu.Unlock() + + s.stopProducers() } func (s *Stream) AddProducer(prod streamer.Producer) { @@ -163,6 +144,20 @@ func (s *Stream) RemoveProducer(prod streamer.Producer) { s.mu.Unlock() } +func (s *Stream) stopProducers() { + s.mu.Lock() +producers: + for _, producer := range s.producers { + for _, track := range producer.tracks { + if track.HasSink() { + continue producers + } + } + producer.stop() + } + s.mu.Unlock() +} + //func (s *Stream) Active() bool { // if len(s.consumers) > 0 { // return true diff --git a/pkg/rtsp/conn.go b/pkg/rtsp/conn.go index a567b6e4..86fbe3a4 100644 --- a/pkg/rtsp/conn.go +++ b/pkg/rtsp/conn.go @@ -460,10 +460,7 @@ func (c *Conn) Play() (err error) { } func (c *Conn) Teardown() (err error) { - if c.state != StatePlay { - return fmt.Errorf("RTSP TEARDOWN from wrong state: %s", c.state) - } - + // allow TEARDOWN from any state (ex. ANNOUNCE > SETUP) req := &tcp.Request{Method: MethodTeardown, URL: c.URL} return c.Request(req) }