Add instant start for WebRTC consumer
This commit is contained in:
+35
-13
@@ -113,7 +113,12 @@ func NewSender(media *Media, codec *Codec) *Sender {
|
||||
type HandlerFunc func(packet *rtp.Packet)
|
||||
|
||||
func (s *Sender) HandleRTP(track *Receiver) {
|
||||
bufferSize := 100
|
||||
s.Bind(track)
|
||||
go s.worker(track)
|
||||
}
|
||||
|
||||
func (s *Sender) Bind(track *Receiver) {
|
||||
var bufferSize uint16
|
||||
|
||||
if GetKind(track.Codec.Name) == KindVideo {
|
||||
if track.Codec.IsRTP() {
|
||||
@@ -123,6 +128,8 @@ func (s *Sender) HandleRTP(track *Receiver) {
|
||||
} else {
|
||||
bufferSize = 50
|
||||
}
|
||||
} else {
|
||||
bufferSize = 100
|
||||
}
|
||||
|
||||
buffer := make(chan *rtp.Packet, bufferSize)
|
||||
@@ -133,28 +140,43 @@ func (s *Sender) HandleRTP(track *Receiver) {
|
||||
}
|
||||
track.senders[s] = buffer
|
||||
track.mu.Unlock()
|
||||
|
||||
s.mu.Lock()
|
||||
s.receivers = append(s.receivers, track)
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
go func() {
|
||||
// read packets from buffer channel until it will be closed
|
||||
func (s *Sender) worker(track *Receiver) {
|
||||
track.mu.Lock()
|
||||
buffer := track.senders[s]
|
||||
track.mu.Unlock()
|
||||
|
||||
// read packets from buffer channel until it will be closed
|
||||
if buffer != nil {
|
||||
for packet := range buffer {
|
||||
s.bytes += len(packet.Payload)
|
||||
s.Handler(packet)
|
||||
}
|
||||
}
|
||||
|
||||
// remove current receiver from list
|
||||
// it can only happen when receiver close buffer channel
|
||||
s.mu.Lock()
|
||||
for i, receiver := range s.receivers {
|
||||
if receiver == track {
|
||||
s.receivers = append(s.receivers[:i], s.receivers[i+1:]...)
|
||||
break
|
||||
}
|
||||
// remove current receiver from list
|
||||
// it can only happen when receiver close buffer channel
|
||||
s.mu.Lock()
|
||||
for i, receiver := range s.receivers {
|
||||
if receiver == track {
|
||||
s.receivers = append(s.receivers[:i], s.receivers[i+1:]...)
|
||||
break
|
||||
}
|
||||
s.mu.Unlock()
|
||||
}()
|
||||
}
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func (s *Sender) Start() {
|
||||
s.mu.Lock()
|
||||
for _, track := range s.receivers {
|
||||
go s.worker(track)
|
||||
}
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func (s *Sender) Close() {
|
||||
|
||||
@@ -120,6 +120,10 @@ func NewConn(pc *webrtc.PeerConnection) *Conn {
|
||||
c.Fire(state)
|
||||
|
||||
switch state {
|
||||
case webrtc.PeerConnectionStateConnected:
|
||||
for _, sender := range c.senders {
|
||||
sender.Start()
|
||||
}
|
||||
case webrtc.PeerConnectionStateDisconnected, webrtc.PeerConnectionStateFailed, webrtc.PeerConnectionStateClosed:
|
||||
// disconnect event comes earlier, than failed
|
||||
// but it comes only for success connections
|
||||
|
||||
@@ -20,7 +20,7 @@ func (c *Conn) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiv
|
||||
|
||||
for _, sender := range c.senders {
|
||||
if sender.Codec == codec {
|
||||
sender.HandleRTP(track)
|
||||
sender.Bind(track)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
@@ -77,7 +77,7 @@ func (c *Conn) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiv
|
||||
sender.Handler = pcm.RepackG711(false, sender.Handler)
|
||||
}
|
||||
|
||||
sender.HandleRTP(track)
|
||||
sender.Bind(track)
|
||||
|
||||
c.senders = append(c.senders, sender)
|
||||
return nil
|
||||
|
||||
Reference in New Issue
Block a user