Merge pull request #1407 from edenhaus/streams-api-multiple-sources

Extend streams API to allow multiple sources
This commit is contained in:
Alex X
2024-10-24 20:47:19 +03:00
committed by GitHub
3 changed files with 14 additions and 6 deletions
+2 -2
View File
@@ -48,12 +48,12 @@ func apiStreams(w http.ResponseWriter, r *http.Request) {
name = src name = src
} }
if New(name, src) == nil { if New(name, query["src"]...) == nil {
http.Error(w, "", http.StatusBadRequest) http.Error(w, "", http.StatusBadRequest)
return return
} }
if err := app.PatchConfig(name, src, "streams"); err != nil { if err := app.PatchConfig(name, query["src"], "streams"); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest) http.Error(w, err.Error(), http.StatusBadRequest)
} }
+6
View File
@@ -21,6 +21,12 @@ func NewStream(source any) *Stream {
return &Stream{ return &Stream{
producers: []*Producer{NewProducer(source)}, producers: []*Producer{NewProducer(source)},
} }
case []string:
s := new(Stream)
for _, str := range source {
s.producers = append(s.producers, NewProducer(str))
}
return s
case []any: case []any:
s := new(Stream) s := new(Stream)
for _, src := range source { for _, src := range source {
+6 -4
View File
@@ -56,12 +56,14 @@ func Validate(source string) error {
return nil return nil
} }
func New(name string, source string) *Stream { func New(name string, sources ...string) *Stream {
if Validate(source) != nil { for _, source := range sources {
return nil if Validate(source) != nil {
return nil
}
} }
stream := NewStream(source) stream := NewStream(sources)
streamsMu.Lock() streamsMu.Lock()
streams[name] = stream streams[name] = stream