diff --git a/cmd/mp4/mp4.go b/cmd/mp4/mp4.go index a8a6b292..e569f869 100644 --- a/cmd/mp4/mp4.go +++ b/cmd/mp4/mp4.go @@ -44,12 +44,15 @@ func handlerKeyframe(w http.ResponseWriter, r *http.Request) { return } - exit := make(chan []byte) + exit := make(chan []byte, 1) cons := &mp4.Segment{OnlyKeyframe: true} cons.Listen(func(msg any) { if data, ok := msg.([]byte); ok && exit != nil { - exit <- data + select { + case exit <- data: + default: + } exit = nil } }) @@ -103,7 +106,7 @@ func handlerMP4(w http.ResponseWriter, r *http.Request) { return } - exit := make(chan error) + exit := make(chan error, 1) // Add buffer to prevent blocking cons := &mp4.Consumer{ RemoteAddr: tcp.RemoteAddr(r), @@ -114,7 +117,10 @@ func handlerMP4(w http.ResponseWriter, r *http.Request) { cons.Listen(func(msg any) { if data, ok := msg.([]byte); ok { if _, err := w.Write(data); err != nil && exit != nil { - exit <- err + select { + case exit <- err: + default: + } exit = nil } } diff --git a/pkg/core/track.go b/pkg/core/track.go index 82c8cc8a..891b2527 100644 --- a/pkg/core/track.go +++ b/pkg/core/track.go @@ -18,7 +18,7 @@ type Receiver struct { ID byte // Channel for RTSP, PayloadType for MPEG-TS senders map[*Sender]chan *rtp.Packet - mu sync.Mutex + mu sync.RWMutex bytes int } @@ -32,9 +32,9 @@ func (t *Receiver) WriteRTP(packet *rtp.Packet) { t.mu.Lock() t.bytes += len(packet.Payload) for sender, buffer := range t.senders { - if len(buffer) < cap(buffer) { - buffer <- packet - } else { + select { + case buffer <- packet: + default: sender.overflow++ } } @@ -93,7 +93,7 @@ type Sender struct { Handler HandlerFunc receivers []*Receiver - mu sync.Mutex + mu sync.RWMutex bytes int overflow int @@ -135,7 +135,9 @@ func (s *Sender) HandleRTP(track *Receiver) { go func() { // read packets from buffer channel until it will be closed for packet := range buffer { + s.mu.Lock() s.bytes += len(packet.Payload) + s.mu.Unlock() s.Handler(packet) }