"""Settings, container diagnostics and backup/restore API routes."""

import json
import logging
import os
import shutil
import sqlite3
import tempfile
import time
import zipfile
from contextlib import closing
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

from fastapi import APIRouter, Depends, File, Form, HTTPException, UploadFile
from fastapi.responses import FileResponse
from sqlalchemy import text
from sqlmodel import Session, select
from starlette.background import BackgroundTask

from app.config import DATABASE_URL, UPLOAD_DIR
from app.database import get_session
from app.models.settings import UserSettings

logger = logging.getLogger(__name__)

router = APIRouter(tags=["réglages"])

# Previous CPU sample, kept between requests so that the debug endpoint can
# compute a usage percentage from the delta of two consecutive calls.
_PREV_CPU_USAGE_USEC: int | None = None
_PREV_CPU_TS: float | None = None

# Extensions of the files under the data root that are added to the backup
# archive by `_zip_data_text_files`.
_TEXT_EXTENSIONS = {
    ".txt",
    ".md",
    ".markdown",
    ".json",
    ".csv",
    ".log",
    ".ini",
    ".yaml",
    ".yml",
    ".xml",
}


def _read_int_from_paths(paths: list[str]) -> int | None:
    """Return the first integer that can be read from *paths*, else None.

    The first whitespace-separated token of each file is parsed as an int.
    Files that are missing, unreadable, empty or non-numeric (for example a
    cgroup file containing ``max``) are skipped.
    """
    for path in paths:
        try:
            with open(path, "r", encoding="utf-8") as f:
                return int(f.read().split()[0])
        except (OSError, ValueError, IndexError):
            continue
    return None


def _read_cgroup_cpu_usage_usec() -> int | None:
    """Return the cumulative CPU time of the cgroup in microseconds, or None."""
    # cgroup v2: "usage_usec <value>" line in cpu.stat
    try:
        with open("/sys/fs/cgroup/cpu.stat", "r", encoding="utf-8") as f:
            for line in f:
                if line.startswith("usage_usec "):
                    return int(line.split()[1])
    except (OSError, ValueError, IndexError):
        pass

    # cgroup v1: the value read is divided by 1000 to obtain microseconds
    ns = _read_int_from_paths(["/sys/fs/cgroup/cpuacct/cpuacct.usage"])
    return ns // 1000 if ns is not None else None


def _cpu_quota_cores() -> float | None:
    """Return the CPU quota expressed in cores.

    Falls back to ``os.cpu_count()`` (or 1) when no quota is configured or
    when the cgroup files cannot be read.
    """
    # cgroup v2: "<quota> <period>" where quota may be the literal "max"
    try:
        with open("/sys/fs/cgroup/cpu.max", "r", encoding="utf-8") as f:
            quota, period = f.read().strip().split()[:2]
        if quota == "max":
            return float(os.cpu_count() or 1)
        q, p = int(quota), int(period)
        if p > 0:
            return max(q / p, 0.01)
    except (OSError, ValueError):
        # ValueError also covers a file with fewer than two tokens.
        pass

    # cgroup v1
    quota_us = _read_int_from_paths(["/sys/fs/cgroup/cpu/cpu.cfs_quota_us"])
    period_us = _read_int_from_paths(["/sys/fs/cgroup/cpu/cpu.cfs_period_us"])
    if quota_us is not None and period_us is not None and quota_us > 0 and period_us > 0:
        return max(quota_us / period_us, 0.01)

    return float(os.cpu_count() or 1)


def _memory_stats() -> dict[str, Any]:
    """Return used/limit memory in bytes and the usage percentage.

    Any value that cannot be determined is reported as None.
    """
    used = _read_int_from_paths(
        [
            "/sys/fs/cgroup/memory.current",  # cgroup v2
            "/sys/fs/cgroup/memory/memory.usage_in_bytes",  # cgroup v1
        ]
    )
    limit = _read_int_from_paths(
        [
            "/sys/fs/cgroup/memory.max",  # cgroup v2
            "/sys/fs/cgroup/memory/memory.limit_in_bytes",  # cgroup v1
        ]
    )

    # Some cgroup limits are "max" or very large sentinel values; treat the
    # latter as "no limit".
    if limit is not None and limit >= 9_000_000_000_000_000_000:
        limit = None

    pct = None
    if used is not None and limit and limit > 0:
        pct = round((used / limit) * 100, 1)

    return {"used_bytes": used, "limit_bytes": limit, "used_pct": pct}


