From e6d3939c789498e51cb91d46a8a080209c858e39 Mon Sep 17 00:00:00 2001 From: Alexey Khit Date: Sun, 13 Nov 2022 21:33:09 +0300 Subject: [PATCH] Fix external producers --- cmd/streams/producer.go | 14 +++++------ cmd/streams/stream.go | 2 +- pkg/rtsp/conn.go | 55 ++++++++++++++++++++++++++++------------- pkg/rtsp/streamer.go | 5 ++++ 4 files changed, 51 insertions(+), 25 deletions(-) diff --git a/cmd/streams/producer.go b/cmd/streams/producer.go index af6869dc..e67f891b 100644 --- a/cmd/streams/producer.go +++ b/cmd/streams/producer.go @@ -14,6 +14,7 @@ const ( stateMedias stateTracks stateStart + stateExternal ) type Producer struct { @@ -71,11 +72,6 @@ func (p *Producer) GetTrack(media *streamer.Media, codec *streamer.Codec) *strea } } - // can't get new tracks after start - if p.state == stateStart { - return nil - } - track := p.element.GetTrack(media, codec) if track == nil { return nil @@ -162,6 +158,12 @@ func (p *Producer) reconnect() { func (p *Producer) stop() { p.mu.Lock() + defer p.mu.Unlock() + + if p.state == stateExternal { + log.Debug().Msgf("[streams] can't stop external producer") + return + } log.Debug().Msgf("[streams] stop producer url=%s", p.url) @@ -176,6 +178,4 @@ func (p *Producer) stop() { p.state = stateNone p.tracks = nil - - p.mu.Unlock() } diff --git a/cmd/streams/stream.go b/cmd/streams/stream.go index 6c1d0d53..2f6f3ab6 100644 --- a/cmd/streams/stream.go +++ b/cmd/streams/stream.go @@ -146,7 +146,7 @@ func (s *Stream) RemoveConsumer(cons streamer.Consumer) { } func (s *Stream) AddProducer(prod streamer.Producer) { - producer := &Producer{element: prod, state: stateTracks} + producer := &Producer{element: prod, state: stateExternal} s.mu.Lock() s.producers = append(s.producers, producer) s.mu.Unlock() diff --git a/pkg/rtsp/conn.go b/pkg/rtsp/conn.go index c7d74ab5..a567b6e4 100644 --- a/pkg/rtsp/conn.go +++ b/pkg/rtsp/conn.go @@ -44,6 +44,15 @@ const ( ModeServerConsumer ) +type State byte + +const ( + StateNone State = iota + StateConn + StateSetup + StatePlay +) + type Conn struct { streamer.Element @@ -59,9 +68,9 @@ type Conn struct { // internal auth *tcp.Auth - closed bool conn net.Conn mode Mode + state State reader *bufio.Reader sequence int uri string @@ -108,9 +117,6 @@ func (c *Conn) parseURI() (err error) { } func (c *Conn) Dial() (err error) { - //if c.state != StateClientInit { - // panic("wrong state") - //} if c.conn != nil { _ = c.parseURI() } @@ -137,6 +143,7 @@ func (c *Conn) Dial() (err error) { } c.reader = bufio.NewReader(c.conn) + c.state = StateConn return nil } @@ -437,33 +444,38 @@ func (c *Conn) SetupMedia( track = c.bindTrack(track, byte(ch), codec.PayloadType) } + c.state = StateSetup c.tracks = append(c.tracks, track) return track, nil } func (c *Conn) Play() (err error) { + if c.state != StateSetup { + return fmt.Errorf("RTSP PLAY from wrong state: %s", c.state) + } + req := &tcp.Request{Method: MethodPlay, URL: c.URL} return c.Request(req) } func (c *Conn) Teardown() (err error) { - //if c.state != StateClientPlay { - // panic("wrong state") - //} + if c.state != StatePlay { + return fmt.Errorf("RTSP TEARDOWN from wrong state: %s", c.state) + } req := &tcp.Request{Method: MethodTeardown, URL: c.URL} return c.Request(req) } func (c *Conn) Close() error { - if c.closed { + if c.state == StateNone { return nil } if err := c.Teardown(); err != nil { return err } - c.closed = true + c.state = StateNone return c.conn.Close() } @@ -574,6 +586,7 @@ func (c *Conn) Accept() error { if strings.HasPrefix(tr, transport) { c.Session = "1" // TODO: fixme + c.state = StateSetup res.Header.Set("Transport", tr[:len(transport)+3]) } else { res.Status = "461 Unsupported transport" @@ -594,14 +607,22 @@ func (c *Conn) Accept() error { } func (c *Conn) Handle() (err error) { + if c.state != StateSetup { + return fmt.Errorf("RTSP Handle from wrong state: %d", c.state) + } + + c.state = StatePlay + defer func() { - if c.closed { + if c.state == StateNone { err = nil - } else { - // may have gotten here because of the deadline - // so close the connection to stop keepalive - _ = c.conn.Close() + return } + + // may have gotten here because of the deadline + // so close the connection to stop keepalive + c.state = StateNone + _ = c.conn.Close() }() var timeout time.Duration @@ -625,7 +646,7 @@ func (c *Conn) Handle() (err error) { } for { - if c.closed { + if c.state == StateNone { return } @@ -717,7 +738,7 @@ func (c *Conn) keepalive() { req := &tcp.Request{Method: MethodOptions, URL: c.URL} for { time.Sleep(time.Second * 25) - if c.closed { + if c.state == StateNone { return } if err := c.Request(req); err != nil { @@ -739,7 +760,7 @@ func (c *Conn) bindTrack( track *streamer.Track, channel uint8, payloadType uint8, ) *streamer.Track { push := func(packet *rtp.Packet) error { - if c.closed { + if c.state == StateNone { return nil } packet.Header.PayloadType = payloadType diff --git a/pkg/rtsp/streamer.go b/pkg/rtsp/streamer.go index 13c206aa..6f37cd9e 100644 --- a/pkg/rtsp/streamer.go +++ b/pkg/rtsp/streamer.go @@ -20,6 +20,11 @@ func (c *Conn) GetTrack(media *streamer.Media, codec *streamer.Codec) *streamer. } } + // can't setup new tracks from play state + if c.state == StatePlay { + return nil + } + track, err := c.SetupMedia(media, codec) if err != nil { return nil