Files
jardin/backend/app/routers/settings.py
2026-03-22 12:17:01 +01:00

515 lines
17 KiB
Python

import os
import shutil
import time
import json
import tempfile
import zipfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from fastapi import APIRouter, Depends, File, Form, HTTPException, UploadFile
from fastapi.responses import FileResponse
from starlette.background import BackgroundTask
from sqlalchemy import text
from sqlmodel import Session, select
from app.database import get_session
from app.models.settings import UserSettings
from app.config import DATABASE_URL, UPLOAD_DIR
# Router on which every endpoint of this module is registered.
router = APIRouter(tags=["réglages"])
# Last cgroup CPU usage sample (microseconds) and the time.monotonic() value at
# which it was taken. get_debug_system_stats() reads and overwrites this pair to
# derive a CPU percentage between two successive calls.
_PREV_CPU_USAGE_USEC: int | None = None
_PREV_CPU_TS: float | None = None
# File suffixes (compared lower-cased) that _zip_data_text_files() copies into
# the "data_text" part of a backup archive.
_TEXT_EXTENSIONS = {
".txt", ".md", ".markdown", ".json", ".csv", ".log", ".ini", ".yaml", ".yml", ".xml"
}
def _read_int_from_paths(paths: list[str]) -> int | None:
for path in paths:
try:
with open(path, "r", encoding="utf-8") as f:
raw = f.read().strip().split()[0]
return int(raw)
except Exception:
continue
return None
def _read_cgroup_cpu_usage_usec() -> int | None:
    """Return the cumulative CPU usage reported by the cgroup, in microseconds.

    The ``usage_usec`` entry of ``/sys/fs/cgroup/cpu.stat`` (cgroup v2) is
    tried first. Otherwise the cgroup v1 ``cpuacct.usage`` counter is read and
    integer-divided by 1000. Returns ``None`` when neither source is usable.
    """
    # cgroup v2
    try:
        with open("/sys/fs/cgroup/cpu.stat", "r", encoding="utf-8") as stat_file:
            usage_values = (
                int(entry.split()[1])
                for entry in stat_file
                if entry.startswith("usage_usec ")
            )
            first_match = next(usage_values, None)
            if first_match is not None:
                return first_match
    except Exception:
        # Missing or malformed cpu.stat: fall back to cgroup v1 below.
        pass
    # cgroup v1 (value treated as nanoseconds, hence the division)
    usage_ns = _read_int_from_paths(["/sys/fs/cgroup/cpuacct/cpuacct.usage"])
    return None if usage_ns is None else usage_ns // 1000
def _cpu_quota_cores() -> float | None:
    """Return the number of CPU cores the container is allowed to use.

    Reads the cgroup v2 ``cpu.max`` file first (``"max"`` means no quota),
    then the cgroup v1 ``cpu.cfs_quota_us`` / ``cpu.cfs_period_us`` pair.
    A computed quota is never reported below 0.01. When no quota can be
    determined, the value of ``os.cpu_count()`` (or 1) is returned.
    """
    unrestricted = float(os.cpu_count() or 1)

    # cgroup v2
    try:
        with open("/sys/fs/cgroup/cpu.max", "r", encoding="utf-8") as cpu_max:
            raw_quota, raw_period = cpu_max.read().strip().split()[:2]
        if raw_quota == "max":
            return unrestricted
        quota_v2 = int(raw_quota)
        period_v2 = int(raw_period)
        if period_v2 > 0:
            return max(quota_v2 / period_v2, 0.01)
    except Exception:
        # Missing or malformed cpu.max: fall back to cgroup v1 below.
        pass

    # cgroup v1
    quota_v1 = _read_int_from_paths(["/sys/fs/cgroup/cpu/cpu.cfs_quota_us"])
    period_v1 = _read_int_from_paths(["/sys/fs/cgroup/cpu/cpu.cfs_period_us"])
    if quota_v1 is None or period_v1 is None:
        return unrestricted
    if quota_v1 <= 0 or period_v1 <= 0:
        return unrestricted
    return max(quota_v1 / period_v1, 0.01)
