From 6906b56524a0ada2c73d332a4134f46b6ad295f9 Mon Sep 17 00:00:00 2001 From: Alexey Khit Date: Sun, 8 Jan 2023 20:01:38 +0300 Subject: [PATCH] Fix double start for RTSP source --- pkg/rtsp/conn.go | 50 +++++++++++++++++++++++++++++++++++++------- pkg/rtsp/streamer.go | 3 ++- 2 files changed, 44 insertions(+), 9 deletions(-) diff --git a/pkg/rtsp/conn.go b/pkg/rtsp/conn.go index 259b5786..5eb74af5 100644 --- a/pkg/rtsp/conn.go +++ b/pkg/rtsp/conn.go @@ -20,6 +20,7 @@ import ( "net/url" "strconv" "strings" + "sync" "time" ) @@ -52,6 +53,7 @@ const ( StateConn StateSetup StatePlay + StateHandle ) type Conn struct { @@ -72,6 +74,7 @@ type Conn struct { conn net.Conn mode Mode state State + stateMu sync.Mutex reader *bufio.Reader sequence int uri string @@ -340,6 +343,9 @@ func (c *Conn) Setup() error { func (c *Conn) SetupMedia( media *streamer.Media, codec *streamer.Codec, ) (*streamer.Track, error) { + c.stateMu.Lock() + defer c.stateMu.Unlock() + ch := c.GetChannel(media) if ch < 0 { return nil, fmt.Errorf("wrong media: %v", media) @@ -461,12 +467,19 @@ func (c *Conn) SetupMedia( } func (c *Conn) Play() (err error) { + c.stateMu.Lock() + defer c.stateMu.Unlock() + if c.state != StateSetup { return fmt.Errorf("RTSP PLAY from wrong state: %s", c.state) } req := &tcp.Request{Method: MethodPlay, URL: c.URL} - return c.Request(req) + if err = c.Request(req); err == nil { + c.state = StatePlay + } + + return } func (c *Conn) Teardown() (err error) { @@ -476,12 +489,14 @@ func (c *Conn) Teardown() (err error) { } func (c *Conn) Close() error { + c.stateMu.Lock() + defer c.stateMu.Unlock() + if c.state == StateNone { return nil } - if err := c.Teardown(); err != nil { - return err - } + + _ = c.Teardown() c.state = StateNone return c.conn.Close() } @@ -614,7 +629,10 @@ func (c *Conn) Accept() error { case MethodRecord, MethodPlay: res := &tcp.Response{Request: req} - return c.Response(res) + if err = c.Response(res); err == nil { + c.state = StatePlay + } + return err default: return fmt.Errorf("unsupported method: %s", req.Method) @@ -623,13 +641,29 @@ func (c *Conn) Accept() error { } func (c *Conn) Handle() (err error) { - if c.state != StateSetup { - return fmt.Errorf("RTSP Handle from wrong state: %d", c.state) + c.stateMu.Lock() + + switch c.state { + case StateNone: // Close after PLAY and before Handle is OK (because SETUP after PLAY) + case StatePlay: + c.state = StateHandle + default: + err = fmt.Errorf("RTSP Handle from wrong state: %d", c.state) + + c.state = StateNone + _ = c.conn.Close() } - c.state = StatePlay + c.stateMu.Unlock() + + if c.state != StateHandle { + return + } defer func() { + c.stateMu.Lock() + defer c.stateMu.Unlock() + if c.state == StateNone { err = nil return diff --git a/pkg/rtsp/streamer.go b/pkg/rtsp/streamer.go index d8505576..34473dfd 100644 --- a/pkg/rtsp/streamer.go +++ b/pkg/rtsp/streamer.go @@ -21,7 +21,8 @@ func (c *Conn) GetTrack(media *streamer.Media, codec *streamer.Codec) *streamer. } // can't setup new tracks from play state - forcing a reconnection feature - if c.state == StatePlay { + switch c.state { + case StatePlay, StateHandle: go c.Close() return streamer.NewTrack(codec, media.Direction) }