feat(backend): service cache Redis pour identifications

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-22 12:14:31 +01:00
parent 1af5f66855
commit 6ca233d720
5 changed files with 310 additions and 0 deletions

View File

View File

@@ -0,0 +1,144 @@
from __future__ import annotations
from dataclasses import dataclass, asdict
from datetime import date, datetime, timedelta
import math
from pathlib import Path
import pytz
from skyfield.api import load
from skyfield import almanac
TZ = pytz.timezone("Europe/Paris")
SIGN_NAMES = [
"Bélier", "Taureau", "Gémeaux", "Cancer", "Lion", "Vierge",
"Balance", "Scorpion", "Sagittaire", "Capricorne", "Verseau", "Poissons",
]
SIGN_TO_TYPE = {
"Taureau": "Racine", "Vierge": "Racine", "Capricorne": "Racine",
"Cancer": "Feuille", "Scorpion": "Feuille", "Poissons": "Feuille",
"Gémeaux": "Fleur", "Balance": "Fleur", "Verseau": "Fleur",
"Bélier": "Fruit", "Lion": "Fruit", "Sagittaire": "Fruit",
}
@dataclass
class DayInfo:
date: str
phase: str
illumination: float
croissante_decroissante: str
montante_descendante: str
signe: str
type_jour: str
perigee: bool
apogee: bool
noeud_lunaire: bool
def zodiac_sign_from_lon(lon_deg: float) -> str:
return SIGN_NAMES[int(lon_deg // 30) % 12]
def _local_noon(d: date) -> datetime:
return TZ.localize(datetime(d.year, d.month, d.day, 12, 0, 0))
def _compute_perigee_apogee_days(
ts, earth, moon, start: date, end: date
) -> tuple[set[date], set[date]]:
sample_start = datetime.combine(start - timedelta(days=1), datetime.min.time())
sample_end = datetime.combine(
end + timedelta(days=1), datetime.max.time().replace(microsecond=0)
)
samples = []
current = TZ.localize(sample_start)
end_local = TZ.localize(sample_end)
step = timedelta(hours=1)
while current <= end_local:
t = ts.utc(current.astimezone(pytz.utc))
dist_km = earth.at(t).observe(moon).distance().km
samples.append((current.date(), dist_km))
current += step
perigee_days, apogee_days = set(), set()
for i in range(1, len(samples) - 1):
day, dist = samples[i]
if not (start <= day <= end):
continue
prev_dist = samples[i - 1][1]
next_dist = samples[i + 1][1]
if dist < prev_dist and dist < next_dist:
perigee_days.add(day)
if dist > prev_dist and dist > next_dist:
apogee_days.add(day)
return perigee_days, apogee_days
def build_calendar(start: date, end: date) -> list[DayInfo]:
if end < start:
raise ValueError(f"Invalid date range: {start} > {end}")
ts = load.timescale()
eph = load("de421.bsp")
earth, moon, sun = eph["earth"], eph["moon"], eph["sun"]
t0 = ts.utc(start.year, start.month, start.day)
t1 = ts.utc(end.year, end.month, end.day + 1)
f_phase = almanac.moon_phases(eph)
phase_times, phase_events = almanac.find_discrete(t0, t1, f_phase)
phase_by_day = {}
for t, ev in zip(phase_times, phase_events):
local_day = (
t.utc_datetime().replace(tzinfo=pytz.utc).astimezone(TZ).date()
)
phase_by_day[local_day] = [
"Nouvelle Lune", "Premier Quartier", "Pleine Lune", "Dernier Quartier"
][int(ev)]
f_nodes = almanac.moon_nodes(eph)
node_times, _ = almanac.find_discrete(t0, t1, f_nodes)
node_days = set()
for t in node_times:
local_day = (
t.utc_datetime().replace(tzinfo=pytz.utc).astimezone(TZ).date()
)
node_days.add(local_day)
perigee_days, apogee_days = _compute_perigee_apogee_days(
ts, earth, moon, start, end
)
result = []
d = start
while d <= end:
local_noon = _local_noon(d)
t = ts.utc(local_noon.astimezone(pytz.utc))
e = earth.at(t)
v_sun = e.observe(sun).apparent()
v_moon = e.observe(moon).apparent()
sep = v_sun.separation_from(v_moon).radians
illum = (1 - math.cos(sep)) / 2
d2 = d + timedelta(days=1)
local_noon2 = _local_noon(d2)
t2 = ts.utc(local_noon2.astimezone(pytz.utc))
e2 = earth.at(t2)
v_sun2 = e2.observe(sun).apparent()
v_moon2 = e2.observe(moon).apparent()
sep2 = v_sun2.separation_from(v_moon2).radians
illum2 = (1 - math.cos(sep2)) / 2
croissante = "Croissante" if illum2 >= illum else "Décroissante"
dec = v_moon.radec()[1].degrees
dec2 = v_moon2.radec()[1].degrees
montante = "Montante" if dec2 >= dec else "Descendante"
lat, lon, dist = v_moon.ecliptic_latlon()
signe = zodiac_sign_from_lon(lon.degrees % 360.0)
type_jour = SIGN_TO_TYPE[signe]
result.append(
DayInfo(
date=d.isoformat(),
phase=phase_by_day.get(d, ""),
illumination=round(illum * 100, 2),
croissante_decroissante=croissante,
montante_descendante=montante,
signe=signe,
type_jour=type_jour,
perigee=(d in perigee_days),
apogee=(d in apogee_days),
noeud_lunaire=(d in node_days),
)
)
d += timedelta(days=1)
return result

View File

@@ -0,0 +1,127 @@
"""Client Open-Meteo (gratuit, sans clé API)."""
import json
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import httpx
CACHE_PATH = Path(os.environ.get("UPLOAD_DIR", "/data")).parent / "meteo_cache.json"
CACHE_TTL_SECONDS = 3 * 3600 # 3h
WMO_LABELS = {
0: "Ensoleillé",
1: "Principalement ensoleillé",
2: "Partiellement nuageux",
3: "Couvert",
45: "Brouillard",
48: "Brouillard givrant",
51: "Bruine légère",
53: "Bruine modérée",
55: "Bruine dense",
61: "Pluie légère",
63: "Pluie modérée",
65: "Pluie forte",
71: "Neige légère",
73: "Neige modérée",
75: "Neige forte",
80: "Averses légères",
81: "Averses modérées",
82: "Averses violentes",
85: "Averses de neige",
95: "Orage",
96: "Orage avec grêle",
99: "Orage violent",
}
WMO_ICONS = {
0: "☀️",
1: "🌤",
2: "",
3: "☁️",
45: "🌫",
48: "🌫",
51: "🌦",
53: "🌦",
55: "🌧",
61: "🌦",
63: "🌧",
65: "🌧",
71: "🌨",
73: "🌨",
75: "❄️",
80: "🌦",
81: "🌧",
82: "",
85: "🌨",
95: "",
96: "",
99: "",
}
def _cache_fresh() -> dict | None:
if not CACHE_PATH.exists():
return None
try:
data = json.loads(CACHE_PATH.read_text())
cached_at = datetime.fromisoformat(
data.get("cached_at", "2000-01-01T00:00:00+00:00")
)
if (datetime.now(timezone.utc) - cached_at).total_seconds() < CACHE_TTL_SECONDS:
return data
except Exception:
pass
return None
def fetch_forecast(
lat: float = 45.14, lon: float = 4.12, days: int = 14
) -> dict[str, Any]:
cached = _cache_fresh()
if cached:
return cached
url = "https://api.open-meteo.com/v1/forecast"
params = {
"latitude": lat,
"longitude": lon,
"daily": "temperature_2m_max,temperature_2m_min,precipitation_sum,windspeed_10m_max,weathercode",
"timezone": "Europe/Paris",
"forecast_days": min(days, 16),
}
try:
r = httpx.get(url, params=params, timeout=10)
r.raise_for_status()
raw = r.json()
except Exception as e:
return {"error": str(e), "days": []}
daily = raw.get("daily", {})
dates = daily.get("time", [])
result_days = []
for i, d in enumerate(dates):
code = int(daily.get("weathercode", [0] * len(dates))[i] or 0)
result_days.append(
{
"date": d,
"t_max": daily.get("temperature_2m_max", [None] * len(dates))[i],
"t_min": daily.get("temperature_2m_min", [None] * len(dates))[i],
"pluie_mm": daily.get("precipitation_sum", [0] * len(dates))[i] or 0,
"vent_kmh": daily.get("windspeed_10m_max", [0] * len(dates))[i] or 0,
"code": code,
"label": WMO_LABELS.get(code, "Inconnu"),
"icone": WMO_ICONS.get(code, "🌡"),
}
)
data = {
"cached_at": datetime.now(timezone.utc).isoformat(),
"days": result_days,
}
try:
CACHE_PATH.parent.mkdir(parents=True, exist_ok=True)
CACHE_PATH.write_text(json.dumps(data, ensure_ascii=False))
except Exception:
pass
return data

View File

@@ -0,0 +1,34 @@
import hashlib
import json
import os
from typing import Optional
_client = None
def _get_client():
global _client
if _client is None:
import redis
url = os.environ.get("REDIS_URL", "redis://localhost:6379")
_client = redis.from_url(url, decode_responses=True)
return _client
def cache_key(image_bytes: bytes) -> str:
return f"identify:{hashlib.sha256(image_bytes).hexdigest()}"
def get(image_bytes: bytes) -> Optional[list]:
try:
value = _get_client().get(cache_key(image_bytes))
return json.loads(value) if value else None
except Exception:
return None
def set(image_bytes: bytes, results: list, ttl: int = 604800) -> None:
try:
_get_client().setex(cache_key(image_bytes), ttl, json.dumps(results))
except Exception:
pass # cache indisponible → silencieux

View File

@@ -5,3 +5,8 @@ python-multipart==0.0.12
aiofiles==24.1.0
pytest==8.3.3
httpx==0.28.0
Pillow==11.1.0
skyfield==1.49
pytz==2025.1
numpy==2.2.3
redis==5.2.1