def _memory_stats() -> dict[str, Any]:
    """Return the cgroup memory usage, limit and usage percentage.

    The result has the keys ``used_bytes``, ``limit_bytes`` and ``used_pct``;
    each is ``None`` when the corresponding value is unavailable.
    """
    current_bytes = _read_int_from_paths(
        [
            "/sys/fs/cgroup/memory.current",  # cgroup v2
            "/sys/fs/cgroup/memory/memory.usage_in_bytes",  # cgroup v1
        ]
    )
    max_bytes = _read_int_from_paths(
        [
            "/sys/fs/cgroup/memory.max",  # cgroup v2
            "/sys/fs/cgroup/memory/memory.limit_in_bytes",  # cgroup v1
        ]
    )
    # Some cgroup limits are "max" or very large sentinel values: treat the
    # latter as "no limit".
    if max_bytes is not None and max_bytes >= 9_000_000_000_000_000_000:
        max_bytes = None

    if current_bytes is None or not max_bytes or max_bytes <= 0:
        percent = None
    else:
        percent = round((current_bytes / max_bytes) * 100, 1)
    return {"used_bytes": current_bytes, "limit_bytes": max_bytes, "used_pct": percent}
def _disk_stats() -> dict[str, Any]:
    """Return disk usage for ``/data`` (or ``/``) plus the size of the uploads.

    ``uploads_bytes`` is the summed size of every file below ``UPLOAD_DIR``;
    it is ``None`` when that directory does not exist or when sizing any file
    fails.
    """
    mount_point = "/data" if os.path.isdir("/data") else "/"
    usage = shutil.disk_usage(mount_point)
    total_bytes, used_bytes, free_bytes = usage

    uploads_bytes = None
    if os.path.isdir(UPLOAD_DIR):
        try:
            running_total = 0
            for folder, _subdirs, filenames in os.walk(UPLOAD_DIR):
                for filename in filenames:
                    running_total += os.path.getsize(os.path.join(folder, filename))
            uploads_bytes = running_total
        except Exception:
            uploads_bytes = None

    used_pct = round((used_bytes / total_bytes) * 100, 1) if total_bytes else None
    return {
        "path": mount_point,
        "total_bytes": total_bytes,
        "used_bytes": used_bytes,
        "free_bytes": free_bytes,
        "used_pct": used_pct,
        "uploads_bytes": uploads_bytes,
    }
def _safe_remove(path: str) -> None:
    """Delete the file at ``path``, ignoring any ``OSError`` (e.g. already gone)."""
    try:
        os.unlink(path)
    except OSError:
        return
def _resolve_sqlite_db_path() -> Path | None:
    """Return the filesystem path of the SQLite file named by ``DATABASE_URL``.

    Returns ``None`` when the URL does not start with ``sqlite:///`` or has no
    path after that prefix. An absolute path is returned as-is; a relative one
    is resolved against the current working directory.
    """
    scheme = "sqlite:///"
    location = DATABASE_URL.removeprefix(scheme) if DATABASE_URL.startswith(scheme) else ""
    if not location:
        return None
    candidate = Path(location)
    if not candidate.is_absolute():
        candidate = (Path.cwd() / candidate).resolve()
    return candidate
def _zip_directory(zipf: zipfile.ZipFile, source_dir: Path, arc_prefix: str) -> int:
    """Add every regular file below ``source_dir`` to ``zipf``.

    Archive names are ``arc_prefix`` joined with the file's path relative to
    ``source_dir``. Returns the number of files written (0 when ``source_dir``
    is not a directory).
    """
    if not source_dir.is_dir():
        return 0
    prefix = Path(arc_prefix)
    written = 0
    for folder, _subdirs, filenames in os.walk(source_dir):
        folder_path = Path(folder)
        for filename in filenames:
            member = folder_path / filename
            if member.is_file():
                zipf.write(member, arcname=str(prefix / member.relative_to(source_dir)))
                written += 1
    return written
def _zip_data_text_files(
    zipf: zipfile.ZipFile,
    data_root: Path,
    db_path: Path | None,
    uploads_dir: Path,
) -> int:
    """Add the text files found below ``data_root`` to ``zipf``.

    A file is included when its lower-cased suffix is in ``_TEXT_EXTENSIONS``,
    it is not ``db_path`` itself and it is not located under ``uploads_dir``.
    Entries are stored under ``data_text/`` with their path relative to
    ``data_root``. Returns the number of files written.
    """
    if not data_root.is_dir():
        return 0
    archive_root = Path("data_text")
    written = 0
    for folder, _subdirs, filenames in os.walk(data_root):
        folder_path = Path(folder)
        for filename in filenames:
            candidate = folder_path / filename
            is_database = bool(db_path and candidate == db_path)
            if is_database or uploads_dir in candidate.parents:
                continue
            if candidate.suffix.lower() in _TEXT_EXTENSIONS:
                relative = candidate.relative_to(data_root)
                zipf.write(candidate, arcname=str(archive_root / relative))
                written += 1
    return written
