Files
serv_benchmark/backend/app/services/peripheral_service.py
Gilles Soulier c67befc549 addon
2026-01-05 16:08:01 +01:00

511 lines
16 KiB
Python
Executable File

"""
Linux BenchTools - Peripheral Service
Handles business logic and cross-database operations
"""
from typing import Optional, List, Dict, Any, Tuple
from sqlalchemy.orm import Session
from sqlalchemy import and_, or_, func, desc
from datetime import date, datetime, timedelta
from app.models.peripheral import (
Peripheral, PeripheralPhoto, PeripheralDocument,
PeripheralLink, PeripheralLoan
)
from app.models.location import Location
from app.models.peripheral_history import PeripheralLocationHistory
from app.schemas.peripheral import (
PeripheralCreate, PeripheralUpdate, PeripheralSummary,
PeripheralDetail, PeripheralListResponse,
LoanCreate, LoanReturn
)
class PeripheralService:
    """Business logic for peripherals: CRUD, listing, device assignment, loans, statistics.

    Every method is static and works on the SQLAlchemy ``Session`` passed in as
    ``db``. Each write operation commits exactly once, so a peripheral change
    and the history row describing it are persisted in the same transaction.
    """

    @staticmethod
    def create_peripheral(
        db: Session,
        peripheral_data: PeripheralCreate,
        user: Optional[str] = None
    ) -> Peripheral:
        """Insert a new peripheral and, if it is already placed, its first history row.

        Args:
            db: Active database session.
            peripheral_data: Validated creation payload; all dumped fields are
                passed to the ``Peripheral`` constructor.
            user: Optional identifier stored on the history entry.

        Returns:
            The persisted, refreshed ``Peripheral``.
        """
        peripheral = Peripheral(**peripheral_data.model_dump())
        db.add(peripheral)
        # Flush (not commit) so the primary key exists for the history row
        # while both inserts still belong to one transaction.
        db.flush()

        if peripheral.location_id or peripheral.device_id:
            PeripheralService._create_history(
                db=db,
                peripheral_id=peripheral.id,
                action="created",
                to_location_id=peripheral.location_id,
                to_device_id=peripheral.device_id,
                user=user,
                commit=False
            )

        db.commit()
        db.refresh(peripheral)
        return peripheral

    @staticmethod
    def get_peripheral(db: Session, peripheral_id: int) -> Optional[Peripheral]:
        """Return the peripheral with the given primary key, or ``None`` if absent."""
        return db.query(Peripheral).filter(Peripheral.id == peripheral_id).first()

    @staticmethod
    def update_peripheral(
        db: Session,
        peripheral_id: int,
        peripheral_data: PeripheralUpdate,
        user: Optional[str] = None
    ) -> Optional[Peripheral]:
        """Apply the explicitly-set fields of ``peripheral_data`` to a peripheral.

        A history row is written when the location or the device changed:
        ``"moved"`` when the location changed, otherwise ``"unassigned"`` when
        the device was cleared, otherwise ``"assigned"``.

        Returns:
            The refreshed peripheral, or ``None`` when the id does not exist.
        """
        peripheral = PeripheralService.get_peripheral(db, peripheral_id)
        if peripheral is None:
            return None

        # Remember the placement before the update so the change can be logged.
        old_location_id = peripheral.location_id
        old_device_id = peripheral.device_id

        # exclude_unset: only fields the caller actually sent are written.
        for field_name, value in peripheral_data.model_dump(exclude_unset=True).items():
            setattr(peripheral, field_name, value)

        new_location_id = peripheral.location_id
        new_device_id = peripheral.device_id
        location_changed = old_location_id != new_location_id
        device_changed = old_device_id != new_device_id

        if location_changed or device_changed:
            if location_changed:
                action = "moved"
            elif new_device_id is None:
                # Same label unassign_from_device() uses for a cleared device.
                action = "unassigned"
            else:
                action = "assigned"
            PeripheralService._create_history(
                db=db,
                peripheral_id=peripheral.id,
                action=action,
                from_location_id=old_location_id,
                to_location_id=new_location_id,
                from_device_id=old_device_id,
                to_device_id=new_device_id,
                user=user,
                commit=False
            )

        db.commit()
        db.refresh(peripheral)
        return peripheral

    @staticmethod
    def delete_peripheral(db: Session, peripheral_id: int) -> bool:
        """Delete a peripheral together with its photos, documents, links, loans and history.

        Returns:
            ``True`` when the peripheral existed and was deleted, else ``False``.
        """
        peripheral = PeripheralService.get_peripheral(db, peripheral_id)
        if peripheral is None:
            return False

        # Dependent rows are removed first, then the peripheral itself; a
        # single commit makes the whole removal atomic.
        dependent_models = (
            PeripheralPhoto,
            PeripheralDocument,
            PeripheralLink,
            PeripheralLoan,
            PeripheralLocationHistory,
        )
        for model in dependent_models:
            db.query(model).filter(model.peripheral_id == peripheral_id).delete()

        db.delete(peripheral)
        db.commit()
        return True

    @staticmethod
    def list_peripherals(
        db: Session,
        page: int = 1,
        page_size: int = 50,
        type_filter: Optional[str] = None,
        search: Optional[str] = None,
        location_id: Optional[int] = None,
        device_id: Optional[int] = None,
        en_pret: Optional[bool] = None,
        is_complete_device: Optional[bool] = None,
        sort_by: str = "date_creation",
        sort_order: str = "desc"
    ) -> PeripheralListResponse:
        """Return one page of peripheral summaries, filtered and sorted.

        Args:
            db: Active database session.
            page: 1-based page number; values below 1 are treated as 1.
            page_size: Rows per page; values below 1 are treated as 1.
            type_filter: Exact match on ``type_principal``.
            search: Case-insensitive substring matched against name, brand,
                model and serial number.
            location_id: Exact match on ``location_id``.
            device_id: Exact match on ``device_id``.
            en_pret: Exact match on the "on loan" flag.
            is_complete_device: Exact match on the "complete device" flag.
            sort_by: Name of a mapped column of ``Peripheral``; anything else
                falls back to ``date_creation``.
            sort_order: ``"desc"`` for descending, any other value ascending.

        Returns:
            A ``PeripheralListResponse`` with the page items and paging totals.
        """
        # Guard against a negative OFFSET and a division by zero below.
        page = max(page, 1)
        page_size = max(page_size, 1)

        query = db.query(Peripheral)

        if type_filter:
            query = query.filter(Peripheral.type_principal == type_filter)
        if search:
            search_pattern = f"%{search}%"
            query = query.filter(
                or_(
                    Peripheral.nom.ilike(search_pattern),
                    Peripheral.marque.ilike(search_pattern),
                    Peripheral.modele.ilike(search_pattern),
                    Peripheral.numero_serie.ilike(search_pattern)
                )
            )
        if location_id is not None:
            query = query.filter(Peripheral.location_id == location_id)
        if device_id is not None:
            query = query.filter(Peripheral.device_id == device_id)
        if en_pret is not None:
            query = query.filter(Peripheral.en_pret == en_pret)
        if is_complete_device is not None:
            query = query.filter(Peripheral.is_complete_device == is_complete_device)

        # Count before ORDER BY / LIMIT so the total covers every matching row.
        total = query.count()

        sort_column = PeripheralService._sort_column(sort_by)
        if sort_order == "desc":
            query = query.order_by(desc(sort_column))
        else:
            query = query.order_by(sort_column)

        peripherals = query.offset((page - 1) * page_size).limit(page_size).all()

        # One query for all thumbnails of the page instead of one per row.
        thumbnails = PeripheralService._primary_thumbnails(
            db, [p.id for p in peripherals]
        )

        items = [
            PeripheralSummary(
                id=p.id,
                nom=p.nom,
                type_principal=p.type_principal,
                sous_type=p.sous_type,
                marque=p.marque,
                modele=p.modele,
                etat=p.etat or "Inconnu",
                rating=p.rating or 0.0,
                prix=p.prix,
                en_pret=p.en_pret or False,
                is_complete_device=p.is_complete_device or False,
                quantite_disponible=p.quantite_disponible or 0,
                thumbnail_url=thumbnails.get(p.id)
            )
            for p in peripherals
        ]

        return PeripheralListResponse(
            items=items,
            total=total,
            page=page,
            page_size=page_size,
            total_pages=PeripheralService._page_count(total, page_size)
        )

    @staticmethod
    def _sort_column(sort_by: str) -> Any:
        """Resolve ``sort_by`` to a mapped column of ``Peripheral``.

        Only names of mapped column attributes are accepted, so caller-supplied
        values cannot select methods, relationships or other class attributes.
        Unknown names fall back to ``Peripheral.date_creation``.
        """
        if sort_by in Peripheral.__mapper__.column_attrs:
            return getattr(Peripheral, sort_by)
        return Peripheral.date_creation

    @staticmethod
    def _primary_thumbnails(
        db: Session,
        peripheral_ids: List[int]
    ) -> Dict[int, Optional[str]]:
        """Map each peripheral id to the URL of its primary photo thumbnail.

        Ids without a primary photo are absent from the result; ids whose
        primary photo has no thumbnail map to ``None``.
        """
        if not peripheral_ids:
            return {}

        photos = db.query(PeripheralPhoto).filter(
            PeripheralPhoto.peripheral_id.in_(peripheral_ids),
            PeripheralPhoto.is_primary == True  # noqa: E712 - SQL expression, not a Python bool test
        ).all()

        # Keep one primary photo per peripheral (the first one returned).
        first_photo = {}
        for photo in photos:
            first_photo.setdefault(photo.peripheral_id, photo)

        urls = {}
        for owner_id, photo in first_photo.items():
            path = photo.thumbnail_path
            # Stored value is a file path under /app/uploads/; expose it as a URL.
            urls[owner_id] = path.replace('/app/uploads/', '/uploads/') if path else None
        return urls

    @staticmethod
    def _page_count(total: int, page_size: int) -> int:
        """Return the number of pages needed for ``total`` rows (ceiling division)."""
        return (total + page_size - 1) // page_size

    @staticmethod
    def _merge_notes(existing: Optional[str], addition: Optional[str]) -> str:
        """Join two note texts with a newline, skipping whichever is empty or ``None``."""
        return "\n".join(part for part in (existing, addition) if part)

    @staticmethod
    def get_peripherals_by_device(
        db: Session,
        device_id: int
    ) -> List[Peripheral]:
        """Return all peripherals whose ``device_id`` equals ``device_id``."""
        return db.query(Peripheral).filter(Peripheral.device_id == device_id).all()

    @staticmethod
    def get_peripherals_by_linked_device(
        db: Session,
        linked_device_id: int
    ) -> List[Peripheral]:
        """Return all peripherals whose ``linked_device_id`` equals ``linked_device_id``."""
        return db.query(Peripheral).filter(
            Peripheral.linked_device_id == linked_device_id
        ).all()

    @staticmethod
    def assign_to_device(
        db: Session,
        peripheral_id: int,
        device_id: int,
        user: Optional[str] = None
    ) -> Optional[Peripheral]:
        """Set the peripheral's device and log an ``"assigned"`` history row.

        Returns:
            The refreshed peripheral, or ``None`` when the id does not exist.
        """
        peripheral = PeripheralService.get_peripheral(db, peripheral_id)
        if peripheral is None:
            return None

        old_device_id = peripheral.device_id
        peripheral.device_id = device_id

        PeripheralService._create_history(
            db=db,
            peripheral_id=peripheral.id,
            action="assigned",
            from_device_id=old_device_id,
            to_device_id=device_id,
            user=user,
            commit=False
        )

        db.commit()
        db.refresh(peripheral)
        return peripheral

    @staticmethod
    def unassign_from_device(
        db: Session,
        peripheral_id: int,
        user: Optional[str] = None
    ) -> Optional[Peripheral]:
        """Clear the peripheral's device and log an ``"unassigned"`` history row.

        Returns:
            The refreshed peripheral, or ``None`` when the id does not exist.
        """
        peripheral = PeripheralService.get_peripheral(db, peripheral_id)
        if peripheral is None:
            return None

        old_device_id = peripheral.device_id
        peripheral.device_id = None

        PeripheralService._create_history(
            db=db,
            peripheral_id=peripheral.id,
            action="unassigned",
            from_device_id=old_device_id,
            to_device_id=None,
            user=user,
            commit=False
        )

        db.commit()
        db.refresh(peripheral)
        return peripheral

    @staticmethod
    def create_loan(
        db: Session,
        loan_data: LoanCreate,
        user: Optional[str] = None
    ) -> Optional[PeripheralLoan]:
        """Open a loan (``statut="en_cours"``) and mark the peripheral as lent.

        Returns:
            The refreshed loan, or ``None`` when the peripheral does not exist
            or is already on loan.
        """
        peripheral = PeripheralService.get_peripheral(db, loan_data.peripheral_id)
        if peripheral is None or peripheral.en_pret:
            return None

        loan = PeripheralLoan(
            **loan_data.model_dump(),
            statut="en_cours",
            created_by=user
        )
        db.add(loan)
        # Flush to obtain loan.id without committing, so the loan and the
        # peripheral's back-reference are written in one transaction.
        db.flush()

        peripheral.en_pret = True
        peripheral.pret_actuel_id = loan.id
        peripheral.prete_a = loan_data.emprunte_par

        db.commit()
        db.refresh(loan)
        db.refresh(peripheral)
        return loan

    @staticmethod
    def return_loan(
        db: Session,
        loan_id: int,
        return_data: LoanReturn
    ) -> Optional[PeripheralLoan]:
        """Close an open loan (``statut="retourne"``) and free its peripheral.

        Return notes, when given, are appended to the loan's existing notes on
        a new line.

        Returns:
            The refreshed loan, or ``None`` when the loan does not exist or is
            not currently ``"en_cours"``.
        """
        loan = db.query(PeripheralLoan).filter(PeripheralLoan.id == loan_id).first()
        if loan is None or loan.statut != "en_cours":
            return None

        loan.date_retour_effectif = return_data.date_retour_effectif
        loan.etat_retour = return_data.etat_retour
        loan.problemes_retour = return_data.problemes_retour
        loan.caution_rendue = return_data.caution_rendue
        loan.statut = "retourne"
        if return_data.notes:
            # No leading newline when the loan had no notes yet.
            loan.notes = PeripheralService._merge_notes(loan.notes, return_data.notes)

        peripheral = PeripheralService.get_peripheral(db, loan.peripheral_id)
        if peripheral is not None:
            peripheral.en_pret = False
            peripheral.pret_actuel_id = None
            peripheral.prete_a = None

        db.commit()
        db.refresh(loan)
        return loan

    @staticmethod
    def get_overdue_loans(db: Session) -> List[PeripheralLoan]:
        """Return open loans whose planned return date is before today."""
        today = date.today()
        return db.query(PeripheralLoan).filter(
            and_(
                PeripheralLoan.statut == "en_cours",
                PeripheralLoan.date_retour_prevue < today
            )
        ).all()

    @staticmethod
    def get_upcoming_returns(db: Session, days: int = 7) -> List[PeripheralLoan]:
        """Return open loans whose planned return date lies between today and today + ``days``."""
        today = date.today()
        horizon = today + timedelta(days=days)
        return db.query(PeripheralLoan).filter(
            and_(
                PeripheralLoan.statut == "en_cours",
                PeripheralLoan.date_retour_prevue.between(today, horizon)
            )
        ).all()

    @staticmethod
    def get_statistics(db: Session) -> Dict[str, Any]:
        """Return aggregate counters over all peripherals.

        The result contains the total, the number on loan, the number not on
        loan, the number of complete devices, the number of rows whose
        available quantity is at or below their alert threshold, and counts
        grouped by type and by state (a missing state is reported as
        ``"Inconnu"``).
        """
        total = db.query(Peripheral).count()
        en_pret = db.query(Peripheral).filter(Peripheral.en_pret == True).count()  # noqa: E712
        complete_devices = db.query(Peripheral).filter(
            Peripheral.is_complete_device == True  # noqa: E712
        ).count()

        by_type = db.query(
            Peripheral.type_principal,
            func.count(Peripheral.id).label('count')
        ).group_by(Peripheral.type_principal).all()

        by_etat = db.query(
            Peripheral.etat,
            func.count(Peripheral.id).label('count')
        ).group_by(Peripheral.etat).all()

        low_stock = db.query(Peripheral).filter(
            Peripheral.quantite_disponible <= Peripheral.seuil_alerte
        ).count()

        return {
            "total_peripherals": total,
            "en_pret": en_pret,
            "disponible": total - en_pret,
            "complete_devices": complete_devices,
            "low_stock_count": low_stock,
            "by_type": [{"type": t, "count": c} for t, c in by_type],
            "by_etat": [{"etat": e or "Inconnu", "count": c} for e, c in by_etat]
        }

    @staticmethod
    def _create_history(
        db: Session,
        peripheral_id: int,
        action: str,
        from_location_id: Optional[int] = None,
        to_location_id: Optional[int] = None,
        from_device_id: Optional[int] = None,
        to_device_id: Optional[int] = None,
        user: Optional[str] = None,
        notes: Optional[str] = None,
        commit: bool = True
    ) -> PeripheralLocationHistory:
        """Add a location/device history row for a peripheral.

        Args:
            db: Active database session.
            peripheral_id: Peripheral the entry belongs to.
            action: Label of the event (this class uses ``"created"``,
                ``"moved"``, ``"assigned"`` and ``"unassigned"``).
            from_location_id: Location before the change, if any.
            to_location_id: Location after the change, if any.
            from_device_id: Device before the change, if any.
            to_device_id: Device after the change, if any.
            user: Optional identifier of who made the change.
            notes: Optional free text.
            commit: When ``True`` (default) the session is committed here;
                pass ``False`` to let the caller commit the row together with
                its own changes.

        Returns:
            The history object that was added to the session.
        """
        history = PeripheralLocationHistory(
            peripheral_id=peripheral_id,
            action=action,
            from_location_id=from_location_id,
            to_location_id=to_location_id,
            from_device_id=from_device_id,
            to_device_id=to_device_id,
            user=user,
            notes=notes
        )
        db.add(history)
        if commit:
            db.commit()
        return history
