""" Client API OPNsense pour IPWatch Gère les communications avec l'API REST OPNsense (Kea DHCP) """ import httpx import ipaddress from typing import Optional, Dict, Any, List from backend.app.core.config import config_manager class OPNsenseAPIError(Exception): """Erreur retournée par l'API OPNsense (validation, etc.)""" def __init__(self, message: str, validations: dict = None): self.validations = validations or {} super().__init__(message) class OPNsenseClient: """Client pour l'API OPNsense avec authentification Basic (api_key:api_secret)""" def __init__(self): config = config_manager.config.opnsense self.base_url = f"{config.protocol}://{config.host}" self.auth = (config.api_key, config.api_secret) self.verify_ssl = config.verify_ssl self.enabled = config.enabled print(f"[OPNsense] Client initialisé: {self.base_url} (ssl_verify={self.verify_ssl})") def _get_client(self) -> httpx.AsyncClient: """Crée un client HTTP async configuré""" return httpx.AsyncClient( base_url=self.base_url, auth=self.auth, verify=self.verify_ssl, timeout=30.0 ) def _check_result(self, data: Dict[str, Any], action: str): """Vérifie que le résultat OPNsense n'est pas 'failed'""" if data.get("result") == "failed": validations = data.get("validations", {}) msg = f"{action} échoué" if validations: details = "; ".join(f"{k}: {v}" for k, v in validations.items()) msg = f"{action} échoué: {details}" print(f"[OPNsense] VALIDATION ERREUR: {msg}") raise OPNsenseAPIError(msg, validations) async def test_connection(self) -> Dict[str, Any]: """Teste la connexion à l'API OPNsense""" print(f"[OPNsense] Test connexion: GET {self.base_url}/api/core/firmware/status") async with self._get_client() as client: response = await client.get("/api/core/firmware/status") print(f"[OPNsense] Réponse test: {response.status_code}") response.raise_for_status() return response.json() async def search_subnets(self) -> Dict[str, Any]: """Liste les subnets Kea DHCPv4""" print(f"[OPNsense] Recherche subnets: GET {self.base_url}/api/kea/dhcpv4/search_subnet") async with self._get_client() as client: response = await client.get("/api/kea/dhcpv4/search_subnet") print(f"[OPNsense] Réponse search_subnet: {response.status_code}") if response.status_code != 200: print(f"[OPNsense] Corps réponse erreur: {response.text[:500]}") response.raise_for_status() data = response.json() rows = data.get("rows", []) print(f"[OPNsense] {len(rows)} subnet(s) trouvé(s)") for row in rows: print(f"[OPNsense] - {row.get('subnet')}: uuid={row.get('uuid')}") return data async def find_subnet_for_ip(self, ip_address: str) -> Optional[str]: """Trouve le subnet UUID correspondant à une adresse IP""" print(f"[OPNsense] Recherche subnet pour IP {ip_address}") ip_obj = ipaddress.ip_address(ip_address) data = await self.search_subnets() rows = data.get("rows", []) for row in rows: subnet_cidr = row.get("subnet", "") try: network = ipaddress.ip_network(subnet_cidr, strict=False) if ip_obj in network: uuid = row.get("uuid") print(f"[OPNsense] Subnet trouvé: {subnet_cidr} -> uuid={uuid}") return uuid except ValueError: continue print(f"[OPNsense] Aucun subnet trouvé pour {ip_address}") return None async def search_reservations(self) -> Dict[str, Any]: """Liste toutes les réservations DHCP Kea""" print(f"[OPNsense] Recherche réservations: GET {self.base_url}/api/kea/dhcpv4/search_reservation") async with self._get_client() as client: response = await client.get("/api/kea/dhcpv4/search_reservation") print(f"[OPNsense] Réponse search_reservation: {response.status_code}") if response.status_code != 200: print(f"[OPNsense] Corps réponse erreur: {response.text[:500]}") response.raise_for_status() data = response.json() rows = data.get("rows", []) print(f"[OPNsense] {len(rows)} réservation(s) trouvée(s)") return data async def get_reservation(self, uuid: str) -> Dict[str, Any]: """Récupère une réservation par UUID""" print(f"[OPNsense] Get réservation: {uuid}") async with self._get_client() as client: response = await client.get(f"/api/kea/dhcpv4/get_reservation/{uuid}") print(f"[OPNsense] Réponse get_reservation: {response.status_code}") response.raise_for_status() return response.json() async def add_reservation(self, data: Dict[str, Any]) -> Dict[str, Any]: """Crée une nouvelle réservation DHCP Kea""" payload = {"reservation": data} print(f"[OPNsense] Ajout réservation: POST {self.base_url}/api/kea/dhcpv4/add_reservation") print(f"[OPNsense] Payload: {payload}") async with self._get_client() as client: response = await client.post( "/api/kea/dhcpv4/add_reservation", json=payload ) print(f"[OPNsense] Réponse add_reservation: {response.status_code}") print(f"[OPNsense] Corps réponse: {response.text[:500]}") response.raise_for_status() result = response.json() self._check_result(result, "Ajout réservation") return result async def set_reservation(self, uuid: str, data: Dict[str, Any]) -> Dict[str, Any]: """Met à jour une réservation existante""" payload = {"reservation": data} print(f"[OPNsense] Mise à jour réservation {uuid}: POST {self.base_url}/api/kea/dhcpv4/set_reservation/{uuid}") print(f"[OPNsense] Payload: {payload}") async with self._get_client() as client: response = await client.post( f"/api/kea/dhcpv4/set_reservation/{uuid}", json=payload ) print(f"[OPNsense] Réponse set_reservation: {response.status_code}") print(f"[OPNsense] Corps réponse: {response.text[:500]}") response.raise_for_status() result = response.json() self._check_result(result, "Mise à jour réservation") return result async def del_reservation(self, uuid: str) -> Dict[str, Any]: """Supprime une réservation""" print(f"[OPNsense] Suppression réservation: {uuid}") async with self._get_client() as client: response = await client.post(f"/api/kea/dhcpv4/del_reservation/{uuid}") print(f"[OPNsense] Réponse del_reservation: {response.status_code}") response.raise_for_status() return response.json() async def reconfigure_kea(self) -> Dict[str, Any]: """Applique les changements Kea (reconfigure le service)""" print(f"[OPNsense] Reconfiguration Kea: POST {self.base_url}/api/kea/service/reconfigure") async with self._get_client() as client: response = await client.post("/api/kea/service/reconfigure") print(f"[OPNsense] Réponse reconfigure: {response.status_code}") if response.status_code != 200: print(f"[OPNsense] Corps réponse erreur: {response.text[:500]}") response.raise_for_status() return response.json() async def find_reservation_by_ip(self, ip_address: str) -> Optional[Dict[str, Any]]: """Cherche une réservation existante par adresse IP""" print(f"[OPNsense] Recherche réservation par IP: {ip_address}") result = await self.search_reservations() rows = result.get("rows", []) for row in rows: if row.get("ip_address") == ip_address: print(f"[OPNsense] Réservation trouvée: uuid={row.get('uuid')}") return row print(f"[OPNsense] Aucune réservation existante pour {ip_address}") return None async def find_reservation_by_mac(self, mac_address: str) -> Optional[Dict[str, Any]]: """Cherche une réservation existante par adresse MAC""" mac_normalized = mac_address.lower().replace("-", ":") print(f"[OPNsense] Recherche réservation par MAC: {mac_normalized}") result = await self.search_reservations() rows = result.get("rows", []) for row in rows: row_mac = (row.get("hw_address") or "").lower().replace("-", ":") if row_mac == mac_normalized: print(f"[OPNsense] Réservation trouvée par MAC: uuid={row.get('uuid')}") return row print(f"[OPNsense] Aucune réservation pour MAC {mac_normalized}") return None