From 22cc8ac2c49b636423ee785844a71c761634d736 Mon Sep 17 00:00:00 2001 From: Alex X Date: Wed, 1 Oct 2025 16:57:39 +0300 Subject: [PATCH] Code refactoring for #1762 --- internal/streams/api.go | 80 ++++++++----------------- internal/streams/preload.go | 56 +++++++++++++----- internal/streams/streams.go | 16 +++-- pkg/preload/consumer.go | 82 -------------------------- pkg/probe/{producer.go => consumer.go} | 33 ++--------- 5 files changed, 79 insertions(+), 188 deletions(-) delete mode 100644 pkg/preload/consumer.go rename pkg/probe/{producer.go => consumer.go} (60%) diff --git a/internal/streams/api.go b/internal/streams/api.go index 1b91f906..d162cdf9 100644 --- a/internal/streams/api.go +++ b/internal/streams/api.go @@ -5,6 +5,7 @@ import ( "github.com/AlexxIT/go2rtc/internal/api" "github.com/AlexxIT/go2rtc/internal/app" + "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/probe" ) @@ -27,7 +28,7 @@ func apiStreams(w http.ResponseWriter, r *http.Request) { return } - cons := probe.NewProbe(query) + cons := probe.Create("probe", query) if len(cons.Medias) != 0 { cons.WithRequest(r) if err := stream.AddConsumer(cons); err != nil { @@ -126,73 +127,44 @@ func apiStreamsDOT(w http.ResponseWriter, r *http.Request) { func apiPreload(w http.ResponseWriter, r *http.Request) { query := r.URL.Query() src := query.Get("src") - query.Del("src") - if src == "" { - http.Error(w, "no source", http.StatusBadRequest) + // check if stream exists + stream := Get(src) + if stream == nil { + http.Error(w, "", http.StatusNotFound) return } switch r.Method { case "PUT": - // check if stream exists - stream := Get(src) - if stream == nil { - http.Error(w, "stream not found", http.StatusNotFound) + // it's safe to delete from map while iterating + for k := range query { + switch k { + case core.KindVideo, core.KindAudio, "microphone": + default: + delete(query, k) + } + } + + rawQuery := query.Encode() + + if err := AddPreload(stream, rawQuery); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) return } - // check if consumer exists - if cons, ok := preloads[src]; ok { - stream.RemoveConsumer(cons) - delete(preloads, src) - } - - // parse query parameters - var rawQuery string - if query.Has("video") { - if videoQuery := query.Get("video"); videoQuery != "" { - rawQuery += "video=" + videoQuery + "#" - } else { - rawQuery += "video#" - } - } - if query.Has("audio") { - if audioQuery := query.Get("audio"); audioQuery != "" { - rawQuery += "audio=" + audioQuery + "#" - } else { - rawQuery += "audio#" - } - } - if query.Has("microphone") { - if micQuery := query.Get("microphone"); micQuery != "" { - rawQuery += "microphone=" + micQuery + "#" - } else { - rawQuery += "microphone#" - } - } - if err := app.PatchConfig([]string{"preload", src}, rawQuery); err != nil { - log.Error().Err(err).Str("src", src).Msg("Failed to patch config for PUT") - http.Error(w, err.Error(), http.StatusBadRequest) + http.Error(w, err.Error(), http.StatusInternalServerError) + } + + case "DELETE": + if err := DelPreload(stream); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) return } - Preload(src, rawQuery) - - case "DELETE": - if cons, ok := preloads[src]; ok { - if stream := Get(src); stream != nil { - stream.RemoveConsumer(cons) - } else { - cons.Stop() - } - - delete(preloads, src) - } - if err := app.PatchConfig([]string{"preload", src}, nil); err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) + http.Error(w, err.Error(), http.StatusInternalServerError) } default: diff --git a/internal/streams/preload.go b/internal/streams/preload.go index 7314df55..527746ac 100644 --- a/internal/streams/preload.go +++ b/internal/streams/preload.go @@ -1,34 +1,58 @@ package streams import ( + "errors" "net/url" + "sync" - "github.com/AlexxIT/go2rtc/pkg/preload" + "github.com/AlexxIT/go2rtc/pkg/probe" ) -var preloads = map[string]*preload.Preload{} +var preloads = map[*Stream]*probe.Probe{} +var preloadsMu sync.Mutex -func (s *Stream) Preload(name string, query url.Values) error { - cons := preload.NewPreload(name, query) - preloads[name] = cons +func Preload(stream *Stream, rawQuery string) { + if err := AddPreload(stream, rawQuery); err != nil { + log.Error().Err(err).Caller().Send() + } +} - if err := s.AddConsumer(cons); err != nil { +func AddPreload(stream *Stream, rawQuery string) error { + if rawQuery == "" { + rawQuery = "video&audio" + } + + query, err := url.ParseQuery(rawQuery) + if err != nil { return err } + preloadsMu.Lock() + defer preloadsMu.Unlock() + + if cons := preloads[stream]; cons != nil { + stream.RemoveConsumer(cons) + } + + cons := probe.Create("preload", query) + + if err = stream.AddConsumer(cons); err != nil { + return err + } + + preloads[stream] = cons return nil } -func Preload(src string, rawQuery string) { - // skip if exists - if _, ok := preloads[src]; ok { - return +func DelPreload(stream *Stream) error { + preloadsMu.Lock() + defer preloadsMu.Unlock() + + if cons := preloads[stream]; cons != nil { + stream.RemoveConsumer(cons) + delete(preloads, stream) + return nil } - if stream := Get(src); stream != nil { - query := ParseQuery(rawQuery) - if err := stream.Preload(src, query); err != nil { - log.Error().Err(err).Caller().Send() - } - } + return errors.New("streams: preload not found") } diff --git a/internal/streams/streams.go b/internal/streams/streams.go index 8f07ea12..a0b1ed68 100644 --- a/internal/streams/streams.go +++ b/internal/streams/streams.go @@ -36,17 +36,15 @@ func Init() { } time.AfterFunc(time.Second, func() { - if cfg.Publish != nil { - for name, dst := range cfg.Publish { - if stream := Get(name); stream != nil { - Publish(stream, dst) - } + // range for nil map is OK + for name, dst := range cfg.Publish { + if stream := Get(name); stream != nil { + Publish(stream, dst) } } - - if cfg.Preload != nil { - for name, rawQuery := range cfg.Preload { - Preload(name, rawQuery) + for name, rawQuery := range cfg.Preload { + if stream := Get(name); stream != nil { + Preload(stream, rawQuery) } } }) diff --git a/pkg/preload/consumer.go b/pkg/preload/consumer.go deleted file mode 100644 index 4d3735a8..00000000 --- a/pkg/preload/consumer.go +++ /dev/null @@ -1,82 +0,0 @@ -package preload - -import ( - "net/url" - "strings" - - "github.com/AlexxIT/go2rtc/pkg/core" - "github.com/pion/rtp" -) - -type Preload struct { - core.Connection - closed core.Waiter -} - -func NewPreload(name string, query url.Values) *Preload { - medias := core.ParseQuery(query) - - for _, value := range query["microphone"] { - media := &core.Media{Kind: core.KindAudio, Direction: core.DirectionRecvonly} - - for _, name := range strings.Split(value, ",") { - name = strings.ToUpper(name) - switch name { - case "", "COPY": - name = core.CodecAny - } - media.Codecs = append(media.Codecs, &core.Codec{Name: name}) - } - - medias = append(medias, media) - } - - if len(medias) == 0 { - medias = []*core.Media{ - { - Kind: core.KindVideo, - Direction: core.DirectionSendonly, - Codecs: []*core.Codec{{Name: core.CodecAny}}, - }, - { - Kind: core.KindAudio, - Direction: core.DirectionSendonly, - Codecs: []*core.Codec{{Name: core.CodecAny}}, - }, - } - } - - return &Preload{ - Connection: core.Connection{ - ID: core.NewID(), - FormatName: "preload", - Medias: medias, - }, - } -} - -func (p *Preload) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { - sender := core.NewSender(media, track.Codec) - sender.Handler = func(pkt *rtp.Packet) { - p.Send += pkt.MarshalSize() - } - sender.HandleRTP(track) - p.Senders = append(p.Senders, sender) - return nil -} - -func (p *Preload) Start() error { - p.closed.Wait() - return nil -} - -func (p *Preload) Stop() error { - for _, receiver := range p.Receivers { - receiver.Close() - } - for _, sender := range p.Senders { - sender.Close() - } - p.closed.Done(nil) - return nil -} diff --git a/pkg/probe/producer.go b/pkg/probe/consumer.go similarity index 60% rename from pkg/probe/producer.go rename to pkg/probe/consumer.go index 1fbd3efb..c6aa4478 100644 --- a/pkg/probe/producer.go +++ b/pkg/probe/consumer.go @@ -11,7 +11,7 @@ type Probe struct { core.Connection } -func NewProbe(query url.Values) *Probe { +func Create(name string, query url.Values) *Probe { medias := core.ParseQuery(query) for _, value := range query["microphone"] { @@ -32,39 +32,18 @@ func NewProbe(query url.Values) *Probe { return &Probe{ Connection: core.Connection{ ID: core.NewID(), - FormatName: "probe", + FormatName: name, Medias: medias, }, } } -func (p *Probe) GetMedias() []*core.Media { - return p.Medias -} - func (p *Probe) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { sender := core.NewSender(media, track.Codec) - sender.Bind(track) + sender.Handler = func(pkt *core.Packet) { + p.Send += len(pkt.Payload) + } + sender.HandleRTP(track) p.Senders = append(p.Senders, sender) return nil } - -func (p *Probe) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { - receiver := core.NewReceiver(media, codec) - p.Receivers = append(p.Receivers, receiver) - return receiver, nil -} - -func (p *Probe) Start() error { - return nil -} - -func (p *Probe) Stop() error { - for _, receiver := range p.Receivers { - receiver.Close() - } - for _, sender := range p.Senders { - sender.Close() - } - return nil -}