#!/usr/bin/env python3 """Backfill historique Open-Meteo vers la table SQLite meteoopenmeteo. Script autonome (hors webapp) : - appelle l'API Open-Meteo Archive par tranches de dates - reconstruit les champs journaliers utilises par l'app - fait un UPSERT dans la table `meteoopenmeteo` """ from __future__ import annotations import argparse import json import os import sqlite3 from datetime import date, datetime, timedelta, timezone from pathlib import Path from typing import Any import httpx WMO_LABELS = { 0: "Ensoleillé", 1: "Principalement ensoleillé", 2: "Partiellement nuageux", 3: "Couvert", 45: "Brouillard", 48: "Brouillard givrant", 51: "Bruine légère", 53: "Bruine modérée", 55: "Bruine dense", 61: "Pluie légère", 63: "Pluie modérée", 65: "Pluie forte", 71: "Neige légère", 73: "Neige modérée", 75: "Neige forte", 77: "Grains de neige", 80: "Averses légères", 81: "Averses modérées", 82: "Averses violentes", 85: "Averses de neige", 86: "Averses de neige fortes", 95: "Orage", 96: "Orage avec grêle", 99: "Orage violent", } DAILY_FIELDS = [ "temperature_2m_max", "temperature_2m_min", "precipitation_sum", "wind_speed_10m_max", "weather_code", "relative_humidity_2m_max", "et0_fao_evapotranspiration", ] HOURLY_FIELDS = [ "soil_temperature_0cm", ] def _to_float(value: Any) -> float | None: if value is None: return None try: return float(value) except (TypeError, ValueError): return None def _value_at(values: list[Any], index: int, default: Any = None) -> Any: if index < 0 or index >= len(values): return default return values[index] def _assert_db_writable(db_path: Path) -> None: if not db_path.exists(): raise FileNotFoundError(f"Base introuvable: {db_path}") if not db_path.is_file(): raise RuntimeError(f"Chemin de base invalide (pas un fichier): {db_path}") if not os.access(db_path, os.R_OK): raise PermissionError(f"Pas de lecture sur la base: {db_path}") if not os.access(db_path, os.W_OK): raise PermissionError( f"Pas d'ecriture sur la base: {db_path}. " "Lance le script avec un utilisateur qui a les droits." ) def _parse_iso_date(value: str) -> date: try: return date.fromisoformat(value) except ValueError as exc: raise ValueError(f"Date invalide '{value}' (attendu YYYY-MM-DD)") from exc def _date_chunks(start: date, end: date, chunk_days: int) -> list[tuple[date, date]]: chunks: list[tuple[date, date]] = [] cur = start while cur <= end: chunk_end = min(cur + timedelta(days=chunk_days - 1), end) chunks.append((cur, chunk_end)) cur = chunk_end + timedelta(days=1) return chunks def _daily_soil_average(raw: dict[str, Any]) -> dict[str, float]: hourly = raw.get("hourly", {}) times = hourly.get("time", []) or [] soils = hourly.get("soil_temperature_0cm", []) or [] by_day: dict[str, list[float]] = {} for idx, ts in enumerate(times): soil = _to_float(_value_at(soils, idx)) if soil is None or not isinstance(ts, str) or len(ts) < 10: continue day = ts[:10] by_day.setdefault(day, []).append(soil) return { day: round(sum(vals) / len(vals), 2) for day, vals in by_day.items() if vals } def _fetch_archive_chunk( *, lat: float, lon: float, start_date: date, end_date: date, timezone_name: str, timeout: int, ) -> list[dict[str, Any]]: url = "https://archive-api.open-meteo.com/v1/archive" params: list[tuple[str, Any]] = [ ("latitude", lat), ("longitude", lon), ("start_date", start_date.isoformat()), ("end_date", end_date.isoformat()), ("timezone", timezone_name), ] for field in DAILY_FIELDS: params.append(("daily", field)) for field in HOURLY_FIELDS: params.append(("hourly", field)) r = httpx.get(url, params=params, timeout=timeout) r.raise_for_status() raw = r.json() daily = raw.get("daily", {}) dates = daily.get("time", []) or [] soil_by_day = _daily_soil_average(raw) now_iso = datetime.now(timezone.utc).isoformat() rows: list[dict[str, Any]] = [] for i, iso in enumerate(dates): raw_code = _value_at(daily.get("weather_code", []), i, 0) code = int(raw_code) if raw_code is not None else 0 rows.append( { "date": iso, "t_min": _to_float(_value_at(daily.get("temperature_2m_min", []), i)), "t_max": _to_float(_value_at(daily.get("temperature_2m_max", []), i)), "pluie_mm": _to_float(_value_at(daily.get("precipitation_sum", []), i, 0.0)) or 0.0, "vent_kmh": _to_float(_value_at(daily.get("wind_speed_10m_max", []), i, 0.0)) or 0.0, "wmo": code, "label": WMO_LABELS.get(code, f"Code {code}"), "humidite_moy": _to_float(_value_at(daily.get("relative_humidity_2m_max", []), i)), "sol_0cm": soil_by_day.get(iso), "etp_mm": _to_float(_value_at(daily.get("et0_fao_evapotranspiration", []), i)), "fetched_at": now_iso, } ) return rows def _upsert_row(conn: sqlite3.Connection, row: dict[str, Any]) -> None: conn.execute( """ INSERT INTO meteoopenmeteo ( date, t_min, t_max, pluie_mm, vent_kmh, wmo, label, humidite_moy, sol_0cm, etp_mm, fetched_at ) VALUES ( :date, :t_min, :t_max, :pluie_mm, :vent_kmh, :wmo, :label, :humidite_moy, :sol_0cm, :etp_mm, :fetched_at ) ON CONFLICT(date) DO UPDATE SET t_min=excluded.t_min, t_max=excluded.t_max, pluie_mm=excluded.pluie_mm, vent_kmh=excluded.vent_kmh, wmo=excluded.wmo, label=excluded.label, humidite_moy=excluded.humidite_moy, sol_0cm=excluded.sol_0cm, etp_mm=excluded.etp_mm, fetched_at=excluded.fetched_at """, row, ) def main() -> int: parser = argparse.ArgumentParser( description="Backfill historique Open-Meteo dans la table meteoopenmeteo." ) parser.add_argument("--db", default="data/jardin.db", help="Chemin SQLite (defaut: data/jardin.db)") parser.add_argument("--lat", type=float, default=45.14, help="Latitude") parser.add_argument("--lon", type=float, default=4.12, help="Longitude") parser.add_argument("--start-date", default="2026-01-01", help="Date debut YYYY-MM-DD") parser.add_argument("--end-date", default=date.today().isoformat(), help="Date fin YYYY-MM-DD") parser.add_argument("--chunk-days", type=int, default=31, help="Taille des tranches en jours") parser.add_argument("--timezone", default="Europe/Paris", help="Timezone Open-Meteo") parser.add_argument("--timeout", type=int, default=25, help="Timeout HTTP en secondes") parser.add_argument("--dry-run", action="store_true", help="N ecrit pas en base") args = parser.parse_args() if args.chunk_days < 1: raise ValueError("--chunk-days doit etre >= 1") db_path = Path(args.db).expanduser().resolve() if not args.dry_run: _assert_db_writable(db_path) start = _parse_iso_date(args.start_date) end = _parse_iso_date(args.end_date) today = date.today() if end > today: end = today if end < start: raise ValueError(f"Plage invalide: {start.isoformat()} > {end.isoformat()}") chunks = _date_chunks(start, end, args.chunk_days) all_rows: list[dict[str, Any]] = [] for idx, (chunk_start, chunk_end) in enumerate(chunks, start=1): print(f"[{idx}/{len(chunks)}] Open-Meteo {chunk_start.isoformat()} -> {chunk_end.isoformat()}") rows = _fetch_archive_chunk( lat=args.lat, lon=args.lon, start_date=chunk_start, end_date=chunk_end, timezone_name=args.timezone, timeout=args.timeout, ) all_rows.extend(rows) summary = { "db": str(db_path), "lat": args.lat, "lon": args.lon, "start_date": start.isoformat(), "end_date": end.isoformat(), "chunk_count": len(chunks), "rows_fetched": len(all_rows), "dry_run": args.dry_run, } if args.dry_run: print(json.dumps(summary, ensure_ascii=False, indent=2)) return 0 conn: sqlite3.Connection | None = None try: conn = sqlite3.connect(str(db_path)) with conn: for row in all_rows: _upsert_row(conn, row) finally: if conn is not None: conn.close() print(json.dumps(summary, ensure_ascii=False, indent=2)) print("\nOK: meteoopenmeteo mise a jour.") return 0 if __name__ == "__main__": raise SystemExit(main())