diff --git a/cmd/webrtc/client.go b/cmd/webrtc/client.go index 94cc6798..49726789 100644 --- a/cmd/webrtc/client.go +++ b/cmd/webrtc/client.go @@ -70,6 +70,7 @@ func asyncClient(url string) (streamer.Producer, error) { medias := []*streamer.Media{ {Kind: streamer.KindVideo, Direction: streamer.DirectionRecvonly}, {Kind: streamer.KindAudio, Direction: streamer.DirectionRecvonly}, + {Kind: streamer.KindAudio, Direction: streamer.DirectionSendonly}, } // 3. Create offer diff --git a/pkg/webrtc/client.go b/pkg/webrtc/client.go index d39a547d..985126ce 100644 --- a/pkg/webrtc/client.go +++ b/pkg/webrtc/client.go @@ -39,7 +39,10 @@ func (c *Conn) CreateCompleteOffer(medias []*streamer.Media) (string, error) { } func (c *Conn) SetAnswer(answer string) (err error) { - desc := webrtc.SessionDescription{SDP: answer, Type: webrtc.SDPTypeAnswer} + desc := webrtc.SessionDescription{ + Type: webrtc.SDPTypeAnswer, + SDP: fakeFormatsInAnswer(c.pc.LocalDescription().SDP, answer), + } if err = c.pc.SetRemoteDescription(desc); err != nil { return } @@ -67,3 +70,68 @@ func (c *Conn) SetAnswer(answer string) (err error) { return nil } + +func fakeFormatsInAnswer(offer, answer string) string { + sd2 := &sdp.SessionDescription{} + if err := sd2.Unmarshal([]byte(answer)); err != nil { + return answer + } + + // check if answer has recvonly audio + var ok bool + for _, md2 := range sd2.MediaDescriptions { + if md2.MediaName.Media == "audio" { + if _, ok = md2.Attribute("recvonly"); ok { + break + } + } + } + if !ok { + return answer + } + + sd1 := &sdp.SessionDescription{} + if err := sd1.Unmarshal([]byte(offer)); err != nil { + return answer + } + + var formats []string + var attrs []sdp.Attribute + + for _, md1 := range sd1.MediaDescriptions { + if md1.MediaName.Media == "audio" { + for _, attr := range md1.Attributes { + switch attr.Key { + case "rtpmap", "fmtp", "rtcp-fb", "extmap": + attrs = append(attrs, attr) + } + } + + formats = md1.MediaName.Formats + break + } + } + + for _, md2 := range sd2.MediaDescriptions { + if md2.MediaName.Media == "audio" { + for _, attr := range md2.Attributes { + switch attr.Key { + case "rtpmap", "fmtp", "rtcp-fb", "extmap": + default: + attrs = append(attrs, attr) + } + } + + md2.MediaName.Formats = formats + md2.Attributes = attrs + break + } + } + + b, err := sd2.Marshal() + if err != nil { + return answer + } + + return string(b) +} diff --git a/pkg/webrtc/client_test.go b/pkg/webrtc/client_test.go new file mode 100644 index 00000000..5659285d --- /dev/null +++ b/pkg/webrtc/client_test.go @@ -0,0 +1,102 @@ +package webrtc + +import ( + "github.com/AlexxIT/go2rtc/pkg/streamer" + "github.com/pion/webrtc/v3" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "testing" +) + +func TestClient(t *testing.T) { + api, err := NewAPI("") + require.Nil(t, err) + + pc, err := api.NewPeerConnection(webrtc.Configuration{}) + require.Nil(t, err) + + prod := NewConn(pc) + + medias := []*streamer.Media{ + {Kind: streamer.KindVideo, Direction: streamer.DirectionRecvonly}, + {Kind: streamer.KindAudio, Direction: streamer.DirectionRecvonly}, + {Kind: streamer.KindAudio, Direction: streamer.DirectionSendonly}, + } + + offer, err := prod.CreateOffer(medias) + require.Nil(t, err) + assert.NotEmpty(t, offer) + + require.Len(t, prod.pc.GetReceivers(), 2) + require.Len(t, prod.pc.GetSenders(), 1) + + answer := `v=0 +o=- 1934370540648269799 1678277622 IN IP4 0.0.0.0 +s=- +t=0 0 +a=fingerprint:sha-256 77:8C:9A:62:51:81:69:EA:4E:BE:93:6B:4E:DF:51:D2:2F:E3:DF:E7:F4:8A:18:1A:C0:74:FA:AE:B8:98:29:9B +a=extmap-allow-mixed +a=group:BUNDLE 0 1 2 +m=video 9 UDP/TLS/RTP/SAVPF 97 +c=IN IP4 0.0.0.0 +a=setup:active +a=mid:0 +a=ice-ufrag:xxx +a=ice-pwd:xxx +a=rtcp-mux +a=rtcp-rsize +a=rtpmap:97 H264/90000 +a=fmtp:97 packetization-mode=1;profile-level-id=42e01f +a=extmap:1 http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01 +a=ssrc:2815449682 cname:go2rtc +a=ssrc:2815449682 msid:go2rtc video +a=ssrc:2815449682 mslabel:go2rtc +a=ssrc:2815449682 label:video +a=msid:go2rtc video +a=sendonly +m=audio 9 UDP/TLS/RTP/SAVPF 8 +c=IN IP4 0.0.0.0 +a=setup:active +a=mid:1 +a=ice-ufrag:xxx +a=ice-pwd:xxx +a=rtcp-mux +a=rtcp-rsize +a=rtpmap:8 PCMA/8000 +a=extmap:1 http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01 +a=ssrc:1392166302 cname:go2rtc +a=ssrc:1392166302 msid:go2rtc audio +a=ssrc:1392166302 mslabel:go2rtc +a=ssrc:1392166302 label:audio +a=msid:go2rtc audio +a=sendonly +m=audio 9 UDP/TLS/RTP/SAVPF 0 +c=IN IP4 0.0.0.0 +a=setup:active +a=mid:2 +a=ice-ufrag:xxx +a=ice-pwd:xxx +a=rtcp-mux +a=rtcp-rsize +a=rtpmap:0 PCMU/8000 +a=extmap:1 http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01 +a=recvonly +` + + err = prod.SetAnswer(answer) + require.Nil(t, err) + + sender := prod.pc.GetSenders()[0] + + caps := webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypePCMU, + ClockRate: 8000, + Channels: 0, + } + track := sender.Track() + track, err = webrtc.NewTrackLocalStaticRTP(caps, track.ID(), track.StreamID()) + require.Nil(t, err) + + err = sender.ReplaceTrack(track) + require.Nil(t, err) +} diff --git a/pkg/webrtc/conn.go b/pkg/webrtc/conn.go index ea76b25d..61440c1c 100644 --- a/pkg/webrtc/conn.go +++ b/pkg/webrtc/conn.go @@ -52,10 +52,9 @@ func NewConn(pc *webrtc.PeerConnection) *Conn { }) pc.OnTrack(func(remote *webrtc.TrackRemote, _ *webrtc.RTPReceiver) { - track := c.getTrack(remote) + track := c.getRecvTrack(remote) if track == nil { - println("ERROR: webrtc: can't find track") - return + return // it's OK when we not need, for example, audio from producer } for { @@ -104,25 +103,40 @@ func (c *Conn) AddCandidate(candidate string) error { return c.pc.AddICECandidate(webrtc.ICECandidateInit{Candidate: candidate}) } -func (c *Conn) getTrack(remote *webrtc.TrackRemote) *streamer.Track { +func (c *Conn) getRecvTrack(remote *webrtc.TrackRemote) *streamer.Track { payloadType := uint8(remote.PayloadType()) - // search existing track (two way audio) - for _, track := range c.tracks { - if track.Codec.PayloadType == payloadType { - return track - } - } - - // create new track (incoming WebRTC WHIP) - for _, media := range c.medias { - for _, codec := range media.Codecs { - if codec.PayloadType == payloadType { - track := streamer.NewTrack(media, codec) - c.tracks = append(c.tracks, track) + switch c.Mode { + // browser microphone (backchannel) + case streamer.ModePassiveConsumer: + for _, track := range c.tracks { + if track.Direction == streamer.DirectionRecvonly && track.Codec.PayloadType == payloadType { return track } } + + case streamer.ModeActiveProducer: + // remote track from WebRTC active producer (audio/video) + for _, track := range c.tracks { + if track.Direction == streamer.DirectionSendonly && track.Codec.PayloadType == payloadType { + return track + } + } + + case streamer.ModePassiveProducer: + // remote track from WebRTC passive producer (incoming WebRTC WHIP) + for _, media := range c.medias { + for _, codec := range media.Codecs { + if codec.PayloadType == payloadType { + track := streamer.NewTrack(media, codec) + c.tracks = append(c.tracks, track) + return track + } + } + } + + default: + panic("not implemented") } return nil diff --git a/pkg/webrtc/consumer.go b/pkg/webrtc/consumer.go index f4d12b77..b9ebb278 100644 --- a/pkg/webrtc/consumer.go +++ b/pkg/webrtc/consumer.go @@ -14,101 +14,112 @@ func (c *Conn) GetMedias() []*streamer.Media { } func (c *Conn) AddTrack(media *streamer.Media, track *streamer.Track) *streamer.Track { - switch track.Direction { - // send our track to WebRTC consumer - case streamer.DirectionSendonly: - codec := track.Codec + switch c.Mode { + case streamer.ModePassiveConsumer: + switch track.Direction { + case streamer.DirectionSendonly: + // send our track to WebRTC consumer + return c.addConsumerSendTrack(track) - // webrtc.codecParametersFuzzySearch - caps := webrtc.RTPCodecCapability{ - MimeType: MimeType(codec), - Channels: codec.Channels, - ClockRate: codec.ClockRate, + case streamer.DirectionRecvonly: + // receive track from WebRTC consumer (microphone, backchannel, two way audio) + return c.addConsumerRecvTrack(track) } - - if codec.Name == streamer.CodecH264 { - // don't know if this really neccessary - // I have tested multiple browsers and H264 profile has no effect on anything - caps.SDPFmtpLine = "packetization-mode=1;profile-level-id=42e01f" - } - - // important to use same streamID so JS will automatically - // join two tracks as one source/stream - trackLocal, err := webrtc.NewTrackLocalStaticRTP( - caps, caps.MimeType[:5], "go2rtc", - ) - if err != nil { - return nil - } - - init := webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionSendonly} - tr, err := c.pc.AddTransceiverFromTrack(trackLocal, init) - if err != nil { - return nil - } - - codecs := []webrtc.RTPCodecParameters{{RTPCodecCapability: caps}} - if err = tr.SetCodecPreferences(codecs); err != nil { - return nil - } - - push := func(packet *rtp.Packet) error { - c.send += packet.MarshalSize() - return trackLocal.WriteRTP(packet) - } - - switch codec.Name { - case streamer.CodecH264: - wrapper := h264.RTPPay(1200) - push = wrapper(push) - - if codec.IsRTP() { - wrapper = h264.RTPDepay(track) - } else { - wrapper = h264.RepairAVC(track) - } - push = wrapper(push) - - case streamer.CodecH265: - // SafariPay because it is the only browser in the world - // that supports WebRTC + H265 - wrapper := h265.SafariPay(1200) - push = wrapper(push) - - wrapper = h265.RTPDepay(track) - push = wrapper(push) - } - - track = track.Bind(push) - c.tracks = append(c.tracks, track) - return track - - // receive track from WebRTC consumer (microphone, backchannel, two way audio) - case streamer.DirectionRecvonly: - caps := webrtc.RTPCodecCapability{ - MimeType: MimeType(track.Codec), - ClockRate: track.Codec.ClockRate, - Channels: track.Codec.Channels, - } - - init := webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly} - tr, err := c.pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, init) - if err != nil { - return nil - } - - codecs := []webrtc.RTPCodecParameters{ - {RTPCodecCapability: caps, PayloadType: webrtc.PayloadType(track.Codec.PayloadType)}, - } - if err = tr.SetCodecPreferences(codecs); err != nil { - return nil - } - - c.tracks = append(c.tracks, track) - return track } - panic("wrong direction") + panic("not implemented") +} + +func (c *Conn) addConsumerSendTrack(track *streamer.Track) *streamer.Track { + codec := track.Codec + + // webrtc.codecParametersFuzzySearch + caps := webrtc.RTPCodecCapability{ + MimeType: MimeType(codec), + Channels: codec.Channels, + ClockRate: codec.ClockRate, + } + + if codec.Name == streamer.CodecH264 { + // don't know if this really neccessary + // I have tested multiple browsers and H264 profile has no effect on anything + caps.SDPFmtpLine = "packetization-mode=1;profile-level-id=42e01f" + } + + // important to use same streamID so JS will automatically + // join two tracks as one source/stream + trackLocal, err := webrtc.NewTrackLocalStaticRTP( + caps, caps.MimeType[:5], "go2rtc", + ) + if err != nil { + return nil + } + + init := webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionSendonly} + tr, err := c.pc.AddTransceiverFromTrack(trackLocal, init) + if err != nil { + return nil + } + + codecs := []webrtc.RTPCodecParameters{{RTPCodecCapability: caps}} + if err = tr.SetCodecPreferences(codecs); err != nil { + return nil + } + + push := func(packet *rtp.Packet) error { + c.send += packet.MarshalSize() + return trackLocal.WriteRTP(packet) + } + + switch codec.Name { + case streamer.CodecH264: + wrapper := h264.RTPPay(1200) + push = wrapper(push) + + if codec.IsRTP() { + wrapper = h264.RTPDepay(track) + } else { + wrapper = h264.RepairAVC(track) + } + push = wrapper(push) + + case streamer.CodecH265: + // SafariPay because it is the only browser in the world + // that supports WebRTC + H265 + wrapper := h265.SafariPay(1200) + push = wrapper(push) + + wrapper = h265.RTPDepay(track) + push = wrapper(push) + } + + track = track.Bind(push) + c.tracks = append(c.tracks, track) + return track +} + +func (c *Conn) addConsumerRecvTrack(track *streamer.Track) *streamer.Track { + caps := webrtc.RTPCodecCapability{ + MimeType: MimeType(track.Codec), + ClockRate: track.Codec.ClockRate, + Channels: track.Codec.Channels, + } + + init := webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly} + tr, err := c.pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, init) + if err != nil { + return nil + } + + codecs := []webrtc.RTPCodecParameters{ + {RTPCodecCapability: caps, PayloadType: webrtc.PayloadType(track.Codec.PayloadType)}, + } + if err = tr.SetCodecPreferences(codecs); err != nil { + return nil + } + + c.tracks = append(c.tracks, track) + return track } func (c *Conn) MarshalJSON() ([]byte, error) { diff --git a/pkg/webrtc/producer.go b/pkg/webrtc/producer.go index c382e061..c9b7e486 100644 --- a/pkg/webrtc/producer.go +++ b/pkg/webrtc/producer.go @@ -2,18 +2,43 @@ package webrtc import ( "github.com/AlexxIT/go2rtc/pkg/streamer" + "github.com/pion/rtp" + "github.com/pion/webrtc/v3" ) func (c *Conn) GetTrack(media *streamer.Media, codec *streamer.Codec) *streamer.Track { - for _, track := range c.tracks { - if track.Codec == codec { - return track + switch c.Mode { + case streamer.ModeActiveProducer: + // active producer (webrtc source, webtorrent source): + // - creates empty track for remote sendonly media + // - bind go2rtc with pion track for remote recv media (backchannel) + for _, track := range c.tracks { + if track.Codec == codec { + return track + } } + + var track *streamer.Track + if media.Direction == streamer.DirectionSendonly { + track = streamer.NewTrack(media, codec) + } else { + track = c.getProducerSendTrack(media, codec) + } + + c.tracks = append(c.tracks, track) + return track + + case streamer.ModePassiveProducer: + // passive producer (WHIP) + for _, track := range c.tracks { + if track.Codec == codec { + return track + } + } + return nil } - track := streamer.NewTrack(media, codec) - c.tracks = append(c.tracks, track) - return track + panic("not implemented") } func (c *Conn) Start() error { @@ -24,3 +49,96 @@ func (c *Conn) Start() error { func (c *Conn) Stop() error { return c.pc.Close() } + +func (c *Conn) getProducerSendTrack(media *streamer.Media, codec *streamer.Codec) *streamer.Track { + tr := c.getTranseiver(media.MID) + if tr == nil { + return nil + } + + sender := tr.Sender() + if sender == nil { + return nil + } + + oldTrack := sender.Track() + track := &Track{ + kind: media.Kind, + payloadType: codec.PayloadType, + + id: oldTrack.ID(), + rid: oldTrack.RID(), + streamID: oldTrack.StreamID(), + } + + if err := sender.ReplaceTrack(track); err != nil { + return nil + } + + push := func(packet *rtp.Packet) error { + c.send += packet.MarshalSize() + return track.WriteRTP(packet) + } + + return streamer.NewTrack(media, codec).Bind(push) +} + +func (c *Conn) getTranseiver(mid string) *webrtc.RTPTransceiver { + for _, tr := range c.pc.GetTransceivers() { + if tr.Mid() == mid { + return tr + } + } + return nil +} + +type Track struct { + kind string + id string + rid string + streamID string + payloadType byte + ssrc uint32 + writer webrtc.TrackLocalWriter +} + +func (t *Track) Bind(context webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, error) { + t.ssrc = uint32(context.SSRC()) + t.writer = context.WriteStream() + + for _, parameters := range context.CodecParameters() { + if byte(parameters.PayloadType) == t.payloadType { + return parameters, nil + } + } + + return webrtc.RTPCodecParameters{}, nil +} + +func (t *Track) Unbind(context webrtc.TrackLocalContext) error { + return nil +} + +func (t *Track) ID() string { + return t.id +} + +func (t *Track) RID() string { + return t.rid +} + +func (t *Track) StreamID() string { + return t.streamID +} + +func (t *Track) Kind() webrtc.RTPCodecType { + return webrtc.NewRTPCodecType(t.kind) +} + +func (t *Track) WriteRTP(packet *rtp.Packet) error { + header := packet.Header + header.SSRC = t.ssrc + header.PayloadType = t.payloadType + _, err := t.writer.WriteRTP(&header, packet.Payload) + return err +}