feat(homekit): add ONVIF motion detection support

- implement ONVIF motion watcher to handle motion events
- add configuration options for motion hold time and ONVIF URL
- remap motion mode from "onvif" to "api" for compatibility
- log ONVIF motion watcher activity for better debugging

feat(onvif): implement event subscription for motion detection
- create PullPoint subscription to receive motion events
- implement methods for pulling messages and renewing subscriptions
- handle event requests and responses specific to motion detection

test(onvif): add unit tests for motion event parsing and subscription
- create tests for parsing various motion event XML responses
- verify correct handling of multiple notifications and edge cases
- test resolving event addresses for ONVIF clients

fix(hksv): improve motion detection logging
- log warnings when accessory or character not found during motion detection
- log number of listeners notified during motion state changes

feat(hap): add listener count method
- introduce method to retrieve the number of listeners for a character

feat(onvif): enhance ONVIF client with event URL handling
- extract event URL from ONVIF device response for subscription management
This commit is contained in:
Sergey Krashevich
2026-03-09 13:06:57 +03:00
parent e3d1085a6d
commit 8a21809f18
10 changed files with 1004 additions and 3 deletions
+32 -1
View File
@@ -7,6 +7,7 @@ import (
"net/http"
"strings"
"sync"
"time"
"github.com/AlexxIT/go2rtc/internal/api"
"github.com/AlexxIT/go2rtc/internal/app"
@@ -36,6 +37,8 @@ func Init() {
HKSV bool `yaml:"hksv"`
Motion string `yaml:"motion"`
MotionThreshold float64 `yaml:"motion_threshold"`
MotionHoldTime float64 `yaml:"motion_hold_time"`
OnvifURL string `yaml:"onvif_url"`
Speaker *bool `yaml:"speaker"`
} `yaml:"homekit"`
}
@@ -71,6 +74,12 @@ func Init() {
proxyURL = url
}
// Remap "onvif" → "api" for hksv.Server; ONVIF watcher drives motion externally.
motionMode := conf.Motion
if motionMode == "onvif" {
motionMode = "api"
}
srv, err := hksv.NewServer(hksv.Config{
StreamName: id,
Pin: conf.Pin,
@@ -81,7 +90,7 @@ func Init() {
Pairings: conf.Pairings,
ProxyURL: proxyURL,
HKSV: conf.HKSV,
MotionMode: conf.Motion,
MotionMode: motionMode,
MotionThreshold: conf.MotionThreshold,
Speaker: conf.Speaker,
UserAgent: app.UserAgent,
@@ -98,6 +107,28 @@ func Init() {
continue
}
// Start ONVIF motion watcher if configured.
if conf.Motion == "onvif" {
onvifURL := conf.OnvifURL
if onvifURL == "" {
sources := stream.Sources()
log.Debug().Str("stream", id).Strs("sources", sources).
Msg("[homekit] onvif motion: searching for ONVIF URL in stream sources")
onvifURL = findOnvifURL(sources)
}
if onvifURL == "" {
log.Warn().Str("stream", id).Msg("[homekit] onvif motion: no ONVIF URL found, set onvif_url or use onvif:// stream source")
} else {
holdTime := time.Duration(conf.MotionHoldTime) * time.Second
if holdTime <= 0 {
holdTime = 30 * time.Second
}
log.Info().Str("stream", id).Str("onvif_url", onvifURL).
Dur("hold_time", holdTime).Msg("[homekit] starting ONVIF motion watcher")
startOnvifMotionWatcher(srv, onvifURL, holdTime, log)
}
}
entry := srv.MDNSEntry()
entries = append(entries, entry)
+287
View File
@@ -0,0 +1,287 @@
package homekit
import (
"strings"
"sync"
"time"
"github.com/AlexxIT/go2rtc/pkg/hksv"
"github.com/AlexxIT/go2rtc/pkg/onvif"
"github.com/rs/zerolog"
)
const (
onvifSubscriptionTimeout = 60 * time.Second
onvifPullTimeout = 30 * time.Second
onvifMessageLimit = 10
onvifRenewMargin = 10 * time.Second
onvifMinReconnectDelay = 5 * time.Second
onvifMaxReconnectDelay = 60 * time.Second
)
type onvifPullPoint interface {
PullMessages(timeout time.Duration, limit int) ([]byte, error)
Renew(timeout time.Duration) error
Unsubscribe() error
}
type onvifPullPointFactory func(rawURL string, timeout time.Duration) (onvifPullPoint, error)
// onvifMotionWatcher subscribes to ONVIF PullPoint events
// and forwards motion state to an hksv.Server.
type onvifMotionWatcher struct {
srv *hksv.Server
onvifURL string
holdTime time.Duration
log zerolog.Logger
now func() time.Time
newPullPoint onvifPullPointFactory
subscriptionTimeout time.Duration
pullTimeout time.Duration
renewMargin time.Duration
messageLimit int
done chan struct{}
once sync.Once
}
func newOnvifMotionWatcher(srv *hksv.Server, onvifURL string, holdTime time.Duration, log zerolog.Logger) *onvifMotionWatcher {
return &onvifMotionWatcher{
srv: srv,
onvifURL: onvifURL,
holdTime: holdTime,
log: log,
now: time.Now,
newPullPoint: newOnvifPullPoint,
subscriptionTimeout: onvifSubscriptionTimeout,
pullTimeout: onvifPullTimeout,
renewMargin: onvifRenewMargin,
messageLimit: onvifMessageLimit,
done: make(chan struct{}),
}
}
// startOnvifMotionWatcher creates and starts a new ONVIF motion watcher.
func startOnvifMotionWatcher(srv *hksv.Server, onvifURL string, holdTime time.Duration, log zerolog.Logger) *onvifMotionWatcher {
w := newOnvifMotionWatcher(srv, onvifURL, holdTime, log)
go w.run()
return w
}
// stop shuts down the watcher goroutine.
func (w *onvifMotionWatcher) stop() {
w.once.Do(func() { close(w.done) })
}
// run is the main loop: create subscription, poll, handle events, reconnect on failure.
func (w *onvifMotionWatcher) run() {
w.log.Debug().Str("url", w.onvifURL).Dur("hold_time", w.holdTime).
Msg("[homekit] onvif motion watcher starting")
delay := onvifMinReconnectDelay
for {
select {
case <-w.done:
w.log.Debug().Msg("[homekit] onvif motion watcher stopped (before connect)")
return
default:
}
w.log.Debug().Str("url", w.onvifURL).Msg("[homekit] onvif motion connecting to camera")
err := w.connectAndPoll()
if err != nil {
w.log.Warn().Err(err).Str("url", w.onvifURL).Msg("[homekit] onvif motion error")
} else {
delay = onvifMinReconnectDelay
}
select {
case <-w.done:
w.log.Debug().Msg("[homekit] onvif motion watcher stopped (after poll)")
return
default:
}
w.log.Debug().Dur("delay", delay).Msg("[homekit] onvif motion reconnecting")
select {
case <-time.After(delay):
case <-w.done:
w.log.Debug().Msg("[homekit] onvif motion watcher stopped (during backoff)")
return
}
delay *= 2
if delay > onvifMaxReconnectDelay {
delay = onvifMaxReconnectDelay
}
}
}
// connectAndPoll creates a subscription and polls for events until an error occurs or stop is called.
func (w *onvifMotionWatcher) connectAndPoll() error {
w.log.Trace().Str("url", w.onvifURL).Dur("timeout", w.subscriptionTimeout).
Msg("[homekit] onvif motion: creating pull point subscription")
sub, err := w.newPullPoint(w.onvifURL, w.subscriptionTimeout)
if err != nil {
w.log.Debug().Err(err).Str("url", w.onvifURL).
Msg("[homekit] onvif motion: pull point creation failed")
return err
}
w.log.Info().Str("url", w.onvifURL).Msg("[homekit] onvif motion subscription created")
defer func() {
w.log.Trace().Msg("[homekit] onvif motion: unsubscribing")
_ = sub.Unsubscribe()
}()
// motionActive tracks whether we've reported motion=true to the HKSV server.
// Hold timer ensures motion stays active for at least holdTime after last trigger,
// regardless of whether the camera sends explicit "motion=false".
// This matches the behavior of the built-in MotionDetector (30s hold time).
motionActive := false
var holdTimer *time.Timer
defer func() {
if holdTimer != nil {
holdTimer.Stop()
}
}()
renewInterval := w.subscriptionRenewInterval()
renewAt := w.now().Add(renewInterval)
w.log.Trace().Dur("renew_interval", renewInterval).
Msg("[homekit] onvif motion: subscription renew scheduled")
pollCount := 0
for {
select {
case <-w.done:
w.log.Debug().Int("polls", pollCount).
Msg("[homekit] onvif motion: poll loop stopped")
return nil
default:
}
if !renewAt.After(w.now()) {
w.log.Trace().Msg("[homekit] onvif motion: renewing subscription")
if err := sub.Renew(w.subscriptionTimeout); err != nil {
w.log.Warn().Err(err).Msg("[homekit] onvif motion: renew failed")
return err
}
renewAt = w.now().Add(renewInterval)
w.log.Trace().Msg("[homekit] onvif motion: subscription renewed")
}
pullTimeout := w.nextPullTimeout(renewAt)
w.log.Trace().Dur("timeout", pullTimeout).Int("limit", w.messageLimit).
Int("poll", pollCount+1).Msg("[homekit] onvif motion: pulling messages")
b, err := sub.PullMessages(pullTimeout, w.messageLimit)
if err != nil {
w.log.Debug().Err(err).Int("polls", pollCount).
Msg("[homekit] onvif motion: pull messages failed")
return err
}
pollCount++
w.log.Trace().Int("bytes", len(b)).Int("poll", pollCount).
Msg("[homekit] onvif motion: pull response received")
if l := w.log.Trace(); l.Enabled() {
l.Str("body", string(b)).Msg("[homekit] onvif motion: raw response")
}
motion, found := onvif.ParseMotionEvents(b)
w.log.Trace().Bool("found", found).Bool("motion", motion).
Bool("active", motionActive).Msg("[homekit] onvif motion: parse result")
if !found {
w.log.Trace().Msg("[homekit] onvif motion: no motion events in response")
continue
}
if motion {
// Motion detected — activate and start/reset hold timer.
if !motionActive {
motionActive = true
w.srv.SetMotionDetected(true)
w.log.Debug().Msg("[homekit] onvif motion: detected")
} else {
w.log.Trace().Msg("[homekit] onvif motion: still active, resetting hold timer")
}
// Reset hold timer on every motion=true event.
if holdTimer != nil {
holdTimer.Stop()
}
holdTimer = time.AfterFunc(w.holdTime, func() {
motionActive = false
w.srv.SetMotionDetected(false)
w.log.Debug().Msg("[homekit] onvif motion: hold expired")
})
} else {
// Camera sent explicit motion=false.
// Do NOT clear immediately — let the hold timer handle it.
// This ensures motion stays active for at least holdTime,
// giving the Home Hub enough time to open the DataStream.
w.log.Debug().Dur("remaining_hold", w.holdTime).
Bool("active", motionActive).
Msg("[homekit] onvif motion: camera reported clear, waiting for hold timer")
}
}
}
func (w *onvifMotionWatcher) subscriptionRenewInterval() time.Duration {
interval := w.subscriptionTimeout - w.renewMargin
if interval <= 0 {
interval = w.subscriptionTimeout / 2
}
if interval <= 0 {
interval = time.Second
}
return interval
}
func (w *onvifMotionWatcher) nextPullTimeout(renewAt time.Time) time.Duration {
timeout := w.pullTimeout
if timeout <= 0 {
timeout = time.Second
}
if untilRenew := renewAt.Sub(w.now()); untilRenew > 0 && untilRenew < timeout {
timeout = untilRenew
}
if timeout <= 0 {
timeout = time.Second
}
return timeout
}
func newOnvifPullPoint(rawURL string, timeout time.Duration) (onvifPullPoint, error) {
client, err := onvif.NewClient(rawURL)
if err != nil {
return nil, err
}
return client.CreatePullPointSubscription(timeout)
}
// findOnvifURL looks for an onvif:// URL in stream sources.
func findOnvifURL(sources []string) string {
for _, src := range sources {
if strings.HasPrefix(src, "onvif://") || strings.HasPrefix(src, "onvif:") {
return src
}
}
return ""
}
+99
View File
@@ -0,0 +1,99 @@
package homekit
import (
"errors"
"testing"
"time"
"github.com/AlexxIT/go2rtc/pkg/hksv"
"github.com/rs/zerolog"
)
func TestOnvifMotionWatcherConnectAndPollRenewsBeforeLeaseExpires(t *testing.T) {
start := time.Unix(0, 0)
now := start
stopErr := errors.New("stop pull loop")
sub := &fakeOnvifPullPoint{
t: t,
now: &now,
pullErrAt: 3,
pullErr: stopErr,
}
w := newOnvifMotionWatcher(&hksv.Server{}, "onvif://camera", 30*time.Second, zerolog.Nop())
w.now = func() time.Time { return now }
w.newPullPoint = func(rawURL string, timeout time.Duration) (onvifPullPoint, error) {
if rawURL != "onvif://camera" {
t.Fatalf("unexpected ONVIF URL: %s", rawURL)
}
if timeout != 60*time.Second {
t.Fatalf("unexpected subscription timeout: %v", timeout)
}
return sub, nil
}
err := w.connectAndPoll()
if !errors.Is(err, stopErr) {
t.Fatalf("expected %v, got %v", stopErr, err)
}
wantPulls := []time.Duration{30 * time.Second, 20 * time.Second, 30 * time.Second}
if len(sub.pullTimeouts) != len(wantPulls) {
t.Fatalf("unexpected pull count: got %d want %d", len(sub.pullTimeouts), len(wantPulls))
}
for i, want := range wantPulls {
if sub.pullTimeouts[i] != want {
t.Fatalf("pull %d timeout mismatch: got %v want %v", i+1, sub.pullTimeouts[i], want)
}
}
if sub.renewCalls != 1 {
t.Fatalf("expected 1 renew call, got %d", sub.renewCalls)
}
if !sub.unsubscribed {
t.Fatal("expected unsubscribe on exit")
}
}
type fakeOnvifPullPoint struct {
t *testing.T
now *time.Time
pullTimeouts []time.Duration
renewCalls int
unsubscribed bool
pullErrAt int
pullErr error
}
func (f *fakeOnvifPullPoint) PullMessages(timeout time.Duration, limit int) ([]byte, error) {
if limit != 10 {
f.t.Fatalf("unexpected message limit: %d", limit)
}
f.pullTimeouts = append(f.pullTimeouts, timeout)
*f.now = f.now.Add(timeout)
if f.pullErrAt > 0 && len(f.pullTimeouts) == f.pullErrAt {
return nil, f.pullErr
}
return []byte(`<tev:PullMessagesResponse xmlns:tev="http://www.onvif.org/ver10/events/wsdl"/>`), nil
}
func (f *fakeOnvifPullPoint) Renew(timeout time.Duration) error {
if timeout != 60*time.Second {
f.t.Fatalf("unexpected renew timeout: %v", timeout)
}
f.renewCalls++
return nil
}
func (f *fakeOnvifPullPoint) Unsubscribe() error {
f.unsubscribed = true
return nil
}