#!/usr/bin/env python3
"""Daily gardener-oriented weather summary built from the Open-Meteo forecast API.

The script requests hourly and current variables for one location, folds the
hourly series into one row per local calendar day, and prints either a text
table or a JSON document (optionally also written to a file).
"""

import argparse
import json
import math
import re
import sys
from collections import Counter, defaultdict
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import urlencode

# The HTTP client stack (openmeteo_requests, requests_cache, retry_requests) is
# imported inside fetch_data(), its only user, so that --help and the pure
# summarising/printing functions work without those packages installed.

API_URL = "https://api.open-meteo.com/v1/forecast"

# Timestamp layout used for every time string produced by this module.
_TIME_FORMAT = "%Y-%m-%dT%H:%M"

# Label shown in place of a value that could not be computed.
_MISSING_LABEL = "n/a"

# Width of the rule printed under the table header.
_RULE_WIDTH = 245

# Human-readable (French) labels for the weather codes this script knows about.
# Codes missing from this mapping are rendered as "Code <n>" by summarize().
WMO_LABELS = {
    0: "Clair",
    1: "Plutôt clair",
    2: "Partiellement nuageux",
    3: "Couvert",
    45: "Brouillard",
    48: "Brouillard givrant",
    51: "Bruine légère",
    53: "Bruine modérée",
    55: "Bruine dense",
    61: "Pluie légère",
    63: "Pluie modérée",
    65: "Pluie forte",
    71: "Neige légère",
    73: "Neige modérée",
    75: "Neige forte",
    80: "Averses légères",
    81: "Averses modérées",
    82: "Averses fortes",
    95: "Orage",
}

# Hourly variables requested from the API. fetch_data() reads the response
# variables by position, so this order is also the decoding order.
HOURLY_FIELDS = [
    "temperature_2m",
    "weather_code",
    "cloud_cover",
    "evapotranspiration",
    "precipitation",
    "precipitation_probability",
    "rain",
    "snowfall",
    "wind_speed_10m",
    "relative_humidity_2m",
    "wind_direction_10m",
    "soil_temperature_0cm",
    "soil_temperature_6cm",
    "soil_moisture_1_to_3cm",
    "soil_moisture_3_to_9cm",
    "sunshine_duration",
    "freezing_level_height",
]

# "Current conditions" variables requested from the API (same positional rule).
CURRENT_FIELDS = [
    "temperature_2m",
    "is_day",
    "snowfall",
    "showers",
    "rain",
    "precipitation",
    "weather_code",
    "cloud_cover",
    "relative_humidity_2m",
    "apparent_temperature",
]

# Table layout, one entry per numeric column:
# (header, summary-row key, hourly field supplying the unit or None,
#  fallback unit, column width, numeric format).
_COLUMNS = (
    ("Tmin", "t_min", "temperature_2m", "°C", 6, ".1f"),
    ("Tmax", "t_max", "temperature_2m", "°C", 6, ".1f"),
    ("Precip", "pluie_total_mm", "precipitation", "mm", 7, ".1f"),
    ("Rain", "pluie_mm", "rain", "mm", 6, ".1f"),
    ("Snow", "neige_cm", "snowfall", "cm", 6, ".1f"),
    ("Pmax", "proba_pluie_max", "precipitation_probability", "%", 5, ".0f"),
    ("Hum", "humidite_moy", "relative_humidity_2m", "%", 5, ".0f"),
    ("Wind", "vent_moy_kmh", "wind_speed_10m", "km/h", 6, ".1f"),
    ("Dir", "vent_dir_moy_deg", "wind_direction_10m", "°", 4, ".0f"),
    ("Cloud", "nuages_moy", "cloud_cover", "%", 5, ".0f"),
    ("Soil0", "sol_0cm_moy", "soil_temperature_0cm", "°C", 6, ".1f"),
    ("Soil6", "sol_6cm_moy", "soil_temperature_6cm", "°C", 6, ".1f"),
    ("SM1-3", "hum_sol_1_3_moy", "soil_moisture_1_to_3cm", "m³/m³", 7, ".3f"),
    ("SM3-9", "hum_sol_3_9_moy", "soil_moisture_3_to_9cm", "m³/m³", 7, ".3f"),
    ("Sun(h)", "ensoleillement_h", None, "h", 6, ".1f"),
    ("ETP", "etp_mm", "evapotranspiration", "mm", 6, ".2f"),
    ("Iso0", "isotherme_0m_moy", "freezing_level_height", "m", 6, ".0f"),
)

# Extracts the optional alignment character and width from a format spec.
_SPEC_LAYOUT = re.compile(r"(?P<align>[<>^]?)(?P<width>\d*)")


