import os
import shutil
import time
import json
import tempfile
import zipfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

from fastapi import APIRouter, Depends
from fastapi.responses import FileResponse
from starlette.background import BackgroundTask
from sqlmodel import Session, select

from app.database import get_session
from app.models.settings import UserSettings
from app.config import DATABASE_URL, UPLOAD_DIR

router = APIRouter(tags=["réglages"])

# Previous CPU sample (cumulative usage in microseconds and the monotonic
# timestamp it was taken at). Kept between requests so the debug endpoint can
# derive a usage percentage from two consecutive samples.
_PREV_CPU_USAGE_USEC: int | None = None
_PREV_CPU_TS: float | None = None

# File suffixes (lower-case) that the backup treats as "text" files.
_TEXT_EXTENSIONS = {
    ".txt",
    ".md",
    ".markdown",
    ".json",
    ".csv",
    ".log",
    ".ini",
    ".yaml",
    ".yml",
    ".xml",
}


def _read_int_from_paths(paths: list[str]) -> int | None:
    """Return the first integer that can be read from one of *paths*.

    Each file is expected to start with a whitespace-delimited integer token.
    Files that are missing, unreadable, empty or non-numeric are skipped.

    Returns:
        The parsed integer, or ``None`` when no path yields one.
    """
    for path in paths:
        try:
            with open(path, "r", encoding="utf-8") as f:
                raw = f.read().strip().split()[0]
            return int(raw)
        except (OSError, ValueError, IndexError):
            # OSError: missing/unreadable file; IndexError: empty file;
            # ValueError: non-numeric token (e.g. "max") or undecodable bytes.
            continue
    return None


def _read_cgroup_cpu_usage_usec() -> int | None:
    """Return the cumulative CPU usage read from the cgroup files, in µs.

    Tries the cgroup v2 ``cpu.stat`` file first, then falls back to the
    cgroup v1 ``cpuacct.usage`` file (whose value is divided by 1000).

    Returns:
        The usage in microseconds, or ``None`` when neither file is usable.
    """
    # cgroup v2
    try:
        with open("/sys/fs/cgroup/cpu.stat", "r", encoding="utf-8") as f:
            for line in f:
                if line.startswith("usage_usec "):
                    return int(line.split()[1])
    except (OSError, ValueError, IndexError):
        pass

    # cgroup v1
    ns = _read_int_from_paths(["/sys/fs/cgroup/cpuacct/cpuacct.usage"])
    if ns is not None:
        return ns // 1000
    return None


def _cpu_quota_cores() -> float | None:
    """Return the number of CPU cores available according to the cgroup quota.

    Reads the cgroup v2 ``cpu.max`` file, then the cgroup v1 CFS quota/period
    files. When no finite quota is found, falls back to ``os.cpu_count()``
    (or 1 when that is unknown). Finite quotas are floored at 0.01 cores.
    """
    # cgroup v2
    try:
        with open("/sys/fs/cgroup/cpu.max", "r", encoding="utf-8") as f:
            quota_raw, period_raw = f.read().strip().split()[:2]
        if quota_raw == "max":
            return float(os.cpu_count() or 1)
        q, p = int(quota_raw), int(period_raw)
        if p > 0:
            return max(q / p, 0.01)
    except (OSError, ValueError):
        # ValueError also covers a file with fewer than two fields.
        pass

    # cgroup v1
    quota = _read_int_from_paths(["/sys/fs/cgroup/cpu/cpu.cfs_quota_us"])
    period = _read_int_from_paths(["/sys/fs/cgroup/cpu/cpu.cfs_period_us"])
    if quota is not None and period is not None and quota > 0 and period > 0:
        return max(quota / period, 0.01)

    return float(os.cpu_count() or 1)


def _memory_stats() -> dict[str, Any]:
    """Return memory usage, limit and usage percentage read from cgroup files.

    Returns:
        A dict with ``used_bytes``, ``limit_bytes`` and ``used_pct``. Any of
        them may be ``None`` when the value is unavailable or unlimited.
    """
    used = _read_int_from_paths(
        [
            "/sys/fs/cgroup/memory.current",  # cgroup v2
            "/sys/fs/cgroup/memory/memory.usage_in_bytes",  # cgroup v1
        ]
    )
    limit = _read_int_from_paths(
        [
            "/sys/fs/cgroup/memory.max",  # cgroup v2
            "/sys/fs/cgroup/memory/memory.limit_in_bytes",  # cgroup v1
        ]
    )

    # Some cgroup limits are "max" or very large sentinel values; treat the
    # latter as "no limit".
    if limit is not None and limit >= 9_000_000_000_000_000_000:
        limit = None

    pct = None
    if used is not None and limit and limit > 0:
        pct = round((used / limit) * 100, 1)

    return {"used_bytes": used, "limit_bytes": limit, "used_pct": pct}