class LocationService:
    """Read-only helpers over the location hierarchy (rows linked through ``parent_id``)."""

    @staticmethod
    def get_location_tree(db: Session) -> List[Dict[str, Any]]:
        """Return every root location (``parent_id`` is NULL) with its nested children.

        Siblings are ordered by ``ordre_affichage`` and then ``nom``. All
        locations are loaded with a single query and nested in memory.
        """
        locations = db.query(Location).order_by(
            Location.ordre_affichage, Location.nom
        ).all()
        return LocationService._nest_locations(locations)

    @staticmethod
    def _nest_locations(locations: List[Location]) -> List[Dict[str, Any]]:
        """Turn a flat, already-sorted list of locations into nested dicts.

        Each node carries ``id``, ``nom``, ``type``, ``description``,
        ``image_path``, ``qr_code_path`` and ``children``. Sibling order follows
        the order of ``locations``. Rows that are not reachable from a root
        (``parent_id`` of ``None``) are left out.
        """
        # Group rows by parent; appending preserves the incoming sort order.
        children_of = {}
        for loc in locations:
            children_of.setdefault(loc.parent_id, []).append(loc)

        def build(parent_id: Optional[int]) -> List[Dict[str, Any]]:
            return [
                {
                    "id": loc.id,
                    "nom": loc.nom,
                    "type": loc.type,
                    "description": loc.description,
                    "image_path": loc.image_path,
                    "qr_code_path": loc.qr_code_path,
                    "children": build(loc.id)
                }
                for loc in children_of.get(parent_id, [])
            ]

        return build(None)

    @staticmethod
    def get_location_path(db: Session, location_id: int) -> List[Location]:
        """Return the chain of locations from the topmost ancestor down to ``location_id``.

        The walk stops at the first missing row, at a row without parent, or
        when a location is met a second time (a ``parent_id`` cycle), so it
        always terminates.
        """
        path = []
        visited = set()
        current_id = location_id

        while current_id and current_id not in visited:
            location = db.query(Location).filter(Location.id == current_id).first()
            if location is None:
                break
            visited.add(current_id)
            path.append(location)
            current_id = location.parent_id

        # Collected leaf-to-root; callers expect root-to-leaf.
        path.reverse()
        return path

    @staticmethod
    def count_peripherals_in_location(
        db: Session,
        location_id: int,
        recursive: bool = False
    ) -> int:
        """Count peripherals stored in a location.

        Args:
            db: Active database session.
            location_id: Location to count in.
            recursive: When ``True``, peripherals in all descendant locations
                are included as well.

        Returns:
            Number of matching peripherals.
        """
        if not recursive:
            return db.query(Peripheral).filter(
                Peripheral.location_id == location_id
            ).count()

        # Breadth-first walk, one query per tree level. ``seen`` makes the walk
        # terminate even if the parent links contain a cycle.
        location_ids = [location_id]
        seen = {location_id}
        frontier = [location_id]
        while frontier:
            rows = db.query(Location.id).filter(
                Location.parent_id.in_(frontier)
            ).all()
            frontier = []
            for row in rows:
                child_id = row[0]
                if child_id not in seen:
                    seen.add(child_id)
                    frontier.append(child_id)
                    location_ids.append(child_id)

        return db.query(Peripheral).filter(
            Peripheral.location_id.in_(location_ids)
        ).count()