Files
jardin/prevision meteo/open_meteo_garden_forecast.py
2026-02-22 15:05:40 +01:00

302 lines
11 KiB
Python

#!/usr/bin/env python3
import argparse
import json
import math
import re
import sys
from collections import Counter, defaultdict
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import urlencode

import openmeteo_requests
import requests_cache
from retry_requests import retry
# French display labels for the WMO weather interpretation codes carried by the
# "weather_code" variable.  Codes absent from this table are rendered as
# "Code <n>" by summarize(), so the table lists every code documented by
# Open-Meteo (including freezing precipitation, snow grains, snow showers and
# thunderstorms with hail).
WMO_LABELS = {
    0: "Clair",
    1: "Plutôt clair",
    2: "Partiellement nuageux",
    3: "Couvert",
    45: "Brouillard",
    48: "Brouillard givrant",
    51: "Bruine légère",
    53: "Bruine modérée",
    55: "Bruine dense",
    56: "Bruine verglaçante légère",
    57: "Bruine verglaçante dense",
    61: "Pluie légère",
    63: "Pluie modérée",
    65: "Pluie forte",
    66: "Pluie verglaçante légère",
    67: "Pluie verglaçante forte",
    71: "Neige légère",
    73: "Neige modérée",
    75: "Neige forte",
    77: "Neige en grains",
    80: "Averses légères",
    81: "Averses modérées",
    82: "Averses fortes",
    85: "Averses de neige légères",
    86: "Averses de neige fortes",
    95: "Orage",
    96: "Orage avec grêle légère",
    99: "Orage avec grêle forte",
}
# Hourly variables requested from the API.  The order is significant:
# fetch_data() reads the response variables back by position in this list.
HOURLY_FIELDS = [
    "temperature_2m",
    "weather_code",
    "cloud_cover",
    "evapotranspiration",
    "precipitation",
    "precipitation_probability",
    "rain",
    "snowfall",
    "wind_speed_10m",
    "relative_humidity_2m",
    "wind_direction_10m",
    "soil_temperature_0cm",
    "soil_temperature_6cm",
    "soil_moisture_1_to_3cm",
    "soil_moisture_3_to_9cm",
    "sunshine_duration",
    "freezing_level_height",
]
# "Current conditions" variables requested from the API.  The order is
# significant: fetch_data() reads the response variables back by position.
CURRENT_FIELDS = [
    "temperature_2m",
    "is_day",
    "snowfall",
    "showers",
    "rain",
    "precipitation",
    "weather_code",
    "cloud_cover",
    "relative_humidity_2m",
    "apparent_temperature",
]
def build_url(lat: float, lon: float, days: int, past_days: int, tz: str) -> str:
    """Return the Open-Meteo forecast URL that describes this query.

    The field lists are sent as comma-separated values.  The URL is only used
    for display/traceability by main(); fetch_data() performs the actual
    request through the API client.

    Args:
        lat: Latitude of the location.
        lon: Longitude of the location.
        days: Value of the ``forecast_days`` query parameter.
        past_days: Value of the ``past_days`` query parameter.
        tz: Value of the ``timezone`` query parameter.

    Returns:
        The URL with a percent-encoded query string.
    """
    query = urlencode(
        [
            ("latitude", lat),
            ("longitude", lon),
            ("hourly", ",".join(HOURLY_FIELDS)),
            ("current", ",".join(CURRENT_FIELDS)),
            ("past_days", past_days),
            ("forecast_days", days),
            ("timezone", tz),
        ]
    )
    return "https://api.open-meteo.com/v1/forecast?" + query
def _unit_of(variable) -> str:
    """Return what ``variable.Unit()`` reports, or "" when nothing usable is there.

    An object without a callable ``Unit`` attribute, or one whose ``Unit()``
    result is falsy, yields the empty string.
    """
    # NOTE(review): the API client may report the unit as a numeric enum code
    # rather than text, in which case the value is passed through as-is -
    # confirm against the SDK before relying on the ``str`` annotation.
    unit_getter = getattr(variable, "Unit", None)
    if not callable(unit_getter):
        return ""
    unit = unit_getter()
    return unit if unit else ""
def _to_json_safe(value):
    """Decode ``bytes`` as UTF-8 text; return any other value unchanged.

    Undecodable bytes are replaced rather than raising, so the result can
    always be handed to ``json.dumps``.
    """
    if not isinstance(value, bytes):
        return value
    return value.decode("utf-8", errors="replace")
