From f5892e4cfc2f1b14de5475ea00d89d3dae30e46c Mon Sep 17 00:00:00 2001 From: Alexey Khit Date: Fri, 24 Feb 2023 12:22:32 +0300 Subject: [PATCH] Fix WebRTC async candidates processing --- cmd/api/ws.go | 17 +++++++++++++++-- cmd/webrtc/candidates.go | 36 +++++++++++++++++++++++++++++------- cmd/webrtc/webrtc.go | 4 +--- 3 files changed, 45 insertions(+), 12 deletions(-) diff --git a/cmd/api/ws.go b/cmd/api/ws.go index 04e7142e..4a065885 100644 --- a/cmd/api/ws.go +++ b/cmd/api/ws.go @@ -104,6 +104,8 @@ func apiWS(w http.ResponseWriter, r *http.Request) { break } + log.Trace().Str("type", msg.Type).Msg("[api.ws] msg") + if handler := wsHandlers[msg.Type]; handler != nil { go func() { if err = handler(tr, msg); err != nil { @@ -119,8 +121,9 @@ func apiWS(w http.ResponseWriter, r *http.Request) { var wsUp *websocket.Upgrader type Transport struct { - Request *http.Request - Consumer interface{} // TODO: rewrite + Request *http.Request + + ctx map[any]any closed bool mx sync.Mutex @@ -170,3 +173,13 @@ func (t *Transport) OnClose(f func()) { } t.mx.Unlock() } + +// WithContext - run function with Context variable +func (t *Transport) WithContext(f func(ctx map[any]any)) { + t.mx.Lock() + if t.ctx == nil { + t.ctx = map[any]any{} + } + f(t.ctx) + t.mx.Unlock() +} diff --git a/cmd/webrtc/candidates.go b/cmd/webrtc/candidates.go index ec895845..28b06908 100644 --- a/cmd/webrtc/candidates.go +++ b/cmd/webrtc/candidates.go @@ -56,7 +56,22 @@ func GetCandidates() (candidates []string) { return } -func asyncCandidates(tr *api.Transport) { +func asyncCandidates(tr *api.Transport, cons *webrtc.Server) { + tr.WithContext(func(ctx map[any]any) { + if candidates, ok := ctx["candidate"].([]string); ok { + // process candidates that receive before this moment + for _, candidate := range candidates { + cons.AddCandidate(candidate) + } + + // remove already processed candidates + delete(ctx, "candidate") + } + + // set variable for process candidates after this moment + ctx["webrtc"] = cons + }) + for _, candidate := range GetCandidates() { log.Trace().Str("candidate", candidate).Msg("[webrtc] config") tr.Write(&api.Message{Type: "webrtc/candidate", Value: candidate}) @@ -97,13 +112,20 @@ func syncCanditates(answer string) (string, error) { } func candidateHandler(tr *api.Transport, msg *api.Message) error { - if tr.Consumer == nil { - return nil - } - if conn := tr.Consumer.(*webrtc.Server); conn != nil { + // process incoming candidate in sync function + tr.WithContext(func(ctx map[any]any) { candidate := msg.String() log.Trace().Str("candidate", candidate).Msg("[webrtc] remote") - conn.AddCandidate(candidate) - } + + if cons, ok := ctx["webrtc"].(*webrtc.Server); ok { + // if webrtc.Server already initialized - process candidate + cons.AddCandidate(candidate) + } else { + // or collect candidate and process it later + list, _ := ctx["candidate"].([]string) + ctx["candidate"] = append(list, candidate) + } + }) + return nil } diff --git a/cmd/webrtc/webrtc.go b/cmd/webrtc/webrtc.go index d940f3d8..667a6259 100644 --- a/cmd/webrtc/webrtc.go +++ b/cmd/webrtc/webrtc.go @@ -141,11 +141,9 @@ func asyncHandler(tr *api.Transport, msg *api.Message) error { return err } - tr.Consumer = cons - tr.Write(&api.Message{Type: "webrtc/answer", Value: answer}) - asyncCandidates(tr) + asyncCandidates(tr, cons) return nil }