Files
jardin/station_meteo/local_station_weather.py
2026-02-22 22:18:32 +01:00

435 lines
18 KiB
Python

#!/usr/bin/env python3
import argparse
import json
import re
from datetime import datetime, timedelta
from html import unescape
from xml.etree import ElementTree as ET
from urllib.parse import urljoin
from urllib.request import urlopen
def fetch_text(url: str, timeout: int = 12) -> str:
    """Download *url* and return its body as text.

    The body is decoded as UTF-8; bytes that are not valid UTF-8 are
    replaced instead of raising.

    Args:
        url: Address to fetch.
        timeout: Value forwarded to ``urlopen`` as its ``timeout`` argument.

    Returns:
        The decoded response body.
    """
    with urlopen(url, timeout=timeout) as response:
        payload = response.read()
    return payload.decode("utf-8", errors="replace")
def _to_float(value: str | None) -> float | None:
if value is None:
return None
try:
return float(value.replace(",", "."))
except ValueError:
return None
def _to_int(value: str | None) -> int | None:
if value is None:
return None
try:
return int(value)
except ValueError:
return None
def _html_to_text(html: str) -> str:
text = re.sub(r"(?i)<br\s*/?>", "\n", html)
text = re.sub(r"(?i)</(tr|p|div|h1|h2|h3|h4|li|table)>", "\n", text)
text = re.sub(r"<[^>]+>", " ", text)
text = unescape(text)
text = text.replace("\xa0", " ")
text = re.sub(r"[ \t]+", " ", text)
text = re.sub(r"\n\s+", "\n", text)
text = re.sub(r"\n{2,}", "\n", text)
return text.strip()
def _grab(pattern: str, text: str) -> str | None:
m = re.search(pattern, text, re.I | re.S)
return m.group(1).strip() if m else None
def _extract_block(text: str, start_pattern: str, end_patterns: list[str]) -> str:
start = re.search(start_pattern, text, re.I | re.S)
if not start:
return ""
sub = text[start.end():]
end_pos = len(sub)
for pattern in end_patterns:
m = re.search(pattern, sub, re.I | re.S)
if m:
end_pos = min(end_pos, m.start())
return sub[:end_pos]
def _metric_numbers(block: str, label: str) -> list[float]:
    """Collect the numbers that follow *label* in *block*.

    Args:
        block: Text to search.
        label: Regex fragment naming the row.  It is inserted into the
            search pattern as-is, so callers must escape any
            metacharacters themselves.

    Returns:
        Every number found in the text captured after the label (up to the
        next line break), in order of appearance; an empty list when the
        label is absent.
    """
    rest_of_line = _grab(rf"{label}\s+([^\n]+)", block)
    if not rest_of_line:
        return []
    tokens = re.findall(r"[-+]?\d+(?:[.,]\d+)?", rest_of_line)
    parsed = (_to_float(token) for token in tokens)
    return [number for number in parsed if number is not None]
