diff --git a/cmd/webrtc/init.go b/cmd/webrtc/init.go index b0ffc5ca..7aa6f82b 100644 --- a/cmd/webrtc/init.go +++ b/cmd/webrtc/init.go @@ -86,14 +86,24 @@ var log zerolog.Logger var PeerConnection func(active bool) (*pion.PeerConnection, error) func asyncHandler(tr *api.Transport, msg *api.Message) error { - src := tr.Request.URL.Query().Get("src") - stream := streams.GetOrNew(src) + var stream *streams.Stream + var mode streamer.Mode + + query := tr.Request.URL.Query() + if name := query.Get("src"); name != "" { + stream = streams.GetOrNew(name) + mode = streamer.ModePassiveConsumer + log.Debug().Str("src", name).Msg("[webrtc] new consumer") + } else if name = query.Get("dst"); name != "" { + stream = streams.Get(name) + mode = streamer.ModePassiveProducer + log.Debug().Str("src", name).Msg("[webrtc] new producer") + } + if stream == nil { return errors.New(api.StreamNotFound) } - log.Debug().Str("url", src).Msg("[webrtc] new consumer") - // create new PeerConnection instance pc, err := PeerConnection(false) if err != nil { @@ -103,15 +113,21 @@ func asyncHandler(tr *api.Transport, msg *api.Message) error { var sendAnswer core.Waiter - cons := webrtc.NewConn(pc) - cons.Desc = "WebRTC/WebSocket async" - cons.Mode = streamer.ModePassiveConsumer - cons.UserAgent = tr.Request.UserAgent() - cons.Listen(func(msg any) { + conn := webrtc.NewConn(pc) + conn.Desc = "WebRTC/WebSocket async" + conn.Mode = mode + conn.UserAgent = tr.Request.UserAgent() + conn.Listen(func(msg any) { switch msg := msg.(type) { case pion.PeerConnectionState: - if msg == pion.PeerConnectionStateClosed { - stream.RemoveConsumer(cons) + if msg != pion.PeerConnectionStateClosed { + return + } + switch mode { + case streamer.ModePassiveConsumer: + stream.RemoveConsumer(conn) + case streamer.ModePassiveProducer: + stream.RemoveProducer(conn) } case *pion.ICECandidate: @@ -136,20 +152,25 @@ func asyncHandler(tr *api.Transport, msg *api.Message) error { log.Trace().Msgf("[webrtc] offer:\n%s", offer) - if err = cons.SetOffer(offer); err != nil { + if err = conn.SetOffer(offer); err != nil { log.Warn().Err(err).Caller().Send() return err } - // 2. AddConsumer, so we get new tracks - if err = stream.AddConsumer(cons); err != nil { - log.Debug().Err(err).Msg("[webrtc] add consumer") - _ = cons.Close() - return err + switch mode { + case streamer.ModePassiveConsumer: + // 2. AddConsumer, so we get new tracks + if err = stream.AddConsumer(conn); err != nil { + log.Debug().Err(err).Msg("[webrtc] add consumer") + _ = conn.Close() + return err + } + case streamer.ModePassiveProducer: + stream.AddProducer(conn) } // 3. Exchange SDP without waiting all candidates - answer, err := cons.GetAnswer() + answer, err := conn.GetAnswer() log.Trace().Msgf("[webrtc] answer\n%s", answer) if err != nil { @@ -166,7 +187,7 @@ func asyncHandler(tr *api.Transport, msg *api.Message) error { sendAnswer.Done() - asyncCandidates(tr, cons) + asyncCandidates(tr, conn) return nil }