From 77c45901706abe432ec9250676aa7b32dfaa6908 Mon Sep 17 00:00:00 2001 From: Alexey Khit Date: Thu, 18 Aug 2022 17:27:29 +0300 Subject: [PATCH] Adds keyframe API --- cmd/api/api.go | 2 ++ cmd/api/keyframe.go | 40 ++++++++++++++++++++++ pkg/keyframe/consumer.go | 72 ++++++++++++++++++++++++++++++++++++++++ pkg/mp4/helpers.go | 47 ++++++++++++++++++++++++++ pkg/mp4/muxer.go | 37 +++++++++++++++++++++ www/index.html | 4 ++- 6 files changed, 201 insertions(+), 1 deletion(-) create mode 100644 cmd/api/keyframe.go create mode 100644 pkg/keyframe/consumer.go create mode 100644 pkg/mp4/helpers.go create mode 100644 pkg/mp4/muxer.go diff --git a/cmd/api/api.go b/cmd/api/api.go index a33d6bb2..21c6d95a 100644 --- a/cmd/api/api.go +++ b/cmd/api/api.go @@ -39,6 +39,8 @@ func Init() { HandleFunc("/", fileServerHandlder) } + HandleFunc("/api/frame.mp4", frameHandler) + HandleFunc("/api/frame.raw", frameHandler) HandleFunc("/api/stack", stackHandler) HandleFunc("/api/stats", statsHandler) HandleFunc("/api/ws", apiWS) diff --git a/cmd/api/keyframe.go b/cmd/api/keyframe.go new file mode 100644 index 00000000..5223ef9d --- /dev/null +++ b/cmd/api/keyframe.go @@ -0,0 +1,40 @@ +package api + +import ( + "github.com/AlexxIT/go2rtc/cmd/streams" + "github.com/AlexxIT/go2rtc/pkg/keyframe" + "net/http" + "strings" +) + +func frameHandler(w http.ResponseWriter, r *http.Request) { + url := r.URL.Query().Get("url") + stream := streams.Get(url) + if stream == nil { + return + } + + var ch = make(chan []byte) + + cons := new(keyframe.Consumer) + cons.IsMP4 = strings.HasSuffix(r.URL.Path, ".mp4") + cons.Listen(func(msg interface{}) { + switch msg.(type) { + case []byte: + ch <- msg.([]byte) + } + }) + + if err := stream.AddConsumer(cons); err != nil { + log.Warn().Err(err).Msg("[api.frame] add consumer") + return + } + + data := <-ch + + stream.RemoveConsumer(cons) + + if _, err := w.Write(data); err != nil { + log.Error().Err(err).Msg("[api.frame] write") + } +} diff --git a/pkg/keyframe/consumer.go b/pkg/keyframe/consumer.go new file mode 100644 index 00000000..b721ddd3 --- /dev/null +++ b/pkg/keyframe/consumer.go @@ -0,0 +1,72 @@ +package keyframe + +import ( + "github.com/AlexxIT/go2rtc/pkg/h264" + "github.com/AlexxIT/go2rtc/pkg/mp4" + "github.com/AlexxIT/go2rtc/pkg/streamer" + "github.com/pion/rtp" +) + +var annexB = []byte{0, 0, 0, 1} + +type Consumer struct { + streamer.Element + IsMP4 bool +} + +func (k *Consumer) GetMedias() []*streamer.Media { + // support keyframe extraction only for one coded... + codec := streamer.NewCodec(streamer.CodecH264) + return []*streamer.Media{ + { + Kind: streamer.KindVideo, Direction: streamer.DirectionRecvonly, + Codecs: []*streamer.Codec{codec}, + }, + } +} + +func (k *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *streamer.Track { + // sps and pps without AVC headers + sps, pps := h264.GetParameterSet(track.Codec.FmtpLine) + + push := func(packet *rtp.Packet) error { + // TODO: remove it, unnecessary + if packet.Version != h264.RTPPacketVersionAVC { + panic("wrong packet type") + } + + switch h264.NALUType(packet.Payload) { + case h264.NALUTypeSPS: + sps = packet.Payload[4:] // remove AVC header + case h264.NALUTypePPS: + pps = packet.Payload[4:] // remove AVC header + case h264.NALUTypeIFrame: + if sps == nil || pps == nil { + return nil + } + + var data []byte + + if k.IsMP4 { + data = mp4.MarshalMP4(sps, pps, packet.Payload) + } else { + data = append(data, annexB...) + data = append(data, sps...) + data = append(data, annexB...) + data = append(data, pps...) + data = append(data, annexB...) + data = append(data, packet.Payload[4:]...) + } + + k.Fire(data) + } + return nil + } + + if !h264.IsAVC(track.Codec) { + wrapper := h264.RTPDepay(track) + push = wrapper(push) + } + + return track.Bind(push) +} diff --git a/pkg/mp4/helpers.go b/pkg/mp4/helpers.go new file mode 100644 index 00000000..d6b5cb97 --- /dev/null +++ b/pkg/mp4/helpers.go @@ -0,0 +1,47 @@ +package mp4 + +import ( + "errors" + "io" +) + +type MemoryWriter struct { + buf []byte + pos int +} + +func (m *MemoryWriter) Write(p []byte) (n int, err error) { + minCap := m.pos + len(p) + if minCap > cap(m.buf) { // Make sure buf has enough capacity: + buf2 := make([]byte, len(m.buf), minCap+len(p)) // add some extra + copy(buf2, m.buf) + m.buf = buf2 + } + if minCap > len(m.buf) { + m.buf = m.buf[:minCap] + } + copy(m.buf[m.pos:], p) + m.pos += len(p) + return len(p), nil +} + +func (m *MemoryWriter) Seek(offset int64, whence int) (int64, error) { + newPos, offs := 0, int(offset) + switch whence { + case io.SeekStart: + newPos = offs + case io.SeekCurrent: + newPos = m.pos + offs + case io.SeekEnd: + newPos = len(m.buf) + offs + } + if newPos < 0 { + return 0, errors.New("negative result pos") + } + m.pos = newPos + return int64(newPos), nil +} + +func (m *MemoryWriter) Bytes() []byte { + return m.buf +} diff --git a/pkg/mp4/muxer.go b/pkg/mp4/muxer.go new file mode 100644 index 00000000..6f6392ee --- /dev/null +++ b/pkg/mp4/muxer.go @@ -0,0 +1,37 @@ +package mp4 + +import ( + "github.com/deepch/vdk/av" + "github.com/deepch/vdk/codec/h264parser" + "github.com/deepch/vdk/format/mp4" + "time" +) + +func MarshalMP4(sps, pps, frame []byte) []byte { + writer := &MemoryWriter{} + muxer := mp4.NewMuxer(writer) + + stream, err := h264parser.NewCodecDataFromSPSAndPPS(sps, pps) + if err != nil { + panic(err) + } + + if err = muxer.WriteHeader([]av.CodecData{stream}); err != nil { + panic(err) + } + + pkt := av.Packet{ + CompositionTime: time.Millisecond, + IsKeyFrame: true, + Duration: time.Second, + Data: frame, + } + if err = muxer.WritePacket(pkt); err != nil { + panic(err) + } + if err = muxer.WriteTrailer(); err != nil { + panic(err) + } + + return writer.buf +} diff --git a/www/index.html b/www/index.html index 47ccc7d3..39e70eac 100644 --- a/www/index.html +++ b/www/index.html @@ -20,7 +20,9 @@ const links = [ 'webrtc-async', - 'webrtc-sync', + // 'webrtc-sync', + 'frame.mp4', + 'frame.raw', 'mse', ];