Merge remote-tracking branch 'skrashevich/patch-230328'

This commit is contained in:
Alexey Khit
2023-04-14 06:23:17 +03:00
4 changed files with 30 additions and 26 deletions
+1 -1
View File
@@ -51,7 +51,7 @@ var defaults = map[string]string{
"rtsp/udp": "-fflags nobuffer -flags low_delay -timeout 5000000 -user_agent go2rtc/ffmpeg -i {input}", "rtsp/udp": "-fflags nobuffer -flags low_delay -timeout 5000000 -user_agent go2rtc/ffmpeg -i {input}",
// output // output
"output": "-user_agent ffmpeg/go2rtc -rtsp_transport tcp -f rtsp {output}", "output": "-user_agent ffmpeg/go2rtc -rtsp_transport tcp -bufsize 8192k -f rtsp {output}",
// `-preset superfast` - we can't use ultrafast because it doesn't support `-profile main -level 4.1` // `-preset superfast` - we can't use ultrafast because it doesn't support `-profile main -level 4.1`
// `-tune zerolatency` - for minimal latency // `-tune zerolatency` - for minimal latency
+10 -4
View File
@@ -46,12 +46,15 @@ func handlerKeyframe(w http.ResponseWriter, r *http.Request) {
return return
} }
exit := make(chan []byte) exit := make(chan []byte, 1)
cons := &mp4.Segment{OnlyKeyframe: true} cons := &mp4.Segment{OnlyKeyframe: true}
cons.Listen(func(msg any) { cons.Listen(func(msg any) {
if data, ok := msg.([]byte); ok && exit != nil { if data, ok := msg.([]byte); ok && exit != nil {
exit <- data select {
case exit <- data:
default:
}
exit = nil exit = nil
} }
}) })
@@ -105,7 +108,7 @@ func handlerMP4(w http.ResponseWriter, r *http.Request) {
return return
} }
exit := make(chan error) exit := make(chan error, 1) // Add buffer to prevent blocking
cons := &mp4.Consumer{ cons := &mp4.Consumer{
RemoteAddr: tcp.RemoteAddr(r), RemoteAddr: tcp.RemoteAddr(r),
@@ -119,7 +122,10 @@ func handlerMP4(w http.ResponseWriter, r *http.Request) {
mu.Lock() mu.Lock()
defer mu.Unlock() defer mu.Unlock()
if _, err := w.Write(data); err != nil && exit != nil { if _, err := w.Write(data); err != nil && exit != nil {
exit <- err select {
case exit <- err:
default:
}
exit = nil exit = nil
} }
} }
+4 -1
View File
@@ -1,6 +1,7 @@
package rtsp package rtsp
import ( import (
"io"
"net" "net"
"net/url" "net/url"
"strings" "strings"
@@ -213,7 +214,9 @@ func tcpHandler(conn *rtsp.Conn) {
}) })
if err := conn.Accept(); err != nil { if err := conn.Accept(); err != nil {
log.Warn().Err(err).Caller().Send() if err != io.EOF {
log.Warn().Err(err).Caller().Send()
}
if closer != nil { if closer != nil {
closer() closer()
} }
+15 -20
View File
@@ -18,7 +18,7 @@ type Receiver struct {
ID byte // Channel for RTSP, PayloadType for MPEG-TS ID byte // Channel for RTSP, PayloadType for MPEG-TS
senders map[*Sender]chan *rtp.Packet senders map[*Sender]chan *rtp.Packet
mu sync.Mutex mu sync.RWMutex
bytes int bytes int
} }
@@ -32,9 +32,9 @@ func (t *Receiver) WriteRTP(packet *rtp.Packet) {
t.mu.Lock() t.mu.Lock()
t.bytes += len(packet.Payload) t.bytes += len(packet.Payload)
for sender, buffer := range t.senders { for sender, buffer := range t.senders {
if len(buffer) < cap(buffer) { select {
buffer <- packet case buffer <- packet:
} else { default:
sender.overflow++ sender.overflow++
} }
} }
@@ -42,11 +42,11 @@ func (t *Receiver) WriteRTP(packet *rtp.Packet) {
} }
func (t *Receiver) Senders() (senders []*Sender) { func (t *Receiver) Senders() (senders []*Sender) {
t.mu.Lock() t.mu.RLock()
for sender := range t.senders { for sender := range t.senders {
senders = append(senders, sender) senders = append(senders, sender)
} }
t.mu.Unlock() t.mu.RUnlock()
return return
} }
@@ -73,12 +73,9 @@ func (t *Receiver) Replace(target *Receiver) {
func (t *Receiver) String() string { func (t *Receiver) String() string {
s := t.Codec.String() + ", bytes=" + strconv.Itoa(t.bytes) s := t.Codec.String() + ", bytes=" + strconv.Itoa(t.bytes)
if t.mu.TryLock() { t.mu.RLock()
s += fmt.Sprintf(", senders=%d", len(t.senders)) s += fmt.Sprintf(", senders=%d", len(t.senders))
t.mu.Unlock() t.mu.RUnlock()
} else {
s += fmt.Sprintf(", senders=?")
}
return s return s
} }
@@ -93,7 +90,7 @@ type Sender struct {
Handler HandlerFunc Handler HandlerFunc
receivers []*Receiver receivers []*Receiver
mu sync.Mutex mu sync.RWMutex
bytes int bytes int
overflow int overflow int
@@ -127,7 +124,6 @@ func (s *Sender) HandleRTP(track *Receiver) {
} }
track.senders[s] = buffer track.senders[s] = buffer
track.mu.Unlock() track.mu.Unlock()
s.mu.Lock() s.mu.Lock()
s.receivers = append(s.receivers, track) s.receivers = append(s.receivers, track)
s.mu.Unlock() s.mu.Unlock()
@@ -135,7 +131,9 @@ func (s *Sender) HandleRTP(track *Receiver) {
go func() { go func() {
// read packets from buffer channel until it will be closed // read packets from buffer channel until it will be closed
for packet := range buffer { for packet := range buffer {
s.mu.Lock()
s.bytes += len(packet.Payload) s.bytes += len(packet.Payload)
s.mu.Unlock()
s.Handler(packet) s.Handler(packet)
} }
@@ -171,12 +169,9 @@ func (s *Sender) Close() {
func (s *Sender) String() string { func (s *Sender) String() string {
info := s.Codec.String() + ", bytes=" + strconv.Itoa(s.bytes) info := s.Codec.String() + ", bytes=" + strconv.Itoa(s.bytes)
if s.mu.TryLock() { s.mu.RLock()
info += ", receivers=" + strconv.Itoa(len(s.receivers)) info += ", receivers=" + strconv.Itoa(len(s.receivers))
s.mu.Unlock() s.mu.RUnlock()
} else {
info += ", receivers=?"
}
if s.overflow > 0 { if s.overflow > 0 {
info += ", overflow=" + strconv.Itoa(s.overflow) info += ", overflow=" + strconv.Itoa(s.overflow)
} }