Support MP4 over WebSocket

This commit is contained in:
Alexey Khit
2022-12-02 21:32:18 +03:00
parent 258a0ffb91
commit 4c0929d854
5 changed files with 118 additions and 32 deletions
+3 -2
View File
@@ -15,7 +15,8 @@ import (
func Init() {
log = app.GetLogger("mp4")
api.HandleWS(MsgTypeMSE, handlerWS)
api.HandleWS("mse", handlerWS)
api.HandleWS("mp4", handlerWS4)
api.HandleFunc("api/frame.mp4", handlerKeyframe)
api.HandleFunc("api/stream.mp4", handlerMP4)
@@ -36,7 +37,7 @@ func handlerKeyframe(w http.ResponseWriter, r *http.Request) {
exit := make(chan []byte)
cons := &mp4.Keyframe{}
cons := &mp4.Segment{OnlyKeyframe: true}
cons.Listen(func(msg interface{}) {
if data, ok := msg.([]byte); ok && exit != nil {
exit <- data
+33 -6
View File
@@ -8,8 +8,6 @@ import (
"strings"
)
const MsgTypeMSE = "mse" // fMP4
const packetSize = 8192
func handlerWS(ctx *api.Context, msg *streamer.Message) {
@@ -24,7 +22,7 @@ func handlerWS(ctx *api.Context, msg *streamer.Message) {
cons.RemoteAddr = ctx.Request.RemoteAddr
if codecs, ok := msg.Value.(string); ok {
cons.Medias = parseMedias(codecs)
cons.Medias = parseMedias(codecs, true)
}
cons.Listen(func(msg interface{}) {
@@ -47,7 +45,7 @@ func handlerWS(ctx *api.Context, msg *streamer.Message) {
stream.RemoveConsumer(cons)
})
ctx.Write(&streamer.Message{Type: MsgTypeMSE, Value: cons.MimeType()})
ctx.Write(&streamer.Message{Type: "mse", Value: cons.MimeType()})
data, err := cons.Init()
if err != nil {
@@ -61,7 +59,36 @@ func handlerWS(ctx *api.Context, msg *streamer.Message) {
cons.Start()
}
func parseMedias(codecs string) (medias []*streamer.Media) {
func handlerWS4(ctx *api.Context, msg *streamer.Message) {
src := ctx.Request.URL.Query().Get("src")
stream := streams.GetOrNew(src)
if stream == nil {
return
}
cons := &mp4.Segment{}
if codecs, ok := msg.Value.(string); ok {
cons.Medias = parseMedias(codecs, false)
}
cons.Listen(func(msg interface{}) {
if data, ok := msg.([]byte); ok {
ctx.Write(data)
}
})
if err := stream.AddConsumer(cons); err != nil {
log.Error().Err(err).Caller().Send()
return
}
ctx.OnClose(func() {
stream.RemoveConsumer(cons)
})
}
func parseMedias(codecs string, parseAudio bool) (medias []*streamer.Media) {
var videos []*streamer.Codec
var audios []*streamer.Codec
@@ -88,7 +115,7 @@ func parseMedias(codecs string) (medias []*streamer.Media) {
medias = append(medias, media)
}
if audios != nil {
if audios != nil && parseAudio {
media := &streamer.Media{
Kind: streamer.KindAudio,
Direction: streamer.DirectionRecvonly,
+25 -7
View File
@@ -18,11 +18,17 @@ type Consumer struct {
muxer *Muxer
codecs []*streamer.Codec
start bool
wait byte
send int
}
const (
waitNone byte = iota
waitKeyframe
waitInit
)
func (c *Consumer) GetMedias() []*streamer.Media {
if c.Medias != nil {
return c.Medias
@@ -55,13 +61,18 @@ func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *strea
codec := track.Codec
switch codec.Name {
case streamer.CodecH264:
c.wait = waitInit
push := func(packet *rtp.Packet) error {
if packet.Version != h264.RTPPacketVersionAVC {
return nil
}
if !c.start {
return nil
if c.wait != waitNone {
if c.wait == waitInit || !h264.IsKeyframe(packet.Payload) {
return nil
}
c.wait = waitNone
}
buf := c.muxer.Marshal(trackID, packet)
@@ -82,13 +93,18 @@ func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *strea
return track.Bind(push)
case streamer.CodecH265:
c.wait = waitInit
push := func(packet *rtp.Packet) error {
if packet.Version != h264.RTPPacketVersionAVC {
return nil
}
if !c.start {
return nil
if c.wait != waitNone {
if c.wait == waitInit || !h265.IsKeyframe(packet.Payload) {
return nil
}
c.wait = waitNone
}
buf := c.muxer.Marshal(trackID, packet)
@@ -107,7 +123,7 @@ func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *strea
case streamer.CodecAAC:
push := func(packet *rtp.Packet) error {
if !c.start {
if c.wait != waitNone {
return nil
}
@@ -139,7 +155,9 @@ func (c *Consumer) Init() ([]byte, error) {
}
func (c *Consumer) Start() {
c.start = true
if c.wait == waitInit {
c.wait = waitKeyframe
}
}
//
+10 -8
View File
@@ -19,8 +19,6 @@ type Muxer struct {
fragIndex uint32
dts []uint64
pts []uint32
//data []byte
//total int
}
func (m *Muxer) MimeType(codecs []*streamer.Codec) string {
@@ -185,10 +183,13 @@ func (m *Muxer) GetInit(codecs []*streamer.Codec) ([]byte, error) {
return append(FTYP(), data...), nil
}
//func (m *Muxer) Rewind() {
// m.dts = 0
// m.pts = 0
//}
func (m *Muxer) Reset() {
m.fragIndex = 0
for i := range m.dts {
m.dts[i] = 0
m.pts[i] = 0
}
}
func (m *Muxer) Marshal(trackID byte, packet *rtp.Packet) []byte {
run := &mp4fio.TrackFragRun{
@@ -218,15 +219,16 @@ func (m *Muxer) Marshal(trackID byte, packet *rtp.Packet) []byte {
}
entry := mp4io.TrackFragRunEntry{
//Duration: 90000,
Size: uint32(len(packet.Payload)),
}
newTime := packet.Timestamp
if m.pts[trackID] > 0 {
//m.dts += uint64(newTime - m.pts)
entry.Duration = newTime - m.pts[trackID]
m.dts[trackID] += uint64(entry.Duration)
} else {
// important, or Safari will fail with first frame
entry.Duration = 1
}
m.pts[trackID] = newTime
+47 -9
View File
@@ -7,13 +7,20 @@ import (
"github.com/pion/rtp"
)
type Keyframe struct {
type Segment struct {
streamer.Element
MimeType string
Medias []*streamer.Media
MimeType string
OnlyKeyframe bool
}
func (c *Keyframe) GetMedias() []*streamer.Media {
func (c *Segment) GetMedias() []*streamer.Media {
if c.Medias != nil {
return c.Medias
}
// default medias
return []*streamer.Media{
{
Kind: streamer.KindVideo,
@@ -26,7 +33,7 @@ func (c *Keyframe) GetMedias() []*streamer.Media {
}
}
func (c *Keyframe) AddTrack(media *streamer.Media, track *streamer.Track) *streamer.Track {
func (c *Segment) AddTrack(media *streamer.Media, track *streamer.Track) *streamer.Track {
muxer := &Muxer{}
codecs := []*streamer.Codec{track.Codec}
@@ -40,15 +47,46 @@ func (c *Keyframe) AddTrack(media *streamer.Media, track *streamer.Track) *strea
switch track.Codec.Name {
case streamer.CodecH264:
push := func(packet *rtp.Packet) error {
if !h264.IsKeyframe(packet.Payload) {
var push streamer.WriterFunc
if c.OnlyKeyframe {
push = func(packet *rtp.Packet) error {
if !h264.IsKeyframe(packet.Payload) {
return nil
}
buf := muxer.Marshal(0, packet)
c.Fire(append(init, buf...))
return nil
}
} else {
var buf []byte
buf := muxer.Marshal(0, packet)
c.Fire(append(init, buf...))
push = func(packet *rtp.Packet) error {
if h264.IsKeyframe(packet.Payload) {
// fist frame - send only IFrame
// other frames - send IFrame and all PFrames
if buf == nil {
buf = append(buf, init...)
b := muxer.Marshal(0, packet)
buf = append(buf, b...)
}
return nil
c.Fire(buf)
buf = buf[:0]
buf = append(buf, init...)
muxer.Reset()
}
if buf != nil {
b := muxer.Marshal(0, packet)
buf = append(buf, b...)
}
return nil
}
}
var wrapper streamer.WrapperFunc