def _disk_stats() -> dict[str, Any]:
    """Return disk usage of ``/data`` (or ``/``) and the size of UPLOAD_DIR."""
    target = "/data" if os.path.isdir("/data") else "/"
    total, used, free = shutil.disk_usage(target)

    uploads_size = None
    if os.path.isdir(UPLOAD_DIR):
        uploads_size = 0
        for root, _, files in os.walk(UPLOAD_DIR):
            for name in files:
                try:
                    uploads_size += os.path.getsize(os.path.join(root, name))
                except OSError:
                    # File vanished or is unreadable: skip it instead of
                    # discarding the whole total.
                    continue

    return {
        "path": target,
        "total_bytes": total,
        "used_bytes": used,
        "free_bytes": free,
        "used_pct": round((used / total) * 100, 1) if total else None,
        "uploads_bytes": uploads_size,
    }


def _safe_remove(path: str) -> None:
    """Delete *path*, ignoring any OSError (e.g. the file does not exist)."""
    try:
        os.remove(path)
    except OSError:
        pass


def _resolve_sqlite_db_path() -> Path | None:
    """Return the SQLite file path encoded in DATABASE_URL.

    Returns None when the URL does not start with ``sqlite:///`` or carries
    no path. Relative paths are resolved against the current directory.
    """
    prefix = "sqlite:///"
    if not DATABASE_URL.startswith(prefix):
        return None
    raw = DATABASE_URL[len(prefix):]
    if not raw:
        return None
    db_path = Path(raw)
    if db_path.is_absolute():
        return db_path
    return (Path.cwd() / db_path).resolve()


def _zip_directory(zipf: zipfile.ZipFile, source_dir: Path, arc_prefix: str) -> int:
    """Add every regular file under *source_dir* to *zipf* below *arc_prefix*.

    Returns the number of files written (0 if *source_dir* is not a directory).
    """
    count = 0
    if not source_dir.is_dir():
        return count
    for root, _, files in os.walk(source_dir):
        root_path = Path(root)
        for name in files:
            file_path = root_path / name
            if not file_path.is_file():
                continue
            rel = file_path.relative_to(source_dir)
            zipf.write(file_path, arcname=str(Path(arc_prefix) / rel))
            count += 1
    return count


def _zip_data_text_files(
    zipf: zipfile.ZipFile,
    data_root: Path,
    db_path: Path | None,
    uploads_dir: Path,
) -> int:
    """Add the text files found under *data_root* to *zipf* as ``data_text/…``.

    The database file, anything located below *uploads_dir* and files whose
    extension is not in ``_TEXT_EXTENSIONS`` are left out. Returns the number
    of files written.
    """
    count = 0
    if not data_root.is_dir():
        return count
    for root, _, files in os.walk(data_root):
        root_path = Path(root)
        for name in files:
            file_path = root_path / name
            if db_path and file_path == db_path:
                continue
            if uploads_dir in file_path.parents:
                continue
            if file_path.suffix.lower() not in _TEXT_EXTENSIONS:
                continue
            # Same guard as _zip_directory: a dangling symlink would make
            # zipf.write() raise and abort the whole backup.
            if not file_path.is_file():
                continue
            rel = file_path.relative_to(data_root)
            zipf.write(file_path, arcname=str(Path("data_text") / rel))
            count += 1
    return count


@router.get("/settings")
def get_settings(session: Session = Depends(get_session)):
    """Return every stored setting as a ``{key: value}`` mapping."""
    rows = session.exec(select(UserSettings)).all()
    return {r.cle: r.valeur for r in rows}


