Files
jardin/backend/app/routers/settings.py
2026-03-22 11:42:57 +01:00

335 lines
10 KiB
Python

import os
import shutil
import time
import json
import tempfile
import zipfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import FileResponse
from starlette.background import BackgroundTask
from sqlmodel import Session, select
from app.database import get_session
from app.models.settings import UserSettings
from app.config import DATABASE_URL, UPLOAD_DIR
router = APIRouter(tags=["réglages"])
_PREV_CPU_USAGE_USEC: int | None = None
_PREV_CPU_TS: float | None = None
_TEXT_EXTENSIONS = {
".txt", ".md", ".markdown", ".json", ".csv", ".log", ".ini", ".yaml", ".yml", ".xml"
}
def _read_int_from_paths(paths: list[str]) -> int | None:
for path in paths:
try:
with open(path, "r", encoding="utf-8") as f:
raw = f.read().strip().split()[0]
return int(raw)
except Exception:
continue
return None
def _read_cgroup_cpu_usage_usec() -> int | None:
# cgroup v2
try:
with open("/sys/fs/cgroup/cpu.stat", "r", encoding="utf-8") as f:
for line in f:
if line.startswith("usage_usec "):
return int(line.split()[1])
except Exception:
pass
# cgroup v1
ns = _read_int_from_paths(["/sys/fs/cgroup/cpuacct/cpuacct.usage"])
if ns is not None:
return ns // 1000
return None
def _cpu_quota_cores() -> float | None:
# cgroup v2
try:
with open("/sys/fs/cgroup/cpu.max", "r", encoding="utf-8") as f:
quota, period = f.read().strip().split()[:2]
if quota == "max":
return float(os.cpu_count() or 1)
q, p = int(quota), int(period)
if p > 0:
return max(q / p, 0.01)
except Exception:
pass
# cgroup v1
quota = _read_int_from_paths(["/sys/fs/cgroup/cpu/cpu.cfs_quota_us"])
period = _read_int_from_paths(["/sys/fs/cgroup/cpu/cpu.cfs_period_us"])
if quota is not None and period is not None and quota > 0 and period > 0:
return max(quota / period, 0.01)
return float(os.cpu_count() or 1)
def _memory_stats() -> dict[str, Any]:
used = _read_int_from_paths(
[
"/sys/fs/cgroup/memory.current", # cgroup v2
"/sys/fs/cgroup/memory/memory.usage_in_bytes", # cgroup v1
]
)
limit = _read_int_from_paths(
[
"/sys/fs/cgroup/memory.max", # cgroup v2
"/sys/fs/cgroup/memory/memory.limit_in_bytes", # cgroup v1
]
)
# Certaines limites cgroup valent "max" ou des sentinelles tres grandes.
if limit is not None and limit >= 9_000_000_000_000_000_000:
limit = None
pct = None
if used is not None and limit and limit > 0:
pct = round((used / limit) * 100, 1)
return {"used_bytes": used, "limit_bytes": limit, "used_pct": pct}
def _disk_stats() -> dict[str, Any]:
target = "/data" if os.path.isdir("/data") else "/"
total, used, free = shutil.disk_usage(target)
uploads_size = None
if os.path.isdir(UPLOAD_DIR):
try:
uploads_size = sum(
os.path.getsize(os.path.join(root, name))
for root, _, files in os.walk(UPLOAD_DIR)
for name in files
)
except Exception:
uploads_size = None
return {
"path": target,
"total_bytes": total,
"used_bytes": used,
"free_bytes": free,
"used_pct": round((used / total) * 100, 1) if total else None,
"uploads_bytes": uploads_size,
}
def _safe_remove(path: str) -> None:
try:
os.remove(path)
except OSError:
pass
def _resolve_sqlite_db_path() -> Path | None:
prefix = "sqlite:///"
if not DATABASE_URL.startswith(prefix):
return None
raw = DATABASE_URL[len(prefix):]
if not raw:
return None
db_path = Path(raw)
if db_path.is_absolute():
return db_path
return (Path.cwd() / db_path).resolve()
def _zip_directory(zipf: zipfile.ZipFile, source_dir: Path, arc_prefix: str) -> int:
count = 0
if not source_dir.is_dir():
return count
for root, _, files in os.walk(source_dir):
root_path = Path(root)
for name in files:
file_path = root_path / name
if not file_path.is_file():
continue
rel = file_path.relative_to(source_dir)
arcname = str(Path(arc_prefix) / rel)
zipf.write(file_path, arcname=arcname)
count += 1
return count
def _zip_data_text_files(
zipf: zipfile.ZipFile,
data_root: Path,
db_path: Path | None,
uploads_dir: Path,
) -> int:
count = 0
if not data_root.is_dir():
return count
for root, _, files in os.walk(data_root):
root_path = Path(root)
for name in files:
file_path = root_path / name
if db_path and file_path == db_path:
continue
if uploads_dir in file_path.parents:
continue
if file_path.suffix.lower() not in _TEXT_EXTENSIONS:
continue
rel = file_path.relative_to(data_root)
zipf.write(file_path, arcname=str(Path("data_text") / rel))
count += 1
return count
@router.get("/settings")
def get_settings(session: Session = Depends(get_session)):
rows = session.exec(select(UserSettings)).all()
return {r.cle: r.valeur for r in rows}
@router.put("/settings")
def update_settings(data: dict, session: Session = Depends(get_session)):
for cle, valeur in data.items():
row = session.exec(select(UserSettings).where(UserSettings.cle == cle)).first()
if row:
row.valeur = str(valeur)
else:
row = UserSettings(cle=cle, valeur=str(valeur))
session.add(row)
session.commit()
return {"ok": True}
@router.get("/settings/debug/system")
def get_debug_system_stats() -> dict[str, Any]:
"""Stats runtime du conteneur (utile pour affichage debug UI)."""
global _PREV_CPU_USAGE_USEC, _PREV_CPU_TS
now = time.monotonic()
usage_usec = _read_cgroup_cpu_usage_usec()
quota_cores = _cpu_quota_cores()
cpu_pct = None
if usage_usec is not None and _PREV_CPU_USAGE_USEC is not None and _PREV_CPU_TS is not None:
delta_usage = usage_usec - _PREV_CPU_USAGE_USEC
delta_time_usec = (now - _PREV_CPU_TS) * 1_000_000
if delta_time_usec > 0 and quota_cores and quota_cores > 0:
cpu_pct = round((delta_usage / (delta_time_usec * quota_cores)) * 100, 1)
_PREV_CPU_USAGE_USEC = usage_usec
_PREV_CPU_TS = now
return {
"source": "container-cgroup",
"cpu": {
"usage_usec_total": usage_usec,
"quota_cores": quota_cores,
"used_pct": cpu_pct,
},
"memory": _memory_stats(),
"disk": _disk_stats(),
}
def _create_backup_zip() -> tuple[Path, str]:
"""Crée l'archive ZIP de sauvegarde. Retourne (chemin_tmp, nom_fichier)."""
now = datetime.now(timezone.utc)
ts = now.strftime("%Y%m%d_%H%M%S")
db_path = _resolve_sqlite_db_path()
uploads_dir = Path(UPLOAD_DIR).resolve()
data_root = db_path.parent if db_path else uploads_dir.parent
fd, tmp_zip_path = tempfile.mkstemp(prefix=f"jardin_backup_{ts}_", suffix=".zip")
os.close(fd)
tmp_zip = Path(tmp_zip_path)
stats = {"database_files": 0, "upload_files": 0, "text_files": 0}
with zipfile.ZipFile(tmp_zip, mode="w", compression=zipfile.ZIP_DEFLATED, compresslevel=6) as zipf:
if db_path and db_path.is_file():
zipf.write(db_path, arcname=f"db/{db_path.name}")
stats["database_files"] = 1
stats["upload_files"] = _zip_directory(zipf, uploads_dir, "uploads")
stats["text_files"] = _zip_data_text_files(zipf, data_root, db_path, uploads_dir)
manifest = {
"generated_at_utc": now.isoformat(),
"database_url": DATABASE_URL,
"paths": {
"database_path": str(db_path) if db_path else None,
"uploads_path": str(uploads_dir),
"data_root": str(data_root),
},
"included": stats,
"text_extensions": sorted(_TEXT_EXTENSIONS),
}
zipf.writestr("manifest.json", json.dumps(manifest, ensure_ascii=False, indent=2))
return tmp_zip, f"jardin_backup_{ts}.zip"
@router.get("/settings/backup/download")
def download_backup_zip() -> FileResponse:
tmp_zip, download_name = _create_backup_zip()
return FileResponse(
path=str(tmp_zip),
media_type="application/zip",
filename=download_name,
background=BackgroundTask(_safe_remove, str(tmp_zip)),
)
@router.post("/settings/backup/samba")
def backup_to_samba(session: Session = Depends(get_session)) -> dict[str, Any]:
"""Envoie une sauvegarde ZIP vers un partage Samba/CIFS."""
def _get(key: str, default: str = "") -> str:
row = session.exec(select(UserSettings).where(UserSettings.cle == key)).first()
return row.valeur if row else default
server = _get("samba_serveur").strip()
share = _get("samba_partage").strip()
username = _get("samba_utilisateur").strip()
password = _get("samba_motdepasse")
subfolder = _get("samba_sous_dossier").strip().strip("/\\")
if not server or not share:
raise HTTPException(400, "Configuration Samba incomplète : serveur et partage requis.")
try:
import smbclient # type: ignore
except ImportError:
raise HTTPException(500, "Module smbprotocol non installé dans l'environnement.")
tmp_zip, filename = _create_backup_zip()
try:
smbclient.register_session(server, username=username or None, password=password or None)
remote_dir = f"\\\\{server}\\{share}"
if subfolder:
remote_dir = f"{remote_dir}\\{subfolder}"
try:
smbclient.makedirs(remote_dir, exist_ok=True)
except Exception:
pass
remote_path = f"{remote_dir}\\{filename}"
with open(tmp_zip, "rb") as local_f:
data = local_f.read()
with smbclient.open_file(remote_path, mode="wb") as smb_f:
smb_f.write(data)
return {"ok": True, "fichier": filename, "chemin": remote_path}
except HTTPException:
raise
except Exception as exc:
raise HTTPException(500, f"Erreur Samba : {exc}") from exc
finally:
_safe_remove(str(tmp_zip))