Compare commits

...

20 Commits

Author SHA1 Message Date
Alexey Khit 4ae733aa11 Update version to 0.1-rc.8 2023-01-13 22:39:24 +03:00
Alexey Khit 27d8b33b62 Fix concurrency in ivideon 2023-01-13 21:52:29 +03:00
Alexey Khit ff8b0fbb9c Set default 8555 port for WebRTC (UDP+TCP) 2023-01-13 21:51:48 +03:00
Alexey Khit c6ad7ac39f Add single UDP port for WebRTC Server 2023-01-13 21:51:48 +03:00
Alexey Khit 7a3adf17be Fix mp4f consumer (unused) 2023-01-13 21:51:24 +03:00
Alexey Khit 94f6c07b28 Fix mjpeg client network connection 2023-01-13 18:03:54 +03:00
Alexey Khit 7b326d4753 Fix simultaneous stream reconnect and start 2023-01-13 18:03:17 +03:00
Alexey Khit 5407a3bc4b Fix multiple requests from different consumers 2023-01-13 18:02:03 +03:00
Alexey Khit 6b24421722 Fix unblocking exec error 2023-01-13 18:01:01 +03:00
Alexey Khit d12775a2d7 Fix unblocking exec waiter 2023-01-13 18:00:48 +03:00
Alexey Khit 6151593c08 Fix ws lock on write and close 2023-01-13 17:28:01 +03:00
Alexey Khit dba0989c54 Fix empty streams json on stream lock 2023-01-13 13:37:36 +03:00
Alexey Khit ba0c7d911d Fix ffmpeg link to same stream 2023-01-13 13:36:43 +03:00
Alexey Khit 09fefca712 Remove backchannel codec from add consumer error 2023-01-13 13:35:58 +03:00
Alexey Khit b3f177e2ec Handle closed state for ws connection 2023-01-13 13:34:41 +03:00
Alexey Khit 228abb8fbe Change logs msg from WRN to DBG for fail on add consumer 2023-01-13 13:33:55 +03:00
Alexey Khit eee70c07b7 Fix closer for ivideon source 2023-01-13 13:32:48 +03:00
Alexey Khit d92b0f29af Fix states handle for RTSP 2023-01-13 13:32:09 +03:00
Alexey Khit fca6c87b2c Fix RTSP tracks list in info json 2023-01-13 13:31:22 +03:00
Alexey Khit 0601091772 Fix closer for RTSP server #163 2023-01-13 13:30:41 +03:00
18 changed files with 198 additions and 85 deletions
+20 -4
View File
@@ -6,6 +6,7 @@ import (
"net/url" "net/url"
"strings" "strings"
"sync" "sync"
"time"
) )
// Message - struct for data exchange in Web API // Message - struct for data exchange in Web API
@@ -68,6 +69,8 @@ func apiWS(w http.ResponseWriter, r *http.Request) {
tr := &Transport{Request: r} tr := &Transport{Request: r}
tr.OnWrite(func(msg interface{}) { tr.OnWrite(func(msg interface{}) {
_ = ws.SetWriteDeadline(time.Now().Add(time.Second * 5))
if data, ok := msg.([]byte); ok { if data, ok := msg.([]byte); ok {
_ = ws.WriteMessage(websocket.BinaryMessage, data) _ = ws.WriteMessage(websocket.BinaryMessage, data)
} else { } else {
@@ -101,7 +104,9 @@ type Transport struct {
Request *http.Request Request *http.Request
Consumer interface{} // TODO: rewrite Consumer interface{} // TODO: rewrite
mx sync.Mutex closed bool
mx sync.Mutex
wrmx sync.Mutex
onChange func() onChange func()
onWrite func(msg interface{}) onWrite func(msg interface{})
@@ -118,21 +123,32 @@ func (t *Transport) OnWrite(f func(msg interface{})) {
} }
func (t *Transport) Write(msg interface{}) { func (t *Transport) Write(msg interface{}) {
t.mx.Lock() t.wrmx.Lock()
t.onWrite(msg) t.onWrite(msg)
t.mx.Unlock() t.wrmx.Unlock()
} }
func (t *Transport) Close() { func (t *Transport) Close() {
t.mx.Lock()
for _, f := range t.onClose { for _, f := range t.onClose {
f() f()
} }
t.closed = true
t.mx.Unlock()
} }
func (t *Transport) OnChange(f func()) { func (t *Transport) OnChange(f func()) {
t.mx.Lock()
t.onChange = f t.onChange = f
t.mx.Unlock()
} }
func (t *Transport) OnClose(f func()) { func (t *Transport) OnClose(f func()) {
t.onClose = append(t.onClose, f) t.mx.Lock()
if t.closed {
f()
} else {
t.onClose = append(t.onClose, f)
}
t.mx.Unlock()
} }
+1 -1
View File
@@ -10,7 +10,7 @@ import (
"runtime" "runtime"
) )
var Version = "0.1-rc.7" var Version = "0.1-rc.8"
var UserAgent = "go2rtc/" + Version var UserAgent = "go2rtc/" + Version
func Init() { func Init() {
+1
View File
@@ -25,6 +25,7 @@ var stackSkip = [][]byte{
// webrtc/api.go // webrtc/api.go
[]byte("created by github.com/pion/ice/v2.NewTCPMuxDefault"), []byte("created by github.com/pion/ice/v2.NewTCPMuxDefault"),
[]byte("created by github.com/pion/ice/v2.NewUDPMuxDefault"),
} }
func stackHandler(w http.ResponseWriter, r *http.Request) { func stackHandler(w http.ResponseWriter, r *http.Request) {
+14 -3
View File
@@ -34,8 +34,13 @@ func Init() {
return false return false
} }
waiter <- conn // unblocking write to channel
return true select {
case waiter <- conn:
return true
default:
return false
}
}) })
streams.HandleFunc("exec", Handle) streams.HandleFunc("exec", Handle)
@@ -86,7 +91,13 @@ func Handle(url string) (streamer.Producer, error) {
chErr := make(chan error) chErr := make(chan error)
go func() { go func() {
chErr <- cmd.Wait() err := cmd.Wait()
// unblocking write to channel
select {
case chErr <- err:
default:
log.Trace().Str("url", url).Msg("[exec] close")
}
}() }()
select { select {
+1 -1
View File
@@ -38,7 +38,7 @@ func handlerWSMSE(tr *api.Transport, msg *api.Message) error {
}) })
if err := stream.AddConsumer(cons); err != nil { if err := stream.AddConsumer(cons); err != nil {
log.Warn().Err(err).Caller().Send() log.Debug().Err(err).Msg("[mp4] add consumer")
return err return err
} }
+4 -1
View File
@@ -200,6 +200,9 @@ func tcpHandler(conn *rtsp.Conn) {
if err := conn.Accept(); err != nil { if err := conn.Accept(); err != nil {
log.Warn().Err(err).Caller().Send() log.Warn().Err(err).Caller().Send()
if closer != nil {
closer()
}
_ = conn.Close() _ = conn.Close()
return return
} }
@@ -212,7 +215,7 @@ func tcpHandler(conn *rtsp.Conn) {
if closer != nil { if closer != nil {
if err := conn.Handle(); err != nil { if err := conn.Handle(); err != nil {
log.Debug().Err(err).Caller().Send() log.Debug().Msgf("[rtsp] handle=%s", err)
} }
closer() closer()
+31 -24
View File
@@ -27,9 +27,9 @@ type Producer struct {
lastErr error lastErr error
tracks []*streamer.Track tracks []*streamer.Track
state state state state
mu sync.Mutex mu sync.Mutex
restart *time.Timer workerID int
} }
func (p *Producer) SetSource(s string) { func (p *Producer) SetSource(s string) {
@@ -104,20 +104,32 @@ func (p *Producer) start() {
log.Debug().Msgf("[streams] start producer url=%s", p.url) log.Debug().Msgf("[streams] start producer url=%s", p.url)
p.state = stateStart p.state = stateStart
go func() { p.workerID++
// safe read element while mu locked
if err := p.element.Start(); err != nil { go p.worker(p.element, p.workerID)
log.Warn().Err(err).Str("url", p.url).Caller().Send()
}
p.reconnect()
}()
} }
func (p *Producer) reconnect() { func (p *Producer) worker(element streamer.Producer, workerID int) {
if err := element.Start(); err != nil {
p.mu.Lock()
closed := p.workerID != workerID
p.mu.Unlock()
if closed {
return
}
log.Warn().Err(err).Str("url", p.url).Caller().Send()
}
p.reconnect(workerID)
}
func (p *Producer) reconnect(workerID int) {
p.mu.Lock() p.mu.Lock()
defer p.mu.Unlock() defer p.mu.Unlock()
if p.state != stateStart { if p.workerID != workerID {
log.Trace().Msgf("[streams] stop reconnect url=%s", p.url) log.Trace().Msgf("[streams] stop reconnect url=%s", p.url)
return return
} }
@@ -126,9 +138,11 @@ func (p *Producer) reconnect() {
p.element, p.lastErr = GetProducer(p.url) p.element, p.lastErr = GetProducer(p.url)
if p.lastErr != nil || p.element == nil { if p.lastErr != nil || p.element == nil {
log.Debug().Err(p.lastErr).Caller().Send() log.Debug().Msgf("[streams] producer=%s", p.lastErr)
// TODO: dynamic timeout // TODO: dynamic timeout
p.restart = time.AfterFunc(30*time.Second, p.reconnect) time.AfterFunc(30*time.Second, func() {
p.reconnect(workerID)
})
return return
} }
@@ -152,12 +166,7 @@ func (p *Producer) reconnect() {
} }
} }
go func() { go p.worker(p.element, workerID)
if err := p.element.Start(); err != nil {
log.Debug().Err(err).Caller().Send()
}
p.reconnect()
}()
} }
func (p *Producer) stop() { func (p *Producer) stop() {
@@ -171,6 +180,8 @@ func (p *Producer) stop() {
case stateNone: case stateNone:
log.Debug().Msgf("[streams] can't stop none producer") log.Debug().Msgf("[streams] can't stop none producer")
return return
case stateStart:
p.workerID++
} }
log.Debug().Msgf("[streams] stop producer url=%s", p.url) log.Debug().Msgf("[streams] stop producer url=%s", p.url)
@@ -179,10 +190,6 @@ func (p *Producer) stop() {
_ = p.element.Stop() _ = p.element.Stop()
p.element = nil p.element = nil
} }
if p.restart != nil {
p.restart.Stop()
p.restart = nil
}
p.state = stateNone p.state = stateNone
p.tracks = nil p.tracks = nil
+18 -10
View File
@@ -7,6 +7,7 @@ import (
"github.com/AlexxIT/go2rtc/pkg/streamer" "github.com/AlexxIT/go2rtc/pkg/streamer"
"strings" "strings"
"sync" "sync"
"sync/atomic"
) )
type Consumer struct { type Consumer struct {
@@ -18,7 +19,7 @@ type Stream struct {
producers []*Producer producers []*Producer
consumers []*Consumer consumers []*Consumer
mu sync.Mutex mu sync.Mutex
wg sync.WaitGroup requests int32
} }
func NewStream(source interface{}) *Stream { func NewStream(source interface{}) *Stream {
@@ -53,6 +54,9 @@ func (s *Stream) SetSource(source string) {
} }
func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) { func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) {
// support for multiple simultaneous requests from different consumers
atomic.AddInt32(&s.requests, 1)
ic := len(s.consumers) ic := len(s.consumers)
consumer := &Consumer{element: cons} consumer := &Consumer{element: cons}
@@ -60,9 +64,6 @@ func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) {
var codecs string var codecs string
// support for multiple simultaneous requests from different consumers
s.wg.Add(1)
// Step 1. Get consumer medias // Step 1. Get consumer medias
for icc, consMedia := range cons.GetMedias() { for icc, consMedia := range cons.GetMedias() {
log.Trace().Stringer("media", consMedia). log.Trace().Stringer("media", consMedia).
@@ -86,7 +87,7 @@ func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) {
// Step 4. Get producer track // Step 4. Get producer track
prodTrack := prod.GetTrack(prodMedia, prodCodec) prodTrack := prod.GetTrack(prodMedia, prodCodec)
if prodTrack == nil { if prodTrack == nil {
log.Warn().Str("url", prod.url).Msg("[stream] can't get track") log.Warn().Str("url", prod.url).Msg("[streams] can't get track")
continue continue
} }
@@ -101,12 +102,11 @@ func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) {
} }
} }
s.wg.Done() if atomic.AddInt32(&s.requests, -1) == 0 {
s.wg.Wait() s.stopProducers()
}
if len(producers) == 0 { if len(producers) == 0 {
s.stopProducers()
if len(codecs) > 0 { if len(codecs) > 0 {
return errors.New("codecs not match: " + codecs) return errors.New("codecs not match: " + codecs)
} }
@@ -197,8 +197,12 @@ producers:
//} //}
func (s *Stream) MarshalJSON() ([]byte, error) { func (s *Stream) MarshalJSON() ([]byte, error) {
if !s.mu.TryLock() {
log.Warn().Msgf("[streams] json locked")
return []byte(`null`), nil
}
var v []interface{} var v []interface{}
s.mu.Lock()
for _, prod := range s.producers { for _, prod := range s.producers {
if prod.element != nil { if prod.element != nil {
v = append(v, prod.element) v = append(v, prod.element)
@@ -242,6 +246,10 @@ func (s *Stream) removeProducer(i int) {
} }
func collectCodecs(media *streamer.Media, codecs *string) { func collectCodecs(media *streamer.Media, codecs *string) {
if media.Direction == streamer.DirectionRecvonly {
return
}
for _, codec := range media.Codecs { for _, codec := range media.Codecs {
name := codec.Name name := codec.Name
if name == streamer.CodecAAC { if name == streamer.CodecAAC {
+19 -14
View File
@@ -7,6 +7,7 @@ import (
) )
var candidates []string var candidates []string
var networks = []string{"udp", "tcp"}
func AddCandidate(address string) { func AddCandidate(address string) {
candidates = append(candidates, address) candidates = append(candidates, address)
@@ -20,15 +21,17 @@ func asyncCandidates(tr *api.Transport) {
continue continue
} }
cand, err := webrtc.NewCandidate(address) for _, network := range networks {
if err != nil { cand, err := webrtc.NewCandidate(network, address)
log.Warn().Err(err).Caller().Send() if err != nil {
continue log.Warn().Err(err).Caller().Send()
continue
}
log.Trace().Str("candidate", cand).Msg("[webrtc] config")
tr.Write(&api.Message{Type: "webrtc/candidate", Value: cand})
} }
log.Trace().Str("candidate", cand).Msg("[webrtc] config")
tr.Write(&api.Message{Type: "webrtc/candidate", Value: cand})
} }
} }
@@ -57,13 +60,15 @@ func syncCanditates(answer string) (string, error) {
continue continue
} }
cand, err := webrtc.NewCandidate(address) for _, network := range networks {
if err != nil { cand, err := webrtc.NewCandidate(network, address)
log.Warn().Err(err).Msg("[webrtc] candidate") if err != nil {
continue log.Warn().Err(err).Msg("[webrtc] candidate")
} continue
}
md.WithPropertyAttribute(cand) md.WithPropertyAttribute(cand)
}
} }
if end { if end {
+2 -1
View File
@@ -22,6 +22,7 @@ func Init() {
} `yaml:"webrtc"` } `yaml:"webrtc"`
} }
cfg.Mod.Listen = ":8555"
cfg.Mod.IceServers = []pion.ICEServer{ cfg.Mod.IceServers = []pion.ICEServer{
{URLs: []string{"stun:stun.l.google.com:19302"}}, {URLs: []string{"stun:stun.l.google.com:19302"}},
} }
@@ -112,7 +113,7 @@ func asyncHandler(tr *api.Transport, msg *api.Message) error {
// 2. AddConsumer, so we get new tracks // 2. AddConsumer, so we get new tracks
if err = stream.AddConsumer(conn); err != nil { if err = stream.AddConsumer(conn); err != nil {
log.Warn().Err(err).Caller().Send() log.Debug().Err(err).Msg("[webrtc] add consumer")
_ = conn.Conn.Close() _ = conn.Conn.Close()
return err return err
} }
+46 -13
View File
@@ -14,9 +14,18 @@ import (
"io" "io"
"net/http" "net/http"
"strings" "strings"
"sync"
"time" "time"
) )
type State byte
const (
StateNone State = iota
StateConn
StateHandle
)
type Client struct { type Client struct {
streamer.Element streamer.Element
@@ -26,12 +35,12 @@ type Client struct {
medias []*streamer.Media medias []*streamer.Media
tracks map[byte]*streamer.Track tracks map[byte]*streamer.Track
closed bool
msg *message msg *message
t0 time.Time t0 time.Time
buffer chan []byte buffer chan []byte
state State
mu sync.Mutex
} }
func NewClient(id string) *Client { func NewClient(id string) *Client {
@@ -69,16 +78,26 @@ func (c *Client) Dial() (err error) {
return err return err
} }
c.state = StateConn
return nil return nil
} }
func (c *Client) Handle() error { func (c *Client) Handle() error {
c.buffer = make(chan []byte, 5)
// add delay to the stream for smooth playing (not a best solution) // add delay to the stream for smooth playing (not a best solution)
c.t0 = time.Now().Add(time.Second) c.t0 = time.Now().Add(time.Second)
// processing stream in separate thread for lower delay between packets c.mu.Lock()
go c.worker()
if c.state == StateConn {
c.buffer = make(chan []byte, 5)
c.state = StateHandle
// processing stream in separate thread for lower delay between packets
go c.worker(c.buffer)
}
c.mu.Unlock()
_, data, err := c.conn.ReadMessage() _, data, err := c.conn.ReadMessage()
if err != nil { if err != nil {
@@ -87,7 +106,11 @@ func (c *Client) Handle() error {
track := c.tracks[c.msg.Track] track := c.tracks[c.msg.Track]
if track != nil { if track != nil {
c.buffer <- data c.mu.Lock()
if c.state == StateHandle {
c.buffer <- data
}
c.mu.Unlock()
} }
// we have one unprocessed msg after getTracks // we have one unprocessed msg after getTracks
@@ -114,7 +137,11 @@ func (c *Client) Handle() error {
track = c.tracks[msg.Track] track = c.tracks[msg.Track]
if track != nil { if track != nil {
c.buffer <- data c.mu.Lock()
if c.state == StateHandle {
c.buffer <- data
}
c.mu.Unlock()
} }
default: default:
@@ -124,13 +151,19 @@ func (c *Client) Handle() error {
} }
func (c *Client) Close() error { func (c *Client) Close() error {
if c.conn == nil { c.mu.Lock()
defer c.mu.Unlock()
switch c.state {
case StateNone:
return nil return nil
} case StateConn:
if c.buffer != nil { case StateHandle:
close(c.buffer) close(c.buffer)
} }
c.closed = true
c.state = StateNone
return c.conn.Close() return c.conn.Close()
} }
@@ -210,13 +243,13 @@ func (c *Client) getTracks() error {
} }
} }
func (c *Client) worker() { func (c *Client) worker(buffer chan []byte) {
var track *streamer.Track var track *streamer.Track
for _, track = range c.tracks { for _, track = range c.tracks {
break break
} }
for data := range c.buffer { for data := range buffer {
moof := &fmp4io.MovieFrag{} moof := &fmp4io.MovieFrag{}
if _, err := moof.Unmarshal(data, 0); err != nil { if _, err := moof.Unmarshal(data, 0); err != nil {
continue continue
+1 -1
View File
@@ -20,7 +20,7 @@ func (c *Client) GetTrack(media *streamer.Media, codec *streamer.Codec) *streame
func (c *Client) Start() error { func (c *Client) Start() error {
err := c.Handle() err := c.Handle()
if c.closed { if c.buffer == nil {
return nil return nil
} }
return err return err
+2
View File
@@ -64,6 +64,8 @@ func (c *Client) Start() error {
} }
func (c *Client) Stop() error { func (c *Client) Stop() error {
// important for close reader/writer gorutines
_ = c.res.Body.Close()
c.closed = true c.closed = true
return nil return nil
} }
+1 -1
View File
@@ -89,7 +89,7 @@ func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *strea
return nil return nil
} }
if !codec.IsRAW() { if codec.IsRTP() {
wrapper := h264.RTPDepay(track) wrapper := h264.RTPDepay(track)
push = wrapper(push) push = wrapper(push)
} }
+24 -2
View File
@@ -48,6 +48,22 @@ const (
type State byte type State byte
func (s State) String() string {
switch s {
case StateNone:
return "NONE"
case StateConn:
return "CONN"
case StateSetup:
return "SETUP"
case StatePlay:
return "PLAY"
case StateHandle:
return "HANDLE"
}
return strconv.Itoa(int(s))
}
const ( const (
StateNone State = iota StateNone State = iota
StateConn StateConn
@@ -346,6 +362,10 @@ func (c *Conn) SetupMedia(
c.stateMu.Lock() c.stateMu.Lock()
defer c.stateMu.Unlock() defer c.stateMu.Unlock()
if c.state != StateConn && c.state != StateSetup {
return nil, fmt.Errorf("RTSP SETUP from wrong state: %s", c.state)
}
ch := c.GetChannel(media) ch := c.GetChannel(media)
if ch < 0 { if ch < 0 {
return nil, fmt.Errorf("wrong media: %v", media) return nil, fmt.Errorf("wrong media: %v", media)
@@ -648,15 +668,17 @@ func (c *Conn) Handle() (err error) {
case StatePlay: case StatePlay:
c.state = StateHandle c.state = StateHandle
default: default:
err = fmt.Errorf("RTSP Handle from wrong state: %d", c.state) err = fmt.Errorf("RTSP HANDLE from wrong state: %s", c.state)
c.state = StateNone c.state = StateNone
_ = c.conn.Close() _ = c.conn.Close()
} }
ok := c.state == StateHandle
c.stateMu.Unlock() c.stateMu.Unlock()
if c.state != StateHandle { if !ok {
return return
} }
+1 -1
View File
@@ -119,7 +119,7 @@ func (c *Conn) MarshalJSON() ([]byte, error) {
v[k] = media.String() v[k] = media.String()
} }
for i, track := range c.tracks { for i, track := range c.tracks {
k := "track:" + strconv.Itoa(int(i>>1)) k := "track:" + strconv.Itoa(i)
v[k] = track.String() v[k] = track.String()
} }
//for i, track := range c.tracks { //for i, track := range c.tracks {
+10 -6
View File
@@ -35,13 +35,17 @@ func NewAPI(address string) (*webrtc.API, error) {
s.SetICEMulticastDNSMode(ice.MulticastDNSModeDisabled) s.SetICEMulticastDNSMode(ice.MulticastDNSModeDisabled)
if address != "" { if address != "" {
ln, err := net.Listen("tcp", address) s.SetNetworkTypes([]webrtc.NetworkType{
if err == nil { webrtc.NetworkTypeUDP4, webrtc.NetworkTypeUDP6,
s.SetNetworkTypes([]webrtc.NetworkType{ webrtc.NetworkTypeTCP4, webrtc.NetworkTypeTCP6,
webrtc.NetworkTypeUDP4, webrtc.NetworkTypeUDP6, })
webrtc.NetworkTypeTCP4, webrtc.NetworkTypeTCP6,
})
if ln, err := net.ListenPacket("udp", address); err == nil {
udpMux := webrtc.NewICEUDPMux(nil, ln)
s.SetICEUDPMux(udpMux)
}
if ln, err := net.Listen("tcp", address); err == nil {
tcpMux := webrtc.NewICETCPMux(nil, ln, 8) tcpMux := webrtc.NewICETCPMux(nil, ln, 8)
s.SetICETCPMux(tcpMux) s.SetICETCPMux(tcpMux)
} }
+2 -2
View File
@@ -13,7 +13,7 @@ import (
"time" "time"
) )
func NewCandidate(address string) (string, error) { func NewCandidate(network, address string) (string, error) {
i := strings.LastIndexByte(address, ':') i := strings.LastIndexByte(address, ':')
if i < 0 { if i < 0 {
return "", errors.New("wrong candidate: " + address) return "", errors.New("wrong candidate: " + address)
@@ -26,7 +26,7 @@ func NewCandidate(address string) (string, error) {
} }
cand, err := ice.NewCandidateHost(&ice.CandidateHostConfig{ cand, err := ice.NewCandidateHost(&ice.CandidateHostConfig{
Network: "tcp", Network: network,
Address: host, Address: host,
Port: i, Port: i,
Component: ice.ComponentRTP, Component: ice.ComponentRTP,