@router.put("/settings")
def update_settings(data: dict, session: Session = Depends(get_session)):
    """Create or update one setting per entry of *data* (values stored as str)."""
    for cle, valeur in data.items():
        row = session.exec(select(UserSettings).where(UserSettings.cle == cle)).first()
        if row:
            row.valeur = str(valeur)
        else:
            row = UserSettings(cle=cle, valeur=str(valeur))
        session.add(row)
    session.commit()
    return {"ok": True}


@router.get("/settings/debug/system")
def get_debug_system_stats() -> dict[str, Any]:
    """Return runtime stats of the container (useful for a debug UI).

    The CPU percentage is derived from the difference with the sample taken
    on the previous call, so it is None on the first call.
    """
    global _PREV_CPU_USAGE_USEC, _PREV_CPU_TS

    now = time.monotonic()
    usage_usec = _read_cgroup_cpu_usage_usec()
    quota_cores = _cpu_quota_cores()
    cpu_pct = None

    if usage_usec is not None and _PREV_CPU_USAGE_USEC is not None and _PREV_CPU_TS is not None:
        delta_usage = usage_usec - _PREV_CPU_USAGE_USEC
        delta_time_usec = (now - _PREV_CPU_TS) * 1_000_000
        # A negative delta means the counter went backwards; report no value
        # rather than a negative percentage.
        if delta_usage >= 0 and delta_time_usec > 0 and quota_cores and quota_cores > 0:
            cpu_pct = round((delta_usage / (delta_time_usec * quota_cores)) * 100, 1)

    _PREV_CPU_USAGE_USEC = usage_usec
    _PREV_CPU_TS = now

    return {
        "source": "container-cgroup",
        "cpu": {
            "usage_usec_total": usage_usec,
            "quota_cores": quota_cores,
            "used_pct": cpu_pct,
        },
        "memory": _memory_stats(),
        "disk": _disk_stats(),
    }


def _create_backup_zip() -> tuple[Path, str]:
    """Create the backup ZIP archive.

    The archive contains the SQLite file (``db/``), the uploads (``uploads/``),
    the text files of the data root (``data_text/``) and a ``manifest.json``.

    Returns:
        ``(temporary_path, download_file_name)``. The caller is responsible
        for deleting the temporary file.

    Raises:
        Any error raised while writing the archive; the temporary file is
        removed before the error propagates.
    """
    now = datetime.now(timezone.utc)
    ts = now.strftime("%Y%m%d_%H%M%S")
    db_path = _resolve_sqlite_db_path()
    uploads_dir = Path(UPLOAD_DIR).resolve()
    data_root = db_path.parent if db_path else uploads_dir.parent

    fd, tmp_zip_path = tempfile.mkstemp(prefix=f"jardin_backup_{ts}_", suffix=".zip")
    os.close(fd)
    tmp_zip = Path(tmp_zip_path)

    stats = {"database_files": 0, "upload_files": 0, "text_files": 0}

    try:
        with zipfile.ZipFile(
            tmp_zip, mode="w", compression=zipfile.ZIP_DEFLATED, compresslevel=6
        ) as zipf:
            # NOTE(review): the database file is copied as-is; confirm whether
            # a checkpoint or the sqlite3 backup API is wanted for WAL mode.
            if db_path and db_path.is_file():
                zipf.write(db_path, arcname=f"db/{db_path.name}")
                stats["database_files"] = 1

            stats["upload_files"] = _zip_directory(zipf, uploads_dir, "uploads")
            stats["text_files"] = _zip_data_text_files(zipf, data_root, db_path, uploads_dir)

            manifest = {
                "generated_at_utc": now.isoformat(),
                "database_url": DATABASE_URL,
                "paths": {
                    "database_path": str(db_path) if db_path else None,
                    "uploads_path": str(uploads_dir),
                    "data_root": str(data_root),
                },
                "included": stats,
                "text_extensions": sorted(_TEXT_EXTENSIONS),
            }
            zipf.writestr("manifest.json", json.dumps(manifest, ensure_ascii=False, indent=2))
    except Exception:
        # Do not leave a half-written archive behind in the temp directory.
        _safe_remove(str(tmp_zip))
        raise

    return tmp_zip, f"jardin_backup_{ts}.zip"


