fix unexpected close of backchannel streams

This commit is contained in:
Oliver Eiber
2025-07-30 23:37:06 +02:00
parent 7d2ad92c4b
commit 3d38e5e567
+60 -9
View File
@@ -15,7 +15,10 @@ import (
"github.com/pion/rtp" "github.com/pion/rtp"
) )
var clt Client var (
cltMu sync.Mutex
cltMap = make(map[string]*Client)
)
type AudioMixer struct { type AudioMixer struct {
mu sync.Mutex mu sync.Mutex
@@ -68,6 +71,11 @@ func (m *AudioMixer) mixLoop() {
for range ticker.C { for range ticker.C {
m.mu.Lock() m.mu.Lock()
if m.closed {
m.mu.Unlock()
return
}
if len(m.streams) == 0 { if len(m.streams) == 0 {
m.mu.Unlock() m.mu.Unlock()
continue continue
@@ -153,8 +161,12 @@ type Client struct {
} }
func Dial(rawURL string) (*Client, error) { func Dial(rawURL string) (*Client, error) {
if clt.conn != nil { cltMu.Lock()
return &clt, nil defer cltMu.Unlock()
// Check if we already have a client for this URL
if existingClient, exists := cltMap[rawURL]; exists && existingClient.conn != nil {
return existingClient, nil
} }
u, err := url.Parse(rawURL) u, err := url.Parse(rawURL)
@@ -183,6 +195,7 @@ func Dial(rawURL string) (*Client, error) {
_ = conn.SetWriteDeadline(time.Now().Add(core.ConnDeadline)) _ = conn.SetWriteDeadline(time.Now().Add(core.ConnDeadline))
if _, err = conn.Write([]byte(s)); err != nil { if _, err = conn.Write([]byte(s)); err != nil {
conn.Close()
return nil, err return nil, err
} }
@@ -208,7 +221,7 @@ func Dial(rawURL string) (*Client, error) {
}, },
} }
clt = Client{ client := &Client{
Connection: core.Connection{ Connection: core.Connection{
ID: core.NewID(), ID: core.NewID(),
FormatName: "doorbird", FormatName: "doorbird",
@@ -222,7 +235,10 @@ func Dial(rawURL string) (*Client, error) {
senderStats: make(map[*core.Sender]time.Time), senderStats: make(map[*core.Sender]time.Time),
} }
return &clt, nil // Store the client in the map
cltMap[rawURL] = client
return client, nil
} }
func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
@@ -238,17 +254,22 @@ func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Rece
streamChan := c.mixer.AddStream(trackID) streamChan := c.mixer.AddStream(trackID)
sender.Handler = func(pkt *rtp.Packet) { sender.Handler = func(pkt *rtp.Packet) {
if len(pkt.Payload) == 0 {
return
}
c.mu.RLock() c.mu.RLock()
conn := c.conn conn := c.conn
c.mu.RUnlock() c.mu.RUnlock()
if conn != nil && len(pkt.Payload) > 0 { if conn != nil {
select { select {
case streamChan <- pkt.Payload: case streamChan <- pkt.Payload:
c.mu.Lock() c.mu.Lock()
c.senderStats[sender] = time.Now() c.senderStats[sender] = time.Now()
c.mu.Unlock() c.mu.Unlock()
default: default:
// Channel is full, skip this packet
} }
} }
} }
@@ -258,11 +279,24 @@ func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Rece
if len(c.Senders) == 0 { if len(c.Senders) == 0 {
go func() { go func() {
defer func() {
if r := recover(); r != nil {
// Recover from any panics when mixer is closed
}
}()
for mixedData := range c.mixer.output { for mixedData := range c.mixer.output {
if c.conn != nil { c.mu.RLock()
_ = c.conn.SetWriteDeadline(time.Now().Add(core.ConnDeadline)) conn := c.conn
if n, err := c.conn.Write(mixedData); err == nil { c.mu.RUnlock()
if conn != nil && len(mixedData) > 0 {
_ = conn.SetWriteDeadline(time.Now().Add(core.ConnDeadline))
if n, err := conn.Write(mixedData); err == nil {
c.Send += n c.Send += n
} else {
// Connection failed, break out of loop
break
} }
} }
} }
@@ -289,9 +323,15 @@ func (c *Client) Start() error {
buf := make([]byte, 1) buf := make([]byte, 1)
for { for {
// Set read deadline to detect connection issues
_ = c.conn.SetReadDeadline(time.Now().Add(30 * time.Second))
_, err := c.conn.Read(buf) _, err := c.conn.Read(buf)
if err != nil { if err != nil {
c.cleanup() c.cleanup()
// Remove this client from the global map
cltMu.Lock()
delete(cltMap, c.URL)
cltMu.Unlock()
return err return err
} }
} }
@@ -320,8 +360,19 @@ func (c *Client) cleanup() {
c.mixer.mu.Unlock() c.mixer.mu.Unlock()
} }
// Close all senders
for _, sender := range c.Senders {
sender.Close()
}
c.Senders = nil
c.trackMap = make(map[*core.Sender]string) c.trackMap = make(map[*core.Sender]string)
c.senderStats = make(map[*core.Sender]time.Time) c.senderStats = make(map[*core.Sender]time.Time)
// Remove from global map
cltMu.Lock()
delete(cltMap, c.URL)
cltMu.Unlock()
} }
func (c *Client) cleanupOrphanedSenders() { func (c *Client) cleanupOrphanedSenders() {