Fix WebRTC client

This commit is contained in:
Alexey Khit
2023-02-25 19:22:40 +03:00
parent 218eea6806
commit a0e04fb70e
6 changed files with 61 additions and 18 deletions
+4 -2
View File
@@ -39,7 +39,7 @@ func asyncClient(url string) (streamer.Producer, error) {
}() }()
// 2. Create PeerConnection // 2. Create PeerConnection
pc, err := newPeerConnection() pc, err := newPeerConnection(false)
if err != nil { if err != nil {
log.Error().Err(err).Caller().Send() log.Error().Err(err).Caller().Send()
return nil, err return nil, err
@@ -115,7 +115,7 @@ func asyncClient(url string) (streamer.Producer, error) {
// syncClient - support WebRTC-HTTP Egress Protocol (WHEP) // syncClient - support WebRTC-HTTP Egress Protocol (WHEP)
func syncClient(url string) (streamer.Producer, error) { func syncClient(url string) (streamer.Producer, error) {
// 2. Create PeerConnection // 2. Create PeerConnection
pc, err := newPeerConnection() pc, err := newPeerConnection(false)
if err != nil { if err != nil {
log.Error().Err(err).Caller().Send() log.Error().Err(err).Caller().Send()
return nil, err return nil, err
@@ -136,6 +136,8 @@ func syncClient(url string) (streamer.Producer, error) {
} }
client := http.Client{Timeout: time.Second * 5000} client := http.Client{Timeout: time.Second * 5000}
defer client.CloseIdleConnections()
res, err := client.Do(req) res, err := client.Do(req)
if err != nil { if err != nil {
return nil, err return nil, err
+1 -1
View File
@@ -139,7 +139,7 @@ func inputWebRTC(w http.ResponseWriter, r *http.Request) {
log.Trace().Msgf("[webrtc] WHIP offer\n%s", offer) log.Trace().Msgf("[webrtc] WHIP offer\n%s", offer)
pc, err := newPeerConnection() pc, err := newPeerConnection(true)
if err != nil { if err != nil {
log.Error().Err(err).Caller().Send() log.Error().Err(err).Caller().Send()
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
+9 -5
View File
@@ -50,8 +50,12 @@ func Init() {
SDPSemantics: pion.SDPSemanticsUnifiedPlanWithFallback, SDPSemantics: pion.SDPSemanticsUnifiedPlanWithFallback,
} }
newPeerConnection = func() (*pion.PeerConnection, error) { newPeerConnection = func(isServer bool) (*pion.PeerConnection, error) {
return pionAPI.NewPeerConnection(pionConf) if isServer {
return pionAPI.NewPeerConnection(pionConf)
} else {
return pion.NewPeerConnection(pionConf)
}
} }
for _, candidate := range cfg.Mod.Candidates { for _, candidate := range cfg.Mod.Candidates {
@@ -73,7 +77,7 @@ func Init() {
var Port string var Port string
var log zerolog.Logger var log zerolog.Logger
var newPeerConnection func() (*pion.PeerConnection, error) var newPeerConnection func(isServer bool) (*pion.PeerConnection, error)
func asyncHandler(tr *api.Transport, msg *api.Message) error { func asyncHandler(tr *api.Transport, msg *api.Message) error {
src := tr.Request.URL.Query().Get("src") src := tr.Request.URL.Query().Get("src")
@@ -85,7 +89,7 @@ func asyncHandler(tr *api.Transport, msg *api.Message) error {
log.Debug().Str("url", src).Msg("[webrtc] new consumer") log.Debug().Str("url", src).Msg("[webrtc] new consumer")
// create new PeerConnection instance // create new PeerConnection instance
pc, err := newPeerConnection() pc, err := newPeerConnection(true)
if err != nil { if err != nil {
log.Error().Err(err).Caller().Send() log.Error().Err(err).Caller().Send()
return err return err
@@ -155,7 +159,7 @@ func asyncHandler(tr *api.Transport, msg *api.Message) error {
} }
func ExchangeSDP(stream *streams.Stream, offer string, userAgent string) (answer string, err error) { func ExchangeSDP(stream *streams.Stream, offer string, userAgent string) (answer string, err error) {
pc, err := newPeerConnection() pc, err := newPeerConnection(true)
if err != nil { if err != nil {
log.Error().Err(err).Caller().Send() log.Error().Err(err).Caller().Send()
return return
+30 -2
View File
@@ -1,6 +1,10 @@
package webrtc package webrtc
import "github.com/pion/webrtc/v3" import (
"github.com/AlexxIT/go2rtc/pkg/streamer"
"github.com/pion/sdp/v3"
"github.com/pion/webrtc/v3"
)
func (c *Conn) CreateOffer() (string, error) { func (c *Conn) CreateOffer() (string, error) {
init := webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly} init := webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}
@@ -30,5 +34,29 @@ func (c *Conn) CreateCompleteOffer() (string, error) {
func (c *Conn) SetAnswer(answer string) (err error) { func (c *Conn) SetAnswer(answer string) (err error) {
desc := webrtc.SessionDescription{SDP: answer, Type: webrtc.SDPTypeAnswer} desc := webrtc.SessionDescription{SDP: answer, Type: webrtc.SDPTypeAnswer}
return c.pc.SetRemoteDescription(desc) if err = c.pc.SetRemoteDescription(desc); err != nil {
return
}
sd := &sdp.SessionDescription{}
if err = sd.Unmarshal([]byte(answer)); err != nil {
return
}
medias := streamer.UnmarshalMedias(sd.MediaDescriptions)
// sort medias, so video will always be before audio
// and ignore application media from Hass default lovelace card
for _, media := range medias {
if media.Kind == streamer.KindVideo {
c.medias = append(c.medias, media)
}
}
for _, media := range medias {
if media.Kind == streamer.KindAudio {
c.medias = append(c.medias, media)
}
}
return nil
} }
+9 -6
View File
@@ -19,10 +19,11 @@ type Conn struct {
send int send int
offer string offer string
start chan struct{}
} }
func NewConn(pc *webrtc.PeerConnection) *Conn { func NewConn(pc *webrtc.PeerConnection) *Conn {
c := &Conn{pc: pc} c := &Conn{pc: pc, start: make(chan struct{})}
pc.OnICECandidate(func(candidate *webrtc.ICECandidate) { pc.OnICECandidate(func(candidate *webrtc.ICECandidate) {
c.Fire(candidate) c.Fire(candidate)
@@ -64,14 +65,11 @@ func NewConn(pc *webrtc.PeerConnection) *Conn {
pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
c.Fire(state) c.Fire(state)
// TODO: rewrite?
switch state { switch state {
case webrtc.PeerConnectionStateDisconnected: case webrtc.PeerConnectionStateDisconnected, webrtc.PeerConnectionStateFailed, webrtc.PeerConnectionStateClosed:
// disconnect event comes earlier, than failed // disconnect event comes earlier, than failed
// but it comes only for success connections // but it comes only for success connections
_ = pc.Close() _ = c.Close()
case webrtc.PeerConnectionStateFailed:
_ = pc.Close()
} }
}) })
@@ -79,6 +77,11 @@ func NewConn(pc *webrtc.PeerConnection) *Conn {
} }
func (c *Conn) Close() error { func (c *Conn) Close() error {
// unblocked write to chan
select {
case c.start <- struct{}{}:
default:
}
return c.pc.Close() return c.pc.Close()
} }
+8 -2
View File
@@ -1,6 +1,8 @@
package webrtc package webrtc
import "github.com/AlexxIT/go2rtc/pkg/streamer" import (
"github.com/AlexxIT/go2rtc/pkg/streamer"
)
func (c *Conn) GetTrack(media *streamer.Media, codec *streamer.Codec) *streamer.Track { func (c *Conn) GetTrack(media *streamer.Media, codec *streamer.Codec) *streamer.Track {
for _, track := range c.tracks { for _, track := range c.tracks {
@@ -8,10 +10,14 @@ func (c *Conn) GetTrack(media *streamer.Media, codec *streamer.Codec) *streamer.
return track return track
} }
} }
return nil
track := streamer.NewTrack(codec, media.Direction)
c.tracks = append(c.tracks, track)
return track
} }
func (c *Conn) Start() error { func (c *Conn) Start() error {
<-c.start
return nil return nil
} }