diff --git a/cmd/api/api.go b/cmd/api/api.go index 322d23c4..83761fb8 100644 --- a/cmd/api/api.go +++ b/cmd/api/api.go @@ -76,13 +76,8 @@ func HandleFunc(pattern string, handler http.HandlerFunc) { http.HandleFunc(pattern, handler) } -func HandleWS(msgType string, handler WSHandler) { - wsHandlers[msgType] = handler -} - var basePath string var log zerolog.Logger -var wsHandlers = make(map[string]WSHandler) func middlewareLog(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { diff --git a/cmd/api/ws.go b/cmd/api/ws.go index 68224d25..ab42c537 100644 --- a/cmd/api/ws.go +++ b/cmd/api/ws.go @@ -1,7 +1,6 @@ package api import ( - "github.com/AlexxIT/go2rtc/pkg/streamer" "github.com/gorilla/websocket" "net/http" "net/url" @@ -9,6 +8,20 @@ import ( "sync" ) +// Message - struct for data exchange in Web API +type Message struct { + Type string `json:"type"` + Value interface{} `json:"value,omitempty"` +} + +type WSHandler func(tr *Transport, msg *Message) + +func HandleWS(msgType string, handler WSHandler) { + wsHandlers[msgType] = handler +} + +var wsHandlers = make(map[string]WSHandler) + func initWS(origin string) { wsUp = &websocket.Upgrader{ ReadBufferSize: 1024, @@ -63,7 +76,7 @@ func apiWS(w http.ResponseWriter, r *http.Request) { }) for { - msg := new(streamer.Message) + msg := new(Message) if err = ws.ReadJSON(msg); err != nil { log.Trace().Err(err).Caller().Send() _ = ws.Close() @@ -80,8 +93,6 @@ func apiWS(w http.ResponseWriter, r *http.Request) { var wsUp *websocket.Upgrader -type WSHandler func(tr *Transport, msg *streamer.Message) - type Transport struct { Request *http.Request Consumer interface{} // TODO: rewrite @@ -115,9 +126,7 @@ func (t *Transport) Close() { } func (t *Transport) Error(err error) { - t.Write(&streamer.Message{ - Type: "error", Value: err.Error(), - }) + t.Write(&Message{Type: "error", Value: err.Error()}) } func (t *Transport) OnChange(f func()) { diff --git a/cmd/mjpeg/mjpeg.go b/cmd/mjpeg/mjpeg.go index 2219df5f..db6eff16 100644 --- a/cmd/mjpeg/mjpeg.go +++ b/cmd/mjpeg/mjpeg.go @@ -4,7 +4,6 @@ import ( "github.com/AlexxIT/go2rtc/cmd/api" "github.com/AlexxIT/go2rtc/cmd/streams" "github.com/AlexxIT/go2rtc/pkg/mjpeg" - "github.com/AlexxIT/go2rtc/pkg/streamer" "github.com/rs/zerolog/log" "net/http" "strconv" @@ -100,8 +99,8 @@ func handlerStream(w http.ResponseWriter, r *http.Request) { //log.Trace().Msg("[api.mjpeg] close") } -func handlerWS(ctx *api.Transport, msg *streamer.Message) { - src := ctx.Request.URL.Query().Get("src") +func handlerWS(tr *api.Transport, msg *api.Message) { + src := tr.Request.URL.Query().Get("src") stream := streams.GetOrNew(src) if stream == nil { return @@ -110,7 +109,7 @@ func handlerWS(ctx *api.Transport, msg *streamer.Message) { cons := &mjpeg.Consumer{} cons.Listen(func(msg interface{}) { if data, ok := msg.([]byte); ok { - ctx.Write(data) + tr.Write(data) } }) @@ -119,7 +118,7 @@ func handlerWS(ctx *api.Transport, msg *streamer.Message) { return } - ctx.OnClose(func() { + tr.OnClose(func() { stream.RemoveConsumer(cons) }) } diff --git a/cmd/mp4/ws.go b/cmd/mp4/ws.go index 6b4c891d..0c84eede 100644 --- a/cmd/mp4/ws.go +++ b/cmd/mp4/ws.go @@ -10,16 +10,16 @@ import ( const packetSize = 8192 -func handlerWS(ctx *api.Transport, msg *streamer.Message) { - src := ctx.Request.URL.Query().Get("src") +func handlerWS(tr *api.Transport, msg *api.Message) { + src := tr.Request.URL.Query().Get("src") stream := streams.GetOrNew(src) if stream == nil { return } cons := &mp4.Consumer{} - cons.UserAgent = ctx.Request.UserAgent() - cons.RemoteAddr = ctx.Request.RemoteAddr + cons.UserAgent = tr.Request.UserAgent() + cons.RemoteAddr = tr.Request.RemoteAddr if codecs, ok := msg.Value.(string); ok { cons.Medias = parseMedias(codecs, true) @@ -28,39 +28,39 @@ func handlerWS(ctx *api.Transport, msg *streamer.Message) { cons.Listen(func(msg interface{}) { if data, ok := msg.([]byte); ok { for len(data) > packetSize { - ctx.Write(data[:packetSize]) + tr.Write(data[:packetSize]) data = data[packetSize:] } - ctx.Write(data) + tr.Write(data) } }) if err := stream.AddConsumer(cons); err != nil { log.Warn().Err(err).Caller().Send() - ctx.Error(err) + tr.Error(err) return } - ctx.OnClose(func() { + tr.OnClose(func() { stream.RemoveConsumer(cons) }) - ctx.Write(&streamer.Message{Type: "mse", Value: cons.MimeType()}) + tr.Write(&api.Message{Type: "mse", Value: cons.MimeType()}) data, err := cons.Init() if err != nil { log.Warn().Err(err).Caller().Send() - ctx.Error(err) + tr.Error(err) return } - ctx.Write(data) + tr.Write(data) cons.Start() } -func handlerWS4(ctx *api.Transport, msg *streamer.Message) { - src := ctx.Request.URL.Query().Get("src") +func handlerWS4(tr *api.Transport, msg *api.Message) { + src := tr.Request.URL.Query().Get("src") stream := streams.GetOrNew(src) if stream == nil { return @@ -74,7 +74,7 @@ func handlerWS4(ctx *api.Transport, msg *streamer.Message) { cons.Listen(func(msg interface{}) { if data, ok := msg.([]byte); ok { - ctx.Write(data) + tr.Write(data) } }) @@ -83,7 +83,7 @@ func handlerWS4(ctx *api.Transport, msg *streamer.Message) { return } - ctx.OnClose(func() { + tr.OnClose(func() { stream.RemoveConsumer(cons) }) } diff --git a/cmd/webrtc/candidates.go b/cmd/webrtc/candidates.go index bc0e6fd4..9a711cdc 100644 --- a/cmd/webrtc/candidates.go +++ b/cmd/webrtc/candidates.go @@ -2,7 +2,6 @@ package webrtc import ( "github.com/AlexxIT/go2rtc/cmd/api" - "github.com/AlexxIT/go2rtc/pkg/streamer" "github.com/AlexxIT/go2rtc/pkg/webrtc" "github.com/pion/sdp/v3" ) @@ -13,7 +12,7 @@ func AddCandidate(address string) { candidates = append(candidates, address) } -func asyncCandidates(ctx *api.Transport) { +func asyncCandidates(tr *api.Transport) { for _, address := range candidates { address, err := webrtc.LookupIP(address) if err != nil { @@ -29,7 +28,7 @@ func asyncCandidates(ctx *api.Transport) { log.Trace().Str("candidate", cand).Msg("[webrtc] config") - ctx.Write(&streamer.Message{Type: "webrtc/candidate", Value: cand}) + tr.Write(&api.Message{Type: "webrtc/candidate", Value: cand}) } } @@ -79,11 +78,11 @@ func syncCanditates(answer string) (string, error) { return string(data), nil } -func candidateHandler(ctx *api.Transport, msg *streamer.Message) { - if ctx.Consumer == nil { +func candidateHandler(tr *api.Transport, msg *api.Message) { + if tr.Consumer == nil { return } - if conn := ctx.Consumer.(*webrtc.Conn); conn != nil { + if conn := tr.Consumer.(*webrtc.Conn); conn != nil { s := msg.Value.(string) log.Trace().Str("candidate", s).Msg("[webrtc] remote") conn.AddCandidate(s) diff --git a/cmd/webrtc/webrtc.go b/cmd/webrtc/webrtc.go index 524d962a..f975b839 100644 --- a/cmd/webrtc/webrtc.go +++ b/cmd/webrtc/webrtc.go @@ -4,7 +4,6 @@ import ( "github.com/AlexxIT/go2rtc/cmd/api" "github.com/AlexxIT/go2rtc/cmd/app" "github.com/AlexxIT/go2rtc/cmd/streams" - "github.com/AlexxIT/go2rtc/pkg/streamer" "github.com/AlexxIT/go2rtc/pkg/webrtc" pion "github.com/pion/webrtc/v3" "github.com/rs/zerolog" @@ -66,7 +65,7 @@ var log zerolog.Logger var NewPConn func() (*pion.PeerConnection, error) -func asyncHandler(tr *api.Transport, msg *streamer.Message) { +func asyncHandler(tr *api.Transport, msg *api.Message) { src := tr.Request.URL.Query().Get("src") stream := streams.Get(src) if stream == nil { @@ -96,7 +95,7 @@ func asyncHandler(tr *api.Transport, msg *streamer.Message) { if msg != nil { s := msg.ToJSON().Candidate log.Trace().Str("candidate", s).Msg("[webrtc] local") - tr.Write(&streamer.Message{Type: "webrtc/candidate", Value: s}) + tr.Write(&api.Message{Type: "webrtc/candidate", Value: s}) } } }) @@ -133,7 +132,7 @@ func asyncHandler(tr *api.Transport, msg *streamer.Message) { tr.Consumer = conn - tr.Write(&streamer.Message{Type: "webrtc/answer", Value: answer}) + tr.Write(&api.Message{Type: "webrtc/answer", Value: answer}) asyncCandidates(tr) } diff --git a/pkg/streamer/helpers.go b/pkg/streamer/helpers.go index 7e6e7cf7..e1cf3e70 100644 --- a/pkg/streamer/helpers.go +++ b/pkg/streamer/helpers.go @@ -12,14 +12,6 @@ const ( JSONSend = "send" ) -// Message - struct for data exchange in Web API -type Message struct { - Type string `json:"type"` - Value interface{} `json:"value,omitempty"` -} - -// other - func Between(s, sub1, sub2 string) string { i := strings.Index(s, sub1) if i < 0 {