def parse_station_page(html: str) -> dict:
    """Scrape the station's HTML page into structured readings.

    The markup is flattened with ``_html_to_text`` and every value is then
    located by the French label that precedes it.  A value whose label is
    not found is reported as ``None`` instead of raising.

    Args:
        html: Raw HTML of the station page.

    Returns:
        A dict with four sub-dicts:

        * ``current_extended`` - the live readings.
        * ``astrology`` - sun and moon data: the first "Lever"/"Coucher"
          pair is reported as sunrise/sunset and the second pair as
          moonrise/moonset, plus day length, moon phase and illumination.
        * ``stats_today`` - figures read from the text between the
          "Statistiques ... Aujourd'hui" heading and "À propos de cette
          station"; the first number on a row feeds the ``*_max_*`` key
          and the second the ``*_min_*`` key.
        * ``station_info`` - hardware, location and software details.
    """
    page_text = _html_to_text(html)

    def read_text(pattern: str) -> str | None:
        return _grab(pattern, page_text)

    def read_float(pattern: str) -> float | None:
        return _to_float(_grab(pattern, page_text))

    def read_int(pattern: str) -> int | None:
        return _to_int(_grab(pattern, page_text))

    def nth(values: list, position: int):
        return values[position] if len(values) > position else None

    current_ext = {
        "temperature_ext_c": read_float(r"Température Extérieure\s+([\-0-9\.,]+)\s*°C"),
        "indice_chaleur_c": read_float(r"Indice de Chaleur\s+([\-0-9\.,]+)\s*°C"),
        "refroidissement_eolien_c": read_float(r"Refroidissement Éolien\s+([\-0-9\.,]+)\s*°C"),
        "point_rosee_c": read_float(r"Point de Rosée\s+([\-0-9\.,]+)\s*°C"),
        "humidity_ext_pct": read_int(r"Hygrométrie Extérieure\s+([0-9]+)\s*%"),
        "pressure_mbar": read_float(r"Pression Atmosphérique\s+([\-0-9\.,]+)\s*mbar"),
        "wind_speed_m_s": read_float(r"Vent\s+([\-0-9\.,]+)\s*m/s"),
        "wind_dir_text": read_text(r"Vent\s+[\-0-9\.,]+\s*m/s\s+([A-ZÀ-ÿ/]+)"),
        "wind_dir_deg": read_int(r"Vent\s+[\-0-9\.,]+\s*m/s[^\n]*\(([0-9]{1,3})°\)"),
        "rain_today_mm": read_float(r"Précipitations Aujourd'hui\s+([\-0-9\.,]+)\s*mm"),
        "rain_rate_mm_h": read_float(r"Taux de Précipitations\s+([\-0-9\.,]+)\s*mm/h"),
        "uv_index": read_float(r"Ultra-Violet\s+([\-0-9\.,]+)"),
        "solar_radiation_w_m2": read_float(r"Rayonnement Solaire\s+([\-0-9\.,]+)\s*W/m²"),
        "temperature_int_c": read_float(r"Température Intérieure\s+([\-0-9\.,]+)\s*°C"),
        "humidity_int_pct": read_int(r"Hygrométrie Intérieure\s+([0-9]+)\s*%"),
    }

    rise_times = re.findall(r"Lever\s+(\d{2}:\d{2}:\d{2})", page_text, re.I)
    set_times = re.findall(r"Coucher\s+(\d{2}:\d{2}:\d{2})", page_text, re.I)
    astrology = {
        "sunrise": nth(rise_times, 0),
        "sunset": nth(set_times, 0),
        "day_length_hm": read_text(r"Journée\s+([0-9]{1,2}:[0-9]{2})"),
        "moonrise": nth(rise_times, 1),
        "moonset": nth(set_times, 1),
        "moon_phase": read_text(
            r"(Nouvelle\s+lune|Premier\s+croissant|Premier\s+quartier|Lune\s+gibbeuse\s+croissante|Pleine\s+lune|Lune\s+gibbeuse\s+décroissante|Dernier\s+quartier|Dernier\s+croissant)"
        ),
        # A percentage standing alone on its own line.
        "moon_illumination_pct": read_int(r"\n([0-9]{1,3})\s*%\s*(?:\n|$)"),
    }

    stats_block = _extract_block(
        page_text,
        r"Statistiques.*?Aujourd'hui",
        [r"À propos de cette station"],
    )
    # (output key, row label regex, position of the number on the row,
    #  truncate to int).  The order here is the order of the output keys.
    stat_fields = (
        ("temp_ext_max_c", r"Température Extérieure", 0, False),
        ("temp_ext_min_c", r"Température Extérieure", 1, False),
        ("heat_index_max_c", r"Indice de Chaleur", 0, False),
        ("heat_index_min_c", r"Indice de Chaleur", 1, False),
        ("wind_chill_max_c", r"Refroidissement Éolien", 0, False),
        ("wind_chill_min_c", r"Refroidissement Éolien", 1, False),
        ("dew_point_max_c", r"Point de Rosée", 0, False),
        ("dew_point_min_c", r"Point de Rosée", 1, False),
        ("humidity_ext_max_pct", r"Hygrométrie Extérieure", 0, False),
        ("humidity_ext_min_pct", r"Hygrométrie Extérieure", 1, False),
        ("pressure_max_mbar", r"Pression Atmosphérique", 0, False),
        ("pressure_min_mbar", r"Pression Atmosphérique", 1, False),
        ("wind_max_m_s", r"Vent Max", 0, False),
        ("wind_max_dir_deg", r"Vent Max", 1, True),
        ("wind_avg_m_s", r"Vent Moyen", 0, False),
        ("wind_rms_m_s", r"Valeur Efficace \(RMS\) Vent", 0, False),
        ("wind_vector_speed_m_s", r"Direction Vecteur Vent", 0, False),
        ("wind_vector_dir_deg", r"Direction Vecteur Vent", 1, True),
        ("rain_mm", r"Précipitations", 0, False),
        ("rain_rate_mm_h", r"Taux de Précipitations", 0, False),
        ("evapotranspiration_mm", r"Evapotranspiration", 0, False),
        ("uv_max", r"Ultra-Violet", 0, False),
        ("solar_radiation_max_w_m2", r"Rayonnement Solaire", 0, False),
        ("solar_radiation_min_w_m2", r"Rayonnement Solaire", 1, False),
        ("temp_int_max_c", r"Température Intérieure", 0, False),
        ("temp_int_min_c", r"Température Intérieure", 1, False),
        ("humidity_int_max_pct", r"Hygrométrie Intérieure", 0, False),
        ("humidity_int_min_pct", r"Hygrométrie Intérieure", 1, False),
    )
    rows: dict[str, list[float]] = {}
    stats_today = {}
    for key, label, position, truncate in stat_fields:
        # Each row is parsed once even when it feeds two keys.
        if label not in rows:
            rows[label] = _metric_numbers(stats_block, label)
        reading = nth(rows[label], position)
        if truncate and reading is not None:
            reading = int(reading)
        stats_today[key] = reading

    station_info = {
        "hardware": read_text(r"Matériel\s+([^\n]+)"),
        "latitude_text": read_text(r"Latitude\s+([^\n]+)"),
        "longitude_text": read_text(r"Longitude\s+([^\n]+)"),
        "altitude_m": read_float(r"Altitude\s+([\-0-9\.,]+)\s+mètres"),
        "server_uptime": read_text(r"Serveur en service depuis\s+([^\n]+)"),
        "weewx_uptime": read_text(r"WeeWX en service depuis\s+([^\n]+)"),
        "weewx_version": read_text(r"Version de WeeWX\s+([^\n]+)"),
        "skin": read_text(r"Skin\s+([^\n]+)"),
    }

    return {
        "current_extended": current_ext,
        "astrology": astrology,
        "stats_today": stats_today,
        "station_info": station_info,
    }
