Rewrite keepalive and add timeouts for RTSP

This commit is contained in:
Alexey Khit
2022-11-02 10:24:52 +03:00
parent 3d4514eab9
commit f17dadbbbf
+47 -20
View File
@@ -43,8 +43,6 @@ const (
ModeServerConsumer ModeServerConsumer
) )
const KeepAlive = time.Second * 25
type Conn struct { type Conn struct {
streamer.Element streamer.Element
@@ -60,6 +58,7 @@ type Conn struct {
// internal // internal
auth *tcp.Auth auth *tcp.Auth
closed bool
conn net.Conn conn net.Conn
mode Mode mode Mode
reader *bufio.Reader reader *bufio.Reader
@@ -459,15 +458,14 @@ func (c *Conn) Teardown() (err error) {
} }
func (c *Conn) Close() error { func (c *Conn) Close() error {
if c.conn == nil { if c.closed {
return nil return nil
} }
if err := c.Teardown(); err != nil { if err := c.Teardown(); err != nil {
return err return err
} }
conn := c.conn c.closed = true
c.conn = nil return c.conn.Close()
return conn.Close()
} }
const transport = "RTP/AVP/TCP;unicast;interleaved=" const transport = "RTP/AVP/TCP;unicast;interleaved="
@@ -602,16 +600,40 @@ func (c *Conn) Accept() error {
func (c *Conn) Handle() (err error) { func (c *Conn) Handle() (err error) {
defer func() { defer func() {
if c.conn == nil { if c.closed {
err = nil err = nil
} }
//c.Fire(streamer.StateNull)
}() }()
//c.Fire(streamer.StatePlaying) var timeout time.Duration
ts := time.Now().Add(KeepAlive)
switch c.mode {
case ModeClientProducer:
// polling frames from remote RTSP Server (ex Camera)
timeout = time.Second * 5
go c.keepalive()
case ModeServerProducer:
// polling frames from remote RTSP Client (ex FFmpeg)
timeout = time.Second * 15
case ModeServerConsumer:
// pushing frames to remote RTSP Client (ex VLC)
timeout = time.Second * 60
default:
return fmt.Errorf("wrong RTSP conn mode: %d", c.mode)
}
for { for {
if c.closed {
return
}
if err = c.conn.SetReadDeadline(time.Now().Add(timeout)); err != nil {
return
}
// we can read: // we can read:
// 1. RTP interleaved: `$` + 1B channel number + 2B size // 1. RTP interleaved: `$` + 1B channel number + 2B size
// 2. RTSP response: RTSP/1.0 200 OK // 2. RTSP response: RTSP/1.0 200 OK
@@ -689,16 +711,21 @@ func (c *Conn) Handle() (err error) {
c.Fire(msg) c.Fire(msg)
} }
}
}
// keep-alive const KeepAlive = time.Second * 25
now := time.Now()
if now.After(ts) { func (c *Conn) keepalive() {
req := &tcp.Request{Method: MethodOptions, URL: c.URL} // TODO: rewrite to RTCP
// don't need to wait respose on this request req := &tcp.Request{Method: MethodOptions, URL: c.URL}
if err = c.Request(req); err != nil { for {
return err time.Sleep(KeepAlive)
} if c.closed {
ts = now.Add(KeepAlive) return
}
if err := c.Request(req); err != nil {
return
} }
} }
} }
@@ -716,7 +743,7 @@ func (c *Conn) bindTrack(
track *streamer.Track, channel uint8, payloadType uint8, track *streamer.Track, channel uint8, payloadType uint8,
) *streamer.Track { ) *streamer.Track {
push := func(packet *rtp.Packet) error { push := func(packet *rtp.Packet) error {
if c.conn == nil { if c.closed {
return nil return nil
} }
packet.Header.PayloadType = payloadType packet.Header.PayloadType = payloadType