def build_url(lat: float, lon: float, days: int, past_days: int, tz: str) -> str:
    """Return the forecast URL equivalent to the request made by fetch_data().

    The URL is only used for display/traceability; the actual request goes
    through the API client.
    """
    params = {
        "latitude": lat,
        "longitude": lon,
        "hourly": ",".join(HOURLY_FIELDS),
        "current": ",".join(CURRENT_FIELDS),
        "past_days": past_days,
        "forecast_days": days,
        "timezone": tz,
    }
    return f"{API_URL}?{urlencode(params)}"


def _unit_of(variable) -> str:
    """Return ``variable.Unit()`` when that method exists, else an empty string.

    NOTE(review): the value is passed through unchanged; if the client returns
    a non-string unit (e.g. an enum number) it ends up as such in the output
    despite the ``str`` annotation -- confirm against the client library.
    """
    unit_fn = getattr(variable, "Unit", None)
    if callable(unit_fn):
        return unit_fn() or ""
    return ""


def _to_json_safe(value):
    """Decode ``bytes`` to text (UTF-8, lossy); return anything else unchanged."""
    if isinstance(value, bytes):
        return value.decode("utf-8", errors="replace")
    return value


def _is_missing(value) -> bool:
    """Tell whether *value* is ``None`` or a float NaN."""
    return value is None or (isinstance(value, float) and math.isnan(value))


def _nan_to_none(value):
    """Map NaN to ``None`` so the value can be serialised as valid JSON."""
    return None if _is_missing(value) else value


def _local_stamp(epoch_s: int, utc_offset_s: int) -> str:
    """Format a UTC epoch as wall-clock time at the given UTC offset."""
    # The offset is added to the epoch and the result rendered as UTC, which
    # yields the local wall-clock reading without needing a tz database.
    shifted = datetime.fromtimestamp(epoch_s + utc_offset_s, tz=timezone.utc)
    return shifted.strftime(_TIME_FORMAT)


def _iso_times(start_s: int, end_s: int, interval_s: int, utc_offset_s: int) -> list[str]:
    """Return local timestamps for ``start_s <= t < end_s`` stepping by *interval_s*.

    Raises:
        ValueError: if *interval_s* is not positive (the series would never end).
    """
    if interval_s <= 0:
        raise ValueError(f"interval must be positive, got {interval_s}")
    return [_local_stamp(t, utc_offset_s) for t in range(start_s, end_s, interval_s)]


def fetch_data(lat: float, lon: float, days: int, past_days: int, tz: str) -> dict:
    """Query the forecast API and return plain-Python data.

    Returns a dict with the keys ``hourly`` (``time`` plus one list per entry
    of HOURLY_FIELDS), ``hourly_units``, ``current``, ``current_units`` and
    ``metadata``. NaN values in the response are replaced by ``None``.

    Responses are cached on disk (``.cache``, one hour) and requests are
    retried with back-off.
    """
    import openmeteo_requests
    import requests_cache
    from retry_requests import retry

    cache_session = requests_cache.CachedSession(".cache", expire_after=3600)
    retry_session = retry(cache_session, retries=5, backoff_factor=0.2)
    client = openmeteo_requests.Client(session=retry_session)

    params = {
        "latitude": lat,
        "longitude": lon,
        "hourly": HOURLY_FIELDS,
        "current": CURRENT_FIELDS,
        "past_days": past_days,
        "timezone": tz,
        "forecast_days": days,
    }
    response = client.weather_api(API_URL, params=params)[0]
    utc_offset_s = response.UtcOffsetSeconds()

    hourly = response.Hourly()
    hourly_payload: dict[str, list] = {
        "time": _iso_times(hourly.Time(), hourly.TimeEnd(), hourly.Interval(), utc_offset_s),
    }
    hourly_units: dict[str, str] = {}
    # Variables are matched to field names purely by request position.
    for idx, field in enumerate(HOURLY_FIELDS):
        variable = hourly.Variables(idx)
        hourly_payload[field] = [_nan_to_none(v) for v in variable.ValuesAsNumpy().tolist()]
        hourly_units[field] = _unit_of(variable)

    current = response.Current()
    current_payload: dict[str, "float | int | str | None"] = {}
    current_units: dict[str, str] = {}
    for idx, field in enumerate(CURRENT_FIELDS):
        variable = current.Variables(idx)
        current_payload[field] = _nan_to_none(variable.Value())
        current_units[field] = _unit_of(variable)
    current_payload["time"] = _local_stamp(current.Time(), utc_offset_s)

    metadata = {
        "latitude": response.Latitude(),
        "longitude": response.Longitude(),
        "elevation": response.Elevation(),
        "timezone": _to_json_safe(response.Timezone()),
        "timezone_abbr": _to_json_safe(response.TimezoneAbbreviation()),
        "utc_offset_seconds": utc_offset_s,
    }

    return {
        "hourly": hourly_payload,
        "hourly_units": hourly_units,
        "current": current_payload,
        "current_units": current_units,
        "metadata": metadata,
    }