def parse_current_from_rss(xml: str) -> dict:
    """Extract the current observation from the station's RSS feed.

    The item whose title contains "Conditions Météorologiques" is used;
    when no title matches, the first item of the channel is used instead.
    Values are read from the item's description by their French labels, and
    a value whose label is missing is reported as ``None``.

    Args:
        xml: Text of the RSS document.

    Returns:
        A dict of readings keyed by metric name, with ``observed_at`` taken
        from the item's ``pubDate`` (falling back to the channel's).  An
        empty dict is returned when the XML cannot be parsed or has no
        channel or no item.
    """
    try:
        root = ET.fromstring(xml)
    except ET.ParseError:
        return {}
    channel = root.find("channel")
    if channel is None:
        return {}

    items = channel.findall("item")
    if not items:
        return {}
    current_item = next(
        (item for item in items if "Conditions Météorologiques" in (item.findtext("title") or "")),
        items[0],
    )

    desc = unescape(current_item.findtext("description") or "")
    pub = (current_item.findtext("pubDate") or channel.findtext("pubDate") or "").strip()

    def grab(pattern: str) -> str | None:
        m = re.search(pattern, desc, re.I)
        return m.group(1).strip() if m else None

    return {
        "observed_at": pub,
        "temperature_ext_c": _to_float(grab(r"Température Extérieure:\s*([\-0-9\.,]+)°C")),
        "temperature_int_c": _to_float(grab(r"Température Intérieure:\s*([\-0-9\.,]+)°C")),
        "pressure_mbar": _to_float(grab(r"Pression Atmosphérique:\s*([\-0-9\.,]+)\s*mbar")),
        "wind": grab(r"Vent:\s*([^;]+)"),
        "wind_speed_m_s": _to_float(grab(r"Vent:\s*([\-0-9\.,]+)\s*m/s")),
        "wind_dir_deg": _to_int(grab(r"Vent:\s*[\-0-9\.,]+\s*m/s\s*de\s*([0-9]{1,3})°")),
        # "Précipitations:" is also the tail of "Taux de Précipitations:".
        # The lookbehind keeps the rate (mm/h) from being reported as the
        # rain total when the rate comes first or is the only one present.
        "rain_mm": _to_float(grab(r"(?<!Taux de )Précipitations:\s*([\-0-9\.,]+)\s*mm")),
        "rain_rate_mm_h": _to_float(grab(r"Taux de Précipitations:\s*([\-0-9\.,]+)\s*mm/h")),
        "wind_chill_c": _to_float(grab(r"Refroidissement Éolien:\s*([\-0-9\.,]+)°C")),
        "heat_index_c": _to_float(grab(r"Indice de Chaleur:\s*([\-0-9\.,]+)°C")),
        "dew_point_c": _to_float(grab(r"Point de Rosée:\s*([\-0-9\.,]+)°C")),
        "humidity_ext_pct": _to_int(grab(r"Hygrométrie Extérieure:\s*([0-9]+)%")),
        "humidity_int_pct": _to_int(grab(r"Hygrométrie Intérieure:\s*([0-9]+)%")),
    }