@router.get("/settings/backup/download")
def download_backup_zip() -> FileResponse:
    """Build a backup archive and send it; the temp file is removed afterwards."""
    tmp_zip, download_name = _create_backup_zip()
    return FileResponse(
        path=str(tmp_zip),
        media_type="application/zip",
        filename=download_name,
        background=BackgroundTask(_safe_remove, str(tmp_zip)),
    )


def _quote_ident(name: str) -> str:
    """Quote *name* as an SQL identifier, doubling embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def _merge_db_add_only(backup_db_path: Path, current_db_path: Path) -> dict[str, int]:
    """Insert into the current database the backup rows it does not have yet.

    Rows are copied table by table with ``INSERT OR IGNORE``. The merge is
    best-effort: a table that cannot be copied is logged, counted and skipped.

    Returns:
        ``{"rows_added", "rows_skipped", "tables_failed"}``. Added rows are
        measured as the row-count difference of each table.
    """
    stats = {"rows_added": 0, "rows_skipped": 0, "tables_failed": 0}

    # closing() guarantees both connections are released, including when the
    # second connect() or the PRAGMA raises.
    with (
        closing(sqlite3.connect(str(backup_db_path))) as backup_conn,
        closing(sqlite3.connect(str(current_db_path))) as current_conn,
    ):
        current_conn.execute("PRAGMA foreign_keys=OFF")
        tables = backup_conn.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'"
        ).fetchall()

        for (table,) in tables:
            # Table and column names come from the uploaded file: escape them.
            quoted_table = _quote_ident(table)
            try:
                cur = backup_conn.execute(f"SELECT * FROM {quoted_table}")
                cols = [d[0] for d in cur.description]
                rows = cur.fetchall()
                if not rows:
                    continue

                col_names = ", ".join(_quote_ident(c) for c in cols)
                placeholders = ", ".join(["?"] * len(cols))
                count_sql = f"SELECT COUNT(*) FROM {quoted_table}"

                before = current_conn.execute(count_sql).fetchone()[0]
                current_conn.executemany(
                    f"INSERT OR IGNORE INTO {quoted_table} ({col_names}) VALUES ({placeholders})",
                    rows,
                )
                after = current_conn.execute(count_sql).fetchone()[0]

                added = after - before
                stats["rows_added"] += added
                stats["rows_skipped"] += len(rows) - added
            except (sqlite3.Error, sqlite3.Warning):
                # E.g. the table or a column is missing in the current schema.
                logger.warning("Backup merge: table %r skipped", table, exc_info=True)
                stats["tables_failed"] += 1

        current_conn.commit()

    return stats


def _restore_uploads(
    backup_uploads: Path, uploads_dir: Path, overwrite: bool
) -> tuple[int, int, int]:
    """Copy the extracted uploads into *uploads_dir*.

    Existing files are replaced only when *overwrite* is true.
    Returns ``(copied, ignored, errors)``.
    """
    copied = ignored = errors = 0
    if not backup_uploads.is_dir():
        return copied, ignored, errors

    uploads_dir.mkdir(parents=True, exist_ok=True)
    for src in backup_uploads.rglob("*"):
        if not src.is_file():
            continue
        dst = uploads_dir / src.relative_to(backup_uploads)
        dst.parent.mkdir(parents=True, exist_ok=True)
        if not overwrite and dst.exists():
            ignored += 1
            continue
        try:
            shutil.copy2(str(src), str(dst))
            copied += 1
        except OSError:
            logger.warning("Restore: could not copy %s", src, exc_info=True)
            errors += 1
    return copied, ignored, errors


def _pick_backup_db_file(backup_db_dir: Path, db_path: Path) -> Path | None:
    """Return the database file to restore from the extracted ``db/`` folder.

    The backup stores the database under its own file name, whatever its
    extension, so that name is tried first; otherwise the first ``*.db``
    file (sorted) is used.
    """
    if not backup_db_dir.is_dir():
        return None
    same_name = backup_db_dir / db_path.name
    if same_name.is_file():
        return same_name
    db_files = sorted(backup_db_dir.glob("*.db"))
    return db_files[0] if db_files else None


def _overwrite_database(backup_db_file: Path, db_path: Path) -> None:
    """Replace the live SQLite file with *backup_db_file*."""
    from app.database import engine

    try:
        with engine.connect() as conn:
            conn.execute(text("PRAGMA wal_checkpoint(TRUNCATE)"))
    except Exception:
        # Best-effort: the restore proceeds even if the checkpoint fails.
        logger.warning("Restore: WAL checkpoint failed", exc_info=True)
    engine.dispose()

    # WAL/SHM files left over from the replaced database must not be kept
    # next to the restored file.
    for suffix in ("-wal", "-shm"):
        _safe_remove(f"{db_path}{suffix}")
    shutil.copy2(str(backup_db_file), str(db_path))


@router.post("/settings/backup/restore")
async def restore_backup(
    file: UploadFile = File(...),
    overwrite: bool = Form(default=True),
) -> dict[str, Any]:
    """Restore a ZIP backup (database + uploads).

    With ``overwrite=true`` existing uploads and the database file are
    replaced; with ``overwrite=false`` only missing uploads and missing
    database rows are added.

    Raises:
        HTTPException(400): the upload is not a readable ZIP archive.
        HTTPException(500): any other failure during the restore.
    """
    db_path = _resolve_sqlite_db_path()
    uploads_dir = Path(UPLOAD_DIR).resolve()

    data = await file.read()
    if len(data) < 4 or data[:2] != b"PK":
        raise HTTPException(400, "Le fichier n'est pas une archive ZIP valide.")

    fd, tmp_zip_path = tempfile.mkstemp(suffix=".zip")
    os.close(fd)
    tmp_zip = Path(tmp_zip_path)
    tmp_extract = Path(tempfile.mkdtemp(prefix="jardin_restore_"))

    try:
        tmp_zip.write_bytes(data)

        try:
            with zipfile.ZipFile(tmp_zip, "r") as zipf:
                zipf.extractall(str(tmp_extract))
        except zipfile.BadZipFile as exc:
            # Starts with "PK" but is not a usable archive: client error.
            raise HTTPException(400, "Le fichier n'est pas une archive ZIP valide.") from exc

        stats: dict[str, Any] = {
            "uploads_copies": 0,
            "uploads_ignores": 0,
            "db_restauree": False,
            "db_lignes_ajoutees": 0,
            "erreurs": 0,
        }

        # --- Uploads ---
        copied, ignored, errors = _restore_uploads(tmp_extract / "uploads", uploads_dir, overwrite)
        stats["uploads_copies"] = copied
        stats["uploads_ignores"] = ignored
        stats["erreurs"] += errors

        # --- Database ---
        backup_db_file = _pick_backup_db_file(tmp_extract / "db", db_path) if db_path else None
        if backup_db_file is not None:
            if overwrite:
                _overwrite_database(backup_db_file, db_path)
            else:
                merge = _merge_db_add_only(backup_db_file, db_path)
                stats["db_lignes_ajoutees"] = merge["rows_added"]
                stats["erreurs"] += merge.get("tables_failed", 0)
            stats["db_restauree"] = True

        return {"ok": True, **stats}

    except HTTPException:
        raise
    except Exception as exc:
        raise HTTPException(500, f"Erreur lors de la restauration : {exc}") from exc
    finally:
        _safe_remove(str(tmp_zip))
        shutil.rmtree(str(tmp_extract), ignore_errors=True)


@router.post("/settings/images/resize-all")
def resize_all_images(session: Session = Depends(get_session)) -> dict[str, Any]:
    """Shrink library images whose width or height exceeds the configured size.

    The limit is the ``image_max_width`` setting (1200 when absent or not an
    integer); a value <= 0 disables resizing. Resized images are written
    back to the same path in WEBP format.
    """
    from PIL import Image

    setting = session.exec(
        select(UserSettings).where(UserSettings.cle == "image_max_width")
    ).first()
    max_px = 1200
    if setting:
        try:
            max_px = int(setting.valeur)
        except (ValueError, TypeError):
            pass

    if max_px <= 0:
        return {
            "ok": True,
            "redimensionnees": 0,
            "ignorees": 0,
            "erreurs": 0,
            "message": "Taille originale configurée — aucune modification.",
        }

    from app.models.media import Media as MediaModel

    urls = session.exec(select(MediaModel.url)).all()

    uploads_dir = Path(UPLOAD_DIR).resolve()
    redimensionnees = 0
    ignorees = 0
    erreurs = 0

    for url in urls:
        if not url:
            continue
        # /uploads/filename.webp -> <uploads_dir>/filename.webp
        filename = url.lstrip("/").removeprefix("uploads/")
        file_path = (uploads_dir / filename).resolve()
        # Ignore anything that resolves outside the uploads directory (e.g. a
        # stored URL containing "..") as well as missing files.
        if uploads_dir not in file_path.parents or not file_path.is_file():
            ignorees += 1
            continue
        try:
            with Image.open(file_path) as img:
                if max(img.size) <= max_px:
                    ignorees += 1
                    continue
                img_copy = img.copy()
                img_copy.thumbnail((max_px, max_px), Image.LANCZOS)
                img_copy.save(file_path, "WEBP", quality=85)
                redimensionnees += 1
        except Exception:
            # Per-image best-effort: one bad file must not stop the batch.
            logger.warning("Resize: failed for %s", file_path, exc_info=True)
            erreurs += 1

    return {
        "ok": True,
        "redimensionnees": redimensionnees,
        "ignorees": ignorees,
        "erreurs": erreurs,
    }


@router.post("/settings/backup/samba")
def backup_to_samba(session: Session = Depends(get_session)) -> dict[str, Any]:
    """Send a backup ZIP to a Samba/CIFS share configured in the settings.

    Raises:
        HTTPException(400): server or share setting is missing.
        HTTPException(500): ``smbclient`` is not importable or the transfer failed.
    """

    def _get(key: str, default: str = "") -> str:
        row = session.exec(select(UserSettings).where(UserSettings.cle == key)).first()
        return row.valeur if row else default

    server = _get("samba_serveur").strip()
    share = _get("samba_partage").strip()
    username = _get("samba_utilisateur").strip()
    password = _get("samba_motdepasse")
    subfolder = _get("samba_sous_dossier").strip().strip("/\\")

    if not server or not share:
        raise HTTPException(400, "Configuration Samba incomplète : serveur et partage requis.")

    try:
        import smbclient  # type: ignore
    except ImportError as exc:
        raise HTTPException(
            500, "Module smbprotocol non installé dans l'environnement."
        ) from exc

    tmp_zip, filename = _create_backup_zip()
    try:
        smbclient.register_session(server, username=username or None, password=password or None)

        remote_dir = f"\\\\{server}\\{share}"
        if subfolder:
            remote_dir = f"{remote_dir}\\{subfolder}"
            try:
                smbclient.makedirs(remote_dir, exist_ok=True)
            except Exception:
                # Best-effort: if the folder really is unusable, opening the
                # remote file below fails and reports the error.
                pass

        remote_path = f"{remote_dir}\\{filename}"
        # Stream the archive instead of loading it entirely in memory.
        with open(tmp_zip, "rb") as local_f, smbclient.open_file(remote_path, mode="wb") as smb_f:
            shutil.copyfileobj(local_f, smb_f)

        return {"ok": True, "fichier": filename, "chemin": remote_path}
    except HTTPException:
        raise
    except Exception as exc:
        raise HTTPException(500, f"Erreur Samba : {exc}") from exc
    finally:
        _safe_remove(str(tmp_zip))