Adds errors output to API

This commit is contained in:
Alexey Khit
2022-12-05 20:03:26 +03:00
parent 7057b4846f
commit b965c191b7
7 changed files with 46 additions and 33 deletions
+2
View File
@@ -76,6 +76,8 @@ func HandleFunc(pattern string, handler http.HandlerFunc) {
http.HandleFunc(pattern, handler) http.HandleFunc(pattern, handler)
} }
const StreamNotFound = "stream not found"
var basePath string var basePath string
var log zerolog.Logger var log zerolog.Logger
+6 -6
View File
@@ -14,7 +14,7 @@ type Message struct {
Value interface{} `json:"value,omitempty"` Value interface{} `json:"value,omitempty"`
} }
type WSHandler func(tr *Transport, msg *Message) type WSHandler func(tr *Transport, msg *Message) error
func HandleWS(msgType string, handler WSHandler) { func HandleWS(msgType string, handler WSHandler) {
wsHandlers[msgType] = handler wsHandlers[msgType] = handler
@@ -84,7 +84,11 @@ func apiWS(w http.ResponseWriter, r *http.Request) {
} }
if handler := wsHandlers[msg.Type]; handler != nil { if handler := wsHandlers[msg.Type]; handler != nil {
handler(tr, msg) go func() {
if err = handler(tr, msg); err != nil {
tr.Write(&Message{Type: "error", Value: msg.Type + ": " + err.Error()})
}
}()
} }
} }
@@ -125,10 +129,6 @@ func (t *Transport) Close() {
} }
} }
func (t *Transport) Error(err error) {
t.Write(&Message{Type: "error", Value: err.Error()})
}
func (t *Transport) OnChange(f func()) { func (t *Transport) OnChange(f func()) {
t.onChange = f t.onChange = f
} }
+8 -3
View File
@@ -1,6 +1,7 @@
package mjpeg package mjpeg
import ( import (
"errors"
"github.com/AlexxIT/go2rtc/cmd/api" "github.com/AlexxIT/go2rtc/cmd/api"
"github.com/AlexxIT/go2rtc/cmd/streams" "github.com/AlexxIT/go2rtc/cmd/streams"
"github.com/AlexxIT/go2rtc/pkg/mjpeg" "github.com/AlexxIT/go2rtc/pkg/mjpeg"
@@ -20,6 +21,7 @@ func handlerKeyframe(w http.ResponseWriter, r *http.Request) {
src := r.URL.Query().Get("src") src := r.URL.Query().Get("src")
stream := streams.GetOrNew(src) stream := streams.GetOrNew(src)
if stream == nil { if stream == nil {
http.Error(w, api.StreamNotFound, http.StatusNotFound)
return return
} }
@@ -60,6 +62,7 @@ func handlerStream(w http.ResponseWriter, r *http.Request) {
src := r.URL.Query().Get("src") src := r.URL.Query().Get("src")
stream := streams.GetOrNew(src) stream := streams.GetOrNew(src)
if stream == nil { if stream == nil {
http.Error(w, api.StreamNotFound, http.StatusNotFound)
return return
} }
@@ -99,11 +102,11 @@ func handlerStream(w http.ResponseWriter, r *http.Request) {
//log.Trace().Msg("[api.mjpeg] close") //log.Trace().Msg("[api.mjpeg] close")
} }
func handlerWS(tr *api.Transport, msg *api.Message) { func handlerWS(tr *api.Transport, _ *api.Message) error {
src := tr.Request.URL.Query().Get("src") src := tr.Request.URL.Query().Get("src")
stream := streams.GetOrNew(src) stream := streams.GetOrNew(src)
if stream == nil { if stream == nil {
return return errors.New(api.StreamNotFound)
} }
cons := &mjpeg.Consumer{} cons := &mjpeg.Consumer{}
@@ -115,10 +118,12 @@ func handlerWS(tr *api.Transport, msg *api.Message) {
if err := stream.AddConsumer(cons); err != nil { if err := stream.AddConsumer(cons); err != nil {
log.Error().Err(err).Caller().Send() log.Error().Err(err).Caller().Send()
return return err
} }
tr.OnClose(func() { tr.OnClose(func() {
stream.RemoveConsumer(cons) stream.RemoveConsumer(cons)
}) })
return nil
} }
+2
View File
@@ -32,6 +32,7 @@ func handlerKeyframe(w http.ResponseWriter, r *http.Request) {
src := r.URL.Query().Get("src") src := r.URL.Query().Get("src")
stream := streams.GetOrNew(src) stream := streams.GetOrNew(src)
if stream == nil { if stream == nil {
http.Error(w, api.StreamNotFound, http.StatusNotFound)
return return
} }
@@ -73,6 +74,7 @@ func handlerMP4(w http.ResponseWriter, r *http.Request) {
src := r.URL.Query().Get("src") src := r.URL.Query().Get("src")
stream := streams.GetOrNew(src) stream := streams.GetOrNew(src)
if stream == nil { if stream == nil {
http.Error(w, api.StreamNotFound, http.StatusNotFound)
return return
} }
+12 -9
View File
@@ -1,6 +1,7 @@
package mp4 package mp4
import ( import (
"errors"
"github.com/AlexxIT/go2rtc/cmd/api" "github.com/AlexxIT/go2rtc/cmd/api"
"github.com/AlexxIT/go2rtc/cmd/streams" "github.com/AlexxIT/go2rtc/cmd/streams"
"github.com/AlexxIT/go2rtc/pkg/mp4" "github.com/AlexxIT/go2rtc/pkg/mp4"
@@ -10,11 +11,11 @@ import (
const packetSize = 8192 const packetSize = 8192
func handlerWS(tr *api.Transport, msg *api.Message) { func handlerWS(tr *api.Transport, msg *api.Message) error {
src := tr.Request.URL.Query().Get("src") src := tr.Request.URL.Query().Get("src")
stream := streams.GetOrNew(src) stream := streams.GetOrNew(src)
if stream == nil { if stream == nil {
return return errors.New(api.StreamNotFound)
} }
cons := &mp4.Consumer{} cons := &mp4.Consumer{}
@@ -37,8 +38,7 @@ func handlerWS(tr *api.Transport, msg *api.Message) {
if err := stream.AddConsumer(cons); err != nil { if err := stream.AddConsumer(cons); err != nil {
log.Warn().Err(err).Caller().Send() log.Warn().Err(err).Caller().Send()
tr.Error(err) return err
return
} }
tr.OnClose(func() { tr.OnClose(func() {
@@ -50,20 +50,21 @@ func handlerWS(tr *api.Transport, msg *api.Message) {
data, err := cons.Init() data, err := cons.Init()
if err != nil { if err != nil {
log.Warn().Err(err).Caller().Send() log.Warn().Err(err).Caller().Send()
tr.Error(err) return err
return
} }
tr.Write(data) tr.Write(data)
cons.Start() cons.Start()
return nil
} }
func handlerWS4(tr *api.Transport, msg *api.Message) { func handlerWS4(tr *api.Transport, msg *api.Message) error {
src := tr.Request.URL.Query().Get("src") src := tr.Request.URL.Query().Get("src")
stream := streams.GetOrNew(src) stream := streams.GetOrNew(src)
if stream == nil { if stream == nil {
return return errors.New(api.StreamNotFound)
} }
cons := &mp4.Segment{} cons := &mp4.Segment{}
@@ -80,12 +81,14 @@ func handlerWS4(tr *api.Transport, msg *api.Message) {
if err := stream.AddConsumer(cons); err != nil { if err := stream.AddConsumer(cons); err != nil {
log.Error().Err(err).Caller().Send() log.Error().Err(err).Caller().Send()
return return err
} }
tr.OnClose(func() { tr.OnClose(func() {
stream.RemoveConsumer(cons) stream.RemoveConsumer(cons)
}) })
return nil
} }
func parseMedias(codecs string, parseAudio bool) (medias []*streamer.Media) { func parseMedias(codecs string, parseAudio bool) (medias []*streamer.Media) {
+3 -2
View File
@@ -78,13 +78,14 @@ func syncCanditates(answer string) (string, error) {
return string(data), nil return string(data), nil
} }
func candidateHandler(tr *api.Transport, msg *api.Message) { func candidateHandler(tr *api.Transport, msg *api.Message) error {
if tr.Consumer == nil { if tr.Consumer == nil {
return return nil
} }
if conn := tr.Consumer.(*webrtc.Conn); conn != nil { if conn := tr.Consumer.(*webrtc.Conn); conn != nil {
s := msg.Value.(string) s := msg.Value.(string)
log.Trace().Str("candidate", s).Msg("[webrtc] remote") log.Trace().Str("candidate", s).Msg("[webrtc] remote")
conn.AddCandidate(s) conn.AddCandidate(s)
} }
return nil
} }
+13 -13
View File
@@ -1,6 +1,7 @@
package webrtc package webrtc
import ( import (
"errors"
"github.com/AlexxIT/go2rtc/cmd/api" "github.com/AlexxIT/go2rtc/cmd/api"
"github.com/AlexxIT/go2rtc/cmd/app" "github.com/AlexxIT/go2rtc/cmd/app"
"github.com/AlexxIT/go2rtc/cmd/streams" "github.com/AlexxIT/go2rtc/cmd/streams"
@@ -65,11 +66,11 @@ var log zerolog.Logger
var NewPConn func() (*pion.PeerConnection, error) var NewPConn func() (*pion.PeerConnection, error)
func asyncHandler(tr *api.Transport, msg *api.Message) { func asyncHandler(tr *api.Transport, msg *api.Message) error {
src := tr.Request.URL.Query().Get("src") src := tr.Request.URL.Query().Get("src")
stream := streams.Get(src) stream := streams.Get(src)
if stream == nil { if stream == nil {
return return errors.New(api.StreamNotFound)
} }
log.Debug().Str("url", src).Msg("[webrtc] new consumer") log.Debug().Str("url", src).Msg("[webrtc] new consumer")
@@ -80,8 +81,8 @@ func asyncHandler(tr *api.Transport, msg *api.Message) {
conn := new(webrtc.Conn) conn := new(webrtc.Conn)
conn.Conn, err = NewPConn() conn.Conn, err = NewPConn()
if err != nil { if err != nil {
log.Error().Err(err).Caller().Msg("NewPConn") log.Error().Err(err).Caller().Send()
return return err
} }
conn.UserAgent = tr.Request.UserAgent() conn.UserAgent = tr.Request.UserAgent()
@@ -105,17 +106,15 @@ func asyncHandler(tr *api.Transport, msg *api.Message) {
log.Trace().Msgf("[webrtc] offer:\n%s", offer) log.Trace().Msgf("[webrtc] offer:\n%s", offer)
if err = conn.SetOffer(offer); err != nil { if err = conn.SetOffer(offer); err != nil {
log.Warn().Err(err).Caller().Msg("conn.SetOffer") log.Warn().Err(err).Caller().Send()
tr.Error(err) return err
return
} }
// 2. AddConsumer, so we get new tracks // 2. AddConsumer, so we get new tracks
if err = stream.AddConsumer(conn); err != nil { if err = stream.AddConsumer(conn); err != nil {
log.Warn().Err(err).Caller().Msg("stream.AddConsumer") log.Warn().Err(err).Caller().Send()
_ = conn.Conn.Close() _ = conn.Conn.Close()
tr.Error(err) return err
return
} }
conn.Init() conn.Init()
@@ -125,9 +124,8 @@ func asyncHandler(tr *api.Transport, msg *api.Message) {
log.Trace().Msgf("[webrtc] answer\n%s", answer) log.Trace().Msgf("[webrtc] answer\n%s", answer)
if err != nil { if err != nil {
log.Error().Err(err).Caller().Msg("conn.GetAnswer") log.Error().Err(err).Caller().Send()
tr.Error(err) return err
return
} }
tr.Consumer = conn tr.Consumer = conn
@@ -135,6 +133,8 @@ func asyncHandler(tr *api.Transport, msg *api.Message) {
tr.Write(&api.Message{Type: "webrtc/answer", Value: answer}) tr.Write(&api.Message{Type: "webrtc/answer", Value: answer})
asyncCandidates(tr) asyncCandidates(tr)
return nil
} }
func syncHandler(w http.ResponseWriter, r *http.Request) { func syncHandler(w http.ResponseWriter, r *http.Request) {