diff --git a/pkg/mpegts/consumer.go b/pkg/mpegts/consumer.go index ad124397..eb0902fc 100644 --- a/pkg/mpegts/consumer.go +++ b/pkg/mpegts/consumer.go @@ -114,3 +114,10 @@ func (c *Consumer) Stop() error { _ = c.SuperConsumer.Close() return c.wr.Close() } + +func TimestampFromRTP(rtp *rtp.Packet, codec *core.Codec) { + if codec.ClockRate == ClockRate { + return + } + rtp.Timestamp = uint32(float64(rtp.Timestamp) / float64(codec.ClockRate) * ClockRate) +} diff --git a/pkg/mpegts/demuxer.go b/pkg/mpegts/demuxer.go index b9271bcd..506bce28 100644 --- a/pkg/mpegts/demuxer.go +++ b/pkg/mpegts/demuxer.go @@ -197,7 +197,7 @@ func (d *Demuxer) readPES(pid uint16, start bool) *rtp.Packet { _ = d.readBit() // Original or Copy pts := d.readBit() // PTS indicator - _ = d.readBit() // DTS indicator + dts := d.readBit() // DTS indicator _ = d.readBit() // ESCR flag _ = d.readBit() // ES rate flag _ = d.readBit() // DSM trick mode flag @@ -213,8 +213,12 @@ func (d *Demuxer) readPES(pid uint16, start bool) *rtp.Packet { packetSize -= uint16(3 + headerSize) } - if pts != 0 { - pes.PTS = d.readTime() + if dts != 0 { + d.skip(5) // skip PTSorDTS + pes.PTSorDTS = d.readTime() + headerSize -= 10 + } else if pts != 0 { + pes.PTSorDTS = d.readTime() headerSize -= 5 } @@ -301,7 +305,8 @@ func (d *Demuxer) setSize(size byte) { const ( PacketSize = 188 - SyncByte = 0x47 // Uppercase G + SyncByte = 0x47 // Uppercase G + ClockRate = 90000 // fixed clock rate for PTS/DTS of any type ) // https://en.wikipedia.org/wiki/Program-specific_information#Elementary_stream_types @@ -320,9 +325,9 @@ type PES struct { StreamType byte // from PMT table Sequence uint16 // manual Timestamp uint32 // manual - PTS uint32 // from PTS extra header, always 90000Hz - Payload []byte // from PTS body - Size int // from PTS header, can be 0 + PTSorDTS uint32 // from extra header, always 90000Hz + Payload []byte // from PES body + Size int // from PES header, can be 0 wr *bits.Writer } @@ -343,7 +348,7 @@ func (p *PES) GetPacket() (pkt *rtp.Packet) { pkt = &rtp.Packet{ Header: rtp.Header{ PayloadType: p.StreamType, - Timestamp: p.PTS, // PTS is ok, because 90000Hz + Timestamp: p.PTSorDTS, }, Payload: annexb.EncodeToAVCC(p.Payload, false), } @@ -356,12 +361,13 @@ func (p *PES) GetPacket() (pkt *rtp.Packet) { Version: 2, PayloadType: p.StreamType, SequenceNumber: p.Sequence, - Timestamp: p.Timestamp, + Timestamp: p.PTSorDTS, + //Timestamp: p.Timestamp, }, Payload: aac.ADTStoRTP(p.Payload), } - p.Timestamp += aac.RTPTimeSize(pkt.Payload) // update next timestamp! + //p.Timestamp += aac.RTPTimeSize(pkt.Payload) // update next timestamp! case StreamTypePCMATapo: p.Sequence++ @@ -371,12 +377,13 @@ func (p *PES) GetPacket() (pkt *rtp.Packet) { Version: 2, PayloadType: p.StreamType, SequenceNumber: p.Sequence, - Timestamp: p.Timestamp, + Timestamp: p.PTSorDTS, + //Timestamp: p.Timestamp, }, Payload: p.Payload, } - p.Timestamp += uint32(len(p.Payload)) // update next timestamp! + //p.Timestamp += uint32(len(p.Payload)) // update next timestamp! } p.Payload = nil diff --git a/pkg/mpegts/muxer.go b/pkg/mpegts/muxer.go index 756c0d74..193850ea 100644 --- a/pkg/mpegts/muxer.go +++ b/pkg/mpegts/muxer.go @@ -51,9 +51,9 @@ func (m *Muxer) GetPayload(pid uint16, timestamp uint32, payload []byte) []byte } if pes.Timestamp != 0 { - pes.PTS += timestamp - pes.Timestamp + pes.PTSorDTS += timestamp - pes.Timestamp } - //log.Print(pid, pes.PTS, timestamp, pes.Timestamp) + //log.Print(pid, pes.PTSorDTS, timestamp, pes.Timestamp) pes.Timestamp = timestamp // min header size (3 byte) + adv header size (PES) @@ -74,7 +74,7 @@ func (m *Muxer) GetPayload(pid uint16, timestamp uint32, payload []byte) []byte b[7] = 0x80 // PTS indicator b[8] = 5 // PES header length - WriteTime(b[9:], pes.PTS) + WriteTime(b[9:], pes.PTSorDTS) pes.Payload = append(b, payload...) pes.Size = 1 // set PUSI in first PES diff --git a/pkg/mpegts/producer.go b/pkg/mpegts/producer.go index 2c8f5347..c7484cf0 100644 --- a/pkg/mpegts/producer.go +++ b/pkg/mpegts/producer.go @@ -9,6 +9,7 @@ import ( "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/h264" "github.com/AlexxIT/go2rtc/pkg/h265" + "github.com/pion/rtp" ) type Producer struct { @@ -43,6 +44,7 @@ func (c *Producer) Start() error { for _, receiver := range c.Receivers { if receiver.ID == pkt.PayloadType { + TimestampToRTP(pkt, receiver.Codec) receiver.WriteRTP(pkt) break } @@ -135,3 +137,10 @@ func StreamType(codec *core.Codec) uint8 { } return 0 } + +func TimestampToRTP(rtp *rtp.Packet, codec *core.Codec) { + if codec.ClockRate == ClockRate { + return + } + rtp.Timestamp = uint32(float64(rtp.Timestamp) * float64(codec.ClockRate) / ClockRate) +} diff --git a/pkg/tapo/client.go b/pkg/tapo/client.go index db4e624a..4f2a3f80 100644 --- a/pkg/tapo/client.go +++ b/pkg/tapo/client.go @@ -191,6 +191,7 @@ func (c *Client) Handle() error { for _, receiver := range c.receivers { if receiver.ID == pkt.PayloadType { + mpegts.TimestampToRTP(pkt, receiver.Codec) receiver.WriteRTP(pkt) break }