def _iso_times(start_s: int, end_s: int, interval_s: int, utc_offset_s: int) -> list[str]:
    """Build ``YYYY-MM-DDTHH:MM`` timestamps for a half-open range of epochs.

    Args:
        start_s: First timestamp in Unix seconds (inclusive).
        end_s: End of the range in Unix seconds (exclusive).
        interval_s: Step between consecutive timestamps, in seconds.
        utc_offset_s: Seconds added to every timestamp before formatting, so
            the strings show local wall-clock time.

    Returns:
        One string per step; empty when ``start_s >= end_s``.

    Raises:
        ValueError: If ``interval_s`` is not strictly positive (the range
            could otherwise never be exhausted).
    """
    if interval_s <= 0:
        raise ValueError(f"interval_s must be positive, got {interval_s!r}")
    # The offset is folded into the epoch and the result formatted as UTC,
    # which yields local wall-clock text without needing a tzinfo object.
    # NOTE(review): a single offset is applied to the whole range, so a range
    # spanning a daylight-saving change would be shifted on one side - confirm
    # whether that matters for the caller.
    return [
        datetime.fromtimestamp(epoch + utc_offset_s, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M")
        for epoch in range(start_s, end_s, interval_s)
    ]
def _nan_to_none(value):
    """Map a float NaN to ``None``; return every other value unchanged."""
    if isinstance(value, float) and math.isnan(value):
        return None
    return value


def _response_to_payload(response) -> dict:
    """Convert one API client response object into plain, JSON-friendly data."""
    utc_offset_s = response.UtcOffsetSeconds()

    hourly = response.Hourly()
    hourly_payload: dict[str, list] = {
        "time": _iso_times(hourly.Time(), hourly.TimeEnd(), hourly.Interval(), utc_offset_s),
    }
    hourly_units: dict[str, str] = {}
    # Variables are addressed by position, in the order they were requested.
    for idx, field in enumerate(HOURLY_FIELDS):
        variable = hourly.Variables(idx)
        # The aggregation helpers skip None, not NaN, and NaN is not valid
        # JSON, so any NaN sample is normalised to None here.
        # NOTE(review): presumably the client marks missing samples with NaN -
        # confirm against the SDK.
        hourly_payload[field] = [_nan_to_none(v) for v in variable.ValuesAsNumpy().tolist()]
        hourly_units[field] = _unit_of(variable)

    current = response.Current()
    current_payload: dict[str, float | int | str | None] = {}
    current_units: dict[str, str] = {}
    for idx, field in enumerate(CURRENT_FIELDS):
        variable = current.Variables(idx)
        current_payload[field] = _nan_to_none(variable.Value())
        current_units[field] = _unit_of(variable)
    # Same trick as _iso_times(): shift the epoch by the offset, format as UTC.
    current_payload["time"] = datetime.fromtimestamp(
        current.Time() + utc_offset_s,
        tz=timezone.utc,
    ).strftime("%Y-%m-%dT%H:%M")

    metadata = {
        "latitude": response.Latitude(),
        "longitude": response.Longitude(),
        "elevation": response.Elevation(),
        "timezone": _to_json_safe(response.Timezone()),
        "timezone_abbr": _to_json_safe(response.TimezoneAbbreviation()),
        "utc_offset_seconds": utc_offset_s,
    }
    return {
        "hourly": hourly_payload,
        "hourly_units": hourly_units,
        "current": current_payload,
        "current_units": current_units,
        "metadata": metadata,
    }


def fetch_data(lat: float, lon: float, days: int, past_days: int, tz: str) -> dict:
    """Query the Open-Meteo forecast API and return the first response as plain data.

    Requests go through a cached session (``.cache``, entries expiring after
    3600 seconds) wrapped with retries (5 attempts, back-off factor 0.2).  The
    session is closed before returning.

    Args:
        lat: Latitude of the location.
        lon: Longitude of the location.
        days: Value of the ``forecast_days`` request parameter.
        past_days: Value of the ``past_days`` request parameter.
        tz: Value of the ``timezone`` request parameter.

    Returns:
        A dict with the keys ``hourly`` (``"time"`` plus one list per
        ``HOURLY_FIELDS`` entry), ``hourly_units``, ``current`` (one value per
        ``CURRENT_FIELDS`` entry plus ``"time"``), ``current_units`` and
        ``metadata``.  NaN samples are reported as ``None``.
    """
    params = {
        "latitude": lat,
        "longitude": lon,
        "hourly": HOURLY_FIELDS,
        "current": CURRENT_FIELDS,
        "past_days": past_days,
        "timezone": tz,
        "forecast_days": days,
    }
    # The response is fully converted while the session is still open, so
    # nothing depends on the session once the ``with`` block has closed it.
    with requests_cache.CachedSession(".cache", expire_after=3600) as cache_session:
        retry_session = retry(cache_session, retries=5, backoff_factor=0.2)
        client = openmeteo_requests.Client(session=retry_session)
        responses = client.weather_api("https://api.open-meteo.com/v1/forecast", params=params)
        return _response_to_payload(responses[0])
def _clean(vals):
    """Return the usable samples of *vals*, in order, without ``None`` or NaN.

    The hourly series are floats converted from a NumPy array, so a missing
    sample may be a NaN rather than ``None``.  A single NaN would otherwise
    turn every sum/mean into NaN and make ``min``/``max`` order-dependent.
    """
    return [
        v
        for v in vals
        if v is not None and not (isinstance(v, float) and math.isnan(v))
    ]
def _avg(vals, ndigits=1):
    """Return the mean of the usable values rounded to *ndigits* decimals.

    Values are filtered through ``_clean`` first; ``None`` is returned when
    nothing usable remains.
    """
    usable = _clean(vals)
    if not usable:
        return None
    return round(sum(usable) / len(usable), ndigits)
def _sum(vals, ndigits=1):
    """Return the total of the usable values rounded to *ndigits* decimals.

    Values are filtered through ``_clean`` first; ``0.0`` is returned when
    nothing usable remains.
    """
    usable = _clean(vals)
    if usable:
        return round(sum(usable), ndigits)
    return 0.0
def _dominant_code(codes):
    """Return the most frequent weather code as an ``int``, or ``None`` if none is usable."""
    usable = [
        int(code)
        for code in codes
        if code is not None and not (isinstance(code, float) and math.isnan(code))
    ]
    if not usable:
        return None
    # most_common() orders equal counts by first occurrence, so ties resolve
    # deterministically to the code seen earliest in the day.
    return Counter(usable).most_common(1)[0][0]


def _mean_direction_deg(directions):
    """Return the circular mean of bearings in degrees (0 <= x < 360), or ``None``.

    Bearings wrap around at 360, so an arithmetic mean is wrong (350 and 10
    would average to 180).  Each bearing is treated as a unit vector and the
    direction of the vector sum is returned.  ``None`` is returned when there
    is no usable value or when the vectors cancel out.
    """
    usable = _clean(directions)
    if not usable:
        return None
    east = sum(math.sin(math.radians(deg)) for deg in usable)
    north = sum(math.cos(math.radians(deg)) for deg in usable)
    if math.hypot(east, north) / len(usable) < 1e-6:
        return None
    bearing = math.degrees(math.atan2(east, north)) % 360.0
    # Rounding can produce 360.0; fold it back onto 0.0.
    return round(bearing, 0) % 360.0


def summarize(data: dict) -> list[dict]:
    """Aggregate the hourly series of *data* into one summary row per calendar day.

    Hours are grouped by the date part of ``data["hourly"]["time"]``.

    Args:
        data: Payload shaped like the return value of ``fetch_data``; only the
            ``"hourly"`` entry is read, and it must hold ``"time"`` plus one
            list per ``HOURLY_FIELDS`` entry, all of the same length.

    Returns:
        Rows sorted by date.  Each row holds ``date``, temperature extremes,
        precipitation/rain/snow/evapotranspiration totals, the maximum
        precipitation probability, daily means (humidity, wind speed, wind
        direction, cloud cover, soil temperature and moisture, freezing
        level), the sunshine total divided by 3600, the dominant weather code
        (``wmo``) and its label (``meteo``).  Means and extremes are ``None``
        when a day has no usable sample; totals are ``0.0`` in that case.
    """
    hourly = data["hourly"]
    by_day = defaultdict(lambda: defaultdict(list))
    for i, iso in enumerate(hourly["time"]):
        day = iso.split("T", 1)[0]
        for field in HOURLY_FIELDS:
            by_day[day][field].append(hourly[field][i])

    rows = []
    for day in sorted(by_day):
        series = by_day[day]
        temps = _clean(series["temperature_2m"])
        rain_probs = _clean(series["precipitation_probability"])
        code = _dominant_code(series["weather_code"])
        rows.append({
            "date": day,
            # Rounded like every other aggregate; the raw values carry
            # float32 conversion noise.
            "t_min": round(min(temps), 1) if temps else None,
            "t_max": round(max(temps), 1) if temps else None,
            "pluie_total_mm": _sum(series["precipitation"], 1),
            "pluie_mm": _sum(series["rain"], 1),
            "neige_cm": _sum(series["snowfall"], 1),
            "proba_pluie_max": max(rain_probs) if rain_probs else None,
            "humidite_moy": _avg(series["relative_humidity_2m"], 0),
            "vent_moy_kmh": _avg(series["wind_speed_10m"], 1),
            "vent_dir_moy_deg": _mean_direction_deg(series["wind_direction_10m"]),
            "nuages_moy": _avg(series["cloud_cover"], 0),
            "sol_0cm_moy": _avg(series["soil_temperature_0cm"], 1),
            "sol_6cm_moy": _avg(series["soil_temperature_6cm"], 1),
            "hum_sol_1_3_moy": _avg(series["soil_moisture_1_to_3cm"], 3),
            "hum_sol_3_9_moy": _avg(series["soil_moisture_3_to_9cm"], 3),
            "ensoleillement_h": round(_sum(series["sunshine_duration"], 0) / 3600, 1),
            "isotherme_0m_moy": _avg(series["freezing_level_height"], 0),
            "etp_mm": _sum(series["evapotranspiration"], 2),
            "wmo": code,
            "meteo": "n/a" if code is None else WMO_LABELS.get(code, f"Code {code}"),
        })
    return rows
def _fmt(v, spec):
    """Format *v* with the format spec *spec*, rendering ``None`` as "n/a".

    The placeholder is padded to the width (and alignment) given at the start
    of *spec* so that missing values keep table columns aligned.  When *spec*
    carries no leading width the historical " n/a" placeholder is returned.

    Args:
        v: Value to format, or ``None``.
        spec: A format spec such as ``">6.1f"``.

    Returns:
        The formatted text.
    """
    if v is not None:
        return format(v, spec)
    layout = re.match(r"([<>^]?)(\d+)", spec)
    if layout is None:
        return " n/a"
    align, width = layout.groups()
    # Numbers right-align by default while strings left-align, so an omitted
    # alignment is made explicit to match the numeric cells.
    return format("n/a", f"{align or '>'}{int(width)}")
def print_table(rows: list[dict], units: dict[str, str]) -> None:
    """Print the daily summary rows as a fixed-width text table on stdout.

    Output is a header line, a units line, a dashed rule as long as the widest
    printed line, then one line per row.

    Args:
        rows: Daily rows as produced by ``summarize``.
        units: Unit per hourly field name; a fallback unit is used for any
            field missing from the mapping.
    """
    # One entry per numeric column, shared by the header, units and data
    # lines so that the three can never drift apart:
    # (title, width, row key, number format, field providing the unit, fallback unit)
    columns = (
        ("Tmin", 6, "t_min", ".1f", "temperature_2m", "°C"),
        ("Tmax", 6, "t_max", ".1f", "temperature_2m", "°C"),
        ("Precip", 7, "pluie_total_mm", ".1f", "precipitation", "mm"),
        ("Rain", 6, "pluie_mm", ".1f", "rain", "mm"),
        ("Snow", 6, "neige_cm", ".1f", "snowfall", "cm"),
        ("Pmax", 5, "proba_pluie_max", ".0f", "precipitation_probability", "%"),
        ("Hum", 5, "humidite_moy", ".0f", "relative_humidity_2m", "%"),
        ("Wind", 6, "vent_moy_kmh", ".1f", "wind_speed_10m", "km/h"),
        ("Dir", 4, "vent_dir_moy_deg", ".0f", "wind_direction_10m", "°"),
        ("Cloud", 5, "nuages_moy", ".0f", "cloud_cover", "%"),
        ("Soil0", 6, "sol_0cm_moy", ".1f", "soil_temperature_0cm", "°C"),
        ("Soil6", 6, "sol_6cm_moy", ".1f", "soil_temperature_6cm", "°C"),
        ("SM1-3", 7, "hum_sol_1_3_moy", ".3f", "soil_moisture_1_to_3cm", "m³/m³"),
        ("SM3-9", 7, "hum_sol_3_9_moy", ".3f", "soil_moisture_3_to_9cm", "m³/m³"),
        # The sunshine column is converted by summarize(), so its unit is fixed.
        ("Sun(h)", 6, "ensoleillement_h", ".1f", None, "h"),
        ("ETP", 6, "etp_mm", ".2f", "evapotranspiration", "mm"),
        ("Iso0", 6, "isotherme_0m_moy", ".0f", "freezing_level_height", "m"),
    )
    date_width = 10

    header_cells = [f"{'Date':<{date_width}}"]
    unit_cells = [f"{'':<{date_width}}"]
    for title, width, _key, _spec, unit_field, fallback in columns:
        unit = units.get(unit_field, fallback) if unit_field else fallback
        header_cells.append(f"{title:>{width}}")
        unit_cells.append(f"{str(unit):>{width}}")
    header = " | ".join(header_cells + ["Meteo"])
    unit_line = " | ".join(unit_cells) + " |"

    body = []
    for row in rows:
        cells = [f"{row['date']:<{date_width}}"]
        cells.extend(
            _fmt(row[key], f">{width}{spec}")
            for _title, width, key, spec, _field, _fallback in columns
        )
        cells.append(str(row["meteo"]))
        body.append(" | ".join(cells))

    # Size the rule from what is actually printed instead of a fixed count.
    rule = "-" * max(len(line) for line in (header, unit_line, *body))

    print(header)
    print(unit_line)
    print(rule)
    for line in body:
        print(line)
def print_current(current: dict, current_units: dict) -> None:
    """Print the current conditions as ``key: value [unit]`` lines on stdout.

    Keys are printed in a fixed display order; keys absent from *current* are
    skipped and keys outside the display order are ignored.  The unit suffix
    is omitted when *current_units* has no (or an empty) unit for the key.
    """
    display_order = (
        "time",
        "temperature_2m",
        "apparent_temperature",
        "is_day",
        "weather_code",
        "cloud_cover",
        "relative_humidity_2m",
        "precipitation",
        "rain",
        "showers",
        "snowfall",
    )
    print("\nCurrent")
    print("-------")
    for name in display_order:
        if name in current:
            line = f"{name}: {current[name]}"
            unit = current_units.get(name, "")
            if unit:
                line += f" {unit}"
            print(line)
def main() -> int:
    """Command-line entry point: fetch the forecast and print or export the daily summary.

    Options: ``--lat``, ``--lon``, ``--days``, ``--past-days``, ``--timezone``,
    ``--json`` (print the payload as JSON instead of the text report) and
    ``--json-out PATH`` (also write the payload to a file, creating parent
    directories as needed).

    Returns:
        ``0``, used as the process exit status.
    """
    parser = argparse.ArgumentParser(description="Résumé jardinier Open-Meteo (journalier)")
    parser.add_argument("--lat", type=float, default=45.1412)
    parser.add_argument("--lon", type=float, default=4.0736)
    parser.add_argument("--days", type=int, default=14)
    parser.add_argument("--past-days", type=int, default=7)
    parser.add_argument("--timezone", default="auto")
    parser.add_argument("--json", action="store_true", help="Sortie JSON")
    parser.add_argument("--json-out", help="Chemin de fichier JSON de sortie")
    args = parser.parse_args()

    url = build_url(args.lat, args.lon, args.days, args.past_days, args.timezone)
    data = fetch_data(args.lat, args.lon, args.days, args.past_days, args.timezone)
    rows = summarize(data)

    metadata = data.get("metadata", {})
    units = data.get("hourly_units", {})
    current = data.get("current", {})
    current_units = data.get("current_units", {})
    payload = {
        "url": url,
        "metadata": metadata,
        "units": units,
        "current_units": current_units,
        "current": current,
        "days": rows,
    }

    if args.json_out:
        out_path = Path(args.json_out)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        out_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
        # With --json, stdout must carry nothing but the JSON document, so the
        # status line goes to stderr in that mode.
        print(f"JSON écrit: {out_path}", file=sys.stderr if args.json else sys.stdout)

    if args.json:
        print(json.dumps(payload, ensure_ascii=False, indent=2))
    else:
        print(f"Source: {url}")
        print(f"Location: {metadata.get('latitude')}N {metadata.get('longitude')}E | Elevation: {metadata.get('elevation')} m")
        print(f"Timezone: {metadata.get('timezone')} ({metadata.get('timezone_abbr')})")
        print_current(current, current_units)
        print_table(rows, units)
    return 0
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    exit_status = main()
    raise SystemExit(exit_status)