From d1eb623fd6b655c7633ca7aeff3a5d00556e792b Mon Sep 17 00:00:00 2001 From: Alexey Khit Date: Sun, 20 Aug 2023 21:25:45 +0300 Subject: [PATCH] Add buffer for RTSP output --- pkg/rtsp/consumer.go | 60 +++++++++++++++++++++++++++++++++----------- 1 file changed, 46 insertions(+), 14 deletions(-) diff --git a/pkg/rtsp/consumer.go b/pkg/rtsp/consumer.go index a0cc9662..ce9e402b 100644 --- a/pkg/rtsp/consumer.go +++ b/pkg/rtsp/consumer.go @@ -1,6 +1,8 @@ package rtsp import ( + "time" + "github.com/AlexxIT/go2rtc/pkg/aac" "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/h264" @@ -8,7 +10,6 @@ import ( "github.com/AlexxIT/go2rtc/pkg/mjpeg" "github.com/AlexxIT/go2rtc/pkg/pcm" "github.com/pion/rtp" - "time" ) func (c *Conn) GetMedias() []*core.Media { @@ -74,25 +75,55 @@ func (c *Conn) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiv } func (c *Conn) packetWriter(codec *core.Codec, channel, payloadType uint8) core.HandlerFunc { + var n int + buf := make([]byte, 4096) // 4KB + handlerFunc := func(packet *rtp.Packet) { if c.state == StateNone { return } - clone := *packet - clone.Header.PayloadType = payloadType + clone := rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: packet.Marker, + PayloadType: payloadType, + SequenceNumber: packet.SequenceNumber, + Timestamp: packet.Timestamp, + SSRC: packet.SSRC, + }, + Payload: packet.Payload, + } - size := clone.MarshalSize() + size := 12 + len(packet.Payload) + + if n+4+size > len(buf) { + if len(buf) < 1024*1024 { + buf = append(buf, make([]byte, len(buf))...) + } else { + if _, err := c.conn.Write(buf[:n]); err == nil { + c.send += n + } + n = 0 + } + } //log.Printf("[RTP] codec: %s, size: %6d, ts: %10d, pt: %2d, ssrc: %d, seq: %d, mark: %v", codec.Name, len(packet.Payload), packet.Timestamp, packet.PayloadType, packet.SSRC, packet.SequenceNumber, packet.Marker) - data := make([]byte, 4+size) - data[0] = '$' - data[1] = channel - data[2] = byte(size >> 8) - data[3] = byte(size) + chunk := buf[n:] + _ = chunk[4] // bounds + chunk[0] = '$' + chunk[1] = channel + chunk[2] = byte(size >> 8) + chunk[3] = byte(size) - if _, err := clone.MarshalTo(data[4:]); err != nil { + if _, err := clone.MarshalTo(chunk[4:]); err != nil { + return + } + + n += 4 + size + + if !packet.Marker { return } @@ -100,12 +131,13 @@ func (c *Conn) packetWriter(codec *core.Codec, channel, payloadType uint8) core. return } - n, err := c.conn.Write(data) - if err != nil { - return + //log.Printf("[rtsp] channel:%2d write_size:%6d buffer_size:%6d", channel, n, len(buf)) + + if _, err := c.conn.Write(buf[:n]); err == nil { + c.send += n } - c.send += n + n = 0 } if !codec.IsRTP() {