def _clean(vals):
    """Return *vals* without missing entries (``None`` or NaN)."""
    return [v for v in vals if not _is_missing(v)]


def _avg(vals, ndigits=1):
    """Rounded arithmetic mean of the valid values, or ``None`` if there are none."""
    c = _clean(vals)
    return round(sum(c) / len(c), ndigits) if c else None


def _sum(vals, ndigits=1):
    """Rounded sum of the valid values; ``0.0`` if there are none."""
    c = _clean(vals)
    return round(sum(c), ndigits) if c else 0.0


def _min(vals):
    """Smallest valid value, or ``None`` if there are none."""
    c = _clean(vals)
    return min(c) if c else None


def _max(vals):
    """Largest valid value, or ``None`` if there are none."""
    c = _clean(vals)
    return max(c) if c else None


def _circular_mean_deg(vals, ndigits=0):
    """Mean of compass bearings in degrees, in ``[0, 360)``.

    Bearings wrap at 360, so an arithmetic mean is wrong (350 and 10 would
    average to 180). The unit vectors are averaged instead. Returns ``None``
    when there is no valid value or when the vectors cancel out, in which case
    no mean direction exists.
    """
    c = _clean(vals)
    if not c:
        return None
    sin_sum = sum(math.sin(math.radians(v)) for v in c)
    cos_sum = sum(math.cos(math.radians(v)) for v in c)
    if math.hypot(sin_sum, cos_sum) < 1e-9 * len(c):
        return None
    angle = math.degrees(math.atan2(sin_sum, cos_sum)) % 360.0
    # Rounding can land exactly on 360; fold it back to 0.
    return round(angle, ndigits) % 360.0


def _dominant_code(codes):
    """Most frequent valid weather code as an int, or ``None`` if there is none.

    Ties are broken towards the numerically higher code so the result is
    deterministic.
    """
    counts = Counter(int(code) for code in _clean(codes))
    if not counts:
        return None
    return max(counts.items(), key=lambda item: (item[1], item[0]))[0]


def _summarize_day(day: str, d: dict) -> dict:
    """Build the summary row for one day from its per-field hourly lists."""
    dom_code = _dominant_code(d["weather_code"])
    if dom_code is None:
        label = _MISSING_LABEL
    else:
        label = WMO_LABELS.get(dom_code, f"Code {dom_code}")
    return {
        "date": day,
        "t_min": _min(d["temperature_2m"]),
        "t_max": _max(d["temperature_2m"]),
        "pluie_total_mm": _sum(d["precipitation"], 1),
        "pluie_mm": _sum(d["rain"], 1),
        "neige_cm": _sum(d["snowfall"], 1),
        "proba_pluie_max": _max(d["precipitation_probability"]),
        "humidite_moy": _avg(d["relative_humidity_2m"], 0),
        "vent_moy_kmh": _avg(d["wind_speed_10m"], 1),
        "vent_dir_moy_deg": _circular_mean_deg(d["wind_direction_10m"], 0),
        "nuages_moy": _avg(d["cloud_cover"], 0),
        "sol_0cm_moy": _avg(d["soil_temperature_0cm"], 1),
        "sol_6cm_moy": _avg(d["soil_temperature_6cm"], 1),
        "hum_sol_1_3_moy": _avg(d["soil_moisture_1_to_3cm"], 3),
        "hum_sol_3_9_moy": _avg(d["soil_moisture_3_to_9cm"], 3),
        # The summed series is divided by 3600 to express it in hours.
        "ensoleillement_h": round(_sum(d["sunshine_duration"], 0) / 3600, 1),
        "isotherme_0m_moy": _avg(d["freezing_level_height"], 0),
        "etp_mm": _sum(d["evapotranspiration"], 2),
        "wmo": dom_code,
        "meteo": label,
    }


