diff --git a/pkg/bubble/client.go b/pkg/bubble/client.go index e7a1e6c1..b8b77ae9 100644 --- a/pkg/bubble/client.go +++ b/pkg/bubble/client.go @@ -245,6 +245,7 @@ func (c *Client) Handle() error { pkt := &rtp.Packet{ Header: rtp.Header{ Version: 2, + Marker: true, Timestamp: audioTS, }, Payload: b[6+36:], diff --git a/pkg/mpegts/demuxer.go b/pkg/mpegts/demuxer.go index 506bce28..b03396b7 100644 --- a/pkg/mpegts/demuxer.go +++ b/pkg/mpegts/demuxer.go @@ -359,6 +359,7 @@ func (p *PES) GetPacket() (pkt *rtp.Packet) { pkt = &rtp.Packet{ Header: rtp.Header{ Version: 2, + Marker: true, PayloadType: p.StreamType, SequenceNumber: p.Sequence, Timestamp: p.PTSorDTS, @@ -375,6 +376,7 @@ func (p *PES) GetPacket() (pkt *rtp.Packet) { pkt = &rtp.Packet{ Header: rtp.Header{ Version: 2, + Marker: true, PayloadType: p.StreamType, SequenceNumber: p.Sequence, Timestamp: p.PTSorDTS, diff --git a/pkg/rtsp/consumer.go b/pkg/rtsp/consumer.go index ce9e402b..d1874025 100644 --- a/pkg/rtsp/consumer.go +++ b/pkg/rtsp/consumer.go @@ -75,8 +75,15 @@ func (c *Conn) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiv } func (c *Conn) packetWriter(codec *core.Codec, channel, payloadType uint8) core.HandlerFunc { + var buf []byte var n int - buf := make([]byte, 4096) // 4KB + + video := codec.IsVideo() + if video { + buf = make([]byte, 32*1024) // 32KB + } else { + buf = make([]byte, 2*1024) // 2KB + } handlerFunc := func(packet *rtp.Packet) { if c.state == StateNone { @@ -123,8 +130,8 @@ func (c *Conn) packetWriter(codec *core.Codec, channel, payloadType uint8) core. n += 4 + size - if !packet.Marker { - return + if video && !packet.Marker { + return // collect continious video packets to buffer } if err := c.conn.SetWriteDeadline(time.Now().Add(Timeout)); err != nil {