From dfc1f45f974f8b182b1f9574dc8c10d2e0817cab Mon Sep 17 00:00:00 2001 From: seydx Date: Mon, 2 Jun 2025 22:06:47 +0300 Subject: [PATCH 01/12] support preloading streams --- internal/streams/preload.go | 30 ++++++++++++ internal/streams/streams.go | 17 +++++-- pkg/preload/producer.go | 92 +++++++++++++++++++++++++++++++++++++ 3 files changed, 135 insertions(+), 4 deletions(-) create mode 100644 internal/streams/preload.go create mode 100644 pkg/preload/producer.go diff --git a/internal/streams/preload.go b/internal/streams/preload.go new file mode 100644 index 00000000..c811cc5c --- /dev/null +++ b/internal/streams/preload.go @@ -0,0 +1,30 @@ +package streams + +import ( + "net/url" + "strings" + + "github.com/AlexxIT/go2rtc/pkg/preload" +) + +func (s *Stream) Preload(query url.Values) error { + cons := preload.NewPreload(query) + + if err := s.AddConsumer(cons); err != nil { + return err + } + + return nil +} + +func Preload(src string) { + name, rawQuery, _ := strings.Cut(src, "#") + query := ParseQuery(rawQuery) + + if stream := Get(name); stream != nil { + if err := stream.Preload(query); err != nil { + log.Error().Err(err).Caller().Send() + } + return + } +} diff --git a/internal/streams/streams.go b/internal/streams/streams.go index dcbaba28..7bbccace 100644 --- a/internal/streams/streams.go +++ b/internal/streams/streams.go @@ -16,6 +16,7 @@ func Init() { var cfg struct { Streams map[string]any `yaml:"streams"` Publish map[string]any `yaml:"publish"` + Preload []string `yaml:"preload"` } app.LoadConfig(&cfg) @@ -29,14 +30,22 @@ func Init() { api.HandleFunc("api/streams", apiStreams) api.HandleFunc("api/streams.dot", apiStreamsDOT) - if cfg.Publish == nil { + if cfg.Publish == nil && cfg.Preload == nil { return } time.AfterFunc(time.Second, func() { - for name, dst := range cfg.Publish { - if stream := Get(name); stream != nil { - Publish(stream, dst) + if cfg.Publish != nil { + for name, dst := range cfg.Publish { + if stream := Get(name); stream != nil { + Publish(stream, dst) + } + } + } + + if cfg.Preload != nil { + for _, src := range cfg.Preload { + Preload(src) } } }) diff --git a/pkg/preload/producer.go b/pkg/preload/producer.go new file mode 100644 index 00000000..811cf2e4 --- /dev/null +++ b/pkg/preload/producer.go @@ -0,0 +1,92 @@ +package preload + +import ( + "net/url" + "strings" + + "github.com/AlexxIT/go2rtc/pkg/core" +) + +type Preload struct { + core.Connection + + Closed core.Waiter +} + +func NewPreload(query url.Values) *Preload { + medias := core.ParseQuery(query) + + for _, value := range query["microphone"] { + media := &core.Media{Kind: core.KindAudio, Direction: core.DirectionRecvonly} + + for _, name := range strings.Split(value, ",") { + name = strings.ToUpper(name) + switch name { + case "", "COPY": + name = core.CodecAny + } + media.Codecs = append(media.Codecs, &core.Codec{Name: name}) + } + + medias = append(medias, media) + } + + if len(medias) == 0 { + medias = []*core.Media{ + { + Kind: core.KindVideo, + Direction: core.DirectionSendonly, + Codecs: []*core.Codec{{Name: core.CodecAny}}, + }, + { + Kind: core.KindAudio, + Direction: core.DirectionSendonly, + Codecs: []*core.Codec{{Name: core.CodecAny}}, + }, + } + } + + return &Preload{ + Connection: core.Connection{ + ID: core.NewID(), + FormatName: "preload", + Medias: medias, + Protocol: "native", + RemoteAddr: "localhost", + UserAgent: "go2rtc", + }, + } +} + +func (p *Preload) GetMedias() []*core.Media { + return p.Medias +} + +func (p *Preload) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { + sender := core.NewSender(media, track.Codec) + sender.Bind(track) + p.Senders = append(p.Senders, sender) + return nil +} + +func (p *Preload) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { + receiver := core.NewReceiver(media, codec) + p.Receivers = append(p.Receivers, receiver) + return receiver, nil +} + +func (p *Preload) Start() error { + p.Closed.Wait() + return nil +} + +func (p *Preload) Stop() error { + for _, receiver := range p.Receivers { + receiver.Close() + } + for _, sender := range p.Senders { + sender.Close() + } + p.Closed.Done(nil) + return nil +} From 020549ef60103724575aee52e20096e817cb694b Mon Sep 17 00:00:00 2001 From: seydx Date: Mon, 2 Jun 2025 22:16:43 +0300 Subject: [PATCH 02/12] readme --- README.md | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/README.md b/README.md index 90a2537f..09cafaeb 100644 --- a/README.md +++ b/README.md @@ -75,6 +75,7 @@ Ultimate camera streaming application with support RTSP, WebRTC, HomeKit, FFmpeg * [Incoming sources](#incoming-sources) * [Stream to camera](#stream-to-camera) * [Publish stream](#publish-stream) + * [Preload stream](#preload-stream) * [Module: API](#module-api) * [Module: RTSP](#module-rtsp) * [Module: RTMP](#module-rtmp) @@ -818,6 +819,25 @@ streams: - **Telegram Desktop App** > Any public or private channel or group (where you admin) > Live stream > Start with... > Start streaming. - **YouTube** > Create > Go live > Stream latency: Ultra low-latency > Copy: Stream URL + Stream key. +### Preload stream + +You can preload any stream on go2rtc start. This is useful for cameras that take a long time to start up. + +```yaml +preload: + - my_stream1 + - my_stream2#video#audio#microphone + - my_stream3#video=h265#audio=opus +streams: + my_stream1: + - rtsp://129.168.3.1:554/stream1 + my_stream2: + - rtsp://129.168.3.1:554/stream1 + my_stream3: + - rtsp://129.168.3.1:554/stream1 + - ffmpeg:my_stream3#video=copy#audio=opus +```` + ### Module: API The HTTP API is the main part for interacting with the application. Default address: `http://localhost:1984/`. From 493fa1ef6a1478e39c6a18f075c1434e43007242 Mon Sep 17 00:00:00 2001 From: seydx Date: Thu, 5 Jun 2025 11:33:03 +0300 Subject: [PATCH 03/12] add api endpoints and change config syntax --- internal/streams/api.go | 62 +++++++++++++++++++++++++++++++++++++ internal/streams/preload.go | 26 +++++++++------- internal/streams/streams.go | 11 ++++--- pkg/preload/producer.go | 6 ++-- 4 files changed, 85 insertions(+), 20 deletions(-) diff --git a/internal/streams/api.go b/internal/streams/api.go index 061e61c2..c0c6744b 100644 --- a/internal/streams/api.go +++ b/internal/streams/api.go @@ -122,3 +122,65 @@ func apiStreamsDOT(w http.ResponseWriter, r *http.Request) { api.Response(w, dot, "text/vnd.graphviz") } + +func apiPreload(w http.ResponseWriter, r *http.Request) { + query := r.URL.Query() + src := query.Get("src") + query.Del("src") + + videoQuery := query.Get("video") + audioQuery := query.Get("audio") + micQuery := query.Get("microphone") + + if src == "" { + http.Error(w, "no source", http.StatusBadRequest) + return + } + + switch r.Method { + case "PUT": + // check if stream exists + if stream := Get(src); stream == nil { + http.Error(w, "stream not found", http.StatusNotFound) + return + } + + // check if consumer exists + if cons, ok := preloads[src]; ok { + cons.Stop() + delete(preloads, src) + } + + var rawQuery string + if videoQuery != "" { + rawQuery += "video=" + videoQuery + "#" + } + if audioQuery != "" { + rawQuery += "audio=" + audioQuery + "#" + } + if micQuery != "" { + rawQuery += "microphone=" + micQuery + } + + if err := app.PatchConfig([]string{"preload", src}, rawQuery); err != nil { + log.Error().Err(err).Str("src", src).Msg("Failed to patch config for PUT") + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + Preload(src, rawQuery) + + case "DELETE": + if cons, ok := preloads[src]; ok { + cons.Stop() + delete(preloads, src) + } + + if err := app.PatchConfig([]string{"preload", src}, nil); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + } + + default: + http.Error(w, "", http.StatusMethodNotAllowed) + } +} diff --git a/internal/streams/preload.go b/internal/streams/preload.go index c811cc5c..7314df55 100644 --- a/internal/streams/preload.go +++ b/internal/streams/preload.go @@ -2,13 +2,15 @@ package streams import ( "net/url" - "strings" "github.com/AlexxIT/go2rtc/pkg/preload" ) -func (s *Stream) Preload(query url.Values) error { - cons := preload.NewPreload(query) +var preloads = map[string]*preload.Preload{} + +func (s *Stream) Preload(name string, query url.Values) error { + cons := preload.NewPreload(name, query) + preloads[name] = cons if err := s.AddConsumer(cons); err != nil { return err @@ -17,14 +19,16 @@ func (s *Stream) Preload(query url.Values) error { return nil } -func Preload(src string) { - name, rawQuery, _ := strings.Cut(src, "#") - query := ParseQuery(rawQuery) - - if stream := Get(name); stream != nil { - if err := stream.Preload(query); err != nil { - log.Error().Err(err).Caller().Send() - } +func Preload(src string, rawQuery string) { + // skip if exists + if _, ok := preloads[src]; ok { return } + + if stream := Get(src); stream != nil { + query := ParseQuery(rawQuery) + if err := stream.Preload(src, query); err != nil { + log.Error().Err(err).Caller().Send() + } + } } diff --git a/internal/streams/streams.go b/internal/streams/streams.go index 7bbccace..8f07ea12 100644 --- a/internal/streams/streams.go +++ b/internal/streams/streams.go @@ -14,9 +14,9 @@ import ( func Init() { var cfg struct { - Streams map[string]any `yaml:"streams"` - Publish map[string]any `yaml:"publish"` - Preload []string `yaml:"preload"` + Streams map[string]any `yaml:"streams"` + Publish map[string]any `yaml:"publish"` + Preload map[string]string `yaml:"preload"` } app.LoadConfig(&cfg) @@ -29,6 +29,7 @@ func Init() { api.HandleFunc("api/streams", apiStreams) api.HandleFunc("api/streams.dot", apiStreamsDOT) + api.HandleFunc("api/preload", apiPreload) if cfg.Publish == nil && cfg.Preload == nil { return @@ -44,8 +45,8 @@ func Init() { } if cfg.Preload != nil { - for _, src := range cfg.Preload { - Preload(src) + for name, rawQuery := range cfg.Preload { + Preload(name, rawQuery) } } }) diff --git a/pkg/preload/producer.go b/pkg/preload/producer.go index 811cf2e4..05a50d52 100644 --- a/pkg/preload/producer.go +++ b/pkg/preload/producer.go @@ -9,11 +9,10 @@ import ( type Preload struct { core.Connection - Closed core.Waiter } -func NewPreload(query url.Values) *Preload { +func NewPreload(name string, query url.Values) *Preload { medias := core.ParseQuery(query) for _, value := range query["microphone"] { @@ -49,11 +48,10 @@ func NewPreload(query url.Values) *Preload { return &Preload{ Connection: core.Connection{ ID: core.NewID(), - FormatName: "preload", Medias: medias, Protocol: "native", RemoteAddr: "localhost", - UserAgent: "go2rtc", + UserAgent: "go2rtc/preload", }, } } From 8ab7aeb8b25995199980e2411793b302ee9ab126 Mon Sep 17 00:00:00 2001 From: seydx Date: Thu, 5 Jun 2025 15:51:14 +0300 Subject: [PATCH 04/12] update readme --- README.md | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 09cafaeb..627d9bb9 100644 --- a/README.md +++ b/README.md @@ -825,18 +825,19 @@ You can preload any stream on go2rtc start. This is useful for cameras that take ```yaml preload: - - my_stream1 - - my_stream2#video#audio#microphone - - my_stream3#video=h265#audio=opus + camera1: # default: video&audio = ANY + camera2: "video" # preload only video track + camera3: "video=h264#audio=opus" # initialize transcoding pipeline + streams: - my_stream1: - - rtsp://129.168.3.1:554/stream1 - my_stream2: - - rtsp://129.168.3.1:554/stream1 - my_stream3: - - rtsp://129.168.3.1:554/stream1 - - ffmpeg:my_stream3#video=copy#audio=opus -```` + camera1: + - rtsp://192.168.1.100/stream + camera2: + - rtsp://192.168.1.101/stream + camera3: + - rtsp://192.168.1.102/h265stream + - ffmpeg:camera3#video=h264#audio=opus#hardware +``` ### Module: API From 91eeefec68405539d6d15996dbbb4d0d951b82fb Mon Sep 17 00:00:00 2001 From: seydx Date: Thu, 5 Jun 2025 16:01:49 +0300 Subject: [PATCH 05/12] openapi: add preload endpoints --- api/openapi.yaml | 48 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/api/openapi.yaml b/api/openapi.yaml index 618acb48..a2d66a87 100644 --- a/api/openapi.yaml +++ b/api/openapi.yaml @@ -237,6 +237,54 @@ paths: + /api/preload: + put: + summary: Preload new stream + tags: [ Streams list ] + parameters: + - name: src + in: query + description: Stream source (name) + required: true + schema: { type: string } + example: "camera1" + - name: video + in: query + description: Video codecs filter + required: false + schema: { type: string } + example: all,h264,h265,... + - name: audio + in: query + description: Audio codecs filter + required: false + schema: { type: string } + example: all,aac,opus,... + - name: microphone + in: query + description: Microphone codecs filter + required: false + schema: { type: string } + example: all,aac,opus,... + responses: + default: + description: Default response + delete: + summary: Delete preloaded stream + tags: [ Streams list ] + parameters: + - name: src + in: query + description: Stream source (name) + required: true + schema: { type: string } + example: "camera1" + responses: + default: + description: Default response + + + /api/streams?src={src}: get: summary: Get stream info in JSON format From b6579122d1037934bccf33e1007a15e76bd2f166 Mon Sep 17 00:00:00 2001 From: seydx Date: Fri, 6 Jun 2025 03:11:28 +0200 Subject: [PATCH 06/12] fix --- internal/streams/api.go | 12 +++++++++--- pkg/preload/producer.go | 6 +++--- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/internal/streams/api.go b/internal/streams/api.go index c0c6744b..47febeb4 100644 --- a/internal/streams/api.go +++ b/internal/streams/api.go @@ -140,14 +140,15 @@ func apiPreload(w http.ResponseWriter, r *http.Request) { switch r.Method { case "PUT": // check if stream exists - if stream := Get(src); stream == nil { + stream := Get(src) + if stream == nil { http.Error(w, "stream not found", http.StatusNotFound) return } // check if consumer exists if cons, ok := preloads[src]; ok { - cons.Stop() + stream.RemoveConsumer(cons) delete(preloads, src) } @@ -172,7 +173,12 @@ func apiPreload(w http.ResponseWriter, r *http.Request) { case "DELETE": if cons, ok := preloads[src]; ok { - cons.Stop() + if stream := Get(src); stream != nil { + stream.RemoveConsumer(cons) + } else { + cons.Stop() + } + delete(preloads, src) } diff --git a/pkg/preload/producer.go b/pkg/preload/producer.go index 05a50d52..932f5e29 100644 --- a/pkg/preload/producer.go +++ b/pkg/preload/producer.go @@ -9,7 +9,7 @@ import ( type Preload struct { core.Connection - Closed core.Waiter + closed core.Waiter } func NewPreload(name string, query url.Values) *Preload { @@ -74,7 +74,7 @@ func (p *Preload) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver } func (p *Preload) Start() error { - p.Closed.Wait() + p.closed.Wait() return nil } @@ -85,6 +85,6 @@ func (p *Preload) Stop() error { for _, sender := range p.Senders { sender.Close() } - p.Closed.Done(nil) + p.closed.Done(nil) return nil } From 6732e726d51b8433a413f81973e6ccd4a9c2848b Mon Sep 17 00:00:00 2001 From: seydx Date: Mon, 16 Jun 2025 00:33:16 +0200 Subject: [PATCH 07/12] update preload consumer to handle RTP packets --- pkg/preload/producer.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/preload/producer.go b/pkg/preload/producer.go index 932f5e29..8eb6aec2 100644 --- a/pkg/preload/producer.go +++ b/pkg/preload/producer.go @@ -5,6 +5,7 @@ import ( "strings" "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/pion/rtp" ) type Preload struct { @@ -62,7 +63,10 @@ func (p *Preload) GetMedias() []*core.Media { func (p *Preload) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { sender := core.NewSender(media, track.Codec) - sender.Bind(track) + sender.Handler = func(pkt *rtp.Packet) { + p.Send += pkt.MarshalSize() + } + sender.HandleRTP(track) p.Senders = append(p.Senders, sender) return nil } From 57714544004b19da46771baccc0c6296bde8ae51 Mon Sep 17 00:00:00 2001 From: seydx Date: Mon, 16 Jun 2025 00:50:48 +0200 Subject: [PATCH 08/12] use preload as format name --- pkg/preload/producer.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/preload/producer.go b/pkg/preload/producer.go index 8eb6aec2..748dff16 100644 --- a/pkg/preload/producer.go +++ b/pkg/preload/producer.go @@ -49,10 +49,9 @@ func NewPreload(name string, query url.Values) *Preload { return &Preload{ Connection: core.Connection{ ID: core.NewID(), + FormatName: "preload", Medias: medias, - Protocol: "native", RemoteAddr: "localhost", - UserAgent: "go2rtc/preload", }, } } From ef318f663e854d3ba5e8af3641947edfec43df4d Mon Sep 17 00:00:00 2001 From: seydx Date: Mon, 16 Jun 2025 09:32:07 +0200 Subject: [PATCH 09/12] fix preload queries --- internal/streams/api.go | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/internal/streams/api.go b/internal/streams/api.go index 47febeb4..1b91f906 100644 --- a/internal/streams/api.go +++ b/internal/streams/api.go @@ -128,10 +128,6 @@ func apiPreload(w http.ResponseWriter, r *http.Request) { src := query.Get("src") query.Del("src") - videoQuery := query.Get("video") - audioQuery := query.Get("audio") - micQuery := query.Get("microphone") - if src == "" { http.Error(w, "no source", http.StatusBadRequest) return @@ -152,15 +148,28 @@ func apiPreload(w http.ResponseWriter, r *http.Request) { delete(preloads, src) } + // parse query parameters var rawQuery string - if videoQuery != "" { - rawQuery += "video=" + videoQuery + "#" + if query.Has("video") { + if videoQuery := query.Get("video"); videoQuery != "" { + rawQuery += "video=" + videoQuery + "#" + } else { + rawQuery += "video#" + } } - if audioQuery != "" { - rawQuery += "audio=" + audioQuery + "#" + if query.Has("audio") { + if audioQuery := query.Get("audio"); audioQuery != "" { + rawQuery += "audio=" + audioQuery + "#" + } else { + rawQuery += "audio#" + } } - if micQuery != "" { - rawQuery += "microphone=" + micQuery + if query.Has("microphone") { + if micQuery := query.Get("microphone"); micQuery != "" { + rawQuery += "microphone=" + micQuery + "#" + } else { + rawQuery += "microphone#" + } } if err := app.PatchConfig([]string{"preload", src}, rawQuery); err != nil { From 647b2acf487a36b9d478c8659a6c5f43880f840a Mon Sep 17 00:00:00 2001 From: seydx Date: Mon, 16 Jun 2025 09:58:55 +0200 Subject: [PATCH 10/12] cleanup --- pkg/preload/{producer.go => consumer.go} | 11 ----------- 1 file changed, 11 deletions(-) rename pkg/preload/{producer.go => consumer.go} (84%) diff --git a/pkg/preload/producer.go b/pkg/preload/consumer.go similarity index 84% rename from pkg/preload/producer.go rename to pkg/preload/consumer.go index 748dff16..4d3735a8 100644 --- a/pkg/preload/producer.go +++ b/pkg/preload/consumer.go @@ -51,15 +51,10 @@ func NewPreload(name string, query url.Values) *Preload { ID: core.NewID(), FormatName: "preload", Medias: medias, - RemoteAddr: "localhost", }, } } -func (p *Preload) GetMedias() []*core.Media { - return p.Medias -} - func (p *Preload) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { sender := core.NewSender(media, track.Codec) sender.Handler = func(pkt *rtp.Packet) { @@ -70,12 +65,6 @@ func (p *Preload) AddTrack(media *core.Media, codec *core.Codec, track *core.Rec return nil } -func (p *Preload) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { - receiver := core.NewReceiver(media, codec) - p.Receivers = append(p.Receivers, receiver) - return receiver, nil -} - func (p *Preload) Start() error { p.closed.Wait() return nil From 22cc8ac2c49b636423ee785844a71c761634d736 Mon Sep 17 00:00:00 2001 From: Alex X Date: Wed, 1 Oct 2025 16:57:39 +0300 Subject: [PATCH 11/12] Code refactoring for #1762 --- internal/streams/api.go | 80 ++++++++----------------- internal/streams/preload.go | 56 +++++++++++++----- internal/streams/streams.go | 16 +++-- pkg/preload/consumer.go | 82 -------------------------- pkg/probe/{producer.go => consumer.go} | 33 ++--------- 5 files changed, 79 insertions(+), 188 deletions(-) delete mode 100644 pkg/preload/consumer.go rename pkg/probe/{producer.go => consumer.go} (60%) diff --git a/internal/streams/api.go b/internal/streams/api.go index 1b91f906..d162cdf9 100644 --- a/internal/streams/api.go +++ b/internal/streams/api.go @@ -5,6 +5,7 @@ import ( "github.com/AlexxIT/go2rtc/internal/api" "github.com/AlexxIT/go2rtc/internal/app" + "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/probe" ) @@ -27,7 +28,7 @@ func apiStreams(w http.ResponseWriter, r *http.Request) { return } - cons := probe.NewProbe(query) + cons := probe.Create("probe", query) if len(cons.Medias) != 0 { cons.WithRequest(r) if err := stream.AddConsumer(cons); err != nil { @@ -126,73 +127,44 @@ func apiStreamsDOT(w http.ResponseWriter, r *http.Request) { func apiPreload(w http.ResponseWriter, r *http.Request) { query := r.URL.Query() src := query.Get("src") - query.Del("src") - if src == "" { - http.Error(w, "no source", http.StatusBadRequest) + // check if stream exists + stream := Get(src) + if stream == nil { + http.Error(w, "", http.StatusNotFound) return } switch r.Method { case "PUT": - // check if stream exists - stream := Get(src) - if stream == nil { - http.Error(w, "stream not found", http.StatusNotFound) + // it's safe to delete from map while iterating + for k := range query { + switch k { + case core.KindVideo, core.KindAudio, "microphone": + default: + delete(query, k) + } + } + + rawQuery := query.Encode() + + if err := AddPreload(stream, rawQuery); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) return } - // check if consumer exists - if cons, ok := preloads[src]; ok { - stream.RemoveConsumer(cons) - delete(preloads, src) - } - - // parse query parameters - var rawQuery string - if query.Has("video") { - if videoQuery := query.Get("video"); videoQuery != "" { - rawQuery += "video=" + videoQuery + "#" - } else { - rawQuery += "video#" - } - } - if query.Has("audio") { - if audioQuery := query.Get("audio"); audioQuery != "" { - rawQuery += "audio=" + audioQuery + "#" - } else { - rawQuery += "audio#" - } - } - if query.Has("microphone") { - if micQuery := query.Get("microphone"); micQuery != "" { - rawQuery += "microphone=" + micQuery + "#" - } else { - rawQuery += "microphone#" - } - } - if err := app.PatchConfig([]string{"preload", src}, rawQuery); err != nil { - log.Error().Err(err).Str("src", src).Msg("Failed to patch config for PUT") - http.Error(w, err.Error(), http.StatusBadRequest) + http.Error(w, err.Error(), http.StatusInternalServerError) + } + + case "DELETE": + if err := DelPreload(stream); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) return } - Preload(src, rawQuery) - - case "DELETE": - if cons, ok := preloads[src]; ok { - if stream := Get(src); stream != nil { - stream.RemoveConsumer(cons) - } else { - cons.Stop() - } - - delete(preloads, src) - } - if err := app.PatchConfig([]string{"preload", src}, nil); err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) + http.Error(w, err.Error(), http.StatusInternalServerError) } default: diff --git a/internal/streams/preload.go b/internal/streams/preload.go index 7314df55..527746ac 100644 --- a/internal/streams/preload.go +++ b/internal/streams/preload.go @@ -1,34 +1,58 @@ package streams import ( + "errors" "net/url" + "sync" - "github.com/AlexxIT/go2rtc/pkg/preload" + "github.com/AlexxIT/go2rtc/pkg/probe" ) -var preloads = map[string]*preload.Preload{} +var preloads = map[*Stream]*probe.Probe{} +var preloadsMu sync.Mutex -func (s *Stream) Preload(name string, query url.Values) error { - cons := preload.NewPreload(name, query) - preloads[name] = cons +func Preload(stream *Stream, rawQuery string) { + if err := AddPreload(stream, rawQuery); err != nil { + log.Error().Err(err).Caller().Send() + } +} - if err := s.AddConsumer(cons); err != nil { +func AddPreload(stream *Stream, rawQuery string) error { + if rawQuery == "" { + rawQuery = "video&audio" + } + + query, err := url.ParseQuery(rawQuery) + if err != nil { return err } + preloadsMu.Lock() + defer preloadsMu.Unlock() + + if cons := preloads[stream]; cons != nil { + stream.RemoveConsumer(cons) + } + + cons := probe.Create("preload", query) + + if err = stream.AddConsumer(cons); err != nil { + return err + } + + preloads[stream] = cons return nil } -func Preload(src string, rawQuery string) { - // skip if exists - if _, ok := preloads[src]; ok { - return +func DelPreload(stream *Stream) error { + preloadsMu.Lock() + defer preloadsMu.Unlock() + + if cons := preloads[stream]; cons != nil { + stream.RemoveConsumer(cons) + delete(preloads, stream) + return nil } - if stream := Get(src); stream != nil { - query := ParseQuery(rawQuery) - if err := stream.Preload(src, query); err != nil { - log.Error().Err(err).Caller().Send() - } - } + return errors.New("streams: preload not found") } diff --git a/internal/streams/streams.go b/internal/streams/streams.go index 8f07ea12..a0b1ed68 100644 --- a/internal/streams/streams.go +++ b/internal/streams/streams.go @@ -36,17 +36,15 @@ func Init() { } time.AfterFunc(time.Second, func() { - if cfg.Publish != nil { - for name, dst := range cfg.Publish { - if stream := Get(name); stream != nil { - Publish(stream, dst) - } + // range for nil map is OK + for name, dst := range cfg.Publish { + if stream := Get(name); stream != nil { + Publish(stream, dst) } } - - if cfg.Preload != nil { - for name, rawQuery := range cfg.Preload { - Preload(name, rawQuery) + for name, rawQuery := range cfg.Preload { + if stream := Get(name); stream != nil { + Preload(stream, rawQuery) } } }) diff --git a/pkg/preload/consumer.go b/pkg/preload/consumer.go deleted file mode 100644 index 4d3735a8..00000000 --- a/pkg/preload/consumer.go +++ /dev/null @@ -1,82 +0,0 @@ -package preload - -import ( - "net/url" - "strings" - - "github.com/AlexxIT/go2rtc/pkg/core" - "github.com/pion/rtp" -) - -type Preload struct { - core.Connection - closed core.Waiter -} - -func NewPreload(name string, query url.Values) *Preload { - medias := core.ParseQuery(query) - - for _, value := range query["microphone"] { - media := &core.Media{Kind: core.KindAudio, Direction: core.DirectionRecvonly} - - for _, name := range strings.Split(value, ",") { - name = strings.ToUpper(name) - switch name { - case "", "COPY": - name = core.CodecAny - } - media.Codecs = append(media.Codecs, &core.Codec{Name: name}) - } - - medias = append(medias, media) - } - - if len(medias) == 0 { - medias = []*core.Media{ - { - Kind: core.KindVideo, - Direction: core.DirectionSendonly, - Codecs: []*core.Codec{{Name: core.CodecAny}}, - }, - { - Kind: core.KindAudio, - Direction: core.DirectionSendonly, - Codecs: []*core.Codec{{Name: core.CodecAny}}, - }, - } - } - - return &Preload{ - Connection: core.Connection{ - ID: core.NewID(), - FormatName: "preload", - Medias: medias, - }, - } -} - -func (p *Preload) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { - sender := core.NewSender(media, track.Codec) - sender.Handler = func(pkt *rtp.Packet) { - p.Send += pkt.MarshalSize() - } - sender.HandleRTP(track) - p.Senders = append(p.Senders, sender) - return nil -} - -func (p *Preload) Start() error { - p.closed.Wait() - return nil -} - -func (p *Preload) Stop() error { - for _, receiver := range p.Receivers { - receiver.Close() - } - for _, sender := range p.Senders { - sender.Close() - } - p.closed.Done(nil) - return nil -} diff --git a/pkg/probe/producer.go b/pkg/probe/consumer.go similarity index 60% rename from pkg/probe/producer.go rename to pkg/probe/consumer.go index 1fbd3efb..c6aa4478 100644 --- a/pkg/probe/producer.go +++ b/pkg/probe/consumer.go @@ -11,7 +11,7 @@ type Probe struct { core.Connection } -func NewProbe(query url.Values) *Probe { +func Create(name string, query url.Values) *Probe { medias := core.ParseQuery(query) for _, value := range query["microphone"] { @@ -32,39 +32,18 @@ func NewProbe(query url.Values) *Probe { return &Probe{ Connection: core.Connection{ ID: core.NewID(), - FormatName: "probe", + FormatName: name, Medias: medias, }, } } -func (p *Probe) GetMedias() []*core.Media { - return p.Medias -} - func (p *Probe) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { sender := core.NewSender(media, track.Codec) - sender.Bind(track) + sender.Handler = func(pkt *core.Packet) { + p.Send += len(pkt.Payload) + } + sender.HandleRTP(track) p.Senders = append(p.Senders, sender) return nil } - -func (p *Probe) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { - receiver := core.NewReceiver(media, codec) - p.Receivers = append(p.Receivers, receiver) - return receiver, nil -} - -func (p *Probe) Start() error { - return nil -} - -func (p *Probe) Stop() error { - for _, receiver := range p.Receivers { - receiver.Close() - } - for _, sender := range p.Senders { - sender.Close() - } - return nil -} From 4dd1f73a189865051c49d73cc6dbbc28b0185018 Mon Sep 17 00:00:00 2001 From: Alex X Date: Wed, 1 Oct 2025 17:03:32 +0300 Subject: [PATCH 12/12] Update readme for #1762 --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index dfbe79b0..5b3754c9 100644 --- a/README.md +++ b/README.md @@ -844,7 +844,7 @@ You can preload any stream on go2rtc start. This is useful for cameras that take preload: camera1: # default: video&audio = ANY camera2: "video" # preload only video track - camera3: "video=h264#audio=opus" # initialize transcoding pipeline + camera3: "video=h264&audio=opus" # preload H264 video and OPUS audio streams: camera1: