diff --git a/pkg/alsa/playback_linux.go b/pkg/alsa/playback_linux.go index 80c41890..9c517530 100644 --- a/pkg/alsa/playback_linux.go +++ b/pkg/alsa/playback_linux.go @@ -42,23 +42,23 @@ func (p *Playback) GetTrack(media *core.Media, codec *core.Codec) (*core.Receive } func (p *Playback) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { - in := track.Codec + src := track.Codec // support probe - if in.Name == core.CodecAny { - in = &core.Codec{ + if src.Name == core.CodecAny { + src = &core.Codec{ Name: core.CodecPCML, ClockRate: 16000, Channels: 2, } } - out := &core.Codec{ + dst := &core.Codec{ Name: core.CodecPCML, - ClockRate: in.ClockRate, + ClockRate: src.ClockRate, Channels: 2, } - sender := core.NewSender(media, out) + sender := core.NewSender(media, dst) sender.Handler = func(pkt *rtp.Packet) { if n, err := p.dev.Write(pkt.Payload); err == nil { @@ -66,15 +66,15 @@ func (p *Playback) AddTrack(media *core.Media, codec *core.Codec, track *core.Re } } - if sender.Handler = pcm.Convert(in, out, sender.Handler); sender.Handler == nil { - return fmt.Errorf("alsa: can't convert %s to %s", in, out) + if sender.Handler = pcm.TranscodeHandler(dst, src, sender.Handler); sender.Handler == nil { + return fmt.Errorf("alsa: can't convert %s to %s", src, dst) } // typical card support: // - Formats: S16_LE, S32_LE // - ClockRates: 8000 - 192000 // - Channels: 2 - 10 - err := p.dev.SetHWParams(device.SNDRV_PCM_FORMAT_S16_LE, out.ClockRate, 2) + err := p.dev.SetHWParams(device.SNDRV_PCM_FORMAT_S16_LE, dst.ClockRate, 2) if err != nil { return err } diff --git a/pkg/pcm/handlers.go b/pkg/pcm/handlers.go new file mode 100644 index 00000000..39075199 --- /dev/null +++ b/pkg/pcm/handlers.go @@ -0,0 +1,99 @@ +package pcm + +import ( + "sync" + + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/pion/rtp" +) + +// RepackG711 - Repack G.711 PCMA/PCMU into frames of size 1024 +// 1. Fixes WebRTC audio quality issue (monotonic timestamp) +// 2. 
Fixes Reolink Doorbell backchannel issue (zero timestamp) +// https://github.com/AlexxIT/go2rtc/issues/331 +func RepackG711(zeroTS bool, handler core.HandlerFunc) core.HandlerFunc { + const PacketSize = 1024 + + var buf []byte + var seq uint16 + var ts uint32 + + // fix https://github.com/AlexxIT/go2rtc/issues/432 + var mu sync.Mutex + + return func(packet *rtp.Packet) { + mu.Lock() + + buf = append(buf, packet.Payload...) + if len(buf) < PacketSize { + mu.Unlock() + return + } + + pkt := &rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, // should be true + PayloadType: packet.PayloadType, // will be owerwriten + SequenceNumber: seq, + SSRC: packet.SSRC, + }, + Payload: buf[:PacketSize], + } + + seq++ + + // don't know if zero TS important for Reolink Doorbell + // don't have this strange devices for tests + if !zeroTS { + pkt.Timestamp = ts + ts += PacketSize + } + + buf = buf[PacketSize:] + + mu.Unlock() + + handler(pkt) + } +} + +// LittleToBig - convert PCM little endian to PCM big endian +func LittleToBig(handler core.HandlerFunc) core.HandlerFunc { + return func(packet *rtp.Packet) { + clone := *packet + clone.Payload = FlipEndian(packet.Payload) + handler(&clone) + } +} + +func TranscodeHandler(dst, src *core.Codec, handler core.HandlerFunc) core.HandlerFunc { + var ts uint32 + k := float32(BytesPerFrame(dst)) / float32(BytesPerFrame(src)) + f := Transcode(dst, src) + + return func(packet *rtp.Packet) { + ts += uint32(k * float32(len(packet.Payload))) + + clone := *packet + clone.Payload = f(packet.Payload) + clone.Timestamp = ts + handler(&clone) + } +} + +func BytesPerFrame(codec *core.Codec) byte { + channels := byte(codec.Channels) + if channels == 0 { + channels = 1 + } + + switch codec.Name { + case core.CodecPCML, core.CodecPCM: + return 2 * channels + case core.CodecPCMU, core.CodecPCMA: + return channels + } + + return 0 +} diff --git a/pkg/pcm/pcm.go b/pkg/pcm/pcm.go index a7556d16..6872c503 100644 --- a/pkg/pcm/pcm.go +++ 
// b/pkg/pcm/pcm.go @@ -1,277 +1,187 @@ - package pcm, which after this patch
// imports only "github.com/AlexxIT/go2rtc/pkg/core". The packet-level helpers
// ResampleToG711, DownsampleByte, ResamplePCM and DownsamplePCM are removed by
// this hunk in favour of the sample-level filters below.

// Downsample returns a stateful filter that reduces the number of samples by
// the factor k, replacing each window of input samples with its average.
//
// The filter keeps its position between calls, so a window may span two
// consecutive buffers. k may be fractional; windows then alternate in length
// and each output is the mean of the samples that actually fell into its
// window. The returned filter is not safe for concurrent use.
func Downsample(k float32) func([]int16) []int16 {
	var (
		phase float32 // samples consumed since the current window started
		sum   float32 // sum of the samples in the current window
		count int     // number of samples in the current window
	)

	return func(src []int16) []int16 {
		// Capacity is only a hint; append keeps the filter safe even when
		// float rounding makes the real count differ from the estimate.
		capHint := len(src)
		if k > 1 {
			capHint = int(float32(len(src))/k) + 1
		}
		dst := make([]int16, 0, capHint)

		for _, sample := range src {
			sum += float32(sample)
			count++
			phase++

			if phase >= k {
				// Divide by the real window length, not by k, so fractional
				// factors neither amplify nor attenuate the signal.
				dst = append(dst, int16(sum/float32(count)))
				sum, count = 0, 0
				phase -= k
			}
		}

		return dst
	}
}

// Upsample returns a stateful filter that increases the number of samples by
// the factor k by repeating input samples.
//
// For an integer k every sample is repeated exactly k times. For a fractional
// k the repeat count alternates so that the long-run ratio equals k; the
// fractional remainder is carried over between calls. k is expected to be a
// finite positive number. The returned filter is not safe for concurrent use.
func Upsample(k float32) func([]int16) []int16 {
	var pending float32 // output samples still owed; may go slightly negative

	return func(src []int16) []int16 {
		// One call can emit up to one sample more than int(k*len(src)) when
		// k is fractional, so grow with append instead of indexing into a
		// fixed-size slice.
		dst := make([]int16, 0, int(k*float32(len(src)))+1)

		for _, sample := range src {
			pending += k
			for pending > 0 {
				dst = append(dst, sample)
				pending--
			}
		}

		return dst
	}
}

// FlipEndian returns a copy of src with the two bytes of every 16-bit sample
// swapped. src is left untouched. If src has an odd length, the final
// unpaired byte is copied unchanged instead of causing an out-of-range panic.
func FlipEndian(src []byte) (dst []byte) {
	n := len(src)
	dst = make([]byte, n)

	paired := n &^ 1 // largest even length that fits in src
	for i := 0; i < paired; i += 2 {
		dst[i], dst[i+1] = src[i+1], src[i]
	}
	if paired < n {
		dst[paired] = src[paired]
	}

	return dst
}

// (patch continues) func Transcode(dst, src *core.Codec) func([]byte) []byte
- if len(buf) < PacketSize { - mu.Unlock() + switch src.Name { + case core.CodecPCML: + reader = func(src []byte) (dst []int16) { + var i, j int + n := len(src) + dst = make([]int16, n/2) + for i < n { + lo := src[i] + i++ + hi := src[i] + i++ + dst[j] = int16(hi)<<8 | int16(lo) + j++ + } return } - - pkt := &rtp.Packet{ - Header: rtp.Header{ - Version: 2, - Marker: true, // should be true - PayloadType: packet.PayloadType, // will be owerwriten - SequenceNumber: seq, - SSRC: packet.SSRC, - }, - Payload: buf[:PacketSize], - } - - seq++ - - // don't know if zero TS important for Reolink Doorbell - // don't have this strange devices for tests - if !zeroTS { - pkt.Timestamp = ts - ts += PacketSize - } - - buf = buf[PacketSize:] - - mu.Unlock() - - handler(pkt) - } -} - -func Convert(in, out *core.Codec, handler core.HandlerFunc) core.HandlerFunc { - if in.Name == out.Name && in.Channels == out.Channels && in.ClockRate == out.ClockRate { - return handler - } - - switch { - case in.Name == core.CodecPCML && in.Channels <= 1 && - out.Name == core.CodecPCML && out.Channels == 2: - return func(pkt *core.Packet) { - n := len(pkt.Payload) - payload := make([]byte, 2*n) - for i, j := 0, 0; i < n; { - hi := pkt.Payload[i] + case core.CodecPCM: + reader = func(src []byte) (dst []int16) { + var i, j int + n := len(src) + dst = make([]int16, n/2) + for i < n { + hi := src[i] i++ - lo := pkt.Payload[i] + lo := src[i] i++ - payload[j] = hi - j++ - payload[j] = lo - j++ - payload[j] = hi - j++ - payload[j] = lo + dst[j] = int16(hi)<<8 | int16(lo) j++ } - pkt.Payload = payload - handler(pkt) + return } - - case in.Name == core.CodecPCM && in.Channels <= 1 && - out.Name == core.CodecPCML && out.Channels == 2: - return func(pkt *core.Packet) { - n := len(pkt.Payload) - payload := make([]byte, 2*n) - for i, j := 0, 0; i < n; { - hi := pkt.Payload[i] - i++ - lo := pkt.Payload[i] - i++ - payload[j] = lo - j++ - payload[j] = hi - j++ - payload[j] = lo - j++ - payload[j] = hi - j++ - } - 
pkt.Payload = payload - handler(pkt) - } - - case in.Name == core.CodecPCMA && in.Channels <= 1 && - out.Name == core.CodecPCML && out.Channels == 2: - return func(pkt *core.Packet) { - payload := make([]byte, 4*len(pkt.Payload)) + case core.CodecPCMU: + reader = func(src []byte) (dst []int16) { var i int - for _, b := range pkt.Payload { - s16 := PCMAtoPCM(b) - hi := byte(s16 >> 8) - lo := byte(s16) - payload[i] = hi - i++ - payload[i] = lo - i++ - payload[i] = hi - i++ - payload[i] = lo + dst = make([]int16, len(src)) + for _, sample := range src { + dst[i] = PCMUtoPCM(sample) i++ } - pkt.Payload = payload - handler(pkt) + return + } + case core.CodecPCMA: + reader = func(src []byte) (dst []int16) { + var i int + dst = make([]int16, len(src)) + for _, sample := range src { + dst[i] = PCMAtoPCM(sample) + i++ + } + return } } - return nil + + if src.Channels > 1 { + filters = append(filters, Downsample(float32(src.Channels))) + } + + if src.ClockRate > dst.ClockRate { + filters = append(filters, Downsample(float32(src.ClockRate)/float32(dst.ClockRate))) + } else if src.ClockRate < dst.ClockRate { + filters = append(filters, Upsample(float32(dst.ClockRate)/float32(src.ClockRate))) + } + + if dst.Channels > 1 { + filters = append(filters, Upsample(float32(dst.Channels))) + } + + switch dst.Name { + case core.CodecPCML: + writer = func(src []int16) (dst []byte) { + var i int + dst = make([]byte, len(src)*2) + for _, sample := range src { + dst[i] = byte(sample) + i++ + dst[i] = byte(sample >> 8) + i++ + } + return + } + case core.CodecPCM: + writer = func(src []int16) (dst []byte) { + var i int + dst = make([]byte, len(src)*2) + for _, sample := range src { + dst[i] = byte(sample >> 8) + i++ + dst[i] = byte(sample) + i++ + } + return + } + case core.CodecPCMU: + writer = func(src []int16) (dst []byte) { + var i int + dst = make([]byte, len(src)) + for _, sample := range src { + dst[i] = PCMtoPCMU(sample) + i++ + } + return + } + case core.CodecPCMA: + writer = 
func(src []int16) (dst []byte) { + var i int + dst = make([]byte, len(src)) + for _, sample := range src { + dst[i] = PCMtoPCMA(sample) + i++ + } + return + } + } + + return func(b []byte) []byte { + samples := reader(b) + for _, filter := range filters { + samples = filter(samples) + } + return writer(samples) + } } diff --git a/pkg/pcm/pcm_test.go b/pkg/pcm/pcm_test.go new file mode 100644 index 00000000..2832be63 --- /dev/null +++ b/pkg/pcm/pcm_test.go @@ -0,0 +1,79 @@ +package pcm + +import ( + "encoding/hex" + "fmt" + "testing" + + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/stretchr/testify/require" +) + +func TestTranscode(t *testing.T) { + tests := []struct { + name string + src core.Codec + dst core.Codec + source string + expect string + }{ + { + name: "s16be->s16be", + src: core.Codec{Name: core.CodecPCM, ClockRate: 8000, Channels: 1}, + dst: core.Codec{Name: core.CodecPCM, ClockRate: 8000, Channels: 1}, + source: "FCCA00130343062808130B510D9E0F7610DA111113EA15BD16F2168215D41561", + expect: "FCCA00130343062808130B510D9E0F7610DA111113EA15BD16F2168215D41561", + }, + { + name: "s16be->s16le", + src: core.Codec{Name: core.CodecPCM, ClockRate: 8000, Channels: 1}, + dst: core.Codec{Name: core.CodecPCML, ClockRate: 8000, Channels: 1}, + source: "FCCA00130343062808130B510D9E0F7610DA111113EA15BD16F2168215D41561", + expect: "CAFC1300430328061308510B9E0D760FDA101111EA13BD15F2168216D4156115", + }, + { + name: "s16be->mulaw", + src: core.Codec{Name: core.CodecPCM, ClockRate: 8000, Channels: 1}, + dst: core.Codec{Name: core.CodecPCMU, ClockRate: 8000, Channels: 1}, + source: "FCCA00130343062808130B510D9E0F7610DA111113EA15BD16F2168215D41561", + expect: "52FDD1C5BEB8B3B0AEAEABA9A8A8A9AA", + }, + { + name: "s16be->alaw", + src: core.Codec{Name: core.CodecPCM, ClockRate: 8000, Channels: 1}, + dst: core.Codec{Name: core.CodecPCMA, ClockRate: 8000, Channels: 1}, + source: "FCCA00130343062808130B510D9E0F7610DA111113EA15BD16F2168215D41561", + expect: 
"7CD4FFED95939E9B8584868083838080", + }, + { + name: "2ch->1ch", + src: core.Codec{Name: core.CodecPCM, ClockRate: 8000, Channels: 2}, + dst: core.Codec{Name: core.CodecPCM, ClockRate: 8000, Channels: 1}, + source: "FCCAFCCA001300130343034306280628081308130B510B510D9E0D9E0F760F76", + expect: "FCCA00130343062808130B510D9E0F76", + }, + { + name: "1ch->2ch", + src: core.Codec{Name: core.CodecPCM, ClockRate: 8000, Channels: 1}, + dst: core.Codec{Name: core.CodecPCM, ClockRate: 8000, Channels: 2}, + source: "FCCA00130343062808130B510D9E0F76", + expect: "FCCAFCCA001300130343034306280628081308130B510B510D9E0D9E0F760F76", + }, + { + name: "16khz->8khz", + src: core.Codec{Name: core.CodecPCM, ClockRate: 16000, Channels: 1}, + dst: core.Codec{Name: core.CodecPCM, ClockRate: 8000, Channels: 1}, + source: "FCCAFCCA001300130343034306280628081308130B510B510D9E0D9E0F760F76", + expect: "FCCA00130343062808130B510D9E0F76", + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + f := Transcode(&test.dst, &test.src) + b, _ := hex.DecodeString(test.source) + b = f(b) + s := fmt.Sprintf("%X", b) + require.Equal(t, test.expect, s) + }) + } +} diff --git a/pkg/webrtc/consumer.go b/pkg/webrtc/consumer.go index e9d7b2e5..ebc3a008 100644 --- a/pkg/webrtc/consumer.go +++ b/pkg/webrtc/consumer.go @@ -73,7 +73,7 @@ func (c *Conn) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiv codec.Name = core.CodecPCMA } codec.ClockRate = 8000 - sender.Handler = pcm.ResampleToG711(track.Codec, 8000, sender.Handler) + sender.Handler = pcm.TranscodeHandler(codec, track.Codec, sender.Handler) } }