Add buffer for RTSP output

This commit is contained in:
Alexey Khit
2023-08-20 21:25:45 +03:00
parent 873cf65317
commit d1eb623fd6
+46 -14
View File
@@ -1,6 +1,8 @@
package rtsp package rtsp
import ( import (
"time"
"github.com/AlexxIT/go2rtc/pkg/aac" "github.com/AlexxIT/go2rtc/pkg/aac"
"github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/h264" "github.com/AlexxIT/go2rtc/pkg/h264"
@@ -8,7 +10,6 @@ import (
"github.com/AlexxIT/go2rtc/pkg/mjpeg" "github.com/AlexxIT/go2rtc/pkg/mjpeg"
"github.com/AlexxIT/go2rtc/pkg/pcm" "github.com/AlexxIT/go2rtc/pkg/pcm"
"github.com/pion/rtp" "github.com/pion/rtp"
"time"
) )
func (c *Conn) GetMedias() []*core.Media { func (c *Conn) GetMedias() []*core.Media {
@@ -74,25 +75,55 @@ func (c *Conn) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiv
} }
func (c *Conn) packetWriter(codec *core.Codec, channel, payloadType uint8) core.HandlerFunc { func (c *Conn) packetWriter(codec *core.Codec, channel, payloadType uint8) core.HandlerFunc {
var n int
buf := make([]byte, 4096) // 4KB
handlerFunc := func(packet *rtp.Packet) { handlerFunc := func(packet *rtp.Packet) {
if c.state == StateNone { if c.state == StateNone {
return return
} }
clone := *packet clone := rtp.Packet{
clone.Header.PayloadType = payloadType Header: rtp.Header{
Version: 2,
Marker: packet.Marker,
PayloadType: payloadType,
SequenceNumber: packet.SequenceNumber,
Timestamp: packet.Timestamp,
SSRC: packet.SSRC,
},
Payload: packet.Payload,
}
size := clone.MarshalSize() size := 12 + len(packet.Payload)
if n+4+size > len(buf) {
if len(buf) < 1024*1024 {
buf = append(buf, make([]byte, len(buf))...)
} else {
if _, err := c.conn.Write(buf[:n]); err == nil {
c.send += n
}
n = 0
}
}
//log.Printf("[RTP] codec: %s, size: %6d, ts: %10d, pt: %2d, ssrc: %d, seq: %d, mark: %v", codec.Name, len(packet.Payload), packet.Timestamp, packet.PayloadType, packet.SSRC, packet.SequenceNumber, packet.Marker) //log.Printf("[RTP] codec: %s, size: %6d, ts: %10d, pt: %2d, ssrc: %d, seq: %d, mark: %v", codec.Name, len(packet.Payload), packet.Timestamp, packet.PayloadType, packet.SSRC, packet.SequenceNumber, packet.Marker)
data := make([]byte, 4+size) chunk := buf[n:]
data[0] = '$' _ = chunk[4] // bounds
data[1] = channel chunk[0] = '$'
data[2] = byte(size >> 8) chunk[1] = channel
data[3] = byte(size) chunk[2] = byte(size >> 8)
chunk[3] = byte(size)
if _, err := clone.MarshalTo(data[4:]); err != nil { if _, err := clone.MarshalTo(chunk[4:]); err != nil {
return
}
n += 4 + size
if !packet.Marker {
return return
} }
@@ -100,12 +131,13 @@ func (c *Conn) packetWriter(codec *core.Codec, channel, payloadType uint8) core.
return return
} }
n, err := c.conn.Write(data) //log.Printf("[rtsp] channel:%2d write_size:%6d buffer_size:%6d", channel, n, len(buf))
if err != nil {
return if _, err := c.conn.Write(buf[:n]); err == nil {
c.send += n
} }
c.send += n n = 0
} }
if !codec.IsRTP() { if !codec.IsRTP() {