From 66c858e00e2488d9b8a8205da6f4bfe4ec903739 Mon Sep 17 00:00:00 2001 From: Alexey Khit Date: Wed, 30 Aug 2023 05:57:00 +0300 Subject: [PATCH] Rewrite JPEG snapshot consumer --- internal/ffmpeg/jpeg.go | 24 ++++++++--- internal/mjpeg/init.go | 27 +++++-------- pkg/magic/keyframe.go | 89 +++++++++++++++++++++++------------------ 3 files changed, 79 insertions(+), 61 deletions(-) diff --git a/internal/ffmpeg/jpeg.go b/internal/ffmpeg/jpeg.go index ce01f3a1..63d886dd 100644 --- a/internal/ffmpeg/jpeg.go +++ b/internal/ffmpeg/jpeg.go @@ -12,22 +12,36 @@ import ( "github.com/AlexxIT/go2rtc/pkg/shell" ) -func TranscodeToJPEG(b []byte, query url.Values) ([]byte, error) { - ffmpegArgs := parseQuery(query) - cmdArgs := shell.QuoteSplit(ffmpegArgs.String()) +func JPEGWithQuery(b []byte, query url.Values) ([]byte, error) { + args := parseQuery(query) + return transcode(b, args.String()) +} + +func JPEGWithScale(b []byte, width, height int) ([]byte, error) { + args := defaultArgs() + args.AddFilter(fmt.Sprintf("scale=%d:%d", width, height)) + return transcode(b, args.String()) +} + +func transcode(b []byte, args string) ([]byte, error) { + cmdArgs := shell.QuoteSplit(args) cmd := exec.Command(cmdArgs[0], cmdArgs[1:]...) cmd.Stdin = bytes.NewBuffer(b) return cmd.Output() } -func parseQuery(query url.Values) *ffmpeg.Args { - args := &ffmpeg.Args{ +func defaultArgs() *ffmpeg.Args { + return &ffmpeg.Args{ Bin: defaults["bin"], Global: defaults["global"], Input: "-i -", Codecs: []string{defaults["mjpeg"]}, Output: defaults["output/mjpeg"], } +} + +func parseQuery(query url.Values) *ffmpeg.Args { + args := defaultArgs() var width = -1 var height = -1 diff --git a/internal/mjpeg/init.go b/internal/mjpeg/init.go index f5f04a63..588246e1 100644 --- a/internal/mjpeg/init.go +++ b/internal/mjpeg/init.go @@ -33,27 +33,18 @@ func handlerKeyframe(w http.ResponseWriter, r *http.Request) { return } - exit := make(chan []byte) - - cons := &magic.Keyframe{ - RemoteAddr: tcp.RemoteAddr(r), - UserAgent: r.UserAgent(), - } - cons.Listen(func(msg any) { - if b, ok := msg.([]byte); ok { - select { - case exit <- b: - default: - } - } - }) + cons := magic.NewKeyframe() + cons.RemoteAddr = tcp.RemoteAddr(r) + cons.UserAgent = r.UserAgent() if err := stream.AddConsumer(cons); err != nil { log.Error().Err(err).Caller().Send() return } - data := <-exit + once := &core.OnceBuffer{} // init and first frame + _, _ = cons.WriteTo(once) + b := once.Buffer() stream.RemoveConsumer(cons) @@ -61,7 +52,7 @@ func handlerKeyframe(w http.ResponseWriter, r *http.Request) { case core.CodecH264, core.CodecH265: ts := time.Now() var err error - if data, err = ffmpeg.TranscodeToJPEG(data, r.URL.Query()); err != nil { + if b, err = ffmpeg.JPEGWithQuery(b, r.URL.Query()); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } @@ -70,12 +61,12 @@ func handlerKeyframe(w http.ResponseWriter, r *http.Request) { h := w.Header() h.Set("Content-Type", "image/jpeg") - h.Set("Content-Length", strconv.Itoa(len(data))) + h.Set("Content-Length", strconv.Itoa(once.Len())) h.Set("Cache-Control", "no-cache") h.Set("Connection", "close") h.Set("Pragma", "no-cache") - if _, err := w.Write(data); err != nil { + if _, err := w.Write(b); err != nil { log.Error().Err(err).Caller().Send() } } diff --git a/pkg/magic/keyframe.go b/pkg/magic/keyframe.go index 220f3a3d..ac2e0e1c 100644 --- a/pkg/magic/keyframe.go +++ b/pkg/magic/keyframe.go @@ -1,6 +1,8 @@ package magic import ( + "io" + "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/h264" "github.com/AlexxIT/go2rtc/pkg/h264/annexb" @@ -10,83 +12,94 @@ import ( ) type Keyframe struct { - core.Listener - - UserAgent string - RemoteAddr string - - medias []*core.Media - sender *core.Sender + core.SuperConsumer + wr *core.WriteBuffer } -func (k *Keyframe) GetMedias() []*core.Media { - if k.medias == nil { - k.medias = append(k.medias, &core.Media{ - Kind: core.KindVideo, - Direction: core.DirectionSendonly, - Codecs: []*core.Codec{ - {Name: core.CodecH264}, - {Name: core.CodecH265}, - {Name: core.CodecJPEG}, +func NewKeyframe() *Keyframe { + return &Keyframe{ + core.SuperConsumer{ + Medias: []*core.Media{ + { + Kind: core.KindVideo, + Direction: core.DirectionSendonly, + Codecs: []*core.Codec{ + {Name: core.CodecH264}, + {Name: core.CodecH265}, + {Name: core.CodecJPEG}, + }, + }, }, - }) + }, + core.NewWriteBuffer(nil), } - return k.medias } func (k *Keyframe) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiver) error { - var handler core.HandlerFunc + sender := core.NewSender(media, track.Codec) switch track.Codec.Name { case core.CodecH264: - handler = func(packet *rtp.Packet) { + sender.Handler = func(packet *rtp.Packet) { if !h264.IsKeyframe(packet.Payload) { return } b := annexb.DecodeAVCC(packet.Payload, true) - k.Fire(b) + if n, err := k.wr.Write(b); err == nil { + k.Send += n + } } if track.Codec.IsRTP() { - handler = h264.RTPDepay(track.Codec, handler) + sender.Handler = h264.RTPDepay(track.Codec, sender.Handler) } + case core.CodecH265: - handler = func(packet *rtp.Packet) { + sender.Handler = func(packet *rtp.Packet) { if !h265.IsKeyframe(packet.Payload) { return } - k.Fire(packet.Payload) + b := annexb.DecodeAVCC(packet.Payload, true) + if n, err := k.wr.Write(b); err == nil { + k.Send += n + } } if track.Codec.IsRTP() { - handler = h265.RTPDepay(track.Codec, handler) + sender.Handler = h264.RTPDepay(track.Codec, sender.Handler) + } else { + sender.Handler = h264.RepairAVCC(track.Codec, sender.Handler) } + case core.CodecJPEG: - handler = func(packet *rtp.Packet) { - k.Fire(packet.Payload) + sender.Handler = func(packet *rtp.Packet) { + if n, err := k.wr.Write(packet.Payload); err == nil { + k.Send += n + } } if track.Codec.IsRTP() { - handler = mjpeg.RTPDepay(handler) + sender.Handler = mjpeg.RTPDepay(sender.Handler) } } - k.sender = core.NewSender(media, track.Codec) - k.sender.Handler = handler - k.sender.HandleRTP(track) + sender.HandleRTP(track) + k.Senders = append(k.Senders, sender) return nil } func (k *Keyframe) CodecName() string { - if k.sender != nil { - return k.sender.Codec.Name + if len(k.Senders) != 1 { + return "" } - return "" + return k.Senders[0].Codec.Name +} + +func (k *Keyframe) WriteTo(wr io.Writer) (int64, error) { + return k.wr.WriteTo(wr) } func (k *Keyframe) Stop() error { - if k.sender != nil { - k.sender.Close() - } - return nil + _ = k.SuperConsumer.Close() + return k.wr.Close() }