"""Data collection service for the local WeeWX weather station."""

import html
import logging
import re
import unicodedata
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta, timezone

import httpx

from app.config import STATION_URL

logger = logging.getLogger(__name__)

# Non-breaking spaces (regular and narrow) that formatted values may use
# between the number and its unit; normalised to a plain space before parsing.
_NON_BREAKING_SPACES = ("\u00a0", "\u202f")

# Unit suffixes removed before numeric conversion. Order matters: longer
# suffixes come before the shorter ones they contain (" mm/h" before " mm").
_UNIT_SUFFIXES = (
    " °C",
    " %",
    " hPa",
    " mbar",
    " km/h",
    " m/s",
    " mm/h",
    " mm",
    " W/m²",
    " W/m2",
    "°C",
    "%",
    "hPa",
    "mbar",
)

# First number in the wind value, optionally followed by its speed unit.
_WIND_SPEED_RE = re.compile(r"(-?\d+(?:[.,]\d+)?)\s*(m/s|km/h)?", re.IGNORECASE)

# Wind bearing expressed in degrees, e.g. "270°".
_WIND_DEGREES_RE = re.compile(r"(\d{1,3}(?:[.,]\d+)?)\s*°")

# Wind bearing expressed as a compass abbreviation. The trailing negative
# lookahead rejects the "N" of a missing-value marker such as "N/A".
_WIND_CARDINAL_RE = re.compile(
    r"\b(N|NE|E|SE|S|SO|O|NO|NNE|ENE|ESE|SSE|SSO|OSO|ONO|NNO)\b(?!/)",
    re.IGNORECASE,
)


def _safe_float(text: str | None) -> float | None:
    """Convert a textual measurement to a float, tolerating units.

    A decimal comma, surrounding whitespace (including non-breaking spaces)
    and the suffixes listed in ``_UNIT_SUFFIXES`` are accepted.

    Args:
        text: Raw value such as ``"12,5 °C"``; may be ``None``.

    Returns:
        The parsed number, or ``None`` when ``text`` is ``None`` or is not
        numeric once cleaned up.
    """
    if text is None:
        return None
    try:
        cleaned = text.strip().replace(",", ".")
        for nbsp in _NON_BREAKING_SPACES:
            cleaned = cleaned.replace(nbsp, " ")
        for unit in _UNIT_SUFFIXES:
            cleaned = cleaned.replace(unit, "")
        return float(cleaned.strip())
    except (ValueError, AttributeError):
        # AttributeError covers a non-string argument slipping through.
        return None


def _normalize(text: str) -> str:
    """Return ``text`` lower-cased, without accents, with whitespace collapsed."""
    # NFKD splits accented letters into base letter + combining mark, so the
    # marks can then be filtered out.
    text = unicodedata.normalize("NFKD", text)
    text = "".join(ch for ch in text if not unicodedata.combining(ch))
    text = text.lower()
    return re.sub(r"\s+", " ", text).strip()


def _to_kmh(value: float | None, unit: str | None) -> float | None:
    """Convert a wind speed to km/h, rounded to one decimal.

    Args:
        value: Speed to convert; ``None`` is passed through.
        unit: Unit of ``value``. Only ``"m/s"`` (case-insensitive) triggers a
            conversion; any other unit, or none, is treated as km/h already.

    Returns:
        The speed in km/h, or ``None`` when ``value`` is ``None``.
    """
    if value is None:
        return None
    u = (unit or "").strip().lower()
    if u == "m/s":
        return round(value * 3.6, 1)
    return round(value, 1)


def _direction_to_abbr(deg: float | None) -> str | None:
    """Map a bearing in degrees to one of eight compass abbreviations.

    The abbreviations are the French-style ones used by this module
    ("O" for west, "SO" for south-west, ...).

    Args:
        deg: Bearing in degrees; ``None`` is passed through.

    Returns:
        The abbreviation of the nearest 45-degree sector, or ``None``.
    """
    if deg is None:
        return None
    dirs = ["N", "NE", "E", "SE", "S", "SO", "O", "NO"]
    return dirs[round(deg / 45) % 8]


def _scalar_field(key: str) -> str | None:
    """Return the result key fed by a normalised RSS label, if it is a scalar."""
    if "temperature exterieure" in key or "outside temperature" in key:
        return "temp_ext"
    if "temperature interieure" in key or "inside temperature" in key:
        return "temp_int"
    if "hygrometrie exterieure" in key or "outside humidity" in key:
        return "humidite"
    if "pression atmospherique" in key or "barometer" in key:
        return "pression"
    # Rain amount only: the rain *rate* label is deliberately ignored.
    if "precipitations" in key and "taux" not in key and "rate" not in key:
        return "pluie_mm"
    if key in {"uv", "ultra-violet"} or "ultra violet" in key:
        return "uv"
    if "rayonnement solaire" in key or "solar radiation" in key:
        return "solaire"
    return None


def _parse_wind(value: str) -> tuple[float | None, str | None]:
    """Extract ``(speed_kmh, direction_abbr)`` from a wind description."""
    speed_kmh = None
    speed_match = _WIND_SPEED_RE.search(value)
    if speed_match:
        speed_kmh = _to_kmh(_safe_float(speed_match.group(1)), speed_match.group(2))

    # A bearing in degrees takes precedence over a compass abbreviation.
    deg_match = _WIND_DEGREES_RE.search(value)
    if deg_match:
        return speed_kmh, _direction_to_abbr(_safe_float(deg_match.group(1)))

    card_match = _WIND_CARDINAL_RE.search(value)
    return speed_kmh, (card_match.group(1).upper() if card_match else None)


