diff --git a/internal/streams/preload.go b/internal/streams/preload.go new file mode 100644 index 00000000..c811cc5c --- /dev/null +++ b/internal/streams/preload.go @@ -0,0 +1,30 @@ +package streams + +import ( + "net/url" + "strings" + + "github.com/AlexxIT/go2rtc/pkg/preload" +) + +func (s *Stream) Preload(query url.Values) error { + cons := preload.NewPreload(query) + + if err := s.AddConsumer(cons); err != nil { + return err + } + + return nil +} + +func Preload(src string) { + name, rawQuery, _ := strings.Cut(src, "#") + query := ParseQuery(rawQuery) + + if stream := Get(name); stream != nil { + if err := stream.Preload(query); err != nil { + log.Error().Err(err).Caller().Send() + } + return + } +} diff --git a/internal/streams/streams.go b/internal/streams/streams.go index dcbaba28..7bbccace 100644 --- a/internal/streams/streams.go +++ b/internal/streams/streams.go @@ -16,6 +16,7 @@ func Init() { var cfg struct { Streams map[string]any `yaml:"streams"` Publish map[string]any `yaml:"publish"` + Preload []string `yaml:"preload"` } app.LoadConfig(&cfg) @@ -29,14 +30,22 @@ func Init() { api.HandleFunc("api/streams", apiStreams) api.HandleFunc("api/streams.dot", apiStreamsDOT) - if cfg.Publish == nil { + if cfg.Publish == nil && cfg.Preload == nil { return } time.AfterFunc(time.Second, func() { - for name, dst := range cfg.Publish { - if stream := Get(name); stream != nil { - Publish(stream, dst) + if cfg.Publish != nil { + for name, dst := range cfg.Publish { + if stream := Get(name); stream != nil { + Publish(stream, dst) + } + } + } + + if cfg.Preload != nil { + for _, src := range cfg.Preload { + Preload(src) } } }) diff --git a/pkg/preload/producer.go b/pkg/preload/producer.go new file mode 100644 index 00000000..811cf2e4 --- /dev/null +++ b/pkg/preload/producer.go @@ -0,0 +1,92 @@ +package preload + +import ( + "net/url" + "strings" + + "github.com/AlexxIT/go2rtc/pkg/core" +) + +type Preload struct { + core.Connection + + Closed core.Waiter +} + +func NewPreload(query url.Values) *Preload { + medias := core.ParseQuery(query) + + for _, value := range query["microphone"] { + media := &core.Media{Kind: core.KindAudio, Direction: core.DirectionRecvonly} + + for _, name := range strings.Split(value, ",") { + name = strings.ToUpper(name) + switch name { + case "", "COPY": + name = core.CodecAny + } + media.Codecs = append(media.Codecs, &core.Codec{Name: name}) + } + + medias = append(medias, media) + } + + if len(medias) == 0 { + medias = []*core.Media{ + { + Kind: core.KindVideo, + Direction: core.DirectionSendonly, + Codecs: []*core.Codec{{Name: core.CodecAny}}, + }, + { + Kind: core.KindAudio, + Direction: core.DirectionSendonly, + Codecs: []*core.Codec{{Name: core.CodecAny}}, + }, + } + } + + return &Preload{ + Connection: core.Connection{ + ID: core.NewID(), + FormatName: "preload", + Medias: medias, + Protocol: "native", + RemoteAddr: "localhost", + UserAgent: "go2rtc", + }, + } +} + +func (p *Preload) GetMedias() []*core.Media { + return p.Medias +} + +func (p *Preload) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { + sender := core.NewSender(media, track.Codec) + sender.Bind(track) + p.Senders = append(p.Senders, sender) + return nil +} + +func (p *Preload) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { + receiver := core.NewReceiver(media, codec) + p.Receivers = append(p.Receivers, receiver) + return receiver, nil +} + +func (p *Preload) Start() error { + p.Closed.Wait() + return nil +} + +func (p *Preload) Stop() error { + for _, receiver := range p.Receivers { + receiver.Close() + } + for _, sender := range p.Senders { + sender.Close() + } + p.Closed.Done(nil) + return nil +}