From 465608698581bb2d3c4e240a80877ce4ff632499 Mon Sep 17 00:00:00 2001 From: Alexey Khit Date: Wed, 3 May 2023 07:49:48 +0300 Subject: [PATCH] Add auto transcoding to JPEG snapshot --- internal/ffmpeg/helpers.go | 12 ++++ internal/mjpeg/{mjpeg.go => init.go} | 25 ++++++-- pkg/h264/avc.go | 13 ++++ pkg/pipe/keyframe.go | 91 ++++++++++++++++++++++++++++ 4 files changed, 137 insertions(+), 4 deletions(-) create mode 100644 internal/ffmpeg/helpers.go rename internal/mjpeg/{mjpeg.go => init.go} (87%) create mode 100644 pkg/pipe/keyframe.go diff --git a/internal/ffmpeg/helpers.go b/internal/ffmpeg/helpers.go new file mode 100644 index 00000000..30c3520d --- /dev/null +++ b/internal/ffmpeg/helpers.go @@ -0,0 +1,12 @@ +package ffmpeg + +import ( + "bytes" + "os/exec" +) + +func TranscodeToJPEG(b []byte) ([]byte, error) { + cmd := exec.Command("ffmpeg", "-hide_banner", "-i", "-", "-f", "mjpeg", "-") + cmd.Stdin = bytes.NewBuffer(b) + return cmd.Output() +} diff --git a/internal/mjpeg/mjpeg.go b/internal/mjpeg/init.go similarity index 87% rename from internal/mjpeg/mjpeg.go rename to internal/mjpeg/init.go index 30dd97d0..c3945674 100644 --- a/internal/mjpeg/mjpeg.go +++ b/internal/mjpeg/init.go @@ -3,13 +3,17 @@ package mjpeg import ( "errors" "github.com/AlexxIT/go2rtc/internal/api" + "github.com/AlexxIT/go2rtc/internal/ffmpeg" "github.com/AlexxIT/go2rtc/internal/streams" + "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/mjpeg" + "github.com/AlexxIT/go2rtc/pkg/pipe" "github.com/AlexxIT/go2rtc/pkg/tcp" "github.com/rs/zerolog/log" "io" "net/http" "strconv" + "time" ) func Init() { @@ -29,14 +33,16 @@ func handlerKeyframe(w http.ResponseWriter, r *http.Request) { exit := make(chan []byte) - cons := &mjpeg.Consumer{ + cons := &pipe.Keyframe{ RemoteAddr: tcp.RemoteAddr(r), UserAgent: r.UserAgent(), } cons.Listen(func(msg any) { - switch msg := msg.(type) { - case []byte: - exit <- msg + if b, ok := msg.([]byte); ok { + select { + case exit <- b: + default: + } } }) @@ -49,6 +55,17 @@ func handlerKeyframe(w http.ResponseWriter, r *http.Request) { stream.RemoveConsumer(cons) + switch cons.CodecName() { + case core.CodecH264, core.CodecH265: + ts := time.Now() + var err error + if data, err = ffmpeg.TranscodeToJPEG(data); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + log.Debug().Msgf("[mjpeg] transcoding time=%s", time.Since(ts)) + } + h := w.Header() h.Set("Content-Type", "image/jpeg") h.Set("Content-Length", strconv.Itoa(len(data))) diff --git a/pkg/h264/avc.go b/pkg/h264/avc.go index 99fd4598..c21b2f11 100644 --- a/pkg/h264/avc.go +++ b/pkg/h264/avc.go @@ -26,6 +26,19 @@ func AnnexB2AVC(b []byte) []byte { return b } +func AVCtoAnnexB(b []byte) []byte { + b = bytes.Clone(b) + for i := 0; i < len(b); { + size := int(binary.BigEndian.Uint32(b[i:])) + b[i] = 0 + b[i+1] = 0 + b[i+2] = 0 + b[i+3] = 1 + i += 4 + size + } + return b +} + const forbiddenZeroBit = 0x80 const nalUnitType = 0x1F diff --git a/pkg/pipe/keyframe.go b/pkg/pipe/keyframe.go new file mode 100644 index 00000000..0126024e --- /dev/null +++ b/pkg/pipe/keyframe.go @@ -0,0 +1,91 @@ +package pipe + +import ( + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/h264" + "github.com/AlexxIT/go2rtc/pkg/h265" + "github.com/AlexxIT/go2rtc/pkg/mjpeg" + "github.com/pion/rtp" +) + +type Keyframe struct { + core.Listener + + UserAgent string + RemoteAddr string + + medias []*core.Media + sender *core.Sender +} + +func (k *Keyframe) GetMedias() []*core.Media { + if k.medias == nil { + k.medias = append(k.medias, &core.Media{ + Kind: core.KindVideo, + Direction: core.DirectionSendonly, + Codecs: []*core.Codec{ + {Name: core.CodecH264}, + {Name: core.CodecH265}, + {Name: core.CodecJPEG}, + }, + }) + } + return k.medias +} + +func (k *Keyframe) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiver) error { + var handler core.HandlerFunc + + switch track.Codec.Name { + case core.CodecH264: + handler = func(packet *rtp.Packet) { + if !h264.IsKeyframe(packet.Payload) { + return + } + b := h264.AVCtoAnnexB(packet.Payload) + k.Fire(b) + } + + if track.Codec.IsRTP() { + handler = h264.RTPDepay(track.Codec, handler) + } + case core.CodecH265: + handler = func(packet *rtp.Packet) { + if !h265.IsKeyframe(packet.Payload) { + return + } + k.Fire(packet.Payload) + } + + if track.Codec.IsRTP() { + handler = h265.RTPDepay(track.Codec, handler) + } + case core.CodecJPEG: + handler = func(packet *rtp.Packet) { + k.Fire(packet.Payload) + } + + if track.Codec.IsRTP() { + handler = mjpeg.RTPDepay(handler) + } + } + + k.sender = core.NewSender(media, track.Codec) + k.sender.Handler = handler + k.sender.HandleRTP(track) + return nil +} + +func (k *Keyframe) CodecName() string { + if k.sender != nil { + return k.sender.Codec.Name + } + return "" +} + +func (k *Keyframe) Stop() error { + if k.sender != nil { + k.sender.Close() + } + return nil +}