reduce audio delay by lowering buffer size

This commit is contained in:
Oliver Eiber
2025-07-31 21:07:45 +02:00
parent 3d38e5e567
commit 975a43d392
+13 -14
View File
@@ -15,6 +15,14 @@ import (
"github.com/pion/rtp" "github.com/pion/rtp"
) )
const (
AudioMixerInterval = 10 * time.Millisecond
AudioChannelBuffer = 10
OutputChannelBuffer = 10
SenderCleanupInterval = 5 * time.Second
SenderTimeoutDuration = 5 * time.Second
)
var ( var (
cltMu sync.Mutex cltMu sync.Mutex
cltMap = make(map[string]*Client) cltMap = make(map[string]*Client)
@@ -31,7 +39,7 @@ type AudioMixer struct {
func NewAudioMixer() *AudioMixer { func NewAudioMixer() *AudioMixer {
return &AudioMixer{ return &AudioMixer{
streams: make(map[string]chan []byte), streams: make(map[string]chan []byte),
output: make(chan []byte, 100), output: make(chan []byte, OutputChannelBuffer),
} }
} }
@@ -50,7 +58,7 @@ func (m *AudioMixer) AddStream(id string) chan []byte {
go m.mixLoop() go m.mixLoop()
} }
stream := make(chan []byte, 100) stream := make(chan []byte, AudioChannelBuffer)
m.streams[id] = stream m.streams[id] = stream
return stream return stream
} }
@@ -66,7 +74,7 @@ func (m *AudioMixer) RemoveStream(id string) {
} }
func (m *AudioMixer) mixLoop() { func (m *AudioMixer) mixLoop() {
ticker := time.NewTicker(20 * time.Millisecond) ticker := time.NewTicker(AudioMixerInterval)
defer ticker.Stop() defer ticker.Stop()
for range ticker.C { for range ticker.C {
@@ -164,7 +172,6 @@ func Dial(rawURL string) (*Client, error) {
cltMu.Lock() cltMu.Lock()
defer cltMu.Unlock() defer cltMu.Unlock()
// Check if we already have a client for this URL
if existingClient, exists := cltMap[rawURL]; exists && existingClient.conn != nil { if existingClient, exists := cltMap[rawURL]; exists && existingClient.conn != nil {
return existingClient, nil return existingClient, nil
} }
@@ -235,7 +242,6 @@ func Dial(rawURL string) (*Client, error) {
senderStats: make(map[*core.Sender]time.Time), senderStats: make(map[*core.Sender]time.Time),
} }
// Store the client in the map
cltMap[rawURL] = client cltMap[rawURL] = client
return client, nil return client, nil
@@ -269,7 +275,6 @@ func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Rece
c.senderStats[sender] = time.Now() c.senderStats[sender] = time.Now()
c.mu.Unlock() c.mu.Unlock()
default: default:
// Channel is full, skip this packet
} }
} }
} }
@@ -281,7 +286,6 @@ func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Rece
go func() { go func() {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
// Recover from any panics when mixer is closed
} }
}() }()
@@ -295,7 +299,6 @@ func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Rece
if n, err := conn.Write(mixedData); err == nil { if n, err := conn.Write(mixedData); err == nil {
c.Send += n c.Send += n
} else { } else {
// Connection failed, break out of loop
break break
} }
} }
@@ -314,7 +317,7 @@ func (c *Client) Start() error {
} }
go func() { go func() {
ticker := time.NewTicker(5 * time.Second) ticker := time.NewTicker(SenderCleanupInterval)
defer ticker.Stop() defer ticker.Stop()
for range ticker.C { for range ticker.C {
c.cleanupOrphanedSenders() c.cleanupOrphanedSenders()
@@ -323,12 +326,10 @@ func (c *Client) Start() error {
buf := make([]byte, 1) buf := make([]byte, 1)
for { for {
// Set read deadline to detect connection issues
_ = c.conn.SetReadDeadline(time.Now().Add(30 * time.Second)) _ = c.conn.SetReadDeadline(time.Now().Add(30 * time.Second))
_, err := c.conn.Read(buf) _, err := c.conn.Read(buf)
if err != nil { if err != nil {
c.cleanup() c.cleanup()
// Remove this client from the global map
cltMu.Lock() cltMu.Lock()
delete(cltMap, c.URL) delete(cltMap, c.URL)
cltMu.Unlock() cltMu.Unlock()
@@ -360,7 +361,6 @@ func (c *Client) cleanup() {
c.mixer.mu.Unlock() c.mixer.mu.Unlock()
} }
// Close all senders
for _, sender := range c.Senders { for _, sender := range c.Senders {
sender.Close() sender.Close()
} }
@@ -369,7 +369,6 @@ func (c *Client) cleanup() {
c.trackMap = make(map[*core.Sender]string) c.trackMap = make(map[*core.Sender]string)
c.senderStats = make(map[*core.Sender]time.Time) c.senderStats = make(map[*core.Sender]time.Time)
// Remove from global map
cltMu.Lock() cltMu.Lock()
delete(cltMap, c.URL) delete(cltMap, c.URL)
cltMu.Unlock() cltMu.Unlock()
@@ -385,7 +384,7 @@ func (c *Client) cleanupOrphanedSenders() {
for i, sender := range c.Senders { for i, sender := range c.Senders {
lastActivity, exists := c.senderStats[sender] lastActivity, exists := c.senderStats[sender]
if sender.State() == "closed" || !exists || now.Sub(lastActivity) >= 5*time.Second { if sender.State() == "closed" || !exists || now.Sub(lastActivity) >= SenderTimeoutDuration {
if trackID, exists := c.trackMap[sender]; exists { if trackID, exists := c.trackMap[sender]; exists {
c.mixer.RemoveStream(trackID) c.mixer.RemoveStream(trackID)
delete(c.trackMap, sender) delete(c.trackMap, sender)