def summarize(data: dict) -> list[dict]:
    """Aggregate the hourly series of *data* into one row per calendar day.

    *data* must contain ``data["hourly"]`` with a ``time`` list of
    ``YYYY-MM-DDTHH:MM`` strings and one equally long list per entry of
    HOURLY_FIELDS. Missing samples (``None``/NaN) are ignored. Rows are
    returned in ascending date order.
    """
    hourly = data["hourly"]
    per_day = defaultdict(lambda: defaultdict(list))
    for i, iso in enumerate(hourly["time"]):
        bucket = per_day[iso.split("T", 1)[0]]
        for field in HOURLY_FIELDS:
            bucket[field].append(hourly[field][i])
    return [_summarize_day(day, per_day[day]) for day in sorted(per_day)]


def _fmt(v, spec):
    """Format *v* with *spec*; render ``None`` as "n/a" padded to the spec's width.

    Padding the placeholder to the same width as real values keeps table
    columns aligned.
    """
    if v is not None:
        return format(v, spec)
    layout = _SPEC_LAYOUT.match(spec)
    if not layout.group("width"):
        return _MISSING_LABEL
    align = layout.group("align") or ">"
    return format(_MISSING_LABEL, f"{align}{int(layout.group('width'))}")


def print_table(rows: list[dict], units: dict[str, str]) -> None:
    """Print the daily summary rows as a fixed-width table on stdout.

    *units* maps hourly field names to unit labels; a built-in fallback is
    used for any field that is absent from it.
    """
    headers = [format(title, f">{width}") for title, _, _, _, width, _ in _COLUMNS]
    unit_cells = [
        format(fallback if field is None else units.get(field, fallback), f">{width}")
        for _, _, field, fallback, width, _ in _COLUMNS
    ]
    print(f"{'Date':<10} | " + " | ".join(headers) + " | Meteo")
    print(f"{'':<10} | " + " | ".join(unit_cells) + " |")
    print("-" * _RULE_WIDTH)
    for r in rows:
        cells = [_fmt(r[key], f">{width}{num}") for _, key, _, _, width, num in _COLUMNS]
        print(f"{r['date']:<10} | " + " | ".join(cells) + f" | {r['meteo']}")


def print_current(current: dict, current_units: dict) -> None:
    """Print the current-conditions values, one ``key: value unit`` per line.

    Keys are printed in a fixed order; keys absent from *current* are skipped.
    """
    print("\nCurrent")
    print("-------")
    order = [
        "time",
        "temperature_2m",
        "apparent_temperature",
        "is_day",
        "weather_code",
        "cloud_cover",
        "relative_humidity_2m",
        "precipitation",
        "rain",
        "showers",
        "snowfall",
    ]
    for key in order:
        if key not in current:
            continue
        unit = current_units.get(key, "")
        suffix = f" {unit}" if unit else ""
        print(f"{key}: {current[key]}{suffix}")


def main(argv=None) -> int:
    """Run the command-line interface and return the process exit code.

    Args:
        argv: argument list to parse; ``None`` means ``sys.argv[1:]``.
    """
    ap = argparse.ArgumentParser(description="Résumé jardinier Open-Meteo (journalier)")
    ap.add_argument("--lat", type=float, default=45.1412)
    ap.add_argument("--lon", type=float, default=4.0736)
    ap.add_argument("--days", type=int, default=14)
    ap.add_argument("--past-days", type=int, default=7)
    ap.add_argument("--timezone", default="auto")
    ap.add_argument("--json", action="store_true", help="Sortie JSON")
    ap.add_argument("--json-out", help="Chemin de fichier JSON de sortie")
    args = ap.parse_args(argv)

    url = build_url(args.lat, args.lon, args.days, args.past_days, args.timezone)
    data = fetch_data(args.lat, args.lon, args.days, args.past_days, args.timezone)
    rows = summarize(data)

    metadata = data.get("metadata", {})
    units = data.get("hourly_units", {})
    current = data.get("current", {})
    current_units = data.get("current_units", {})
    payload = {
        "url": url,
        "metadata": metadata,
        "units": units,
        "current_units": current_units,
        "current": current,
        "days": rows,
    }
    document = json.dumps(payload, ensure_ascii=False, indent=2)

    if args.json_out:
        out_path = Path(args.json_out)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        out_path.write_text(document, encoding="utf-8")
        # With --json, stdout must carry nothing but the JSON document, so the
        # status line is diverted to stderr.
        print(f"JSON écrit: {out_path}", file=sys.stderr if args.json else sys.stdout)

    if args.json:
        print(document)
    else:
        print(f"Source: {url}")
        print(
            f"Location: {metadata.get('latitude')}N {metadata.get('longitude')}E"
            f" | Elevation: {metadata.get('elevation')} m"
        )
        print(f"Timezone: {metadata.get('timezone')} ({metadata.get('timezone_abbr')})")
        print_current(current, current_units)
        print_table(rows, units)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())