@router.get("/settings")
def get_settings(session: Session = Depends(get_session)):
    """Return every stored ``UserSettings`` row as a ``{cle: valeur}`` mapping."""
    settings = {}
    for entry in session.exec(select(UserSettings)).all():
        settings[entry.cle] = entry.valeur
    return settings
@router.put("/settings")
def update_settings(data: dict, session: Session = Depends(get_session)):
    """Create or update one ``UserSettings`` row per key of ``data``.

    Values are stored as ``str(valeur)``. The session is committed once all
    keys have been processed. Returns ``{"ok": True}``.
    """
    for key, raw_value in data.items():
        lookup = select(UserSettings).where(UserSettings.cle == key)
        existing = session.exec(lookup).first()
        if existing:
            existing.valeur = str(raw_value)
            continue
        session.add(UserSettings(cle=key, valeur=str(raw_value)))
    session.commit()
    return {"ok": True}
@router.get("/settings/debug/system")
def get_debug_system_stats() -> dict[str, Any]:
    """Return runtime stats of the container (useful for a debug UI).

    The CPU percentage is derived from the cgroup usage delta between this
    call and the previous one, scaled by the CPU quota. It is ``None`` until a
    previous sample exists (and whenever usage or quota cannot be read).
    """
    global _PREV_CPU_USAGE_USEC, _PREV_CPU_TS

    sampled_at = time.monotonic()
    usage_usec = _read_cgroup_cpu_usage_usec()
    quota_cores = _cpu_quota_cores()

    cpu_pct = None
    previous_usage, previous_ts = _PREV_CPU_USAGE_USEC, _PREV_CPU_TS
    if usage_usec is not None and previous_usage is not None and previous_ts is not None:
        consumed_usec = usage_usec - previous_usage
        elapsed_usec = (sampled_at - previous_ts) * 1_000_000
        if elapsed_usec > 0 and quota_cores and quota_cores > 0:
            cpu_pct = round((consumed_usec / (elapsed_usec * quota_cores)) * 100, 1)

    # Remember this sample so that the next call can compute a delta.
    _PREV_CPU_USAGE_USEC, _PREV_CPU_TS = usage_usec, sampled_at

    cpu_section = {
        "usage_usec_total": usage_usec,
        "quota_cores": quota_cores,
        "used_pct": cpu_pct,
    }
    return {
        "source": "container-cgroup",
        "cpu": cpu_section,
        "memory": _memory_stats(),
        "disk": _disk_stats(),
    }
def _create_backup_zip() -> tuple[Path, str]:
    """Create the backup ZIP archive.

    The archive contains the SQLite database file (under ``db/``), the uploads
    directory (under ``uploads/``), the text files found next to the database
    (under ``data_text/``) and a ``manifest.json`` describing what was
    included.

    Returns:
        ``(temporary_path, download_file_name)``. The caller owns the
        temporary file and must delete it once it has been served or sent.

    Raises:
        Any error raised while building the archive; the temporary file is
        removed before the error propagates so failed backups do not pile up
        in the temp directory.
    """
    now = datetime.now(timezone.utc)
    ts = now.strftime("%Y%m%d_%H%M%S")
    db_path = _resolve_sqlite_db_path()
    uploads_dir = Path(UPLOAD_DIR).resolve()
    # Text files are collected from the directory holding the database, or
    # from the parent of the uploads directory when there is no SQLite file.
    data_root = db_path.parent if db_path else uploads_dir.parent

    fd, tmp_zip_path = tempfile.mkstemp(prefix=f"jardin_backup_{ts}_", suffix=".zip")
    os.close(fd)
    tmp_zip = Path(tmp_zip_path)
    stats = {"database_files": 0, "upload_files": 0, "text_files": 0}
    try:
        with zipfile.ZipFile(
            tmp_zip, mode="w", compression=zipfile.ZIP_DEFLATED, compresslevel=6
        ) as zipf:
            if db_path and db_path.is_file():
                # NOTE(review): the live database file is copied as-is. If the
                # database runs in WAL mode, changes not yet checkpointed live
                # in the "-wal" sidecar and are not part of this copy — confirm
                # whether a checkpoint or sqlite3 backup is wanted here.
                zipf.write(db_path, arcname=f"db/{db_path.name}")
                stats["database_files"] = 1
            stats["upload_files"] = _zip_directory(zipf, uploads_dir, "uploads")
            stats["text_files"] = _zip_data_text_files(zipf, data_root, db_path, uploads_dir)
            manifest = {
                "generated_at_utc": now.isoformat(),
                # NOTE(review): written verbatim; would expose credentials if
                # DATABASE_URL ever embeds any — confirm this is acceptable.
                "database_url": DATABASE_URL,
                "paths": {
                    "database_path": str(db_path) if db_path else None,
                    "uploads_path": str(uploads_dir),
                    "data_root": str(data_root),
                },
                "included": stats,
                "text_extensions": sorted(_TEXT_EXTENSIONS),
            }
            zipf.writestr("manifest.json", json.dumps(manifest, ensure_ascii=False, indent=2))
    except Exception:
        # Do not leak the (possibly large, partial) archive on failure.
        _safe_remove(str(tmp_zip))
        raise
    return tmp_zip, f"jardin_backup_{ts}.zip"
