From 1cfc2fa2e50ff6ea285b2e20392dbe081c48b0f9 Mon Sep 17 00:00:00 2001 From: eduard256 Date: Wed, 5 Nov 2025 23:20:51 +0300 Subject: [PATCH] Add ONVIF device service endpoint discovery for PTZ control --- internal/camera/discovery/onvif_simple.go | 180 ++++++++++++++++++++-- internal/camera/discovery/scanner.go | 119 +++++++++----- 2 files changed, 249 insertions(+), 50 deletions(-) diff --git a/internal/camera/discovery/onvif_simple.go b/internal/camera/discovery/onvif_simple.go index 32f823b..7a83f23 100644 --- a/internal/camera/discovery/onvif_simple.go +++ b/internal/camera/discovery/onvif_simple.go @@ -29,86 +29,186 @@ func NewONVIFDiscovery(logger interface{ Debug(string, ...any); Error(string, er // DiscoverStreamsForIP discovers all possible streams for a given IP func (o *ONVIFDiscovery) DiscoverStreamsForIP(ctx context.Context, ip, username, password string) ([]models.DiscoveredStream, error) { + o.logger.Debug("=== ONVIF DiscoverStreamsForIP STARTED ===", + "ip", ip, + "username", username, + "password_len", len(password)) + // Clean IP (remove port if present) if idx := strings.IndexByte(ip, ':'); idx > 0 { + o.logger.Debug("cleaning IP address", "original", ip, "cleaned", ip[:idx]) ip = ip[:idx] } var allStreams []models.DiscoveredStream // Try real ONVIF discovery first + o.logger.Debug(">>> Starting ONVIF device discovery", "ip", ip) onvifStreams := o.discoverViaONVIF(ctx, ip, username, password) + o.logger.Debug("<<< ONVIF device discovery completed", "streams_found", len(onvifStreams)) + + if len(onvifStreams) > 0 { + o.logger.Debug("ONVIF streams details:") + for i, stream := range onvifStreams { + o.logger.Debug(" ONVIF stream found", + "index", i, + "url", stream.URL, + "protocol", stream.Protocol, + "port", stream.Port, + "type", stream.Type) + } + } allStreams = append(allStreams, onvifStreams...) // Add common RTSP streams + o.logger.Debug(">>> Adding common RTSP streams", "ip", ip) commonStreams := o.getCommonRTSPStreams(ip, username, password) + o.logger.Debug("<<< Common RTSP streams added", "count", len(commonStreams)) allStreams = append(allStreams, commonStreams...) - o.logger.Debug("collected streams", "onvif", len(onvifStreams), "common", len(commonStreams), "total", len(allStreams)) + o.logger.Debug("=== ONVIF DiscoverStreamsForIP COMPLETED ===", + "onvif_streams", len(onvifStreams), + "common_streams", len(commonStreams), + "total_streams", len(allStreams)) return allStreams, nil } // discoverViaONVIF performs real ONVIF discovery func (o *ONVIFDiscovery) discoverViaONVIF(ctx context.Context, ip, username, password string) []models.DiscoveredStream { + o.logger.Debug(">>> discoverViaONVIF STARTED", "ip", ip) var streams []models.DiscoveredStream // Try standard ONVIF ports ports := []int{80, 8080, 8000} + o.logger.Debug("Will try ONVIF ports", "ports", ports) + + for portIdx, port := range ports { + o.logger.Debug("--- Trying ONVIF port ---", + "port_index", portIdx+1, + "total_ports", len(ports), + "port", port) - for _, port := range ports { // Create timeout context for ONVIF connection onvifCtx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() xaddr := fmt.Sprintf("%s:%d", ip, port) - - o.logger.Debug("trying ONVIF connection", "xaddr", xaddr) + o.logger.Debug("Creating ONVIF device", + "xaddr", xaddr, + "username", username, + "has_password", password != "") // Create ONVIF device + startTime := time.Now() dev, err := onvif.NewDevice(onvif.DeviceParams{ Xaddr: xaddr, Username: username, Password: password, }) + elapsed := time.Since(startTime) + if err != nil { - o.logger.Debug("ONVIF device creation failed", "xaddr", xaddr, "error", err.Error()) + o.logger.Debug("❌ ONVIF device creation FAILED", + "xaddr", xaddr, + "error", err.Error(), + "elapsed", elapsed.String()) continue } + o.logger.Debug("✅ ONVIF device created successfully", + "xaddr", xaddr, + "elapsed", elapsed.String()) + // Try to get profiles with context + o.logger.Debug("Getting media profiles...", "xaddr", xaddr) profileStreams := o.getProfileStreams(onvifCtx, dev, ip) + if len(profileStreams) > 0 { + // Add ONVIF device service endpoint + deviceServiceURL := fmt.Sprintf("http://%s/onvif/device_service", xaddr) + streams = append(streams, models.DiscoveredStream{ + URL: deviceServiceURL, + Type: "ONVIF", + Protocol: "http", + Port: port, + Working: true, // Mark as working since ONVIF connection succeeded + Metadata: map[string]interface{}{ + "source": "onvif", + "description": "ONVIF Device Service - used for PTZ control and device management", + }, + }) + + // Add profile streams streams = append(streams, profileStreams...) - o.logger.Debug("ONVIF discovery successful", "xaddr", xaddr, "profiles", len(profileStreams)) + + o.logger.Debug("🎉 ONVIF discovery SUCCESSFUL!", + "xaddr", xaddr, + "device_service", deviceServiceURL, + "profiles_found", len(profileStreams)) + + // Log device service + o.logger.Debug(" Device Service", + "url", deviceServiceURL) + + // Log each profile + for i, stream := range profileStreams { + o.logger.Debug(" Profile stream", + "index", i+1, + "url", stream.URL, + "metadata", stream.Metadata) + } break // Found working port, stop trying + } else { + o.logger.Debug("⚠️ No profiles returned from port", "xaddr", xaddr) } } + o.logger.Debug("<<< discoverViaONVIF COMPLETED", + "total_streams_found", len(streams)) + return streams } // getProfileStreams gets stream URIs from media profiles func (o *ONVIFDiscovery) getProfileStreams(ctx context.Context, dev *onvif.Device, ip string) []models.DiscoveredStream { + o.logger.Debug(">>> getProfileStreams STARTED", "ip", ip) var streams []models.DiscoveredStream // Get media profiles + o.logger.Debug("Calling GetProfiles ONVIF method...") getProfilesReq := media.GetProfiles{} + startTime := time.Now() profilesResp, err := dev.CallMethod(getProfilesReq) + elapsed := time.Since(startTime) + if err != nil { - o.logger.Debug("failed to get ONVIF profiles", "error", err.Error()) + o.logger.Debug("❌ Failed to call GetProfiles", + "error", err.Error(), + "elapsed", elapsed.String()) return streams } defer profilesResp.Body.Close() + o.logger.Debug("✅ GetProfiles call successful", + "elapsed", elapsed.String(), + "status_code", profilesResp.StatusCode) + // Read and parse XML response + o.logger.Debug("Reading response body...") body, err := io.ReadAll(profilesResp.Body) if err != nil { - o.logger.Debug("failed to read profiles response", "error", err.Error()) + o.logger.Debug("❌ Failed to read profiles response", + "error", err.Error()) return streams } + o.logger.Debug("Response body read", + "body_length", len(body), + "body_preview", string(body[:min(200, len(body))])) + // Parse SOAP envelope + o.logger.Debug("Parsing SOAP envelope...") var envelope struct { XMLName xml.Name `xml:"Envelope"` Body struct { @@ -117,14 +217,29 @@ func (o *ONVIFDiscovery) getProfileStreams(ctx context.Context, dev *onvif.Devic } if err := xml.Unmarshal(body, &envelope); err != nil { - o.logger.Debug("failed to parse profiles response", "error", err.Error()) + o.logger.Debug("❌ Failed to parse profiles response", + "error", err.Error()) return streams } + profileCount := len(envelope.Body.GetProfilesResponse.Profiles) + o.logger.Debug("✅ SOAP envelope parsed successfully", + "profiles_count", profileCount) + // Get stream URI for each profile - for _, profile := range envelope.Body.GetProfilesResponse.Profiles { + for i, profile := range envelope.Body.GetProfilesResponse.Profiles { + o.logger.Debug("Processing profile", + "index", i+1, + "total", profileCount, + "token", string(profile.Token), + "name", string(profile.Name)) + streamURI := o.getStreamURI(dev, string(profile.Token)) if streamURI != "" { + o.logger.Debug("✅ Got stream URI for profile", + "profile_token", string(profile.Token), + "stream_uri", streamURI) + streams = append(streams, models.DiscoveredStream{ URL: streamURI, Type: "FFMPEG", @@ -137,14 +252,29 @@ func (o *ONVIFDiscovery) getProfileStreams(ctx context.Context, dev *onvif.Devic "profile_name": string(profile.Name), }, }) + } else { + o.logger.Debug("⚠️ Failed to get stream URI for profile", + "profile_token", string(profile.Token)) } } + o.logger.Debug("<<< getProfileStreams COMPLETED", + "streams_collected", len(streams)) + return streams } +func min(a, b int) int { + if a < b { + return a + } + return b +} + // getStreamURI retrieves stream URI for a profile func (o *ONVIFDiscovery) getStreamURI(dev *onvif.Device, profileToken string) string { + o.logger.Debug(">>> getStreamURI STARTED", "profile_token", profileToken) + stream := xsdonvif.StreamType("RTP-Unicast") protocol := xsdonvif.TransportProtocol("RTSP") token := xsdonvif.ReferenceToken(profileToken) @@ -159,20 +289,37 @@ func (o *ONVIFDiscovery) getStreamURI(dev *onvif.Device, profileToken string) st }, } + o.logger.Debug("Calling GetStreamUri ONVIF method...", "profile_token", profileToken) + startTime := time.Now() resp, err := dev.CallMethod(getStreamURIReq) + elapsed := time.Since(startTime) + if err != nil { - o.logger.Debug("failed to get stream URI", "profile", profileToken, "error", err.Error()) + o.logger.Debug("❌ Failed to get stream URI", + "profile", profileToken, + "error", err.Error(), + "elapsed", elapsed.String()) return "" } defer resp.Body.Close() + o.logger.Debug("✅ GetStreamUri call successful", + "profile", profileToken, + "elapsed", elapsed.String(), + "status_code", resp.StatusCode) + // Read and parse XML response body, err := io.ReadAll(resp.Body) if err != nil { - o.logger.Debug("failed to read stream URI response", "error", err.Error()) + o.logger.Debug("❌ Failed to read stream URI response", + "error", err.Error()) return "" } + o.logger.Debug("Response body read", + "body_length", len(body), + "body_preview", string(body[:min(200, len(body))])) + // Parse SOAP envelope var envelope struct { XMLName xml.Name `xml:"Envelope"` @@ -182,11 +329,16 @@ func (o *ONVIFDiscovery) getStreamURI(dev *onvif.Device, profileToken string) st } if err := xml.Unmarshal(body, &envelope); err != nil { - o.logger.Debug("failed to parse stream URI response", "error", err.Error()) + o.logger.Debug("❌ Failed to parse stream URI response", + "error", err.Error()) return "" } - return string(envelope.Body.GetStreamUriResponse.MediaUri.Uri) + streamURI := string(envelope.Body.GetStreamUriResponse.MediaUri.Uri) + o.logger.Debug("<<< getStreamURI COMPLETED", + "stream_uri", streamURI) + + return streamURI } // getCommonRTSPStreams returns common RTSP stream URLs diff --git a/internal/camera/discovery/scanner.go b/internal/camera/discovery/scanner.go index c367814..301db24 100644 --- a/internal/camera/discovery/scanner.go +++ b/internal/camera/discovery/scanner.go @@ -113,25 +113,25 @@ func (s *Scanner) Scan(ctx context.Context, req models.StreamDiscoveryRequest, s return result, err } - // Collect all URLs to test - urls, err := s.collectURLs(scanCtx, req, ip) + // Collect all streams to test (includes metadata like type) + streams, err := s.collectStreams(scanCtx, req, ip) if err != nil { streamWriter.SendError(err) result.Error = err return result, err } - s.logger.Info("collected URLs for testing", "count", len(urls)) + s.logger.Info("collected streams for testing", "count", len(streams)) // Send progress update streamWriter.SendJSON("progress", models.ProgressMessage{ Tested: 0, Found: 0, - Remaining: len(urls), + Remaining: len(streams), }) - // Test URLs concurrently - s.testURLsConcurrently(scanCtx, urls, req, streamWriter, result) + // Test streams concurrently + s.testStreamsConcurrently(scanCtx, streams, req, streamWriter, result) // Calculate duration result.Duration = time.Since(startTime) @@ -275,13 +275,13 @@ func (s *Scanner) embedCredentialsInURL(streamURL, username, password, authMetho return embeddedURL } -// collectURLs collects all URLs to test -func (s *Scanner) collectURLs(ctx context.Context, req models.StreamDiscoveryRequest, ip string) ([]string, error) { - var allURLs []string +// collectStreams collects all streams to test with their metadata +func (s *Scanner) collectStreams(ctx context.Context, req models.StreamDiscoveryRequest, ip string) ([]models.DiscoveredStream, error) { + var allStreams []models.DiscoveredStream urlMap := make(map[string]bool) // For deduplication var onvifCount, modelCount, popularCount int - s.logger.Debug("collectURLs started", + s.logger.Debug("collectStreams started", "ip", ip, "model", req.Model, "username", req.Username, @@ -296,22 +296,48 @@ func (s *Scanner) collectURLs(ctx context.Context, req models.StreamDiscoveryReq } // 1. ONVIF discovery (always first) - s.logger.Debug("phase 1: starting ONVIF discovery", "ip", ip) + s.logger.Debug("========================================") + s.logger.Debug("PHASE 1: STARTING ONVIF DISCOVERY") + s.logger.Debug("========================================") + s.logger.Debug("ONVIF parameters", + "ip", ip, + "username", req.Username, + "password_len", len(req.Password), + "channel", req.Channel) + + startTime := time.Now() onvifStreams, err := s.onvif.DiscoverStreamsForIP(ctx, ip, req.Username, req.Password) + elapsed := time.Since(startTime) + if err != nil { - s.logger.Error("ONVIF discovery failed", err) + s.logger.Error("❌ ONVIF discovery FAILED", err, + "elapsed", elapsed.String()) } else { - for _, stream := range onvifStreams { + s.logger.Debug("✅ ONVIF discovery returned", + "streams_count", len(onvifStreams), + "elapsed", elapsed.String()) + + for i, stream := range onvifStreams { + s.logger.Debug("ONVIF stream returned", + "index", i+1, + "url", stream.URL, + "type", stream.Type, + "source", stream.Metadata["source"]) + if !urlMap[stream.URL] { - allURLs = append(allURLs, stream.URL) + allStreams = append(allStreams, stream) urlMap[stream.URL] = true onvifCount++ + s.logger.Debug(" ✓ Added to stream list (unique)") + } else { + s.logger.Debug(" ✗ Skipped (duplicate)") } } - s.logger.Debug("ONVIF discovery completed", - "streams_found", len(onvifStreams), - "unique_urls_added", onvifCount) + s.logger.Debug("ONVIF phase completed", + "total_streams_returned", len(onvifStreams), + "unique_streams_added", onvifCount) } + s.logger.Debug("========================================\n") // 2. Model-specific patterns if req.Model != "" { @@ -334,7 +360,7 @@ func (s *Scanner) collectURLs(ctx context.Context, req models.StreamDiscoveryReq "cameras_matched", len(cameras), "total_entries", len(entries)) - // Build URLs from entries + // Build streams from entries for _, entry := range entries { buildCtx.Port = entry.Port buildCtx.Protocol = entry.Protocol @@ -342,15 +368,21 @@ func (s *Scanner) collectURLs(ctx context.Context, req models.StreamDiscoveryReq urls := s.builder.BuildURLsFromEntry(entry, buildCtx) for _, url := range urls { if !urlMap[url] { - allURLs = append(allURLs, url) + allStreams = append(allStreams, models.DiscoveredStream{ + URL: url, + Type: entry.Type, + Protocol: entry.Protocol, + Port: entry.Port, + Working: false, // Will be tested + }) urlMap[url] = true modelCount++ } } } - s.logger.Debug("model patterns URLs built", - "total_unique_model_urls", modelCount) + s.logger.Debug("model patterns streams built", + "total_unique_model_streams", modelCount) } } @@ -375,7 +407,13 @@ func (s *Scanner) collectURLs(ctx context.Context, req models.StreamDiscoveryReq url := s.builder.BuildURL(entry, buildCtx) if !urlMap[url] { - allURLs = append(allURLs, url) + allStreams = append(allStreams, models.DiscoveredStream{ + URL: url, + Type: pattern.Type, + Protocol: pattern.Protocol, + Port: pattern.Port, + Working: false, // Will be tested + }) urlMap[url] = true popularCount++ } @@ -383,21 +421,21 @@ func (s *Scanner) collectURLs(ctx context.Context, req models.StreamDiscoveryReq } totalBeforeDedup := onvifCount + modelCount + popularCount - duplicatesRemoved := totalBeforeDedup - len(allURLs) + duplicatesRemoved := totalBeforeDedup - len(allStreams) - s.logger.Debug("URL collection complete", - "total_unique_urls", len(allURLs), + s.logger.Debug("stream collection complete", + "total_unique_streams", len(allStreams), "from_onvif", onvifCount, "from_model_patterns", modelCount, "from_popular_patterns", popularCount, "total_before_dedup", totalBeforeDedup, "duplicates_removed", duplicatesRemoved) - return allURLs, nil + return allStreams, nil } -// testURLsConcurrently tests URLs concurrently -func (s *Scanner) testURLsConcurrently(ctx context.Context, urls []string, req models.StreamDiscoveryRequest, streamWriter *sse.StreamWriter, result *ScanResult) { +// testStreamsConcurrently tests streams concurrently +func (s *Scanner) testStreamsConcurrently(ctx context.Context, streams []models.DiscoveredStream, req models.StreamDiscoveryRequest, streamWriter *sse.StreamWriter, result *ScanResult) { var wg sync.WaitGroup var tested int32 var found int32 @@ -427,7 +465,7 @@ func (s *Scanner) testURLsConcurrently(ctx context.Context, urls []string, req m streamWriter.SendJSON("progress", models.ProgressMessage{ Tested: int(currentTested), Found: int(atomic.LoadInt32(&found)), - Remaining: len(urls) - int(currentTested), + Remaining: len(streams) - int(currentTested), }) lastTested = currentTested } @@ -449,7 +487,7 @@ func (s *Scanner) testURLsConcurrently(ctx context.Context, urls []string, req m streamWriter.SendJSON("progress", models.ProgressMessage{ Tested: int(atomic.LoadInt32(&tested)), Found: int(atomic.LoadInt32(&found)), - Remaining: len(urls) - int(atomic.LoadInt32(&tested)), + Remaining: len(streams) - int(atomic.LoadInt32(&tested)), }) // Check if we've found enough streams @@ -459,8 +497,8 @@ func (s *Scanner) testURLsConcurrently(ctx context.Context, urls []string, req m } }() - // Test each URL - for _, url := range urls { + // Test each stream + for _, streamToTest := range streams { // Check if context is done or max streams reached select { case <-ctx.Done(): @@ -474,15 +512,24 @@ func (s *Scanner) testURLsConcurrently(ctx context.Context, urls []string, req m } wg.Add(1) - go func(url string) { + go func(stream models.DiscoveredStream) { defer wg.Done() // Acquire semaphore sem <- struct{}{} defer func() { <-sem }() + // Special handling for ONVIF device service - skip testing, already verified + if stream.Type == "ONVIF" && stream.Working { + atomic.AddInt32(&tested, 1) + atomic.AddInt32(&found, 1) + streamsChan <- stream + s.logger.Debug("ONVIF device service added without testing", "url", stream.URL) + return + } + // Test the stream - testResult := s.tester.TestStream(ctx, url, req.Username, req.Password) + testResult := s.tester.TestStream(ctx, stream.URL, req.Username, req.Password) atomic.AddInt32(&tested, 1) if testResult.Working { @@ -509,9 +556,9 @@ func (s *Scanner) testURLsConcurrently(ctx context.Context, urls []string, req m streamsChan <- discoveredStream } else { - s.logger.Debug("stream test failed", "url", url, "error", testResult.Error) + s.logger.Debug("stream test failed", "url", stream.URL, "error", testResult.Error) } - }(url) + }(streamToTest) } // Wait for all tests to complete