180 lines
6.5 KiB
Python
180 lines
6.5 KiB
Python
from __future__ import annotations

import json
import os
import shutil
from datetime import datetime
from pathlib import Path

from fastapi import APIRouter, BackgroundTasks, Body, File, HTTPException, UploadFile
from fastapi.responses import FileResponse

from backend.app.core.config import BackendConfig, CONFIG_PATH, load_config
from backend.app.db.database import DEFAULT_DATABASE_PATH
|
|
|
|
# Router for the endpoints declared in this module; every route is served
# under the "/config" prefix and tagged "config".
router = APIRouter(prefix="/config", tags=["config"])
|
|
|
|
# Possible locations of the frontend config file (dev + docker). Both are
# resolved from the fourth ancestor of this file: the primary one lives in
# the frontend's public directory, the fallback one inside "backend".
_PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent.parent
FRONTEND_CONFIG_PATH = _PROJECT_ROOT / "frontend" / "public" / "config_frontend.json"
FRONTEND_CONFIG_FALLBACK_PATH = _PROJECT_ROOT / "backend" / "config_frontend.json"
|
|
|
|
|
|
def _get_frontend_config_path() -> Path | None:
    """Return the first frontend config file that exists on disk.

    The primary location is checked before the fallback one; the fallback is
    only probed when the primary file is missing.

    Returns:
        The path of the existing config file, or ``None`` when neither
        candidate exists.
    """
    candidates = (FRONTEND_CONFIG_PATH, FRONTEND_CONFIG_FALLBACK_PATH)
    return next((path for path in candidates if path.exists()), None)
|
|
|
|
|
|
@router.get("/backend", response_model=BackendConfig)
def read_backend_config() -> BackendConfig:
    # Read-only view of the backend configuration, exactly as load_config()
    # returns it.
    backend_config = load_config()
    return backend_config
|
|
|
|
|
|
@router.put("/backend", response_model=BackendConfig)
def update_backend_config(payload: dict = Body(...)) -> BackendConfig:
    """Apply a partial update to the backend configuration and persist it.

    The payload is deep-merged into the current configuration, the merged
    result is re-validated with ``BackendConfig.model_validate`` and written
    to ``CONFIG_PATH`` as indented JSON.

    Args:
        payload: Partial configuration. Nested dicts are merged key by key;
            any other value replaces the existing one.

    Returns:
        The configuration as reloaded by ``load_config()`` after the write.

    Raises:
        HTTPException: 400 when the merged configuration cannot be built or
            validated; 500 when the configuration file cannot be written.
    """

    def deep_merge(base: dict, update: dict) -> dict:
        """Return a copy of ``base`` with ``update`` merged in recursively."""
        result = base.copy()
        for key, value in update.items():
            existing = result.get(key)
            if isinstance(existing, dict) and isinstance(value, dict):
                result[key] = deep_merge(existing, value)
            else:
                result[key] = value
        return result

    current = load_config()

    # Merging on plain dicts and re-validating is what lets nested models be
    # updated partially. Anything that fails here is caused by the payload.
    try:
        merged = deep_merge(current.model_dump(), payload)
        updated = BackendConfig.model_validate(merged)
    except Exception as exc:  # pragma: no cover
        raise HTTPException(status_code=400, detail=str(exc)) from exc

    # A failed write is a server-side problem, not a bad request.
    try:
        CONFIG_PATH.write_text(updated.model_dump_json(indent=2), encoding="utf-8")
    except OSError as exc:  # pragma: no cover
        raise HTTPException(status_code=500, detail=str(exc)) from exc

    # Drop the cached configuration so the reload reflects the file just
    # written.
    load_config.cache_clear()
    return load_config()
|
|
|
|
|
|
@router.get("/frontend")
def read_frontend_config() -> dict:
    """Return the frontend configuration.

    Raises:
        HTTPException: 404 when no frontend config file exists.
    """
    config_path = _get_frontend_config_path()
    if config_path:
        raw_config = config_path.read_text(encoding="utf-8")
        return json.loads(raw_config)
    raise HTTPException(status_code=404, detail="Config frontend introuvable")
