Code refactoring

This commit is contained in:
Alexey Khit
2022-12-05 00:20:36 +03:00
parent a746b96adc
commit 7057b4846f
7 changed files with 43 additions and 50 deletions
-5
View File
@@ -76,13 +76,8 @@ func HandleFunc(pattern string, handler http.HandlerFunc) {
http.HandleFunc(pattern, handler) http.HandleFunc(pattern, handler)
} }
func HandleWS(msgType string, handler WSHandler) {
wsHandlers[msgType] = handler
}
var basePath string var basePath string
var log zerolog.Logger var log zerolog.Logger
var wsHandlers = make(map[string]WSHandler)
func middlewareLog(next http.Handler) http.Handler { func middlewareLog(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+16 -7
View File
@@ -1,7 +1,6 @@
package api package api
import ( import (
"github.com/AlexxIT/go2rtc/pkg/streamer"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"net/http" "net/http"
"net/url" "net/url"
@@ -9,6 +8,20 @@ import (
"sync" "sync"
) )
// Message - struct for data exchange in Web API
type Message struct {
Type string `json:"type"`
Value interface{} `json:"value,omitempty"`
}
type WSHandler func(tr *Transport, msg *Message)
func HandleWS(msgType string, handler WSHandler) {
wsHandlers[msgType] = handler
}
var wsHandlers = make(map[string]WSHandler)
func initWS(origin string) { func initWS(origin string) {
wsUp = &websocket.Upgrader{ wsUp = &websocket.Upgrader{
ReadBufferSize: 1024, ReadBufferSize: 1024,
@@ -63,7 +76,7 @@ func apiWS(w http.ResponseWriter, r *http.Request) {
}) })
for { for {
msg := new(streamer.Message) msg := new(Message)
if err = ws.ReadJSON(msg); err != nil { if err = ws.ReadJSON(msg); err != nil {
log.Trace().Err(err).Caller().Send() log.Trace().Err(err).Caller().Send()
_ = ws.Close() _ = ws.Close()
@@ -80,8 +93,6 @@ func apiWS(w http.ResponseWriter, r *http.Request) {
var wsUp *websocket.Upgrader var wsUp *websocket.Upgrader
type WSHandler func(tr *Transport, msg *streamer.Message)
type Transport struct { type Transport struct {
Request *http.Request Request *http.Request
Consumer interface{} // TODO: rewrite Consumer interface{} // TODO: rewrite
@@ -115,9 +126,7 @@ func (t *Transport) Close() {
} }
func (t *Transport) Error(err error) { func (t *Transport) Error(err error) {
t.Write(&streamer.Message{ t.Write(&Message{Type: "error", Value: err.Error()})
Type: "error", Value: err.Error(),
})
} }
func (t *Transport) OnChange(f func()) { func (t *Transport) OnChange(f func()) {
+4 -5
View File
@@ -4,7 +4,6 @@ import (
"github.com/AlexxIT/go2rtc/cmd/api" "github.com/AlexxIT/go2rtc/cmd/api"
"github.com/AlexxIT/go2rtc/cmd/streams" "github.com/AlexxIT/go2rtc/cmd/streams"
"github.com/AlexxIT/go2rtc/pkg/mjpeg" "github.com/AlexxIT/go2rtc/pkg/mjpeg"
"github.com/AlexxIT/go2rtc/pkg/streamer"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
"net/http" "net/http"
"strconv" "strconv"
@@ -100,8 +99,8 @@ func handlerStream(w http.ResponseWriter, r *http.Request) {
//log.Trace().Msg("[api.mjpeg] close") //log.Trace().Msg("[api.mjpeg] close")
} }
func handlerWS(ctx *api.Transport, msg *streamer.Message) { func handlerWS(tr *api.Transport, msg *api.Message) {
src := ctx.Request.URL.Query().Get("src") src := tr.Request.URL.Query().Get("src")
stream := streams.GetOrNew(src) stream := streams.GetOrNew(src)
if stream == nil { if stream == nil {
return return
@@ -110,7 +109,7 @@ func handlerWS(ctx *api.Transport, msg *streamer.Message) {
cons := &mjpeg.Consumer{} cons := &mjpeg.Consumer{}
cons.Listen(func(msg interface{}) { cons.Listen(func(msg interface{}) {
if data, ok := msg.([]byte); ok { if data, ok := msg.([]byte); ok {
ctx.Write(data) tr.Write(data)
} }
}) })
@@ -119,7 +118,7 @@ func handlerWS(ctx *api.Transport, msg *streamer.Message) {
return return
} }
ctx.OnClose(func() { tr.OnClose(func() {
stream.RemoveConsumer(cons) stream.RemoveConsumer(cons)
}) })
} }
+15 -15
View File
@@ -10,16 +10,16 @@ import (
const packetSize = 8192 const packetSize = 8192
func handlerWS(ctx *api.Transport, msg *streamer.Message) { func handlerWS(tr *api.Transport, msg *api.Message) {
src := ctx.Request.URL.Query().Get("src") src := tr.Request.URL.Query().Get("src")
stream := streams.GetOrNew(src) stream := streams.GetOrNew(src)
if stream == nil { if stream == nil {
return return
} }
cons := &mp4.Consumer{} cons := &mp4.Consumer{}
cons.UserAgent = ctx.Request.UserAgent() cons.UserAgent = tr.Request.UserAgent()
cons.RemoteAddr = ctx.Request.RemoteAddr cons.RemoteAddr = tr.Request.RemoteAddr
if codecs, ok := msg.Value.(string); ok { if codecs, ok := msg.Value.(string); ok {
cons.Medias = parseMedias(codecs, true) cons.Medias = parseMedias(codecs, true)
@@ -28,39 +28,39 @@ func handlerWS(ctx *api.Transport, msg *streamer.Message) {
cons.Listen(func(msg interface{}) { cons.Listen(func(msg interface{}) {
if data, ok := msg.([]byte); ok { if data, ok := msg.([]byte); ok {
for len(data) > packetSize { for len(data) > packetSize {
ctx.Write(data[:packetSize]) tr.Write(data[:packetSize])
data = data[packetSize:] data = data[packetSize:]
} }
ctx.Write(data) tr.Write(data)
} }
}) })
if err := stream.AddConsumer(cons); err != nil { if err := stream.AddConsumer(cons); err != nil {
log.Warn().Err(err).Caller().Send() log.Warn().Err(err).Caller().Send()
ctx.Error(err) tr.Error(err)
return return
} }
ctx.OnClose(func() { tr.OnClose(func() {
stream.RemoveConsumer(cons) stream.RemoveConsumer(cons)
}) })
ctx.Write(&streamer.Message{Type: "mse", Value: cons.MimeType()}) tr.Write(&api.Message{Type: "mse", Value: cons.MimeType()})
data, err := cons.Init() data, err := cons.Init()
if err != nil { if err != nil {
log.Warn().Err(err).Caller().Send() log.Warn().Err(err).Caller().Send()
ctx.Error(err) tr.Error(err)
return return
} }
ctx.Write(data) tr.Write(data)
cons.Start() cons.Start()
} }
func handlerWS4(ctx *api.Transport, msg *streamer.Message) { func handlerWS4(tr *api.Transport, msg *api.Message) {
src := ctx.Request.URL.Query().Get("src") src := tr.Request.URL.Query().Get("src")
stream := streams.GetOrNew(src) stream := streams.GetOrNew(src)
if stream == nil { if stream == nil {
return return
@@ -74,7 +74,7 @@ func handlerWS4(ctx *api.Transport, msg *streamer.Message) {
cons.Listen(func(msg interface{}) { cons.Listen(func(msg interface{}) {
if data, ok := msg.([]byte); ok { if data, ok := msg.([]byte); ok {
ctx.Write(data) tr.Write(data)
} }
}) })
@@ -83,7 +83,7 @@ func handlerWS4(ctx *api.Transport, msg *streamer.Message) {
return return
} }
ctx.OnClose(func() { tr.OnClose(func() {
stream.RemoveConsumer(cons) stream.RemoveConsumer(cons)
}) })
} }
+5 -6
View File
@@ -2,7 +2,6 @@ package webrtc
import ( import (
"github.com/AlexxIT/go2rtc/cmd/api" "github.com/AlexxIT/go2rtc/cmd/api"
"github.com/AlexxIT/go2rtc/pkg/streamer"
"github.com/AlexxIT/go2rtc/pkg/webrtc" "github.com/AlexxIT/go2rtc/pkg/webrtc"
"github.com/pion/sdp/v3" "github.com/pion/sdp/v3"
) )
@@ -13,7 +12,7 @@ func AddCandidate(address string) {
candidates = append(candidates, address) candidates = append(candidates, address)
} }
func asyncCandidates(ctx *api.Transport) { func asyncCandidates(tr *api.Transport) {
for _, address := range candidates { for _, address := range candidates {
address, err := webrtc.LookupIP(address) address, err := webrtc.LookupIP(address)
if err != nil { if err != nil {
@@ -29,7 +28,7 @@ func asyncCandidates(ctx *api.Transport) {
log.Trace().Str("candidate", cand).Msg("[webrtc] config") log.Trace().Str("candidate", cand).Msg("[webrtc] config")
ctx.Write(&streamer.Message{Type: "webrtc/candidate", Value: cand}) tr.Write(&api.Message{Type: "webrtc/candidate", Value: cand})
} }
} }
@@ -79,11 +78,11 @@ func syncCanditates(answer string) (string, error) {
return string(data), nil return string(data), nil
} }
func candidateHandler(ctx *api.Transport, msg *streamer.Message) { func candidateHandler(tr *api.Transport, msg *api.Message) {
if ctx.Consumer == nil { if tr.Consumer == nil {
return return
} }
if conn := ctx.Consumer.(*webrtc.Conn); conn != nil { if conn := tr.Consumer.(*webrtc.Conn); conn != nil {
s := msg.Value.(string) s := msg.Value.(string)
log.Trace().Str("candidate", s).Msg("[webrtc] remote") log.Trace().Str("candidate", s).Msg("[webrtc] remote")
conn.AddCandidate(s) conn.AddCandidate(s)
+3 -4
View File
@@ -4,7 +4,6 @@ import (
"github.com/AlexxIT/go2rtc/cmd/api" "github.com/AlexxIT/go2rtc/cmd/api"
"github.com/AlexxIT/go2rtc/cmd/app" "github.com/AlexxIT/go2rtc/cmd/app"
"github.com/AlexxIT/go2rtc/cmd/streams" "github.com/AlexxIT/go2rtc/cmd/streams"
"github.com/AlexxIT/go2rtc/pkg/streamer"
"github.com/AlexxIT/go2rtc/pkg/webrtc" "github.com/AlexxIT/go2rtc/pkg/webrtc"
pion "github.com/pion/webrtc/v3" pion "github.com/pion/webrtc/v3"
"github.com/rs/zerolog" "github.com/rs/zerolog"
@@ -66,7 +65,7 @@ var log zerolog.Logger
var NewPConn func() (*pion.PeerConnection, error) var NewPConn func() (*pion.PeerConnection, error)
func asyncHandler(tr *api.Transport, msg *streamer.Message) { func asyncHandler(tr *api.Transport, msg *api.Message) {
src := tr.Request.URL.Query().Get("src") src := tr.Request.URL.Query().Get("src")
stream := streams.Get(src) stream := streams.Get(src)
if stream == nil { if stream == nil {
@@ -96,7 +95,7 @@ func asyncHandler(tr *api.Transport, msg *streamer.Message) {
if msg != nil { if msg != nil {
s := msg.ToJSON().Candidate s := msg.ToJSON().Candidate
log.Trace().Str("candidate", s).Msg("[webrtc] local") log.Trace().Str("candidate", s).Msg("[webrtc] local")
tr.Write(&streamer.Message{Type: "webrtc/candidate", Value: s}) tr.Write(&api.Message{Type: "webrtc/candidate", Value: s})
} }
} }
}) })
@@ -133,7 +132,7 @@ func asyncHandler(tr *api.Transport, msg *streamer.Message) {
tr.Consumer = conn tr.Consumer = conn
tr.Write(&streamer.Message{Type: "webrtc/answer", Value: answer}) tr.Write(&api.Message{Type: "webrtc/answer", Value: answer})
asyncCandidates(tr) asyncCandidates(tr)
} }
-8
View File
@@ -12,14 +12,6 @@ const (
JSONSend = "send" JSONSend = "send"
) )
// Message - struct for data exchange in Web API
type Message struct {
Type string `json:"type"`
Value interface{} `json:"value,omitempty"`
}
// other
func Between(s, sub1, sub2 string) string { func Between(s, sub1, sub2 string) string {
i := strings.Index(s, sub1) i := strings.Index(s, sub1)
if i < 0 { if i < 0 {