Files
suivi_produit/backend/app/api/routes_config.py
2026-01-25 14:48:26 +01:00

180 lines
6.5 KiB
Python

from __future__ import annotations
import json
import os
import shutil
from datetime import datetime
from pathlib import Path
from fastapi import APIRouter, Body, HTTPException, UploadFile, File
from fastapi.responses import FileResponse
from backend.app.core.config import BackendConfig, CONFIG_PATH, load_config
from backend.app.db.database import DEFAULT_DATABASE_PATH
router = APIRouter(prefix="/config", tags=["config"])
# Candidate locations of the frontend config file (dev layout + docker).
# Four `.parent` hops from this file resolve to the project root
# (routes_config.py -> api -> app -> backend -> root).
FRONTEND_CONFIG_PATH = (
    Path(__file__).resolve().parent.parent.parent.parent / "frontend" / "public" / "config_frontend.json"
)
# Fallback next to the backend tree, used when the frontend tree is absent.
FRONTEND_CONFIG_FALLBACK_PATH = (
    Path(__file__).resolve().parent.parent.parent.parent / "backend" / "config_frontend.json"
)
def _get_frontend_config_path() -> Path | None:
    """Return the first existing frontend config file, or None if neither exists."""
    for candidate in (FRONTEND_CONFIG_PATH, FRONTEND_CONFIG_FALLBACK_PATH):
        if candidate.exists():
            return candidate
    return None
@router.get("/backend", response_model=BackendConfig)
def read_backend_config() -> BackendConfig:
    """Expose the backend configuration (read-only)."""
    config = load_config()
    return config
@router.put("/backend", response_model=BackendConfig)
def update_backend_config(payload: dict = Body(...)) -> BackendConfig:
    """Apply a partial update to the backend configuration.

    The payload is deep-merged into the current configuration so nested
    Pydantic v2 models are updated field-by-field rather than replaced
    wholesale, then the merged result is re-validated and persisted to
    ``CONFIG_PATH``.

    Raises:
        HTTPException: 400 when the merged configuration fails validation
            or cannot be written to disk.
    """
    current = load_config()

    def _deep_merge(base: dict, update: dict) -> dict:
        """Recursively merge *update* into a shallow copy of *base*."""
        result = base.copy()
        for key, value in update.items():
            if key in result and isinstance(result[key], dict) and isinstance(value, dict):
                result[key] = _deep_merge(result[key], value)
            else:
                result[key] = value
        return result

    try:
        # Dump to a plain dict, merge, then re-validate the whole model.
        merged = _deep_merge(current.model_dump(), payload)
        updated = BackendConfig.model_validate(merged)
        CONFIG_PATH.write_text(updated.model_dump_json(indent=2), encoding="utf-8")
        # Invalidate the cache so the next read reflects the file on disk.
        load_config.cache_clear()
        return load_config()
    except Exception as exc:  # pragma: no cover
        # Chain the original error so the real cause survives in tracebacks.
        raise HTTPException(status_code=400, detail=str(exc)) from exc
@router.get("/frontend")
def read_frontend_config() -> dict:
    """Return the frontend configuration parsed from its JSON file.

    Raises:
        HTTPException: 404 when no config file exists at any known location.
    """
    config_path = _get_frontend_config_path()
    if config_path is None:
        raise HTTPException(status_code=404, detail="Config frontend introuvable")
    raw = config_path.read_text(encoding="utf-8")
    return json.loads(raw)
@router.put("/frontend")
def update_frontend_config(payload: dict = Body(...)) -> dict:
    """Deep-merge *payload* into the frontend configuration and persist it.

    The merged config is written to every target whose parent directory
    exists (dev ``frontend/public`` and the backend fallback) so both
    deployments stay in sync; if neither parent exists, the fallback path
    is written anyway.

    Raises:
        HTTPException: 400 on any read/parse/write failure.
    """
    try:
        # Load the current config, if a file exists.
        current: dict = {}
        config_path = _get_frontend_config_path()
        if config_path and config_path.exists():
            current = json.loads(config_path.read_text(encoding="utf-8"))

        def _deep_merge(base: dict, update: dict) -> dict:
            """Recursively merge *update* into a shallow copy of *base*."""
            result = base.copy()
            for key, value in update.items():
                if key in result and isinstance(result[key], dict) and isinstance(value, dict):
                    result[key] = _deep_merge(result[key], value)
                else:
                    result[key] = value
            return result

        updated = _deep_merge(current, payload)

        # Write to every location whose parent directory exists (dev + docker).
        target_paths = [
            path
            for path in (FRONTEND_CONFIG_PATH, FRONTEND_CONFIG_FALLBACK_PATH)
            if path.parent.exists()
        ]
        if not target_paths:
            target_paths.append(FRONTEND_CONFIG_FALLBACK_PATH)
        for target in target_paths:
            target.write_text(
                json.dumps(updated, indent=2, ensure_ascii=False),
                encoding="utf-8",
            )
        return updated
    except Exception as exc:
        # Chain the original error so the real cause survives in tracebacks.
        raise HTTPException(status_code=400, detail=str(exc)) from exc
