From 0ac505ba0908ea52973fa8f7dc6ce2381f2de3ab Mon Sep 17 00:00:00 2001 From: Alexey Khit Date: Tue, 29 Aug 2023 17:16:51 +0300 Subject: [PATCH] Simplify MJPEG consumer --- internal/mjpeg/init.go | 14 ++++---- pkg/mjpeg/consumer.go | 81 ++++++++++++++---------------------------- 2 files changed, 32 insertions(+), 63 deletions(-) diff --git a/internal/mjpeg/init.go b/internal/mjpeg/init.go index 19f8f6c9..f5f04a63 100644 --- a/internal/mjpeg/init.go +++ b/internal/mjpeg/init.go @@ -96,10 +96,9 @@ func outputMjpeg(w http.ResponseWriter, r *http.Request) { return } - cons := &mjpeg.Consumer{ - RemoteAddr: tcp.RemoteAddr(r), - UserAgent: r.UserAgent(), - } + cons := mjpeg.NewConsumer() + cons.RemoteAddr = tcp.RemoteAddr(r) + cons.UserAgent = r.UserAgent() if err := stream.AddConsumer(cons); err != nil { log.Error().Err(err).Msg("[api.mjpeg] add consumer") @@ -168,10 +167,9 @@ func handlerWS(tr *ws.Transport, _ *ws.Message) error { return errors.New(api.StreamNotFound) } - cons := &mjpeg.Consumer{ - RemoteAddr: tcp.RemoteAddr(tr.Request), - UserAgent: tr.Request.UserAgent(), - } + cons := mjpeg.NewConsumer() + cons.RemoteAddr = tcp.RemoteAddr(tr.Request) + cons.UserAgent = tr.Request.UserAgent() if err := stream.AddConsumer(cons); err != nil { log.Error().Err(err).Caller().Send() diff --git a/pkg/mjpeg/consumer.go b/pkg/mjpeg/consumer.go index 3fa36040..444cbdcc 100644 --- a/pkg/mjpeg/consumer.go +++ b/pkg/mjpeg/consumer.go @@ -1,7 +1,6 @@ package mjpeg import ( - "encoding/json" "io" "github.com/AlexxIT/go2rtc/pkg/core" @@ -9,51 +8,42 @@ import ( ) type Consumer struct { - UserAgent string - RemoteAddr string - - medias []*core.Media - sender *core.Sender - + core.SuperConsumer wr *core.WriteBuffer - - send int } -func (c *Consumer) GetMedias() []*core.Media { - if c.medias == nil { - c.medias = []*core.Media{ - { - Kind: core.KindVideo, - Direction: core.DirectionSendonly, - Codecs: []*core.Codec{ - {Name: core.CodecJPEG}, +func NewConsumer() *Consumer { + return &Consumer{ + core.SuperConsumer{ + Type: "MJPEG passive consumer", + Medias: []*core.Media{ + { + Kind: core.KindVideo, + Direction: core.DirectionSendonly, + Codecs: []*core.Codec{ + {Name: core.CodecJPEG}, + }, }, }, - } + }, + core.NewWriteBuffer(nil), } - return c.medias } func (c *Consumer) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiver) error { - if c.wr == nil { - c.wr = core.NewWriteBuffer(nil) - } - - if c.sender == nil { - c.sender = core.NewSender(media, track.Codec) - c.sender.Handler = func(packet *rtp.Packet) { - if n, err := c.wr.Write(packet.Payload); err == nil { - c.send += n - } - } - - if track.Codec.IsRTP() { - c.sender.Handler = RTPDepay(c.sender.Handler) + sender := core.NewSender(media, track.Codec) + sender.Handler = func(packet *rtp.Packet) { + if n, err := c.wr.Write(packet.Payload); err == nil { + c.Send += n } } - c.sender.HandleRTP(track) + if track.Codec.IsRTP() { + sender.Handler = RTPDepay(sender.Handler) + } + + sender.HandleRTP(track) + c.Senders = append(c.Senders, sender) return nil } @@ -62,25 +52,6 @@ func (c *Consumer) WriteTo(wr io.Writer) (int64, error) { } func (c *Consumer) Stop() error { - if c.sender != nil { - c.sender.Close() - } - if c.wr != nil { - _ = c.wr.Close() - } - return nil -} - -func (c *Consumer) MarshalJSON() ([]byte, error) { - info := &core.Info{ - Type: "MJPEG passive consumer", - RemoteAddr: c.RemoteAddr, - UserAgent: c.UserAgent, - Medias: c.medias, - Send: c.send, - } - if c.sender != nil { - info.Senders = []*core.Sender{c.sender} - } - return json.Marshal(info) + _ = c.SuperConsumer.Close() + return c.wr.Close() }