diff --git a/schemas/brand.schema.json b/schemas/brand.schema.json
new file mode 100644
index 0000000..0b6916f
--- /dev/null
+++ b/schemas/brand.schema.json
@@ -0,0 +1,84 @@
+{
+  "$schema": "https://json-schema.org/draft/2020-12/schema",
+  "$id": "https://github.com/eduard256/StrixCamDB/schemas/brand.schema.json",
+  "title": "StrixCamDB Brand File",
+  "description": "Schema for IP camera brand files in StrixCamDB v2 format",
+  "type": "object",
+  "required": ["version", "brand", "brand_id", "streams"],
+  "additionalProperties": false,
+  "properties": {
+    "version": {
+      "type": "integer",
+      "const": 2,
+      "description": "Format version, always 2"
+    },
+    "brand": {
+      "type": "string",
+      "minLength": 1,
+      "description": "Human-readable brand name"
+    },
+    "brand_id": {
+      "type": "string",
+      "pattern": "^[a-z0-9][a-z0-9\\-]*[a-z0-9]$|^[a-z0-9]$",
+      "description": "URL-safe brand identifier, must match filename"
+    },
+    "streams": {
+      "type": "array",
+      "items": {
+        "$ref": "#/$defs/stream"
+      },
+      "description": "List of stream URL patterns for this brand"
+    }
+  },
+  "$defs": {
+    "stream": {
+      "type": "object",
+      "required": ["id", "url", "type", "protocol", "port", "models"],
+      "additionalProperties": false,
+      "properties": {
+        "id": {
+          "type": "string",
+          "minLength": 1,
+          "description": "Unique stream identifier within this brand file"
+        },
+        "url": {
+          "type": "string",
+          "description": "URL path with optional placeholders: [CHANNEL], [CHANNEL+1], [USERNAME], [PASSWORD], [WIDTH], [HEIGHT], [IP], [PORT], [AUTH], [TOKEN], [USER], [PASS], [PWD], [PASWORD]"
+        },
+        "type": {
+          "type": "string",
+          "description": "Stream type: FFMPEG, MJPEG, JPEG, VLC, BUBBLE, or future types"
+        },
+        "protocol": {
+          "type": "string",
+          "description": "Network protocol: rtsp, http, https, mms, rtmp, rtsps, bubble, rtp, or future protocols"
+        },
+        "port": {
+          "type": "integer",
+          "minimum": 0,
+          "maximum": 65535,
+          "description": "Port number. 0 means unknown/use default for protocol"
+        },
+        "models": {
+          "type": "array",
+          "minItems": 1,
+          "items": {
+            "type": "string"
+          },
+          "description": "Camera models this stream works for. [\"*\"] means all models of this brand"
+        },
+        "tags": {
+          "type": "array",
+          "items": {
+            "type": "string"
+          },
+          "description": "Classification tags: main, sub, snapshot, mjpeg, audio, ptz, onvif, etc."
+        },
+        "notes": {
+          "type": "string",
+          "description": "Human-readable notes about this stream"
+        }
+      }
+    }
+  }
+}
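For reference, a conforming brand file might look like the sketch below, checked against the schema above. All values are invented (there is no "acme" brand in the database), and it assumes the `jsonschema` package, version 4 or later, which supports draft 2020-12:

```python
# Sketch: validate a hypothetical brand file against brand.schema.json.
# Assumes `pip install jsonschema` (4.x supports draft 2020-12) and that
# this runs from the repository root.
import json
from jsonschema import Draft202012Validator

sample_brand = {
    "version": 2,
    "brand": "Acme",         # illustrative brand, not in the database
    "brand_id": "acme",      # must match the filename acme.json
    "streams": [
        {
            "id": "acme-1",
            "url": "/Streaming/Channels/[CHANNEL]01",
            "type": "FFMPEG",
            "protocol": "rtsp",
            "port": 554,
            "models": ["*"],  # wildcard: applies to all models of the brand
        }
    ],
}

with open("schemas/brand.schema.json") as f:
    schema = json.load(f)

Draft202012Validator(schema).validate(sample_brand)  # raises on failure
print("sample brand file is valid")
```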
diff --git a/schemas/preset.schema.json b/schemas/preset.schema.json
new file mode 100644
index 0000000..bf7c1d9
--- /dev/null
+++ b/schemas/preset.schema.json
@@ -0,0 +1,72 @@
+{
+  "$schema": "https://json-schema.org/draft/2020-12/schema",
+  "$id": "https://github.com/eduard256/StrixCamDB/schemas/preset.schema.json",
+  "title": "StrixCamDB Preset File",
+  "description": "Schema for curated stream URL pattern lists",
+  "type": "object",
+  "required": ["version", "name", "preset_id", "streams"],
+  "additionalProperties": false,
+  "properties": {
+    "version": {
+      "type": "integer",
+      "const": 1,
+      "description": "Preset format version"
+    },
+    "name": {
+      "type": "string",
+      "minLength": 1,
+      "description": "Human-readable preset name"
+    },
+    "preset_id": {
+      "type": "string",
+      "pattern": "^[a-z0-9][a-z0-9\\-]*[a-z0-9]$",
+      "description": "URL-safe preset identifier, must match filename"
+    },
+    "description": {
+      "type": "string",
+      "description": "What this preset contains and when to use it"
+    },
+    "streams": {
+      "type": "array",
+      "items": {
+        "$ref": "#/$defs/preset_stream"
+      }
+    }
+  },
+  "$defs": {
+    "preset_stream": {
+      "type": "object",
+      "required": ["url", "type", "protocol", "port"],
+      "additionalProperties": false,
+      "properties": {
+        "url": {
+          "type": "string",
+          "description": "URL path with optional placeholders"
+        },
+        "type": {
+          "type": "string",
+          "description": "Stream type"
+        },
+        "protocol": {
+          "type": "string",
+          "description": "Network protocol"
+        },
+        "port": {
+          "type": "integer",
+          "minimum": 0,
+          "maximum": 65535,
+          "description": "Port number"
+        },
+        "notes": {
+          "type": "string",
+          "description": "Optional notes"
+        },
+        "brand_count": {
+          "type": "integer",
+          "minimum": 0,
+          "description": "Number of brands that use this pattern"
+        }
+      }
+    }
+  }
+}
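As a consumption sketch for preset files: the `expand()` helper below is hypothetical (plain string replacement), since placeholder semantics such as `[CHANNEL+1]` arithmetic are left to the client and not specified by this schema:

```python
# Sketch: expand one preset pattern into a concrete URL for a single host.
# expand() is a made-up helper; real consumers may handle auth placement
# and [CHANNEL+1] arithmetic differently.
import json

def expand(pattern: str, **values: str) -> str:
    for key, value in values.items():
        pattern = pattern.replace(f"[{key}]", value)
    return pattern

with open("presets/top-150.json") as f:
    preset = json.load(f)

for stream in preset["streams"]:
    # 0 means "unknown/use default"; 554 is only right for rtsp,
    # a real client would pick the default per protocol.
    port = stream["port"] or 554
    url = expand(stream["url"], USERNAME="admin", PASSWORD="admin", CHANNEL="1")
    print(f'{stream["protocol"]}://host:{port}{url}')
```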
+ """ + # Must be a dict with entries + if not isinstance(data, dict): + return None + if "entries" not in data and "cameras" in data: + # auto.json-style format, skip + return None + if "entries" not in data: + return None + + brand_name = data.get("brand", "") + if not brand_name: + return None + + streams = [] + seen_urls = set() + counter = 0 + + for entry in data["entries"]: + url = entry.get("url", "") + + # Skip empty URLs + if not url.strip(): + stats["streams_skipped_empty_url"] += 1 + continue + + # Skip entries with empty type + if not entry.get("type", "").strip(): + stats["streams_skipped_empty_type"] += 1 + continue + + # Skip entries with empty models list + if not entry.get("models"): + stats["streams_skipped_empty_models"] += 1 + continue + + # Deduplicate by protocol:port:url + proto = entry.get("protocol", "") + port = entry.get("port", 0) + dedup_key = f"{proto}:{port}:{url}" + if dedup_key in seen_urls: + stats["streams_skipped_duplicate"] += 1 + continue + seen_urls.add(dedup_key) + + counter += 1 + + # Build stream object + stream = { + "id": f"{brand_id}-{counter}", + "url": url, + "type": entry.get("type", ""), + "protocol": proto, + "port": port, + } + + # Convert models: ["ALL"] -> ["*"] + models = entry.get("models", []) + if models == ["ALL"]: + models = ["*"] + stats["models_all_converted"] += 1 + stream["models"] = models + + # Keep notes if present and non-empty + notes = entry.get("notes", "") + if notes and notes.strip(): + stream["notes"] = notes.strip() + + streams.append(stream) + stats["streams_total"] += 1 + + if not streams: + return None + + return { + "version": 2, + "brand": brand_name, + "brand_id": brand_id, + "streams": streams, + } + + +def main(): + legacy_dir = os.path.abspath(LEGACY_DIR) + output_dir = os.path.abspath(OUTPUT_DIR) + + if not os.path.isdir(legacy_dir): + print(f"Error: legacy directory not found: {legacy_dir}", file=sys.stderr) + sys.exit(1) + + os.makedirs(output_dir, exist_ok=True) + + files = sorted(f for f in os.listdir(legacy_dir) if f.endswith(".json")) + + for filename in files: + if filename in SKIP_FILES: + stats["brands_skipped"] += 1 + continue + + brand_id = filename.replace(".json", "") + if brand_id in SKIP_BRANDS: + stats["brands_skipped"] += 1 + continue + + filepath = os.path.join(legacy_dir, filename) + try: + with open(filepath) as f: + data = json.load(f) + except (json.JSONDecodeError, IOError) as e: + print(f" WARN: failed to read {filename}: {e}", file=sys.stderr) + stats["brands_skipped"] += 1 + continue + + # Skip JSON arrays (index files that slipped through) + if isinstance(data, list): + stats["brands_skipped"] += 1 + continue + + result = convert_brand(data, brand_id) + if result is None: + stats["brands_skipped"] += 1 + continue + + # Write output + output_path = os.path.join(output_dir, filename) + with open(output_path, "w") as f: + json.dump(result, f, indent=2, ensure_ascii=False) + f.write("\n") + + stats["brands_processed"] += 1 + + # Print summary + print("=" * 50) + print("Conversion complete") + print("=" * 50) + print(f" Brands processed: {stats['brands_processed']}") + print(f" Brands skipped: {stats['brands_skipped']}") + print(f" Streams created: {stats['streams_total']}") + print(f" Empty URLs skipped: {stats['streams_skipped_empty_url']}") + print(f" Duplicates skipped: {stats['streams_skipped_duplicate']}") + print(f" Empty type skipped: {stats['streams_skipped_empty_type']}") + print(f" Empty models skipped: {stats['streams_skipped_empty_models']}") + print(f" ALL -> * converted: 
diff --git a/scripts/generate_presets.py b/scripts/generate_presets.py
new file mode 100644
index 0000000..7d154aa
--- /dev/null
+++ b/scripts/generate_presets.py
@@ -0,0 +1,117 @@
+#!/usr/bin/env python3
+"""Generate preset files from converted brand data.
+
+Reads all brands/*.json, counts URL pattern popularity (by number of brands
+that use each pattern), and outputs top-N preset files.
+"""
+
+import json
+import os
+import sys
+from collections import defaultdict
+
+BRANDS_DIR = os.path.join(os.path.dirname(__file__), "..", "brands")
+PRESETS_DIR = os.path.join(os.path.dirname(__file__), "..", "presets")
+
+# Preset configurations: (preset_id, name, description, limit)
+PRESETS = [
+    (
+        "top-150",
+        "Top 150 Stream Patterns",
+        "150 most common stream URL patterns across all brands. Good for quick scanning.",
+        150,
+    ),
+    (
+        "top-1000",
+        "Top 1000 Stream Patterns",
+        "1000 most common stream URL patterns. Covers most IP cameras.",
+        1000,
+    ),
+    (
+        "top-5000",
+        "Top 5000 Stream Patterns",
+        "5000 most common stream URL patterns. Comprehensive coverage.",
+        5000,
+    ),
+]
+
+
+def main():
+    brands_dir = os.path.abspath(BRANDS_DIR)
+    presets_dir = os.path.abspath(PRESETS_DIR)
+
+    if not os.path.isdir(brands_dir):
+        print(f"Error: brands directory not found: {brands_dir}", file=sys.stderr)
+        sys.exit(1)
+
+    os.makedirs(presets_dir, exist_ok=True)
+
+    # Collect URL patterns and count brands per pattern.
+    # Key: (url, type, protocol, port)
+    # Value: set of brand_ids
+    pattern_brands = defaultdict(set)
+
+    files = sorted(f for f in os.listdir(brands_dir) if f.endswith(".json"))
+    for filename in files:
+        filepath = os.path.join(brands_dir, filename)
+        try:
+            with open(filepath) as f:
+                data = json.load(f)
+        except (json.JSONDecodeError, IOError):
+            continue
+
+        brand_id = data.get("brand_id", "")
+        for stream in data.get("streams", []):
+            key = (
+                stream.get("url", ""),
+                stream.get("type", ""),
+                stream.get("protocol", ""),
+                stream.get("port", 0),
+            )
+            pattern_brands[key].add(brand_id)
+
+    # Sort by brand count descending, then by URL alphabetically
+    sorted_patterns = sorted(
+        pattern_brands.items(),
+        key=lambda x: (-len(x[1]), x[0][0]),
+    )
+
+    print(f"Total unique patterns: {len(sorted_patterns)}")
+
+    # Generate each preset
+    for preset_id, name, description, limit in PRESETS:
+        streams = []
+        for (url, stype, protocol, port), brands in sorted_patterns[:limit]:
+            entry = {
+                "url": url,
+                "type": stype,
+                "protocol": protocol,
+                "port": port,
+                "brand_count": len(brands),
+            }
+            streams.append(entry)
+
+        preset = {
+            "version": 1,
+            "name": name,
+            "preset_id": preset_id,
+            "description": description,
+            "streams": streams,
+        }
+
+        output_path = os.path.join(presets_dir, f"{preset_id}.json")
+        with open(output_path, "w") as f:
+            json.dump(preset, f, indent=2, ensure_ascii=False)
+            f.write("\n")
+
+        actual = len(streams)
+        top_count = streams[0]["brand_count"] if streams else 0
+        bottom_count = streams[-1]["brand_count"] if streams else 0
+        print(
+            f"  {preset_id}.json: {actual} patterns "
+            f"(brand_count {top_count} -> {bottom_count})"
+        )
+
+
+if __name__ == "__main__":
+    main()
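The ranking rule here is a single sort on (descending brand count, ascending URL). A minimal demonstration of the same key on three made-up patterns:

```python
# Sketch: the ranking rule generate_presets.py applies, shown on
# fabricated patterns. Keys are (url, type, protocol, port) tuples,
# values are sets of brand_ids.
pattern_brands = {
    ("/video", "FFMPEG", "rtsp", 554): {"a", "b", "c"},
    ("/live", "FFMPEG", "rtsp", 554): {"a", "b", "c"},
    ("/cam/realmonitor", "FFMPEG", "rtsp", 554): {"a"},
}

ranked = sorted(pattern_brands.items(), key=lambda x: (-len(x[1]), x[0][0]))

# Ties on brand count fall back to URL order: /live sorts before /video,
# and the single-brand pattern ranks last.
assert [k[0] for k, _ in ranked] == ["/live", "/video", "/cam/realmonitor"]
```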
diff --git a/scripts/validate.py b/scripts/validate.py
new file mode 100644
index 0000000..d61478d
--- /dev/null
+++ b/scripts/validate.py
@@ -0,0 +1,183 @@
+#!/usr/bin/env python3
+"""Validate all brand files against StrixCamDB v2 format rules.
+
+Checks: required fields, field types, unique IDs, no duplicate streams,
+brand_id matches filename, port range, non-empty models.
+"""
+
+import json
+import os
+import sys
+
+BRANDS_DIR = os.path.join(os.path.dirname(__file__), "..", "brands")
+
+REQUIRED_ROOT = {"version", "brand", "brand_id", "streams"}
+REQUIRED_STREAM = {"id", "url", "type", "protocol", "port", "models"}
+
+errors = []
+warnings = []
+total_files = 0
+total_streams = 0
+
+
+def validate_file(filepath, filename):
+    """Validate a single brand file. Appends to global errors/warnings lists."""
+    global total_streams
+
+    brand_id_expected = filename.replace(".json", "")
+
+    try:
+        with open(filepath) as f:
+            data = json.load(f)
+    except json.JSONDecodeError as e:
+        errors.append(f"{filename}: invalid JSON: {e}")
+        return
+    except IOError as e:
+        errors.append(f"{filename}: cannot read: {e}")
+        return
+
+    if not isinstance(data, dict):
+        errors.append(f"{filename}: root must be object, got {type(data).__name__}")
+        return
+
+    # Required root fields
+    for field in REQUIRED_ROOT:
+        if field not in data:
+            errors.append(f"{filename}: missing required field '{field}'")
+
+    # Version check
+    if data.get("version") != 2:
+        errors.append(f"{filename}: version must be 2, got {data.get('version')}")
+
+    # brand_id matches filename
+    if data.get("brand_id") != brand_id_expected:
+        errors.append(
+            f"{filename}: brand_id '{data.get('brand_id')}' "
+            f"does not match filename '{brand_id_expected}'"
+        )
+
+    # Brand name non-empty
+    if not data.get("brand", "").strip():
+        errors.append(f"{filename}: brand name is empty")
+
+    streams = data.get("streams", [])
+    if not isinstance(streams, list):
+        errors.append(f"{filename}: streams must be array")
+        return
+
+    if len(streams) == 0:
+        warnings.append(f"{filename}: no streams")
+
+    seen_ids = set()
+    seen_urls = set()
+
+    for i, stream in enumerate(streams):
+        total_streams += 1
+        prefix = f"{filename}: stream[{i}]"
+
+        if not isinstance(stream, dict):
+            errors.append(f"{prefix}: must be object")
+            continue
+
+        # Required stream fields
+        for field in REQUIRED_STREAM:
+            if field not in stream:
+                errors.append(f"{prefix}: missing required field '{field}'")
+
+        # ID uniqueness
+        sid = stream.get("id", "")
+        if sid in seen_ids:
+            errors.append(f"{prefix}: duplicate id '{sid}'")
+        seen_ids.add(sid)
+
+        # Type and protocol are non-empty strings
+        for field in ("type", "protocol"):
+            val = stream.get(field, "")
+            if not isinstance(val, str) or not val.strip():
+                errors.append(f"{prefix}: '{field}' must be non-empty string, got {repr(val)}")
+
+        # Port range
+        port = stream.get("port")
+        if not isinstance(port, int):
+            errors.append(f"{prefix}: port must be int, got {type(port).__name__}")
+        elif port < 0 or port > 65535:
+            errors.append(f"{prefix}: port {port} out of range 0-65535")
+
+        # Models non-empty array
+        models = stream.get("models")
+        if not isinstance(models, list) or len(models) == 0:
+            errors.append(f"{prefix}: models must be non-empty array")
+        elif not all(isinstance(m, str) for m in models):
+            errors.append(f"{prefix}: all models must be strings")
+
+        # URL is string
+        url = stream.get("url")
+        if not isinstance(url, str):
+            errors.append(f"{prefix}: url must be string")
+
+        # Duplicate stream check (same protocol:port:url)
+        dedup_key = f"{stream.get('protocol')}:{stream.get('port')}:{stream.get('url')}"
+        if dedup_key in seen_urls:
+            errors.append(f"{prefix}: duplicate stream {dedup_key}")
+        seen_urls.add(dedup_key)
+
+        # Optional fields type check
+        if "notes" in stream and not isinstance(stream["notes"], str):
+            errors.append(f"{prefix}: notes must be string")
+        if "tags" in stream:
+            tags = stream["tags"]
+            if not isinstance(tags, list) or not all(isinstance(t, str) for t in tags):
+                errors.append(f"{prefix}: tags must be array of strings")
+
+        # No unexpected fields
+        allowed = REQUIRED_STREAM | {"notes", "tags"}
+        extra = set(stream.keys()) - allowed
+        if extra:
+            warnings.append(f"{prefix}: unexpected fields: {extra}")
+
+
+def main():
+    global total_files
+
+    brands_dir = os.path.abspath(BRANDS_DIR)
+    if not os.path.isdir(brands_dir):
+        print(f"Error: brands directory not found: {brands_dir}", file=sys.stderr)
+        sys.exit(1)
+
+    files = sorted(f for f in os.listdir(brands_dir) if f.endswith(".json"))
+    total_files = len(files)
+
+    for filename in files:
+        filepath = os.path.join(brands_dir, filename)
+        validate_file(filepath, filename)
+
+    # Print results
+    print("=" * 50)
+    print("Validation results")
+    print("=" * 50)
+    print(f"  Files checked:   {total_files}")
+    print(f"  Streams checked: {total_streams}")
+    print(f"  Errors:          {len(errors)}")
+    print(f"  Warnings:        {len(warnings)}")
+
+    if errors:
+        print(f"\n--- ERRORS ({len(errors)}) ---")
+        for e in errors[:50]:
+            print(f"  {e}")
+        if len(errors) > 50:
+            print(f"  ... and {len(errors) - 50} more")
+
+    if warnings:
+        print(f"\n--- WARNINGS ({len(warnings)}) ---")
+        for w in warnings[:20]:
+            print(f"  {w}")
+        if len(warnings) > 20:
+            print(f"  ... and {len(warnings) - 20} more")
+
+    if errors:
+        sys.exit(1)
+    print("\nAll checks passed.")
+
+
+if __name__ == "__main__":
+    main()
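Finally, a sketch of driving `validate_file` against a deliberately broken fixture. It assumes the module is imported from `scripts/`; the fixture values are invented and break exactly two rules:

```python
# Sketch: feed validate_file() a temp-dir fixture that violates the
# version rule and the port range, then inspect the collected errors.
import json
import os
import tempfile

import validate  # run from scripts/ so this resolves

bad = {
    "version": 1,          # wrong: v2 files must declare version 2
    "brand": "Acme",
    "brand_id": "acme",
    "streams": [
        {"id": "acme-1", "url": "/live", "type": "FFMPEG",
         "protocol": "rtsp", "port": 70000, "models": ["*"]},  # port > 65535
    ],
}

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "acme.json")
    with open(path, "w") as f:
        json.dump(bad, f)
    validate.validate_file(path, "acme.json")

for e in validate.errors:
    print(e)
# Expected: one version error and one port-range error for acme.json
```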