diff --git a/cmd/api/api.go b/cmd/api/api.go index 83761fb8..e5d29f34 100644 --- a/cmd/api/api.go +++ b/cmd/api/api.go @@ -76,6 +76,8 @@ func HandleFunc(pattern string, handler http.HandlerFunc) { http.HandleFunc(pattern, handler) } +const StreamNotFound = "stream not found" + var basePath string var log zerolog.Logger diff --git a/cmd/api/ws.go b/cmd/api/ws.go index ab42c537..f515eb82 100644 --- a/cmd/api/ws.go +++ b/cmd/api/ws.go @@ -14,7 +14,7 @@ type Message struct { Value interface{} `json:"value,omitempty"` } -type WSHandler func(tr *Transport, msg *Message) +type WSHandler func(tr *Transport, msg *Message) error func HandleWS(msgType string, handler WSHandler) { wsHandlers[msgType] = handler @@ -84,7 +84,11 @@ func apiWS(w http.ResponseWriter, r *http.Request) { } if handler := wsHandlers[msg.Type]; handler != nil { - handler(tr, msg) + go func() { + if err = handler(tr, msg); err != nil { + tr.Write(&Message{Type: "error", Value: msg.Type + ": " + err.Error()}) + } + }() } } @@ -125,10 +129,6 @@ func (t *Transport) Close() { } } -func (t *Transport) Error(err error) { - t.Write(&Message{Type: "error", Value: err.Error()}) -} - func (t *Transport) OnChange(f func()) { t.onChange = f } diff --git a/cmd/mjpeg/mjpeg.go b/cmd/mjpeg/mjpeg.go index db6eff16..5c16999e 100644 --- a/cmd/mjpeg/mjpeg.go +++ b/cmd/mjpeg/mjpeg.go @@ -1,6 +1,7 @@ package mjpeg import ( + "errors" "github.com/AlexxIT/go2rtc/cmd/api" "github.com/AlexxIT/go2rtc/cmd/streams" "github.com/AlexxIT/go2rtc/pkg/mjpeg" @@ -20,6 +21,7 @@ func handlerKeyframe(w http.ResponseWriter, r *http.Request) { src := r.URL.Query().Get("src") stream := streams.GetOrNew(src) if stream == nil { + http.Error(w, api.StreamNotFound, http.StatusNotFound) return } @@ -60,6 +62,7 @@ func handlerStream(w http.ResponseWriter, r *http.Request) { src := r.URL.Query().Get("src") stream := streams.GetOrNew(src) if stream == nil { + http.Error(w, api.StreamNotFound, http.StatusNotFound) return } @@ -99,11 +102,11 @@ func handlerStream(w http.ResponseWriter, r *http.Request) { //log.Trace().Msg("[api.mjpeg] close") } -func handlerWS(tr *api.Transport, msg *api.Message) { +func handlerWS(tr *api.Transport, _ *api.Message) error { src := tr.Request.URL.Query().Get("src") stream := streams.GetOrNew(src) if stream == nil { - return + return errors.New(api.StreamNotFound) } cons := &mjpeg.Consumer{} @@ -115,10 +118,12 @@ func handlerWS(tr *api.Transport, msg *api.Message) { if err := stream.AddConsumer(cons); err != nil { log.Error().Err(err).Caller().Send() - return + return err } tr.OnClose(func() { stream.RemoveConsumer(cons) }) + + return nil } diff --git a/cmd/mp4/mp4.go b/cmd/mp4/mp4.go index 5779d0ff..47aeeb82 100644 --- a/cmd/mp4/mp4.go +++ b/cmd/mp4/mp4.go @@ -32,6 +32,7 @@ func handlerKeyframe(w http.ResponseWriter, r *http.Request) { src := r.URL.Query().Get("src") stream := streams.GetOrNew(src) if stream == nil { + http.Error(w, api.StreamNotFound, http.StatusNotFound) return } @@ -73,6 +74,7 @@ func handlerMP4(w http.ResponseWriter, r *http.Request) { src := r.URL.Query().Get("src") stream := streams.GetOrNew(src) if stream == nil { + http.Error(w, api.StreamNotFound, http.StatusNotFound) return } diff --git a/cmd/mp4/ws.go b/cmd/mp4/ws.go index 0c84eede..56b2965b 100644 --- a/cmd/mp4/ws.go +++ b/cmd/mp4/ws.go @@ -1,6 +1,7 @@ package mp4 import ( + "errors" "github.com/AlexxIT/go2rtc/cmd/api" "github.com/AlexxIT/go2rtc/cmd/streams" "github.com/AlexxIT/go2rtc/pkg/mp4" @@ -10,11 +11,11 @@ import ( const packetSize = 8192 -func handlerWS(tr *api.Transport, msg *api.Message) { +func handlerWS(tr *api.Transport, msg *api.Message) error { src := tr.Request.URL.Query().Get("src") stream := streams.GetOrNew(src) if stream == nil { - return + return errors.New(api.StreamNotFound) } cons := &mp4.Consumer{} @@ -37,8 +38,7 @@ func handlerWS(tr *api.Transport, msg *api.Message) { if err := stream.AddConsumer(cons); err != nil { log.Warn().Err(err).Caller().Send() - tr.Error(err) - return + return err } tr.OnClose(func() { @@ -50,20 +50,21 @@ func handlerWS(tr *api.Transport, msg *api.Message) { data, err := cons.Init() if err != nil { log.Warn().Err(err).Caller().Send() - tr.Error(err) - return + return err } tr.Write(data) cons.Start() + + return nil } -func handlerWS4(tr *api.Transport, msg *api.Message) { +func handlerWS4(tr *api.Transport, msg *api.Message) error { src := tr.Request.URL.Query().Get("src") stream := streams.GetOrNew(src) if stream == nil { - return + return errors.New(api.StreamNotFound) } cons := &mp4.Segment{} @@ -80,12 +81,14 @@ func handlerWS4(tr *api.Transport, msg *api.Message) { if err := stream.AddConsumer(cons); err != nil { log.Error().Err(err).Caller().Send() - return + return err } tr.OnClose(func() { stream.RemoveConsumer(cons) }) + + return nil } func parseMedias(codecs string, parseAudio bool) (medias []*streamer.Media) { diff --git a/cmd/webrtc/candidates.go b/cmd/webrtc/candidates.go index 9a711cdc..677c0a15 100644 --- a/cmd/webrtc/candidates.go +++ b/cmd/webrtc/candidates.go @@ -78,13 +78,14 @@ func syncCanditates(answer string) (string, error) { return string(data), nil } -func candidateHandler(tr *api.Transport, msg *api.Message) { +func candidateHandler(tr *api.Transport, msg *api.Message) error { if tr.Consumer == nil { - return + return nil } if conn := tr.Consumer.(*webrtc.Conn); conn != nil { s := msg.Value.(string) log.Trace().Str("candidate", s).Msg("[webrtc] remote") conn.AddCandidate(s) } + return nil } diff --git a/cmd/webrtc/webrtc.go b/cmd/webrtc/webrtc.go index f975b839..394460de 100644 --- a/cmd/webrtc/webrtc.go +++ b/cmd/webrtc/webrtc.go @@ -1,6 +1,7 @@ package webrtc import ( + "errors" "github.com/AlexxIT/go2rtc/cmd/api" "github.com/AlexxIT/go2rtc/cmd/app" "github.com/AlexxIT/go2rtc/cmd/streams" @@ -65,11 +66,11 @@ var log zerolog.Logger var NewPConn func() (*pion.PeerConnection, error) -func asyncHandler(tr *api.Transport, msg *api.Message) { +func asyncHandler(tr *api.Transport, msg *api.Message) error { src := tr.Request.URL.Query().Get("src") stream := streams.Get(src) if stream == nil { - return + return errors.New(api.StreamNotFound) } log.Debug().Str("url", src).Msg("[webrtc] new consumer") @@ -80,8 +81,8 @@ func asyncHandler(tr *api.Transport, msg *api.Message) { conn := new(webrtc.Conn) conn.Conn, err = NewPConn() if err != nil { - log.Error().Err(err).Caller().Msg("NewPConn") - return + log.Error().Err(err).Caller().Send() + return err } conn.UserAgent = tr.Request.UserAgent() @@ -105,17 +106,15 @@ func asyncHandler(tr *api.Transport, msg *api.Message) { log.Trace().Msgf("[webrtc] offer:\n%s", offer) if err = conn.SetOffer(offer); err != nil { - log.Warn().Err(err).Caller().Msg("conn.SetOffer") - tr.Error(err) - return + log.Warn().Err(err).Caller().Send() + return err } // 2. AddConsumer, so we get new tracks if err = stream.AddConsumer(conn); err != nil { - log.Warn().Err(err).Caller().Msg("stream.AddConsumer") + log.Warn().Err(err).Caller().Send() _ = conn.Conn.Close() - tr.Error(err) - return + return err } conn.Init() @@ -125,9 +124,8 @@ func asyncHandler(tr *api.Transport, msg *api.Message) { log.Trace().Msgf("[webrtc] answer\n%s", answer) if err != nil { - log.Error().Err(err).Caller().Msg("conn.GetAnswer") - tr.Error(err) - return + log.Error().Err(err).Caller().Send() + return err } tr.Consumer = conn @@ -135,6 +133,8 @@ func asyncHandler(tr *api.Transport, msg *api.Message) { tr.Write(&api.Message{Type: "webrtc/answer", Value: answer}) asyncCandidates(tr) + + return nil } func syncHandler(w http.ResponseWriter, r *http.Request) {