Update timestamp processing for MPEG-TS

This commit is contained in:
Alexey Khit
2023-08-21 20:34:18 +03:00
parent 3db4002420
commit e0ad358aa9
5 changed files with 39 additions and 15 deletions
+7
View File
@@ -114,3 +114,10 @@ func (c *Consumer) Stop() error {
_ = c.SuperConsumer.Close() _ = c.SuperConsumer.Close()
return c.wr.Close() return c.wr.Close()
} }
func TimestampFromRTP(rtp *rtp.Packet, codec *core.Codec) {
if codec.ClockRate == ClockRate {
return
}
rtp.Timestamp = uint32(float64(rtp.Timestamp) / float64(codec.ClockRate) * ClockRate)
}
+18 -11
View File
@@ -197,7 +197,7 @@ func (d *Demuxer) readPES(pid uint16, start bool) *rtp.Packet {
_ = d.readBit() // Original or Copy _ = d.readBit() // Original or Copy
pts := d.readBit() // PTS indicator pts := d.readBit() // PTS indicator
_ = d.readBit() // DTS indicator dts := d.readBit() // DTS indicator
_ = d.readBit() // ESCR flag _ = d.readBit() // ESCR flag
_ = d.readBit() // ES rate flag _ = d.readBit() // ES rate flag
_ = d.readBit() // DSM trick mode flag _ = d.readBit() // DSM trick mode flag
@@ -213,8 +213,12 @@ func (d *Demuxer) readPES(pid uint16, start bool) *rtp.Packet {
packetSize -= uint16(3 + headerSize) packetSize -= uint16(3 + headerSize)
} }
if pts != 0 { if dts != 0 {
pes.PTS = d.readTime() d.skip(5) // skip PTSorDTS
pes.PTSorDTS = d.readTime()
headerSize -= 10
} else if pts != 0 {
pes.PTSorDTS = d.readTime()
headerSize -= 5 headerSize -= 5
} }
@@ -302,6 +306,7 @@ func (d *Demuxer) setSize(size byte) {
const ( const (
PacketSize = 188 PacketSize = 188
SyncByte = 0x47 // Uppercase G SyncByte = 0x47 // Uppercase G
ClockRate = 90000 // fixed clock rate for PTS/DTS of any type
) )
// https://en.wikipedia.org/wiki/Program-specific_information#Elementary_stream_types // https://en.wikipedia.org/wiki/Program-specific_information#Elementary_stream_types
@@ -320,9 +325,9 @@ type PES struct {
StreamType byte // from PMT table StreamType byte // from PMT table
Sequence uint16 // manual Sequence uint16 // manual
Timestamp uint32 // manual Timestamp uint32 // manual
PTS uint32 // from PTS extra header, always 90000Hz PTSorDTS uint32 // from extra header, always 90000Hz
Payload []byte // from PTS body Payload []byte // from PES body
Size int // from PTS header, can be 0 Size int // from PES header, can be 0
wr *bits.Writer wr *bits.Writer
} }
@@ -343,7 +348,7 @@ func (p *PES) GetPacket() (pkt *rtp.Packet) {
pkt = &rtp.Packet{ pkt = &rtp.Packet{
Header: rtp.Header{ Header: rtp.Header{
PayloadType: p.StreamType, PayloadType: p.StreamType,
Timestamp: p.PTS, // PTS is ok, because 90000Hz Timestamp: p.PTSorDTS,
}, },
Payload: annexb.EncodeToAVCC(p.Payload, false), Payload: annexb.EncodeToAVCC(p.Payload, false),
} }
@@ -356,12 +361,13 @@ func (p *PES) GetPacket() (pkt *rtp.Packet) {
Version: 2, Version: 2,
PayloadType: p.StreamType, PayloadType: p.StreamType,
SequenceNumber: p.Sequence, SequenceNumber: p.Sequence,
Timestamp: p.Timestamp, Timestamp: p.PTSorDTS,
//Timestamp: p.Timestamp,
}, },
Payload: aac.ADTStoRTP(p.Payload), Payload: aac.ADTStoRTP(p.Payload),
} }
p.Timestamp += aac.RTPTimeSize(pkt.Payload) // update next timestamp! //p.Timestamp += aac.RTPTimeSize(pkt.Payload) // update next timestamp!
case StreamTypePCMATapo: case StreamTypePCMATapo:
p.Sequence++ p.Sequence++
@@ -371,12 +377,13 @@ func (p *PES) GetPacket() (pkt *rtp.Packet) {
Version: 2, Version: 2,
PayloadType: p.StreamType, PayloadType: p.StreamType,
SequenceNumber: p.Sequence, SequenceNumber: p.Sequence,
Timestamp: p.Timestamp, Timestamp: p.PTSorDTS,
//Timestamp: p.Timestamp,
}, },
Payload: p.Payload, Payload: p.Payload,
} }
p.Timestamp += uint32(len(p.Payload)) // update next timestamp! //p.Timestamp += uint32(len(p.Payload)) // update next timestamp!
} }
p.Payload = nil p.Payload = nil
+3 -3
View File
@@ -51,9 +51,9 @@ func (m *Muxer) GetPayload(pid uint16, timestamp uint32, payload []byte) []byte
} }
if pes.Timestamp != 0 { if pes.Timestamp != 0 {
pes.PTS += timestamp - pes.Timestamp pes.PTSorDTS += timestamp - pes.Timestamp
} }
//log.Print(pid, pes.PTS, timestamp, pes.Timestamp) //log.Print(pid, pes.PTSorDTS, timestamp, pes.Timestamp)
pes.Timestamp = timestamp pes.Timestamp = timestamp
// min header size (3 byte) + adv header size (PES) // min header size (3 byte) + adv header size (PES)
@@ -74,7 +74,7 @@ func (m *Muxer) GetPayload(pid uint16, timestamp uint32, payload []byte) []byte
b[7] = 0x80 // PTS indicator b[7] = 0x80 // PTS indicator
b[8] = 5 // PES header length b[8] = 5 // PES header length
WriteTime(b[9:], pes.PTS) WriteTime(b[9:], pes.PTSorDTS)
pes.Payload = append(b, payload...) pes.Payload = append(b, payload...)
pes.Size = 1 // set PUSI in first PES pes.Size = 1 // set PUSI in first PES
+9
View File
@@ -9,6 +9,7 @@ import (
"github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/h264" "github.com/AlexxIT/go2rtc/pkg/h264"
"github.com/AlexxIT/go2rtc/pkg/h265" "github.com/AlexxIT/go2rtc/pkg/h265"
"github.com/pion/rtp"
) )
type Producer struct { type Producer struct {
@@ -43,6 +44,7 @@ func (c *Producer) Start() error {
for _, receiver := range c.Receivers { for _, receiver := range c.Receivers {
if receiver.ID == pkt.PayloadType { if receiver.ID == pkt.PayloadType {
TimestampToRTP(pkt, receiver.Codec)
receiver.WriteRTP(pkt) receiver.WriteRTP(pkt)
break break
} }
@@ -135,3 +137,10 @@ func StreamType(codec *core.Codec) uint8 {
} }
return 0 return 0
} }
func TimestampToRTP(rtp *rtp.Packet, codec *core.Codec) {
if codec.ClockRate == ClockRate {
return
}
rtp.Timestamp = uint32(float64(rtp.Timestamp) * float64(codec.ClockRate) / ClockRate)
}
+1
View File
@@ -191,6 +191,7 @@ func (c *Client) Handle() error {
for _, receiver := range c.receivers { for _, receiver := range c.receivers {
if receiver.ID == pkt.PayloadType { if receiver.ID == pkt.PayloadType {
mpegts.TimestampToRTP(pkt, receiver.Codec)
receiver.WriteRTP(pkt) receiver.WriteRTP(pkt)
break break
} }