From a92e04b6e0885bec723b222f5d16d1ffa35a22a1 Mon Sep 17 00:00:00 2001 From: Oliver Eiber Date: Tue, 22 Jul 2025 20:54:24 +0200 Subject: [PATCH] added audio mixing capability to avoid device overload when multiple backchannel audio streams are connected --- pkg/doorbird/backchannel.go | 195 +++++++++++++++++++++++++++++++++--- 1 file changed, 179 insertions(+), 16 deletions(-) diff --git a/pkg/doorbird/backchannel.go b/pkg/doorbird/backchannel.go index 8a9a25d9..51b4c194 100644 --- a/pkg/doorbird/backchannel.go +++ b/pkg/doorbird/backchannel.go @@ -7,19 +7,140 @@ import ( "net" "net/http" "net/url" + "sync" "time" "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/pcm" "github.com/pion/rtp" ) -var ( - clt Client -) +var clt Client + +type AudioMixer struct { + mu sync.Mutex + streams map[string]chan []byte + output chan []byte + running bool +} + +func NewAudioMixer() *AudioMixer { + return &AudioMixer{ + streams: make(map[string]chan []byte), + output: make(chan []byte, 100), + } +} + +func (m *AudioMixer) AddStream(id string) chan []byte { + m.mu.Lock() + defer m.mu.Unlock() + + if !m.running { + m.running = true + go m.mixLoop() + } + + stream := make(chan []byte, 100) + m.streams[id] = stream + return stream +} + +func (m *AudioMixer) RemoveStream(id string) { + m.mu.Lock() + defer m.mu.Unlock() + + if stream, exists := m.streams[id]; exists { + close(stream) + delete(m.streams, id) + } +} + +func (m *AudioMixer) mixLoop() { + ticker := time.NewTicker(20 * time.Millisecond) + defer ticker.Stop() + + for range ticker.C { + m.mu.Lock() + if len(m.streams) == 0 { + m.mu.Unlock() + continue + } + + var pcmSamples [][]int16 + activeStreams := 0 + + for _, stream := range m.streams { + select { + case data := <-stream: + if len(data) > 0 { + samples := make([]int16, len(data)) + for i, sample := range data { + samples[i] = pcm.PCMUtoPCM(sample) + } + pcmSamples = append(pcmSamples, samples) + activeStreams++ + } + default: + } + } + m.mu.Unlock() + + if activeStreams == 0 { + continue + } + + var mixedLength int + for _, samples := range pcmSamples { + if len(samples) > mixedLength { + mixedLength = len(samples) + } + } + + if mixedLength == 0 { + continue + } + + mixed := make([]int16, mixedLength) + for i := 0; i < mixedLength; i++ { + var sum int32 + var count int32 + + for _, samples := range pcmSamples { + if i < len(samples) { + sum += int32(samples[i]) + count++ + } + } + + if count > 0 { + averaged := sum / count + if averaged > 32767 { + mixed[i] = 32767 + } else if averaged < -32768 { + mixed[i] = -32768 + } else { + mixed[i] = int16(averaged) + } + } + } + + output := make([]byte, len(mixed)) + for i, sample := range mixed { + output[i] = pcm.PCMtoPCMU(sample) + } + + select { + case m.output <- output: + default: + } + } +} type Client struct { core.Connection - conn net.Conn + conn net.Conn + mixer *AudioMixer + trackMap map[*core.Sender]string } func Dial(rawURL string) (*Client, error) { @@ -85,9 +206,10 @@ func Dial(rawURL string) (*Client, error) { Protocol: "http", URL: rawURL, Medias: medias, - // Transport: conn, }, conn, + NewAudioMixer(), + make(map[*core.Sender]string), } return &clt, nil @@ -98,22 +220,35 @@ func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, } func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { - if len(c.Senders) > 0 { - return errors.New("DoorBird backchannel already in use") - } - sender := core.NewSender(media, track.Codec) + trackID := fmt.Sprintf("%d", core.NewID()) + streamChan := c.mixer.AddStream(trackID) sender.Handler = func(pkt *rtp.Packet) { - if c.conn != nil { - _ = c.conn.SetWriteDeadline(time.Now().Add(core.ConnDeadline)) - if n, err := c.conn.Write(pkt.Payload); err == nil { - c.Send += n + if c.conn != nil && len(pkt.Payload) > 0 { + select { + case streamChan <- pkt.Payload: + default: } } } - sender.HandleRTP(track) + c.trackMap[sender] = trackID + + if len(c.Senders) == 0 { + go func() { + for mixedData := range c.mixer.output { + if c.conn != nil { + _ = c.conn.SetWriteDeadline(time.Now().Add(core.ConnDeadline)) + if n, err := c.conn.Write(mixedData); err == nil { + c.Send += n + } + } + } + }() + } + + sender.WithParent(track).Start() c.Senders = append(c.Senders, sender) return nil } @@ -126,9 +261,37 @@ func (c *Client) Start() error { for { _, err := c.conn.Read(buf) if err != nil { - c.conn.Close() - c.conn = nil + c.cleanup() return err } } } + +func (c *Client) cleanup() { + if c.conn != nil { + c.conn.Close() + c.conn = nil + } + + if c.mixer != nil { + c.mixer.mu.Lock() + for id := range c.mixer.streams { + if stream, exists := c.mixer.streams[id]; exists { + close(stream) + } + } + c.mixer.streams = make(map[string]chan []byte) + close(c.mixer.output) + c.mixer.running = false + c.mixer.mu.Unlock() + } + + c.trackMap = make(map[*core.Sender]string) +} + +func (c *Client) RemoveTrack(sender *core.Sender) { + if trackID, exists := c.trackMap[sender]; exists { + c.mixer.RemoveStream(trackID) + delete(c.trackMap, sender) + } +}