@router.get("/settings/backup/download")
def download_backup_zip() -> FileResponse:
    """Build a backup archive and return it as a ZIP download.

    The temporary archive is deleted by a background task attached to the
    response.
    """
    archive_path, archive_name = _create_backup_zip()
    local_path = str(archive_path)
    cleanup = BackgroundTask(_safe_remove, local_path)
    return FileResponse(
        path=local_path,
        media_type="application/zip",
        filename=archive_name,
        background=cleanup,
    )
def _merge_db_add_only(backup_db_path: Path, current_db_path: Path) -> dict[str, int]:
    """Insert into the current database the rows it is missing from the backup.

    Every user table of the backup database is copied with
    ``INSERT OR IGNORE``, so rows that conflict with existing ones are left
    untouched. A table that cannot be merged (for instance because it does not
    exist in the current database) is skipped without affecting the others.

    Args:
        backup_db_path: SQLite file extracted from the backup archive.
        current_db_path: SQLite file of the running application.

    Returns:
        ``{"rows_added": ..., "rows_skipped": ...}`` summed over the tables
        that were merged.

    Raises:
        sqlite3.Error: when the backup cannot be read at all (e.g. the file is
            not a database) or the final commit fails.
    """
    import sqlite3
    from contextlib import closing

    def _quote(identifier: str) -> str:
        # Identifiers come from the (untrusted) backup file: double any
        # embedded quote so the name cannot break out of the quoted identifier.
        return '"' + identifier.replace('"', '""') + '"'

    stats = {"rows_added": 0, "rows_skipped": 0}
    # Both connections are closed even if opening the second one fails.
    with closing(sqlite3.connect(str(backup_db_path))) as backup_conn, closing(
        sqlite3.connect(str(current_db_path))
    ) as current_conn:
        # Rows are inserted table by table in arbitrary order, so foreign keys
        # cannot be enforced while merging.
        current_conn.execute("PRAGMA foreign_keys=OFF")
        tables = backup_conn.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'"
        ).fetchall()
        for (table,) in tables:
            quoted_table = _quote(table)
            count_sql = f"SELECT COUNT(*) FROM {quoted_table}"
            try:
                cur = backup_conn.execute(f"SELECT * FROM {quoted_table}")
                cols = [d[0] for d in cur.description]
                rows = cur.fetchall()
                if not rows:
                    continue
                col_names = ", ".join(_quote(c) for c in cols)
                placeholders = ", ".join("?" for _ in cols)
                before = current_conn.execute(count_sql).fetchone()[0]
                current_conn.executemany(
                    f"INSERT OR IGNORE INTO {quoted_table} ({col_names}) VALUES ({placeholders})",
                    rows,
                )
                after = current_conn.execute(count_sql).fetchone()[0]
            except sqlite3.Error:
                # Best effort: a table missing from (or incompatible with) the
                # current schema must not abort the merge of the others.
                continue
            added = after - before
            stats["rows_added"] += added
            stats["rows_skipped"] += len(rows) - added
        current_conn.commit()
    return stats
