From 209fe098067e13d13c6ab4086092843d37556a9c Mon Sep 17 00:00:00 2001 From: Alex X Date: Sun, 17 Sep 2023 20:29:28 +0300 Subject: [PATCH] Add active publish logic to streams --- internal/streams/handlers.go | 22 ++++++++++++++++++++++ internal/streams/publish.go | 19 +++++++++++++++++++ internal/streams/streams.go | 4 ++++ 3 files changed, 45 insertions(+) create mode 100644 internal/streams/publish.go diff --git a/internal/streams/handlers.go b/internal/streams/handlers.go index ecb76d7c..3009dd66 100644 --- a/internal/streams/handlers.go +++ b/internal/streams/handlers.go @@ -73,3 +73,25 @@ func Location(url string) (string, error) { return "", nil } + +// TODO: rework + +type ConsumerHandler func(url string) (core.Consumer, func(), error) + +var consumerHandlers = map[string]ConsumerHandler{} + +func HandleConsumerFunc(scheme string, handler ConsumerHandler) { + consumerHandlers[scheme] = handler +} + +func GetConsumer(url string) (core.Consumer, func(), error) { + if i := strings.IndexByte(url, ':'); i > 0 { + scheme := url[:i] + + if handler, ok := consumerHandlers[scheme]; ok { + return handler(url) + } + } + + return nil, nil, errors.New("streams: unsupported scheme: " + url) +} diff --git a/internal/streams/publish.go b/internal/streams/publish.go new file mode 100644 index 00000000..259ddb8d --- /dev/null +++ b/internal/streams/publish.go @@ -0,0 +1,19 @@ +package streams + +func (s *Stream) Publish(url string) error { + cons, run, err := GetConsumer(url) + if err != nil { + return err + } + + if err = s.AddConsumer(cons); err != nil { + return err + } + + go func() { + run() + s.RemoveConsumer(cons) + }() + + return nil +} diff --git a/internal/streams/streams.go b/internal/streams/streams.go index bc23bf54..14bc09c8 100644 --- a/internal/streams/streams.go +++ b/internal/streams/streams.go @@ -172,6 +172,10 @@ func streamsHandler(w http.ResponseWriter, r *http.Request) { } else { api.ResponseJSON(w, stream) } + } else if stream = Get(src); stream != nil { + if err := stream.Publish(dst); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } } else { http.Error(w, "", http.StatusNotFound) }