diff --git a/pkg/doorbird/backchannel.go b/pkg/doorbird/backchannel.go index 8cdd0136..5e5e8834 100644 --- a/pkg/doorbird/backchannel.go +++ b/pkg/doorbird/backchannel.go @@ -15,7 +15,10 @@ import ( "github.com/pion/rtp" ) -var clt Client +var ( + cltMu sync.Mutex + cltMap = make(map[string]*Client) +) type AudioMixer struct { mu sync.Mutex @@ -68,6 +71,11 @@ func (m *AudioMixer) mixLoop() { for range ticker.C { m.mu.Lock() + if m.closed { + m.mu.Unlock() + return + } + if len(m.streams) == 0 { m.mu.Unlock() continue @@ -153,8 +161,12 @@ type Client struct { } func Dial(rawURL string) (*Client, error) { - if clt.conn != nil { - return &clt, nil + cltMu.Lock() + defer cltMu.Unlock() + + // Check if we already have a client for this URL + if existingClient, exists := cltMap[rawURL]; exists && existingClient.conn != nil { + return existingClient, nil } u, err := url.Parse(rawURL) @@ -183,6 +195,7 @@ func Dial(rawURL string) (*Client, error) { _ = conn.SetWriteDeadline(time.Now().Add(core.ConnDeadline)) if _, err = conn.Write([]byte(s)); err != nil { + conn.Close() return nil, err } @@ -208,7 +221,7 @@ func Dial(rawURL string) (*Client, error) { }, } - clt = Client{ + client := &Client{ Connection: core.Connection{ ID: core.NewID(), FormatName: "doorbird", @@ -222,7 +235,10 @@ func Dial(rawURL string) (*Client, error) { senderStats: make(map[*core.Sender]time.Time), } - return &clt, nil + // Store the client in the map + cltMap[rawURL] = client + + return client, nil } func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { @@ -238,17 +254,22 @@ func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Rece streamChan := c.mixer.AddStream(trackID) sender.Handler = func(pkt *rtp.Packet) { + if len(pkt.Payload) == 0 { + return + } + c.mu.RLock() conn := c.conn c.mu.RUnlock() - if conn != nil && len(pkt.Payload) > 0 { + if conn != nil { select { case streamChan <- pkt.Payload: c.mu.Lock() c.senderStats[sender] = time.Now() c.mu.Unlock() default: + // Channel is full, skip this packet } } } @@ -258,11 +279,24 @@ func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Rece if len(c.Senders) == 0 { go func() { + defer func() { + if r := recover(); r != nil { + // Recover from any panics when mixer is closed + } + }() + for mixedData := range c.mixer.output { - if c.conn != nil { - _ = c.conn.SetWriteDeadline(time.Now().Add(core.ConnDeadline)) - if n, err := c.conn.Write(mixedData); err == nil { + c.mu.RLock() + conn := c.conn + c.mu.RUnlock() + + if conn != nil && len(mixedData) > 0 { + _ = conn.SetWriteDeadline(time.Now().Add(core.ConnDeadline)) + if n, err := conn.Write(mixedData); err == nil { c.Send += n + } else { + // Connection failed, break out of loop + break } } } @@ -289,9 +323,15 @@ func (c *Client) Start() error { buf := make([]byte, 1) for { + // Set read deadline to detect connection issues + _ = c.conn.SetReadDeadline(time.Now().Add(30 * time.Second)) _, err := c.conn.Read(buf) if err != nil { c.cleanup() + // Remove this client from the global map + cltMu.Lock() + delete(cltMap, c.URL) + cltMu.Unlock() return err } } @@ -320,8 +360,19 @@ func (c *Client) cleanup() { c.mixer.mu.Unlock() } + // Close all senders + for _, sender := range c.Senders { + sender.Close() + } + c.Senders = nil + c.trackMap = make(map[*core.Sender]string) c.senderStats = make(map[*core.Sender]time.Time) + + // Remove from global map + cltMu.Lock() + delete(cltMap, c.URL) + cltMu.Unlock() } func (c *Client) cleanupOrphanedSenders() {