def parse_daily_summary_from_rss(xml: str) -> dict:
    """Extract today's minima and maxima from the station's RSS feed.

    The item whose title contains "Résumé météorologique quotidien" is
    parsed.  Each "Min <label>: <value> at HH:MM:SS" / "Max ..." entry of
    its description yields a value and the time it was reached; an entry
    that is missing yields ``None`` for both.

    Args:
        xml: Text of the RSS document.

    Returns:
        A dict with ``source`` set to ``"rss_daily_summary"``,
        ``observed_at`` (the item's ``pubDate``, falling back to the
        channel's) and one ``*_min_*`` / ``*_max_*`` value plus ``*_time``
        per metric, together with the wind maximum and the rain figures.
        An empty dict is returned when the XML cannot be parsed, has no
        channel, or has no daily-summary item.
    """
    try:
        root = ET.fromstring(xml)
    except ET.ParseError:
        return {}
    channel = root.find("channel")
    if channel is None:
        return {}

    summary_item = None
    for item in channel.findall("item"):
        if "Résumé météorologique quotidien" in (item.findtext("title") or ""):
            summary_item = item
            break
    if summary_item is None:
        return {}

    desc = unescape(summary_item.findtext("description") or "")
    pub = (summary_item.findtext("pubDate") or channel.findtext("pubDate") or "").strip()

    number = r"([\-0-9\.,]+)"
    celsius = number + "°C"
    millibar = number + r"\s*mbar"
    percent = r"([0-9]+)%"

    def extremes(stem: str, unit: str, label: str, value_re: str, convert) -> dict:
        """Return the min/max value and time fields of one metric."""
        fields = {}
        for bound in ("min", "max"):
            m = re.search(
                rf"{bound.capitalize()} {label}:\s*{value_re}\s+at\s+([0-9:]{{8}})",
                desc,
                re.I,
            )
            raw, when = (m.group(1).strip(), m.group(2).strip()) if m else (None, None)
            fields[f"{stem}_{bound}_{unit}"] = convert(raw)
            fields[f"{stem}_{bound}_time"] = when
        return fields

    # The wind and rain-rate entries do not have the "<value> at <time>"
    # two-group shape (wind carries a direction, the rate pattern captures
    # no time), so they are searched on their own rather than through
    # ``extremes``; asking a one-group match for group 2 raises IndexError.
    wind_match = re.search(
        r"Max Vent:\s*([\-0-9\.,]+)\s*m/s\s+de\s+([0-9]{1,3})°\s+at\s+([0-9:]{8})",
        desc,
        re.I,
    )
    rate_match = re.search(r"Max Taux de Précipitations:\s*([\-0-9\.,]+)\s*mm/h", desc, re.I)
    # "Précipitations:" is also the tail of "Max Taux de Précipitations:";
    # the lookbehind keeps the rate from being reported as the rain total.
    rain_match = re.search(r"(?<!Taux de )Précipitations:\s*([\-0-9\.,]+)\s*mm", desc, re.I)

    summary = {"source": "rss_daily_summary", "observed_at": pub}
    for spec in (
        ("temp_ext", "c", "Température Extérieure", celsius, _to_float),
        ("temp_int", "c", "Température Intérieure", celsius, _to_float),
        ("pressure", "mbar", "Pression Atmosphérique", millibar, _to_float),
    ):
        summary.update(extremes(*spec))
    summary.update(
        {
            "wind_max_m_s": _to_float(wind_match.group(1)) if wind_match else None,
            "wind_max_dir_deg": _to_int(wind_match.group(2)) if wind_match else None,
            "wind_max_time": wind_match.group(3) if wind_match else None,
            "rain_mm": _to_float(rain_match.group(1).strip()) if rain_match else None,
            "rain_rate_mm_h": _to_float(rate_match.group(1)) if rate_match else None,
        }
    )
    for spec in (
        ("wind_chill", "c", "Refroidissement Éolien", celsius, _to_float),
        ("heat_index", "c", "Indice de Chaleur", celsius, _to_float),
        ("dew_point", "c", "Point de Rosée", celsius, _to_float),
        ("humidity_ext", "pct", "Hygrométrie Extérieure", percent, _to_int),
        ("humidity_int", "pct", "Hygrométrie Intérieure", percent, _to_int),
    ):
        summary.update(extremes(*spec))
    return summary
