Files
Strix/pkg/generate/xiaomi_test.go
T
eduard256 3a48e23100 Nest credential sections under go2rtc in frigate config
Frigate rejects unknown top-level keys (extra="forbid" on root config),
but its RestreamConfig (go2rtc: block) allows extra keys. Move credential
sections under go2rtc: with 2/4 space indentation.

- writeCredentials emits "  xiaomi:" + "    \"<key>\": <value>"
- upsertSection matches 2-space section header + 4-space key regex
- insertNewSection places new nested sections after streams: block
- findStreamInsertPoint stops at sibling headers (2-space) inside go2rtc:
- Add xiaomi_test.go with 16 scenarios covering new config, addToConfig
  merging, token refresh, dedup, sort order, malformed URLs, special chars,
  go2rtc override, mixed protocols, and section order
2026-04-18 08:49:04 +00:00

397 lines
12 KiB
Go

package generate
import (
"net/url"
"strings"
"sync"
"testing"
)
// registerXiaomi installs a xiaomi extractor identical to the one in
// internal/xiaomi. Tests live here (not in internal/xiaomi) because they
// validate generator behavior with xiaomi-style URLs.
var registerOnce sync.Once
func registerXiaomi() {
registerOnce.Do(func() {
RegisterExtract("xiaomi", func(rawURL string) (cleaned, section, key, value string) {
u, err := url.Parse(rawURL)
if err != nil || u.User == nil {
return rawURL, "", "", ""
}
q := u.Query()
token := q.Get("token")
if token == "" {
return rawURL, "", "", ""
}
q.Del("token")
u.RawQuery = q.Encode()
return u.String(), "xiaomi", u.User.Username(), token
})
})
}
// --- Helpers ---
func mustGen(t *testing.T, req *Request) string {
t.Helper()
r, err := Generate(req)
if err != nil {
t.Fatalf("Generate: %v", err)
}
return r.Config
}
func assertContains(t *testing.T, cfg, substr string) {
t.Helper()
if !strings.Contains(cfg, substr) {
t.Errorf("expected config to contain:\n %q\n--- got ---\n%s", substr, cfg)
}
}
func assertNotContains(t *testing.T, cfg, substr string) {
t.Helper()
if strings.Contains(cfg, substr) {
t.Errorf("expected config NOT to contain:\n %q\n--- got ---\n%s", substr, cfg)
}
}
func countOccurrences(s, substr string) int {
if substr == "" {
return 0
}
n := 0
for i := 0; ; {
j := strings.Index(s[i:], substr)
if j < 0 {
return n
}
n++
i += j + len(substr)
}
}
// ex. "xiaomi://user:cn@ip?did=D&model=M&token=T"
func xurl(user, region, ip, did, model, token string) string {
return "xiaomi://" + user + ":" + region + "@" + ip +
"?did=" + did + "&model=" + model + "&token=" + url.QueryEscape(token)
}
// --- Tests ---
// Single xiaomi camera in a fresh config.
func TestXiaomi_NewConfig_SingleCamera(t *testing.T) {
registerXiaomi()
cfg := mustGen(t, &Request{
MainStream: xurl("acc1", "cn", "10.0.20.229", "1", "chuangmi.camera.v1", "V1:TOK_A"),
})
assertContains(t, cfg, "go2rtc:\n streams:\n")
assertContains(t, cfg, " '10_0_20_229_main':\n")
assertContains(t, cfg, "- xiaomi://acc1:cn@10.0.20.229?did=1&model=chuangmi.camera.v1\n")
assertContains(t, cfg, " xiaomi:\n \"acc1\": V1:TOK_A\n")
assertNotContains(t, cfg, "token=")
assertNotContains(t, cfg, "\nxiaomi:") // must be nested, not top-level
}
// Two cameras on the same account -- token appears only once.
func TestXiaomi_SameAccount_TokenNotDuplicated(t *testing.T) {
registerXiaomi()
c1 := mustGen(t, &Request{
MainStream: xurl("acc1", "cn", "10.0.20.229", "1", "v1", "V1:TOK_A"),
})
c2 := mustGen(t, &Request{
MainStream: xurl("acc1", "cn", "10.0.20.230", "2", "v2", "V1:TOK_A"),
ExistingConfig: c1,
})
if n := countOccurrences(c2, `"acc1":`); n != 1 {
t.Errorf("expected exactly 1 \"acc1\" key, got %d\n---\n%s", n, c2)
}
assertContains(t, c2, `"acc1": V1:TOK_A`)
assertContains(t, c2, " '10_0_20_229_main':")
assertContains(t, c2, " '10_0_20_230_main':")
}
// Two accounts -- both tokens present, sorted by key.
func TestXiaomi_TwoAccounts_SortedKeys(t *testing.T) {
registerXiaomi()
c1 := mustGen(t, &Request{
MainStream: xurl("zeta", "cn", "10.0.20.229", "1", "v1", "TOK_Z"),
})
c2 := mustGen(t, &Request{
MainStream: xurl("alpha", "de", "10.0.20.230", "2", "v2", "TOK_A"),
ExistingConfig: c1,
})
iAlpha := strings.Index(c2, `"alpha":`)
iZeta := strings.Index(c2, `"zeta":`)
if iAlpha < 0 || iZeta < 0 {
t.Fatalf("expected both keys:\n%s", c2)
}
if iAlpha >= iZeta {
t.Errorf("expected alpha before zeta (sorted)\n%s", c2)
}
}
// Re-login with a new token overwrites the existing value.
func TestXiaomi_TokenRefresh_OverwritesValue(t *testing.T) {
registerXiaomi()
c1 := mustGen(t, &Request{
MainStream: xurl("acc1", "cn", "10.0.20.229", "1", "v1", "V1:OLD"),
})
c2 := mustGen(t, &Request{
MainStream: xurl("acc1", "cn", "10.0.20.230", "2", "v2", "V1:NEW"),
ExistingConfig: c1,
})
assertContains(t, c2, `"acc1": V1:NEW`)
assertNotContains(t, c2, `"acc1": V1:OLD`)
if n := countOccurrences(c2, `"acc1":`); n != 1 {
t.Errorf("expected 1 key after refresh, got %d", n)
}
}
// Main + Sub stream with same credentials -- token deduped to one entry.
func TestXiaomi_MainAndSub_SameAccount_OneToken(t *testing.T) {
registerXiaomi()
cfg := mustGen(t, &Request{
MainStream: xurl("acc1", "cn", "10.0.20.229", "1", "main", "V1:TOK"),
SubStream: xurl("acc1", "cn", "10.0.20.229", "1", "sub", "V1:TOK"),
})
if n := countOccurrences(cfg, `"acc1":`); n != 1 {
t.Errorf("expected 1 acc1 key for main+sub, got %d\n%s", n, cfg)
}
assertContains(t, cfg, " '10_0_20_229_main':")
assertContains(t, cfg, " '10_0_20_229_sub':")
assertNotContains(t, cfg, "token=")
}
// Main and Sub from different accounts -- both tokens in the section.
func TestXiaomi_MainAndSub_DifferentAccounts(t *testing.T) {
registerXiaomi()
cfg := mustGen(t, &Request{
MainStream: xurl("accA", "cn", "10.0.20.229", "1", "v1", "TOK_A"),
SubStream: xurl("accB", "de", "10.0.20.229", "1", "v1", "TOK_B"),
})
assertContains(t, cfg, `"accA": TOK_A`)
assertContains(t, cfg, `"accB": TOK_B`)
}
// 10 cameras across 3 accounts added sequentially -- exactly 3 tokens at the end,
// correct token values, all streams present.
func TestXiaomi_Scale_10Cameras_3Accounts(t *testing.T) {
registerXiaomi()
cases := []struct{ user, ip, token string }{
{"accA", "10.0.20.10", "TOK_A_v1"},
{"accA", "10.0.20.11", "TOK_A_v1"},
{"accB", "10.0.20.12", "TOK_B_v1"},
{"accA", "10.0.20.13", "TOK_A_v1"},
{"accC", "10.0.20.14", "TOK_C_v1"},
{"accB", "10.0.20.15", "TOK_B_v2"}, // B gets refreshed
{"accC", "10.0.20.16", "TOK_C_v1"},
{"accA", "10.0.20.17", "TOK_A_v2"}, // A gets refreshed
{"accB", "10.0.20.18", "TOK_B_v2"},
{"accC", "10.0.20.19", "TOK_C_v2"}, // C gets refreshed
}
cfg := ""
for i, c := range cases {
req := &Request{
MainStream: xurl(c.user, "cn", c.ip, "1", "v", c.token),
}
if i > 0 {
req.ExistingConfig = cfg
}
cfg = mustGen(t, req)
}
if n := countOccurrences(cfg, `"accA":`); n != 1 {
t.Errorf("accA: expected 1 key, got %d", n)
}
if n := countOccurrences(cfg, `"accB":`); n != 1 {
t.Errorf("accB: expected 1 key, got %d", n)
}
if n := countOccurrences(cfg, `"accC":`); n != 1 {
t.Errorf("accC: expected 1 key, got %d", n)
}
// final (latest) tokens
assertContains(t, cfg, `"accA": TOK_A_v2`)
assertContains(t, cfg, `"accB": TOK_B_v2`)
assertContains(t, cfg, `"accC": TOK_C_v2`)
for _, c := range cases {
want := "xiaomi://" + c.user + ":cn@" + c.ip
if !strings.Contains(cfg, want) {
t.Errorf("missing stream URL %q", want)
}
}
// only one xiaomi: section header, only one go2rtc:
if n := countOccurrences(cfg, "\n xiaomi:\n"); n != 1 {
t.Errorf("expected 1 xiaomi: header, got %d", n)
}
if n := countOccurrences(cfg, "\ngo2rtc:\n"); n != 1 {
t.Errorf("expected 1 go2rtc: header, got %d", n)
}
}
// URL without ?token=... -- extractor returns empty section, no xiaomi: block written.
func TestXiaomi_URLWithoutToken_NoSection(t *testing.T) {
registerXiaomi()
cfg := mustGen(t, &Request{
MainStream: "xiaomi://acc1:cn@10.0.20.229?did=1&model=v1",
})
// the nested section header would look like "\n xiaomi:\n" -- URL scheme
// "xiaomi://" must not trigger a false positive
assertNotContains(t, cfg, "\n xiaomi:\n")
assertContains(t, cfg, "- xiaomi://acc1:cn@10.0.20.229?did=1&model=v1\n")
}
// Malformed URL must not crash the generator; URL is passed through as-is.
func TestXiaomi_MalformedURL_DoesNotPanic(t *testing.T) {
registerXiaomi()
_, err := Generate(&Request{
MainStream: "xiaomi://%%%bad",
})
if err != nil {
t.Logf("Generate returned error (ok): %v", err)
}
}
// Token with base64 special chars (+ / =) must survive YAML write without escaping.
func TestXiaomi_TokenSpecialChars_PreservedRaw(t *testing.T) {
registerXiaomi()
raw := "V1:9d2w+abc/def=end="
cfg := mustGen(t, &Request{
MainStream: xurl("acc1", "cn", "10.0.20.229", "1", "v1", raw),
})
assertContains(t, cfg, `"acc1": `+raw)
}
// Go2RTC override MainStreamSource must also pass through the extractor.
func TestXiaomi_Go2RTCOverride_PassesThroughExtractor(t *testing.T) {
registerXiaomi()
cfg := mustGen(t, &Request{
MainStream: "rtsp://placeholder:554/stream",
Go2RTC: &Go2RTCOverride{
MainStreamSource: xurl("acc1", "cn", "10.0.20.229", "1", "v1", "V1:OVR"),
},
})
assertContains(t, cfg, `"acc1": V1:OVR`)
assertNotContains(t, cfg, "token=")
assertContains(t, cfg, "- xiaomi://acc1:cn@10.0.20.229?did=1&model=v1\n")
}
// addToConfig: existing config has no xiaomi: section -- must create one.
func TestXiaomi_AddToConfig_NoExistingSection(t *testing.T) {
registerXiaomi()
// start from a rtsp-only config
c1 := mustGen(t, &Request{
MainStream: "rtsp://user:pass@10.0.20.100/stream1",
})
assertNotContains(t, c1, "xiaomi:")
c2 := mustGen(t, &Request{
MainStream: xurl("acc1", "cn", "10.0.20.229", "1", "v1", "V1:TOK"),
ExistingConfig: c1,
})
assertContains(t, c2, " xiaomi:\n \"acc1\": V1:TOK\n")
assertContains(t, c2, "- rtsp://user:pass@10.0.20.100/stream1")
assertContains(t, c2, "- xiaomi://acc1:cn@10.0.20.229?did=1&model=v1")
}
// addToConfig: existing config already has xiaomi: section with other accounts.
func TestXiaomi_AddToConfig_ExistingSection(t *testing.T) {
registerXiaomi()
c1 := mustGen(t, &Request{
MainStream: xurl("accA", "cn", "10.0.20.10", "1", "v1", "TOK_A"),
})
c2 := mustGen(t, &Request{
MainStream: xurl("accB", "de", "10.0.20.20", "2", "v2", "TOK_B"),
ExistingConfig: c1,
})
assertContains(t, c2, `"accA": TOK_A`)
assertContains(t, c2, `"accB": TOK_B`)
// xiaomi: section stays nested, exactly one header
if n := countOccurrences(c2, "\n xiaomi:\n"); n != 1 {
t.Errorf("expected 1 xiaomi header, got %d\n%s", n, c2)
}
}
// Stream and camera names stay clean (no leftover tokens in URLs) with custom Name.
func TestXiaomi_CustomName_URLStillClean(t *testing.T) {
registerXiaomi()
cfg := mustGen(t, &Request{
Name: "my_cam",
MainStream: xurl("acc1", "cn", "10.0.20.229", "1", "v1", "V1:TOK"),
})
assertContains(t, cfg, " 'my_cam_main':")
assertContains(t, cfg, " my_cam:")
assertNotContains(t, cfg, "token=")
assertContains(t, cfg, `"acc1": V1:TOK`)
}
// Mixed: rtsp + xiaomi -- rtsp URL untouched, xiaomi token extracted.
func TestXiaomi_MixedProtocols(t *testing.T) {
registerXiaomi()
c1 := mustGen(t, &Request{
MainStream: "rtsp://admin:pw@10.0.20.100/Streaming/Channels/101",
})
c2 := mustGen(t, &Request{
MainStream: xurl("acc1", "cn", "10.0.20.229", "1", "v1", "V1:TOK"),
ExistingConfig: c1,
})
assertContains(t, c2, "- rtsp://admin:pw@10.0.20.100/Streaming/Channels/101")
assertContains(t, c2, "- xiaomi://acc1:cn@10.0.20.229?did=1&model=v1")
assertContains(t, c2, `"acc1": V1:TOK`)
}
// Order in generated config: go2rtc -> (streams, xiaomi) -> cameras -> version.
func TestXiaomi_SectionOrder(t *testing.T) {
registerXiaomi()
cfg := mustGen(t, &Request{
MainStream: xurl("acc1", "cn", "10.0.20.229", "1", "v1", "V1:TOK"),
})
iGo2rtc := strings.Index(cfg, "\ngo2rtc:\n")
iStreams := strings.Index(cfg, " streams:")
iXiaomi := strings.Index(cfg, " xiaomi:")
iCameras := strings.Index(cfg, "\ncameras:\n")
iVersion := strings.Index(cfg, "\nversion:")
if iGo2rtc < 0 || iStreams < 0 || iXiaomi < 0 || iCameras < 0 || iVersion < 0 {
t.Fatalf("missing section in config:\n%s", cfg)
}
if !(iGo2rtc < iStreams && iStreams < iXiaomi && iXiaomi < iCameras && iCameras < iVersion) {
t.Errorf("wrong section order: go2rtc=%d streams=%d xiaomi=%d cameras=%d version=%d\n%s",
iGo2rtc, iStreams, iXiaomi, iCameras, iVersion, cfg)
}
}