def _disk_stats() -> dict[str, Any]:
    """Return disk usage for ``/data`` (or ``/``) plus the uploads tree size.

    Returns:
        A dict with ``path``, ``total_bytes``, ``used_bytes``, ``free_bytes``,
        ``used_pct`` and ``uploads_bytes``. ``uploads_bytes`` is ``None`` when
        ``UPLOAD_DIR`` is not a directory.
    """
    target = "/data" if os.path.isdir("/data") else "/"
    total, used, free = shutil.disk_usage(target)

    uploads_size = None
    if os.path.isdir(UPLOAD_DIR):
        uploads_size = 0
        for root, _, files in os.walk(UPLOAD_DIR):
            for name in files:
                try:
                    uploads_size += os.path.getsize(os.path.join(root, name))
                except OSError:
                    # A file that vanished mid-walk or a broken symlink must
                    # not invalidate the whole figure; skip just that entry.
                    continue

    return {
        "path": target,
        "total_bytes": total,
        "used_bytes": used,
        "free_bytes": free,
        "used_pct": round((used / total) * 100, 1) if total else None,
        "uploads_bytes": uploads_size,
    }


def _safe_remove(path: str) -> None:
    """Delete *path*, ignoring any ``OSError`` (e.g. file already gone)."""
    try:
        os.remove(path)
    except OSError:
        pass


def _resolve_sqlite_db_path() -> Path | None:
    """Return the database file path encoded in ``DATABASE_URL``.

    Returns:
        ``None`` when the URL does not start with ``sqlite:///`` or carries no
        path; otherwise the path, resolved against the current working
        directory when it is relative.
    """
    prefix = "sqlite:///"
    if not DATABASE_URL.startswith(prefix):
        return None

    raw = DATABASE_URL[len(prefix):]
    if not raw:
        return None

    db_path = Path(raw)
    if db_path.is_absolute():
        return db_path
    return (Path.cwd() / db_path).resolve()


def _zip_directory(zipf: zipfile.ZipFile, source_dir: Path, arc_prefix: str) -> int:
    """Add every regular file under *source_dir* to *zipf*.

    Files are stored under ``arc_prefix/<path relative to source_dir>``.

    Returns:
        The number of files written (0 when *source_dir* is not a directory).
    """
    count = 0
    if not source_dir.is_dir():
        return count

    for root, _, files in os.walk(source_dir):
        root_path = Path(root)
        for name in files:
            file_path = root_path / name
            if not file_path.is_file():
                continue
            rel = file_path.relative_to(source_dir)
            arcname = str(Path(arc_prefix) / rel)
            zipf.write(file_path, arcname=arcname)
            count += 1
    return count


def _zip_data_text_files(
    zipf: zipfile.ZipFile,
    data_root: Path,
    db_path: Path | None,
    uploads_dir: Path,
) -> int:
    """Add the text files found under *data_root* to *zipf*.

    A file is included when its suffix is in ``_TEXT_EXTENSIONS``, it is not
    the database file, and it does not live under *uploads_dir*. Files are
    stored under ``data_text/<path relative to data_root>``.

    Returns:
        The number of files written (0 when *data_root* is not a directory).
    """
    count = 0
    if not data_root.is_dir():
        return count

    for root, dirs, files in os.walk(data_root):
        root_path = Path(root)
        # Prune the uploads tree: all of its files are excluded below, so
        # descending into it would only cost time on large upload folders.
        dirs[:] = [d for d in dirs if root_path / d != uploads_dir]
        for name in files:
            file_path = root_path / name
            if db_path and file_path == db_path:
                continue
            if uploads_dir in file_path.parents:
                continue
            if file_path.suffix.lower() not in _TEXT_EXTENSIONS:
                continue
            if not file_path.is_file():
                # Broken symlinks / special files would make zipf.write raise
                # and abort the whole backup; skip them like _zip_directory.
                continue
            rel = file_path.relative_to(data_root)
            zipf.write(file_path, arcname=str(Path("data_text") / rel))
            count += 1
    return count


@router.get("/settings")
def get_settings(session: Session = Depends(get_session)):
    """Return all stored settings as a ``{cle: valeur}`` mapping."""
    rows = session.exec(select(UserSettings)).all()
    return {r.cle: r.valeur for r in rows}