def parse_yesterday_from_noaa(noaa_text: str, day: int) -> dict:
    """Read one day's row from a NOAA-style monthly summary.

    The first line starting with *day* (with or without a leading zero)
    followed by whitespace is split on whitespace and mapped to named
    fields.

    Args:
        noaa_text: Content of the monthly NOAA text report.
        day: Day of the month to look up.

    Returns:
        The parsed fields plus ``raw_line``.  When the day is absent, or
        its row has fewer than 13 columns, a dict carrying an ``error``
        message is returned instead.
    """
    day_prefix = re.compile(rf"\s*(?:{day:02d}|{day})\s+")
    row = next((line for line in noaa_text.splitlines() if day_prefix.match(line)), None)
    if not row:
        return {"error": f"day {day} not found in NOAA monthly file"}

    # Columns (weewx summary):
    # DAY MEAN HIGH TIME LOW TIME HEAT COOL RAIN AVG_WIND HIGH_WIND TIME DOM_DIR
    cols = re.split(r"\s+", row.strip())
    if len(cols) < 13:
        return {"raw_line": row, "error": "unexpected NOAA line format"}

    # The HEAT and COOL columns (6 and 7) are not reported.
    (day_of_month, mean, high, high_time, low, low_time) = cols[:6]
    (rain, wind_avg, wind_high, wind_high_time, dom_dir) = cols[8:13]
    return {
        "raw_line": row.strip(),
        "day_of_month": int(day_of_month),
        "temp_mean_c": _to_float(mean),
        "temp_max_c": _to_float(high),
        "temp_max_time": high_time,
        "temp_min_c": _to_float(low),
        "temp_min_time": low_time,
        "rain_mm": _to_float(rain),
        "wind_avg_m_s": _to_float(wind_avg),
        "wind_max_m_s": _to_float(wind_high),
        "wind_max_time": wind_high_time,
        "wind_dom_dir_deg": _to_int(dom_dir),
    }
def main() -> int:
    """Fetch the station's RSS feed, page and NOAA report; print one JSON document.

    Command-line options:
        --base: Base URL of the station.
        --date: Day to read from the NOAA monthly report (YYYY-MM-DD);
            yesterday when omitted.
        --json: Accepted, but not consulted; the output is always JSON.

    Today's statistics come from the RSS daily summary when the feed has
    one; otherwise the figures scraped from the station page are kept and
    tagged with ``"source": "station_page_fallback"``.

    Returns:
        ``0`` once the document has been printed.

    Raises:
        SystemExit: When ``--date`` is not in YYYY-MM-DD format.
    """
    parser = argparse.ArgumentParser(description="Récupère météo station locale (RSS + NOAA) en JSON")
    parser.add_argument("--base", default="http://10.0.0.8:8081/", help="URL de base de la station")
    parser.add_argument("--date", help="Date cible NOAA au format YYYY-MM-DD (défaut: veille)")
    parser.add_argument("--json", action="store_true", help="Sortie JSON (défaut: oui)")
    options = parser.parse_args()

    # Normalise to exactly one trailing slash so urljoin appends to the base
    # instead of replacing its last path segment.
    base = options.base.rstrip("/") + "/"
    rss_url = urljoin(base, "rss.xml")
    station_page_url = base

    if not options.date:
        target_date = datetime.now() - timedelta(days=1)
    else:
        try:
            target_date = datetime.strptime(options.date, "%Y-%m-%d")
        except ValueError:
            raise SystemExit("Erreur: --date doit être au format YYYY-MM-DD")
    target_iso = target_date.strftime("%Y-%m-%d")
    noaa_month_url = urljoin(base, f"NOAA/NOAA-{target_date.year}-{target_date.month:02d}.txt")

    rss_xml = fetch_text(rss_url)
    current = parse_current_from_rss(rss_xml)
    rss_stats_today = parse_daily_summary_from_rss(rss_xml)

    station_extra = parse_station_page(fetch_text(station_page_url))
    if not rss_stats_today:
        station_extra.setdefault("stats_today", {})["source"] = "station_page_fallback"
    else:
        station_extra["stats_today"] = rss_stats_today

    day_data = parse_yesterday_from_noaa(fetch_text(noaa_month_url), target_date.day)
    day_data["date"] = target_iso

    report = {
        "source": {
            "base_url": base,
            "rss_url": rss_url,
            "station_page_url": station_page_url,
            "noaa_month_url": noaa_month_url,
        },
        "target_date": target_iso,
        "current": current,
        "station": station_extra,
        "day_data": day_data,
    }
    print(json.dumps(report, ensure_ascii=False, indent=2))
    return 0
if __name__ == "__main__":
raise SystemExit(main())