Rewrite MP4, HLS, MPEG-TS consumers

This commit is contained in:
Alexey Khit
2023-08-20 09:57:46 +03:00
parent f67f6e5b9f
commit 2e4e75e386
17 changed files with 492 additions and 635 deletions
+67 -94
View File
@@ -1,7 +1,8 @@
package mp4
import (
"encoding/json"
"errors"
"io"
"sync"
"github.com/AlexxIT/go2rtc/pkg/aac"
@@ -13,27 +14,21 @@ import (
)
type Consumer struct {
core.Listener
Medias []*core.Media
Desc string
UserAgent string
RemoteAddr string
senders []*core.Sender
core.SuperConsumer
wr *core.WriteBuffer
muxer *Muxer
mu sync.Mutex
state byte
start bool
send int
Rotate int `json:"-"`
ScaleX int `json:"-"`
ScaleY int `json:"-"`
}
func (c *Consumer) GetMedias() []*core.Media {
if c.Medias == nil {
func NewConsumer(medias []*core.Media) *Consumer {
if medias == nil {
// default local medias
c.Medias = []*core.Media{
medias = []*core.Media{
{
Kind: core.KindVideo,
Direction: core.DirectionSendonly,
@@ -52,11 +47,16 @@ func (c *Consumer) GetMedias() []*core.Media {
}
}
return c.Medias
cons := &Consumer{
muxer: &Muxer{},
wr: core.NewWriteBuffer(nil),
}
cons.Medias = medias
return cons
}
func (c *Consumer) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiver) error {
trackID := byte(len(c.senders))
trackID := byte(len(c.Senders))
codec := track.Codec.Clone()
handler := core.NewSender(media, codec)
@@ -64,22 +64,19 @@ func (c *Consumer) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiv
switch track.Codec.Name {
case core.CodecH264:
handler.Handler = func(packet *rtp.Packet) {
if packet.Version != h264.RTPPacketVersionAVC {
return
}
if c.state != stateStart {
if c.state != stateInit || !h264.IsKeyframe(packet.Payload) {
if !c.start {
if !h264.IsKeyframe(packet.Payload) {
return
}
c.state = stateStart
c.start = true
}
// important to use Mutex because right fragment order
c.mu.Lock()
buf := c.muxer.Marshal(trackID, packet)
c.Fire(buf)
c.send += len(buf)
b := c.muxer.GetPayload(trackID, packet)
if n, err := c.wr.Write(b); err == nil {
c.Send += n
}
c.mu.Unlock()
}
@@ -91,21 +88,19 @@ func (c *Consumer) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiv
case core.CodecH265:
handler.Handler = func(packet *rtp.Packet) {
if packet.Version != h264.RTPPacketVersionAVC {
return
}
if c.state != stateStart {
if c.state != stateInit || !h265.IsKeyframe(packet.Payload) {
if !c.start {
if !h265.IsKeyframe(packet.Payload) {
return
}
c.state = stateStart
c.start = true
}
// important to use Mutex because right fragment order
c.mu.Lock()
buf := c.muxer.Marshal(trackID, packet)
c.Fire(buf)
c.send += len(buf)
b := c.muxer.GetPayload(trackID, packet)
if n, err := c.wr.Write(b); err == nil {
c.Send += n
}
c.mu.Unlock()
}
@@ -115,14 +110,16 @@ func (c *Consumer) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiv
default:
handler.Handler = func(packet *rtp.Packet) {
if c.state != stateStart {
if !c.start {
return
}
// important to use Mutex because right fragment order
c.mu.Lock()
buf := c.muxer.Marshal(trackID, packet)
c.Fire(buf)
c.send += len(buf)
b := c.muxer.GetPayload(trackID, packet)
if n, err := c.wr.Write(b); err == nil {
c.Send += n
}
c.mu.Unlock()
}
@@ -142,64 +139,40 @@ func (c *Consumer) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiv
}
if handler.Handler == nil {
println("ERROR: MP4 unsupported codec: " + track.Codec.String())
return nil
s := "mp4: unsupported codec: " + track.Codec.String()
println(s)
return errors.New(s)
}
c.muxer.AddTrack(codec)
handler.HandleRTP(track)
c.senders = append(c.senders, handler)
c.Senders = append(c.Senders, handler)
return nil
}
func (c *Consumer) WriteTo(wr io.Writer) (int64, error) {
init, err := c.muxer.GetInit()
if err != nil {
return 0, err
}
if c.Rotate != 0 {
PatchVideoRotate(init, c.Rotate)
}
if c.ScaleX != 0 && c.ScaleY != 0 {
PatchVideoScale(init, c.ScaleX, c.ScaleY)
}
if _, err = wr.Write(init); err != nil {
return 0, err
}
return c.wr.WriteTo(wr)
}
func (c *Consumer) Stop() error {
for _, sender := range c.senders {
sender.Close()
}
return nil
}
func (c *Consumer) Codecs() []*core.Codec {
codecs := make([]*core.Codec, len(c.senders))
for i, sender := range c.senders {
codecs[i] = sender.Codec
}
return codecs
}
func (c *Consumer) MimeCodecs() string {
return c.muxer.MimeCodecs(c.Codecs())
}
func (c *Consumer) MimeType() string {
return `video/mp4; codecs="` + c.MimeCodecs() + `"`
}
func (c *Consumer) Init() ([]byte, error) {
c.muxer = &Muxer{}
return c.muxer.GetInit(c.Codecs())
}
func (c *Consumer) Start() {
for _, sender := range c.senders {
switch sender.Codec.Name {
case core.CodecH264, core.CodecH265:
c.state = stateInit
return
}
}
c.state = stateStart
}
func (c *Consumer) MarshalJSON() ([]byte, error) {
info := &core.Info{
Type: c.Desc + " passive consumer",
RemoteAddr: c.RemoteAddr,
UserAgent: c.UserAgent,
Medias: c.Medias,
Senders: c.senders,
Send: c.send,
}
return json.Marshal(info)
_ = c.SuperConsumer.Close()
return c.wr.Close()
}