def fetch_current(base_url: str = STATION_URL) -> dict | None:
    """Scrape the current conditions from the WeeWX station RSS feed.

    The first ``<item>`` of ``<base_url>/rss.xml`` is read; its description is
    split on ``;`` into ``label: value`` segments, and French or English
    labels are mapped to result keys.

    Args:
        base_url: Root URL of the station web site.

    Returns:
        A dict holding whichever of these keys were found: ``temp_ext``,
        ``temp_int``, ``humidite``, ``pression``, ``pluie_mm``, ``vent_kmh``,
        ``vent_dir``, ``uv``, ``solaire``. ``None`` is returned when the feed
        is unreachable, malformed, or yields no usable value. Errors are
        logged as warnings and never raised.
    """
    try:
        url = base_url.rstrip("/") + "/rss.xml"
        r = httpx.get(url, timeout=10)
        r.raise_for_status()

        root = ET.fromstring(r.text)
        channel = root.find("channel")
        if channel is None:
            return None
        item = channel.find("item")
        if item is None:
            return None

        desc = html.unescape(item.findtext("description") or "")
        result: dict = {}

        segments = [seg.strip() for seg in desc.split(";") if seg.strip()]
        for seg in segments:
            if ":" not in seg:
                continue
            raw_key, raw_value = seg.split(":", 1)
            key = _normalize(raw_key)
            value = raw_value.strip()

            field = _scalar_field(key)
            if field is not None:
                result[field] = _safe_float(value)
                continue

            # "Wind chill" is a temperature: parsing it as wind would store
            # the temperature as a speed and its "°" as a bearing.
            if (key == "vent" or "wind" in key) and "chill" not in key:
                result["vent_kmh"], result["vent_dir"] = _parse_wind(value)

        return result if any(v is not None for v in result.values()) else None
    except Exception as e:
        # Best-effort collection: any failure means "no data available".
        logger.warning("Station fetch_current error: %s", e)
        return None


def _parse_noaa_day_line(parts: list[str]) -> dict | None:
    """Parse one daily data line of a WeeWX NOAA monthly report.

    Standard layout of the whitespace-separated columns:
    day mean max hh:mm min hh:mm HDD CDD rain wind_avg wind_max hh:mm dir

    Args:
        parts: The line already split on whitespace.

    Returns:
        A dict with ``temp_ext``, ``t_max``, ``t_min``, ``pluie_mm`` and
        ``vent_kmh`` (individual values may be ``None``), or ``None`` when the
        line does not start with a day number.
    """
    if not parts or not parts[0].isdigit():
        return None

    # Full layout, recognised by the hh:mm timestamps in columns 3 and 5.
    if len(parts) >= 11 and ":" in parts[3] and ":" in parts[5]:
        return {
            "temp_ext": _safe_float(parts[1]),
            "t_max": _safe_float(parts[2]),
            "t_min": _safe_float(parts[4]),
            "pluie_mm": _safe_float(parts[8]),
            # Column 10 is converted as if it were expressed in m/s.
            "vent_kmh": _to_kmh(_safe_float(parts[10]), "m/s"),
        }

    def column(index: int) -> float | None:
        """Return the parsed column at ``index``, or None if it is absent."""
        return _safe_float(parts[index]) if len(parts) > index else None

    # Generic fallback (older layouts without hh:mm columns).
    return {
        "t_max": column(1),
        "t_min": column(2),
        "temp_ext": column(3),
        "pluie_mm": column(5),
        "vent_kmh": column(6),
    }


def fetch_month_summaries(year: int, month: int, base_url: str = STATION_URL) -> dict[int, dict]:
    """Fetch every daily summary of a month from the WeeWX NOAA report.

    A single HTTP request is made per month
    (``<base_url>/NOAA/NOAA-YYYY-MM.txt``), which makes this suitable for
    grouped backfills.

    Args:
        year: Four-digit year of the report.
        month: Month number (1-12).
        base_url: Root URL of the station web site.

    Returns:
        A dict ``{day_number: data_dict}`` for each day found in the report;
        empty when the report cannot be fetched or parsed. Errors are logged
        as warnings and never raised.
    """
    try:
        url = f"{base_url.rstrip('/')}/NOAA/NOAA-{year:04d}-{month:02d}.txt"
        r = httpx.get(url, timeout=15)
        r.raise_for_status()

        result: dict[int, dict] = {}
        for line in r.text.splitlines():
            parts = line.split()
            # The parser returns None for headers, separators and blank lines.
            data = _parse_noaa_day_line(parts)
            if data:
                result[int(parts[0])] = data
        return result
    except Exception as e:
        logger.warning("Station fetch_month_summaries(%s-%02d) error: %s", year, month, e)
        return {}


def fetch_yesterday_summary(base_url: str = STATION_URL) -> dict | None:
    """Fetch yesterday's summary from the station's monthly NOAA report.

    "Yesterday" is computed from the host's local clock.

    Args:
        base_url: Root URL of the station web site.

    Returns:
        The day's dict (``temp_ext`` as the mean, ``t_min``, ``t_max``,
        ``pluie_mm``, ``vent_kmh``), or ``None`` when that day is not
        available.
    """
    yesterday = (datetime.now() - timedelta(days=1)).date()
    month_data = fetch_month_summaries(yesterday.year, yesterday.month, base_url)
    return month_data.get(yesterday.day)