diff --git a/pkg/rtsp/conn.go b/pkg/rtsp/conn.go index 1648fc0a..20e8e149 100644 --- a/pkg/rtsp/conn.go +++ b/pkg/rtsp/conn.go @@ -43,8 +43,6 @@ const ( ModeServerConsumer ) -const KeepAlive = time.Second * 25 - type Conn struct { streamer.Element @@ -60,6 +58,7 @@ type Conn struct { // internal auth *tcp.Auth + closed bool conn net.Conn mode Mode reader *bufio.Reader @@ -459,15 +458,14 @@ func (c *Conn) Teardown() (err error) { } func (c *Conn) Close() error { - if c.conn == nil { + if c.closed { return nil } if err := c.Teardown(); err != nil { return err } - conn := c.conn - c.conn = nil - return conn.Close() + c.closed = true + return c.conn.Close() } const transport = "RTP/AVP/TCP;unicast;interleaved=" @@ -602,16 +600,40 @@ func (c *Conn) Accept() error { func (c *Conn) Handle() (err error) { defer func() { - if c.conn == nil { + if c.closed { err = nil } - //c.Fire(streamer.StateNull) }() - //c.Fire(streamer.StatePlaying) - ts := time.Now().Add(KeepAlive) + var timeout time.Duration + + switch c.mode { + case ModeClientProducer: + // polling frames from remote RTSP Server (ex Camera) + timeout = time.Second * 5 + go c.keepalive() + + case ModeServerProducer: + // polling frames from remote RTSP Client (ex FFmpeg) + timeout = time.Second * 15 + + case ModeServerConsumer: + // pushing frames to remote RTSP Client (ex VLC) + timeout = time.Second * 60 + + default: + return fmt.Errorf("wrong RTSP conn mode: %d", c.mode) + } for { + if c.closed { + return + } + + if err = c.conn.SetReadDeadline(time.Now().Add(timeout)); err != nil { + return + } + // we can read: // 1. RTP interleaved: `$` + 1B channel number + 2B size // 2. RTSP response: RTSP/1.0 200 OK @@ -689,16 +711,21 @@ func (c *Conn) Handle() (err error) { c.Fire(msg) } + } +} - // keep-alive - now := time.Now() - if now.After(ts) { - req := &tcp.Request{Method: MethodOptions, URL: c.URL} - // don't need to wait respose on this request - if err = c.Request(req); err != nil { - return err - } - ts = now.Add(KeepAlive) +const KeepAlive = time.Second * 25 + +func (c *Conn) keepalive() { + // TODO: rewrite to RTCP + req := &tcp.Request{Method: MethodOptions, URL: c.URL} + for { + time.Sleep(KeepAlive) + if c.closed { + return + } + if err := c.Request(req); err != nil { + return } } } @@ -716,7 +743,7 @@ func (c *Conn) bindTrack( track *streamer.Track, channel uint8, payloadType uint8, ) *streamer.Track { push := func(packet *rtp.Packet) error { - if c.conn == nil { + if c.closed { return nil } packet.Header.PayloadType = payloadType