Fix WebRTC async connection

This commit is contained in:
Alexey Khit
2022-11-16 11:26:56 +03:00
parent e29f2594fa
commit fdb316910f
2 changed files with 41 additions and 26 deletions
+21 -1
View File
@@ -13,7 +13,27 @@ func AddCandidate(address string) {
candidates = append(candidates, address)
}
func addCanditates(answer string) (string, error) {
func asyncCandidates(ctx *api.Context) {
for _, address := range candidates {
address, err := webrtc.LookupIP(address)
if err != nil {
log.Warn().Err(err).Caller().Send()
continue
}
cand, err := webrtc.NewCandidate(address)
if err != nil {
log.Warn().Err(err).Caller().Send()
continue
}
log.Trace().Str("candidate", cand).Msg("[webrtc] config")
ctx.Write(&streamer.Message{Type: webrtc.MsgTypeCandidate, Value: cand})
}
}
func syncCanditates(answer string) (string, error) {
if len(candidates) == 0 {
return answer, nil
}
+20 -25
View File
@@ -33,7 +33,7 @@ func Init() {
address := cfg.Mod.Listen
pionAPI, err := webrtc.NewAPI(address)
if pionAPI == nil {
log.Error().Err(err).Msg("[webrtc] init API")
log.Error().Err(err).Caller().Msg("webrtc.NewAPI")
return
}
@@ -55,7 +55,7 @@ func Init() {
candidates = cfg.Mod.Candidates
api.HandleWS(webrtc.MsgTypeOffer, offerHandler)
api.HandleWS(webrtc.MsgTypeOffer, asyncHandler)
api.HandleWS(webrtc.MsgTypeCandidate, candidateHandler)
api.HandleFunc("api/webrtc", syncHandler)
@@ -66,7 +66,7 @@ var log zerolog.Logger
var NewPConn func() (*pion.PeerConnection, error)
func offerHandler(ctx *api.Context, msg *streamer.Message) {
func asyncHandler(ctx *api.Context, msg *streamer.Message) {
src := ctx.Request.URL.Query().Get("src")
stream := streams.Get(src)
if stream == nil {
@@ -81,7 +81,7 @@ func offerHandler(ctx *api.Context, msg *streamer.Message) {
conn := new(webrtc.Conn)
conn.Conn, err = NewPConn()
if err != nil {
log.Error().Err(err).Msg("[webrtc] new conn")
log.Error().Err(err).Caller().Msg("NewPConn")
return
}
@@ -104,14 +104,14 @@ func offerHandler(ctx *api.Context, msg *streamer.Message) {
log.Trace().Msgf("[webrtc] offer:\n%s", offer)
if err = conn.SetOffer(offer); err != nil {
log.Warn().Err(err).Msg("[api.webrtc] set offer")
log.Warn().Err(err).Caller().Msg("conn.SetOffer")
ctx.Error(err)
return
}
// 2. AddConsumer, so we get new tracks
if err = stream.AddConsumer(conn); err != nil {
log.Warn().Err(err).Msg("[api.webrtc] add consumer")
log.Warn().Err(err).Caller().Msg("stream.AddConsumer")
_ = conn.Conn.Close()
ctx.Error(err)
return
@@ -120,25 +120,20 @@ func offerHandler(ctx *api.Context, msg *streamer.Message) {
conn.Init()
// exchange sdp without waiting all candidates
//answer, err := conn.ExchangeSDP(offer, false)
//answer, err := conn.GetAnswer()
answer, err := conn.GetCompleteAnswer()
if err == nil {
answer, err = addCanditates(answer)
}
answer, err := conn.GetAnswer()
log.Trace().Msgf("[webrtc] answer\n%s", answer)
if err != nil {
log.Error().Err(err).Msg("[webrtc] get answer")
log.Error().Err(err).Caller().Msg("conn.GetAnswer")
ctx.Error(err)
return
}
ctx.Write(&streamer.Message{
Type: webrtc.MsgTypeAnswer, Value: answer,
})
ctx.Consumer = conn
ctx.Write(&streamer.Message{Type: webrtc.MsgTypeAnswer, Value: answer})
asyncCandidates(ctx)
}
func syncHandler(w http.ResponseWriter, r *http.Request) {
@@ -151,19 +146,19 @@ func syncHandler(w http.ResponseWriter, r *http.Request) {
// get offer
offer, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Error().Err(err).Caller().Send()
log.Error().Err(err).Caller().Msg("ioutil.ReadAll")
return
}
answer, err := ExchangeSDP(stream, string(offer), r.UserAgent())
if err != nil {
log.Error().Err(err).Caller().Send()
log.Error().Err(err).Caller().Msg("ExchangeSDP")
return
}
// send SDP to client
if _, err = w.Write([]byte(answer)); err != nil {
log.Error().Err(err).Caller().Send()
log.Error().Err(err).Caller().Msg("w.Write")
}
}
@@ -174,7 +169,7 @@ func ExchangeSDP(
conn := new(webrtc.Conn)
conn.Conn, err = NewPConn()
if err != nil {
log.Error().Err(err).Msg("[webrtc] new conn")
log.Error().Err(err).Caller().Msg("NewPConn")
return
}
@@ -192,13 +187,13 @@ func ExchangeSDP(
log.Trace().Msgf("[webrtc] offer:\n%s", offer)
if err = conn.SetOffer(offer); err != nil {
log.Warn().Err(err).Msg("[api.webrtc] set offer")
log.Warn().Err(err).Caller().Msg("conn.SetOffer")
return
}
// 2. AddConsumer, so we get new tracks
if err = stream.AddConsumer(conn); err != nil {
log.Warn().Err(err).Msg("[api.webrtc] add consumer")
log.Warn().Err(err).Caller().Msg("stream.AddConsumer")
_ = conn.Conn.Close()
return
}
@@ -209,12 +204,12 @@ func ExchangeSDP(
//answer, err := conn.ExchangeSDP(offer, false)
answer, err = conn.GetCompleteAnswer()
if err == nil {
answer, err = addCanditates(answer)
answer, err = syncCanditates(answer)
}
log.Trace().Msgf("[webrtc] answer\n%s", answer)
if err != nil {
log.Error().Err(err).Msg("[webrtc] get answer")
log.Error().Err(err).Caller().Msg("conn.GetCompleteAnswer")
}
return