Add support WebRTC async passive producer
This commit is contained in:
+40
-19
@@ -86,14 +86,24 @@ var log zerolog.Logger
|
|||||||
var PeerConnection func(active bool) (*pion.PeerConnection, error)
|
var PeerConnection func(active bool) (*pion.PeerConnection, error)
|
||||||
|
|
||||||
func asyncHandler(tr *api.Transport, msg *api.Message) error {
|
func asyncHandler(tr *api.Transport, msg *api.Message) error {
|
||||||
src := tr.Request.URL.Query().Get("src")
|
var stream *streams.Stream
|
||||||
stream := streams.GetOrNew(src)
|
var mode streamer.Mode
|
||||||
|
|
||||||
|
query := tr.Request.URL.Query()
|
||||||
|
if name := query.Get("src"); name != "" {
|
||||||
|
stream = streams.GetOrNew(name)
|
||||||
|
mode = streamer.ModePassiveConsumer
|
||||||
|
log.Debug().Str("src", name).Msg("[webrtc] new consumer")
|
||||||
|
} else if name = query.Get("dst"); name != "" {
|
||||||
|
stream = streams.Get(name)
|
||||||
|
mode = streamer.ModePassiveProducer
|
||||||
|
log.Debug().Str("src", name).Msg("[webrtc] new producer")
|
||||||
|
}
|
||||||
|
|
||||||
if stream == nil {
|
if stream == nil {
|
||||||
return errors.New(api.StreamNotFound)
|
return errors.New(api.StreamNotFound)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debug().Str("url", src).Msg("[webrtc] new consumer")
|
|
||||||
|
|
||||||
// create new PeerConnection instance
|
// create new PeerConnection instance
|
||||||
pc, err := PeerConnection(false)
|
pc, err := PeerConnection(false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -103,15 +113,21 @@ func asyncHandler(tr *api.Transport, msg *api.Message) error {
|
|||||||
|
|
||||||
var sendAnswer core.Waiter
|
var sendAnswer core.Waiter
|
||||||
|
|
||||||
cons := webrtc.NewConn(pc)
|
conn := webrtc.NewConn(pc)
|
||||||
cons.Desc = "WebRTC/WebSocket async"
|
conn.Desc = "WebRTC/WebSocket async"
|
||||||
cons.Mode = streamer.ModePassiveConsumer
|
conn.Mode = mode
|
||||||
cons.UserAgent = tr.Request.UserAgent()
|
conn.UserAgent = tr.Request.UserAgent()
|
||||||
cons.Listen(func(msg any) {
|
conn.Listen(func(msg any) {
|
||||||
switch msg := msg.(type) {
|
switch msg := msg.(type) {
|
||||||
case pion.PeerConnectionState:
|
case pion.PeerConnectionState:
|
||||||
if msg == pion.PeerConnectionStateClosed {
|
if msg != pion.PeerConnectionStateClosed {
|
||||||
stream.RemoveConsumer(cons)
|
return
|
||||||
|
}
|
||||||
|
switch mode {
|
||||||
|
case streamer.ModePassiveConsumer:
|
||||||
|
stream.RemoveConsumer(conn)
|
||||||
|
case streamer.ModePassiveProducer:
|
||||||
|
stream.RemoveProducer(conn)
|
||||||
}
|
}
|
||||||
|
|
||||||
case *pion.ICECandidate:
|
case *pion.ICECandidate:
|
||||||
@@ -136,20 +152,25 @@ func asyncHandler(tr *api.Transport, msg *api.Message) error {
|
|||||||
|
|
||||||
log.Trace().Msgf("[webrtc] offer:\n%s", offer)
|
log.Trace().Msgf("[webrtc] offer:\n%s", offer)
|
||||||
|
|
||||||
if err = cons.SetOffer(offer); err != nil {
|
if err = conn.SetOffer(offer); err != nil {
|
||||||
log.Warn().Err(err).Caller().Send()
|
log.Warn().Err(err).Caller().Send()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. AddConsumer, so we get new tracks
|
switch mode {
|
||||||
if err = stream.AddConsumer(cons); err != nil {
|
case streamer.ModePassiveConsumer:
|
||||||
log.Debug().Err(err).Msg("[webrtc] add consumer")
|
// 2. AddConsumer, so we get new tracks
|
||||||
_ = cons.Close()
|
if err = stream.AddConsumer(conn); err != nil {
|
||||||
return err
|
log.Debug().Err(err).Msg("[webrtc] add consumer")
|
||||||
|
_ = conn.Close()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
case streamer.ModePassiveProducer:
|
||||||
|
stream.AddProducer(conn)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 3. Exchange SDP without waiting all candidates
|
// 3. Exchange SDP without waiting all candidates
|
||||||
answer, err := cons.GetAnswer()
|
answer, err := conn.GetAnswer()
|
||||||
log.Trace().Msgf("[webrtc] answer\n%s", answer)
|
log.Trace().Msgf("[webrtc] answer\n%s", answer)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -166,7 +187,7 @@ func asyncHandler(tr *api.Transport, msg *api.Message) error {
|
|||||||
|
|
||||||
sendAnswer.Done()
|
sendAnswer.Done()
|
||||||
|
|
||||||
asyncCandidates(tr, cons)
|
asyncCandidates(tr, conn)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user