Fix WebRTC async candidates processing

This commit is contained in:
Alexey Khit
2023-02-24 12:22:32 +03:00
parent 4328d2a573
commit f5892e4cfc
3 changed files with 45 additions and 12 deletions
+15 -2
View File
@@ -104,6 +104,8 @@ func apiWS(w http.ResponseWriter, r *http.Request) {
break
}
log.Trace().Str("type", msg.Type).Msg("[api.ws] msg")
if handler := wsHandlers[msg.Type]; handler != nil {
go func() {
if err = handler(tr, msg); err != nil {
@@ -119,8 +121,9 @@ func apiWS(w http.ResponseWriter, r *http.Request) {
var wsUp *websocket.Upgrader
type Transport struct {
Request *http.Request
Consumer interface{} // TODO: rewrite
Request *http.Request
ctx map[any]any
closed bool
mx sync.Mutex
@@ -170,3 +173,13 @@ func (t *Transport) OnClose(f func()) {
}
t.mx.Unlock()
}
// WithContext - run function with Context variable
func (t *Transport) WithContext(f func(ctx map[any]any)) {
t.mx.Lock()
if t.ctx == nil {
t.ctx = map[any]any{}
}
f(t.ctx)
t.mx.Unlock()
}
+29 -7
View File
@@ -56,7 +56,22 @@ func GetCandidates() (candidates []string) {
return
}
func asyncCandidates(tr *api.Transport) {
func asyncCandidates(tr *api.Transport, cons *webrtc.Server) {
tr.WithContext(func(ctx map[any]any) {
if candidates, ok := ctx["candidate"].([]string); ok {
// process candidates that receive before this moment
for _, candidate := range candidates {
cons.AddCandidate(candidate)
}
// remove already processed candidates
delete(ctx, "candidate")
}
// set variable for process candidates after this moment
ctx["webrtc"] = cons
})
for _, candidate := range GetCandidates() {
log.Trace().Str("candidate", candidate).Msg("[webrtc] config")
tr.Write(&api.Message{Type: "webrtc/candidate", Value: candidate})
@@ -97,13 +112,20 @@ func syncCanditates(answer string) (string, error) {
}
func candidateHandler(tr *api.Transport, msg *api.Message) error {
if tr.Consumer == nil {
return nil
}
if conn := tr.Consumer.(*webrtc.Server); conn != nil {
// process incoming candidate in sync function
tr.WithContext(func(ctx map[any]any) {
candidate := msg.String()
log.Trace().Str("candidate", candidate).Msg("[webrtc] remote")
conn.AddCandidate(candidate)
}
if cons, ok := ctx["webrtc"].(*webrtc.Server); ok {
// if webrtc.Server already initialized - process candidate
cons.AddCandidate(candidate)
} else {
// or collect candidate and process it later
list, _ := ctx["candidate"].([]string)
ctx["candidate"] = append(list, candidate)
}
})
return nil
}
+1 -3
View File
@@ -141,11 +141,9 @@ func asyncHandler(tr *api.Transport, msg *api.Message) error {
return err
}
tr.Consumer = cons
tr.Write(&api.Message{Type: "webrtc/answer", Value: answer})
asyncCandidates(tr)
asyncCandidates(tr, cons)
return nil
}