|
|
|
|
|
|
@router.put("/frontend")
def update_frontend_config(payload: dict = Body(...)) -> dict:
    """Apply a partial update to the frontend configuration and persist it.

    The payload is deep-merged into the configuration currently on disk (an
    empty dict when no file exists). The result is written to every known
    location whose parent directory exists, or to the fallback location when
    neither does.

    Args:
        payload: Partial configuration. Nested dicts are merged key by key;
            any other value replaces the existing one.

    Returns:
        The merged configuration that was written.

    Raises:
        HTTPException: 500 when a config file cannot be read or written;
            400 for any other failure (for example when the stored file is
            not valid JSON).
    """

    def deep_merge(base: dict, update: dict) -> dict:
        """Return a copy of ``base`` with ``update`` merged in recursively."""
        result = base.copy()
        for key, value in update.items():
            existing = result.get(key)
            if isinstance(existing, dict) and isinstance(value, dict):
                result[key] = deep_merge(existing, value)
            else:
                result[key] = value
        return result

    try:
        # Load the current configuration, if any.
        current = {}
        config_path = _get_frontend_config_path()
        if config_path is not None:
            current = json.loads(config_path.read_text(encoding="utf-8"))

        updated = deep_merge(current, payload)

        candidates = (FRONTEND_CONFIG_PATH, FRONTEND_CONFIG_FALLBACK_PATH)
        target_paths = [path for path in candidates if path.parent.exists()]
        if not target_paths:
            target_paths = [FRONTEND_CONFIG_FALLBACK_PATH]

        # Serialise once; every target receives the same content.
        serialized = json.dumps(updated, indent=2, ensure_ascii=False)
        for target in target_paths:
            # The last-resort target is chosen precisely when its directory
            # is missing, so create it instead of failing on the write.
            target.parent.mkdir(parents=True, exist_ok=True)
            target.write_text(serialized, encoding="utf-8")

        return updated
    except OSError as exc:
        # Filesystem failures are server-side problems, not bad requests.
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    except Exception as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
|
|
|
|
|
|
# ==================== Database Backup ====================
|
|
|
|
|
|
@router.get("/database/info")
def database_info() -> dict:
    """Return metadata about the database file.

    Returns:
        A dict with the file's ``path``, ``filename``, ``size_bytes``,
        ``size_mb`` (rounded to two decimals) and ``modified_at`` (ISO 8601
        string built from the file's mtime with ``datetime.fromtimestamp``,
        i.e. without timezone information).

    Raises:
        HTTPException: 404 when the database file does not exist.
    """
    # Stat once and handle the missing-file case, rather than checking
    # exists() first: the file could disappear between the two calls.
    try:
        stat = DEFAULT_DATABASE_PATH.stat()
    except (FileNotFoundError, NotADirectoryError) as exc:
        raise HTTPException(status_code=404, detail="Base de données introuvable") from exc

    size_bytes = stat.st_size
    return {
        "path": str(DEFAULT_DATABASE_PATH),
        "filename": DEFAULT_DATABASE_PATH.name,
        "size_bytes": size_bytes,
        "size_mb": round(size_bytes / (1024 * 1024), 2),
        "modified_at": datetime.fromtimestamp(stat.st_mtime).isoformat(),
    }
|
|
|
|
|
|
@router.get("/database/backup")
def download_database():
    """Send a copy of the database file as a download.

    The database is first copied next to the original file and the copy is
    what gets streamed. The copy is deleted by a background task once the
    response has been sent.

    Returns:
        A ``FileResponse`` serving the copy under a timestamped
        ``suivi_produit_backup_*.db`` filename.

    Raises:
        HTTPException: 404 when the database file does not exist.
    """
    if not DEFAULT_DATABASE_PATH.exists():
        raise HTTPException(status_code=404, detail="Base de données introuvable")

    # Take the time once so the on-disk copy and the download name agree.
    now = datetime.now()

    # Work on a temporary copy to avoid lock problems on the live file. The
    # microsecond suffix keeps concurrent downloads from sharing (and then
    # deleting) the same copy.
    # NOTE(review): a plain file copy of a database that is being written to
    # may not be a consistent snapshot; confirm whether that matters here.
    backup_path = DEFAULT_DATABASE_PATH.parent / f"backup_{now.strftime('%Y%m%d_%H%M%S_%f')}.db"
    shutil.copy2(DEFAULT_DATABASE_PATH, backup_path)

    # Remove the temporary copy after the response has been sent, so
    # downloads do not pile up backup files next to the database.
    cleanup = BackgroundTasks()
    cleanup.add_task(backup_path.unlink, missing_ok=True)

    return FileResponse(
        path=backup_path,
        filename=f"suivi_produit_backup_{now.strftime('%Y%m%d_%H%M%S')}.db",
        media_type="application/x-sqlite3",
        background=cleanup,
    )
|
|
|
|
|
|
@router.post("/database/restore")
async def restore_database(file: UploadFile = File(...)) -> dict:
    """Replace the database with an uploaded file.

    The upload must be named ``*.db``, be at most 100 MB and start with the
    SQLite file header. The current database, when present, is first copied
    to a timestamped ``before_restore_*.db`` file in the same directory; that
    copy is put back if writing the new content fails.

    Args:
        file: The uploaded database file.

    Returns:
        A dict with ``success``, a ``message`` and the written ``size_bytes``.

    Raises:
        HTTPException: 400 when the upload is rejected (name, size or
            content); 500 when the new database cannot be written.
    """
    max_bytes = 100 * 1024 * 1024
    sqlite_header = b"SQLite format 3\x00"

    # The filename is optional in a multipart upload, so guard against None.
    if not file.filename or not file.filename.endswith(".db"):
        raise HTTPException(status_code=400, detail="Le fichier doit être un .db")

    # Read at most one byte past the limit: enough to detect an oversized
    # upload without pulling an arbitrarily large file into memory.
    content = await file.read(max_bytes + 1)
    if len(content) > max_bytes:
        raise HTTPException(status_code=400, detail="Fichier trop volumineux (max 100MB)")

    # Refuse to overwrite the live database with something that is not a
    # SQLite file (the backup endpoint serves it as application/x-sqlite3).
    if not content.startswith(sqlite_header):
        raise HTTPException(status_code=400, detail="Le fichier n'est pas une base SQLite valide")

    # Keep a copy of the current database before overwriting it.
    backup_before = None
    if DEFAULT_DATABASE_PATH.exists():
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_before = DEFAULT_DATABASE_PATH.parent / f"before_restore_{timestamp}.db"
        shutil.copy2(DEFAULT_DATABASE_PATH, backup_before)

    try:
        with open(DEFAULT_DATABASE_PATH, "wb") as f:
            f.write(content)
    except Exception as exc:
        # The target may now be truncated: put the previous database back.
        if backup_before is not None:
            try:
                shutil.copy2(backup_before, DEFAULT_DATABASE_PATH)
            except OSError:
                # Best effort only: the write failure is reported below and
                # the before_restore copy is still on disk.
                pass
        raise HTTPException(
            status_code=500, detail=f"Erreur lors de la restauration: {exc}"
        ) from exc

    return {
        "success": True,
        "message": "Base de données restaurée avec succès",
        "size_bytes": len(content),
    }
|