Add errors output to streams API

This commit is contained in:
Alex X
2025-11-16 18:20:53 +03:00
parent 1fe602679e
commit ac80f1470e
8 changed files with 33 additions and 28 deletions
+2 -2
View File
@@ -30,10 +30,10 @@ func apiStream(w http.ResponseWriter, r *http.Request) {
// 1. link to go2rtc stream: rtsp://...:8554/{stream_name} // 1. link to go2rtc stream: rtsp://...:8554/{stream_name}
// 2. static link to Hass camera // 2. static link to Hass camera
// 3. dynamic link to Hass camera // 3. dynamic link to Hass camera
if streams.Patch(v.Name, v.Channels.First.Url) != nil { if _, err := streams.Patch(v.Name, v.Channels.First.Url); err == nil {
apiOK(w, r) apiOK(w, r)
} else { } else {
http.Error(w, "", http.StatusBadRequest) http.Error(w, err.Error(), http.StatusBadRequest)
} }
// /stream/{id}/channel/0/webrtc // /stream/{id}/channel/0/webrtc
+1 -1
View File
@@ -11,7 +11,7 @@ import (
) )
func handlerWSHLS(tr *ws.Transport, msg *ws.Message) error { func handlerWSHLS(tr *ws.Transport, msg *ws.Message) error {
stream := streams.GetOrPatch(tr.Request.URL.Query()) stream, _ := streams.GetOrPatch(tr.Request.URL.Query())
if stream == nil { if stream == nil {
return errors.New(api.StreamNotFound) return errors.New(api.StreamNotFound)
} }
+2 -2
View File
@@ -36,7 +36,7 @@ func Init() {
var log zerolog.Logger var log zerolog.Logger
func handlerKeyframe(w http.ResponseWriter, r *http.Request) { func handlerKeyframe(w http.ResponseWriter, r *http.Request) {
stream := streams.GetOrPatch(r.URL.Query()) stream, _ := streams.GetOrPatch(r.URL.Query())
if stream == nil { if stream == nil {
http.Error(w, api.StreamNotFound, http.StatusNotFound) http.Error(w, api.StreamNotFound, http.StatusNotFound)
return return
@@ -145,7 +145,7 @@ func inputMjpeg(w http.ResponseWriter, r *http.Request) {
} }
func handlerWS(tr *ws.Transport, _ *ws.Message) error { func handlerWS(tr *ws.Transport, _ *ws.Message) error {
stream := streams.GetOrPatch(tr.Request.URL.Query()) stream, _ := streams.GetOrPatch(tr.Request.URL.Query())
if stream == nil { if stream == nil {
return errors.New(api.StreamNotFound) return errors.New(api.StreamNotFound)
} }
+1 -1
View File
@@ -91,7 +91,7 @@ func handlerMP4(w http.ResponseWriter, r *http.Request) {
return return
} }
stream := streams.GetOrPatch(query) stream, _ := streams.GetOrPatch(query)
if stream == nil { if stream == nil {
http.Error(w, api.StreamNotFound, http.StatusNotFound) http.Error(w, api.StreamNotFound, http.StatusNotFound)
return return
+2 -2
View File
@@ -11,7 +11,7 @@ import (
) )
func handlerWSMSE(tr *ws.Transport, msg *ws.Message) error { func handlerWSMSE(tr *ws.Transport, msg *ws.Message) error {
stream := streams.GetOrPatch(tr.Request.URL.Query()) stream, _ := streams.GetOrPatch(tr.Request.URL.Query())
if stream == nil { if stream == nil {
return errors.New(api.StreamNotFound) return errors.New(api.StreamNotFound)
} }
@@ -43,7 +43,7 @@ func handlerWSMSE(tr *ws.Transport, msg *ws.Message) error {
} }
func handlerWSMP4(tr *ws.Transport, msg *ws.Message) error { func handlerWSMP4(tr *ws.Transport, msg *ws.Message) error {
stream := streams.GetOrPatch(tr.Request.URL.Query()) stream, _ := streams.GetOrPatch(tr.Request.URL.Query())
if stream == nil { if stream == nil {
return errors.New(api.StreamNotFound) return errors.New(api.StreamNotFound)
} }
+4 -4
View File
@@ -52,8 +52,8 @@ func apiStreams(w http.ResponseWriter, r *http.Request) {
name = src name = src
} }
if New(name, query["src"]...) == nil { if _, err := New(name, query["src"]...); err != nil {
http.Error(w, "", http.StatusBadRequest) http.Error(w, err.Error(), http.StatusBadRequest)
return return
} }
@@ -69,8 +69,8 @@ func apiStreams(w http.ResponseWriter, r *http.Request) {
} }
// support {input} templates: https://github.com/AlexxIT/go2rtc#module-hass // support {input} templates: https://github.com/AlexxIT/go2rtc#module-hass
if Patch(name, src) == nil { if _, err := Patch(name, src); err != nil {
http.Error(w, "", http.StatusBadRequest) http.Error(w, err.Error(), http.StatusBadRequest)
} }
case "POST": case "POST":
+20 -15
View File
@@ -1,6 +1,7 @@
package streams package streams
import ( import (
"errors"
"net/url" "net/url"
"sync" "sync"
"time" "time"
@@ -48,10 +49,14 @@ func Init() {
}) })
} }
func New(name string, sources ...string) *Stream { func New(name string, sources ...string) (*Stream, error) {
for _, source := range sources { for _, source := range sources {
if Validate(source) != nil { if !HasProducer(source) {
return nil return nil, errors.New("streams: source not supported")
}
if err := Validate(source); err != nil {
return nil, err
} }
} }
@@ -61,10 +66,10 @@ func New(name string, sources ...string) *Stream {
streams[name] = stream streams[name] = stream
streamsMu.Unlock() streamsMu.Unlock()
return stream return stream, nil
} }
func Patch(name string, source string) *Stream { func Patch(name string, source string) (*Stream, error) {
streamsMu.Lock() streamsMu.Lock()
defer streamsMu.Unlock() defer streamsMu.Unlock()
@@ -76,7 +81,7 @@ func Patch(name string, source string) *Stream {
// link (alias) streams[name] to streams[rtspName] // link (alias) streams[name] to streams[rtspName]
streams[name] = stream streams[name] = stream
} }
return stream return stream, nil
} }
} }
@@ -85,40 +90,40 @@ func Patch(name string, source string) *Stream {
// link (alias) streams[name] to streams[source] // link (alias) streams[name] to streams[source]
streams[name] = stream streams[name] = stream
} }
return stream return stream, nil
} }
// check if src has supported scheme // check if src has supported scheme
if !HasProducer(source) { if !HasProducer(source) {
return nil return nil, errors.New("streams: source not supported")
} }
if Validate(source) != nil { if err := Validate(source); err != nil {
return nil return nil, err
} }
// check an existing stream with this name // check an existing stream with this name
if stream, ok := streams[name]; ok { if stream, ok := streams[name]; ok {
stream.SetSource(source) stream.SetSource(source)
return stream return stream, nil
} }
// create new stream with this name // create new stream with this name
stream := NewStream(source) stream := NewStream(source)
streams[name] = stream streams[name] = stream
return stream return stream, nil
} }
func GetOrPatch(query url.Values) *Stream { func GetOrPatch(query url.Values) (*Stream, error) {
// check if src param exists // check if src param exists
source := query.Get("src") source := query.Get("src")
if source == "" { if source == "" {
return nil return nil, errors.New("streams: source empty")
} }
// check if src is stream name // check if src is stream name
if stream := Get(source); stream != nil { if stream := Get(source); stream != nil {
return stream return stream, nil
} }
// check if name param provided // check if name param provided
+1 -1
View File
@@ -95,7 +95,7 @@ func asyncHandler(tr *ws.Transport, msg *ws.Message) (err error) {
query := tr.Request.URL.Query() query := tr.Request.URL.Query()
if name := query.Get("src"); name != "" { if name := query.Get("src"); name != "" {
stream = streams.GetOrPatch(query) stream, _ = streams.GetOrPatch(query)
mode = core.ModePassiveConsumer mode = core.ModePassiveConsumer
log.Debug().Str("src", name).Msg("[webrtc] new consumer") log.Debug().Str("src", name).Msg("[webrtc] new consumer")
} else if name = query.Get("dst"); name != "" { } else if name = query.Get("dst"); name != "" {