diff --git a/internal/exec/exec.go b/internal/exec/exec.go index 454c54a4..d30b0dbe 100644 --- a/internal/exec/exec.go +++ b/internal/exec/exec.go @@ -113,7 +113,7 @@ func handleRTSP(url string, cmd *exec.Cmd, path string) (core.Producer, error) { cmd.Stdout = os.Stdout } - waiter := make(chan core.Producer) + waiter := make(chan *pkg.Conn, 1) waitersMu.Lock() waiters[path] = waiter @@ -149,6 +149,10 @@ func handleRTSP(url string, cmd *exec.Cmd, path string) (core.Producer, error) { return nil, fmt.Errorf("exec/rtsp\n%s", cmd.Stderr) case prod := <-waiter: log.Debug().Stringer("launch", time.Since(ts)).Msg("[exec] run rtsp") + prod.OnClose = func() error { + log.Debug().Msgf("[exec] kill rtsp") + return errors.Join(cmd.Process.Kill(), cmd.Wait()) + } return prod, nil } } @@ -157,7 +161,7 @@ func handleRTSP(url string, cmd *exec.Cmd, path string) (core.Producer, error) { var ( log zerolog.Logger - waiters = map[string]chan core.Producer{} + waiters = make(map[string]chan *pkg.Conn) waitersMu sync.Mutex ) diff --git a/internal/streams/producer.go b/internal/streams/producer.go index 5a25dba5..daca7edf 100644 --- a/internal/streams/producer.go +++ b/internal/streams/producer.go @@ -207,7 +207,7 @@ func (p *Producer) reconnect(workerID, retry int) { for _, media := range conn.GetMedias() { switch media.Direction { case core.DirectionRecvonly: - for _, receiver := range p.receivers { + for i, receiver := range p.receivers { codec := media.MatchCodec(receiver.Codec) if codec == nil { continue @@ -219,6 +219,7 @@ func (p *Producer) reconnect(workerID, retry int) { } receiver.Replace(track) + p.receivers[i] = track break } @@ -234,6 +235,9 @@ func (p *Producer) reconnect(workerID, retry int) { } } + // stop previous connection after moving tracks (fix ghost exec/ffmpeg) + _ = p.conn.Stop() + // swap connections p.conn = conn go p.worker(conn, workerID) diff --git a/pkg/rtsp/client.go b/pkg/rtsp/client.go index ca32ce32..59f96e94 100644 --- a/pkg/rtsp/client.go +++ b/pkg/rtsp/client.go @@ -304,5 +304,8 @@ func (c *Conn) Close() error { if c.mode == core.ModeActiveProducer { _ = c.Teardown() } + if c.OnClose != nil { + _ = c.OnClose() + } return c.conn.Close() } diff --git a/pkg/rtsp/conn.go b/pkg/rtsp/conn.go index 91465f2c..1d9edf06 100644 --- a/pkg/rtsp/conn.go +++ b/pkg/rtsp/conn.go @@ -24,6 +24,7 @@ type Conn struct { Backchannel bool Media string + OnClose func() error PacketSize uint16 SessionName string Timeout int