diff --git a/cmd/ffmpeg/ffmpeg.go b/cmd/ffmpeg/ffmpeg.go index cbd795ec..1e61122f 100644 --- a/cmd/ffmpeg/ffmpeg.go +++ b/cmd/ffmpeg/ffmpeg.go @@ -51,7 +51,7 @@ var defaults = map[string]string{ "rtsp/udp": "-fflags nobuffer -flags low_delay -timeout 5000000 -user_agent go2rtc/ffmpeg -i {input}", // output - "output": "-user_agent ffmpeg/go2rtc -rtsp_transport tcp -f rtsp {output}", + "output": "-user_agent ffmpeg/go2rtc -rtsp_transport tcp -bufsize 8192k -f rtsp {output}", // `-preset superfast` - we can't use ultrafast because it doesn't support `-profile main -level 4.1` // `-tune zerolatency` - for minimal latency diff --git a/cmd/mp4/mp4.go b/cmd/mp4/mp4.go index 06f4fe8d..6f3fa1f7 100644 --- a/cmd/mp4/mp4.go +++ b/cmd/mp4/mp4.go @@ -46,12 +46,15 @@ func handlerKeyframe(w http.ResponseWriter, r *http.Request) { return } - exit := make(chan []byte) + exit := make(chan []byte, 1) cons := &mp4.Segment{OnlyKeyframe: true} cons.Listen(func(msg any) { if data, ok := msg.([]byte); ok && exit != nil { - exit <- data + select { + case exit <- data: + default: + } exit = nil } }) @@ -105,7 +108,7 @@ func handlerMP4(w http.ResponseWriter, r *http.Request) { return } - exit := make(chan error) + exit := make(chan error, 1) // Add buffer to prevent blocking cons := &mp4.Consumer{ RemoteAddr: tcp.RemoteAddr(r), @@ -119,7 +122,10 @@ func handlerMP4(w http.ResponseWriter, r *http.Request) { mu.Lock() defer mu.Unlock() if _, err := w.Write(data); err != nil && exit != nil { - exit <- err + select { + case exit <- err: + default: + } exit = nil } } diff --git a/cmd/rtsp/rtsp.go b/cmd/rtsp/rtsp.go index f5d54440..31ab985b 100644 --- a/cmd/rtsp/rtsp.go +++ b/cmd/rtsp/rtsp.go @@ -1,6 +1,7 @@ package rtsp import ( + "io" "net" "net/url" "strings" @@ -213,7 +214,9 @@ func tcpHandler(conn *rtsp.Conn) { }) if err := conn.Accept(); err != nil { - log.Warn().Err(err).Caller().Send() + if err != io.EOF { + log.Warn().Err(err).Caller().Send() + } if closer != nil { closer() } diff --git a/pkg/core/track.go b/pkg/core/track.go index 82c8cc8a..ade3ca8d 100644 --- a/pkg/core/track.go +++ b/pkg/core/track.go @@ -18,7 +18,7 @@ type Receiver struct { ID byte // Channel for RTSP, PayloadType for MPEG-TS senders map[*Sender]chan *rtp.Packet - mu sync.Mutex + mu sync.RWMutex bytes int } @@ -32,9 +32,9 @@ func (t *Receiver) WriteRTP(packet *rtp.Packet) { t.mu.Lock() t.bytes += len(packet.Payload) for sender, buffer := range t.senders { - if len(buffer) < cap(buffer) { - buffer <- packet - } else { + select { + case buffer <- packet: + default: sender.overflow++ } } @@ -42,11 +42,11 @@ func (t *Receiver) WriteRTP(packet *rtp.Packet) { } func (t *Receiver) Senders() (senders []*Sender) { - t.mu.Lock() + t.mu.RLock() for sender := range t.senders { senders = append(senders, sender) } - t.mu.Unlock() + t.mu.RUnlock() return } @@ -73,12 +73,9 @@ func (t *Receiver) Replace(target *Receiver) { func (t *Receiver) String() string { s := t.Codec.String() + ", bytes=" + strconv.Itoa(t.bytes) - if t.mu.TryLock() { - s += fmt.Sprintf(", senders=%d", len(t.senders)) - t.mu.Unlock() - } else { - s += fmt.Sprintf(", senders=?") - } + t.mu.RLock() + s += fmt.Sprintf(", senders=%d", len(t.senders)) + t.mu.RUnlock() return s } @@ -93,7 +90,7 @@ type Sender struct { Handler HandlerFunc receivers []*Receiver - mu sync.Mutex + mu sync.RWMutex bytes int overflow int @@ -127,7 +124,6 @@ func (s *Sender) HandleRTP(track *Receiver) { } track.senders[s] = buffer track.mu.Unlock() - s.mu.Lock() s.receivers = append(s.receivers, track) s.mu.Unlock() @@ -135,7 +131,9 @@ func (s *Sender) HandleRTP(track *Receiver) { go func() { // read packets from buffer channel until it will be closed for packet := range buffer { + s.mu.Lock() s.bytes += len(packet.Payload) + s.mu.Unlock() s.Handler(packet) } @@ -171,12 +169,9 @@ func (s *Sender) Close() { func (s *Sender) String() string { info := s.Codec.String() + ", bytes=" + strconv.Itoa(s.bytes) - if s.mu.TryLock() { - info += ", receivers=" + strconv.Itoa(len(s.receivers)) - s.mu.Unlock() - } else { - info += ", receivers=?" - } + s.mu.RLock() + info += ", receivers=" + strconv.Itoa(len(s.receivers)) + s.mu.RUnlock() if s.overflow > 0 { info += ", overflow=" + strconv.Itoa(s.overflow) }