diff --git a/cmd/webrtc/candidates.go b/cmd/webrtc/candidates.go index 28b06908..1ce249ea 100644 --- a/cmd/webrtc/candidates.go +++ b/cmd/webrtc/candidates.go @@ -56,7 +56,7 @@ func GetCandidates() (candidates []string) { return } -func asyncCandidates(tr *api.Transport, cons *webrtc.Server) { +func asyncCandidates(tr *api.Transport, cons *webrtc.Conn) { tr.WithContext(func(ctx map[any]any) { if candidates, ok := ctx["candidate"].([]string); ok { // process candidates that receive before this moment @@ -117,9 +117,9 @@ func candidateHandler(tr *api.Transport, msg *api.Message) error { candidate := msg.String() log.Trace().Str("candidate", candidate).Msg("[webrtc] remote") - if cons, ok := ctx["webrtc"].(*webrtc.Server); ok { + if cons, ok := ctx["webrtc"].(*webrtc.Conn); ok { // if webrtc.Server already initialized - process candidate - cons.AddCandidate(candidate) + _ = cons.AddCandidate(candidate) } else { // or collect candidate and process it later list, _ := ctx["candidate"].([]string) diff --git a/cmd/webrtc/client.go b/cmd/webrtc/client.go new file mode 100644 index 00000000..6f1b7e39 --- /dev/null +++ b/cmd/webrtc/client.go @@ -0,0 +1,154 @@ +package webrtc + +import ( + "errors" + "github.com/AlexxIT/go2rtc/cmd/api" + "github.com/AlexxIT/go2rtc/pkg/streamer" + "github.com/AlexxIT/go2rtc/pkg/webrtc" + "github.com/gorilla/websocket" + pion "github.com/pion/webrtc/v3" + "io" + "net/http" + "strings" + "time" +) + +func streamsHandler(url string) (streamer.Producer, error) { + url = url[7:] + if i := strings.Index(url, "://"); i > 0 { + switch url[:i] { + case "ws", "wss": + return asyncClient(url) + case "http", "https": + return syncClient(url) + } + } + return nil, errors.New("unsupported url: " + url) +} + +func asyncClient(url string) (streamer.Producer, error) { + // 1. Connect to signalign server + ws, _, err := websocket.DefaultDialer.Dial(url, nil) + if err != nil { + return nil, err + } + defer func() { + if err != nil { + _ = ws.Close() + } + }() + + // 2. Create PeerConnection + pc, err := newPeerConnection() + if err != nil { + log.Error().Err(err).Caller().Send() + return nil, err + } + + prod := webrtc.NewConn(pc) + prod.Listen(func(msg any) { + switch msg := msg.(type) { + case pion.PeerConnectionState: + _ = ws.Close() + + case *pion.ICECandidate: + if msg != nil { + s := msg.ToJSON().Candidate + log.Trace().Str("candidate", s).Msg("[webrtc] local") + _ = ws.WriteJSON(&api.Message{Type: "webrtc/candidate", Value: s}) + } + } + }) + + // 3. Create offer + offer, err := prod.CreateOffer() + if err != nil { + return nil, err + } + + // 4. Send offer + msg := &api.Message{Type: "webrtc/offer", Value: offer} + if err = ws.WriteJSON(msg); err != nil { + return nil, err + } + + // 5. Get answer + if err = ws.ReadJSON(msg); err != nil { + return nil, err + } + + if msg.Type != "webrtc/answer" { + return nil, errors.New("wrong answer: " + msg.Type) + } + + answer := msg.String() + if err = prod.SetAnswer(answer); err != nil { + return nil, err + } + + // 6. Continue to receiving candidates + go func() { + for { + // receive data from remote + msg := new(api.Message) + if err = ws.ReadJSON(msg); err != nil { + if cerr, ok := err.(*websocket.CloseError); ok { + log.Trace().Err(err).Caller().Msgf("[webrtc] ws code=%d", cerr) + } + break + } + + switch msg.Type { + case "webrtc/candidate": + if msg.Value != nil { + _ = prod.AddCandidate(msg.String()) + } + } + } + + _ = ws.Close() + }() + + return prod, nil +} + +// syncClient - support WebRTC-HTTP Egress Protocol (WHEP) +func syncClient(url string) (streamer.Producer, error) { + // 2. Create PeerConnection + pc, err := newPeerConnection() + if err != nil { + log.Error().Err(err).Caller().Send() + return nil, err + } + + prod := webrtc.NewConn(pc) + + // 3. Create offer + offer, err := prod.CreateCompleteOffer() + if err != nil { + return nil, err + } + + req, err := http.NewRequest("POST", url, strings.NewReader(offer)) + req.Header.Set("Content-Type", MimeSDP) + if err != nil { + return nil, err + } + + client := http.Client{Timeout: time.Second * 5000} + res, err := client.Do(req) + if err != nil { + return nil, err + } + + answer, err := io.ReadAll(res.Body) + if err != nil { + return nil, err + } + + if err = prod.SetAnswer(string(answer)); err != nil { + return nil, err + } + + return prod, nil +} diff --git a/cmd/webrtc/server.go b/cmd/webrtc/server.go new file mode 100644 index 00000000..0f56ca4f --- /dev/null +++ b/cmd/webrtc/server.go @@ -0,0 +1,196 @@ +package webrtc + +import ( + "encoding/json" + "github.com/AlexxIT/go2rtc/cmd/streams" + "github.com/AlexxIT/go2rtc/pkg/webrtc" + pion "github.com/pion/webrtc/v3" + "io" + "net/http" + "strconv" + "strings" + "time" +) + +const MimeSDP = "application/sdp" + +var sessions = map[string]*webrtc.Conn{} + +func syncHandler(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case "POST": + query := r.URL.Query() + if query.Get("src") != "" { + // WHEP or JSON SDP or raw SDP exchange + outputWebRTC(w, r) + } else if query.Get("dst") != "" { + // WHIP SDP exchange + inputWebRTC(w, r) + } else { + http.Error(w, "", http.StatusBadRequest) + } + + case "PATCH": + // TODO: WHEP/WHIP + http.Error(w, "", http.StatusMethodNotAllowed) + + case "DELETE": + if id := r.URL.Query().Get("id"); id != "" { + if conn, ok := sessions[id]; ok { + delete(sessions, id) + _ = conn.Close() + } else { + http.Error(w, "", http.StatusNotFound) + } + } else { + http.Error(w, "", http.StatusBadRequest) + } + + default: + http.Error(w, "", http.StatusMethodNotAllowed) + } +} + +// outputWebRTC support API depending on Content-Type: +// 1. application/json - receive {"type":"offer","sdp":"v=0\r\n..."} and response {"type":"answer","sdp":"v=0\r\n..."} +// 2. application/sdp - receive/response SDP via WebRTC-HTTP Egress Protocol (WHEP) +// 3. other - receive/response raw SDP +func outputWebRTC(w http.ResponseWriter, r *http.Request) { + url := r.URL.Query().Get("src") + stream := streams.Get(url) + if stream == nil { + return + } + + mediaType := r.Header.Get("Content-Type") + if mediaType != "" { + mediaType, _, _ = strings.Cut(mediaType, ";") + mediaType = strings.ToLower(strings.TrimSpace(mediaType)) + } + + var offer string + + switch mediaType { + case "application/json": + var desc pion.SessionDescription + if err := json.NewDecoder(r.Body).Decode(&desc); err != nil { + log.Error().Err(err).Caller().Send() + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + offer = desc.SDP + + default: + body, err := io.ReadAll(r.Body) + if err != nil { + log.Error().Err(err).Caller().Send() + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + offer = string(body) + } + + answer, err := ExchangeSDP(stream, offer, r.UserAgent()) + if err != nil { + log.Error().Err(err).Caller().Send() + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + switch mediaType { + case "application/json": + w.Header().Set("Content-Type", mediaType) + + v := pion.SessionDescription{ + Type: pion.SDPTypeAnswer, SDP: answer, + } + err = json.NewEncoder(w).Encode(v) + + case MimeSDP: + w.Header().Set("Content-Type", mediaType) + w.WriteHeader(http.StatusCreated) + + _, err = w.Write([]byte(answer)) + + default: + _, err = w.Write([]byte(answer)) + } + + if err != nil { + log.Error().Err(err).Caller().Send() + http.Error(w, err.Error(), http.StatusInternalServerError) + } +} + +func inputWebRTC(w http.ResponseWriter, r *http.Request) { + dst := r.URL.Query().Get("dst") + stream := streams.Get(dst) + if stream == nil { + stream = streams.New(dst, nil) + } + + // 1. Get offer + offer, err := io.ReadAll(r.Body) + if err != nil { + log.Error().Err(err).Caller().Send() + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + log.Trace().Msgf("[webrtc] WHIP offer\n%s", offer) + + pc, err := newPeerConnection() + if err != nil { + log.Error().Err(err).Caller().Send() + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + // create new webrtc instance + conn := webrtc.NewConn(pc) + conn.UserAgent = r.UserAgent() + + if err = conn.SetOffer(string(offer)); err != nil { + log.Warn().Err(err).Caller().Send() + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + answer, err := conn.GetCompleteAnswer() + if err == nil { + answer, err = syncCanditates(answer) + } + if err != nil { + log.Warn().Err(err).Caller().Send() + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + log.Trace().Msgf("[webrtc] WHIP answer\n%s", answer) + + id := strconv.FormatInt(time.Now().UnixNano(), 36) + sessions[id] = conn + + conn.Listen(func(msg interface{}) { + switch msg := msg.(type) { + case pion.PeerConnectionState: + if msg == pion.PeerConnectionStateClosed { + stream.RemoveProducer(conn) + if _, ok := sessions[id]; ok { + delete(sessions, id) + } + } + } + }) + + stream.AddProducer(conn) + + w.Header().Set("Content-Type", MimeSDP) + w.Header().Set("Location", "webrtc?id="+id) + w.WriteHeader(http.StatusCreated) + + if _, err = w.Write([]byte(answer)); err != nil { + log.Warn().Err(err).Caller().Send() + return + } +} diff --git a/cmd/webrtc/webrtc.go b/cmd/webrtc/webrtc.go index 8dd340fb..dd41a2a9 100644 --- a/cmd/webrtc/webrtc.go +++ b/cmd/webrtc/webrtc.go @@ -1,7 +1,6 @@ package webrtc import ( - "encoding/json" "errors" "github.com/AlexxIT/go2rtc/cmd/api" "github.com/AlexxIT/go2rtc/cmd/app" @@ -9,10 +8,7 @@ import ( "github.com/AlexxIT/go2rtc/pkg/webrtc" pion "github.com/pion/webrtc/v3" "github.com/rs/zerolog" - "io" - "mime" "net" - "net/http" ) func Init() { @@ -62,11 +58,16 @@ func Init() { AddCandidate(candidate) } + // async WebRTC server (two API versions) api.HandleWS("webrtc", asyncHandler) api.HandleWS("webrtc/offer", asyncHandler) api.HandleWS("webrtc/candidate", candidateHandler) + // sync WebRTC server (two API versions) api.HandleFunc("api/webrtc", syncHandler) + + // WebRTC client + streams.HandleFunc("webrtc", streamsHandler) } var Port string @@ -90,7 +91,7 @@ func asyncHandler(tr *api.Transport, msg *api.Message) error { return err } - cons := webrtc.NewServer(pc) + cons := webrtc.NewConn(pc) cons.UserAgent = tr.Request.UserAgent() cons.Listen(func(msg any) { switch msg := msg.(type) { @@ -153,65 +154,6 @@ func asyncHandler(tr *api.Transport, msg *api.Message) error { return nil } -func syncHandler(w http.ResponseWriter, r *http.Request) { - url := r.URL.Query().Get("src") - stream := streams.Get(url) - if stream == nil { - return - } - - ct := r.Header.Get("Content-Type") - if ct != "" { - ct, _, _ = mime.ParseMediaType(ct) - } - - // V2 - json/object exchange, V1 - raw SDP exchange - apiV2 := ct == "application/json" - - var offer string - if apiV2 { - var desc pion.SessionDescription - if err := json.NewDecoder(r.Body).Decode(&desc); err != nil { - log.Error().Err(err).Caller().Send() - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - offer = desc.SDP - } else { - body, err := io.ReadAll(r.Body) - if err != nil { - log.Error().Err(err).Caller().Send() - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - offer = string(body) - } - - answer, err := ExchangeSDP(stream, offer, r.UserAgent()) - if err != nil { - log.Error().Err(err).Caller().Send() - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - // send SDP to client - if apiV2 { - w.Header().Set("Content-Type", ct) - - v := pion.SessionDescription{ - Type: pion.SDPTypeAnswer, SDP: answer, - } - if err = json.NewEncoder(w).Encode(v); err != nil { - log.Error().Err(err).Caller().Send() - http.Error(w, err.Error(), http.StatusInternalServerError) - } - } else { - if _, err = w.Write([]byte(answer)); err != nil { - log.Error().Err(err).Caller().Send() - } - } -} - func ExchangeSDP(stream *streams.Stream, offer string, userAgent string) (answer string, err error) { pc, err := newPeerConnection() if err != nil { @@ -220,7 +162,7 @@ func ExchangeSDP(stream *streams.Stream, offer string, userAgent string) (answer } // create new webrtc instance - conn := webrtc.NewServer(pc) + conn := webrtc.NewConn(pc) conn.UserAgent = userAgent conn.Listen(func(msg interface{}) { switch msg := msg.(type) { diff --git a/pkg/webrtc/client.go b/pkg/webrtc/client.go new file mode 100644 index 00000000..946ae3f6 --- /dev/null +++ b/pkg/webrtc/client.go @@ -0,0 +1,34 @@ +package webrtc + +import "github.com/pion/webrtc/v3" + +func (c *Conn) CreateOffer() (string, error) { + init := webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly} + _, _ = c.pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, init) + _, _ = c.pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, init) + + desc, err := c.pc.CreateOffer(nil) + if err != nil { + return "", err + } + + if err = c.pc.SetLocalDescription(desc); err != nil { + return "", err + } + + return desc.SDP, nil +} + +func (c *Conn) CreateCompleteOffer() (string, error) { + if _, err := c.CreateOffer(); err != nil { + return "", err + } + + <-webrtc.GatheringCompletePromise(c.pc) + return c.pc.LocalDescription().SDP, nil +} + +func (c *Conn) SetAnswer(answer string) (err error) { + desc := webrtc.SessionDescription{SDP: answer, Type: webrtc.SDPTypeAnswer} + return c.pc.SetRemoteDescription(desc) +} diff --git a/pkg/webrtc/conn.go b/pkg/webrtc/conn.go new file mode 100644 index 00000000..4473f2f9 --- /dev/null +++ b/pkg/webrtc/conn.go @@ -0,0 +1,148 @@ +package webrtc + +import ( + "github.com/AlexxIT/go2rtc/pkg/streamer" + "github.com/pion/webrtc/v3" +) + +type Conn struct { + streamer.Element + + UserAgent string + + pc *webrtc.PeerConnection + + medias []*streamer.Media + tracks []*streamer.Track + + receive int + send int + + offer string +} + +func NewConn(pc *webrtc.PeerConnection) *Conn { + c := &Conn{pc: pc} + + pc.OnICECandidate(func(candidate *webrtc.ICECandidate) { + c.Fire(candidate) + }) + + pc.OnDataChannel(func(channel *webrtc.DataChannel) { + c.Fire(channel) + }) + + pc.OnTrack(func(remote *webrtc.TrackRemote, _ *webrtc.RTPReceiver) { + track := c.getTrack(remote) + if track == nil { + println("ERROR: webrtc: can't find track") + return + } + + for { + packet, _, err := remote.ReadRTP() + if err != nil { + return + } + if len(packet.Payload) == 0 { + continue + } + c.receive += len(packet.Payload) + _ = track.WriteRTP(packet) + } + }) + + // OK connection: + // 15:01:46 ICE connection state changed: checking + // 15:01:46 peer connection state changed: connected + // 15:01:54 peer connection state changed: disconnected + // 15:02:20 peer connection state changed: failed + // + // Fail connection: + // 14:53:08 ICE connection state changed: checking + // 14:53:39 peer connection state changed: failed + pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { + c.Fire(state) + + // TODO: rewrite? + switch state { + case webrtc.PeerConnectionStateDisconnected: + // disconnect event comes earlier, than failed + // but it comes only for success connections + _ = pc.Close() + case webrtc.PeerConnectionStateFailed: + _ = pc.Close() + } + }) + + return c +} + +func (c *Conn) Close() error { + return c.pc.Close() +} + +func (c *Conn) AddCandidate(candidate string) error { + // pion uses only candidate value from json/object candidate struct + return c.pc.AddICECandidate(webrtc.ICECandidateInit{Candidate: candidate}) +} + +func (c *Conn) getTrack(remote *webrtc.TrackRemote) *streamer.Track { + payloadType := uint8(remote.PayloadType()) + + // search existing track (two way audio) + for _, track := range c.tracks { + if track.Codec.PayloadType == payloadType { + return track + } + } + + // create new track (incoming WebRTC WHIP) + for _, media := range c.medias { + for _, codec := range media.Codecs { + if codec.PayloadType == payloadType { + track := streamer.NewTrack(codec, media.Direction) + c.tracks = append(c.tracks, track) + return track + } + } + } + + return nil +} + +func (c *Conn) remote() string { + if c.pc == nil { + return "" + } + + for _, trans := range c.pc.GetTransceivers() { + if trans == nil { + continue + } + + receiver := trans.Receiver() + if receiver == nil { + continue + } + + transport := receiver.Transport() + if transport == nil { + continue + } + + iceTransport := transport.ICETransport() + if iceTransport == nil { + continue + } + + pair, _ := iceTransport.GetSelectedCandidatePair() + if pair == nil || pair.Remote == nil { + continue + } + + return pair.Remote.String() + } + + return "" +} diff --git a/pkg/webrtc/consumer.go b/pkg/webrtc/consumer.go index f013274d..828a03f1 100644 --- a/pkg/webrtc/consumer.go +++ b/pkg/webrtc/consumer.go @@ -9,11 +9,11 @@ import ( "github.com/pion/webrtc/v3" ) -func (c *Server) GetMedias() []*streamer.Media { +func (c *Conn) GetMedias() []*streamer.Media { return c.medias } -func (c *Server) AddTrack(media *streamer.Media, track *streamer.Track) *streamer.Track { +func (c *Conn) AddTrack(media *streamer.Media, track *streamer.Track) *streamer.Track { switch track.Direction { // send our track to WebRTC consumer case streamer.DirectionSendonly: @@ -41,7 +41,14 @@ func (c *Server) AddTrack(media *streamer.Media, track *streamer.Track) *streame return nil } - if _, err = c.conn.AddTrack(trackLocal); err != nil { + init := webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionSendonly} + tr, err := c.pc.AddTransceiverFromTrack(trackLocal, init) + if err != nil { + return nil + } + + codecs := []webrtc.RTPCodecParameters{{RTPCodecCapability: caps}} + if err = tr.SetCodecPreferences(codecs); err != nil { return nil } @@ -78,37 +85,39 @@ func (c *Server) AddTrack(media *streamer.Media, track *streamer.Track) *streame // receive track from WebRTC consumer (microphone, backchannel, two way audio) case streamer.DirectionRecvonly: - for _, tr := range c.conn.GetTransceivers() { - if tr.Mid() != media.MID { - continue - } - - codec := track.Codec - caps := webrtc.RTPCodecCapability{ - MimeType: MimeType(codec), - ClockRate: codec.ClockRate, - Channels: codec.Channels, - } - codecs := []webrtc.RTPCodecParameters{ - {RTPCodecCapability: caps}, - } - if err := tr.SetCodecPreferences(codecs); err != nil { - return nil - } - - c.tracks = append(c.tracks, track) - return track + caps := webrtc.RTPCodecCapability{ + MimeType: MimeType(track.Codec), + ClockRate: track.Codec.ClockRate, + Channels: track.Codec.Channels, } + + init := webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly} + tr, err := c.pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, init) + if err != nil { + return nil + } + + codecs := []webrtc.RTPCodecParameters{ + {RTPCodecCapability: caps, PayloadType: webrtc.PayloadType(track.Codec.PayloadType)}, + } + if err = tr.SetCodecPreferences(codecs); err != nil { + return nil + } + + c.tracks = append(c.tracks, track) + return track } panic("wrong direction") } -func (c *Server) MarshalJSON() ([]byte, error) { +func (c *Conn) MarshalJSON() ([]byte, error) { info := &streamer.Info{ - Type: "WebRTC client", + Type: "WebRTC", RemoteAddr: c.remote(), UserAgent: c.UserAgent, + Medias: c.medias, + Tracks: c.tracks, Recv: uint32(c.receive), Send: uint32(c.send), } diff --git a/pkg/webrtc/producer.go b/pkg/webrtc/producer.go new file mode 100644 index 00000000..c0d0574d --- /dev/null +++ b/pkg/webrtc/producer.go @@ -0,0 +1,20 @@ +package webrtc + +import "github.com/AlexxIT/go2rtc/pkg/streamer" + +func (c *Conn) GetTrack(media *streamer.Media, codec *streamer.Codec) *streamer.Track { + for _, track := range c.tracks { + if track.Codec == codec { + return track + } + } + return nil +} + +func (c *Conn) Start() error { + return nil +} + +func (c *Conn) Stop() error { + return c.pc.Close() +} diff --git a/pkg/webrtc/server.go b/pkg/webrtc/server.go index 098c1f53..e1586e44 100644 --- a/pkg/webrtc/server.go +++ b/pkg/webrtc/server.go @@ -6,96 +6,11 @@ import ( "github.com/pion/webrtc/v3" ) -type Server struct { - streamer.Element +func (c *Conn) SetOffer(offer string) (err error) { + c.offer = offer - UserAgent string - - conn *webrtc.PeerConnection - - medias []*streamer.Media - tracks []*streamer.Track - - receive int - send int -} - -func NewServer(conn *webrtc.PeerConnection) *Server { - c := &Server{conn: conn} - - conn.OnICECandidate(func(candidate *webrtc.ICECandidate) { - c.Fire(candidate) - }) - - conn.OnTrack(func(remote *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) { - for _, track := range c.tracks { - if track.Direction != streamer.DirectionRecvonly { - continue - } - if track.Codec.PayloadType != uint8(remote.PayloadType()) { - continue - } - - for { - packet, _, err := remote.ReadRTP() - if err != nil { - return - } - if len(packet.Payload) == 0 { - continue - } - c.receive += len(packet.Payload) - _ = track.WriteRTP(packet) - } - } - - //fmt.Printf("TODO: webrtc ontrack %+v\n", remote) - }) - - conn.OnDataChannel(func(channel *webrtc.DataChannel) { - c.Fire(channel) - }) - - // OK connection: - // 15:01:46 ICE connection state changed: checking - // 15:01:46 peer connection state changed: connected - // 15:01:54 peer connection state changed: disconnected - // 15:02:20 peer connection state changed: failed - // - // Fail connection: - // 14:53:08 ICE connection state changed: checking - // 14:53:39 peer connection state changed: failed - conn.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { - c.Fire(state) - - // TODO: remove - switch state { - case webrtc.PeerConnectionStateConnected: - c.Fire(streamer.StatePlaying) // TODO: remove - case webrtc.PeerConnectionStateDisconnected: - c.Fire(streamer.StateNull) // TODO: remove - // disconnect event comes earlier, than failed - // but it comes only for success connections - _ = conn.Close() - case webrtc.PeerConnectionStateFailed: - _ = conn.Close() - } - }) - - return c -} - -func (c *Server) SetOffer(offer string) (err error) { - desc := webrtc.SessionDescription{ - Type: webrtc.SDPTypeOffer, SDP: offer, - } - if err = c.conn.SetRemoteDescription(desc); err != nil { - return - } - - rawSDP := []byte(c.conn.RemoteDescription().SDP) sd := &sdp.SessionDescription{} - if err = sd.Unmarshal(rawSDP); err != nil { + if err = sd.Unmarshal([]byte(offer)); err != nil { return } @@ -117,85 +32,39 @@ func (c *Server) SetOffer(offer string) (err error) { return } -func (c *Server) GetAnswer() (answer string, err error) { - for _, tr := range c.conn.GetTransceivers() { - if tr.Direction() != webrtc.RTPTransceiverDirectionSendonly { - continue - } +func (c *Conn) GetAnswer() (answer string, err error) { + // we need to process remote offer after we create transeivers + desc := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: c.offer} + if err = c.pc.SetRemoteDescription(desc); err != nil { + return "", err + } - // disable transceivers if we don't have track - // make direction=inactive - // don't really necessary, but anyway - if tr.Sender() == nil { + // disable transceivers if we don't have track + // make direction=inactive + // don't really necessary, but anyway + for _, tr := range c.pc.GetTransceivers() { + if tr.Direction() == webrtc.RTPTransceiverDirectionSendonly && tr.Sender() == nil { if err = tr.Stop(); err != nil { return } } } - var sdAnswer webrtc.SessionDescription - sdAnswer, err = c.conn.CreateAnswer(nil) - if err != nil { + if desc, err = c.pc.CreateAnswer(nil); err != nil { + return + } + if err = c.pc.SetLocalDescription(desc); err != nil { return } - if err = c.conn.SetLocalDescription(sdAnswer); err != nil { - return - } - - return sdAnswer.SDP, nil + return desc.SDP, nil } -func (c *Server) GetCompleteAnswer() (answer string, err error) { +func (c *Conn) GetCompleteAnswer() (answer string, err error) { if _, err = c.GetAnswer(); err != nil { return } - <-webrtc.GatheringCompletePromise(c.conn) - return c.conn.LocalDescription().SDP, nil -} - -func (c *Server) Close() error { - return c.conn.Close() -} - -func (c *Server) AddCandidate(candidate string) { - // pion uses only candidate value from json/object candidate struct - _ = c.conn.AddICECandidate(webrtc.ICECandidateInit{Candidate: candidate}) -} - -func (c *Server) remote() string { - if c.conn == nil { - return "" - } - - for _, trans := range c.conn.GetTransceivers() { - if trans == nil { - continue - } - - receiver := trans.Receiver() - if receiver == nil { - continue - } - - transport := receiver.Transport() - if transport == nil { - continue - } - - iceTransport := transport.ICETransport() - if iceTransport == nil { - continue - } - - pair, _ := iceTransport.GetSelectedCandidatePair() - if pair == nil || pair.Remote == nil { - continue - } - - return pair.Remote.String() - } - - return "" + <-webrtc.GatheringCompletePromise(c.pc) + return c.pc.LocalDescription().SDP, nil } diff --git a/www/webrtc.html b/www/webrtc.html index ca6bbab0..39954bdc 100644 --- a/www/webrtc.html +++ b/www/webrtc.html @@ -37,7 +37,6 @@ pc.createOffer().then(offer => { pc.setLocalDescription(offer).then(() => { - console.log(offer.sdp); const msg = {type: 'webrtc/offer', value: pc.localDescription.sdp}; ws.send(JSON.stringify(msg)); }); @@ -82,14 +81,10 @@ video.srcObject = ev.streams[0]; } - // Safari don't support "offerToReceiveVideo" - // so need to create transeivers manually - pc.addTransceiver('video', {direction: 'recvonly'}); - pc.addTransceiver('audio', {direction: 'recvonly'}); - if (stream) { stream.getTracks().forEach(track => { - const sender = pc.addTrack(track, stream) + pc.addTransceiver('audio', {direction: 'sendonly'}); + const sender = pc.addTrack(track, stream); // track.stop(); // setTimeout(() => { // navigator.mediaDevices.getUserMedia({audio: true}).then(stream => { @@ -100,6 +95,11 @@ // }, 10000); }); } + + // Safari don't support "offerToReceiveVideo" + // so need to create transeivers manually + pc.addTransceiver('video', {direction: 'recvonly'}); + pc.addTransceiver('audio', {direction: 'recvonly'}); } if (navigator.mediaDevices) {