@router.post("/settings/backup/restore")
async def restore_backup(
    file: UploadFile = File(...),
    overwrite: bool = Form(default=True),
) -> dict[str, Any]:
    """Restore a ZIP backup (database + uploads).

    Args:
        file: uploaded backup archive.
        overwrite: when true, existing uploads and the database file are
            replaced; when false, only missing uploads are copied and only
            missing database rows are inserted.

    Returns:
        ``{"ok": True, ...}`` with counters describing what was restored.

    Raises:
        HTTPException: 400 when the upload is not a readable ZIP archive,
            500 when anything else fails during the restoration.
    """
    db_path = _resolve_sqlite_db_path()
    uploads_dir = Path(UPLOAD_DIR).resolve()
    data = await file.read()
    # Cheap magic-number check before touching the disk.
    if len(data) < 4 or data[:2] != b'PK':
        raise HTTPException(400, "Le fichier n'est pas une archive ZIP valide.")
    fd, tmp_zip_path = tempfile.mkstemp(suffix=".zip")
    os.close(fd)
    tmp_zip = Path(tmp_zip_path)
    tmp_extract = Path(tempfile.mkdtemp(prefix="jardin_restore_"))
    try:
        tmp_zip.write_bytes(data)
        try:
            with zipfile.ZipFile(tmp_zip, "r") as zipf:
                zipf.extractall(str(tmp_extract))
        except zipfile.BadZipFile as exc:
            # A corrupt archive is a client error, not a server failure.
            raise HTTPException(400, "Le fichier n'est pas une archive ZIP valide.") from exc
        stats: dict[str, Any] = {
            "uploads_copies": 0,
            "uploads_ignores": 0,
            "db_restauree": False,
            "db_lignes_ajoutees": 0,
            "erreurs": 0,
        }
        # --- Uploads ---
        backup_uploads = tmp_extract / "uploads"
        if backup_uploads.is_dir():
            uploads_dir.mkdir(parents=True, exist_ok=True)
            for src in backup_uploads.rglob("*"):
                if not src.is_file():
                    continue
                dst = uploads_dir / src.relative_to(backup_uploads)
                dst.parent.mkdir(parents=True, exist_ok=True)
                if overwrite or not dst.exists():
                    try:
                        shutil.copy2(str(src), str(dst))
                        stats["uploads_copies"] += 1
                    except Exception:
                        # One unreadable file must not abort the whole restore.
                        stats["erreurs"] += 1
                else:
                    stats["uploads_ignores"] += 1
        # --- Database ---
        backup_db_dir = tmp_extract / "db"
        backup_db_file: Path | None = None
        if db_path and backup_db_dir.is_dir():
            # The backup stores the database under its own file name, which is
            # not necessarily "*.db": look for that name first.
            same_name = backup_db_dir / db_path.name
            if same_name.is_file():
                backup_db_file = same_name
            else:
                db_files = sorted(backup_db_dir.glob("*.db"))
                backup_db_file = db_files[0] if db_files else None
        if backup_db_file is not None and db_path:
            if overwrite:
                from app.database import engine
                try:
                    with engine.connect() as conn:
                        conn.execute(text("PRAGMA wal_checkpoint(TRUNCATE)"))
                except Exception:
                    # Best effort: the stale sidecars are removed below anyway.
                    pass
                engine.dispose()
                # A leftover "-wal"/"-shm" pair belongs to the database being
                # replaced; SQLite would replay it on top of the restored file.
                for sidecar_suffix in ("-wal", "-shm"):
                    _safe_remove(f"{db_path}{sidecar_suffix}")
                shutil.copy2(str(backup_db_file), str(db_path))
                stats["db_restauree"] = True
            else:
                merge = _merge_db_add_only(backup_db_file, db_path)
                stats["db_lignes_ajoutees"] = merge["rows_added"]
                stats["db_restauree"] = True
        return {"ok": True, **stats}
    except HTTPException:
        raise
    except Exception as exc:
        raise HTTPException(500, f"Erreur lors de la restauration : {exc}") from exc
    finally:
        _safe_remove(str(tmp_zip))
        shutil.rmtree(str(tmp_extract), ignore_errors=True)
