Compare commits
20 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 4ae733aa11 | |||
| 27d8b33b62 | |||
| ff8b0fbb9c | |||
| c6ad7ac39f | |||
| 7a3adf17be | |||
| 94f6c07b28 | |||
| 7b326d4753 | |||
| 5407a3bc4b | |||
| 6b24421722 | |||
| d12775a2d7 | |||
| 6151593c08 | |||
| dba0989c54 | |||
| ba0c7d911d | |||
| 09fefca712 | |||
| b3f177e2ec | |||
| 228abb8fbe | |||
| eee70c07b7 | |||
| d92b0f29af | |||
| fca6c87b2c | |||
| 0601091772 |
+18
-2
@@ -6,6 +6,7 @@ import (
|
|||||||
"net/url"
|
"net/url"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Message - struct for data exchange in Web API
|
// Message - struct for data exchange in Web API
|
||||||
@@ -68,6 +69,8 @@ func apiWS(w http.ResponseWriter, r *http.Request) {
|
|||||||
|
|
||||||
tr := &Transport{Request: r}
|
tr := &Transport{Request: r}
|
||||||
tr.OnWrite(func(msg interface{}) {
|
tr.OnWrite(func(msg interface{}) {
|
||||||
|
_ = ws.SetWriteDeadline(time.Now().Add(time.Second * 5))
|
||||||
|
|
||||||
if data, ok := msg.([]byte); ok {
|
if data, ok := msg.([]byte); ok {
|
||||||
_ = ws.WriteMessage(websocket.BinaryMessage, data)
|
_ = ws.WriteMessage(websocket.BinaryMessage, data)
|
||||||
} else {
|
} else {
|
||||||
@@ -101,7 +104,9 @@ type Transport struct {
|
|||||||
Request *http.Request
|
Request *http.Request
|
||||||
Consumer interface{} // TODO: rewrite
|
Consumer interface{} // TODO: rewrite
|
||||||
|
|
||||||
|
closed bool
|
||||||
mx sync.Mutex
|
mx sync.Mutex
|
||||||
|
wrmx sync.Mutex
|
||||||
|
|
||||||
onChange func()
|
onChange func()
|
||||||
onWrite func(msg interface{})
|
onWrite func(msg interface{})
|
||||||
@@ -118,21 +123,32 @@ func (t *Transport) OnWrite(f func(msg interface{})) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (t *Transport) Write(msg interface{}) {
|
func (t *Transport) Write(msg interface{}) {
|
||||||
t.mx.Lock()
|
t.wrmx.Lock()
|
||||||
t.onWrite(msg)
|
t.onWrite(msg)
|
||||||
t.mx.Unlock()
|
t.wrmx.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Transport) Close() {
|
func (t *Transport) Close() {
|
||||||
|
t.mx.Lock()
|
||||||
for _, f := range t.onClose {
|
for _, f := range t.onClose {
|
||||||
f()
|
f()
|
||||||
}
|
}
|
||||||
|
t.closed = true
|
||||||
|
t.mx.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Transport) OnChange(f func()) {
|
func (t *Transport) OnChange(f func()) {
|
||||||
|
t.mx.Lock()
|
||||||
t.onChange = f
|
t.onChange = f
|
||||||
|
t.mx.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Transport) OnClose(f func()) {
|
func (t *Transport) OnClose(f func()) {
|
||||||
|
t.mx.Lock()
|
||||||
|
if t.closed {
|
||||||
|
f()
|
||||||
|
} else {
|
||||||
t.onClose = append(t.onClose, f)
|
t.onClose = append(t.onClose, f)
|
||||||
|
}
|
||||||
|
t.mx.Unlock()
|
||||||
}
|
}
|
||||||
|
|||||||
+1
-1
@@ -10,7 +10,7 @@ import (
|
|||||||
"runtime"
|
"runtime"
|
||||||
)
|
)
|
||||||
|
|
||||||
var Version = "0.1-rc.7"
|
var Version = "0.1-rc.8"
|
||||||
var UserAgent = "go2rtc/" + Version
|
var UserAgent = "go2rtc/" + Version
|
||||||
|
|
||||||
func Init() {
|
func Init() {
|
||||||
|
|||||||
@@ -25,6 +25,7 @@ var stackSkip = [][]byte{
|
|||||||
|
|
||||||
// webrtc/api.go
|
// webrtc/api.go
|
||||||
[]byte("created by github.com/pion/ice/v2.NewTCPMuxDefault"),
|
[]byte("created by github.com/pion/ice/v2.NewTCPMuxDefault"),
|
||||||
|
[]byte("created by github.com/pion/ice/v2.NewUDPMuxDefault"),
|
||||||
}
|
}
|
||||||
|
|
||||||
func stackHandler(w http.ResponseWriter, r *http.Request) {
|
func stackHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|||||||
+13
-2
@@ -34,8 +34,13 @@ func Init() {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
waiter <- conn
|
// unblocking write to channel
|
||||||
|
select {
|
||||||
|
case waiter <- conn:
|
||||||
return true
|
return true
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
streams.HandleFunc("exec", Handle)
|
streams.HandleFunc("exec", Handle)
|
||||||
@@ -86,7 +91,13 @@ func Handle(url string) (streamer.Producer, error) {
|
|||||||
chErr := make(chan error)
|
chErr := make(chan error)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
chErr <- cmd.Wait()
|
err := cmd.Wait()
|
||||||
|
// unblocking write to channel
|
||||||
|
select {
|
||||||
|
case chErr <- err:
|
||||||
|
default:
|
||||||
|
log.Trace().Str("url", url).Msg("[exec] close")
|
||||||
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
|
|||||||
+1
-1
@@ -38,7 +38,7 @@ func handlerWSMSE(tr *api.Transport, msg *api.Message) error {
|
|||||||
})
|
})
|
||||||
|
|
||||||
if err := stream.AddConsumer(cons); err != nil {
|
if err := stream.AddConsumer(cons); err != nil {
|
||||||
log.Warn().Err(err).Caller().Send()
|
log.Debug().Err(err).Msg("[mp4] add consumer")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
+4
-1
@@ -200,6 +200,9 @@ func tcpHandler(conn *rtsp.Conn) {
|
|||||||
|
|
||||||
if err := conn.Accept(); err != nil {
|
if err := conn.Accept(); err != nil {
|
||||||
log.Warn().Err(err).Caller().Send()
|
log.Warn().Err(err).Caller().Send()
|
||||||
|
if closer != nil {
|
||||||
|
closer()
|
||||||
|
}
|
||||||
_ = conn.Close()
|
_ = conn.Close()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -212,7 +215,7 @@ func tcpHandler(conn *rtsp.Conn) {
|
|||||||
|
|
||||||
if closer != nil {
|
if closer != nil {
|
||||||
if err := conn.Handle(); err != nil {
|
if err := conn.Handle(); err != nil {
|
||||||
log.Debug().Err(err).Caller().Send()
|
log.Debug().Msgf("[rtsp] handle=%s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
closer()
|
closer()
|
||||||
|
|||||||
+29
-22
@@ -29,7 +29,7 @@ type Producer struct {
|
|||||||
|
|
||||||
state state
|
state state
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
restart *time.Timer
|
workerID int
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Producer) SetSource(s string) {
|
func (p *Producer) SetSource(s string) {
|
||||||
@@ -104,20 +104,32 @@ func (p *Producer) start() {
|
|||||||
log.Debug().Msgf("[streams] start producer url=%s", p.url)
|
log.Debug().Msgf("[streams] start producer url=%s", p.url)
|
||||||
|
|
||||||
p.state = stateStart
|
p.state = stateStart
|
||||||
go func() {
|
p.workerID++
|
||||||
// safe read element while mu locked
|
|
||||||
if err := p.element.Start(); err != nil {
|
go p.worker(p.element, p.workerID)
|
||||||
log.Warn().Err(err).Str("url", p.url).Caller().Send()
|
|
||||||
}
|
|
||||||
p.reconnect()
|
|
||||||
}()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Producer) reconnect() {
|
func (p *Producer) worker(element streamer.Producer, workerID int) {
|
||||||
|
if err := element.Start(); err != nil {
|
||||||
|
p.mu.Lock()
|
||||||
|
closed := p.workerID != workerID
|
||||||
|
p.mu.Unlock()
|
||||||
|
|
||||||
|
if closed {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Warn().Err(err).Str("url", p.url).Caller().Send()
|
||||||
|
}
|
||||||
|
|
||||||
|
p.reconnect(workerID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Producer) reconnect(workerID int) {
|
||||||
p.mu.Lock()
|
p.mu.Lock()
|
||||||
defer p.mu.Unlock()
|
defer p.mu.Unlock()
|
||||||
|
|
||||||
if p.state != stateStart {
|
if p.workerID != workerID {
|
||||||
log.Trace().Msgf("[streams] stop reconnect url=%s", p.url)
|
log.Trace().Msgf("[streams] stop reconnect url=%s", p.url)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -126,9 +138,11 @@ func (p *Producer) reconnect() {
|
|||||||
|
|
||||||
p.element, p.lastErr = GetProducer(p.url)
|
p.element, p.lastErr = GetProducer(p.url)
|
||||||
if p.lastErr != nil || p.element == nil {
|
if p.lastErr != nil || p.element == nil {
|
||||||
log.Debug().Err(p.lastErr).Caller().Send()
|
log.Debug().Msgf("[streams] producer=%s", p.lastErr)
|
||||||
// TODO: dynamic timeout
|
// TODO: dynamic timeout
|
||||||
p.restart = time.AfterFunc(30*time.Second, p.reconnect)
|
time.AfterFunc(30*time.Second, func() {
|
||||||
|
p.reconnect(workerID)
|
||||||
|
})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -152,12 +166,7 @@ func (p *Producer) reconnect() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
go p.worker(p.element, workerID)
|
||||||
if err := p.element.Start(); err != nil {
|
|
||||||
log.Debug().Err(err).Caller().Send()
|
|
||||||
}
|
|
||||||
p.reconnect()
|
|
||||||
}()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Producer) stop() {
|
func (p *Producer) stop() {
|
||||||
@@ -171,6 +180,8 @@ func (p *Producer) stop() {
|
|||||||
case stateNone:
|
case stateNone:
|
||||||
log.Debug().Msgf("[streams] can't stop none producer")
|
log.Debug().Msgf("[streams] can't stop none producer")
|
||||||
return
|
return
|
||||||
|
case stateStart:
|
||||||
|
p.workerID++
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debug().Msgf("[streams] stop producer url=%s", p.url)
|
log.Debug().Msgf("[streams] stop producer url=%s", p.url)
|
||||||
@@ -179,10 +190,6 @@ func (p *Producer) stop() {
|
|||||||
_ = p.element.Stop()
|
_ = p.element.Stop()
|
||||||
p.element = nil
|
p.element = nil
|
||||||
}
|
}
|
||||||
if p.restart != nil {
|
|
||||||
p.restart.Stop()
|
|
||||||
p.restart = nil
|
|
||||||
}
|
|
||||||
|
|
||||||
p.state = stateNone
|
p.state = stateNone
|
||||||
p.tracks = nil
|
p.tracks = nil
|
||||||
|
|||||||
+18
-10
@@ -7,6 +7,7 @@ import (
|
|||||||
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Consumer struct {
|
type Consumer struct {
|
||||||
@@ -18,7 +19,7 @@ type Stream struct {
|
|||||||
producers []*Producer
|
producers []*Producer
|
||||||
consumers []*Consumer
|
consumers []*Consumer
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
wg sync.WaitGroup
|
requests int32
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewStream(source interface{}) *Stream {
|
func NewStream(source interface{}) *Stream {
|
||||||
@@ -53,6 +54,9 @@ func (s *Stream) SetSource(source string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) {
|
func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) {
|
||||||
|
// support for multiple simultaneous requests from different consumers
|
||||||
|
atomic.AddInt32(&s.requests, 1)
|
||||||
|
|
||||||
ic := len(s.consumers)
|
ic := len(s.consumers)
|
||||||
|
|
||||||
consumer := &Consumer{element: cons}
|
consumer := &Consumer{element: cons}
|
||||||
@@ -60,9 +64,6 @@ func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) {
|
|||||||
|
|
||||||
var codecs string
|
var codecs string
|
||||||
|
|
||||||
// support for multiple simultaneous requests from different consumers
|
|
||||||
s.wg.Add(1)
|
|
||||||
|
|
||||||
// Step 1. Get consumer medias
|
// Step 1. Get consumer medias
|
||||||
for icc, consMedia := range cons.GetMedias() {
|
for icc, consMedia := range cons.GetMedias() {
|
||||||
log.Trace().Stringer("media", consMedia).
|
log.Trace().Stringer("media", consMedia).
|
||||||
@@ -86,7 +87,7 @@ func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) {
|
|||||||
// Step 4. Get producer track
|
// Step 4. Get producer track
|
||||||
prodTrack := prod.GetTrack(prodMedia, prodCodec)
|
prodTrack := prod.GetTrack(prodMedia, prodCodec)
|
||||||
if prodTrack == nil {
|
if prodTrack == nil {
|
||||||
log.Warn().Str("url", prod.url).Msg("[stream] can't get track")
|
log.Warn().Str("url", prod.url).Msg("[streams] can't get track")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -101,12 +102,11 @@ func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
s.wg.Done()
|
if atomic.AddInt32(&s.requests, -1) == 0 {
|
||||||
s.wg.Wait()
|
s.stopProducers()
|
||||||
|
}
|
||||||
|
|
||||||
if len(producers) == 0 {
|
if len(producers) == 0 {
|
||||||
s.stopProducers()
|
|
||||||
|
|
||||||
if len(codecs) > 0 {
|
if len(codecs) > 0 {
|
||||||
return errors.New("codecs not match: " + codecs)
|
return errors.New("codecs not match: " + codecs)
|
||||||
}
|
}
|
||||||
@@ -197,8 +197,12 @@ producers:
|
|||||||
//}
|
//}
|
||||||
|
|
||||||
func (s *Stream) MarshalJSON() ([]byte, error) {
|
func (s *Stream) MarshalJSON() ([]byte, error) {
|
||||||
|
if !s.mu.TryLock() {
|
||||||
|
log.Warn().Msgf("[streams] json locked")
|
||||||
|
return []byte(`null`), nil
|
||||||
|
}
|
||||||
|
|
||||||
var v []interface{}
|
var v []interface{}
|
||||||
s.mu.Lock()
|
|
||||||
for _, prod := range s.producers {
|
for _, prod := range s.producers {
|
||||||
if prod.element != nil {
|
if prod.element != nil {
|
||||||
v = append(v, prod.element)
|
v = append(v, prod.element)
|
||||||
@@ -242,6 +246,10 @@ func (s *Stream) removeProducer(i int) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func collectCodecs(media *streamer.Media, codecs *string) {
|
func collectCodecs(media *streamer.Media, codecs *string) {
|
||||||
|
if media.Direction == streamer.DirectionRecvonly {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
for _, codec := range media.Codecs {
|
for _, codec := range media.Codecs {
|
||||||
name := codec.Name
|
name := codec.Name
|
||||||
if name == streamer.CodecAAC {
|
if name == streamer.CodecAAC {
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
var candidates []string
|
var candidates []string
|
||||||
|
var networks = []string{"udp", "tcp"}
|
||||||
|
|
||||||
func AddCandidate(address string) {
|
func AddCandidate(address string) {
|
||||||
candidates = append(candidates, address)
|
candidates = append(candidates, address)
|
||||||
@@ -20,7 +21,8 @@ func asyncCandidates(tr *api.Transport) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
cand, err := webrtc.NewCandidate(address)
|
for _, network := range networks {
|
||||||
|
cand, err := webrtc.NewCandidate(network, address)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn().Err(err).Caller().Send()
|
log.Warn().Err(err).Caller().Send()
|
||||||
continue
|
continue
|
||||||
@@ -30,6 +32,7 @@ func asyncCandidates(tr *api.Transport) {
|
|||||||
|
|
||||||
tr.Write(&api.Message{Type: "webrtc/candidate", Value: cand})
|
tr.Write(&api.Message{Type: "webrtc/candidate", Value: cand})
|
||||||
}
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func syncCanditates(answer string) (string, error) {
|
func syncCanditates(answer string) (string, error) {
|
||||||
@@ -57,7 +60,8 @@ func syncCanditates(answer string) (string, error) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
cand, err := webrtc.NewCandidate(address)
|
for _, network := range networks {
|
||||||
|
cand, err := webrtc.NewCandidate(network, address)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn().Err(err).Msg("[webrtc] candidate")
|
log.Warn().Err(err).Msg("[webrtc] candidate")
|
||||||
continue
|
continue
|
||||||
@@ -65,6 +69,7 @@ func syncCanditates(answer string) (string, error) {
|
|||||||
|
|
||||||
md.WithPropertyAttribute(cand)
|
md.WithPropertyAttribute(cand)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if end {
|
if end {
|
||||||
md.WithPropertyAttribute("end-of-candidates")
|
md.WithPropertyAttribute("end-of-candidates")
|
||||||
|
|||||||
@@ -22,6 +22,7 @@ func Init() {
|
|||||||
} `yaml:"webrtc"`
|
} `yaml:"webrtc"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cfg.Mod.Listen = ":8555"
|
||||||
cfg.Mod.IceServers = []pion.ICEServer{
|
cfg.Mod.IceServers = []pion.ICEServer{
|
||||||
{URLs: []string{"stun:stun.l.google.com:19302"}},
|
{URLs: []string{"stun:stun.l.google.com:19302"}},
|
||||||
}
|
}
|
||||||
@@ -112,7 +113,7 @@ func asyncHandler(tr *api.Transport, msg *api.Message) error {
|
|||||||
|
|
||||||
// 2. AddConsumer, so we get new tracks
|
// 2. AddConsumer, so we get new tracks
|
||||||
if err = stream.AddConsumer(conn); err != nil {
|
if err = stream.AddConsumer(conn); err != nil {
|
||||||
log.Warn().Err(err).Caller().Send()
|
log.Debug().Err(err).Msg("[webrtc] add consumer")
|
||||||
_ = conn.Conn.Close()
|
_ = conn.Conn.Close()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
+43
-10
@@ -14,9 +14,18 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type State byte
|
||||||
|
|
||||||
|
const (
|
||||||
|
StateNone State = iota
|
||||||
|
StateConn
|
||||||
|
StateHandle
|
||||||
|
)
|
||||||
|
|
||||||
type Client struct {
|
type Client struct {
|
||||||
streamer.Element
|
streamer.Element
|
||||||
|
|
||||||
@@ -26,12 +35,12 @@ type Client struct {
|
|||||||
medias []*streamer.Media
|
medias []*streamer.Media
|
||||||
tracks map[byte]*streamer.Track
|
tracks map[byte]*streamer.Track
|
||||||
|
|
||||||
closed bool
|
|
||||||
|
|
||||||
msg *message
|
msg *message
|
||||||
t0 time.Time
|
t0 time.Time
|
||||||
|
|
||||||
buffer chan []byte
|
buffer chan []byte
|
||||||
|
state State
|
||||||
|
mu sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewClient(id string) *Client {
|
func NewClient(id string) *Client {
|
||||||
@@ -69,16 +78,26 @@ func (c *Client) Dial() (err error) {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
c.state = StateConn
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) Handle() error {
|
func (c *Client) Handle() error {
|
||||||
c.buffer = make(chan []byte, 5)
|
|
||||||
// add delay to the stream for smooth playing (not a best solution)
|
// add delay to the stream for smooth playing (not a best solution)
|
||||||
c.t0 = time.Now().Add(time.Second)
|
c.t0 = time.Now().Add(time.Second)
|
||||||
|
|
||||||
|
c.mu.Lock()
|
||||||
|
|
||||||
|
if c.state == StateConn {
|
||||||
|
c.buffer = make(chan []byte, 5)
|
||||||
|
c.state = StateHandle
|
||||||
|
|
||||||
// processing stream in separate thread for lower delay between packets
|
// processing stream in separate thread for lower delay between packets
|
||||||
go c.worker()
|
go c.worker(c.buffer)
|
||||||
|
}
|
||||||
|
|
||||||
|
c.mu.Unlock()
|
||||||
|
|
||||||
_, data, err := c.conn.ReadMessage()
|
_, data, err := c.conn.ReadMessage()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -87,8 +106,12 @@ func (c *Client) Handle() error {
|
|||||||
|
|
||||||
track := c.tracks[c.msg.Track]
|
track := c.tracks[c.msg.Track]
|
||||||
if track != nil {
|
if track != nil {
|
||||||
|
c.mu.Lock()
|
||||||
|
if c.state == StateHandle {
|
||||||
c.buffer <- data
|
c.buffer <- data
|
||||||
}
|
}
|
||||||
|
c.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
// we have one unprocessed msg after getTracks
|
// we have one unprocessed msg after getTracks
|
||||||
for {
|
for {
|
||||||
@@ -114,8 +137,12 @@ func (c *Client) Handle() error {
|
|||||||
|
|
||||||
track = c.tracks[msg.Track]
|
track = c.tracks[msg.Track]
|
||||||
if track != nil {
|
if track != nil {
|
||||||
|
c.mu.Lock()
|
||||||
|
if c.state == StateHandle {
|
||||||
c.buffer <- data
|
c.buffer <- data
|
||||||
}
|
}
|
||||||
|
c.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("wrong message type: %s", data)
|
return fmt.Errorf("wrong message type: %s", data)
|
||||||
@@ -124,13 +151,19 @@ func (c *Client) Handle() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) Close() error {
|
func (c *Client) Close() error {
|
||||||
if c.conn == nil {
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
|
switch c.state {
|
||||||
|
case StateNone:
|
||||||
return nil
|
return nil
|
||||||
}
|
case StateConn:
|
||||||
if c.buffer != nil {
|
case StateHandle:
|
||||||
close(c.buffer)
|
close(c.buffer)
|
||||||
}
|
}
|
||||||
c.closed = true
|
|
||||||
|
c.state = StateNone
|
||||||
|
|
||||||
return c.conn.Close()
|
return c.conn.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -210,13 +243,13 @@ func (c *Client) getTracks() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) worker() {
|
func (c *Client) worker(buffer chan []byte) {
|
||||||
var track *streamer.Track
|
var track *streamer.Track
|
||||||
for _, track = range c.tracks {
|
for _, track = range c.tracks {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
for data := range c.buffer {
|
for data := range buffer {
|
||||||
moof := &fmp4io.MovieFrag{}
|
moof := &fmp4io.MovieFrag{}
|
||||||
if _, err := moof.Unmarshal(data, 0); err != nil {
|
if _, err := moof.Unmarshal(data, 0); err != nil {
|
||||||
continue
|
continue
|
||||||
|
|||||||
@@ -20,7 +20,7 @@ func (c *Client) GetTrack(media *streamer.Media, codec *streamer.Codec) *streame
|
|||||||
|
|
||||||
func (c *Client) Start() error {
|
func (c *Client) Start() error {
|
||||||
err := c.Handle()
|
err := c.Handle()
|
||||||
if c.closed {
|
if c.buffer == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
|
|||||||
@@ -64,6 +64,8 @@ func (c *Client) Start() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) Stop() error {
|
func (c *Client) Stop() error {
|
||||||
|
// important for close reader/writer gorutines
|
||||||
|
_ = c.res.Body.Close()
|
||||||
c.closed = true
|
c.closed = true
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -89,7 +89,7 @@ func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *strea
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if !codec.IsRAW() {
|
if codec.IsRTP() {
|
||||||
wrapper := h264.RTPDepay(track)
|
wrapper := h264.RTPDepay(track)
|
||||||
push = wrapper(push)
|
push = wrapper(push)
|
||||||
}
|
}
|
||||||
|
|||||||
+24
-2
@@ -48,6 +48,22 @@ const (
|
|||||||
|
|
||||||
type State byte
|
type State byte
|
||||||
|
|
||||||
|
func (s State) String() string {
|
||||||
|
switch s {
|
||||||
|
case StateNone:
|
||||||
|
return "NONE"
|
||||||
|
case StateConn:
|
||||||
|
return "CONN"
|
||||||
|
case StateSetup:
|
||||||
|
return "SETUP"
|
||||||
|
case StatePlay:
|
||||||
|
return "PLAY"
|
||||||
|
case StateHandle:
|
||||||
|
return "HANDLE"
|
||||||
|
}
|
||||||
|
return strconv.Itoa(int(s))
|
||||||
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
StateNone State = iota
|
StateNone State = iota
|
||||||
StateConn
|
StateConn
|
||||||
@@ -346,6 +362,10 @@ func (c *Conn) SetupMedia(
|
|||||||
c.stateMu.Lock()
|
c.stateMu.Lock()
|
||||||
defer c.stateMu.Unlock()
|
defer c.stateMu.Unlock()
|
||||||
|
|
||||||
|
if c.state != StateConn && c.state != StateSetup {
|
||||||
|
return nil, fmt.Errorf("RTSP SETUP from wrong state: %s", c.state)
|
||||||
|
}
|
||||||
|
|
||||||
ch := c.GetChannel(media)
|
ch := c.GetChannel(media)
|
||||||
if ch < 0 {
|
if ch < 0 {
|
||||||
return nil, fmt.Errorf("wrong media: %v", media)
|
return nil, fmt.Errorf("wrong media: %v", media)
|
||||||
@@ -648,15 +668,17 @@ func (c *Conn) Handle() (err error) {
|
|||||||
case StatePlay:
|
case StatePlay:
|
||||||
c.state = StateHandle
|
c.state = StateHandle
|
||||||
default:
|
default:
|
||||||
err = fmt.Errorf("RTSP Handle from wrong state: %d", c.state)
|
err = fmt.Errorf("RTSP HANDLE from wrong state: %s", c.state)
|
||||||
|
|
||||||
c.state = StateNone
|
c.state = StateNone
|
||||||
_ = c.conn.Close()
|
_ = c.conn.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ok := c.state == StateHandle
|
||||||
|
|
||||||
c.stateMu.Unlock()
|
c.stateMu.Unlock()
|
||||||
|
|
||||||
if c.state != StateHandle {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -119,7 +119,7 @@ func (c *Conn) MarshalJSON() ([]byte, error) {
|
|||||||
v[k] = media.String()
|
v[k] = media.String()
|
||||||
}
|
}
|
||||||
for i, track := range c.tracks {
|
for i, track := range c.tracks {
|
||||||
k := "track:" + strconv.Itoa(int(i>>1))
|
k := "track:" + strconv.Itoa(i)
|
||||||
v[k] = track.String()
|
v[k] = track.String()
|
||||||
}
|
}
|
||||||
//for i, track := range c.tracks {
|
//for i, track := range c.tracks {
|
||||||
|
|||||||
+6
-2
@@ -35,13 +35,17 @@ func NewAPI(address string) (*webrtc.API, error) {
|
|||||||
s.SetICEMulticastDNSMode(ice.MulticastDNSModeDisabled)
|
s.SetICEMulticastDNSMode(ice.MulticastDNSModeDisabled)
|
||||||
|
|
||||||
if address != "" {
|
if address != "" {
|
||||||
ln, err := net.Listen("tcp", address)
|
|
||||||
if err == nil {
|
|
||||||
s.SetNetworkTypes([]webrtc.NetworkType{
|
s.SetNetworkTypes([]webrtc.NetworkType{
|
||||||
webrtc.NetworkTypeUDP4, webrtc.NetworkTypeUDP6,
|
webrtc.NetworkTypeUDP4, webrtc.NetworkTypeUDP6,
|
||||||
webrtc.NetworkTypeTCP4, webrtc.NetworkTypeTCP6,
|
webrtc.NetworkTypeTCP4, webrtc.NetworkTypeTCP6,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
if ln, err := net.ListenPacket("udp", address); err == nil {
|
||||||
|
udpMux := webrtc.NewICEUDPMux(nil, ln)
|
||||||
|
s.SetICEUDPMux(udpMux)
|
||||||
|
}
|
||||||
|
|
||||||
|
if ln, err := net.Listen("tcp", address); err == nil {
|
||||||
tcpMux := webrtc.NewICETCPMux(nil, ln, 8)
|
tcpMux := webrtc.NewICETCPMux(nil, ln, 8)
|
||||||
s.SetICETCPMux(tcpMux)
|
s.SetICETCPMux(tcpMux)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -13,7 +13,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func NewCandidate(address string) (string, error) {
|
func NewCandidate(network, address string) (string, error) {
|
||||||
i := strings.LastIndexByte(address, ':')
|
i := strings.LastIndexByte(address, ':')
|
||||||
if i < 0 {
|
if i < 0 {
|
||||||
return "", errors.New("wrong candidate: " + address)
|
return "", errors.New("wrong candidate: " + address)
|
||||||
@@ -26,7 +26,7 @@ func NewCandidate(address string) (string, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
cand, err := ice.NewCandidateHost(&ice.CandidateHostConfig{
|
cand, err := ice.NewCandidateHost(&ice.CandidateHostConfig{
|
||||||
Network: "tcp",
|
Network: network,
|
||||||
Address: host,
|
Address: host,
|
||||||
Port: i,
|
Port: i,
|
||||||
Component: ice.ComponentRTP,
|
Component: ice.ComponentRTP,
|
||||||
|
|||||||
Reference in New Issue
Block a user