#!/usr/bin/env python3 import argparse import json import re from datetime import datetime, timedelta from html import unescape from xml.etree import ElementTree as ET from urllib.parse import urljoin from urllib.request import urlopen def fetch_text(url: str, timeout: int = 12) -> str: with urlopen(url, timeout=timeout) as r: return r.read().decode("utf-8", errors="replace") def _to_float(value: str | None) -> float | None: if value is None: return None try: return float(value.replace(",", ".")) except ValueError: return None def _to_int(value: str | None) -> int | None: if value is None: return None try: return int(value) except ValueError: return None def _html_to_text(html: str) -> str: text = re.sub(r"(?i)", "\n", html) text = re.sub(r"(?i)", "\n", text) text = re.sub(r"<[^>]+>", " ", text) text = unescape(text) text = text.replace("\xa0", " ") text = re.sub(r"[ \t]+", " ", text) text = re.sub(r"\n\s+", "\n", text) text = re.sub(r"\n{2,}", "\n", text) return text.strip() def _grab(pattern: str, text: str) -> str | None: m = re.search(pattern, text, re.I | re.S) return m.group(1).strip() if m else None def _extract_block(text: str, start_pattern: str, end_patterns: list[str]) -> str: start = re.search(start_pattern, text, re.I | re.S) if not start: return "" sub = text[start.end():] end_pos = len(sub) for pattern in end_patterns: m = re.search(pattern, sub, re.I | re.S) if m: end_pos = min(end_pos, m.start()) return sub[:end_pos] def _metric_numbers(block: str, label: str) -> list[float]: line = _grab(rf"{label}\s+([^\n]+)", block) if not line: return [] vals = re.findall(r"[-+]?\d+(?:[.,]\d+)?", line) out: list[float] = [] for v in vals: n = _to_float(v) if n is not None: out.append(n) return out def parse_station_page(html: str) -> dict: txt = _html_to_text(html) lever_values = re.findall(r"Lever\s+(\d{2}:\d{2}:\d{2})", txt, re.I) coucher_values = re.findall(r"Coucher\s+(\d{2}:\d{2}:\d{2})", txt, re.I) moon_phase = _grab( r"(Nouvelle\s+lune|Premier\s+croissant|Premier\s+quartier|Lune\s+gibbeuse\s+croissante|Pleine\s+lune|Lune\s+gibbeuse\s+décroissante|Dernier\s+quartier|Dernier\s+croissant)", txt, ) current_ext = { "temperature_ext_c": _to_float(_grab(r"Température Extérieure\s+([\-0-9\.,]+)\s*°C", txt)), "indice_chaleur_c": _to_float(_grab(r"Indice de Chaleur\s+([\-0-9\.,]+)\s*°C", txt)), "refroidissement_eolien_c": _to_float(_grab(r"Refroidissement Éolien\s+([\-0-9\.,]+)\s*°C", txt)), "point_rosee_c": _to_float(_grab(r"Point de Rosée\s+([\-0-9\.,]+)\s*°C", txt)), "humidity_ext_pct": _to_int(_grab(r"Hygrométrie Extérieure\s+([0-9]+)\s*%", txt)), "pressure_mbar": _to_float(_grab(r"Pression Atmosphérique\s+([\-0-9\.,]+)\s*mbar", txt)), "wind_speed_m_s": _to_float(_grab(r"Vent\s+([\-0-9\.,]+)\s*m/s", txt)), "wind_dir_text": _grab(r"Vent\s+[\-0-9\.,]+\s*m/s\s+([A-ZÀ-ÿ/]+)", txt), "wind_dir_deg": _to_int(_grab(r"Vent\s+[\-0-9\.,]+\s*m/s[^\n]*\(([0-9]{1,3})°\)", txt)), "rain_today_mm": _to_float(_grab(r"Précipitations Aujourd'hui\s+([\-0-9\.,]+)\s*mm", txt)), "rain_rate_mm_h": _to_float(_grab(r"Taux de Précipitations\s+([\-0-9\.,]+)\s*mm/h", txt)), "uv_index": _to_float(_grab(r"Ultra-Violet\s+([\-0-9\.,]+)", txt)), "solar_radiation_w_m2": _to_float(_grab(r"Rayonnement Solaire\s+([\-0-9\.,]+)\s*W/m²", txt)), "temperature_int_c": _to_float(_grab(r"Température Intérieure\s+([\-0-9\.,]+)\s*°C", txt)), "humidity_int_pct": _to_int(_grab(r"Hygrométrie Intérieure\s+([0-9]+)\s*%", txt)), } astrology = { "sunrise": lever_values[0] if len(lever_values) >= 1 else None, "sunset": coucher_values[0] if len(coucher_values) >= 1 else None, "day_length_hm": _grab(r"Journée\s+([0-9]{1,2}:[0-9]{2})", txt), "moonrise": lever_values[1] if len(lever_values) >= 2 else None, "moonset": coucher_values[1] if len(coucher_values) >= 2 else None, "moon_phase": moon_phase, "moon_illumination_pct": _to_int(_grab(r"\n([0-9]{1,3})\s*%\s*(?:\n|$)", txt)), } stats_block = _extract_block( txt, r"Statistiques.*?Aujourd'hui", [r"À propos de cette station"], ) temp_ext_vals = _metric_numbers(stats_block, r"Température Extérieure") heat_vals = _metric_numbers(stats_block, r"Indice de Chaleur") chill_vals = _metric_numbers(stats_block, r"Refroidissement Éolien") dew_vals = _metric_numbers(stats_block, r"Point de Rosée") hum_vals = _metric_numbers(stats_block, r"Hygrométrie Extérieure") pressure_vals = _metric_numbers(stats_block, r"Pression Atmosphérique") wind_max_vals = _metric_numbers(stats_block, r"Vent Max") wind_avg_vals = _metric_numbers(stats_block, r"Vent Moyen") wind_rms_vals = _metric_numbers(stats_block, r"Valeur Efficace \(RMS\) Vent") wind_vec_vals = _metric_numbers(stats_block, r"Direction Vecteur Vent") rain_vals = _metric_numbers(stats_block, r"Précipitations") rain_rate_vals = _metric_numbers(stats_block, r"Taux de Précipitations") etp_vals = _metric_numbers(stats_block, r"Evapotranspiration") uv_vals = _metric_numbers(stats_block, r"Ultra-Violet") solar_vals = _metric_numbers(stats_block, r"Rayonnement Solaire") tint_vals = _metric_numbers(stats_block, r"Température Intérieure") hint_vals = _metric_numbers(stats_block, r"Hygrométrie Intérieure") stats_today = { "temp_ext_max_c": temp_ext_vals[0] if len(temp_ext_vals) >= 1 else None, "temp_ext_min_c": temp_ext_vals[1] if len(temp_ext_vals) >= 2 else None, "heat_index_max_c": heat_vals[0] if len(heat_vals) >= 1 else None, "heat_index_min_c": heat_vals[1] if len(heat_vals) >= 2 else None, "wind_chill_max_c": chill_vals[0] if len(chill_vals) >= 1 else None, "wind_chill_min_c": chill_vals[1] if len(chill_vals) >= 2 else None, "dew_point_max_c": dew_vals[0] if len(dew_vals) >= 1 else None, "dew_point_min_c": dew_vals[1] if len(dew_vals) >= 2 else None, "humidity_ext_max_pct": hum_vals[0] if len(hum_vals) >= 1 else None, "humidity_ext_min_pct": hum_vals[1] if len(hum_vals) >= 2 else None, "pressure_max_mbar": pressure_vals[0] if len(pressure_vals) >= 1 else None, "pressure_min_mbar": pressure_vals[1] if len(pressure_vals) >= 2 else None, "wind_max_m_s": wind_max_vals[0] if len(wind_max_vals) >= 1 else None, "wind_max_dir_deg": _to_int(str(int(wind_max_vals[1]))) if len(wind_max_vals) >= 2 else None, "wind_avg_m_s": wind_avg_vals[0] if len(wind_avg_vals) >= 1 else None, "wind_rms_m_s": wind_rms_vals[0] if len(wind_rms_vals) >= 1 else None, "wind_vector_speed_m_s": wind_vec_vals[0] if len(wind_vec_vals) >= 1 else None, "wind_vector_dir_deg": _to_int(str(int(wind_vec_vals[1]))) if len(wind_vec_vals) >= 2 else None, "rain_mm": rain_vals[0] if len(rain_vals) >= 1 else None, "rain_rate_mm_h": rain_rate_vals[0] if len(rain_rate_vals) >= 1 else None, "evapotranspiration_mm": etp_vals[0] if len(etp_vals) >= 1 else None, "uv_max": uv_vals[0] if len(uv_vals) >= 1 else None, "solar_radiation_max_w_m2": solar_vals[0] if len(solar_vals) >= 1 else None, "solar_radiation_min_w_m2": solar_vals[1] if len(solar_vals) >= 2 else None, "temp_int_max_c": tint_vals[0] if len(tint_vals) >= 1 else None, "temp_int_min_c": tint_vals[1] if len(tint_vals) >= 2 else None, "humidity_int_max_pct": hint_vals[0] if len(hint_vals) >= 1 else None, "humidity_int_min_pct": hint_vals[1] if len(hint_vals) >= 2 else None, } station_info = { "hardware": _grab(r"Matériel\s+([^\n]+)", txt), "latitude_text": _grab(r"Latitude\s+([^\n]+)", txt), "longitude_text": _grab(r"Longitude\s+([^\n]+)", txt), "altitude_m": _to_float(_grab(r"Altitude\s+([\-0-9\.,]+)\s+mètres", txt)), "server_uptime": _grab(r"Serveur en service depuis\s+([^\n]+)", txt), "weewx_uptime": _grab(r"WeeWX en service depuis\s+([^\n]+)", txt), "weewx_version": _grab(r"Version de WeeWX\s+([^\n]+)", txt), "skin": _grab(r"Skin\s+([^\n]+)", txt), } return { "current_extended": current_ext, "astrology": astrology, "stats_today": stats_today, "station_info": station_info, } def parse_current_from_rss(xml: str) -> dict: try: root = ET.fromstring(xml) except ET.ParseError: return {} channel = root.find("channel") if channel is None: return {} current_item = None for item in channel.findall("item"): title = (item.findtext("title") or "").strip() if "Conditions Météorologiques" in title: current_item = item break if current_item is None: current_item = channel.find("item") if current_item is None: return {} desc = unescape(current_item.findtext("description") or "") pub = (current_item.findtext("pubDate") or channel.findtext("pubDate") or "").strip() def grab(pattern: str) -> str | None: m = re.search(pattern, desc, re.I) return m.group(1).strip() if m else None wind_text = grab(r"Vent:\s*([^;]+)") wind_speed = _to_float(grab(r"Vent:\s*([\-0-9\.,]+)\s*m/s")) wind_dir_deg = _to_int(grab(r"Vent:\s*[\-0-9\.,]+\s*m/s\s*de\s*([0-9]{1,3})°")) return { "observed_at": pub, "temperature_ext_c": _to_float(grab(r"Température Extérieure:\s*([\-0-9\.,]+)°C")), "temperature_int_c": _to_float(grab(r"Température Intérieure:\s*([\-0-9\.,]+)°C")), "pressure_mbar": _to_float(grab(r"Pression Atmosphérique:\s*([\-0-9\.,]+)\s*mbar")), "wind": wind_text, "wind_speed_m_s": wind_speed, "wind_dir_deg": wind_dir_deg, "rain_mm": _to_float(grab(r"Précipitations:\s*([\-0-9\.,]+)\s*mm")), "rain_rate_mm_h": _to_float(grab(r"Taux de Précipitations:\s*([\-0-9\.,]+)\s*mm/h")), "wind_chill_c": _to_float(grab(r"Refroidissement Éolien:\s*([\-0-9\.,]+)°C")), "heat_index_c": _to_float(grab(r"Indice de Chaleur:\s*([\-0-9\.,]+)°C")), "dew_point_c": _to_float(grab(r"Point de Rosée:\s*([\-0-9\.,]+)°C")), "humidity_ext_pct": _to_int(grab(r"Hygrométrie Extérieure:\s*([0-9]+)%")), "humidity_int_pct": _to_int(grab(r"Hygrométrie Intérieure:\s*([0-9]+)%")), } def parse_daily_summary_from_rss(xml: str) -> dict: try: root = ET.fromstring(xml) except ET.ParseError: return {} channel = root.find("channel") if channel is None: return {} summary_item = None for item in channel.findall("item"): title = (item.findtext("title") or "").strip() if "Résumé météorologique quotidien" in title: summary_item = item break if summary_item is None: return {} desc = unescape(summary_item.findtext("description") or "") pub = (summary_item.findtext("pubDate") or channel.findtext("pubDate") or "").strip() def _v(pattern: str): m = re.search(pattern, desc, re.I) return m.group(1).strip() if m else None def _vt(pattern: str): m = re.search(pattern, desc, re.I) if not m: return None, None return m.group(1).strip(), m.group(2).strip() tmin, tmin_t = _vt(r"Min Température Extérieure:\s*([\-0-9\.,]+)°C\s+at\s+([0-9:]{8})") tmax, tmax_t = _vt(r"Max Température Extérieure:\s*([\-0-9\.,]+)°C\s+at\s+([0-9:]{8})") ti_min, ti_min_t = _vt(r"Min Température Intérieure:\s*([\-0-9\.,]+)°C\s+at\s+([0-9:]{8})") ti_max, ti_max_t = _vt(r"Max Température Intérieure:\s*([\-0-9\.,]+)°C\s+at\s+([0-9:]{8})") pmin, pmin_t = _vt(r"Min Pression Atmosphérique:\s*([\-0-9\.,]+)\s*mbar\s+at\s+([0-9:]{8})") pmax, pmax_t = _vt(r"Max Pression Atmosphérique:\s*([\-0-9\.,]+)\s*mbar\s+at\s+([0-9:]{8})") wmax, wmax_t = _vt(r"Max Vent:\s*([\-0-9\.,]+)\s*m/s\s+de\s+([0-9]{1,3})°\s+at\s+([0-9:]{8})") rrmax, rrmax_t = _vt(r"Max Taux de Précipitations:\s*([\-0-9\.,]+)\s*mm/h\s*;?") wc_min, wc_min_t = _vt(r"Min Refroidissement Éolien:\s*([\-0-9\.,]+)°C\s+at\s+([0-9:]{8})") wc_max, wc_max_t = _vt(r"Max Refroidissement Éolien:\s*([\-0-9\.,]+)°C\s+at\s+([0-9:]{8})") hi_min, hi_min_t = _vt(r"Min Indice de Chaleur:\s*([\-0-9\.,]+)°C\s+at\s+([0-9:]{8})") hi_max, hi_max_t = _vt(r"Max Indice de Chaleur:\s*([\-0-9\.,]+)°C\s+at\s+([0-9:]{8})") dp_min, dp_min_t = _vt(r"Min Point de Rosée:\s*([\-0-9\.,]+)°C\s+at\s+([0-9:]{8})") dp_max, dp_max_t = _vt(r"Max Point de Rosée:\s*([\-0-9\.,]+)°C\s+at\s+([0-9:]{8})") he_min, he_min_t = _vt(r"Min Hygrométrie Extérieure:\s*([0-9]+)%\s+at\s+([0-9:]{8})") he_max, he_max_t = _vt(r"Max Hygrométrie Extérieure:\s*([0-9]+)%\s+at\s+([0-9:]{8})") hi2_min, hi2_min_t = _vt(r"Min Hygrométrie Intérieure:\s*([0-9]+)%\s+at\s+([0-9:]{8})") hi2_max, hi2_max_t = _vt(r"Max Hygrométrie Intérieure:\s*([0-9]+)%\s+at\s+([0-9:]{8})") wind_match = re.search( r"Max Vent:\s*([\-0-9\.,]+)\s*m/s\s+de\s+([0-9]{1,3})°\s+at\s+([0-9:]{8})", desc, re.I, ) wind_max_m_s = _to_float(wind_match.group(1)) if wind_match else None wind_max_dir_deg = _to_int(wind_match.group(2)) if wind_match else None wind_max_time = wind_match.group(3) if wind_match else None rain_mm = _to_float(_v(r"Précipitations:\s*([\-0-9\.,]+)\s*mm")) rr_match = re.search(r"Max Taux de Précipitations:\s*([\-0-9\.,]+)\s*mm/h", desc, re.I) rain_rate_mm_h = _to_float(rr_match.group(1)) if rr_match else None return { "source": "rss_daily_summary", "observed_at": pub, "temp_ext_min_c": _to_float(tmin), "temp_ext_min_time": tmin_t, "temp_ext_max_c": _to_float(tmax), "temp_ext_max_time": tmax_t, "temp_int_min_c": _to_float(ti_min), "temp_int_min_time": ti_min_t, "temp_int_max_c": _to_float(ti_max), "temp_int_max_time": ti_max_t, "pressure_min_mbar": _to_float(pmin), "pressure_min_time": pmin_t, "pressure_max_mbar": _to_float(pmax), "pressure_max_time": pmax_t, "wind_max_m_s": wind_max_m_s, "wind_max_dir_deg": wind_max_dir_deg, "wind_max_time": wind_max_time, "rain_mm": rain_mm, "rain_rate_mm_h": rain_rate_mm_h, "wind_chill_min_c": _to_float(wc_min), "wind_chill_min_time": wc_min_t, "wind_chill_max_c": _to_float(wc_max), "wind_chill_max_time": wc_max_t, "heat_index_min_c": _to_float(hi_min), "heat_index_min_time": hi_min_t, "heat_index_max_c": _to_float(hi_max), "heat_index_max_time": hi_max_t, "dew_point_min_c": _to_float(dp_min), "dew_point_min_time": dp_min_t, "dew_point_max_c": _to_float(dp_max), "dew_point_max_time": dp_max_t, "humidity_ext_min_pct": _to_int(he_min), "humidity_ext_min_time": he_min_t, "humidity_ext_max_pct": _to_int(he_max), "humidity_ext_max_time": he_max_t, "humidity_int_min_pct": _to_int(hi2_min), "humidity_int_min_time": hi2_min_t, "humidity_int_max_pct": _to_int(hi2_max), "humidity_int_max_time": hi2_max_t, } def parse_yesterday_from_noaa(noaa_text: str, day: int) -> dict: lines = noaa_text.splitlines() target = None for line in lines: if re.match(rf"\s*{day:02d}\s+", line) or re.match(rf"\s*{day}\s+", line): target = line break if not target: return {"error": f"day {day} not found in NOAA monthly file"} # Columns (weewx summary): # DAY MEAN HIGH TIME LOW TIME HEAT COOL RAIN AVG_WIND HIGH_WIND TIME DOM_DIR nums = re.split(r"\s+", target.strip()) if len(nums) < 13: return {"raw_line": target, "error": "unexpected NOAA line format"} return { "raw_line": target.strip(), "day_of_month": int(nums[0]), "temp_mean_c": _to_float(nums[1]), "temp_max_c": _to_float(nums[2]), "temp_max_time": nums[3], "temp_min_c": _to_float(nums[4]), "temp_min_time": nums[5], "rain_mm": _to_float(nums[8]), "wind_avg_m_s": _to_float(nums[9]), "wind_max_m_s": _to_float(nums[10]), "wind_max_time": nums[11], "wind_dom_dir_deg": _to_int(nums[12]), } def main() -> int: ap = argparse.ArgumentParser(description="Récupère météo station locale (RSS + NOAA) en JSON") ap.add_argument("--base", default="http://10.0.0.8:8081/", help="URL de base de la station") ap.add_argument("--date", help="Date cible NOAA au format YYYY-MM-DD (défaut: veille)") ap.add_argument("--json", action="store_true", help="Sortie JSON (défaut: oui)") args = ap.parse_args() base = args.base.rstrip("/") + "/" rss_url = urljoin(base, "rss.xml") station_page_url = base if args.date: try: target_date = datetime.strptime(args.date, "%Y-%m-%d") except ValueError: raise SystemExit("Erreur: --date doit être au format YYYY-MM-DD") else: target_date = datetime.now() - timedelta(days=1) noaa_month_url = urljoin(base, f"NOAA/NOAA-{target_date.year}-{target_date.month:02d}.txt") rss_xml = fetch_text(rss_url) current = parse_current_from_rss(rss_xml) rss_stats_today = parse_daily_summary_from_rss(rss_xml) station_html = fetch_text(station_page_url) station_extra = parse_station_page(station_html) if rss_stats_today: station_extra["stats_today"] = rss_stats_today else: station_extra.setdefault("stats_today", {})["source"] = "station_page_fallback" noaa_text = fetch_text(noaa_month_url) day_data = parse_yesterday_from_noaa(noaa_text, target_date.day) day_data["date"] = target_date.strftime("%Y-%m-%d") out = { "source": { "base_url": base, "rss_url": rss_url, "station_page_url": station_page_url, "noaa_month_url": noaa_month_url, }, "target_date": target_date.strftime("%Y-%m-%d"), "current": current, "station": station_extra, "day_data": day_data, } print(json.dumps(out, ensure_ascii=False, indent=2)) return 0 if __name__ == "__main__": raise SystemExit(main())