@router.post("/settings/images/resize-all")
def resize_all_images(session: Session = Depends(get_session)) -> dict[str, Any]:
    """Shrink the library images that exceed the configured maximum size.

    The limit is read from the ``image_max_width`` setting (1200 when the
    setting is missing or not an integer). A value of 0 or less means "keep
    originals" and nothing is modified. Images whose width or height exceeds
    the limit are scaled down to fit within ``limit x limit`` and re-encoded
    as WEBP at quality 85, in place.

    Returns:
        Counters of resized (``redimensionnees``), skipped (``ignorees``) and
        failed (``erreurs``) images.
    """
    from PIL import Image
    setting = session.exec(select(UserSettings).where(UserSettings.cle == "image_max_width")).first()
    max_px = 1200
    if setting:
        try:
            max_px = int(setting.valeur)
        except (ValueError, TypeError):
            pass
    if max_px <= 0:
        return {"ok": True, "redimensionnees": 0, "ignorees": 0, "erreurs": 0,
                "message": "Taille originale configurée — aucune modification."}
    from app.models.media import Media as MediaModel
    urls = session.exec(select(MediaModel.url)).all()
    uploads_dir = Path(UPLOAD_DIR).resolve()
    redimensionnees = 0
    ignorees = 0
    erreurs = 0
    for url in urls:
        if not url:
            continue
        # /uploads/filename.webp -> <UPLOAD_DIR>/filename.webp
        filename = url.lstrip("/").removeprefix("uploads/")
        file_path = uploads_dir / filename
        if not file_path.is_file():
            ignorees += 1
            continue
        # The resized image is written next to the original and swapped in
        # only once fully written, so a failed save cannot leave a truncated
        # file in place of the original.
        tmp_path = file_path.with_name(f".{file_path.name}.resize-tmp")
        try:
            with Image.open(file_path) as img:
                w, h = img.size
                if w <= max_px and h <= max_px:
                    ignorees += 1
                    continue
                # Detach the pixel data from the open file before closing it.
                img_copy = img.copy()
            img_copy.thumbnail((max_px, max_px), Image.LANCZOS)
            img_copy.save(tmp_path, "WEBP", quality=85)
            os.replace(tmp_path, file_path)
            redimensionnees += 1
        except Exception:
            # Best effort per image: drop the partial file and keep going.
            _safe_remove(str(tmp_path))
            erreurs += 1
    return {"ok": True, "redimensionnees": redimensionnees, "ignorees": ignorees, "erreurs": erreurs}
@router.post("/settings/backup/samba")
def backup_to_samba(session: Session = Depends(get_session)) -> dict[str, Any]:
    """Send a ZIP backup to a Samba/CIFS share.

    The connection parameters are read from the ``samba_*`` user settings
    (server, share, user, password, optional sub-folder).

    Returns:
        ``{"ok": True, "fichier": <file name>, "chemin": <remote UNC path>}``.

    Raises:
        HTTPException: 400 when the server or the share is not configured,
            500 when ``smbclient`` is not installed or the transfer fails.
    """
    def _get(key: str, default: str = "") -> str:
        # A missing row and a NULL value both fall back to the default, so the
        # ``.strip()`` calls below never receive None.
        row = session.exec(select(UserSettings).where(UserSettings.cle == key)).first()
        if row is None or row.valeur is None:
            return default
        return row.valeur

    server = _get("samba_serveur").strip()
    share = _get("samba_partage").strip()
    username = _get("samba_utilisateur").strip()
    password = _get("samba_motdepasse")
    subfolder = _get("samba_sous_dossier").strip().strip("/\\")
    if not server or not share:
        raise HTTPException(400, "Configuration Samba incomplète : serveur et partage requis.")
    try:
        import smbclient  # type: ignore
    except ImportError as exc:
        raise HTTPException(500, "Module smbprotocol non installé dans l'environnement.") from exc
    tmp_zip, filename = _create_backup_zip()
    try:
        # Empty credentials are passed as None.
        smbclient.register_session(server, username=username or None, password=password or None)
        remote_dir = f"\\\\{server}\\{share}"
        if subfolder:
            remote_dir = f"{remote_dir}\\{subfolder}"
        try:
            smbclient.makedirs(remote_dir, exist_ok=True)
        except Exception:
            # Best effort: if the directory really is unusable, opening the
            # remote file below fails and reports the error.
            pass
        remote_path = f"{remote_dir}\\{filename}"
        # Stream the archive instead of loading it entirely into memory.
        with open(tmp_zip, "rb") as local_f, smbclient.open_file(remote_path, mode="wb") as smb_f:
            shutil.copyfileobj(local_f, smb_f)
        return {"ok": True, "fichier": filename, "chemin": remote_path}
    except HTTPException:
        raise
    except Exception as exc:
        raise HTTPException(500, f"Erreur Samba : {exc}") from exc
    finally:
        _safe_remove(str(tmp_zip))