Files
Strix/pkg/sse/sse.go
T
eduard256 e9dc04178e Fix SSE real-time streaming in Home Assistant Ingress mode
Add padding to overcome aiohttp 64KB buffer in HA Supervisor.

Problem:
- HA Supervisor uses aiohttp with 64KB StreamResponse buffer
- Small SSE events (~200-500 bytes) were buffered until connection closed
- Users saw all streams appear at once instead of real-time updates

Solution:
- Detect Ingress mode via X-Ingress-Path header
- Add 64KB SSE comment padding to fill proxy buffers
- Increase progress interval to 3 sec in Ingress mode (reduce traffic)
- Normal mode (Docker/direct) unchanged - works exactly as before

Traffic impact:
- Normal mode: ~17KB per scan (unchanged)
- Ingress mode: ~2-3MB per scan (acceptable for real-time updates)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2025-12-11 16:34:05 +00:00

405 lines
9.6 KiB
Go

package sse
import (
"context"
"encoding/json"
"fmt"
"net/http"
"strings"
"time"
)
const (
// IngressPaddingSize is the padding size for Home Assistant Ingress mode.
// HA Supervisor uses aiohttp with 64KB buffer for StreamResponse.
// We need to fill this buffer to force immediate delivery of SSE events.
IngressPaddingSize = 64 * 1024 // 64KB
// IngressHeader is the header that Home Assistant Ingress adds to requests
IngressHeader = "X-Ingress-Path"
)
// Event represents a Server-Sent Event
type Event struct {
ID string
Type string
Data interface{}
Retry int
Comment string
}
// Client represents an SSE client connection
type Client struct {
ID string
Channel chan Event
Response http.ResponseWriter
Request *http.Request
Context context.Context
Cancel context.CancelFunc
}
// Server manages SSE connections
type Server struct {
clients map[string]*Client
register chan *Client
unregister chan *Client
broadcast chan Event
logger interface{ Debug(string, ...any); Error(string, error, ...any) }
}
// NewServer creates a new SSE server
func NewServer(logger interface{ Debug(string, ...any); Error(string, error, ...any) }) *Server {
return &Server{
clients: make(map[string]*Client),
register: make(chan *Client),
unregister: make(chan *Client),
broadcast: make(chan Event),
logger: logger,
}
}
// Start starts the SSE server
func (s *Server) Start(ctx context.Context) {
go func() {
for {
select {
case <-ctx.Done():
// Close all client connections
for _, client := range s.clients {
client.Cancel()
close(client.Channel)
}
return
case client := <-s.register:
s.clients[client.ID] = client
s.logger.Debug("SSE client registered", "id", client.ID)
case client := <-s.unregister:
if _, ok := s.clients[client.ID]; ok {
delete(s.clients, client.ID)
close(client.Channel)
s.logger.Debug("SSE client unregistered", "id", client.ID)
}
case event := <-s.broadcast:
for _, client := range s.clients {
select {
case client.Channel <- event:
default:
// Client's channel is full, close it
s.unregister <- client
}
}
}
}
}()
}
// ServeHTTP handles SSE connections
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Check if SSE is supported
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "SSE not supported", http.StatusInternalServerError)
return
}
// Set headers for SSE
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("X-Accel-Buffering", "no") // Disable Nginx buffering
// Create client
ctx, cancel := context.WithCancel(r.Context())
client := &Client{
ID: generateClientID(),
Channel: make(chan Event, 100),
Response: w,
Request: r,
Context: ctx,
Cancel: cancel,
}
// Register client
s.register <- client
// Remove client on disconnect
defer func() {
s.unregister <- client
cancel()
}()
// Send initial connection event
s.SendToClient(client, Event{
Type: "connected",
Data: map[string]string{"id": client.ID},
})
// Listen for events
for {
select {
case <-ctx.Done():
return
case <-r.Context().Done():
return
case event := <-client.Channel:
if err := s.writeEvent(w, flusher, event); err != nil {
s.logger.Error("failed to write SSE event", err, "client", client.ID)
return
}
}
}
}
// SendToClient sends an event to a specific client
func (s *Server) SendToClient(client *Client, event Event) {
select {
case client.Channel <- event:
default:
// Channel is full, log warning
s.logger.Debug("client channel full, dropping event", "client", client.ID)
}
}
// Broadcast sends an event to all clients
func (s *Server) Broadcast(event Event) {
s.broadcast <- event
}
// writeEvent writes an event to the response writer
func (s *Server) writeEvent(w http.ResponseWriter, flusher http.Flusher, event Event) error {
// Write event ID if present
if event.ID != "" {
if _, err := fmt.Fprintf(w, "id: %s\n", event.ID); err != nil {
return err
}
}
// Write event type if present
if event.Type != "" {
if _, err := fmt.Fprintf(w, "event: %s\n", event.Type); err != nil {
return err
}
}
// Write retry if present
if event.Retry > 0 {
if _, err := fmt.Fprintf(w, "retry: %d\n", event.Retry); err != nil {
return err
}
}
// Write comment if present
if event.Comment != "" {
if _, err := fmt.Fprintf(w, ": %s\n", event.Comment); err != nil {
return err
}
}
// Write data
if event.Data != nil {
var dataStr string
switch v := event.Data.(type) {
case string:
dataStr = v
case []byte:
dataStr = string(v)
default:
data, err := json.Marshal(v)
if err != nil {
return err
}
dataStr = string(data)
}
// Split data by newlines for proper SSE format
for _, line := range splitLines(dataStr) {
if _, err := fmt.Fprintf(w, "data: %s\n", line); err != nil {
return err
}
}
}
// End event with double newline
if _, err := fmt.Fprintf(w, "\n"); err != nil {
return err
}
// Flush the data
flusher.Flush()
return nil
}
// splitLines splits a string into lines
func splitLines(s string) []string {
var lines []string
var current string
for _, ch := range s {
if ch == '\n' {
lines = append(lines, current)
current = ""
} else {
current += string(ch)
}
}
if current != "" {
lines = append(lines, current)
}
return lines
}
// generateClientID generates a unique client ID
func generateClientID() string {
return fmt.Sprintf("client-%d-%d", time.Now().Unix(), time.Now().Nanosecond())
}
// StreamWriter provides a simple interface for writing SSE events
type StreamWriter struct {
client *Client
server *Server
isIngress bool // True when running through Home Assistant Ingress proxy
}
// NewStreamWriter creates a new stream writer for a client
func (s *Server) NewStreamWriter(w http.ResponseWriter, r *http.Request) (*StreamWriter, error) {
// Check if SSE is supported
flusher, ok := w.(http.Flusher)
if !ok {
return nil, fmt.Errorf("SSE not supported")
}
// Set headers for SSE
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("X-Accel-Buffering", "no")
// Send initial flush to establish connection
flusher.Flush()
// Detect Home Assistant Ingress mode by checking for X-Ingress-Path header
isIngress := r.Header.Get(IngressHeader) != ""
// Create client
ctx, cancel := context.WithCancel(r.Context())
client := &Client{
ID: generateClientID(),
Channel: make(chan Event, 100),
Response: w,
Request: r,
Context: ctx,
Cancel: cancel,
}
return &StreamWriter{
client: client,
server: s,
isIngress: isIngress,
}, nil
}
// SendEvent sends an event through the stream writer
func (sw *StreamWriter) SendEvent(eventType string, data interface{}) error {
event := Event{
Type: eventType,
Data: data,
}
flusher, ok := sw.client.Response.(http.Flusher)
if !ok {
return fmt.Errorf("response does not support flushing")
}
// Use Ingress-aware write method
return sw.writeEventWithIngress(sw.client.Response, flusher, event)
}
// writeEventWithIngress writes an event and adds padding for Ingress mode
func (sw *StreamWriter) writeEventWithIngress(w http.ResponseWriter, flusher http.Flusher, event Event) error {
// Write the event using standard method
if err := sw.server.writeEvent(w, flusher, event); err != nil {
return err
}
// In Ingress mode, add padding to fill the 64KB buffer and force immediate delivery
if sw.isIngress {
if err := sw.writePadding(w, flusher); err != nil {
return err
}
}
return nil
}
// writePadding writes SSE comment padding to fill proxy buffers.
// SSE comments (lines starting with ':') are ignored by clients.
func (sw *StreamWriter) writePadding(w http.ResponseWriter, flusher http.Flusher) error {
// Create padding using SSE comments which are ignored by clients
// Each line is ": " + padding content + "\n"
// We need ~64KB to fill the aiohttp StreamResponse buffer
const lineSize = 1024 // 1KB per line
const numLines = 64 // 64 lines = 64KB
paddingLine := ": " + strings.Repeat(".", lineSize-4) + "\n" // -4 for ": " and "\n"
for i := 0; i < numLines; i++ {
if _, err := fmt.Fprint(w, paddingLine); err != nil {
return err
}
}
// Flush the padding
flusher.Flush()
return nil
}
// SendJSON sends JSON data as an event
func (sw *StreamWriter) SendJSON(eventType string, v interface{}) error {
return sw.SendEvent(eventType, v)
}
// IsIngress returns true if running through Home Assistant Ingress proxy
func (sw *StreamWriter) IsIngress() bool {
return sw.isIngress
}
// SendMessage sends a simple message
func (sw *StreamWriter) SendMessage(message string) error {
return sw.SendEvent("message", map[string]string{"message": message})
}
// SendError sends an error message
func (sw *StreamWriter) SendError(err error) error {
return sw.SendEvent("error", map[string]string{"error": err.Error()})
}
// SendProgress sends a progress update
func (sw *StreamWriter) SendProgress(current, total int, message string) error {
return sw.SendEvent("progress", map[string]interface{}{
"current": current,
"total": total,
"message": message,
"percent": float64(current) / float64(total) * 100,
})
}
// Close closes the stream writer
func (sw *StreamWriter) Close() {
// Perform final flush if possible
if flusher, ok := sw.client.Response.(http.Flusher); ok {
flusher.Flush()
}
sw.client.Cancel()
}