From 8a21809f1833f5023c036c9939959099e7263092 Mon Sep 17 00:00:00 2001 From: Sergey Krashevich Date: Mon, 9 Mar 2026 13:06:57 +0300 Subject: [PATCH] feat(homekit): add ONVIF motion detection support - implement ONVIF motion watcher to handle motion events - add configuration options for motion hold time and ONVIF URL - remap motion mode from "onvif" to "api" for compatibility - log ONVIF motion watcher activity for better debugging feat(onvif): implement event subscription for motion detection - create PullPoint subscription to receive motion events - implement methods for pulling messages and renewing subscriptions - handle event requests and responses specific to motion detection test(onvif): add unit tests for motion event parsing and subscription - create tests for parsing various motion event XML responses - verify correct handling of multiple notifications and edge cases - test resolving event addresses for ONVIF clients fix(hksv): improve motion detection logging - log warnings when accessory or character not found during motion detection - log number of listeners notified during motion state changes feat(hap): add listener count method - introduce method to retrieve the number of listeners for a character feat(onvif): enhance ONVIF client with event URL handling - extract event URL from ONVIF device response for subscription management --- internal/homekit/homekit.go | 33 ++- internal/homekit/onvif_motion.go | 287 ++++++++++++++++++++++++++ internal/homekit/onvif_motion_test.go | 99 +++++++++ pkg/hap/character.go | 4 + pkg/hksv/hksv.go | 8 +- pkg/onvif/client.go | 6 + pkg/onvif/envelope.go | 46 +++++ pkg/onvif/events.go | 248 ++++++++++++++++++++++ pkg/onvif/events_test.go | 263 +++++++++++++++++++++++ pkg/onvif/helpers.go | 13 ++ 10 files changed, 1004 insertions(+), 3 deletions(-) create mode 100644 internal/homekit/onvif_motion.go create mode 100644 internal/homekit/onvif_motion_test.go create mode 100644 pkg/onvif/events.go create mode 100644 pkg/onvif/events_test.go diff --git a/internal/homekit/homekit.go b/internal/homekit/homekit.go index 1a85c974..5eb75d1e 100644 --- a/internal/homekit/homekit.go +++ b/internal/homekit/homekit.go @@ -7,6 +7,7 @@ import ( "net/http" "strings" "sync" + "time" "github.com/AlexxIT/go2rtc/internal/api" "github.com/AlexxIT/go2rtc/internal/app" @@ -36,6 +37,8 @@ func Init() { HKSV bool `yaml:"hksv"` Motion string `yaml:"motion"` MotionThreshold float64 `yaml:"motion_threshold"` + MotionHoldTime float64 `yaml:"motion_hold_time"` + OnvifURL string `yaml:"onvif_url"` Speaker *bool `yaml:"speaker"` } `yaml:"homekit"` } @@ -71,6 +74,12 @@ func Init() { proxyURL = url } + // Remap "onvif" → "api" for hksv.Server; ONVIF watcher drives motion externally. + motionMode := conf.Motion + if motionMode == "onvif" { + motionMode = "api" + } + srv, err := hksv.NewServer(hksv.Config{ StreamName: id, Pin: conf.Pin, @@ -81,7 +90,7 @@ func Init() { Pairings: conf.Pairings, ProxyURL: proxyURL, HKSV: conf.HKSV, - MotionMode: conf.Motion, + MotionMode: motionMode, MotionThreshold: conf.MotionThreshold, Speaker: conf.Speaker, UserAgent: app.UserAgent, @@ -98,6 +107,28 @@ func Init() { continue } + // Start ONVIF motion watcher if configured. + if conf.Motion == "onvif" { + onvifURL := conf.OnvifURL + if onvifURL == "" { + sources := stream.Sources() + log.Debug().Str("stream", id).Strs("sources", sources). + Msg("[homekit] onvif motion: searching for ONVIF URL in stream sources") + onvifURL = findOnvifURL(sources) + } + if onvifURL == "" { + log.Warn().Str("stream", id).Msg("[homekit] onvif motion: no ONVIF URL found, set onvif_url or use onvif:// stream source") + } else { + holdTime := time.Duration(conf.MotionHoldTime) * time.Second + if holdTime <= 0 { + holdTime = 30 * time.Second + } + log.Info().Str("stream", id).Str("onvif_url", onvifURL). + Dur("hold_time", holdTime).Msg("[homekit] starting ONVIF motion watcher") + startOnvifMotionWatcher(srv, onvifURL, holdTime, log) + } + } + entry := srv.MDNSEntry() entries = append(entries, entry) diff --git a/internal/homekit/onvif_motion.go b/internal/homekit/onvif_motion.go new file mode 100644 index 00000000..247225f0 --- /dev/null +++ b/internal/homekit/onvif_motion.go @@ -0,0 +1,287 @@ +package homekit + +import ( + "strings" + "sync" + "time" + + "github.com/AlexxIT/go2rtc/pkg/hksv" + "github.com/AlexxIT/go2rtc/pkg/onvif" + "github.com/rs/zerolog" +) + +const ( + onvifSubscriptionTimeout = 60 * time.Second + onvifPullTimeout = 30 * time.Second + onvifMessageLimit = 10 + onvifRenewMargin = 10 * time.Second + onvifMinReconnectDelay = 5 * time.Second + onvifMaxReconnectDelay = 60 * time.Second +) + +type onvifPullPoint interface { + PullMessages(timeout time.Duration, limit int) ([]byte, error) + Renew(timeout time.Duration) error + Unsubscribe() error +} + +type onvifPullPointFactory func(rawURL string, timeout time.Duration) (onvifPullPoint, error) + +// onvifMotionWatcher subscribes to ONVIF PullPoint events +// and forwards motion state to an hksv.Server. +type onvifMotionWatcher struct { + srv *hksv.Server + onvifURL string + holdTime time.Duration + log zerolog.Logger + + now func() time.Time + newPullPoint onvifPullPointFactory + subscriptionTimeout time.Duration + pullTimeout time.Duration + renewMargin time.Duration + messageLimit int + + done chan struct{} + once sync.Once +} + +func newOnvifMotionWatcher(srv *hksv.Server, onvifURL string, holdTime time.Duration, log zerolog.Logger) *onvifMotionWatcher { + return &onvifMotionWatcher{ + srv: srv, + onvifURL: onvifURL, + holdTime: holdTime, + log: log, + now: time.Now, + newPullPoint: newOnvifPullPoint, + subscriptionTimeout: onvifSubscriptionTimeout, + pullTimeout: onvifPullTimeout, + renewMargin: onvifRenewMargin, + messageLimit: onvifMessageLimit, + done: make(chan struct{}), + } +} + +// startOnvifMotionWatcher creates and starts a new ONVIF motion watcher. +func startOnvifMotionWatcher(srv *hksv.Server, onvifURL string, holdTime time.Duration, log zerolog.Logger) *onvifMotionWatcher { + w := newOnvifMotionWatcher(srv, onvifURL, holdTime, log) + go w.run() + return w +} + +// stop shuts down the watcher goroutine. +func (w *onvifMotionWatcher) stop() { + w.once.Do(func() { close(w.done) }) +} + +// run is the main loop: create subscription, poll, handle events, reconnect on failure. +func (w *onvifMotionWatcher) run() { + w.log.Debug().Str("url", w.onvifURL).Dur("hold_time", w.holdTime). + Msg("[homekit] onvif motion watcher starting") + + delay := onvifMinReconnectDelay + + for { + select { + case <-w.done: + w.log.Debug().Msg("[homekit] onvif motion watcher stopped (before connect)") + return + default: + } + + w.log.Debug().Str("url", w.onvifURL).Msg("[homekit] onvif motion connecting to camera") + + err := w.connectAndPoll() + if err != nil { + w.log.Warn().Err(err).Str("url", w.onvifURL).Msg("[homekit] onvif motion error") + } else { + delay = onvifMinReconnectDelay + } + + select { + case <-w.done: + w.log.Debug().Msg("[homekit] onvif motion watcher stopped (after poll)") + return + default: + } + + w.log.Debug().Dur("delay", delay).Msg("[homekit] onvif motion reconnecting") + + select { + case <-time.After(delay): + case <-w.done: + w.log.Debug().Msg("[homekit] onvif motion watcher stopped (during backoff)") + return + } + + delay *= 2 + if delay > onvifMaxReconnectDelay { + delay = onvifMaxReconnectDelay + } + } +} + +// connectAndPoll creates a subscription and polls for events until an error occurs or stop is called. +func (w *onvifMotionWatcher) connectAndPoll() error { + w.log.Trace().Str("url", w.onvifURL).Dur("timeout", w.subscriptionTimeout). + Msg("[homekit] onvif motion: creating pull point subscription") + + sub, err := w.newPullPoint(w.onvifURL, w.subscriptionTimeout) + if err != nil { + w.log.Debug().Err(err).Str("url", w.onvifURL). + Msg("[homekit] onvif motion: pull point creation failed") + return err + } + + w.log.Info().Str("url", w.onvifURL).Msg("[homekit] onvif motion subscription created") + + defer func() { + w.log.Trace().Msg("[homekit] onvif motion: unsubscribing") + _ = sub.Unsubscribe() + }() + + // motionActive tracks whether we've reported motion=true to the HKSV server. + // Hold timer ensures motion stays active for at least holdTime after last trigger, + // regardless of whether the camera sends explicit "motion=false". + // This matches the behavior of the built-in MotionDetector (30s hold time). + motionActive := false + var holdTimer *time.Timer + defer func() { + if holdTimer != nil { + holdTimer.Stop() + } + }() + + renewInterval := w.subscriptionRenewInterval() + renewAt := w.now().Add(renewInterval) + + w.log.Trace().Dur("renew_interval", renewInterval). + Msg("[homekit] onvif motion: subscription renew scheduled") + + pollCount := 0 + + for { + select { + case <-w.done: + w.log.Debug().Int("polls", pollCount). + Msg("[homekit] onvif motion: poll loop stopped") + return nil + default: + } + + if !renewAt.After(w.now()) { + w.log.Trace().Msg("[homekit] onvif motion: renewing subscription") + if err := sub.Renew(w.subscriptionTimeout); err != nil { + w.log.Warn().Err(err).Msg("[homekit] onvif motion: renew failed") + return err + } + renewAt = w.now().Add(renewInterval) + w.log.Trace().Msg("[homekit] onvif motion: subscription renewed") + } + + pullTimeout := w.nextPullTimeout(renewAt) + + w.log.Trace().Dur("timeout", pullTimeout).Int("limit", w.messageLimit). + Int("poll", pollCount+1).Msg("[homekit] onvif motion: pulling messages") + + b, err := sub.PullMessages(pullTimeout, w.messageLimit) + if err != nil { + w.log.Debug().Err(err).Int("polls", pollCount). + Msg("[homekit] onvif motion: pull messages failed") + return err + } + pollCount++ + + w.log.Trace().Int("bytes", len(b)).Int("poll", pollCount). + Msg("[homekit] onvif motion: pull response received") + + if l := w.log.Trace(); l.Enabled() { + l.Str("body", string(b)).Msg("[homekit] onvif motion: raw response") + } + + motion, found := onvif.ParseMotionEvents(b) + + w.log.Trace().Bool("found", found).Bool("motion", motion). + Bool("active", motionActive).Msg("[homekit] onvif motion: parse result") + + if !found { + w.log.Trace().Msg("[homekit] onvif motion: no motion events in response") + continue + } + + if motion { + // Motion detected — activate and start/reset hold timer. + if !motionActive { + motionActive = true + w.srv.SetMotionDetected(true) + w.log.Debug().Msg("[homekit] onvif motion: detected") + } else { + w.log.Trace().Msg("[homekit] onvif motion: still active, resetting hold timer") + } + + // Reset hold timer on every motion=true event. + if holdTimer != nil { + holdTimer.Stop() + } + holdTimer = time.AfterFunc(w.holdTime, func() { + motionActive = false + w.srv.SetMotionDetected(false) + w.log.Debug().Msg("[homekit] onvif motion: hold expired") + }) + } else { + // Camera sent explicit motion=false. + // Do NOT clear immediately — let the hold timer handle it. + // This ensures motion stays active for at least holdTime, + // giving the Home Hub enough time to open the DataStream. + w.log.Debug().Dur("remaining_hold", w.holdTime). + Bool("active", motionActive). + Msg("[homekit] onvif motion: camera reported clear, waiting for hold timer") + } + } +} + +func (w *onvifMotionWatcher) subscriptionRenewInterval() time.Duration { + interval := w.subscriptionTimeout - w.renewMargin + if interval <= 0 { + interval = w.subscriptionTimeout / 2 + } + if interval <= 0 { + interval = time.Second + } + return interval +} + +func (w *onvifMotionWatcher) nextPullTimeout(renewAt time.Time) time.Duration { + timeout := w.pullTimeout + if timeout <= 0 { + timeout = time.Second + } + + if untilRenew := renewAt.Sub(w.now()); untilRenew > 0 && untilRenew < timeout { + timeout = untilRenew + } + + if timeout <= 0 { + timeout = time.Second + } + + return timeout +} + +func newOnvifPullPoint(rawURL string, timeout time.Duration) (onvifPullPoint, error) { + client, err := onvif.NewClient(rawURL) + if err != nil { + return nil, err + } + return client.CreatePullPointSubscription(timeout) +} + +// findOnvifURL looks for an onvif:// URL in stream sources. +func findOnvifURL(sources []string) string { + for _, src := range sources { + if strings.HasPrefix(src, "onvif://") || strings.HasPrefix(src, "onvif:") { + return src + } + } + return "" +} diff --git a/internal/homekit/onvif_motion_test.go b/internal/homekit/onvif_motion_test.go new file mode 100644 index 00000000..f6c18e93 --- /dev/null +++ b/internal/homekit/onvif_motion_test.go @@ -0,0 +1,99 @@ +package homekit + +import ( + "errors" + "testing" + "time" + + "github.com/AlexxIT/go2rtc/pkg/hksv" + "github.com/rs/zerolog" +) + +func TestOnvifMotionWatcherConnectAndPollRenewsBeforeLeaseExpires(t *testing.T) { + start := time.Unix(0, 0) + now := start + stopErr := errors.New("stop pull loop") + + sub := &fakeOnvifPullPoint{ + t: t, + now: &now, + pullErrAt: 3, + pullErr: stopErr, + } + + w := newOnvifMotionWatcher(&hksv.Server{}, "onvif://camera", 30*time.Second, zerolog.Nop()) + w.now = func() time.Time { return now } + w.newPullPoint = func(rawURL string, timeout time.Duration) (onvifPullPoint, error) { + if rawURL != "onvif://camera" { + t.Fatalf("unexpected ONVIF URL: %s", rawURL) + } + if timeout != 60*time.Second { + t.Fatalf("unexpected subscription timeout: %v", timeout) + } + return sub, nil + } + + err := w.connectAndPoll() + if !errors.Is(err, stopErr) { + t.Fatalf("expected %v, got %v", stopErr, err) + } + + wantPulls := []time.Duration{30 * time.Second, 20 * time.Second, 30 * time.Second} + if len(sub.pullTimeouts) != len(wantPulls) { + t.Fatalf("unexpected pull count: got %d want %d", len(sub.pullTimeouts), len(wantPulls)) + } + for i, want := range wantPulls { + if sub.pullTimeouts[i] != want { + t.Fatalf("pull %d timeout mismatch: got %v want %v", i+1, sub.pullTimeouts[i], want) + } + } + + if sub.renewCalls != 1 { + t.Fatalf("expected 1 renew call, got %d", sub.renewCalls) + } + if !sub.unsubscribed { + t.Fatal("expected unsubscribe on exit") + } +} + +type fakeOnvifPullPoint struct { + t *testing.T + + now *time.Time + + pullTimeouts []time.Duration + renewCalls int + unsubscribed bool + + pullErrAt int + pullErr error +} + +func (f *fakeOnvifPullPoint) PullMessages(timeout time.Duration, limit int) ([]byte, error) { + if limit != 10 { + f.t.Fatalf("unexpected message limit: %d", limit) + } + + f.pullTimeouts = append(f.pullTimeouts, timeout) + *f.now = f.now.Add(timeout) + + if f.pullErrAt > 0 && len(f.pullTimeouts) == f.pullErrAt { + return nil, f.pullErr + } + + return []byte(``), nil +} + +func (f *fakeOnvifPullPoint) Renew(timeout time.Duration) error { + if timeout != 60*time.Second { + f.t.Fatalf("unexpected renew timeout: %v", timeout) + } + + f.renewCalls++ + return nil +} + +func (f *fakeOnvifPullPoint) Unsubscribe() error { + f.unsubscribed = true + return nil +} diff --git a/pkg/hap/character.go b/pkg/hap/character.go index afa321e2..fb5add41 100644 --- a/pkg/hap/character.go +++ b/pkg/hap/character.go @@ -48,6 +48,10 @@ func (c *Character) RemoveListener(w io.Writer) { } } +func (c *Character) ListenerCount() int { + return len(c.listeners) +} + func (c *Character) NotifyListeners(ignore io.Writer) error { if c.listeners == nil { return nil diff --git a/pkg/hksv/hksv.go b/pkg/hksv/hksv.go index af0945ab..23bf06b5 100644 --- a/pkg/hksv/hksv.go +++ b/pkg/hksv/hksv.go @@ -600,15 +600,19 @@ func (s *Server) GetImage(conn net.Conn, width, height int) []byte { // SetMotionDetected triggers or clears the motion detected characteristic. func (s *Server) SetMotionDetected(detected bool) { if s.accessory == nil { + s.log.Warn().Str("stream", s.stream).Msg("[hksv] SetMotionDetected: accessory is nil") return } char := s.accessory.GetCharacter("22") // MotionDetected if char == nil { + s.log.Warn().Str("stream", s.stream).Msg("[hksv] SetMotionDetected: char 22 (MotionDetected) not found") return } char.Value = detected - _ = char.NotifyListeners(nil) - s.log.Debug().Str("stream", s.stream).Bool("motion", detected).Msg("[hksv] motion") + listeners := char.ListenerCount() + err := char.NotifyListeners(nil) + s.log.Debug().Str("stream", s.stream).Bool("motion", detected). + Int("listeners", listeners).Err(err).Msg("[hksv] motion") } // MotionDetected returns the current motion detected state. diff --git a/pkg/onvif/client.go b/pkg/onvif/client.go index bad103c7..f293f508 100644 --- a/pkg/onvif/client.go +++ b/pkg/onvif/client.go @@ -20,6 +20,7 @@ type Client struct { deviceURL string mediaURL string imaginURL string + eventURL string } func NewClient(rawURL string) (*Client, error) { @@ -44,6 +45,11 @@ func NewClient(rawURL string) (*Client, error) { s = FindTagValue(b, "Imaging.+?XAddr") client.imaginURL = baseURL + GetPath(s, "/onvif/imaging_service") + s = FindTagValue(b, "Events.+?XAddr") + if s != "" { + client.eventURL = baseURL + getURLPath(s, "/onvif/event_service") + } + return client, nil } diff --git a/pkg/onvif/envelope.go b/pkg/onvif/envelope.go index 76a41260..ab3bb4d2 100644 --- a/pkg/onvif/envelope.go +++ b/pkg/onvif/envelope.go @@ -18,6 +18,14 @@ const ( prefix1 = `` prefix2 = `` suffix = `` + + eventPrefix = `` + + `` ) func NewEnvelope() *Envelope { @@ -71,3 +79,41 @@ func (e *Envelope) Appendf(format string, args ...any) { func (e *Envelope) Bytes() []byte { return append(e.buf, suffix...) } + +func NewEventEnvelope() *Envelope { + e := &Envelope{buf: make([]byte, 0, 1024)} + e.Append(eventPrefix, prefix2) + return e +} + +func NewEventEnvelopeWithUser(user *url.Userinfo) *Envelope { + if user == nil { + return NewEventEnvelope() + } + + nonce := core.RandString(16, 36) + created := time.Now().UTC().Format(time.RFC3339Nano) + pass, _ := user.Password() + + h := sha1.New() + h.Write([]byte(nonce + created + pass)) + + e := &Envelope{buf: make([]byte, 0, 1024)} + e.Append(eventPrefix) + e.Appendf(` + + + %s + %s + %s + %s + + +`, + user.Username(), + base64.StdEncoding.EncodeToString(h.Sum(nil)), + base64.StdEncoding.EncodeToString([]byte(nonce)), + created) + e.Append(prefix2) + return e +} diff --git a/pkg/onvif/events.go b/pkg/onvif/events.go new file mode 100644 index 00000000..e975e5ad --- /dev/null +++ b/pkg/onvif/events.go @@ -0,0 +1,248 @@ +package onvif + +import ( + "bytes" + "errors" + "fmt" + "io" + "net/http" + "net/url" + "regexp" + "strings" + "time" + + "github.com/rs/zerolog/log" +) + +// EventSubscription holds state for an ONVIF PullPoint event subscription. +type EventSubscription struct { + client *Client + // address is the PullPoint subscription manager URL (from CreatePullPointSubscription response). + address string +} + +// CreatePullPointSubscription creates an ONVIF PullPoint subscription on the camera's event service. +// The timeout specifies how long the subscription stays alive before needing renewal. +func (c *Client) CreatePullPointSubscription(timeout time.Duration) (*EventSubscription, error) { + if c.eventURL == "" { + return nil, errors.New("onvif: event service not available") + } + + secs := int(timeout.Seconds()) + if secs < 10 { + secs = 10 + } + + log.Debug().Str("event_url", c.eventURL).Int("timeout_secs", secs). + Msg("[onvif] creating pull point subscription") + + body := fmt.Sprintf(``+ + `PT%dS`+ + ``, secs) + + b, err := c.EventRequest(c.eventURL, body) + if err != nil { + return nil, fmt.Errorf("onvif: create pull point: %w", err) + } + + log.Trace().Str("response", string(b)).Msg("[onvif] create pull point response") + + // Extract subscription reference address from response. + // Response contains: http://... + addr := FindTagValue(b, "Address") + if addr == "" { + return nil, errors.New("onvif: no subscription address in response") + } + + log.Debug().Str("raw_address", addr).Msg("[onvif] subscription address from camera") + + // Some cameras return relative paths or localhost — fix using camera's host. + resolved := c.resolveEventAddress(addr) + + log.Debug().Str("resolved_address", resolved).Msg("[onvif] subscription address resolved") + + return &EventSubscription{ + client: c, + address: resolved, + }, nil +} + +// PullMessages polls the camera for events. This is a long-poll: it blocks +// up to the specified timeout waiting for events. Returns raw XML response. +func (s *EventSubscription) PullMessages(timeout time.Duration, limit int) ([]byte, error) { + secs := int(timeout.Seconds()) + if secs < 1 { + secs = 1 + } + if limit < 1 { + limit = 1 + } + + body := fmt.Sprintf(``+ + `PT%dS`+ + `%d`+ + ``, secs, limit) + + return s.client.EventRequest(s.address, body) +} + +// Renew extends the subscription lifetime by the specified duration. +func (s *EventSubscription) Renew(timeout time.Duration) error { + secs := int(timeout.Seconds()) + + log.Trace().Str("address", s.address).Int("timeout_secs", secs). + Msg("[onvif] renewing subscription") + + body := fmt.Sprintf(``+ + `PT%dS`+ + ``, secs) + + _, err := s.client.EventRequest(s.address, body) + return err +} + +// Unsubscribe terminates the subscription on the camera (best-effort). +func (s *EventSubscription) Unsubscribe() error { + log.Trace().Str("address", s.address).Msg("[onvif] unsubscribing") + _, err := s.client.EventRequest(s.address, ``) + return err +} + +// EventRequest sends a SOAP request with event-specific namespaces. +func (c *Client) EventRequest(reqURL, body string) ([]byte, error) { + if reqURL == "" { + return nil, errors.New("onvif: unsupported service") + } + + e := NewEventEnvelopeWithUser(c.url.User) + e.Append(body) + + log.Trace().Str("url", reqURL).Msg("[onvif] event request sending") + + // Use a longer timeout for PullMessages (long-poll). + client := &http.Client{Timeout: 90 * time.Second} + res, err := client.Post(reqURL, `application/soap+xml;charset=utf-8`, bytes.NewReader(e.Bytes())) + if err != nil { + log.Trace().Err(err).Str("url", reqURL).Msg("[onvif] event request failed") + return nil, err + } + defer res.Body.Close() + + if res.StatusCode != http.StatusOK { + log.Debug().Str("url", reqURL).Int("status", res.StatusCode). + Msg("[onvif] event request non-200 response") + return nil, errors.New("onvif: event request failed " + res.Status) + } + + b, err := io.ReadAll(res.Body) + if err != nil { + return nil, err + } + + log.Trace().Str("url", reqURL).Int("bytes", len(b)). + Msg("[onvif] event request response received") + + return b, nil +} + +// resolveEventAddress fixes subscription addresses returned by the camera. +// The camera may return its internal IP, localhost, or a relative path. +// We always use the host from the original client URL since we know it's reachable. +func (c *Client) resolveEventAddress(addr string) string { + u, err := url.Parse(addr) + if err != nil { + return addr + } + + // Always use the host we connected to (handles Docker, NAT, port mapping, etc.). + u.Host = c.url.Host + + if u.Scheme == "" { + u.Scheme = "http" + } + + return u.String() +} + +// ParseMotionEvents extracts motion state from a PullMessages response. +// Returns (motionDetected, found). If no motion-related notification is present, found=false. +// +// Recognizes common ONVIF motion event topics: +// - tns1:RuleEngine/CellMotionDetector/Motion (IsMotion property) +// - tns1:VideoSource/MotionAlarm (State property) +// - tns1:RuleEngine/MotionRegionDetector/Motion +func ParseMotionEvents(b []byte) (motion bool, found bool) { + s := string(b) + + // Find notification messages containing motion-related topics. + reTopic := regexp.MustCompile(`(?s)<[^>]*Topic[^>]*>([^<]*)]+Name="(IsMotion|State)"[^>]+Value="(\w+)"`) + + topics := reTopic.FindAllStringSubmatch(s, -1) + if len(topics) == 0 { + log.Trace().Msg("[onvif] parse: no topics found in response") + return false, false + } + + log.Trace().Int("topic_count", len(topics)).Msg("[onvif] parse: topics found") + for i, t := range topics { + if len(t) >= 2 { + log.Trace().Int("idx", i).Str("topic", t[1]).Msg("[onvif] parse: topic") + } + } + + // Split response into individual NotificationMessage blocks. + messages := splitNotificationMessages(s) + + log.Trace().Int("message_count", len(messages)).Msg("[onvif] parse: notification messages") + + for _, msg := range messages { + // Check if this message's topic is motion-related. + topicMatch := reTopic.FindStringSubmatch(msg) + if len(topicMatch) < 2 { + log.Trace().Msg("[onvif] parse: message has no topic, skipping") + continue + } + topic := topicMatch[1] + + if !isMotionTopic(topic) { + log.Trace().Str("topic", topic).Msg("[onvif] parse: non-motion topic, skipping") + continue + } + + log.Trace().Str("topic", topic).Msg("[onvif] parse: motion topic found") + + // Extract the motion value from this message. + valueMatch := reValue.FindStringSubmatch(msg) + if len(valueMatch) < 3 { + log.Trace().Str("topic", topic).Msg("[onvif] parse: no IsMotion/State value in message") + continue + } + + val := strings.ToLower(valueMatch[2]) + motion = val == "true" || val == "1" + found = true + + log.Trace().Str("topic", topic).Str("name", valueMatch[1]). + Str("value", valueMatch[2]).Bool("motion", motion). + Msg("[onvif] parse: motion value extracted") + // Use the last motion event if multiple are present. + } + + return motion, found +} + +// isMotionTopic checks if a topic string relates to motion detection. +func isMotionTopic(topic string) bool { + topic = strings.ToLower(topic) + return strings.Contains(topic, "motiondetector") || + strings.Contains(topic, "motionalarm") || + strings.Contains(topic, "motionregiondetector") || + strings.Contains(topic, "cellmotiondetector") +} + +// splitNotificationMessages splits the XML response into individual notification message blocks. +func splitNotificationMessages(s string) []string { + re := regexp.MustCompile(`(?s)<[^>]*NotificationMessage[^>]*>.*?]*NotificationMessage>`) + return re.FindAllString(s, -1) +} diff --git a/pkg/onvif/events_test.go b/pkg/onvif/events_test.go new file mode 100644 index 00000000..4d1f147a --- /dev/null +++ b/pkg/onvif/events_test.go @@ -0,0 +1,263 @@ +package onvif + +import ( + "net/url" + "testing" +) + +func TestParseMotionEvents_CellMotionDetector_True(t *testing.T) { + // Dahua-style CellMotionDetector/Motion with IsMotion=true + xml := ` + + + + +tns1:RuleEngine/CellMotionDetector/Motion + + + + + + + + + + +` + + motion, found := ParseMotionEvents([]byte(xml)) + if !found { + t.Fatal("expected found=true") + } + if !motion { + t.Fatal("expected motion=true") + } +} + +func TestParseMotionEvents_CellMotionDetector_False(t *testing.T) { + xml := ` + + + + +tns1:RuleEngine/CellMotionDetector/Motion + + + + + + + + + + +` + + motion, found := ParseMotionEvents([]byte(xml)) + if !found { + t.Fatal("expected found=true") + } + if motion { + t.Fatal("expected motion=false") + } +} + +func TestParseMotionEvents_VideoSourceMotionAlarm_True(t *testing.T) { + // Hikvision-style VideoSource/MotionAlarm with State=true + xml := ` + + + + +tns1:VideoSource/MotionAlarm + + + + + + + + + + +` + + motion, found := ParseMotionEvents([]byte(xml)) + if !found { + t.Fatal("expected found=true") + } + if !motion { + t.Fatal("expected motion=true") + } +} + +func TestParseMotionEvents_VideoSourceMotionAlarm_False(t *testing.T) { + xml := ` + + + + +tns1:VideoSource/MotionAlarm + + + + + + + + + + +` + + motion, found := ParseMotionEvents([]byte(xml)) + if !found { + t.Fatal("expected found=true") + } + if motion { + t.Fatal("expected motion=false") + } +} + +func TestParseMotionEvents_NoMotionTopic(t *testing.T) { + // Response with a non-motion topic (e.g., tampering) + xml := ` + + + + +tns1:VideoSource/ImageTooBlurry/ImagingService + + + + + + + + + + +` + + _, found := ParseMotionEvents([]byte(xml)) + if found { + t.Fatal("expected found=false for non-motion topic") + } +} + +func TestParseMotionEvents_EmptyResponse(t *testing.T) { + // PullMessages response with no notifications (timeout, no events) + xml := ` + + + +2025-01-15T10:00:00Z +2025-01-15T10:01:00Z + + +` + + _, found := ParseMotionEvents([]byte(xml)) + if found { + t.Fatal("expected found=false for empty response") + } +} + +func TestParseMotionEvents_MotionRegionDetector(t *testing.T) { + xml := ` + + + + +tns1:RuleEngine/MotionRegionDetector/Motion + + + + + + + + + + +` + + motion, found := ParseMotionEvents([]byte(xml)) + if !found { + t.Fatal("expected found=true") + } + if !motion { + t.Fatal("expected motion=true for Value=1") + } +} + +func TestParseMotionEvents_MultipleNotifications(t *testing.T) { + // Multiple notifications: first motion=true, then motion=false. Should return last. + xml := ` + + + + +tns1:RuleEngine/CellMotionDetector/Motion + + + + + + + + + +tns1:RuleEngine/CellMotionDetector/Motion + + + + + + + + + + +` + + motion, found := ParseMotionEvents([]byte(xml)) + if !found { + t.Fatal("expected found=true") + } + if motion { + t.Fatal("expected motion=false (last notification)") + } +} + +func TestResolveEventAddress_RelativePath(t *testing.T) { + u, _ := url.Parse("http://camera.example/onvif/device_service") + client := &Client{url: u} + + got := client.resolveEventAddress("/onvif/Subscription?Idx=1") + want := "http://camera.example/onvif/Subscription?Idx=1" + + if got != want { + t.Fatalf("unexpected resolved address: got %q want %q", got, want) + } +} + +func TestResolveEventAddress_DockerInternalIP(t *testing.T) { + u, _ := url.Parse("http://localhost:18080/onvif/device_service") + client := &Client{url: u} + + got := client.resolveEventAddress("http://172.17.0.2:8080/onvif/events_service?sub=1") + want := "http://localhost:18080/onvif/events_service?sub=1" + + if got != want { + t.Fatalf("unexpected resolved address: got %q want %q", got, want) + } +} diff --git a/pkg/onvif/helpers.go b/pkg/onvif/helpers.go index 8fac9ac4..0cab6e04 100644 --- a/pkg/onvif/helpers.go +++ b/pkg/onvif/helpers.go @@ -150,6 +150,19 @@ func GetPosixTZ(current time.Time) string { return prefix + fmt.Sprintf("%02d:%02d", offset/60, offset%60) } +// getURLPath extracts the path from a URL string, using defPath as fallback. +// Unlike GetPath, this correctly returns the actual path from the URL. +func getURLPath(rawURL, defPath string) string { + if rawURL == "" { + return defPath + } + u, err := url.Parse(rawURL) + if err != nil || u.Path == "" { + return defPath + } + return u.Path +} + func GetPath(urlOrPath, defPath string) string { if urlOrPath == "" || urlOrPath[0] == '/' { return defPath