Rewrite WebRTC producer/consumer tracks handlers

This commit is contained in:
Alexey Khit
2023-03-09 14:28:13 +03:00
parent 77842643c8
commit 3d34854387
6 changed files with 429 additions and 115 deletions
+1
View File
@@ -70,6 +70,7 @@ func asyncClient(url string) (streamer.Producer, error) {
medias := []*streamer.Media{
{Kind: streamer.KindVideo, Direction: streamer.DirectionRecvonly},
{Kind: streamer.KindAudio, Direction: streamer.DirectionRecvonly},
{Kind: streamer.KindAudio, Direction: streamer.DirectionSendonly},
}
// 3. Create offer
+69 -1
View File
@@ -39,7 +39,10 @@ func (c *Conn) CreateCompleteOffer(medias []*streamer.Media) (string, error) {
}
func (c *Conn) SetAnswer(answer string) (err error) {
desc := webrtc.SessionDescription{SDP: answer, Type: webrtc.SDPTypeAnswer}
desc := webrtc.SessionDescription{
Type: webrtc.SDPTypeAnswer,
SDP: fakeFormatsInAnswer(c.pc.LocalDescription().SDP, answer),
}
if err = c.pc.SetRemoteDescription(desc); err != nil {
return
}
@@ -67,3 +70,68 @@ func (c *Conn) SetAnswer(answer string) (err error) {
return nil
}
func fakeFormatsInAnswer(offer, answer string) string {
sd2 := &sdp.SessionDescription{}
if err := sd2.Unmarshal([]byte(answer)); err != nil {
return answer
}
// check if answer has recvonly audio
var ok bool
for _, md2 := range sd2.MediaDescriptions {
if md2.MediaName.Media == "audio" {
if _, ok = md2.Attribute("recvonly"); ok {
break
}
}
}
if !ok {
return answer
}
sd1 := &sdp.SessionDescription{}
if err := sd1.Unmarshal([]byte(offer)); err != nil {
return answer
}
var formats []string
var attrs []sdp.Attribute
for _, md1 := range sd1.MediaDescriptions {
if md1.MediaName.Media == "audio" {
for _, attr := range md1.Attributes {
switch attr.Key {
case "rtpmap", "fmtp", "rtcp-fb", "extmap":
attrs = append(attrs, attr)
}
}
formats = md1.MediaName.Formats
break
}
}
for _, md2 := range sd2.MediaDescriptions {
if md2.MediaName.Media == "audio" {
for _, attr := range md2.Attributes {
switch attr.Key {
case "rtpmap", "fmtp", "rtcp-fb", "extmap":
default:
attrs = append(attrs, attr)
}
}
md2.MediaName.Formats = formats
md2.Attributes = attrs
break
}
}
b, err := sd2.Marshal()
if err != nil {
return answer
}
return string(b)
}
+102
View File
@@ -0,0 +1,102 @@
package webrtc
import (
"github.com/AlexxIT/go2rtc/pkg/streamer"
"github.com/pion/webrtc/v3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"testing"
)
func TestClient(t *testing.T) {
api, err := NewAPI("")
require.Nil(t, err)
pc, err := api.NewPeerConnection(webrtc.Configuration{})
require.Nil(t, err)
prod := NewConn(pc)
medias := []*streamer.Media{
{Kind: streamer.KindVideo, Direction: streamer.DirectionRecvonly},
{Kind: streamer.KindAudio, Direction: streamer.DirectionRecvonly},
{Kind: streamer.KindAudio, Direction: streamer.DirectionSendonly},
}
offer, err := prod.CreateOffer(medias)
require.Nil(t, err)
assert.NotEmpty(t, offer)
require.Len(t, prod.pc.GetReceivers(), 2)
require.Len(t, prod.pc.GetSenders(), 1)
answer := `v=0
o=- 1934370540648269799 1678277622 IN IP4 0.0.0.0
s=-
t=0 0
a=fingerprint:sha-256 77:8C:9A:62:51:81:69:EA:4E:BE:93:6B:4E:DF:51:D2:2F:E3:DF:E7:F4:8A:18:1A:C0:74:FA:AE:B8:98:29:9B
a=extmap-allow-mixed
a=group:BUNDLE 0 1 2
m=video 9 UDP/TLS/RTP/SAVPF 97
c=IN IP4 0.0.0.0
a=setup:active
a=mid:0
a=ice-ufrag:xxx
a=ice-pwd:xxx
a=rtcp-mux
a=rtcp-rsize
a=rtpmap:97 H264/90000
a=fmtp:97 packetization-mode=1;profile-level-id=42e01f
a=extmap:1 http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01
a=ssrc:2815449682 cname:go2rtc
a=ssrc:2815449682 msid:go2rtc video
a=ssrc:2815449682 mslabel:go2rtc
a=ssrc:2815449682 label:video
a=msid:go2rtc video
a=sendonly
m=audio 9 UDP/TLS/RTP/SAVPF 8
c=IN IP4 0.0.0.0
a=setup:active
a=mid:1
a=ice-ufrag:xxx
a=ice-pwd:xxx
a=rtcp-mux
a=rtcp-rsize
a=rtpmap:8 PCMA/8000
a=extmap:1 http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01
a=ssrc:1392166302 cname:go2rtc
a=ssrc:1392166302 msid:go2rtc audio
a=ssrc:1392166302 mslabel:go2rtc
a=ssrc:1392166302 label:audio
a=msid:go2rtc audio
a=sendonly
m=audio 9 UDP/TLS/RTP/SAVPF 0
c=IN IP4 0.0.0.0
a=setup:active
a=mid:2
a=ice-ufrag:xxx
a=ice-pwd:xxx
a=rtcp-mux
a=rtcp-rsize
a=rtpmap:0 PCMU/8000
a=extmap:1 http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01
a=recvonly
`
err = prod.SetAnswer(answer)
require.Nil(t, err)
sender := prod.pc.GetSenders()[0]
caps := webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypePCMU,
ClockRate: 8000,
Channels: 0,
}
track := sender.Track()
track, err = webrtc.NewTrackLocalStaticRTP(caps, track.ID(), track.StreamID())
require.Nil(t, err)
err = sender.ReplaceTrack(track)
require.Nil(t, err)
}
+31 -17
View File
@@ -52,10 +52,9 @@ func NewConn(pc *webrtc.PeerConnection) *Conn {
})
pc.OnTrack(func(remote *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
track := c.getTrack(remote)
track := c.getRecvTrack(remote)
if track == nil {
println("ERROR: webrtc: can't find track")
return
return // it's OK when we not need, for example, audio from producer
}
for {
@@ -104,25 +103,40 @@ func (c *Conn) AddCandidate(candidate string) error {
return c.pc.AddICECandidate(webrtc.ICECandidateInit{Candidate: candidate})
}
func (c *Conn) getTrack(remote *webrtc.TrackRemote) *streamer.Track {
func (c *Conn) getRecvTrack(remote *webrtc.TrackRemote) *streamer.Track {
payloadType := uint8(remote.PayloadType())
// search existing track (two way audio)
for _, track := range c.tracks {
if track.Codec.PayloadType == payloadType {
return track
}
}
// create new track (incoming WebRTC WHIP)
for _, media := range c.medias {
for _, codec := range media.Codecs {
if codec.PayloadType == payloadType {
track := streamer.NewTrack(media, codec)
c.tracks = append(c.tracks, track)
switch c.Mode {
// browser microphone (backchannel)
case streamer.ModePassiveConsumer:
for _, track := range c.tracks {
if track.Direction == streamer.DirectionRecvonly && track.Codec.PayloadType == payloadType {
return track
}
}
case streamer.ModeActiveProducer:
// remote track from WebRTC active producer (audio/video)
for _, track := range c.tracks {
if track.Direction == streamer.DirectionSendonly && track.Codec.PayloadType == payloadType {
return track
}
}
case streamer.ModePassiveProducer:
// remote track from WebRTC passive producer (incoming WebRTC WHIP)
for _, media := range c.medias {
for _, codec := range media.Codecs {
if codec.PayloadType == payloadType {
track := streamer.NewTrack(media, codec)
c.tracks = append(c.tracks, track)
return track
}
}
}
default:
panic("not implemented")
}
return nil
+102 -91
View File
@@ -14,101 +14,112 @@ func (c *Conn) GetMedias() []*streamer.Media {
}
func (c *Conn) AddTrack(media *streamer.Media, track *streamer.Track) *streamer.Track {
switch track.Direction {
// send our track to WebRTC consumer
case streamer.DirectionSendonly:
codec := track.Codec
switch c.Mode {
case streamer.ModePassiveConsumer:
switch track.Direction {
case streamer.DirectionSendonly:
// send our track to WebRTC consumer
return c.addConsumerSendTrack(track)
// webrtc.codecParametersFuzzySearch
caps := webrtc.RTPCodecCapability{
MimeType: MimeType(codec),
Channels: codec.Channels,
ClockRate: codec.ClockRate,
case streamer.DirectionRecvonly:
// receive track from WebRTC consumer (microphone, backchannel, two way audio)
return c.addConsumerRecvTrack(track)
}
if codec.Name == streamer.CodecH264 {
// don't know if this really neccessary
// I have tested multiple browsers and H264 profile has no effect on anything
caps.SDPFmtpLine = "packetization-mode=1;profile-level-id=42e01f"
}
// important to use same streamID so JS will automatically
// join two tracks as one source/stream
trackLocal, err := webrtc.NewTrackLocalStaticRTP(
caps, caps.MimeType[:5], "go2rtc",
)
if err != nil {
return nil
}
init := webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionSendonly}
tr, err := c.pc.AddTransceiverFromTrack(trackLocal, init)
if err != nil {
return nil
}
codecs := []webrtc.RTPCodecParameters{{RTPCodecCapability: caps}}
if err = tr.SetCodecPreferences(codecs); err != nil {
return nil
}
push := func(packet *rtp.Packet) error {
c.send += packet.MarshalSize()
return trackLocal.WriteRTP(packet)
}
switch codec.Name {
case streamer.CodecH264:
wrapper := h264.RTPPay(1200)
push = wrapper(push)
if codec.IsRTP() {
wrapper = h264.RTPDepay(track)
} else {
wrapper = h264.RepairAVC(track)
}
push = wrapper(push)
case streamer.CodecH265:
// SafariPay because it is the only browser in the world
// that supports WebRTC + H265
wrapper := h265.SafariPay(1200)
push = wrapper(push)
wrapper = h265.RTPDepay(track)
push = wrapper(push)
}
track = track.Bind(push)
c.tracks = append(c.tracks, track)
return track
// receive track from WebRTC consumer (microphone, backchannel, two way audio)
case streamer.DirectionRecvonly:
caps := webrtc.RTPCodecCapability{
MimeType: MimeType(track.Codec),
ClockRate: track.Codec.ClockRate,
Channels: track.Codec.Channels,
}
init := webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}
tr, err := c.pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, init)
if err != nil {
return nil
}
codecs := []webrtc.RTPCodecParameters{
{RTPCodecCapability: caps, PayloadType: webrtc.PayloadType(track.Codec.PayloadType)},
}
if err = tr.SetCodecPreferences(codecs); err != nil {
return nil
}
c.tracks = append(c.tracks, track)
return track
}
panic("wrong direction")
panic("not implemented")
}
func (c *Conn) addConsumerSendTrack(track *streamer.Track) *streamer.Track {
codec := track.Codec
// webrtc.codecParametersFuzzySearch
caps := webrtc.RTPCodecCapability{
MimeType: MimeType(codec),
Channels: codec.Channels,
ClockRate: codec.ClockRate,
}
if codec.Name == streamer.CodecH264 {
// don't know if this really neccessary
// I have tested multiple browsers and H264 profile has no effect on anything
caps.SDPFmtpLine = "packetization-mode=1;profile-level-id=42e01f"
}
// important to use same streamID so JS will automatically
// join two tracks as one source/stream
trackLocal, err := webrtc.NewTrackLocalStaticRTP(
caps, caps.MimeType[:5], "go2rtc",
)
if err != nil {
return nil
}
init := webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionSendonly}
tr, err := c.pc.AddTransceiverFromTrack(trackLocal, init)
if err != nil {
return nil
}
codecs := []webrtc.RTPCodecParameters{{RTPCodecCapability: caps}}
if err = tr.SetCodecPreferences(codecs); err != nil {
return nil
}
push := func(packet *rtp.Packet) error {
c.send += packet.MarshalSize()
return trackLocal.WriteRTP(packet)
}
switch codec.Name {
case streamer.CodecH264:
wrapper := h264.RTPPay(1200)
push = wrapper(push)
if codec.IsRTP() {
wrapper = h264.RTPDepay(track)
} else {
wrapper = h264.RepairAVC(track)
}
push = wrapper(push)
case streamer.CodecH265:
// SafariPay because it is the only browser in the world
// that supports WebRTC + H265
wrapper := h265.SafariPay(1200)
push = wrapper(push)
wrapper = h265.RTPDepay(track)
push = wrapper(push)
}
track = track.Bind(push)
c.tracks = append(c.tracks, track)
return track
}
func (c *Conn) addConsumerRecvTrack(track *streamer.Track) *streamer.Track {
caps := webrtc.RTPCodecCapability{
MimeType: MimeType(track.Codec),
ClockRate: track.Codec.ClockRate,
Channels: track.Codec.Channels,
}
init := webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}
tr, err := c.pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, init)
if err != nil {
return nil
}
codecs := []webrtc.RTPCodecParameters{
{RTPCodecCapability: caps, PayloadType: webrtc.PayloadType(track.Codec.PayloadType)},
}
if err = tr.SetCodecPreferences(codecs); err != nil {
return nil
}
c.tracks = append(c.tracks, track)
return track
}
func (c *Conn) MarshalJSON() ([]byte, error) {
+124 -6
View File
@@ -2,18 +2,43 @@ package webrtc
import (
"github.com/AlexxIT/go2rtc/pkg/streamer"
"github.com/pion/rtp"
"github.com/pion/webrtc/v3"
)
func (c *Conn) GetTrack(media *streamer.Media, codec *streamer.Codec) *streamer.Track {
for _, track := range c.tracks {
if track.Codec == codec {
return track
switch c.Mode {
case streamer.ModeActiveProducer:
// active producer (webrtc source, webtorrent source):
// - creates empty track for remote sendonly media
// - bind go2rtc with pion track for remote recv media (backchannel)
for _, track := range c.tracks {
if track.Codec == codec {
return track
}
}
var track *streamer.Track
if media.Direction == streamer.DirectionSendonly {
track = streamer.NewTrack(media, codec)
} else {
track = c.getProducerSendTrack(media, codec)
}
c.tracks = append(c.tracks, track)
return track
case streamer.ModePassiveProducer:
// passive producer (WHIP)
for _, track := range c.tracks {
if track.Codec == codec {
return track
}
}
return nil
}
track := streamer.NewTrack(media, codec)
c.tracks = append(c.tracks, track)
return track
panic("not implemented")
}
func (c *Conn) Start() error {
@@ -24,3 +49,96 @@ func (c *Conn) Start() error {
func (c *Conn) Stop() error {
return c.pc.Close()
}
func (c *Conn) getProducerSendTrack(media *streamer.Media, codec *streamer.Codec) *streamer.Track {
tr := c.getTranseiver(media.MID)
if tr == nil {
return nil
}
sender := tr.Sender()
if sender == nil {
return nil
}
oldTrack := sender.Track()
track := &Track{
kind: media.Kind,
payloadType: codec.PayloadType,
id: oldTrack.ID(),
rid: oldTrack.RID(),
streamID: oldTrack.StreamID(),
}
if err := sender.ReplaceTrack(track); err != nil {
return nil
}
push := func(packet *rtp.Packet) error {
c.send += packet.MarshalSize()
return track.WriteRTP(packet)
}
return streamer.NewTrack(media, codec).Bind(push)
}
func (c *Conn) getTranseiver(mid string) *webrtc.RTPTransceiver {
for _, tr := range c.pc.GetTransceivers() {
if tr.Mid() == mid {
return tr
}
}
return nil
}
type Track struct {
kind string
id string
rid string
streamID string
payloadType byte
ssrc uint32
writer webrtc.TrackLocalWriter
}
func (t *Track) Bind(context webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, error) {
t.ssrc = uint32(context.SSRC())
t.writer = context.WriteStream()
for _, parameters := range context.CodecParameters() {
if byte(parameters.PayloadType) == t.payloadType {
return parameters, nil
}
}
return webrtc.RTPCodecParameters{}, nil
}
func (t *Track) Unbind(context webrtc.TrackLocalContext) error {
return nil
}
func (t *Track) ID() string {
return t.id
}
func (t *Track) RID() string {
return t.rid
}
func (t *Track) StreamID() string {
return t.streamID
}
func (t *Track) Kind() webrtc.RTPCodecType {
return webrtc.NewRTPCodecType(t.kind)
}
func (t *Track) WriteRTP(packet *rtp.Packet) error {
header := packet.Header
header.SSRC = t.ssrc
header.PayloadType = t.payloadType
_, err := t.writer.WriteRTP(&header, packet.Payload)
return err
}