diff --git a/pkg/doorbird/backchannel.go b/pkg/doorbird/backchannel.go index 51b4c194..8cdd0136 100644 --- a/pkg/doorbird/backchannel.go +++ b/pkg/doorbird/backchannel.go @@ -22,6 +22,7 @@ type AudioMixer struct { streams map[string]chan []byte output chan []byte running bool + closed bool } func NewAudioMixer() *AudioMixer { @@ -35,6 +36,12 @@ func (m *AudioMixer) AddStream(id string) chan []byte { m.mu.Lock() defer m.mu.Unlock() + if m.closed { + ch := make(chan []byte) + close(ch) + return ch + } + if !m.running { m.running = true go m.mixLoop() @@ -138,9 +145,11 @@ func (m *AudioMixer) mixLoop() { type Client struct { core.Connection - conn net.Conn - mixer *AudioMixer - trackMap map[*core.Sender]string + conn net.Conn + mixer *AudioMixer + trackMap map[*core.Sender]string + senderStats map[*core.Sender]time.Time + mu sync.RWMutex } func Dial(rawURL string) (*Client, error) { @@ -200,16 +209,17 @@ func Dial(rawURL string) (*Client, error) { } clt = Client{ - core.Connection{ + Connection: core.Connection{ ID: core.NewID(), FormatName: "doorbird", Protocol: "http", URL: rawURL, Medias: medias, }, - conn, - NewAudioMixer(), - make(map[*core.Sender]string), + conn: conn, + mixer: NewAudioMixer(), + trackMap: make(map[*core.Sender]string), + senderStats: make(map[*core.Sender]time.Time), } return &clt, nil @@ -220,20 +230,31 @@ func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, } func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { + c.mu.Lock() + defer c.mu.Unlock() + sender := core.NewSender(media, track.Codec) trackID := fmt.Sprintf("%d", core.NewID()) streamChan := c.mixer.AddStream(trackID) sender.Handler = func(pkt *rtp.Packet) { - if c.conn != nil && len(pkt.Payload) > 0 { + c.mu.RLock() + conn := c.conn + c.mu.RUnlock() + + if conn != nil && len(pkt.Payload) > 0 { select { case streamChan <- pkt.Payload: + c.mu.Lock() + c.senderStats[sender] = time.Now() + c.mu.Unlock() default: } } } c.trackMap[sender] = trackID + c.senderStats[sender] = time.Now() if len(c.Senders) == 0 { go func() { @@ -257,6 +278,15 @@ func (c *Client) Start() error { if c.conn == nil { return nil } + + go func() { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + for range ticker.C { + c.cleanupOrphanedSenders() + } + }() + buf := make([]byte, 1) for { _, err := c.conn.Read(buf) @@ -268,6 +298,9 @@ func (c *Client) Start() error { } func (c *Client) cleanup() { + c.mu.Lock() + defer c.mu.Unlock() + if c.conn != nil { c.conn.Close() c.conn = nil @@ -275,23 +308,67 @@ func (c *Client) cleanup() { if c.mixer != nil { c.mixer.mu.Lock() - for id := range c.mixer.streams { - if stream, exists := c.mixer.streams[id]; exists { - close(stream) - } + c.mixer.closed = true + for id, stream := range c.mixer.streams { + close(stream) + delete(c.mixer.streams, id) + } + if c.mixer.running { + close(c.mixer.output) + c.mixer.running = false } - c.mixer.streams = make(map[string]chan []byte) - close(c.mixer.output) - c.mixer.running = false c.mixer.mu.Unlock() } c.trackMap = make(map[*core.Sender]string) + c.senderStats = make(map[*core.Sender]time.Time) +} + +func (c *Client) cleanupOrphanedSenders() { + c.mu.Lock() + defer c.mu.Unlock() + + now := time.Now() + removedCount := 0 + validIndex := 0 + + for i, sender := range c.Senders { + lastActivity, exists := c.senderStats[sender] + if sender.State() == "closed" || !exists || now.Sub(lastActivity) >= 5*time.Second { + if trackID, exists := c.trackMap[sender]; exists { + c.mixer.RemoveStream(trackID) + delete(c.trackMap, sender) + } + delete(c.senderStats, sender) + sender.Close() + removedCount++ + } else { + c.Senders[validIndex] = c.Senders[i] + validIndex++ + } + } + + c.Senders = c.Senders[:validIndex] + + if removedCount > 0 { + fmt.Printf("DoorBird: Cleaned up %d orphaned senders, %d remain active\n", removedCount, validIndex) + } } func (c *Client) RemoveTrack(sender *core.Sender) { + c.mu.Lock() + defer c.mu.Unlock() + if trackID, exists := c.trackMap[sender]; exists { c.mixer.RemoveStream(trackID) delete(c.trackMap, sender) } + delete(c.senderStats, sender) + + for i, s := range c.Senders { + if s == sender { + c.Senders = append(c.Senders[:i], c.Senders[i+1:]...) + break + } + } }