From fdb316910ff0add463492782ad96b101557977fe Mon Sep 17 00:00:00 2001 From: Alexey Khit Date: Wed, 16 Nov 2022 11:26:56 +0300 Subject: [PATCH] Fix WebRTC async connection --- cmd/webrtc/candidates.go | 22 +++++++++++++++++++- cmd/webrtc/webrtc.go | 45 ++++++++++++++++++---------------------- 2 files changed, 41 insertions(+), 26 deletions(-) diff --git a/cmd/webrtc/candidates.go b/cmd/webrtc/candidates.go index c614e4f2..cf632d6a 100644 --- a/cmd/webrtc/candidates.go +++ b/cmd/webrtc/candidates.go @@ -13,7 +13,27 @@ func AddCandidate(address string) { candidates = append(candidates, address) } -func addCanditates(answer string) (string, error) { +func asyncCandidates(ctx *api.Context) { + for _, address := range candidates { + address, err := webrtc.LookupIP(address) + if err != nil { + log.Warn().Err(err).Caller().Send() + continue + } + + cand, err := webrtc.NewCandidate(address) + if err != nil { + log.Warn().Err(err).Caller().Send() + continue + } + + log.Trace().Str("candidate", cand).Msg("[webrtc] config") + + ctx.Write(&streamer.Message{Type: webrtc.MsgTypeCandidate, Value: cand}) + } +} + +func syncCanditates(answer string) (string, error) { if len(candidates) == 0 { return answer, nil } diff --git a/cmd/webrtc/webrtc.go b/cmd/webrtc/webrtc.go index 4fcb8ad5..70860548 100644 --- a/cmd/webrtc/webrtc.go +++ b/cmd/webrtc/webrtc.go @@ -33,7 +33,7 @@ func Init() { address := cfg.Mod.Listen pionAPI, err := webrtc.NewAPI(address) if pionAPI == nil { - log.Error().Err(err).Msg("[webrtc] init API") + log.Error().Err(err).Caller().Msg("webrtc.NewAPI") return } @@ -55,7 +55,7 @@ func Init() { candidates = cfg.Mod.Candidates - api.HandleWS(webrtc.MsgTypeOffer, offerHandler) + api.HandleWS(webrtc.MsgTypeOffer, asyncHandler) api.HandleWS(webrtc.MsgTypeCandidate, candidateHandler) api.HandleFunc("api/webrtc", syncHandler) @@ -66,7 +66,7 @@ var log zerolog.Logger var NewPConn func() (*pion.PeerConnection, error) -func offerHandler(ctx *api.Context, msg *streamer.Message) { +func asyncHandler(ctx *api.Context, msg *streamer.Message) { src := ctx.Request.URL.Query().Get("src") stream := streams.Get(src) if stream == nil { @@ -81,7 +81,7 @@ func offerHandler(ctx *api.Context, msg *streamer.Message) { conn := new(webrtc.Conn) conn.Conn, err = NewPConn() if err != nil { - log.Error().Err(err).Msg("[webrtc] new conn") + log.Error().Err(err).Caller().Msg("NewPConn") return } @@ -104,14 +104,14 @@ func offerHandler(ctx *api.Context, msg *streamer.Message) { log.Trace().Msgf("[webrtc] offer:\n%s", offer) if err = conn.SetOffer(offer); err != nil { - log.Warn().Err(err).Msg("[api.webrtc] set offer") + log.Warn().Err(err).Caller().Msg("conn.SetOffer") ctx.Error(err) return } // 2. AddConsumer, so we get new tracks if err = stream.AddConsumer(conn); err != nil { - log.Warn().Err(err).Msg("[api.webrtc] add consumer") + log.Warn().Err(err).Caller().Msg("stream.AddConsumer") _ = conn.Conn.Close() ctx.Error(err) return @@ -120,25 +120,20 @@ func offerHandler(ctx *api.Context, msg *streamer.Message) { conn.Init() // exchange sdp without waiting all candidates - //answer, err := conn.ExchangeSDP(offer, false) - //answer, err := conn.GetAnswer() - answer, err := conn.GetCompleteAnswer() - if err == nil { - answer, err = addCanditates(answer) - } + answer, err := conn.GetAnswer() log.Trace().Msgf("[webrtc] answer\n%s", answer) if err != nil { - log.Error().Err(err).Msg("[webrtc] get answer") + log.Error().Err(err).Caller().Msg("conn.GetAnswer") ctx.Error(err) return } - ctx.Write(&streamer.Message{ - Type: webrtc.MsgTypeAnswer, Value: answer, - }) - ctx.Consumer = conn + + ctx.Write(&streamer.Message{Type: webrtc.MsgTypeAnswer, Value: answer}) + + asyncCandidates(ctx) } func syncHandler(w http.ResponseWriter, r *http.Request) { @@ -151,19 +146,19 @@ func syncHandler(w http.ResponseWriter, r *http.Request) { // get offer offer, err := ioutil.ReadAll(r.Body) if err != nil { - log.Error().Err(err).Caller().Send() + log.Error().Err(err).Caller().Msg("ioutil.ReadAll") return } answer, err := ExchangeSDP(stream, string(offer), r.UserAgent()) if err != nil { - log.Error().Err(err).Caller().Send() + log.Error().Err(err).Caller().Msg("ExchangeSDP") return } // send SDP to client if _, err = w.Write([]byte(answer)); err != nil { - log.Error().Err(err).Caller().Send() + log.Error().Err(err).Caller().Msg("w.Write") } } @@ -174,7 +169,7 @@ func ExchangeSDP( conn := new(webrtc.Conn) conn.Conn, err = NewPConn() if err != nil { - log.Error().Err(err).Msg("[webrtc] new conn") + log.Error().Err(err).Caller().Msg("NewPConn") return } @@ -192,13 +187,13 @@ func ExchangeSDP( log.Trace().Msgf("[webrtc] offer:\n%s", offer) if err = conn.SetOffer(offer); err != nil { - log.Warn().Err(err).Msg("[api.webrtc] set offer") + log.Warn().Err(err).Caller().Msg("conn.SetOffer") return } // 2. AddConsumer, so we get new tracks if err = stream.AddConsumer(conn); err != nil { - log.Warn().Err(err).Msg("[api.webrtc] add consumer") + log.Warn().Err(err).Caller().Msg("stream.AddConsumer") _ = conn.Conn.Close() return } @@ -209,12 +204,12 @@ func ExchangeSDP( //answer, err := conn.ExchangeSDP(offer, false) answer, err = conn.GetCompleteAnswer() if err == nil { - answer, err = addCanditates(answer) + answer, err = syncCanditates(answer) } log.Trace().Msgf("[webrtc] answer\n%s", answer) if err != nil { - log.Error().Err(err).Msg("[webrtc] get answer") + log.Error().Err(err).Caller().Msg("conn.GetCompleteAnswer") } return