Files
StrixCamDB/scripts/validate.py
T
2026-03-25 06:24:33 +00:00

275 lines
8.8 KiB
Python

#!/usr/bin/env python3
"""Validate all StrixCamDB data files: brands, presets, and OUI."""
import json
import os
import re
import sys
BASE_DIR = os.path.join(os.path.dirname(__file__), "..")
BRANDS_DIR = os.path.join(BASE_DIR, "brands")
PRESETS_DIR = os.path.join(BASE_DIR, "presets")
OUI_FILE = os.path.join(BASE_DIR, "oui.json")
REQUIRED_ROOT = {"version", "brand", "brand_id", "streams"}
REQUIRED_STREAM = {"id", "url", "protocol", "port", "models"}
REQUIRED_PRESET_ROOT = {"version", "name", "preset_id", "streams"}
REQUIRED_PRESET_STREAM = {"url", "protocol", "port"}
MAC_PREFIX_RE = re.compile(r'^[0-9A-F]{2}:[0-9A-F]{2}:[0-9A-F]{2}$')
errors = []
warnings = []
stats = {
"brand_files": 0,
"streams": 0,
"preset_files": 0,
"preset_streams": 0,
"oui_entries": 0,
}
# ===== BRANDS =====
def validate_brand(filepath, filename):
brand_id_expected = filename.replace(".json", "")
try:
with open(filepath) as f:
data = json.load(f)
except json.JSONDecodeError as e:
errors.append(f"brands/{filename}: invalid JSON: {e}")
return
except IOError as e:
errors.append(f"brands/{filename}: cannot read: {e}")
return
if not isinstance(data, dict):
errors.append(f"brands/{filename}: root must be object")
return
for field in REQUIRED_ROOT:
if field not in data:
errors.append(f"brands/{filename}: missing field '{field}'")
if data.get("version") != 2:
errors.append(f"brands/{filename}: version must be 2, got {data.get('version')}")
if data.get("brand_id") != brand_id_expected:
errors.append(f"brands/{filename}: brand_id mismatch '{data.get('brand_id')}' != '{brand_id_expected}'")
if not data.get("brand", "").strip():
errors.append(f"brands/{filename}: brand name is empty")
streams = data.get("streams", [])
if not isinstance(streams, list):
errors.append(f"brands/{filename}: streams must be array")
return
if len(streams) == 0:
warnings.append(f"brands/{filename}: no streams")
seen_ids = set()
seen_urls = set()
for i, stream in enumerate(streams):
stats["streams"] += 1
prefix = f"brands/{filename}: stream[{i}]"
if not isinstance(stream, dict):
errors.append(f"{prefix}: must be object")
continue
for field in REQUIRED_STREAM:
if field not in stream:
errors.append(f"{prefix}: missing field '{field}'")
sid = stream.get("id", "")
if sid in seen_ids:
errors.append(f"{prefix}: duplicate id '{sid}'")
seen_ids.add(sid)
val = stream.get("protocol", "")
if not isinstance(val, str) or not val.strip():
errors.append(f"{prefix}: protocol must be non-empty string")
port = stream.get("port")
if not isinstance(port, int):
errors.append(f"{prefix}: port must be int")
elif port < 0 or port > 65535:
errors.append(f"{prefix}: port {port} out of range")
models = stream.get("models")
if not isinstance(models, list) or len(models) == 0:
errors.append(f"{prefix}: models must be non-empty array")
elif not all(isinstance(m, str) for m in models):
errors.append(f"{prefix}: all models must be strings")
url = stream.get("url")
if not isinstance(url, str):
errors.append(f"{prefix}: url must be string")
dedup_key = f"{stream.get('protocol')}:{stream.get('port')}:{stream.get('url')}"
if dedup_key in seen_urls:
errors.append(f"{prefix}: duplicate stream {dedup_key}")
seen_urls.add(dedup_key)
if "notes" in stream and not isinstance(stream["notes"], str):
errors.append(f"{prefix}: notes must be string")
if "tags" in stream:
if not isinstance(stream["tags"], list) or not all(isinstance(t, str) for t in stream["tags"]):
errors.append(f"{prefix}: tags must be array of strings")
allowed = REQUIRED_STREAM | {"notes", "tags"}
extra = set(stream.keys()) - allowed
if extra:
warnings.append(f"{prefix}: unexpected fields: {extra}")
# ===== PRESETS =====
def validate_preset(filepath, filename):
try:
with open(filepath) as f:
data = json.load(f)
except json.JSONDecodeError as e:
errors.append(f"presets/{filename}: invalid JSON: {e}")
return
except IOError as e:
errors.append(f"presets/{filename}: cannot read: {e}")
return
if not isinstance(data, dict):
errors.append(f"presets/{filename}: root must be object")
return
for field in REQUIRED_PRESET_ROOT:
if field not in data:
errors.append(f"presets/{filename}: missing field '{field}'")
preset_id_expected = filename.replace(".json", "")
if data.get("preset_id") != preset_id_expected:
errors.append(f"presets/{filename}: preset_id mismatch '{data.get('preset_id')}' != '{preset_id_expected}'")
if not data.get("name", "").strip():
errors.append(f"presets/{filename}: name is empty")
streams = data.get("streams", [])
if not isinstance(streams, list):
errors.append(f"presets/{filename}: streams must be array")
return
for i, stream in enumerate(streams):
stats["preset_streams"] += 1
prefix = f"presets/{filename}: stream[{i}]"
if not isinstance(stream, dict):
errors.append(f"{prefix}: must be object")
continue
for field in REQUIRED_PRESET_STREAM:
if field not in stream:
errors.append(f"{prefix}: missing field '{field}'")
port = stream.get("port")
if isinstance(port, int) and (port < 0 or port > 65535):
errors.append(f"{prefix}: port {port} out of range")
val = stream.get("protocol", "")
if not isinstance(val, str) or not val.strip():
errors.append(f"{prefix}: protocol must be non-empty string")
# ===== OUI =====
def validate_oui(filepath):
try:
with open(filepath) as f:
data = json.load(f)
except json.JSONDecodeError as e:
errors.append(f"oui.json: invalid JSON: {e}")
return
except IOError as e:
errors.append(f"oui.json: cannot read: {e}")
return
if not isinstance(data, dict):
errors.append("oui.json: must be object")
return
for prefix, brand in data.items():
stats["oui_entries"] += 1
if not MAC_PREFIX_RE.match(prefix):
errors.append(f"oui.json: invalid MAC prefix '{prefix}' (expected XX:XX:XX uppercase)")
if not isinstance(brand, str) or not brand.strip():
errors.append(f"oui.json: empty brand for prefix '{prefix}'")
# Check for duplicate prefixes with different case
seen_lower = {}
for prefix in data:
lower = prefix.lower()
if lower in seen_lower:
warnings.append(f"oui.json: case duplicate '{prefix}' and '{seen_lower[lower]}'")
seen_lower[lower] = prefix
# ===== MAIN =====
def main():
# Validate brands
brands_dir = os.path.abspath(BRANDS_DIR)
if os.path.isdir(brands_dir):
files = sorted(f for f in os.listdir(brands_dir) if f.endswith(".json"))
stats["brand_files"] = len(files)
for filename in files:
validate_brand(os.path.join(brands_dir, filename), filename)
# Validate presets
presets_dir = os.path.abspath(PRESETS_DIR)
if os.path.isdir(presets_dir):
files = sorted(f for f in os.listdir(presets_dir) if f.endswith(".json"))
stats["preset_files"] = len(files)
for filename in files:
validate_preset(os.path.join(presets_dir, filename), filename)
# Validate OUI
oui_file = os.path.abspath(OUI_FILE)
if os.path.exists(oui_file):
validate_oui(oui_file)
# Print results
print("=" * 50)
print("Validation results")
print("=" * 50)
print(f" Brand files: {stats['brand_files']}")
print(f" Streams: {stats['streams']}")
print(f" Preset files: {stats['preset_files']}")
print(f" Preset streams: {stats['preset_streams']}")
print(f" OUI entries: {stats['oui_entries']}")
print(f" Errors: {len(errors)}")
print(f" Warnings: {len(warnings)}")
if errors:
print(f"\n--- ERRORS ({len(errors)}) ---")
for e in errors[:50]:
print(f" {e}")
if len(errors) > 50:
print(f" ... and {len(errors) - 50} more")
if warnings:
print(f"\n--- WARNINGS ({len(warnings)}) ---")
for w in warnings[:20]:
print(f" {w}")
if len(warnings) > 20:
print(f" ... and {len(warnings) - 20} more")
if errors:
sys.exit(1)
print("\nAll checks passed.")
if __name__ == "__main__":
main()