Fix double start for RTSP source
This commit is contained in:
+42
-8
@@ -20,6 +20,7 @@ import (
|
|||||||
"net/url"
|
"net/url"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -52,6 +53,7 @@ const (
|
|||||||
StateConn
|
StateConn
|
||||||
StateSetup
|
StateSetup
|
||||||
StatePlay
|
StatePlay
|
||||||
|
StateHandle
|
||||||
)
|
)
|
||||||
|
|
||||||
type Conn struct {
|
type Conn struct {
|
||||||
@@ -72,6 +74,7 @@ type Conn struct {
|
|||||||
conn net.Conn
|
conn net.Conn
|
||||||
mode Mode
|
mode Mode
|
||||||
state State
|
state State
|
||||||
|
stateMu sync.Mutex
|
||||||
reader *bufio.Reader
|
reader *bufio.Reader
|
||||||
sequence int
|
sequence int
|
||||||
uri string
|
uri string
|
||||||
@@ -340,6 +343,9 @@ func (c *Conn) Setup() error {
|
|||||||
func (c *Conn) SetupMedia(
|
func (c *Conn) SetupMedia(
|
||||||
media *streamer.Media, codec *streamer.Codec,
|
media *streamer.Media, codec *streamer.Codec,
|
||||||
) (*streamer.Track, error) {
|
) (*streamer.Track, error) {
|
||||||
|
c.stateMu.Lock()
|
||||||
|
defer c.stateMu.Unlock()
|
||||||
|
|
||||||
ch := c.GetChannel(media)
|
ch := c.GetChannel(media)
|
||||||
if ch < 0 {
|
if ch < 0 {
|
||||||
return nil, fmt.Errorf("wrong media: %v", media)
|
return nil, fmt.Errorf("wrong media: %v", media)
|
||||||
@@ -461,12 +467,19 @@ func (c *Conn) SetupMedia(
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Conn) Play() (err error) {
|
func (c *Conn) Play() (err error) {
|
||||||
|
c.stateMu.Lock()
|
||||||
|
defer c.stateMu.Unlock()
|
||||||
|
|
||||||
if c.state != StateSetup {
|
if c.state != StateSetup {
|
||||||
return fmt.Errorf("RTSP PLAY from wrong state: %s", c.state)
|
return fmt.Errorf("RTSP PLAY from wrong state: %s", c.state)
|
||||||
}
|
}
|
||||||
|
|
||||||
req := &tcp.Request{Method: MethodPlay, URL: c.URL}
|
req := &tcp.Request{Method: MethodPlay, URL: c.URL}
|
||||||
return c.Request(req)
|
if err = c.Request(req); err == nil {
|
||||||
|
c.state = StatePlay
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Conn) Teardown() (err error) {
|
func (c *Conn) Teardown() (err error) {
|
||||||
@@ -476,12 +489,14 @@ func (c *Conn) Teardown() (err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Conn) Close() error {
|
func (c *Conn) Close() error {
|
||||||
|
c.stateMu.Lock()
|
||||||
|
defer c.stateMu.Unlock()
|
||||||
|
|
||||||
if c.state == StateNone {
|
if c.state == StateNone {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if err := c.Teardown(); err != nil {
|
|
||||||
return err
|
_ = c.Teardown()
|
||||||
}
|
|
||||||
c.state = StateNone
|
c.state = StateNone
|
||||||
return c.conn.Close()
|
return c.conn.Close()
|
||||||
}
|
}
|
||||||
@@ -614,7 +629,10 @@ func (c *Conn) Accept() error {
|
|||||||
|
|
||||||
case MethodRecord, MethodPlay:
|
case MethodRecord, MethodPlay:
|
||||||
res := &tcp.Response{Request: req}
|
res := &tcp.Response{Request: req}
|
||||||
return c.Response(res)
|
if err = c.Response(res); err == nil {
|
||||||
|
c.state = StatePlay
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("unsupported method: %s", req.Method)
|
return fmt.Errorf("unsupported method: %s", req.Method)
|
||||||
@@ -623,13 +641,29 @@ func (c *Conn) Accept() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Conn) Handle() (err error) {
|
func (c *Conn) Handle() (err error) {
|
||||||
if c.state != StateSetup {
|
c.stateMu.Lock()
|
||||||
return fmt.Errorf("RTSP Handle from wrong state: %d", c.state)
|
|
||||||
|
switch c.state {
|
||||||
|
case StateNone: // Close after PLAY and before Handle is OK (because SETUP after PLAY)
|
||||||
|
case StatePlay:
|
||||||
|
c.state = StateHandle
|
||||||
|
default:
|
||||||
|
err = fmt.Errorf("RTSP Handle from wrong state: %d", c.state)
|
||||||
|
|
||||||
|
c.state = StateNone
|
||||||
|
_ = c.conn.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
c.state = StatePlay
|
c.stateMu.Unlock()
|
||||||
|
|
||||||
|
if c.state != StateHandle {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
|
c.stateMu.Lock()
|
||||||
|
defer c.stateMu.Unlock()
|
||||||
|
|
||||||
if c.state == StateNone {
|
if c.state == StateNone {
|
||||||
err = nil
|
err = nil
|
||||||
return
|
return
|
||||||
|
|||||||
@@ -21,7 +21,8 @@ func (c *Conn) GetTrack(media *streamer.Media, codec *streamer.Codec) *streamer.
|
|||||||
}
|
}
|
||||||
|
|
||||||
// can't setup new tracks from play state - forcing a reconnection feature
|
// can't setup new tracks from play state - forcing a reconnection feature
|
||||||
if c.state == StatePlay {
|
switch c.state {
|
||||||
|
case StatePlay, StateHandle:
|
||||||
go c.Close()
|
go c.Close()
|
||||||
return streamer.NewTrack(codec, media.Direction)
|
return streamer.NewTrack(codec, media.Direction)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user