# ==================== Database Backup ====================
@router.get("/database/info")
def database_info() -> dict:
    """Return metadata about the database file (path, size, mtime).

    Raises:
        HTTPException: 404 when the database file does not exist.
    """
    if not DEFAULT_DATABASE_PATH.exists():
        raise HTTPException(status_code=404, detail="Base de données introuvable")
    stat_result = DEFAULT_DATABASE_PATH.stat()
    size_bytes = stat_result.st_size
    return {
        "path": str(DEFAULT_DATABASE_PATH),
        "filename": DEFAULT_DATABASE_PATH.name,
        "size_bytes": size_bytes,
        "size_mb": round(size_bytes / (1024 * 1024), 2),
        "modified_at": datetime.fromtimestamp(stat_result.st_mtime).isoformat(),
    }
@router.get("/database/backup")
def download_database():
    """Download a copy of the database file.

    A timestamped copy is served instead of the live file to avoid
    lock/consistency issues while the database is in use; the copy is
    removed once the response has been sent.

    Raises:
        HTTPException: 404 when the database file does not exist.
    """
    if not DEFAULT_DATABASE_PATH.exists():
        raise HTTPException(status_code=404, detail="Base de données introuvable")
    # Local import: BackgroundTask is re-exported by FastAPI (already a
    # dependency of this module).
    from fastapi.background import BackgroundTask

    # BUGFIX: compute the timestamp once so the temp copy and the download
    # filename cannot differ across a second boundary.
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_path = DEFAULT_DATABASE_PATH.parent / f"backup_{stamp}.db"
    shutil.copy2(DEFAULT_DATABASE_PATH, backup_path)
    return FileResponse(
        path=backup_path,
        filename=f"suivi_produit_backup_{stamp}.db",
        media_type="application/x-sqlite3",
        # BUGFIX: background=None never cleaned up the temp copy, leaking one
        # file per download; delete it after the response is sent.
        background=BackgroundTask(backup_path.unlink),
    )
@router.post("/database/restore")
async def restore_database(file: UploadFile = File(...)) -> dict:
    """Restore the database from an uploaded ``.db`` file.

    A safety copy of the current database is taken first; if writing the
    uploaded content fails, the previous database is rolled back from
    that copy.

    Raises:
        HTTPException: 400 for a missing/invalid filename or an upload
            over 100MB, 500 when the restore itself fails.
    """
    # BUGFIX: UploadFile.filename may be None, which crashed .endswith().
    if not file.filename or not file.filename.endswith(".db"):
        raise HTTPException(status_code=400, detail="Le fichier doit être un .db")
    # Enforce the size limit (max 100MB).
    content = await file.read()
    if len(content) > 100 * 1024 * 1024:
        raise HTTPException(status_code=400, detail="Fichier trop volumineux (max 100MB)")
    # Keep a safety copy before overwriting anything.
    backup_before: Path | None = None
    if DEFAULT_DATABASE_PATH.exists():
        backup_before = (
            DEFAULT_DATABASE_PATH.parent
            / f"before_restore_{datetime.now().strftime('%Y%m%d_%H%M%S')}.db"
        )
        shutil.copy2(DEFAULT_DATABASE_PATH, backup_before)
    try:
        # Write the uploaded content over the current database file.
        with open(DEFAULT_DATABASE_PATH, "wb") as f:
            f.write(content)
        return {
            "success": True,
            "message": "Base de données restaurée avec succès",
            "size_bytes": len(content),
        }
    except Exception as exc:
        # BUGFIX: roll back to the pre-restore copy so a failed write does
        # not leave a truncated/corrupt database behind.
        if backup_before is not None and backup_before.exists():
            shutil.copy2(backup_before, DEFAULT_DATABASE_PATH)
        raise HTTPException(status_code=500, detail=f"Erreur lors de la restauration: {exc}") from exc