@router.put("/settings")
def update_settings(data: dict, session: Session = Depends(get_session)):
    """Create or update one setting per key of *data*.

    Values are stored as ``str(valeur)``. All changes are committed together.
    """
    for cle, valeur in data.items():
        row = session.exec(select(UserSettings).where(UserSettings.cle == cle)).first()
        if row:
            row.valeur = str(valeur)
        else:
            row = UserSettings(cle=cle, valeur=str(valeur))
        session.add(row)
    session.commit()
    return {"ok": True}


@router.get("/settings/debug/system")
def get_debug_system_stats() -> dict[str, Any]:
    """Return runtime stats of the container (useful for a debug UI display).

    The CPU percentage is derived from the difference between the current
    cgroup usage counter and the sample stored by the previous call, so it is
    ``None`` on the first call and whenever a counter or quota is unavailable.
    """
    global _PREV_CPU_USAGE_USEC, _PREV_CPU_TS

    now = time.monotonic()
    usage_usec = _read_cgroup_cpu_usage_usec()
    quota_cores = _cpu_quota_cores()
    cpu_pct = None

    if usage_usec is not None and _PREV_CPU_USAGE_USEC is not None and _PREV_CPU_TS is not None:
        delta_usage = usage_usec - _PREV_CPU_USAGE_USEC
        delta_time_usec = (now - _PREV_CPU_TS) * 1_000_000
        # A negative delta means the stored sample is not older than the
        # current one (counter reset or interleaved requests); report "unknown"
        # rather than a negative percentage.
        if delta_usage >= 0 and delta_time_usec > 0 and quota_cores and quota_cores > 0:
            cpu_pct = round((delta_usage / (delta_time_usec * quota_cores)) * 100, 1)

    _PREV_CPU_USAGE_USEC = usage_usec
    _PREV_CPU_TS = now

    return {
        "source": "container-cgroup",
        "cpu": {
            "usage_usec_total": usage_usec,
            "quota_cores": quota_cores,
            "used_pct": cpu_pct,
        },
        "memory": _memory_stats(),
        "disk": _disk_stats(),
    }


@router.get("/settings/backup/download")
def download_backup_zip() -> FileResponse:
    """Build a ZIP backup and return it as a file download.

    The archive contains the SQLite database file (when one is configured and
    exists) under ``db/``, the uploads directory under ``uploads/``, text
    files from the data root under ``data_text/`` and a ``manifest.json``
    describing what was included. The archive is written to a temporary file
    that is deleted by a background task attached to the response.
    """
    now = datetime.now(timezone.utc)
    ts = now.strftime("%Y%m%d_%H%M%S")
    db_path = _resolve_sqlite_db_path()
    uploads_dir = Path(UPLOAD_DIR).resolve()
    data_root = db_path.parent if db_path else uploads_dir.parent

    fd, tmp_zip_path = tempfile.mkstemp(prefix=f"jardin_backup_{ts}_", suffix=".zip")
    os.close(fd)
    tmp_zip = Path(tmp_zip_path)

    stats = {
        "database_files": 0,
        "upload_files": 0,
        "text_files": 0,
    }

    try:
        with zipfile.ZipFile(
            tmp_zip, mode="w", compression=zipfile.ZIP_DEFLATED, compresslevel=6
        ) as zipf:
            # NOTE(review): this copies the database file as-is; if the app is
            # writing to it concurrently the copy may be inconsistent — confirm
            # whether that is acceptable for this backup.
            if db_path and db_path.is_file():
                zipf.write(db_path, arcname=f"db/{db_path.name}")
                stats["database_files"] = 1

            stats["upload_files"] = _zip_directory(zipf, uploads_dir, "uploads")
            stats["text_files"] = _zip_data_text_files(zipf, data_root, db_path, uploads_dir)

            # NOTE(review): DATABASE_URL is written verbatim; if it can ever be
            # a non-SQLite URL with embedded credentials, they would end up in
            # the downloaded archive — confirm or redact.
            manifest = {
                "generated_at_utc": now.isoformat(),
                "database_url": DATABASE_URL,
                "paths": {
                    "database_path": str(db_path) if db_path else None,
                    "uploads_path": str(uploads_dir),
                    "data_root": str(data_root),
                },
                "included": stats,
                "text_extensions": sorted(_TEXT_EXTENSIONS),
            }
            zipf.writestr("manifest.json", json.dumps(manifest, ensure_ascii=False, indent=2))
    except Exception:
        # The cleanup task below is only registered on success; without this
        # a failed backup would leave the temporary archive on disk.
        _safe_remove(str(tmp_zip))
        raise

    download_name = f"jardin_backup_{ts}.zip"
    return FileResponse(
        path=str(tmp_zip),
        media_type="application/zip",
        filename=download_name,
        background=BackgroundTask(_safe_remove, str(tmp_zip)),
    )