diff --git a/pkg/aac/aac.go b/pkg/aac/aac.go index 43fb5ebf..294717c7 100644 --- a/pkg/aac/aac.go +++ b/pkg/aac/aac.go @@ -12,6 +12,8 @@ const ( TypeAACMain = 1 TypeAACLC = 2 TypeESCAPE = 31 + + AUTime = 1024 ) // streamtype=5 - audio stream diff --git a/pkg/aac/adts.go b/pkg/aac/adts.go index c5553b29..4753c27b 100644 --- a/pkg/aac/adts.go +++ b/pkg/aac/adts.go @@ -70,6 +70,16 @@ func WriteADTSSize(b []byte, size uint16) { return } +func ADTSTimeSize(b []byte) uint32 { + var units uint32 + for len(b) > ADTSHeaderSize { + auSize := ReadADTSSize(b) + b = b[auSize:] + units++ + } + return units * AUTime +} + func CodecToADTS(codec *core.Codec) []byte { s := core.Between(codec.FmtpLine, "config=", ";") conf, err := hex.DecodeString(s) diff --git a/pkg/aac/rtp.go b/pkg/aac/rtp.go index 1df6e036..9530797a 100644 --- a/pkg/aac/rtp.go +++ b/pkg/aac/rtp.go @@ -32,7 +32,7 @@ func RTPDepay(handler core.HandlerFunc) core.HandlerFunc { headers = headers[2:] units = units[unitSize:] - timestamp += 1024 + timestamp += AUTime clone := *packet clone.Version = RTPPacketVersionAAC @@ -92,7 +92,7 @@ func ADTStoRTP(src []byte) (dst []byte) { func RTPTimeSize(b []byte) uint32 { // convert RTP header size to units count units := binary.BigEndian.Uint16(b) >> 4 - return 1024 * uint32(units) + return uint32(units) * AUTime } func RTPToADTS(codec *core.Codec, handler core.HandlerFunc) core.HandlerFunc { diff --git a/pkg/h264/annexb/annexb.go b/pkg/h264/annexb/annexb.go index fcae18a4..3d97704a 100644 --- a/pkg/h264/annexb/annexb.go +++ b/pkg/h264/annexb/annexb.go @@ -7,7 +7,8 @@ import ( ) const StartCode = "\x00\x00\x00\x01" -const startAUD = StartCode + "\x09\xF0" + StartCode +const startAUD = StartCode + "\x09\xF0" +const startAUDstart = startAUD + StartCode // EncodeToAVCC // will change original slice data! @@ -19,12 +20,12 @@ func EncodeToAVCC(b []byte, safeAppend bool) []byte { const minSize = len(StartCode) + 1 // 1. 
Check frist "start code" - if len(b) < len(startAUD) || string(b[:len(StartCode)]) != StartCode { + if len(b) < len(startAUDstart) || string(b[:len(StartCode)]) != StartCode { return nil } // 2. Skip Access unit delimiter (AUD) from FFmpeg - if string(b[:len(startAUD)]) == startAUD { + if string(b[:len(startAUDstart)]) == startAUDstart { b = b[6:] } @@ -85,6 +86,15 @@ func DecodeAVCC(b []byte, safeClone bool) []byte { return b } +// DecodeAVCCWithAUD - AUD isn't important for FFmpeg, but is important for Safari +func DecodeAVCCWithAUD(src []byte) []byte { + dst := make([]byte, len(startAUD)+len(src)) + copy(dst, startAUD) + copy(dst[len(startAUD):], src) + DecodeAVCC(dst[len(startAUD):], false) + return dst +} + const ( h264PFrame = 1 h264IFrame = 5 @@ -97,11 +107,11 @@ const ( // IndexFrame - get new frame start position in the AnnexB stream func IndexFrame(b []byte) int { - if len(b) < len(startAUD) { + if len(b) < len(startAUDstart) { return -1 } - for i := len(startAUD); ; { + for i := len(startAUDstart); ; { if di := bytes.Index(b[i:], []byte(StartCode)); di < 0 { break } else { diff --git a/pkg/mpegts/consumer.go b/pkg/mpegts/consumer.go index 9dd3c75a..b9be0c88 100644 --- a/pkg/mpegts/consumer.go +++ b/pkg/mpegts/consumer.go @@ -1,6 +1,7 @@ package mpegts import ( + "errors" "io" "github.com/AlexxIT/go2rtc/pkg/aac" @@ -78,8 +79,11 @@ func (c *Consumer) AddTrack(media *core.Media, codec *core.Codec, track *core.Re case core.CodecAAC: pid := c.muxer.AddTrack(StreamTypeAAC) + // convert timestamp to 90000Hz clock + dt := 90000 / float64(track.Codec.ClockRate) + sender.Handler = func(pkt *rtp.Packet) { - pts := pkt.Timestamp * 90000 / track.Codec.ClockRate + pts := uint32(float64(pkt.Timestamp) * dt) b := c.muxer.GetPayload(pid, pts, pkt.Payload) if n, err := c.wr.Write(b); err == nil { c.Send += n @@ -89,7 +93,7 @@ func (c *Consumer) AddTrack(media *core.Media, codec *core.Codec, track *core.Re if track.Codec.IsRTP() { sender.Handler = aac.RTPToADTS(track.Codec, 
sender.Handler) } else { - panic("todo") + return errors.New("mpegts: aac not supported") } } diff --git a/pkg/mpegts/muxer.go b/pkg/mpegts/muxer.go index 7ea22847..756c0d74 100644 --- a/pkg/mpegts/muxer.go +++ b/pkg/mpegts/muxer.go @@ -28,7 +28,7 @@ func (m *Muxer) AddTrack(streamType byte) (pid uint16) { pes.StreamID = 0xC0 } - pid = startPID + 1 + uint16(len(m.pes)) + pid = pes0PID + uint16(len(m.pes)) m.pes[pid] = pes return @@ -42,37 +42,41 @@ func (m *Muxer) GetHeader() []byte { } // GetPayload - safe to run concurently with different pid -func (m *Muxer) GetPayload(pid uint16, pts uint32, payload []byte) []byte { +func (m *Muxer) GetPayload(pid uint16, timestamp uint32, payload []byte) []byte { pes := m.pes[pid] - size := 8 + len(payload) + switch pes.StreamType { + case StreamTypeH264, StreamTypeH265: + payload = annexb.DecodeAVCCWithAUD(payload) + } - b := make([]byte, 14+len(payload)) - _ = b[14] // bounds + if pes.Timestamp != 0 { + pes.PTS += timestamp - pes.Timestamp + } + //log.Print(pid, pes.PTS, timestamp, pes.Timestamp) + pes.Timestamp = timestamp - b[0] = 0 - b[1] = 0 - b[2] = 1 - b[3] = pes.StreamID - b[6] = 0x80 // Marker bits (binary) - b[7] = 0x80 // PTS indicator - b[8] = 5 // PES header length + // min header size (3 byte) + adv header size (PES) + size := 3 + 5 + len(payload) - // zero size is OK for video stream + b := make([]byte, 6+3+5) + + b[0], b[1], b[2] = 0, 0, 1 // Packet start code prefix + b[3] = pes.StreamID // Stream ID + + // PES Packet length (zero value OK for video) if size <= 0xFFFF { binary.BigEndian.PutUint16(b[4:], uint16(size)) } - WriteTime(b[9:], pts) + // Optional PES header: + b[6] = 0x80 // Marker bits (binary) + b[7] = 0x80 // PTS indicator + b[8] = 5 // PES header length - copy(b[14:], payload) + WriteTime(b[9:], pes.PTS) - switch pes.StreamType { - case StreamTypeH264, StreamTypeH265: - annexb.DecodeAVCC(b[14:], false) // no need to safe clone after copy - } - - pes.Payload = b + pes.Payload = append(b, 
payload...) pes.Size = 1 // set PUSI in first PES if pes.wr == nil { @@ -91,16 +95,17 @@ func (m *Muxer) GetPayload(pid uint16, pts uint32, payload []byte) []byte { } const patPID = 0 -const startPID = 0x20 +const pmtPID = 0x1000 +const pes0PID = 0x100 func (m *Muxer) writePAT(wr *bits.Writer) { m.writeHeader(wr, patPID) i := wr.Len() + 1 // start for CRC32 m.writePSIHeader(wr, 0, 4) - wr.WriteUint16(1) // Program num - wr.WriteBits8(0b111, 3) // Reserved bits (all to 1) - wr.WriteBits16(startPID, 13) // Program map PID + wr.WriteUint16(1) // Program num + wr.WriteBits8(0b111, 3) // Reserved bits (all to 1) + wr.WriteBits16(pmtPID, 13) // Program map PID crc := checksum(wr.Bytes()[i:]) wr.WriteBytes(byte(crc), byte(crc>>8), byte(crc>>16), byte(crc>>24)) // CRC32 (little endian) @@ -109,7 +114,7 @@ func (m *Muxer) writePAT(wr *bits.Writer) { } func (m *Muxer) writePMT(wr *bits.Writer) { - m.writeHeader(wr, startPID) + m.writeHeader(wr, pmtPID) i := wr.Len() + 1 // start for CRC32 m.writePSIHeader(wr, 2, 4+uint16(len(m.pes))*5) // 4 bytes below + 5 bytes each PES @@ -120,7 +125,11 @@ func (m *Muxer) writePMT(wr *bits.Writer) { wr.WriteBits8(0, 2) // Program info length unused bits (all to 0) wr.WriteBits16(0, 10) // Program info length - for pid, pes := range m.pes { + for pid := uint16(pes0PID); ; pid++ { + pes, ok := m.pes[pid] + if !ok { + break + } wr.WriteByte(pes.StreamType) // Stream type wr.WriteBits8(0b111, 3) // Reserved bits (all to 1) wr.WriteBits16(pid, 13) // Elementary PID