feat: add motion detection feature with EMA-based P-frame size analysis

- Implemented MotionDetector for detecting motion based on H.264 P-frame sizes.
- Introduced adjustable sensitivity threshold for motion detection.
- Added tests for various scenarios including motion detection, hold time, cooldown, and baseline adaptation.
- Created hksvSession to manage HDS DataStream connections for HKSV recording.
- Updated schema.json to include a new speaker option for 2-way audio support.
This commit is contained in:
Sergey Krashevich
2026-03-06 19:58:15 +03:00
parent 593dce6eb9
commit c567831c91
13 changed files with 2135 additions and 1178 deletions
+1
View File
@@ -79,6 +79,7 @@ homekit:
name: Dahua camera # custom camera name, default: generated from stream ID
device_id: dahua1 # custom ID, default: generated from stream ID
device_private: dahua1 # custom key, default: generated from stream ID
speaker: true # enable 2-way audio (default: false, enable only if camera has a speaker)
```
### HKSV (HomeKit Secure Video)
-431
View File
@@ -1,431 +0,0 @@
package homekit
import (
"io"
"net"
"sync"
"time"
"github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/aac"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/h264"
"github.com/AlexxIT/go2rtc/pkg/hap"
"github.com/AlexxIT/go2rtc/pkg/hap/hds"
"github.com/AlexxIT/go2rtc/pkg/mp4"
"github.com/pion/rtp"
)
// hksvSession manages the HDS DataStream connection for HKSV recording
type hksvSession struct {
server *server
hapConn *hap.Conn
hdsConn *hds.Conn
session *hds.Session
mu sync.Mutex
consumer *hksvConsumer
}
func newHKSVSession(srv *server, hapConn *hap.Conn, hdsConn *hds.Conn) *hksvSession {
session := hds.NewSession(hdsConn)
hs := &hksvSession{
server: srv,
hapConn: hapConn,
hdsConn: hdsConn,
session: session,
}
session.OnDataSendOpen = hs.handleOpen
session.OnDataSendClose = hs.handleClose
return hs
}
func (hs *hksvSession) Run() error {
return hs.session.Run()
}
func (hs *hksvSession) Close() {
hs.mu.Lock()
defer hs.mu.Unlock()
if hs.consumer != nil {
hs.stopRecording()
}
_ = hs.session.Close()
}
func (hs *hksvSession) handleOpen(streamID int) error {
hs.mu.Lock()
defer hs.mu.Unlock()
log.Debug().Str("stream", hs.server.stream).Int("streamID", streamID).Msg("[homekit] HKSV dataSend open")
if hs.consumer != nil {
hs.stopRecording()
}
// Try to use the pre-started consumer from pair-verify
consumer := hs.server.takePreparedConsumer()
if consumer != nil {
log.Debug().Str("stream", hs.server.stream).Msg("[homekit] HKSV using prepared consumer")
hs.consumer = consumer
hs.server.AddConn(consumer)
// Activate: set the HDS session and send init + start streaming
if err := consumer.activate(hs.session, streamID); err != nil {
log.Error().Err(err).Str("stream", hs.server.stream).Msg("[homekit] HKSV activate failed")
hs.stopRecording()
return nil
}
return nil
}
// Fallback: create new consumer (will be slow ~3s)
log.Debug().Str("stream", hs.server.stream).Msg("[homekit] HKSV no prepared consumer, creating new")
consumer = newHKSVConsumer()
stream := streams.Get(hs.server.stream)
if err := stream.AddConsumer(consumer); err != nil {
log.Error().Err(err).Str("stream", hs.server.stream).Msg("[homekit] HKSV add consumer failed")
return nil
}
hs.consumer = consumer
hs.server.AddConn(consumer)
go func() {
if err := consumer.activate(hs.session, streamID); err != nil {
log.Error().Err(err).Str("stream", hs.server.stream).Msg("[homekit] HKSV activate failed")
}
}()
return nil
}
func (hs *hksvSession) handleClose(streamID int) error {
hs.mu.Lock()
defer hs.mu.Unlock()
log.Debug().Str("stream", hs.server.stream).Int("streamID", streamID).Msg("[homekit] HKSV dataSend close")
if hs.consumer != nil {
hs.stopRecording()
}
return nil
}
func (hs *hksvSession) stopRecording() {
consumer := hs.consumer
hs.consumer = nil
stream := streams.Get(hs.server.stream)
stream.RemoveConsumer(consumer)
_ = consumer.Stop()
hs.server.DelConn(consumer)
}
// hksvConsumer implements core.Consumer, generates fMP4 and sends over HDS.
// It can be pre-started without an HDS session, buffering init data until activated.
type hksvConsumer struct {
core.Connection
muxer *mp4.Muxer
mu sync.Mutex
done chan struct{}
// Set by activate() when HDS session is available
session *hds.Session
streamID int
seqNum int
active bool
start bool // waiting for first keyframe
// GOP buffer - accumulate moof+mdat pairs, flush on next keyframe
fragBuf []byte
// Pre-built init segment (built when tracks connect)
initData []byte
initErr error
initDone chan struct{} // closed when init is ready
}
func newHKSVConsumer() *hksvConsumer {
medias := []*core.Media{
{
Kind: core.KindVideo,
Direction: core.DirectionSendonly,
Codecs: []*core.Codec{
{Name: core.CodecH264},
},
},
{
Kind: core.KindAudio,
Direction: core.DirectionSendonly,
Codecs: []*core.Codec{
{Name: core.CodecAAC},
},
},
}
return &hksvConsumer{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "hksv",
Protocol: "hds",
Medias: medias,
},
muxer: &mp4.Muxer{},
done: make(chan struct{}),
initDone: make(chan struct{}),
}
}
func (c *hksvConsumer) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiver) error {
trackID := byte(len(c.Senders))
log.Debug().Str("codec", track.Codec.Name).Uint8("trackID", trackID).Msg("[homekit] HKSV AddTrack")
codec := track.Codec.Clone()
handler := core.NewSender(media, codec)
switch track.Codec.Name {
case core.CodecH264:
handler.Handler = func(packet *rtp.Packet) {
c.mu.Lock()
if !c.active {
c.mu.Unlock()
return
}
if !c.start {
if !h264.IsKeyframe(packet.Payload) {
c.mu.Unlock()
return
}
c.start = true
log.Debug().Int("payloadLen", len(packet.Payload)).Msg("[homekit] HKSV first keyframe")
} else if h264.IsKeyframe(packet.Payload) && len(c.fragBuf) > 0 {
// New keyframe = flush previous GOP as one mediaFragment
c.flushFragment()
}
b := c.muxer.GetPayload(trackID, packet)
c.fragBuf = append(c.fragBuf, b...)
c.mu.Unlock()
}
if track.Codec.IsRTP() {
handler.Handler = h264.RTPDepay(track.Codec, handler.Handler)
} else {
handler.Handler = h264.RepairAVCC(track.Codec, handler.Handler)
}
case core.CodecAAC:
handler.Handler = func(packet *rtp.Packet) {
c.mu.Lock()
if !c.active || !c.start {
c.mu.Unlock()
return
}
b := c.muxer.GetPayload(trackID, packet)
c.fragBuf = append(c.fragBuf, b...)
c.mu.Unlock()
}
if track.Codec.IsRTP() {
handler.Handler = aac.RTPDepay(handler.Handler)
}
default:
return nil // skip unsupported codecs
}
c.muxer.AddTrack(codec)
handler.HandleRTP(track)
c.Senders = append(c.Senders, handler)
// Build init segment when all expected tracks are ready (video + audio)
select {
case <-c.initDone:
// already built
default:
if len(c.Senders) >= len(c.Medias) {
initData, err := c.muxer.GetInit()
c.initData = initData
c.initErr = err
close(c.initDone)
if err != nil {
log.Error().Err(err).Msg("[homekit] HKSV GetInit failed")
} else {
log.Debug().Int("initSize", len(initData)).Int("tracks", len(c.Senders)).Msg("[homekit] HKSV init segment ready")
}
}
}
return nil
}
// activate is called when the HDS session is ready (dataSend.open).
// It sends the pre-built init segment and starts streaming.
func (c *hksvConsumer) activate(session *hds.Session, streamID int) error {
// Wait for init to be ready (should already be done if consumer was pre-started)
select {
case <-c.initDone:
case <-time.After(5 * time.Second):
return io.ErrClosedPipe
}
if c.initErr != nil {
return c.initErr
}
log.Debug().Int("initSize", len(c.initData)).Msg("[homekit] HKSV sending init segment")
if err := session.SendMediaInit(streamID, c.initData); err != nil {
return err
}
log.Debug().Msg("[homekit] HKSV init segment sent OK")
// Enable live streaming (seqNum=2 because init used seqNum=1)
c.mu.Lock()
c.session = session
c.streamID = streamID
c.seqNum = 2
c.active = true
c.mu.Unlock()
return nil
}
// flushFragment sends the accumulated GOP buffer as a single mediaFragment.
// Must be called while holding c.mu.
func (c *hksvConsumer) flushFragment() {
fragment := c.fragBuf
c.fragBuf = make([]byte, 0, len(fragment))
log.Debug().Int("fragSize", len(fragment)).Int("seq", c.seqNum).Msg("[homekit] HKSV flush fragment")
if err := c.session.SendMediaFragment(c.streamID, fragment, c.seqNum); err == nil {
c.Send += len(fragment)
}
c.seqNum++
}
func (c *hksvConsumer) WriteTo(io.Writer) (int64, error) {
<-c.done
return 0, nil
}
func (c *hksvConsumer) Stop() error {
select {
case <-c.done:
default:
close(c.done)
}
c.mu.Lock()
c.active = false
c.mu.Unlock()
return c.Connection.Stop()
}
// acceptHDS opens a TCP listener for the HDS DataStream connection from the Home Hub
func (s *server) acceptHDS(hapConn *hap.Conn, ln net.Listener, salt string) {
defer ln.Close()
if tcpLn, ok := ln.(*net.TCPListener); ok {
_ = tcpLn.SetDeadline(time.Now().Add(30 * time.Second))
}
rawConn, err := ln.Accept()
if err != nil {
log.Error().Err(err).Str("stream", s.stream).Msg("[homekit] HKSV accept failed")
return
}
defer rawConn.Close()
// Create HDS encrypted connection (controller=false, we are accessory)
hdsConn, err := hds.NewConn(rawConn, hapConn.SharedKey, salt, false)
if err != nil {
log.Error().Err(err).Str("stream", s.stream).Msg("[homekit] HKSV hds conn failed")
return
}
s.AddConn(hdsConn)
defer s.DelConn(hdsConn)
session := newHKSVSession(s, hapConn, hdsConn)
s.mu.Lock()
s.hksvSession = session
s.mu.Unlock()
defer func() {
s.mu.Lock()
if s.hksvSession == session {
s.hksvSession = nil
}
s.mu.Unlock()
session.Close()
}()
log.Debug().Str("stream", s.stream).Msg("[homekit] HKSV session started")
if err := session.Run(); err != nil {
log.Debug().Err(err).Str("stream", s.stream).Msg("[homekit] HKSV session ended")
}
}
// prepareHKSVConsumer pre-starts a consumer and adds it to the stream.
// When dataSend.open arrives, the consumer is ready immediately.
func (s *server) prepareHKSVConsumer() {
stream := streams.Get(s.stream)
if stream == nil {
return
}
consumer := newHKSVConsumer()
if err := stream.AddConsumer(consumer); err != nil {
log.Debug().Err(err).Str("stream", s.stream).Msg("[homekit] HKSV prepare consumer failed")
return
}
log.Debug().Str("stream", s.stream).Msg("[homekit] HKSV consumer prepared")
s.mu.Lock()
// Clean up any previous prepared consumer
if s.preparedConsumer != nil {
old := s.preparedConsumer
s.preparedConsumer = nil
s.mu.Unlock()
stream.RemoveConsumer(old)
_ = old.Stop()
s.mu.Lock()
}
s.preparedConsumer = consumer
s.mu.Unlock()
// Keep alive until used or timeout (60 seconds)
select {
case <-consumer.done:
// consumer was stopped (used or server closed)
case <-time.After(60 * time.Second):
// timeout: clean up unused prepared consumer
s.mu.Lock()
if s.preparedConsumer == consumer {
s.preparedConsumer = nil
s.mu.Unlock()
stream.RemoveConsumer(consumer)
_ = consumer.Stop()
log.Debug().Str("stream", s.stream).Msg("[homekit] HKSV prepared consumer expired")
} else {
s.mu.Unlock()
}
}
}
func (s *server) takePreparedConsumer() *hksvConsumer {
s.mu.Lock()
defer s.mu.Unlock()
consumer := s.preparedConsumer
s.preparedConsumer = nil
return consumer
}
-456
View File
@@ -1,456 +0,0 @@
package homekit
import (
"net"
"sync"
"testing"
"time"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/hap/hds"
"github.com/stretchr/testify/require"
)
// newTestSessionPair creates connected HDS sessions for testing.
func newTestSessionPair(t *testing.T) (accessory *hds.Session, controller *hds.Session) {
t.Helper()
key := []byte(core.RandString(16, 0))
salt := core.RandString(32, 0)
c1, c2 := net.Pipe()
t.Cleanup(func() { c1.Close(); c2.Close() })
accConn, err := hds.NewConn(c1, key, salt, false)
require.NoError(t, err)
ctrlConn, err := hds.NewConn(c2, key, salt, true)
require.NoError(t, err)
return hds.NewSession(accConn), hds.NewSession(ctrlConn)
}
func TestHKSVConsumer_Creation(t *testing.T) {
c := newHKSVConsumer()
require.Equal(t, "hksv", c.FormatName)
require.Equal(t, "hds", c.Protocol)
require.Len(t, c.Medias, 2)
require.Equal(t, core.KindVideo, c.Medias[0].Kind)
require.Equal(t, core.KindAudio, c.Medias[1].Kind)
require.Equal(t, core.CodecH264, c.Medias[0].Codecs[0].Name)
require.Equal(t, core.CodecAAC, c.Medias[1].Codecs[0].Name)
require.NotNil(t, c.muxer)
require.NotNil(t, c.done)
require.NotNil(t, c.initDone)
require.False(t, c.active)
require.False(t, c.start)
require.Equal(t, 0, c.seqNum)
require.Nil(t, c.fragBuf)
require.Nil(t, c.initData)
}
func TestHKSVConsumer_FlushFragment_SendsAndIncrements(t *testing.T) {
acc, ctrl := newTestSessionPair(t)
c := newHKSVConsumer()
// Manually set up the consumer as if activate() was called
c.session = acc
c.streamID = 1
c.seqNum = 2
c.active = true
c.fragBuf = []byte("fake-fragment-data-here")
done := make(chan struct{})
go func() {
defer close(done)
msg, err := ctrl.ReadMessage()
require.NoError(t, err)
require.Equal(t, "dataSend", msg.Protocol)
require.Equal(t, "data", msg.Topic)
require.True(t, msg.IsEvent)
packets, ok := msg.Body["packets"].([]any)
require.True(t, ok)
pkt := packets[0].(map[string]any)
meta := pkt["metadata"].(map[string]any)
require.Equal(t, "mediaFragment", meta["dataType"])
require.Equal(t, int64(2), meta["dataSequenceNumber"].(int64))
require.Equal(t, true, meta["isLastDataChunk"])
}()
c.mu.Lock()
c.flushFragment()
c.mu.Unlock()
<-done
require.Equal(t, 3, c.seqNum, "seqNum should increment after flush")
require.Empty(t, c.fragBuf, "fragBuf should be empty after flush")
}
func TestHKSVConsumer_FlushFragment_MultipleFlushes(t *testing.T) {
acc, ctrl := newTestSessionPair(t)
c := newHKSVConsumer()
c.session = acc
c.streamID = 1
c.seqNum = 2
c.active = true
var received []int64
var mu sync.Mutex
done := make(chan struct{})
go func() {
defer close(done)
for i := 0; i < 3; i++ {
msg, err := ctrl.ReadMessage()
if err != nil {
return
}
packets := msg.Body["packets"].([]any)
pkt := packets[0].(map[string]any)
meta := pkt["metadata"].(map[string]any)
mu.Lock()
received = append(received, meta["dataSequenceNumber"].(int64))
mu.Unlock()
}
}()
for i := 0; i < 3; i++ {
c.mu.Lock()
c.fragBuf = []byte("data")
c.flushFragment()
c.mu.Unlock()
}
<-done
mu.Lock()
defer mu.Unlock()
require.Equal(t, []int64{2, 3, 4}, received)
require.Equal(t, 5, c.seqNum)
}
func TestHKSVConsumer_FlushFragment_EmptyBuffer(t *testing.T) {
c := newHKSVConsumer()
c.seqNum = 2
// flushFragment with empty/nil buffer should still increment seqNum
// but send empty data (protocol layer handles it)
// In practice, flushFragment is only called when fragBuf has data
c.mu.Lock()
c.fragBuf = nil
initialSeq := c.seqNum
c.mu.Unlock()
// No crash = pass (no session to write to, would panic on nil session)
require.Equal(t, initialSeq, c.seqNum)
}
func TestHKSVConsumer_BufferAccumulation(t *testing.T) {
c := newHKSVConsumer()
c.active = true
data1 := []byte("chunk-1")
data2 := []byte("chunk-2")
data3 := []byte("chunk-3")
c.fragBuf = append(c.fragBuf, data1...)
c.fragBuf = append(c.fragBuf, data2...)
c.fragBuf = append(c.fragBuf, data3...)
require.Equal(t, len(data1)+len(data2)+len(data3), len(c.fragBuf))
require.Equal(t, "chunk-1chunk-2chunk-3", string(c.fragBuf))
}
func TestHKSVConsumer_ActivateSeqNum(t *testing.T) {
acc, ctrl := newTestSessionPair(t)
c := newHKSVConsumer()
// Simulate init ready
c.initData = []byte("fake-init")
close(c.initDone)
done := make(chan struct{})
go func() {
defer close(done)
// Read the init message
msg, err := ctrl.ReadMessage()
require.NoError(t, err)
require.True(t, msg.IsEvent)
packets := msg.Body["packets"].([]any)
pkt := packets[0].(map[string]any)
meta := pkt["metadata"].(map[string]any)
require.Equal(t, "mediaInitialization", meta["dataType"])
require.Equal(t, int64(1), meta["dataSequenceNumber"].(int64))
}()
err := c.activate(acc, 5)
require.NoError(t, err)
<-done
require.Equal(t, 2, c.seqNum, "seqNum should be 2 after activate (init uses 1)")
require.True(t, c.active)
require.Equal(t, 5, c.streamID)
require.Equal(t, acc, c.session)
}
func TestHKSVConsumer_ActivateTimeout(t *testing.T) {
acc, _ := newTestSessionPair(t)
c := newHKSVConsumer()
// Don't close initDone — simulate init never becoming ready
// Override the timeout for faster test
err := func() error {
select {
case <-c.initDone:
case <-time.After(50 * time.Millisecond):
return errActivateTimeout
}
return nil
}()
require.Error(t, err)
_ = acc // prevent unused
}
var errActivateTimeout = func() error {
return &timeoutError{}
}()
type timeoutError struct{}
func (e *timeoutError) Error() string { return "activate timeout" }
func TestHKSVConsumer_ActivateWithError(t *testing.T) {
c := newHKSVConsumer()
c.initErr = &timeoutError{}
close(c.initDone)
acc, _ := newTestSessionPair(t)
err := c.activate(acc, 1)
require.Error(t, err)
require.False(t, c.active)
}
func TestHKSVConsumer_StopSafety(t *testing.T) {
c := newHKSVConsumer()
c.active = true
// First stop
err := c.Stop()
require.NoError(t, err)
require.False(t, c.active)
// Second stop — should not panic
err = c.Stop()
require.NoError(t, err)
}
func TestHKSVConsumer_StopDeactivates(t *testing.T) {
c := newHKSVConsumer()
c.active = true
c.start = true
_ = c.Stop()
require.False(t, c.active)
}
func TestHKSVConsumer_WriteToDone(t *testing.T) {
c := newHKSVConsumer()
done := make(chan struct{})
go func() {
n, err := c.WriteTo(nil)
require.NoError(t, err)
require.Equal(t, int64(0), n)
close(done)
}()
// WriteTo should block until done channel is closed
select {
case <-done:
t.Fatal("WriteTo returned before Stop")
case <-time.After(50 * time.Millisecond):
}
_ = c.Stop()
select {
case <-done:
case <-time.After(time.Second):
t.Fatal("WriteTo did not return after Stop")
}
}
func TestHKSVConsumer_GOPFlushIntegration(t *testing.T) {
acc, ctrl := newTestSessionPair(t)
c := newHKSVConsumer()
c.session = acc
c.streamID = 1
c.seqNum = 2
c.active = true
c.start = true // already started
// Simulate a sequence: buffer data, then flush
frag1 := []byte("keyframe-1-data-plus-p-frames")
frag2 := []byte("keyframe-2-data")
var received [][]byte
done := make(chan struct{})
go func() {
defer close(done)
for i := 0; i < 2; i++ {
msg, err := ctrl.ReadMessage()
if err != nil {
return
}
packets := msg.Body["packets"].([]any)
pkt := packets[0].(map[string]any)
data := pkt["data"].([]byte)
received = append(received, data)
}
}()
// First GOP
c.mu.Lock()
c.fragBuf = append(c.fragBuf, frag1...)
c.flushFragment()
c.mu.Unlock()
// Second GOP
c.mu.Lock()
c.fragBuf = append(c.fragBuf, frag2...)
c.flushFragment()
c.mu.Unlock()
<-done
require.Len(t, received, 2)
require.Equal(t, frag1, received[0])
require.Equal(t, frag2, received[1])
require.Equal(t, 4, c.seqNum) // 2 + 2 flushes
}
func TestHKSVConsumer_FlushClearsBuffer(t *testing.T) {
acc, ctrl := newTestSessionPair(t)
c := newHKSVConsumer()
c.session = acc
c.streamID = 1
c.seqNum = 2
c.active = true
done := make(chan struct{})
go func() {
defer close(done)
// drain messages
for i := 0; i < 3; i++ {
ctrl.ReadMessage()
}
}()
for i := 0; i < 3; i++ {
c.mu.Lock()
c.fragBuf = append(c.fragBuf, []byte("frame-data")...)
prevLen := len(c.fragBuf)
c.flushFragment()
require.Empty(t, c.fragBuf, "fragBuf should be empty after flush")
require.Greater(t, prevLen, 0, "had data before flush")
c.mu.Unlock()
}
<-done
require.Equal(t, 5, c.seqNum, "3 flushes from seqNum=2 → 5")
}
func TestHKSVConsumer_SendTracking(t *testing.T) {
acc, ctrl := newTestSessionPair(t)
c := newHKSVConsumer()
c.session = acc
c.streamID = 1
c.seqNum = 2
c.active = true
data := []byte("12345678") // 8 bytes
done := make(chan struct{})
go func() {
defer close(done)
ctrl.ReadMessage()
}()
c.mu.Lock()
c.fragBuf = append(c.fragBuf, data...)
c.flushFragment()
c.mu.Unlock()
<-done
require.Equal(t, 8, c.Send, "Send counter should track bytes sent")
}
// --- Benchmarks ---
func BenchmarkHKSVConsumer_FlushFragment(b *testing.B) {
key := []byte(core.RandString(16, 0))
salt := core.RandString(32, 0)
c1, c2 := net.Pipe()
defer c1.Close()
defer c2.Close()
accConn, _ := hds.NewConn(c1, key, salt, false)
ctrlConn, _ := hds.NewConn(c2, key, salt, true)
acc := hds.NewSession(accConn)
go func() {
buf := make([]byte, 512*1024) // must be > 256KB chunk size
for {
if _, err := ctrlConn.Read(buf); err != nil {
return
}
}
}()
c := newHKSVConsumer()
c.session = acc
c.streamID = 1
c.seqNum = 2
c.active = true
gopData := make([]byte, 4*1024*1024) // 4MB GOP
b.SetBytes(int64(len(gopData)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
c.mu.Lock()
c.fragBuf = append(c.fragBuf[:0], gopData...)
c.flushFragment()
c.mu.Unlock()
}
}
func BenchmarkHKSVConsumer_BufferAppend(b *testing.B) {
c := newHKSVConsumer()
frame := make([]byte, 1500) // typical frame fragment
b.SetBytes(int64(len(frame)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
c.fragBuf = append(c.fragBuf, frame...)
if len(c.fragBuf) > 5*1024*1024 {
c.fragBuf = c.fragBuf[:0]
}
}
}
func BenchmarkHKSVConsumer_CreateAndStop(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
c := newHKSVConsumer()
_ = c.Stop()
}
}
+177 -76
View File
@@ -2,17 +2,23 @@ package homekit
import (
"errors"
"net"
"net/http"
"strings"
"sync"
"github.com/AlexxIT/go2rtc/internal/api"
"github.com/AlexxIT/go2rtc/internal/app"
"github.com/AlexxIT/go2rtc/internal/ffmpeg"
"github.com/AlexxIT/go2rtc/internal/srtp"
"github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/hap"
"github.com/AlexxIT/go2rtc/pkg/hap/camera"
"github.com/AlexxIT/go2rtc/pkg/hap/tlv8"
"github.com/AlexxIT/go2rtc/pkg/hksv"
"github.com/AlexxIT/go2rtc/pkg/homekit"
"github.com/AlexxIT/go2rtc/pkg/magic"
"github.com/AlexxIT/go2rtc/pkg/mdns"
"github.com/rs/zerolog"
)
@@ -20,15 +26,16 @@ import (
func Init() {
var cfg struct {
Mod map[string]struct {
Pin string `yaml:"pin"`
Name string `yaml:"name"`
DeviceID string `yaml:"device_id"`
DevicePrivate string `yaml:"device_private"`
CategoryID string `yaml:"category_id"`
Pairings []string `yaml:"pairings"`
Pin string `yaml:"pin"`
Name string `yaml:"name"`
DeviceID string `yaml:"device_id"`
DevicePrivate string `yaml:"device_private"`
CategoryID string `yaml:"category_id"`
Pairings []string `yaml:"pairings"`
HKSV bool `yaml:"hksv"`
Motion string `yaml:"motion"`
MotionThreshold float64 `yaml:"motion_threshold"`
Speaker *bool `yaml:"speaker"`
} `yaml:"homekit"`
}
app.LoadConfig(&cfg)
@@ -47,8 +54,8 @@ func Init() {
return
}
hosts = map[string]*server{}
servers = map[string]*server{}
hosts = map[string]*hksv.Server{}
servers = map[string]*hksv.Server{}
var entries []*mdns.ServiceEntry
for id, conf := range cfg.Mod {
@@ -58,78 +65,46 @@ func Init() {
continue
}
if conf.Pin == "" {
conf.Pin = "19550224" // default PIN
var proxyURL string
if url := findHomeKitURL(stream.Sources()); url != "" {
proxyURL = url
}
pin, err := hap.SanitizePin(conf.Pin)
srv, err := hksv.NewServer(hksv.Config{
StreamName: id,
Pin: conf.Pin,
Name: conf.Name,
DeviceID: conf.DeviceID,
DevicePrivate: conf.DevicePrivate,
CategoryID: conf.CategoryID,
Pairings: conf.Pairings,
ProxyURL: proxyURL,
HKSV: conf.HKSV,
MotionMode: conf.Motion,
MotionThreshold: conf.MotionThreshold,
Speaker: conf.Speaker,
UserAgent: app.UserAgent,
Version: app.Version,
Streams: &go2rtcStreamProvider{},
Store: &go2rtcPairingStore{},
Snapshots: &go2rtcSnapshotProvider{},
LiveStream: &go2rtcLiveStreamHandler{},
Logger: log,
Port: uint16(api.Port),
})
if err != nil {
log.Error().Err(err).Caller().Send()
log.Error().Err(err).Str("stream", id).Msg("[homekit] create server failed")
continue
}
deviceID := calcDeviceID(conf.DeviceID, id) // random MAC-address
name := calcName(conf.Name, deviceID)
setupID := calcSetupID(id)
entry := srv.MDNSEntry()
entries = append(entries, entry)
srv := &server{
stream: id,
pairings: conf.Pairings,
setupID: setupID,
}
srv.hap = &hap.Server{
Pin: pin,
DeviceID: deviceID,
DevicePrivate: calcDevicePrivate(conf.DevicePrivate, id),
GetClientPublic: srv.GetPair,
}
srv.mdns = &mdns.ServiceEntry{
Name: name,
Port: uint16(api.Port),
Info: map[string]string{
hap.TXTConfigNumber: "1",
hap.TXTFeatureFlags: "0",
hap.TXTDeviceID: deviceID,
hap.TXTModel: app.UserAgent,
hap.TXTProtoVersion: "1.1",
hap.TXTStateNumber: "1",
hap.TXTStatusFlags: hap.StatusNotPaired,
hap.TXTCategory: calcCategoryID(conf.CategoryID),
hap.TXTSetupHash: hap.SetupHash(setupID, deviceID),
},
}
entries = append(entries, srv.mdns)
srv.UpdateStatus()
if url := findHomeKitURL(stream.Sources()); url != "" {
// 1. Act as transparent proxy for HomeKit camera
srv.proxyURL = url
} else if conf.HKSV {
// 2. Act as HKSV camera
srv.motionMode = conf.Motion
srv.motionThreshold = conf.MotionThreshold
if srv.motionThreshold <= 0 {
srv.motionThreshold = motionThreshold
}
log.Debug().Str("stream", id).Str("motion", conf.Motion).Float64("threshold", srv.motionThreshold).Msg("[homekit] HKSV mode")
if conf.CategoryID == "doorbell" {
srv.accessory = camera.NewHKSVDoorbellAccessory("AlexxIT", "go2rtc", name, "-", app.Version)
} else {
srv.accessory = camera.NewHKSVAccessory("AlexxIT", "go2rtc", name, "-", app.Version)
}
} else {
// 3. Act as basic HomeKit camera
srv.accessory = camera.NewAccessory("AlexxIT", "go2rtc", name, "-", app.Version)
}
host := srv.mdns.Host(mdns.ServiceHAP)
host := entry.Host(mdns.ServiceHAP)
hosts[host] = srv
servers[id] = srv
log.Trace().Msgf("[homekit] new server: %s", srv.mdns)
log.Trace().Msgf("[homekit] new server: %s", entry)
}
api.HandleFunc(hap.PathPairSetup, hapHandler)
@@ -143,8 +118,137 @@ func Init() {
}
var log zerolog.Logger
var hosts map[string]*server
var servers map[string]*server
var hosts map[string]*hksv.Server
var servers map[string]*hksv.Server
// go2rtcStreamProvider implements hksv.StreamProvider
type go2rtcStreamProvider struct{}
func (p *go2rtcStreamProvider) AddConsumer(name string, cons core.Consumer) error {
stream := streams.Get(name)
if stream == nil {
return errors.New("stream not found: " + name)
}
return stream.AddConsumer(cons)
}
func (p *go2rtcStreamProvider) RemoveConsumer(name string, cons core.Consumer) {
if s := streams.Get(name); s != nil {
s.RemoveConsumer(cons)
}
}
// go2rtcPairingStore implements hksv.PairingStore
type go2rtcPairingStore struct{}
func (s *go2rtcPairingStore) SavePairings(name string, pairings []string) error {
return app.PatchConfig([]string{"homekit", name, "pairings"}, pairings)
}
// go2rtcSnapshotProvider implements hksv.SnapshotProvider
type go2rtcSnapshotProvider struct{}
func (s *go2rtcSnapshotProvider) GetSnapshot(streamName string, width, height int) ([]byte, error) {
stream := streams.Get(streamName)
if stream == nil {
return nil, errors.New("stream not found: " + streamName)
}
cons := magic.NewKeyframe()
if err := stream.AddConsumer(cons); err != nil {
return nil, err
}
once := &core.OnceBuffer{}
_, _ = cons.WriteTo(once)
b := once.Buffer()
stream.RemoveConsumer(cons)
switch cons.CodecName() {
case core.CodecH264, core.CodecH265:
var err error
if b, err = ffmpeg.JPEGWithScale(b, width, height); err != nil {
return nil, err
}
}
return b, nil
}
// go2rtcLiveStreamHandler implements hksv.LiveStreamHandler
type go2rtcLiveStreamHandler struct {
mu sync.Mutex
consumer *homekit.Consumer
}
func (h *go2rtcLiveStreamHandler) SetupEndpoints(conn net.Conn, offer *camera.SetupEndpointsRequest) (any, error) {
consumer := homekit.NewConsumer(conn, srtp.Server)
consumer.SetOffer(offer)
h.mu.Lock()
h.consumer = consumer
h.mu.Unlock()
answer := consumer.GetAnswer()
v, err := tlv8.MarshalBase64(answer)
if err != nil {
return nil, err
}
return v, nil
}
func (h *go2rtcLiveStreamHandler) GetEndpointsResponse() any {
h.mu.Lock()
consumer := h.consumer
h.mu.Unlock()
if consumer == nil {
return nil
}
answer := consumer.GetAnswer()
v, _ := tlv8.MarshalBase64(answer)
return v
}
func (h *go2rtcLiveStreamHandler) StartStream(streamName string, conf *camera.SelectedStreamConfiguration, connTracker hksv.ConnTracker) error {
h.mu.Lock()
consumer := h.consumer
h.mu.Unlock()
if consumer == nil {
return errors.New("no consumer")
}
if !consumer.SetConfig(conf) {
return errors.New("wrong config")
}
connTracker.AddConn(consumer)
stream := streams.Get(streamName)
if err := stream.AddConsumer(consumer); err != nil {
return err
}
go func() {
_, _ = consumer.WriteTo(nil)
stream.RemoveConsumer(consumer)
connTracker.DelConn(consumer)
}()
return nil
}
func (h *go2rtcLiveStreamHandler) StopStream(sessionID string, connTracker hksv.ConnTracker) error {
h.mu.Lock()
consumer := h.consumer
h.mu.Unlock()
if consumer != nil && consumer.SessionID() == sessionID {
_ = consumer.Stop()
}
return nil
}
func streamHandler(rawURL string) (core.Producer, error) {
if srtp.Server == nil {
@@ -163,7 +267,7 @@ func streamHandler(rawURL string) (core.Producer, error) {
return client, err
}
func resolve(host string) *server {
func resolve(host string) *hksv.Server {
if len(hosts) == 1 {
for _, srv := range hosts {
return srv
@@ -176,9 +280,6 @@ func resolve(host string) *server {
}
func hapHandler(w http.ResponseWriter, r *http.Request) {
// Can support multiple HomeKit cameras on single port ONLY for Apple devices.
// Doesn't support Home Assistant and any other open source projects
// because they don't send the host header in requests.
srv := resolve(r.Host)
if srv == nil {
log.Error().Msg("[homekit] unknown host: " + r.Host)
-243
View File
@@ -1,243 +0,0 @@
package homekit
import (
"io"
"time"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/h264"
"github.com/pion/rtp"
)
const (
motionWarmupFrames = 30
motionThreshold = 2.0
motionAlphaFast = 0.1
motionAlphaSlow = 0.02
motionHoldTime = 30 * time.Second
motionCooldown = 5 * time.Second
motionDefaultFPS = 30.0
// recalibrate FPS and emit trace log every N frames (~5s at 30fps)
motionTraceFrames = 150
)
type motionDetector struct {
core.Connection
server *server
done chan struct{}
// algorithm state (accessed only from Sender goroutine — no mutex needed)
threshold float64
triggerLevel int // pre-computed: int(baseline * threshold)
baseline float64
initialized bool
frameCount int
// frame-based timing (calibrated periodically, no time.Now() in per-frame hot path)
holdBudget int // motionHoldTime converted to frames
cooldownBudget int // motionCooldown converted to frames
remainingHold int // frames left until hold expires (active motion)
remainingCooldown int // frames left until cooldown expires (after OFF)
// motion state
motionActive bool
// periodic FPS recalibration
lastFPSCheck time.Time
lastFPSFrame int
// for testing: injectable time and callback
now func() time.Time
onMotion func(bool)
}
func newMotionDetector(srv *server) *motionDetector {
medias := []*core.Media{
{
Kind: core.KindVideo,
Direction: core.DirectionSendonly,
Codecs: []*core.Codec{
{Name: core.CodecH264},
},
},
}
threshold := motionThreshold
if srv != nil && srv.motionThreshold > 0 {
threshold = srv.motionThreshold
}
return &motionDetector{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "motion",
Protocol: "detect",
Medias: medias,
},
server: srv,
threshold: threshold,
done: make(chan struct{}),
now: time.Now,
}
}
func (m *motionDetector) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiver) error {
log.Debug().Str("stream", m.streamName()).Str("codec", track.Codec.Name).Msg("[homekit] motion: add track")
codec := track.Codec.Clone()
sender := core.NewSender(media, codec)
sender.Handler = func(packet *rtp.Packet) {
m.handlePacket(packet)
}
if track.Codec.IsRTP() {
sender.Handler = h264.RTPDepay(track.Codec, sender.Handler)
} else {
sender.Handler = h264.RepairAVCC(track.Codec, sender.Handler)
}
sender.HandleRTP(track)
m.Senders = append(m.Senders, sender)
return nil
}
func (m *motionDetector) streamName() string {
if m.server != nil {
return m.server.stream
}
return ""
}
func (m *motionDetector) calibrate() {
// use default FPS — real FPS calibrated after first periodic check
m.holdBudget = int(motionHoldTime.Seconds() * motionDefaultFPS)
m.cooldownBudget = int(motionCooldown.Seconds() * motionDefaultFPS)
m.triggerLevel = int(m.baseline * m.threshold)
m.lastFPSCheck = m.now()
m.lastFPSFrame = m.frameCount
log.Debug().Str("stream", m.streamName()).
Float64("baseline", m.baseline).
Int("holdFrames", m.holdBudget).Int("cooldownFrames", m.cooldownBudget).
Msg("[homekit] motion: warmup complete")
}
func (m *motionDetector) handlePacket(packet *rtp.Packet) {
payload := packet.Payload
if len(payload) < 5 {
return
}
// skip keyframes — always large, not informative for motion
if h264.IsKeyframe(payload) {
return
}
size := len(payload)
m.frameCount++
if m.frameCount <= motionWarmupFrames {
fsize := float64(size)
if !m.initialized {
m.baseline = fsize
m.initialized = true
} else {
m.baseline += motionAlphaFast * (fsize - m.baseline)
}
if m.frameCount == motionWarmupFrames {
m.calibrate()
}
return
}
if m.triggerLevel <= 0 {
return
}
// integer comparison — no float division needed
triggered := size > m.triggerLevel
if !m.motionActive {
// idle path: decrement cooldown, check for trigger, update baseline
if m.remainingCooldown > 0 {
m.remainingCooldown--
}
if triggered && m.remainingCooldown <= 0 {
m.motionActive = true
m.remainingHold = m.holdBudget
log.Debug().Str("stream", m.streamName()).
Float64("ratio", float64(size)/m.baseline).
Msg("[homekit] motion: ON")
m.setMotion(true)
}
// update baseline only if still idle (trigger frame doesn't pollute baseline)
if !m.motionActive {
fsize := float64(size)
m.baseline += motionAlphaSlow * (fsize - m.baseline)
m.triggerLevel = int(m.baseline * m.threshold)
}
} else {
// active motion path: pure integer arithmetic, zero time.Now() calls
if triggered {
m.remainingHold = m.holdBudget
} else {
m.remainingHold--
if m.remainingHold <= 0 {
m.motionActive = false
m.remainingCooldown = m.cooldownBudget
log.Debug().Str("stream", m.streamName()).Msg("[homekit] motion: OFF (hold expired)")
m.setMotion(false)
}
}
}
// periodic: recalibrate FPS and emit trace log
if m.frameCount%motionTraceFrames == 0 {
now := m.now()
frames := m.frameCount - m.lastFPSFrame
if frames > 0 {
if elapsed := now.Sub(m.lastFPSCheck); elapsed > time.Millisecond {
fps := float64(frames) / elapsed.Seconds()
m.holdBudget = int(motionHoldTime.Seconds() * fps)
m.cooldownBudget = int(motionCooldown.Seconds() * fps)
}
}
m.lastFPSCheck = now
m.lastFPSFrame = m.frameCount
log.Trace().Str("stream", m.streamName()).
Float64("baseline", m.baseline).Float64("ratio", float64(size)/m.baseline).
Bool("active", m.motionActive).Msg("[homekit] motion: status")
}
}
func (m *motionDetector) setMotion(detected bool) {
if m.onMotion != nil {
m.onMotion(detected)
return
}
if m.server != nil {
m.server.SetMotionDetected(detected)
}
}
func (m *motionDetector) WriteTo(io.Writer) (int64, error) {
<-m.done
return 0, nil
}
func (m *motionDetector) Stop() error {
select {
case <-m.done:
default:
if m.motionActive {
m.motionActive = false
log.Debug().Str("stream", m.streamName()).Msg("[homekit] motion: OFF (stop)")
m.setMotion(false)
}
close(m.done)
}
return m.Connection.Stop()
}
-510
View File
@@ -1,510 +0,0 @@
package homekit
import (
"encoding/binary"
"testing"
"time"
"github.com/AlexxIT/go2rtc/pkg/h264"
"github.com/pion/rtp"
)
// makeAVCC creates a fake AVCC packet with the given NAL type and total size.
func makeAVCC(nalType byte, totalSize int) []byte {
if totalSize < 5 {
totalSize = 5
}
b := make([]byte, totalSize)
binary.BigEndian.PutUint32(b[:4], uint32(totalSize-4))
b[4] = nalType
return b
}
func makePFrame(size int) *rtp.Packet {
return &rtp.Packet{Payload: makeAVCC(h264.NALUTypePFrame, size)}
}
func makeIFrame(size int) *rtp.Packet {
return &rtp.Packet{Payload: makeAVCC(h264.NALUTypeIFrame, size)}
}
type mockClock struct {
t time.Time
}
func (c *mockClock) now() time.Time { return c.t }
func (c *mockClock) advance(d time.Duration) { c.t = c.t.Add(d) }
type motionRecorder struct {
calls []bool
}
func (r *motionRecorder) onMotion(detected bool) {
r.calls = append(r.calls, detected)
}
func (r *motionRecorder) lastCall() (bool, bool) {
if len(r.calls) == 0 {
return false, false
}
return r.calls[len(r.calls)-1], true
}
func newTestDetector() (*motionDetector, *mockClock, *motionRecorder) {
det := newMotionDetector(nil)
clock := &mockClock{t: time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)}
rec := &motionRecorder{}
det.now = clock.now
det.onMotion = rec.onMotion
return det, clock, rec
}
// warmup feeds the detector with small P-frames to build baseline.
func warmup(det *motionDetector, clock *mockClock, size int) {
for i := 0; i < motionWarmupFrames; i++ {
det.handlePacket(makePFrame(size))
clock.advance(33 * time.Millisecond) // ~30fps
}
}
// warmupWithBudgets performs warmup then sets test-friendly hold/cooldown budgets.
func warmupWithBudgets(det *motionDetector, clock *mockClock, size, hold, cooldown int) {
warmup(det, clock, size)
det.holdBudget = hold
det.cooldownBudget = cooldown
}
func TestMotionDetector_NoMotion(t *testing.T) {
det, clock, rec := newTestDetector()
warmup(det, clock, 500)
// feed same-size P-frames — no motion
for i := 0; i < 100; i++ {
det.handlePacket(makePFrame(500))
}
if len(rec.calls) != 0 {
t.Fatalf("expected no motion calls, got %d: %v", len(rec.calls), rec.calls)
}
}
func TestMotionDetector_MotionDetected(t *testing.T) {
det, clock, rec := newTestDetector()
warmup(det, clock, 500)
// large P-frame triggers motion
det.handlePacket(makePFrame(5000))
last, ok := rec.lastCall()
if !ok || !last {
t.Fatal("expected motion detected")
}
}
func TestMotionDetector_HoldTime(t *testing.T) {
det, clock, rec := newTestDetector()
warmupWithBudgets(det, clock, 500, 30, 5)
// trigger motion
det.handlePacket(makePFrame(5000))
if len(rec.calls) != 1 || !rec.calls[0] {
t.Fatal("expected motion ON")
}
// send 20 non-triggered frames — still active (< holdBudget=30)
for i := 0; i < 20; i++ {
det.handlePacket(makePFrame(500))
}
if len(rec.calls) != 1 {
t.Fatalf("expected only ON call during hold, got %v", rec.calls)
}
// send 15 more (total 35 > holdBudget=30) — should turn OFF
for i := 0; i < 15; i++ {
det.handlePacket(makePFrame(500))
}
last, _ := rec.lastCall()
if last {
t.Fatal("expected motion OFF after hold budget exhausted")
}
}
func TestMotionDetector_Cooldown(t *testing.T) {
det, clock, rec := newTestDetector()
warmupWithBudgets(det, clock, 500, 30, 5)
// trigger and expire motion
det.handlePacket(makePFrame(5000))
for i := 0; i < 30; i++ {
det.handlePacket(makePFrame(500))
}
if len(rec.calls) != 2 || rec.calls[1] != false {
t.Fatalf("expected ON then OFF, got %v", rec.calls)
}
// try to trigger again immediately — should be blocked by cooldown
det.handlePacket(makePFrame(5000))
if len(rec.calls) != 2 {
t.Fatalf("expected cooldown to block re-trigger, got %v", rec.calls)
}
// send frames to expire cooldown (blocked trigger consumed 1 decrement)
for i := 0; i < 5; i++ {
det.handlePacket(makePFrame(500))
}
// now re-trigger should work
det.handlePacket(makePFrame(5000))
if len(rec.calls) != 3 || !rec.calls[2] {
t.Fatalf("expected motion ON after cooldown, got %v", rec.calls)
}
}
func TestMotionDetector_SkipsKeyframes(t *testing.T) {
det, clock, rec := newTestDetector()
warmup(det, clock, 500)
// huge keyframe should not trigger motion
det.handlePacket(makeIFrame(50000))
if len(rec.calls) != 0 {
t.Fatal("keyframes should not trigger motion")
}
// verify baseline didn't change
det.handlePacket(makePFrame(500))
if len(rec.calls) != 0 {
t.Fatal("baseline should be unaffected by keyframes")
}
}
func TestMotionDetector_Warmup(t *testing.T) {
det, clock, rec := newTestDetector()
// during warmup, even large frames should not trigger
for i := 0; i < motionWarmupFrames; i++ {
det.handlePacket(makePFrame(5000))
clock.advance(33 * time.Millisecond)
}
if len(rec.calls) != 0 {
t.Fatal("warmup should not trigger motion")
}
}
func TestMotionDetector_BaselineFreeze(t *testing.T) {
det, clock, rec := newTestDetector()
warmup(det, clock, 500)
baselineBefore := det.baseline
// trigger motion
det.handlePacket(makePFrame(5000))
if len(rec.calls) != 1 || !rec.calls[0] {
t.Fatal("expected motion ON")
}
// feed large frames during motion — baseline should not change
for i := 0; i < 50; i++ {
det.handlePacket(makePFrame(5000))
}
if det.baseline != baselineBefore {
t.Fatalf("baseline changed during motion: %f -> %f", baselineBefore, det.baseline)
}
}
func TestMotionDetector_CustomThreshold(t *testing.T) {
det, clock, rec := newTestDetector()
det.threshold = 1.5
warmup(det, clock, 500)
// 1.6x — below default 2.0 but above custom 1.5
det.handlePacket(makePFrame(800))
if len(rec.calls) != 1 || !rec.calls[0] {
t.Fatalf("expected motion ON with custom threshold 1.5, got %v", rec.calls)
}
}
func TestMotionDetector_CustomThresholdNoFalsePositive(t *testing.T) {
det, clock, rec := newTestDetector()
det.threshold = 3.0
warmup(det, clock, 500)
// 2.5x — above default 2.0 but below custom 3.0
det.handlePacket(makePFrame(1250))
if len(rec.calls) != 0 {
t.Fatalf("expected no motion with high threshold 3.0, got %v", rec.calls)
}
}
func TestMotionDetector_HoldTimeExtended(t *testing.T) {
det, clock, rec := newTestDetector()
warmupWithBudgets(det, clock, 500, 30, 5)
// trigger motion
det.handlePacket(makePFrame(5000))
if len(rec.calls) != 1 || !rec.calls[0] {
t.Fatal("expected motion ON")
}
// send 25 non-triggered frames (remainingHold 30→5)
for i := 0; i < 25; i++ {
det.handlePacket(makePFrame(500))
}
// re-trigger — remainingHold resets to 30
det.handlePacket(makePFrame(5000))
// send 25 more non-triggered (remainingHold 30→5)
for i := 0; i < 25; i++ {
det.handlePacket(makePFrame(500))
}
// should still be ON
if len(rec.calls) != 1 {
t.Fatalf("expected hold time to be extended by re-trigger, got %v", rec.calls)
}
// send 10 more to exhaust hold
for i := 0; i < 10; i++ {
det.handlePacket(makePFrame(500))
}
last, _ := rec.lastCall()
if last {
t.Fatal("expected motion OFF after extended hold expired")
}
}
func TestMotionDetector_SmallPayloadIgnored(t *testing.T) {
det, clock, rec := newTestDetector()
warmup(det, clock, 500)
det.handlePacket(&rtp.Packet{Payload: []byte{1, 2, 3, 4}})
det.handlePacket(&rtp.Packet{Payload: nil})
det.handlePacket(&rtp.Packet{Payload: []byte{}})
if len(rec.calls) != 0 {
t.Fatalf("small payloads should be ignored, got %v", rec.calls)
}
}
func TestMotionDetector_BaselineAdapts(t *testing.T) {
det, clock, _ := newTestDetector()
warmup(det, clock, 500)
baselineAfterWarmup := det.baseline
// feed gradually larger frames — baseline should drift up
for i := 0; i < 200; i++ {
det.handlePacket(makePFrame(700))
}
if det.baseline <= baselineAfterWarmup {
t.Fatalf("baseline should adapt upward: before=%f after=%f", baselineAfterWarmup, det.baseline)
}
}
func TestMotionDetector_DoubleStopSafe(t *testing.T) {
det, clock, rec := newTestDetector()
warmup(det, clock, 500)
det.handlePacket(makePFrame(5000))
_ = det.Stop()
_ = det.Stop() // second stop should not panic
if len(rec.calls) != 2 {
t.Fatalf("expected ON+OFF, got %v", rec.calls)
}
}
func TestMotionDetector_StopWithoutMotion(t *testing.T) {
det, clock, _ := newTestDetector()
warmup(det, clock, 500)
rec := &motionRecorder{}
det.onMotion = rec.onMotion
_ = det.Stop()
if len(rec.calls) != 0 {
t.Fatalf("stop without motion should not call onMotion, got %v", rec.calls)
}
}
func TestMotionDetector_StopClearsMotion(t *testing.T) {
det, clock, rec := newTestDetector()
warmup(det, clock, 500)
det.handlePacket(makePFrame(5000))
if len(rec.calls) != 1 || !rec.calls[0] {
t.Fatal("expected motion ON")
}
_ = det.Stop()
if len(rec.calls) != 2 || rec.calls[1] != false {
t.Fatalf("expected Stop to clear motion, got %v", rec.calls)
}
}
func TestMotionDetector_WarmupBaseline(t *testing.T) {
det, clock, _ := newTestDetector()
for i := 0; i < motionWarmupFrames; i++ {
size := 400 + (i%5)*50
det.handlePacket(makePFrame(size))
clock.advance(33 * time.Millisecond)
}
if det.baseline < 400 || det.baseline > 600 {
t.Fatalf("baseline should be in 400-600 range, got %f", det.baseline)
}
}
func TestMotionDetector_MultipleCycles(t *testing.T) {
det, clock, rec := newTestDetector()
warmupWithBudgets(det, clock, 500, 30, 5)
for cycle := 0; cycle < 3; cycle++ {
det.handlePacket(makePFrame(5000)) // trigger ON
for i := 0; i < 30; i++ { // expire hold
det.handlePacket(makePFrame(500))
}
for i := 0; i < 6; i++ { // expire cooldown
det.handlePacket(makePFrame(500))
}
}
if len(rec.calls) != 6 {
t.Fatalf("expected 6 calls (3 cycles), got %d: %v", len(rec.calls), rec.calls)
}
for i, v := range rec.calls {
expected := i%2 == 0
if v != expected {
t.Fatalf("call[%d] = %v, expected %v", i, v, expected)
}
}
}
func TestMotionDetector_TriggerLevel(t *testing.T) {
det, clock, _ := newTestDetector()
warmup(det, clock, 500)
expected := int(det.baseline * det.threshold)
if det.triggerLevel != expected {
t.Fatalf("triggerLevel = %d, expected %d", det.triggerLevel, expected)
}
}
func TestMotionDetector_DefaultFPSCalibration(t *testing.T) {
det, clock, _ := newTestDetector()
warmup(det, clock, 500)
// calibrate uses default 30fps
expectedHold := int(motionHoldTime.Seconds() * motionDefaultFPS)
expectedCooldown := int(motionCooldown.Seconds() * motionDefaultFPS)
if det.holdBudget != expectedHold {
t.Fatalf("holdBudget = %d, expected %d", det.holdBudget, expectedHold)
}
if det.cooldownBudget != expectedCooldown {
t.Fatalf("cooldownBudget = %d, expected %d", det.cooldownBudget, expectedCooldown)
}
}
func TestMotionDetector_FPSRecalibration(t *testing.T) {
det, clock, _ := newTestDetector()
warmup(det, clock, 500)
// initial budgets use default 30fps
initialHold := det.holdBudget
// send motionTraceFrames frames with 100ms intervals → FPS=10
for i := 0; i < motionTraceFrames; i++ {
clock.advance(100 * time.Millisecond)
det.handlePacket(makePFrame(500))
}
// after recalibration, holdBudget should reflect ~10fps (±5% due to warmup tail)
expectedHold := int(motionHoldTime.Seconds() * 10.0) // ~300
if det.holdBudget < expectedHold-20 || det.holdBudget > expectedHold+20 {
t.Fatalf("holdBudget after recalibration = %d, expected ~%d (was %d)", det.holdBudget, expectedHold, initialHold)
}
}
func BenchmarkMotionDetector_HandlePacket(b *testing.B) {
det, clock, _ := newTestDetector()
warmup(det, clock, 500)
pkt := makePFrame(600)
b.ResetTimer()
for i := 0; i < b.N; i++ {
det.handlePacket(pkt)
}
}
func BenchmarkMotionDetector_WithKeyframes(b *testing.B) {
det, clock, _ := newTestDetector()
warmup(det, clock, 500)
pFrame := makePFrame(600)
iFrame := makeIFrame(10000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
if i%30 == 0 {
det.handlePacket(iFrame)
} else {
det.handlePacket(pFrame)
}
}
}
func BenchmarkMotionDetector_MotionActive(b *testing.B) {
det, clock, _ := newTestDetector()
warmup(det, clock, 500)
// trigger motion and keep it active
det.handlePacket(makePFrame(5000))
pkt := makePFrame(5000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
det.handlePacket(pkt)
}
}
func BenchmarkMotionDetector_Warmup(b *testing.B) {
pkt := makePFrame(500)
b.ResetTimer()
for i := 0; i < b.N; i++ {
det := newMotionDetector(nil)
det.onMotion = func(bool) {}
det.now = time.Now
for j := 0; j < motionWarmupFrames; j++ {
det.handlePacket(pkt)
}
}
}
-598
View File
@@ -1,598 +0,0 @@
package homekit
import (
"crypto/ed25519"
"crypto/sha512"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/url"
"slices"
"strings"
"sync"
"time"
"github.com/AlexxIT/go2rtc/internal/app"
"github.com/AlexxIT/go2rtc/internal/ffmpeg"
srtp2 "github.com/AlexxIT/go2rtc/internal/srtp"
"github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/hap"
"github.com/AlexxIT/go2rtc/pkg/hap/camera"
"github.com/AlexxIT/go2rtc/pkg/hap/hds"
"github.com/AlexxIT/go2rtc/pkg/hap/tlv8"
"github.com/AlexxIT/go2rtc/pkg/homekit"
"github.com/AlexxIT/go2rtc/pkg/magic"
"github.com/AlexxIT/go2rtc/pkg/mdns"
)
type server struct {
hap *hap.Server // server for HAP connection and encryption
mdns *mdns.ServiceEntry
pairings []string // pairings list
conns []any
mu sync.Mutex
accessory *hap.Accessory // HAP accessory
consumer *homekit.Consumer
proxyURL string
setupID string
stream string // stream name from YAML
// HKSV fields
motionMode string // "api", "continuous", "detect"
motionThreshold float64 // ratio threshold for "detect" mode (default 2.0)
motionDetector *motionDetector
hksvSession *hksvSession
continuousMotion bool
preparedConsumer *hksvConsumer
}
func (s *server) MarshalJSON() ([]byte, error) {
v := struct {
Name string `json:"name"`
DeviceID string `json:"device_id"`
Paired int `json:"paired,omitempty"`
CategoryID string `json:"category_id,omitempty"`
SetupCode string `json:"setup_code,omitempty"`
SetupID string `json:"setup_id,omitempty"`
Conns []any `json:"connections,omitempty"`
}{
Name: s.mdns.Name,
DeviceID: s.mdns.Info[hap.TXTDeviceID],
CategoryID: s.mdns.Info[hap.TXTCategory],
Paired: len(s.pairings),
Conns: s.conns,
}
if v.Paired == 0 {
v.SetupCode = s.hap.Pin
v.SetupID = s.setupID
}
return json.Marshal(v)
}
func (s *server) Handle(w http.ResponseWriter, r *http.Request) {
conn, rw, err := w.(http.Hijacker).Hijack()
if err != nil {
return
}
defer conn.Close()
// Fix reading from Body after Hijack.
r.Body = io.NopCloser(rw)
switch r.RequestURI {
case hap.PathPairSetup:
id, key, err := s.hap.PairSetup(r, rw)
if err != nil {
log.Error().Err(err).Caller().Send()
return
}
s.AddPair(id, key, hap.PermissionAdmin)
case hap.PathPairVerify:
id, key, err := s.hap.PairVerify(r, rw)
if err != nil {
log.Debug().Err(err).Caller().Send()
return
}
log.Debug().Str("stream", s.stream).Str("client_id", id).Msgf("[homekit] %s: new conn", conn.RemoteAddr())
controller, err := hap.NewConn(conn, rw, key, false)
if err != nil {
log.Error().Err(err).Caller().Send()
return
}
s.AddConn(controller)
defer s.DelConn(controller)
// start motion on first Home Hub connection
switch s.motionMode {
case "detect":
go s.startMotionDetector()
case "continuous":
go s.prepareHKSVConsumer()
go s.startContinuousMotion()
}
var handler homekit.HandlerFunc
switch {
case s.accessory != nil:
handler = homekit.ServerHandler(s)
case s.proxyURL != "":
client, err := hap.Dial(s.proxyURL)
if err != nil {
log.Error().Err(err).Caller().Send()
return
}
handler = homekit.ProxyHandler(s, client.Conn)
}
log.Debug().Str("stream", s.stream).Msgf("[homekit] handler started for %s", conn.RemoteAddr())
// If your iPhone goes to sleep, it will be an EOF error.
if err = handler(controller); err != nil {
if errors.Is(err, io.EOF) {
log.Debug().Str("stream", s.stream).Msgf("[homekit] %s: connection closed (EOF)", conn.RemoteAddr())
} else {
log.Error().Err(err).Str("stream", s.stream).Caller().Send()
}
return
}
}
}
type logger struct {
v any
}
func (l logger) String() string {
switch v := l.v.(type) {
case *hap.Conn:
return "hap " + v.RemoteAddr().String()
case *hds.Conn:
return "hds " + v.RemoteAddr().String()
case *homekit.Consumer:
return "rtp " + v.RemoteAddr
}
return "unknown"
}
func (s *server) AddConn(v any) {
log.Trace().Str("stream", s.stream).Msgf("[homekit] add conn %s", logger{v})
s.mu.Lock()
s.conns = append(s.conns, v)
s.mu.Unlock()
}
func (s *server) DelConn(v any) {
log.Trace().Str("stream", s.stream).Msgf("[homekit] del conn %s", logger{v})
s.mu.Lock()
if i := slices.Index(s.conns, v); i >= 0 {
s.conns = slices.Delete(s.conns, i, i+1)
}
s.mu.Unlock()
}
func (s *server) UpdateStatus() {
// true status is important, or device may be offline in Apple Home
if len(s.pairings) == 0 {
s.mdns.Info[hap.TXTStatusFlags] = hap.StatusNotPaired
} else {
s.mdns.Info[hap.TXTStatusFlags] = hap.StatusPaired
}
}
func (s *server) pairIndex(id string) int {
id = "client_id=" + id
for i, pairing := range s.pairings {
if strings.HasPrefix(pairing, id) {
return i
}
}
return -1
}
func (s *server) GetPair(id string) []byte {
s.mu.Lock()
defer s.mu.Unlock()
if i := s.pairIndex(id); i >= 0 {
query, _ := url.ParseQuery(s.pairings[i])
b, _ := hex.DecodeString(query.Get("client_public"))
return b
}
return nil
}
func (s *server) AddPair(id string, public []byte, permissions byte) {
log.Debug().Str("stream", s.stream).Msgf("[homekit] add pair id=%s public=%x perm=%d", id, public, permissions)
s.mu.Lock()
if s.pairIndex(id) < 0 {
s.pairings = append(s.pairings, fmt.Sprintf(
"client_id=%s&client_public=%x&permissions=%d", id, public, permissions,
))
s.UpdateStatus()
s.PatchConfig()
}
s.mu.Unlock()
}
func (s *server) DelPair(id string) {
log.Debug().Str("stream", s.stream).Msgf("[homekit] del pair id=%s", id)
s.mu.Lock()
if i := s.pairIndex(id); i >= 0 {
s.pairings = append(s.pairings[:i], s.pairings[i+1:]...)
s.UpdateStatus()
s.PatchConfig()
}
s.mu.Unlock()
}
func (s *server) PatchConfig() {
if err := app.PatchConfig([]string{"homekit", s.stream, "pairings"}, s.pairings); err != nil {
log.Error().Err(err).Msgf(
"[homekit] can't save %s pairings=%v", s.stream, s.pairings,
)
}
}
func (s *server) GetAccessories(_ net.Conn) []*hap.Accessory {
log.Trace().Str("stream", s.stream).Msg("[homekit] GET /accessories")
if log.Trace().Enabled() {
if b, err := json.Marshal(s.accessory); err == nil {
log.Trace().Str("stream", s.stream).RawJSON("accessory", b).Msg("[homekit] accessory JSON")
}
}
return []*hap.Accessory{s.accessory}
}
func (s *server) GetCharacteristic(conn net.Conn, aid uint8, iid uint64) any {
log.Trace().Str("stream", s.stream).Msgf("[homekit] get char aid=%d iid=0x%x", aid, iid)
char := s.accessory.GetCharacterByID(iid)
if char == nil {
log.Warn().Msgf("[homekit] get unknown characteristic: %d", iid)
return nil
}
switch char.Type {
case camera.TypeSetupEndpoints:
consumer := s.consumer
if consumer == nil {
return nil
}
answer := consumer.GetAnswer()
v, err := tlv8.MarshalBase64(answer)
if err != nil {
return nil
}
return v
}
return char.Value
}
func (s *server) SetCharacteristic(conn net.Conn, aid uint8, iid uint64, value any) {
log.Trace().Str("stream", s.stream).Msgf("[homekit] set char aid=%d iid=0x%x value=%v", aid, iid, value)
char := s.accessory.GetCharacterByID(iid)
if char == nil {
log.Warn().Msgf("[homekit] set unknown characteristic: %d", iid)
return
}
switch char.Type {
case camera.TypeSetupEndpoints:
var offer camera.SetupEndpointsRequest
if err := tlv8.UnmarshalBase64(value, &offer); err != nil {
return
}
consumer := homekit.NewConsumer(conn, srtp2.Server)
consumer.SetOffer(&offer)
s.consumer = consumer
case camera.TypeSelectedStreamConfiguration:
var conf camera.SelectedStreamConfiguration
if err := tlv8.UnmarshalBase64(value, &conf); err != nil {
return
}
log.Trace().Str("stream", s.stream).Msgf("[homekit] stream id=%x cmd=%d", conf.Control.SessionID, conf.Control.Command)
switch conf.Control.Command {
case camera.SessionCommandEnd:
for _, consumer := range s.conns {
if consumer, ok := consumer.(*homekit.Consumer); ok {
if consumer.SessionID() == conf.Control.SessionID {
_ = consumer.Stop()
return
}
}
}
case camera.SessionCommandStart:
consumer := s.consumer
if consumer == nil {
return
}
if !consumer.SetConfig(&conf) {
log.Warn().Msgf("[homekit] wrong config")
return
}
s.AddConn(consumer)
stream := streams.Get(s.stream)
if err := stream.AddConsumer(consumer); err != nil {
return
}
go func() {
_, _ = consumer.WriteTo(nil)
stream.RemoveConsumer(consumer)
s.DelConn(consumer)
}()
}
case camera.TypeSetupDataStreamTransport:
var req camera.SetupDataStreamTransportRequest
if err := tlv8.UnmarshalBase64(value, &req); err != nil {
log.Error().Err(err).Str("stream", s.stream).Msg("[homekit] HKSV parse ch131 failed")
return
}
log.Debug().Str("stream", s.stream).Uint8("cmd", req.SessionCommandType).
Uint8("transport", req.TransportType).Msg("[homekit] HKSV DataStream setup")
if req.SessionCommandType != 0 {
// 0 = start, 1 = close
log.Debug().Str("stream", s.stream).Msg("[homekit] HKSV DataStream close request")
if s.hksvSession != nil {
s.hksvSession.Close()
}
return
}
accessoryKeySalt := core.RandString(32, 0)
combinedSalt := req.ControllerKeySalt + accessoryKeySalt
ln, err := net.ListenTCP("tcp", nil)
if err != nil {
log.Error().Err(err).Str("stream", s.stream).Msg("[homekit] HKSV listen failed")
return
}
port := ln.Addr().(*net.TCPAddr).Port
resp := camera.SetupDataStreamTransportResponse{
Status: 0,
AccessoryKeySalt: accessoryKeySalt,
}
resp.TransportTypeSessionParameters.TCPListeningPort = uint16(port)
v, err := tlv8.MarshalBase64(resp)
if err != nil {
ln.Close()
return
}
char.Value = v
log.Debug().Str("stream", s.stream).Int("port", port).Msg("[homekit] HKSV listening for HDS")
hapConn := conn.(*hap.Conn)
go s.acceptHDS(hapConn, ln, combinedSalt)
case camera.TypeSelectedCameraRecordingConfiguration:
log.Debug().Str("stream", s.stream).Str("motion", s.motionMode).Msg("[homekit] HKSV selected recording config")
char.Value = value
switch s.motionMode {
case "continuous":
go s.startContinuousMotion()
case "detect":
go s.startMotionDetector()
}
default:
// Store value for all other writable characteristics
char.Value = value
}
}
func (s *server) GetImage(conn net.Conn, width, height int) []byte {
log.Trace().Str("stream", s.stream).Msgf("[homekit] get image width=%d height=%d", width, height)
stream := streams.Get(s.stream)
cons := magic.NewKeyframe()
if err := stream.AddConsumer(cons); err != nil {
return nil
}
once := &core.OnceBuffer{} // init and first frame
_, _ = cons.WriteTo(once)
b := once.Buffer()
stream.RemoveConsumer(cons)
switch cons.CodecName() {
case core.CodecH264, core.CodecH265:
var err error
if b, err = ffmpeg.JPEGWithScale(b, width, height); err != nil {
return nil
}
}
return b
}
func (s *server) SetMotionDetected(detected bool) {
if s.accessory == nil {
return
}
char := s.accessory.GetCharacter("22") // MotionDetected
if char == nil {
return
}
char.Value = detected
_ = char.NotifyListeners(nil)
log.Debug().Str("stream", s.stream).Bool("motion", detected).Msg("[homekit] motion")
}
func (s *server) TriggerDoorbell() {
if s.accessory == nil {
return
}
char := s.accessory.GetCharacter("73") // ProgrammableSwitchEvent
if char == nil {
return
}
char.Value = 0 // SINGLE_PRESS
_ = char.NotifyListeners(nil)
log.Debug().Str("stream", s.stream).Msg("[homekit] doorbell")
}
func (s *server) startMotionDetector() {
s.mu.Lock()
if s.motionDetector != nil {
s.mu.Unlock()
return
}
det := newMotionDetector(s)
s.motionDetector = det
s.mu.Unlock()
s.AddConn(det)
stream := streams.Get(s.stream)
if err := stream.AddConsumer(det); err != nil {
log.Error().Err(err).Str("stream", s.stream).Msg("[homekit] motion detector add consumer failed")
s.DelConn(det)
s.mu.Lock()
s.motionDetector = nil
s.mu.Unlock()
return
}
log.Debug().Str("stream", s.stream).Msg("[homekit] motion detector started")
_, _ = det.WriteTo(nil) // blocks until Stop()
stream.RemoveConsumer(det)
s.DelConn(det)
s.mu.Lock()
if s.motionDetector == det {
s.motionDetector = nil
}
s.mu.Unlock()
log.Debug().Str("stream", s.stream).Msg("[homekit] motion detector stopped")
}
func (s *server) stopMotionDetector() {
s.mu.Lock()
det := s.motionDetector
s.mu.Unlock()
if det != nil {
_ = det.Stop()
}
}
func (s *server) startContinuousMotion() {
s.mu.Lock()
if s.continuousMotion {
s.mu.Unlock()
return
}
s.continuousMotion = true
s.mu.Unlock()
log.Debug().Str("stream", s.stream).Msg("[homekit] continuous motion started")
// delay to allow Home Hub to subscribe to events
time.Sleep(5 * time.Second)
s.SetMotionDetected(true)
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for range ticker.C {
if s.accessory == nil {
return
}
s.SetMotionDetected(true)
}
}
func calcName(name, seed string) string {
if name != "" {
return name
}
b := sha512.Sum512([]byte(seed))
return fmt.Sprintf("go2rtc-%02X%02X", b[0], b[2])
}
func calcDeviceID(deviceID, seed string) string {
if deviceID != "" {
if len(deviceID) >= 17 {
// 1. Returd device_id as is (ex. AA:BB:CC:DD:EE:FF)
return deviceID
}
// 2. Use device_id as seed if not zero
seed = deviceID
}
b := sha512.Sum512([]byte(seed))
return fmt.Sprintf("%02X:%02X:%02X:%02X:%02X:%02X", b[32], b[34], b[36], b[38], b[40], b[42])
}
func calcDevicePrivate(private, seed string) []byte {
if private != "" {
// 1. Decode private from HEX string
if b, _ := hex.DecodeString(private); len(b) == ed25519.PrivateKeySize {
// 2. Return if OK
return b
}
// 3. Use private as seed if not zero
seed = private
}
b := sha512.Sum512([]byte(seed))
return ed25519.NewKeyFromSeed(b[:ed25519.SeedSize])
}
func calcSetupID(seed string) string {
b := sha512.Sum512([]byte(seed))
return fmt.Sprintf("%02X%02X", b[44], b[46])
}
func calcCategoryID(categoryID string) string {
switch categoryID {
case "bridge":
return hap.CategoryBridge
case "doorbell":
return hap.CategoryDoorbell
}
if core.Atoi(categoryID) > 0 {
return categoryID
}
return hap.CategoryCamera
}