Files
Gilles Soulier c67befc549 addon
2026-01-05 16:08:01 +01:00

304 lines
9.6 KiB
Python
Executable File

"""
Linux BenchTools - Locations API Endpoints
"""
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File
from sqlalchemy.orm import Session
from typing import List, Optional
import os
import shutil
from app.db.session import get_peripherals_db
from app.services.peripheral_service import LocationService
from app.schemas.peripheral import (
LocationCreate, LocationUpdate, LocationSchema, LocationTreeNode
)
from app.models.location import Location
from app.utils.image_processor import ImageProcessor
from app.utils.qr_generator import QRCodeGenerator
from app.core.config import settings
router = APIRouter()
# ========================================
# LOCATION CRUD
# ========================================
@router.post("/", response_model=LocationSchema, status_code=201)
def create_location(
location: LocationCreate,
db: Session = Depends(get_peripherals_db)
):
"""Create a new location"""
# Check parent exists if specified
if location.parent_id:
parent = db.query(Location).filter(Location.id == location.parent_id).first()
if not parent:
raise HTTPException(status_code=404, detail="Parent location not found")
# Check for duplicate name
existing = db.query(Location).filter(Location.nom == location.nom).first()
if existing:
raise HTTPException(status_code=400, detail="Location with this name already exists")
db_location = Location(**location.model_dump())
db.add(db_location)
db.commit()
db.refresh(db_location)
return db_location
@router.get("/", response_model=List[LocationSchema])
def list_locations(
parent_id: Optional[int] = None,
db: Session = Depends(get_peripherals_db)
):
"""List all locations (optionally filtered by parent)"""
query = db.query(Location)
if parent_id is not None:
query = query.filter(Location.parent_id == parent_id)
return query.order_by(Location.ordre_affichage, Location.nom).all()
@router.get("/tree", response_model=List[dict])
def get_location_tree(db: Session = Depends(get_peripherals_db)):
"""Get hierarchical location tree"""
return LocationService.get_location_tree(db)
@router.get("/{location_id}", response_model=LocationSchema)
def get_location(
location_id: int,
db: Session = Depends(get_peripherals_db)
):
"""Get a location by ID"""
location = db.query(Location).filter(Location.id == location_id).first()
if not location:
raise HTTPException(status_code=404, detail="Location not found")
return location
@router.get("/{location_id}/path", response_model=List[LocationSchema])
def get_location_path(
location_id: int,
db: Session = Depends(get_peripherals_db)
):
"""Get full path from root to location"""
path = LocationService.get_location_path(db, location_id)
if not path:
raise HTTPException(status_code=404, detail="Location not found")
return path
@router.put("/{location_id}", response_model=LocationSchema)
def update_location(
location_id: int,
location_data: LocationUpdate,
db: Session = Depends(get_peripherals_db)
):
"""Update a location"""
location = db.query(Location).filter(Location.id == location_id).first()
if not location:
raise HTTPException(status_code=404, detail="Location not found")
# Check parent exists if being changed
update_dict = location_data.model_dump(exclude_unset=True)
if "parent_id" in update_dict and update_dict["parent_id"]:
parent = db.query(Location).filter(Location.id == update_dict["parent_id"]).first()
if not parent:
raise HTTPException(status_code=404, detail="Parent location not found")
# Prevent circular reference
if update_dict["parent_id"] == location_id:
raise HTTPException(status_code=400, detail="Location cannot be its own parent")
# Check for duplicate name if name is being changed
if "nom" in update_dict and update_dict["nom"] != location.nom:
existing = db.query(Location).filter(Location.nom == update_dict["nom"]).first()
if existing:
raise HTTPException(status_code=400, detail="Location with this name already exists")
# Update fields
for key, value in update_dict.items():
setattr(location, key, value)
db.commit()
db.refresh(location)
return location
@router.delete("/{location_id}", status_code=204)
def delete_location(
location_id: int,
db: Session = Depends(get_peripherals_db)
):
"""Delete a location"""
location = db.query(Location).filter(Location.id == location_id).first()
if not location:
raise HTTPException(status_code=404, detail="Location not found")
# Check if location has children
children = db.query(Location).filter(Location.parent_id == location_id).count()
if children > 0:
raise HTTPException(status_code=400, detail="Cannot delete location with children")
# Check if location has peripherals
count = LocationService.count_peripherals_in_location(db, location_id)
if count > 0:
raise HTTPException(status_code=400, detail="Cannot delete location with peripherals")
# Delete image and QR code files if they exist
if location.image_path and os.path.exists(location.image_path):
os.remove(location.image_path)
if location.qr_code_path and os.path.exists(location.qr_code_path):
os.remove(location.qr_code_path)
db.delete(location)
db.commit()
@router.get("/{location_id}/count")
def count_peripherals(
location_id: int,
recursive: bool = False,
db: Session = Depends(get_peripherals_db)
):
"""Count peripherals in a location"""
location = db.query(Location).filter(Location.id == location_id).first()
if not location:
raise HTTPException(status_code=404, detail="Location not found")
count = LocationService.count_peripherals_in_location(db, location_id, recursive)
return {"location_id": location_id, "count": count, "recursive": recursive}
# ========================================
# LOCATION IMAGES
# ========================================
@router.post("/{location_id}/image", response_model=LocationSchema)
async def upload_location_image(
location_id: int,
file: UploadFile = File(...),
db: Session = Depends(get_peripherals_db)
):
"""Upload an image for a location"""
location = db.query(Location).filter(Location.id == location_id).first()
if not location:
raise HTTPException(status_code=404, detail="Location not found")
# Validate image
temp_path = f"/tmp/{file.filename}"
with open(temp_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
if not ImageProcessor.is_valid_image(temp_path):
os.remove(temp_path)
raise HTTPException(status_code=400, detail="Invalid image file")
# Create upload directory
upload_dir = os.path.join(settings.PERIPHERALS_UPLOAD_DIR, "locations", "images")
os.makedirs(upload_dir, exist_ok=True)
try:
# Process image
processed_path, _ = ImageProcessor.process_image(
temp_path,
upload_dir,
max_width=800,
max_height=600
)
# Delete old image if exists
if location.image_path and os.path.exists(location.image_path):
os.remove(location.image_path)
# Update location
location.image_path = processed_path
db.commit()
db.refresh(location)
return location
finally:
if os.path.exists(temp_path):
os.remove(temp_path)
@router.delete("/{location_id}/image", status_code=204)
def delete_location_image(
location_id: int,
db: Session = Depends(get_peripherals_db)
):
"""Delete location image"""
location = db.query(Location).filter(Location.id == location_id).first()
if not location:
raise HTTPException(status_code=404, detail="Location not found")
if location.image_path and os.path.exists(location.image_path):
os.remove(location.image_path)
location.image_path = None
db.commit()
# ========================================
# LOCATION QR CODES
# ========================================
@router.post("/{location_id}/qr-code", response_model=LocationSchema)
def generate_qr_code(
location_id: int,
base_url: str,
db: Session = Depends(get_peripherals_db)
):
"""Generate QR code for a location"""
location = db.query(Location).filter(Location.id == location_id).first()
if not location:
raise HTTPException(status_code=404, detail="Location not found")
# Create QR code directory
qr_dir = os.path.join(settings.PERIPHERALS_UPLOAD_DIR, "locations", "qrcodes")
os.makedirs(qr_dir, exist_ok=True)
# Generate QR code
qr_path = QRCodeGenerator.generate_location_qr(
location_id=location.id,
location_name=location.nom,
base_url=base_url,
output_dir=qr_dir
)
# Delete old QR code if exists
if location.qr_code_path and os.path.exists(location.qr_code_path):
os.remove(location.qr_code_path)
# Update location
location.qr_code_path = qr_path
db.commit()
db.refresh(location)
return location
@router.delete("/{location_id}/qr-code", status_code=204)
def delete_qr_code(
location_id: int,
db: Session = Depends(get_peripherals_db)
):
"""Delete location QR code"""
location = db.query(Location).filter(Location.id == location_id).first()
if not location:
raise HTTPException(status_code=404, detail="Location not found")
if location.qr_code_path and os.path.exists(location.qr_code_path):
os.remove(location.qr_code_path)
location.qr_code_path = None
db.commit()