Refactor code to include buffer channels to prevent blocking in handler functions and add RWMutex to Receiver and Sender structs for concurrency-safe data access

This commit is contained in:
Sergey Krashevich
2023-03-28 05:56:35 +03:00
parent 8fde2b6fe5
commit 222dc6a5c2
2 changed files with 17 additions and 9 deletions
+10 -4
View File
@@ -44,12 +44,15 @@ func handlerKeyframe(w http.ResponseWriter, r *http.Request) {
return return
} }
exit := make(chan []byte) exit := make(chan []byte, 1)
cons := &mp4.Segment{OnlyKeyframe: true} cons := &mp4.Segment{OnlyKeyframe: true}
cons.Listen(func(msg any) { cons.Listen(func(msg any) {
if data, ok := msg.([]byte); ok && exit != nil { if data, ok := msg.([]byte); ok && exit != nil {
exit <- data select {
case exit <- data:
default:
}
exit = nil exit = nil
} }
}) })
@@ -103,7 +106,7 @@ func handlerMP4(w http.ResponseWriter, r *http.Request) {
return return
} }
exit := make(chan error) exit := make(chan error, 1) // Add buffer to prevent blocking
cons := &mp4.Consumer{ cons := &mp4.Consumer{
RemoteAddr: tcp.RemoteAddr(r), RemoteAddr: tcp.RemoteAddr(r),
@@ -114,7 +117,10 @@ func handlerMP4(w http.ResponseWriter, r *http.Request) {
cons.Listen(func(msg any) { cons.Listen(func(msg any) {
if data, ok := msg.([]byte); ok { if data, ok := msg.([]byte); ok {
if _, err := w.Write(data); err != nil && exit != nil { if _, err := w.Write(data); err != nil && exit != nil {
exit <- err select {
case exit <- err:
default:
}
exit = nil exit = nil
} }
} }
+7 -5
View File
@@ -18,7 +18,7 @@ type Receiver struct {
ID byte // Channel for RTSP, PayloadType for MPEG-TS ID byte // Channel for RTSP, PayloadType for MPEG-TS
senders map[*Sender]chan *rtp.Packet senders map[*Sender]chan *rtp.Packet
mu sync.Mutex mu sync.RWMutex
bytes int bytes int
} }
@@ -32,9 +32,9 @@ func (t *Receiver) WriteRTP(packet *rtp.Packet) {
t.mu.Lock() t.mu.Lock()
t.bytes += len(packet.Payload) t.bytes += len(packet.Payload)
for sender, buffer := range t.senders { for sender, buffer := range t.senders {
if len(buffer) < cap(buffer) { select {
buffer <- packet case buffer <- packet:
} else { default:
sender.overflow++ sender.overflow++
} }
} }
@@ -93,7 +93,7 @@ type Sender struct {
Handler HandlerFunc Handler HandlerFunc
receivers []*Receiver receivers []*Receiver
mu sync.Mutex mu sync.RWMutex
bytes int bytes int
overflow int overflow int
@@ -135,7 +135,9 @@ func (s *Sender) HandleRTP(track *Receiver) {
go func() { go func() {
// read packets from buffer channel until it will be closed // read packets from buffer channel until it will be closed
for packet := range buffer { for packet := range buffer {
s.mu.Lock()
s.bytes += len(packet.Payload) s.bytes += len(packet.Payload)
s.mu.Unlock()
s.Handler(packet) s.Handler(packet)
} }