Fix external producers

This commit is contained in:
Alexey Khit
2022-11-13 21:33:09 +03:00
parent 220b9ca318
commit e6d3939c78
4 changed files with 51 additions and 25 deletions
+7 -7
View File
@@ -14,6 +14,7 @@ const (
stateMedias stateMedias
stateTracks stateTracks
stateStart stateStart
stateExternal
) )
type Producer struct { type Producer struct {
@@ -71,11 +72,6 @@ func (p *Producer) GetTrack(media *streamer.Media, codec *streamer.Codec) *strea
} }
} }
// can't get new tracks after start
if p.state == stateStart {
return nil
}
track := p.element.GetTrack(media, codec) track := p.element.GetTrack(media, codec)
if track == nil { if track == nil {
return nil return nil
@@ -162,6 +158,12 @@ func (p *Producer) reconnect() {
func (p *Producer) stop() { func (p *Producer) stop() {
p.mu.Lock() p.mu.Lock()
defer p.mu.Unlock()
if p.state == stateExternal {
log.Debug().Msgf("[streams] can't stop external producer")
return
}
log.Debug().Msgf("[streams] stop producer url=%s", p.url) log.Debug().Msgf("[streams] stop producer url=%s", p.url)
@@ -176,6 +178,4 @@ func (p *Producer) stop() {
p.state = stateNone p.state = stateNone
p.tracks = nil p.tracks = nil
p.mu.Unlock()
} }
+1 -1
View File
@@ -146,7 +146,7 @@ func (s *Stream) RemoveConsumer(cons streamer.Consumer) {
} }
func (s *Stream) AddProducer(prod streamer.Producer) { func (s *Stream) AddProducer(prod streamer.Producer) {
producer := &Producer{element: prod, state: stateTracks} producer := &Producer{element: prod, state: stateExternal}
s.mu.Lock() s.mu.Lock()
s.producers = append(s.producers, producer) s.producers = append(s.producers, producer)
s.mu.Unlock() s.mu.Unlock()
+36 -15
View File
@@ -44,6 +44,15 @@ const (
ModeServerConsumer ModeServerConsumer
) )
type State byte
const (
StateNone State = iota
StateConn
StateSetup
StatePlay
)
type Conn struct { type Conn struct {
streamer.Element streamer.Element
@@ -59,9 +68,9 @@ type Conn struct {
// internal // internal
auth *tcp.Auth auth *tcp.Auth
closed bool
conn net.Conn conn net.Conn
mode Mode mode Mode
state State
reader *bufio.Reader reader *bufio.Reader
sequence int sequence int
uri string uri string
@@ -108,9 +117,6 @@ func (c *Conn) parseURI() (err error) {
} }
func (c *Conn) Dial() (err error) { func (c *Conn) Dial() (err error) {
//if c.state != StateClientInit {
// panic("wrong state")
//}
if c.conn != nil { if c.conn != nil {
_ = c.parseURI() _ = c.parseURI()
} }
@@ -137,6 +143,7 @@ func (c *Conn) Dial() (err error) {
} }
c.reader = bufio.NewReader(c.conn) c.reader = bufio.NewReader(c.conn)
c.state = StateConn
return nil return nil
} }
@@ -437,33 +444,38 @@ func (c *Conn) SetupMedia(
track = c.bindTrack(track, byte(ch), codec.PayloadType) track = c.bindTrack(track, byte(ch), codec.PayloadType)
} }
c.state = StateSetup
c.tracks = append(c.tracks, track) c.tracks = append(c.tracks, track)
return track, nil return track, nil
} }
func (c *Conn) Play() (err error) { func (c *Conn) Play() (err error) {
if c.state != StateSetup {
return fmt.Errorf("RTSP PLAY from wrong state: %s", c.state)
}
req := &tcp.Request{Method: MethodPlay, URL: c.URL} req := &tcp.Request{Method: MethodPlay, URL: c.URL}
return c.Request(req) return c.Request(req)
} }
func (c *Conn) Teardown() (err error) { func (c *Conn) Teardown() (err error) {
//if c.state != StateClientPlay { if c.state != StatePlay {
// panic("wrong state") return fmt.Errorf("RTSP TEARDOWN from wrong state: %s", c.state)
//} }
req := &tcp.Request{Method: MethodTeardown, URL: c.URL} req := &tcp.Request{Method: MethodTeardown, URL: c.URL}
return c.Request(req) return c.Request(req)
} }
func (c *Conn) Close() error { func (c *Conn) Close() error {
if c.closed { if c.state == StateNone {
return nil return nil
} }
if err := c.Teardown(); err != nil { if err := c.Teardown(); err != nil {
return err return err
} }
c.closed = true c.state = StateNone
return c.conn.Close() return c.conn.Close()
} }
@@ -574,6 +586,7 @@ func (c *Conn) Accept() error {
if strings.HasPrefix(tr, transport) { if strings.HasPrefix(tr, transport) {
c.Session = "1" // TODO: fixme c.Session = "1" // TODO: fixme
c.state = StateSetup
res.Header.Set("Transport", tr[:len(transport)+3]) res.Header.Set("Transport", tr[:len(transport)+3])
} else { } else {
res.Status = "461 Unsupported transport" res.Status = "461 Unsupported transport"
@@ -594,14 +607,22 @@ func (c *Conn) Accept() error {
} }
func (c *Conn) Handle() (err error) { func (c *Conn) Handle() (err error) {
if c.state != StateSetup {
return fmt.Errorf("RTSP Handle from wrong state: %d", c.state)
}
c.state = StatePlay
defer func() { defer func() {
if c.closed { if c.state == StateNone {
err = nil err = nil
} else { return
}
// may have gotten here because of the deadline // may have gotten here because of the deadline
// so close the connection to stop keepalive // so close the connection to stop keepalive
c.state = StateNone
_ = c.conn.Close() _ = c.conn.Close()
}
}() }()
var timeout time.Duration var timeout time.Duration
@@ -625,7 +646,7 @@ func (c *Conn) Handle() (err error) {
} }
for { for {
if c.closed { if c.state == StateNone {
return return
} }
@@ -717,7 +738,7 @@ func (c *Conn) keepalive() {
req := &tcp.Request{Method: MethodOptions, URL: c.URL} req := &tcp.Request{Method: MethodOptions, URL: c.URL}
for { for {
time.Sleep(time.Second * 25) time.Sleep(time.Second * 25)
if c.closed { if c.state == StateNone {
return return
} }
if err := c.Request(req); err != nil { if err := c.Request(req); err != nil {
@@ -739,7 +760,7 @@ func (c *Conn) bindTrack(
track *streamer.Track, channel uint8, payloadType uint8, track *streamer.Track, channel uint8, payloadType uint8,
) *streamer.Track { ) *streamer.Track {
push := func(packet *rtp.Packet) error { push := func(packet *rtp.Packet) error {
if c.closed { if c.state == StateNone {
return nil return nil
} }
packet.Header.PayloadType = payloadType packet.Header.PayloadType = payloadType
+5
View File
@@ -20,6 +20,11 @@ func (c *Conn) GetTrack(media *streamer.Media, codec *streamer.Codec) *streamer.
} }
} }
// can't setup new tracks from play state
if c.state == StatePlay {
return nil
}
track, err := c.SetupMedia(media, codec) track, err := c.SetupMedia(media, codec)
if err != nil { if err != nil {
return nil return nil