e9dc04178e
Add padding to overcome aiohttp 64KB buffer in HA Supervisor. Problem: - HA Supervisor uses aiohttp with 64KB StreamResponse buffer - Small SSE events (~200-500 bytes) were buffered until connection closed - Users saw all streams appear at once instead of real-time updates Solution: - Detect Ingress mode via X-Ingress-Path header - Add 64KB SSE comment padding to fill proxy buffers - Increase progress interval to 3 sec in Ingress mode (reduce traffic) - Normal mode (Docker/direct) unchanged - works exactly as before Traffic impact: - Normal mode: ~17KB per scan (unchanged) - Ingress mode: ~2-3MB per scan (acceptable for real-time updates) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
539 lines
14 KiB
Go
539 lines
14 KiB
Go
package discovery
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net/url"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/eduard256/Strix/internal/camera/database"
|
|
"github.com/eduard256/Strix/internal/camera/stream"
|
|
"github.com/eduard256/Strix/internal/models"
|
|
"github.com/eduard256/Strix/pkg/sse"
|
|
)
|
|
|
|
// Scanner orchestrates stream discovery
|
|
type Scanner struct {
|
|
loader *database.Loader
|
|
searchEngine *database.SearchEngine
|
|
builder *stream.Builder
|
|
tester *stream.Tester
|
|
onvif *ONVIFDiscovery
|
|
config ScannerConfig
|
|
logger interface{ Debug(string, ...any); Error(string, error, ...any); Info(string, ...any) }
|
|
}
|
|
|
|
// ScannerConfig contains scanner configuration
|
|
type ScannerConfig struct {
|
|
WorkerPoolSize int
|
|
DefaultTimeout time.Duration
|
|
MaxStreams int
|
|
ModelSearchLimit int
|
|
FFProbeTimeout time.Duration
|
|
}
|
|
|
|
// NewScanner creates a new stream scanner
|
|
func NewScanner(
|
|
loader *database.Loader,
|
|
searchEngine *database.SearchEngine,
|
|
builder *stream.Builder,
|
|
tester *stream.Tester,
|
|
onvif *ONVIFDiscovery,
|
|
config ScannerConfig,
|
|
logger interface{ Debug(string, ...any); Error(string, error, ...any); Info(string, ...any) },
|
|
) *Scanner {
|
|
return &Scanner{
|
|
loader: loader,
|
|
searchEngine: searchEngine,
|
|
builder: builder,
|
|
tester: tester,
|
|
onvif: onvif,
|
|
config: config,
|
|
logger: logger,
|
|
}
|
|
}
|
|
|
|
// ScanResult contains the scan results
|
|
type ScanResult struct {
|
|
Streams []models.DiscoveredStream
|
|
TotalTested int
|
|
TotalFound int
|
|
Duration time.Duration
|
|
Error error
|
|
}
|
|
|
|
// Scan performs stream discovery
|
|
func (s *Scanner) Scan(ctx context.Context, req models.StreamDiscoveryRequest, streamWriter *sse.StreamWriter) (*ScanResult, error) {
|
|
startTime := time.Now()
|
|
result := &ScanResult{}
|
|
|
|
// Set defaults
|
|
if req.Timeout <= 0 {
|
|
req.Timeout = int(s.config.DefaultTimeout.Seconds())
|
|
}
|
|
if req.MaxStreams <= 0 {
|
|
req.MaxStreams = s.config.MaxStreams
|
|
}
|
|
if req.ModelLimit <= 0 {
|
|
req.ModelLimit = s.config.ModelSearchLimit
|
|
}
|
|
|
|
// Create context with timeout
|
|
scanCtx, cancel := context.WithTimeout(ctx, time.Duration(req.Timeout)*time.Second)
|
|
defer cancel()
|
|
|
|
s.logger.Info("starting stream discovery",
|
|
"target", req.Target,
|
|
"model", req.Model,
|
|
"timeout", req.Timeout,
|
|
"max_streams", req.MaxStreams,
|
|
)
|
|
|
|
// Send initial message
|
|
_ = streamWriter.SendJSON("scan_started", map[string]interface{}{
|
|
"target": req.Target,
|
|
"model": req.Model,
|
|
"max_streams": req.MaxStreams,
|
|
"timeout": req.Timeout,
|
|
})
|
|
|
|
// Check if target is a direct stream URL
|
|
if s.isDirectStreamURL(req.Target) {
|
|
return s.scanDirectStream(scanCtx, req, streamWriter, result)
|
|
}
|
|
|
|
// Extract IP from target
|
|
ip := s.extractIP(req.Target)
|
|
if ip == "" {
|
|
err := fmt.Errorf("invalid target IP: %s", req.Target)
|
|
_ = streamWriter.SendError(err)
|
|
result.Error = err
|
|
return result, err
|
|
}
|
|
|
|
// Collect all streams to test (includes metadata like type)
|
|
streams, err := s.collectStreams(scanCtx, req, ip)
|
|
if err != nil {
|
|
_ = streamWriter.SendError(err)
|
|
result.Error = err
|
|
return result, err
|
|
}
|
|
|
|
s.logger.Info("collected streams for testing", "count", len(streams))
|
|
|
|
// Send progress update
|
|
_ = streamWriter.SendJSON("progress", models.ProgressMessage{
|
|
Tested: 0,
|
|
Found: 0,
|
|
Remaining: len(streams),
|
|
})
|
|
|
|
// Test streams concurrently
|
|
s.testStreamsConcurrently(scanCtx, streams, req, streamWriter, result)
|
|
|
|
// Calculate duration
|
|
result.Duration = time.Since(startTime)
|
|
|
|
// Send completion message
|
|
_ = streamWriter.SendJSON("complete", models.CompleteMessage{
|
|
TotalTested: result.TotalTested,
|
|
TotalFound: result.TotalFound,
|
|
Duration: result.Duration.Seconds(),
|
|
})
|
|
|
|
// Send final done event to signal proper stream closure
|
|
_ = streamWriter.SendJSON("done", map[string]interface{}{
|
|
"message": "Stream discovery finished",
|
|
})
|
|
|
|
// Small delay to ensure all data is flushed to client
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
s.logger.Info("stream discovery completed",
|
|
"tested", result.TotalTested,
|
|
"found", result.TotalFound,
|
|
"duration", result.Duration,
|
|
)
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// isDirectStreamURL checks if target is a direct stream URL
|
|
func (s *Scanner) isDirectStreamURL(target string) bool {
|
|
u, err := url.Parse(target)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
return u.Scheme == "rtsp" || u.Scheme == "http" || u.Scheme == "https"
|
|
}
|
|
|
|
// scanDirectStream scans a direct stream URL
|
|
func (s *Scanner) scanDirectStream(ctx context.Context, req models.StreamDiscoveryRequest, streamWriter *sse.StreamWriter, result *ScanResult) (*ScanResult, error) {
|
|
s.logger.Debug("testing direct stream URL", "url", req.Target)
|
|
|
|
testResult := s.tester.TestStream(ctx, req.Target)
|
|
result.TotalTested = 1
|
|
|
|
if testResult.Working {
|
|
result.TotalFound = 1
|
|
|
|
discoveredStream := models.DiscoveredStream{
|
|
URL: testResult.URL,
|
|
Type: testResult.Type,
|
|
Protocol: testResult.Protocol,
|
|
Working: true,
|
|
Resolution: testResult.Resolution,
|
|
Codec: testResult.Codec,
|
|
FPS: testResult.FPS,
|
|
Bitrate: testResult.Bitrate,
|
|
HasAudio: testResult.HasAudio,
|
|
TestTime: testResult.TestTime,
|
|
Metadata: testResult.Metadata,
|
|
}
|
|
|
|
result.Streams = append(result.Streams, discoveredStream)
|
|
|
|
// Send to SSE
|
|
_ = streamWriter.SendJSON("stream_found", map[string]interface{}{
|
|
"stream": discoveredStream,
|
|
})
|
|
} else {
|
|
_ = streamWriter.SendJSON("stream_failed", map[string]interface{}{
|
|
"url": req.Target,
|
|
"error": testResult.Error,
|
|
})
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// extractIP extracts IP address from target
|
|
func (s *Scanner) extractIP(target string) string {
|
|
// Remove protocol if present
|
|
if u, err := url.Parse(target); err == nil && u.Host != "" {
|
|
target = u.Host
|
|
}
|
|
|
|
// Remove port if present
|
|
if idx := len(target) - 1; idx >= 0 && target[idx] == ']' {
|
|
// IPv6 address
|
|
return target
|
|
}
|
|
|
|
for i := len(target) - 1; i >= 0; i-- {
|
|
if target[i] == ':' {
|
|
return target[:i]
|
|
}
|
|
}
|
|
|
|
return target
|
|
}
|
|
|
|
|
|
// collectStreams collects all streams to test with their metadata
|
|
func (s *Scanner) collectStreams(ctx context.Context, req models.StreamDiscoveryRequest, ip string) ([]models.DiscoveredStream, error) {
|
|
var allStreams []models.DiscoveredStream
|
|
urlMap := make(map[string]bool) // For deduplication
|
|
var onvifCount, modelCount, popularCount int
|
|
|
|
s.logger.Debug("collectStreams started",
|
|
"ip", ip,
|
|
"model", req.Model,
|
|
"username", req.Username,
|
|
"channel", req.Channel)
|
|
|
|
// Build context for URL generation
|
|
buildCtx := stream.BuildContext{
|
|
IP: ip,
|
|
Username: req.Username,
|
|
Password: req.Password,
|
|
Channel: req.Channel,
|
|
}
|
|
|
|
// 1. ONVIF discovery (always first)
|
|
s.logger.Debug("========================================")
|
|
s.logger.Debug("PHASE 1: STARTING ONVIF DISCOVERY")
|
|
s.logger.Debug("========================================")
|
|
s.logger.Debug("ONVIF parameters",
|
|
"ip", ip,
|
|
"username", req.Username,
|
|
"password_len", len(req.Password),
|
|
"channel", req.Channel)
|
|
|
|
startTime := time.Now()
|
|
onvifStreams, err := s.onvif.DiscoverStreamsForIP(ctx, ip, req.Username, req.Password)
|
|
elapsed := time.Since(startTime)
|
|
|
|
if err != nil {
|
|
s.logger.Error("❌ ONVIF discovery FAILED", err,
|
|
"elapsed", elapsed.String())
|
|
} else {
|
|
s.logger.Debug("✅ ONVIF discovery returned",
|
|
"streams_count", len(onvifStreams),
|
|
"elapsed", elapsed.String())
|
|
|
|
for i, stream := range onvifStreams {
|
|
s.logger.Debug("ONVIF stream returned",
|
|
"index", i+1,
|
|
"url", stream.URL,
|
|
"type", stream.Type,
|
|
"source", stream.Metadata["source"])
|
|
|
|
if !urlMap[stream.URL] {
|
|
allStreams = append(allStreams, stream)
|
|
urlMap[stream.URL] = true
|
|
onvifCount++
|
|
s.logger.Debug(" ✓ Added to stream list (unique)")
|
|
} else {
|
|
s.logger.Debug(" ✗ Skipped (duplicate)")
|
|
}
|
|
}
|
|
s.logger.Debug("ONVIF phase completed",
|
|
"total_streams_returned", len(onvifStreams),
|
|
"unique_streams_added", onvifCount)
|
|
}
|
|
s.logger.Debug("========================================\n")
|
|
|
|
// 2. Model-specific patterns
|
|
if req.Model != "" {
|
|
s.logger.Debug("phase 2: searching model-specific patterns",
|
|
"model", req.Model,
|
|
"limit", req.ModelLimit)
|
|
|
|
// Search for cameras using intelligent brand+model search
|
|
searchResp, err := s.searchEngine.Search(req.Model, req.ModelLimit)
|
|
if err != nil {
|
|
s.logger.Error("model search failed", err)
|
|
} else {
|
|
cameras := searchResp.Cameras
|
|
|
|
// Collect entries from all matching cameras
|
|
var entries []models.CameraEntry
|
|
for _, camera := range cameras {
|
|
entries = append(entries, camera.Entries...)
|
|
}
|
|
|
|
s.logger.Debug("model entries collected",
|
|
"cameras_matched", len(cameras),
|
|
"total_entries", len(entries))
|
|
|
|
// Build streams from entries
|
|
for _, entry := range entries {
|
|
buildCtx.Port = entry.Port
|
|
buildCtx.Protocol = entry.Protocol
|
|
|
|
urls := s.builder.BuildURLsFromEntry(entry, buildCtx)
|
|
for _, url := range urls {
|
|
if !urlMap[url] {
|
|
allStreams = append(allStreams, models.DiscoveredStream{
|
|
URL: url,
|
|
Type: entry.Type,
|
|
Protocol: entry.Protocol,
|
|
Port: entry.Port,
|
|
Working: false, // Will be tested
|
|
})
|
|
urlMap[url] = true
|
|
modelCount++
|
|
}
|
|
}
|
|
}
|
|
|
|
s.logger.Debug("model patterns streams built",
|
|
"total_unique_model_streams", modelCount)
|
|
}
|
|
}
|
|
|
|
// 3. Popular patterns (always add as fallback)
|
|
s.logger.Debug("phase 3: adding popular patterns")
|
|
patterns, err := s.loader.LoadPopularPatterns()
|
|
if err != nil {
|
|
s.logger.Error("failed to load popular patterns", err)
|
|
} else {
|
|
s.logger.Debug("popular patterns loaded", "count", len(patterns))
|
|
|
|
for _, pattern := range patterns {
|
|
entry := models.CameraEntry{
|
|
Type: pattern.Type,
|
|
Protocol: pattern.Protocol,
|
|
Port: pattern.Port,
|
|
URL: pattern.URL,
|
|
}
|
|
|
|
buildCtx.Port = pattern.Port
|
|
buildCtx.Protocol = pattern.Protocol
|
|
|
|
// Generate all URL variants for this pattern
|
|
urls := s.builder.BuildURLsFromEntry(entry, buildCtx)
|
|
for _, url := range urls {
|
|
if !urlMap[url] {
|
|
allStreams = append(allStreams, models.DiscoveredStream{
|
|
URL: url,
|
|
Type: pattern.Type,
|
|
Protocol: pattern.Protocol,
|
|
Port: pattern.Port,
|
|
Working: false, // Will be tested
|
|
})
|
|
urlMap[url] = true
|
|
popularCount++
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
totalBeforeDedup := onvifCount + modelCount + popularCount
|
|
duplicatesRemoved := totalBeforeDedup - len(allStreams)
|
|
|
|
s.logger.Debug("stream collection complete",
|
|
"total_unique_streams", len(allStreams),
|
|
"from_onvif", onvifCount,
|
|
"from_model_patterns", modelCount,
|
|
"from_popular_patterns", popularCount,
|
|
"total_before_dedup", totalBeforeDedup,
|
|
"duplicates_removed", duplicatesRemoved)
|
|
|
|
return allStreams, nil
|
|
}
|
|
|
|
// testStreamsConcurrently tests streams concurrently
|
|
func (s *Scanner) testStreamsConcurrently(ctx context.Context, streams []models.DiscoveredStream, req models.StreamDiscoveryRequest, streamWriter *sse.StreamWriter, result *ScanResult) {
|
|
var wg sync.WaitGroup
|
|
var tested int32
|
|
var found int32
|
|
|
|
// Create worker pool
|
|
sem := make(chan struct{}, s.config.WorkerPoolSize)
|
|
streamsChan := make(chan models.DiscoveredStream, 100)
|
|
|
|
// Start periodic progress updates
|
|
progressCtx, cancelProgress := context.WithCancel(ctx)
|
|
defer cancelProgress()
|
|
|
|
go func() {
|
|
// Use longer interval for Ingress mode to reduce traffic (padding is ~64KB per event)
|
|
// Normal mode: 1 second, Ingress mode: 3 seconds
|
|
progressInterval := 1 * time.Second
|
|
if streamWriter.IsIngress() {
|
|
progressInterval = 3 * time.Second
|
|
}
|
|
|
|
ticker := time.NewTicker(progressInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-progressCtx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
// Send progress to prevent WriteTimeout and show scanning activity
|
|
_ = streamWriter.SendJSON("progress", models.ProgressMessage{
|
|
Tested: int(atomic.LoadInt32(&tested)),
|
|
Found: int(atomic.LoadInt32(&found)),
|
|
Remaining: len(streams) - int(atomic.LoadInt32(&tested)),
|
|
})
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Start result collector
|
|
var collectorWg sync.WaitGroup
|
|
collectorWg.Add(1)
|
|
go func() {
|
|
defer collectorWg.Done()
|
|
for stream := range streamsChan {
|
|
result.Streams = append(result.Streams, stream)
|
|
|
|
s.logger.Info("sending stream_found event", "url", stream.URL, "type", stream.Type)
|
|
|
|
// Send to SSE
|
|
_ = streamWriter.SendJSON("stream_found", map[string]interface{}{
|
|
"stream": stream,
|
|
})
|
|
|
|
// Send progress (immediate update when stream is found)
|
|
_ = streamWriter.SendJSON("progress", models.ProgressMessage{
|
|
Tested: int(atomic.LoadInt32(&tested)),
|
|
Found: int(atomic.LoadInt32(&found)),
|
|
Remaining: len(streams) - int(atomic.LoadInt32(&tested)),
|
|
})
|
|
|
|
// Check if we've found enough streams
|
|
if int(atomic.LoadInt32(&found)) >= req.MaxStreams {
|
|
s.logger.Debug("max streams reached", "count", req.MaxStreams)
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Test each stream
|
|
TestLoop:
|
|
for _, streamToTest := range streams {
|
|
// Check if context is done or max streams reached
|
|
select {
|
|
case <-ctx.Done():
|
|
s.logger.Debug("scan cancelled or timeout")
|
|
break TestLoop
|
|
default:
|
|
}
|
|
|
|
if int(atomic.LoadInt32(&found)) >= req.MaxStreams {
|
|
break
|
|
}
|
|
|
|
wg.Add(1)
|
|
go func(stream models.DiscoveredStream) {
|
|
defer wg.Done()
|
|
|
|
// Acquire semaphore
|
|
sem <- struct{}{}
|
|
defer func() { <-sem }()
|
|
|
|
// Special handling for ONVIF device service - skip testing, already verified
|
|
if stream.Type == "ONVIF" && stream.Working {
|
|
atomic.AddInt32(&tested, 1)
|
|
atomic.AddInt32(&found, 1)
|
|
streamsChan <- stream
|
|
s.logger.Debug("ONVIF device service added without testing", "url", stream.URL)
|
|
return
|
|
}
|
|
|
|
// Test the stream
|
|
testResult := s.tester.TestStream(ctx, stream.URL)
|
|
atomic.AddInt32(&tested, 1)
|
|
|
|
if testResult.Working {
|
|
atomic.AddInt32(&found, 1)
|
|
|
|
discoveredStream := models.DiscoveredStream{
|
|
URL: testResult.URL,
|
|
Type: testResult.Type,
|
|
Protocol: testResult.Protocol,
|
|
Port: 0, // Will be extracted from URL if needed
|
|
Working: true,
|
|
Resolution: testResult.Resolution,
|
|
Codec: testResult.Codec,
|
|
FPS: testResult.FPS,
|
|
Bitrate: testResult.Bitrate,
|
|
HasAudio: testResult.HasAudio,
|
|
TestTime: testResult.TestTime,
|
|
Metadata: testResult.Metadata,
|
|
}
|
|
|
|
streamsChan <- discoveredStream
|
|
} else {
|
|
s.logger.Debug("stream test failed", "url", stream.URL, "error", testResult.Error)
|
|
}
|
|
}(streamToTest)
|
|
}
|
|
|
|
// Wait for all tests to complete
|
|
wg.Wait()
|
|
close(streamsChan)
|
|
|
|
// Wait for result collector to finish processing all streams
|
|
collectorWg.Wait()
|
|
|
|
// Update final counts
|
|
result.TotalTested = int(atomic.LoadInt32(&tested))
|
|
result.TotalFound = int(atomic.LoadInt32(&found))
|
|
} |