""" Met à jour la base IP avec les champs ip_parent/ip_enfant depuis config.yaml. Ajoute les colonnes si nécessaire et synchronise les relations. """ from __future__ import annotations import json import sqlite3 from pathlib import Path from typing import Any, Dict, List, Optional import yaml CONFIG_PATH = Path(__file__).resolve().parents[3] / "config.yaml" def load_config() -> Dict[str, Any]: with CONFIG_PATH.open("r", encoding="utf-8") as handle: return yaml.safe_load(handle) or {} def normalize_children(value: Any) -> Optional[List[str]]: if value is None: return None if isinstance(value, str): return [value] if value else [] if isinstance(value, list): return [str(item) for item in value if item] return [] def ensure_columns(conn: sqlite3.Connection) -> None: cursor = conn.execute("PRAGMA table_info(ip)") columns = {row[1] for row in cursor.fetchall()} if "ip_parent" not in columns: conn.execute("ALTER TABLE ip ADD COLUMN ip_parent TEXT") if "ip_enfant" not in columns: conn.execute("ALTER TABLE ip ADD COLUMN ip_enfant TEXT") conn.commit() def collect_config_mapping(config: Dict[str, Any]) -> Dict[str, Dict[str, Any]]: mapping: Dict[str, Dict[str, Any]] = {} ip_classes = config.get("ip_classes", {}) or {} for ip_address, data in ip_classes.items(): if not isinstance(data, dict): continue if "ip_parent" in data or "ip_enfant" in data: mapping[ip_address] = { "ip_parent": data.get("ip_parent"), "ip_enfant": normalize_children(data.get("ip_enfant")) } for host in config.get("hosts", []) or []: if not isinstance(host, dict): continue ip_address = host.get("ip") if not ip_address: continue if "ip_parent" in host or "ip_enfant" in host: entry = mapping.setdefault(ip_address, {}) entry.setdefault("ip_parent", host.get("ip_parent")) entry.setdefault("ip_enfant", normalize_children(host.get("ip_enfant"))) return mapping def parse_json_list(value: Optional[str]) -> List[str]: if not value: return [] try: parsed = json.loads(value) except json.JSONDecodeError: return [] if isinstance(parsed, list): return [str(item) for item in parsed if item] return [] def main() -> None: config = load_config() db_path = Path(config.get("database", {}).get("path", "./data/db.sqlite")) mapping = collect_config_mapping(config) if not db_path.exists(): raise FileNotFoundError(f"Base de données introuvable: {db_path}") conn = sqlite3.connect(db_path) try: ensure_columns(conn) if mapping: for ip_address, values in mapping.items(): ip_parent = values.get("ip_parent") ip_enfant = values.get("ip_enfant") if ip_parent is None and ip_enfant is None: continue if ip_enfant is not None: ip_enfant_json = json.dumps(ip_enfant) conn.execute( "UPDATE ip SET ip_parent = COALESCE(?, ip_parent), ip_enfant = ? WHERE ip = ?", (ip_parent, ip_enfant_json, ip_address) ) else: conn.execute( "UPDATE ip SET ip_parent = COALESCE(?, ip_parent) WHERE ip = ?", (ip_parent, ip_address) ) cursor = conn.execute("SELECT ip, ip_parent, ip_enfant FROM ip") rows = cursor.fetchall() parent_children: Dict[str, List[str]] = {} for ip_address, ip_parent, _ in rows: if ip_parent: parent_children.setdefault(ip_parent, []).append(ip_address) for ip_address, _, ip_enfant_raw in rows: existing = parse_json_list(ip_enfant_raw) merged = list(dict.fromkeys(existing + parent_children.get(ip_address, []))) conn.execute( "UPDATE ip SET ip_enfant = ? WHERE ip = ?", (json.dumps(merged), ip_address) ) conn.commit() print("Mise à jour terminée.") finally: conn.close() if __name__ == "__main__": main()