import json import asyncio from datetime import datetime from typing import Any import aiohttp from app.config import settings class HAClient: def __init__(self): self.base_url = settings.ha_base_url.rstrip("/") self.token = settings.ha_token self._headers = { "Authorization": f"Bearer {self.token}", "Content-Type": "application/json", } self._ws_id_counter = 0 def _next_ws_id(self) -> int: self._ws_id_counter += 1 return self._ws_id_counter async def check_connection(self) -> tuple[bool, str]: try: async with aiohttp.ClientSession() as session: async with session.get( f"{self.base_url}/api/", headers=self._headers, timeout=aiohttp.ClientTimeout(total=10), ) as resp: if resp.status == 200: return True, "Connecté" elif resp.status == 401: return False, "Token invalide (401)" else: return False, f"Erreur HTTP {resp.status}" except aiohttp.ClientError as e: return False, f"Connexion impossible : {e}" except asyncio.TimeoutError: return False, "Timeout de connexion" async def fetch_all_states(self) -> list[dict[str, Any]]: async with aiohttp.ClientSession() as session: async with session.get( f"{self.base_url}/api/states", headers=self._headers, timeout=aiohttp.ClientTimeout(total=30), ) as resp: resp.raise_for_status() return await resp.json() async def _ws_command(self, command: dict[str, Any]) -> dict[str, Any]: async with aiohttp.ClientSession() as session: async with session.ws_connect( f"{self.base_url}/api/websocket", timeout=aiohttp.ClientTimeout(total=30), ) as ws: # Attendre auth_required msg = await ws.receive_json() # Authentification await ws.send_json({"type": "auth", "access_token": self.token}) msg = await ws.receive_json() if msg.get("type") != "auth_ok": raise ConnectionError(f"Authentification WS échouée : {msg}") # Envoyer la commande cmd_id = self._next_ws_id() command["id"] = cmd_id await ws.send_json(command) # Attendre la réponse msg = await ws.receive_json() if not msg.get("success"): raise RuntimeError( f"Commande WS échouée : {msg.get('error', {}).get('message', 'Erreur inconnue')}" ) return msg.get("result", {}) async def fetch_entity_registry(self) -> list[dict[str, Any]]: return await self._ws_command({"type": "config/entity_registry/list"}) async def update_entity_registry( self, entity_id: str, **updates: Any ) -> dict[str, Any]: return await self._ws_command( { "type": "config/entity_registry/update", "entity_id": entity_id, **updates, } ) def _parse_dt(value: str | None) -> datetime | None: if not value: return None try: return datetime.fromisoformat(value.replace("Z", "+00:00")) except (ValueError, AttributeError): return None def normalize_entity( state: dict[str, Any], registry_entry: dict[str, Any] | None = None, ) -> dict[str, Any]: attrs = state.get("attributes", {}) entity_id = state.get("entity_id", "") domain = entity_id.split(".")[0] if "." in entity_id else "" reg = registry_entry or {} return { "entity_id": entity_id, "domain": domain, "friendly_name": attrs.get("friendly_name", ""), "state": state.get("state", ""), "attrs_json": json.dumps(attrs, ensure_ascii=False), "device_class": attrs.get("device_class"), "unit_of_measurement": attrs.get("unit_of_measurement"), "area_id": reg.get("area_id"), "device_id": reg.get("device_id"), "integration": reg.get("platform"), "is_disabled": reg.get("disabled_by") is not None, "is_hidden": reg.get("hidden_by") is not None, "is_available": state.get("state") not in ("unavailable", "unknown"), "last_changed": _parse_dt(state.get("last_changed")), "last_updated": _parse_dt(state.get("last_updated")), } ha_client = HAClient()