Rewrite JPEG snapshot consumer

This commit is contained in:
Alexey Khit
2023-08-30 05:57:00 +03:00
parent ef63cec7a8
commit 66c858e00e
3 changed files with 79 additions and 61 deletions
+19 -5
View File
@@ -12,22 +12,36 @@ import (
"github.com/AlexxIT/go2rtc/pkg/shell" "github.com/AlexxIT/go2rtc/pkg/shell"
) )
func TranscodeToJPEG(b []byte, query url.Values) ([]byte, error) { func JPEGWithQuery(b []byte, query url.Values) ([]byte, error) {
ffmpegArgs := parseQuery(query) args := parseQuery(query)
cmdArgs := shell.QuoteSplit(ffmpegArgs.String()) return transcode(b, args.String())
}
func JPEGWithScale(b []byte, width, height int) ([]byte, error) {
args := defaultArgs()
args.AddFilter(fmt.Sprintf("scale=%d:%d", width, height))
return transcode(b, args.String())
}
func transcode(b []byte, args string) ([]byte, error) {
cmdArgs := shell.QuoteSplit(args)
cmd := exec.Command(cmdArgs[0], cmdArgs[1:]...) cmd := exec.Command(cmdArgs[0], cmdArgs[1:]...)
cmd.Stdin = bytes.NewBuffer(b) cmd.Stdin = bytes.NewBuffer(b)
return cmd.Output() return cmd.Output()
} }
func parseQuery(query url.Values) *ffmpeg.Args { func defaultArgs() *ffmpeg.Args {
args := &ffmpeg.Args{ return &ffmpeg.Args{
Bin: defaults["bin"], Bin: defaults["bin"],
Global: defaults["global"], Global: defaults["global"],
Input: "-i -", Input: "-i -",
Codecs: []string{defaults["mjpeg"]}, Codecs: []string{defaults["mjpeg"]},
Output: defaults["output/mjpeg"], Output: defaults["output/mjpeg"],
} }
}
func parseQuery(query url.Values) *ffmpeg.Args {
args := defaultArgs()
var width = -1 var width = -1
var height = -1 var height = -1
+9 -18
View File
@@ -33,27 +33,18 @@ func handlerKeyframe(w http.ResponseWriter, r *http.Request) {
return return
} }
exit := make(chan []byte) cons := magic.NewKeyframe()
cons.RemoteAddr = tcp.RemoteAddr(r)
cons := &magic.Keyframe{ cons.UserAgent = r.UserAgent()
RemoteAddr: tcp.RemoteAddr(r),
UserAgent: r.UserAgent(),
}
cons.Listen(func(msg any) {
if b, ok := msg.([]byte); ok {
select {
case exit <- b:
default:
}
}
})
if err := stream.AddConsumer(cons); err != nil { if err := stream.AddConsumer(cons); err != nil {
log.Error().Err(err).Caller().Send() log.Error().Err(err).Caller().Send()
return return
} }
data := <-exit once := &core.OnceBuffer{} // init and first frame
_, _ = cons.WriteTo(once)
b := once.Buffer()
stream.RemoveConsumer(cons) stream.RemoveConsumer(cons)
@@ -61,7 +52,7 @@ func handlerKeyframe(w http.ResponseWriter, r *http.Request) {
case core.CodecH264, core.CodecH265: case core.CodecH264, core.CodecH265:
ts := time.Now() ts := time.Now()
var err error var err error
if data, err = ffmpeg.TranscodeToJPEG(data, r.URL.Query()); err != nil { if b, err = ffmpeg.JPEGWithQuery(b, r.URL.Query()); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
@@ -70,12 +61,12 @@ func handlerKeyframe(w http.ResponseWriter, r *http.Request) {
h := w.Header() h := w.Header()
h.Set("Content-Type", "image/jpeg") h.Set("Content-Type", "image/jpeg")
h.Set("Content-Length", strconv.Itoa(len(data))) h.Set("Content-Length", strconv.Itoa(once.Len()))
h.Set("Cache-Control", "no-cache") h.Set("Cache-Control", "no-cache")
h.Set("Connection", "close") h.Set("Connection", "close")
h.Set("Pragma", "no-cache") h.Set("Pragma", "no-cache")
if _, err := w.Write(data); err != nil { if _, err := w.Write(b); err != nil {
log.Error().Err(err).Caller().Send() log.Error().Err(err).Caller().Send()
} }
} }
+51 -38
View File
@@ -1,6 +1,8 @@
package magic package magic
import ( import (
"io"
"github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/h264" "github.com/AlexxIT/go2rtc/pkg/h264"
"github.com/AlexxIT/go2rtc/pkg/h264/annexb" "github.com/AlexxIT/go2rtc/pkg/h264/annexb"
@@ -10,83 +12,94 @@ import (
) )
type Keyframe struct { type Keyframe struct {
core.Listener core.SuperConsumer
wr *core.WriteBuffer
UserAgent string
RemoteAddr string
medias []*core.Media
sender *core.Sender
} }
func (k *Keyframe) GetMedias() []*core.Media { func NewKeyframe() *Keyframe {
if k.medias == nil { return &Keyframe{
k.medias = append(k.medias, &core.Media{ core.SuperConsumer{
Kind: core.KindVideo, Medias: []*core.Media{
Direction: core.DirectionSendonly, {
Codecs: []*core.Codec{ Kind: core.KindVideo,
{Name: core.CodecH264}, Direction: core.DirectionSendonly,
{Name: core.CodecH265}, Codecs: []*core.Codec{
{Name: core.CodecJPEG}, {Name: core.CodecH264},
{Name: core.CodecH265},
{Name: core.CodecJPEG},
},
},
}, },
}) },
core.NewWriteBuffer(nil),
} }
return k.medias
} }
func (k *Keyframe) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiver) error { func (k *Keyframe) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiver) error {
var handler core.HandlerFunc sender := core.NewSender(media, track.Codec)
switch track.Codec.Name { switch track.Codec.Name {
case core.CodecH264: case core.CodecH264:
handler = func(packet *rtp.Packet) { sender.Handler = func(packet *rtp.Packet) {
if !h264.IsKeyframe(packet.Payload) { if !h264.IsKeyframe(packet.Payload) {
return return
} }
b := annexb.DecodeAVCC(packet.Payload, true) b := annexb.DecodeAVCC(packet.Payload, true)
k.Fire(b) if n, err := k.wr.Write(b); err == nil {
k.Send += n
}
} }
if track.Codec.IsRTP() { if track.Codec.IsRTP() {
handler = h264.RTPDepay(track.Codec, handler) sender.Handler = h264.RTPDepay(track.Codec, sender.Handler)
} }
case core.CodecH265: case core.CodecH265:
handler = func(packet *rtp.Packet) { sender.Handler = func(packet *rtp.Packet) {
if !h265.IsKeyframe(packet.Payload) { if !h265.IsKeyframe(packet.Payload) {
return return
} }
k.Fire(packet.Payload) b := annexb.DecodeAVCC(packet.Payload, true)
if n, err := k.wr.Write(b); err == nil {
k.Send += n
}
} }
if track.Codec.IsRTP() { if track.Codec.IsRTP() {
handler = h265.RTPDepay(track.Codec, handler) sender.Handler = h264.RTPDepay(track.Codec, sender.Handler)
} else {
sender.Handler = h264.RepairAVCC(track.Codec, sender.Handler)
} }
case core.CodecJPEG: case core.CodecJPEG:
handler = func(packet *rtp.Packet) { sender.Handler = func(packet *rtp.Packet) {
k.Fire(packet.Payload) if n, err := k.wr.Write(packet.Payload); err == nil {
k.Send += n
}
} }
if track.Codec.IsRTP() { if track.Codec.IsRTP() {
handler = mjpeg.RTPDepay(handler) sender.Handler = mjpeg.RTPDepay(sender.Handler)
} }
} }
k.sender = core.NewSender(media, track.Codec) sender.HandleRTP(track)
k.sender.Handler = handler k.Senders = append(k.Senders, sender)
k.sender.HandleRTP(track)
return nil return nil
} }
func (k *Keyframe) CodecName() string { func (k *Keyframe) CodecName() string {
if k.sender != nil { if len(k.Senders) != 1 {
return k.sender.Codec.Name return ""
} }
return "" return k.Senders[0].Codec.Name
}
func (k *Keyframe) WriteTo(wr io.Writer) (int64, error) {
return k.wr.WriteTo(wr)
} }
func (k *Keyframe) Stop() error { func (k *Keyframe) Stop() error {
if k.sender != nil { _ = k.SuperConsumer.Close()
k.sender.Close() return k.wr.Close()
}
return nil
} }