Files
home_stock/backend/app/repositories/item.py
2026-01-28 19:22:30 +01:00

248 lines
7.2 KiB
Python

"""Repository pour les objets d'inventaire."""
from decimal import Decimal
from typing import Any

from sqlalchemy import func, or_, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from app.models.item import Item, ItemStatus
from app.repositories.base import BaseRepository
class ItemRepository(BaseRepository[Item]):
    """Repository for database operations on inventory items."""

    # Escape character for LIKE patterns built from user-supplied text.
    # "/" is used rather than a backslash to avoid any interaction with
    # backslash handling in string literals on the database side.
    _LIKE_ESCAPE = "/"

    def __init__(self, db: AsyncSession) -> None:
        """Initialize the repository.

        Args:
            db: Async session passed on to the base repository.
        """
        super().__init__(Item, db)

    @classmethod
    def _contains_pattern(cls, text: str) -> str:
        """Build a "contains" LIKE pattern that matches ``text`` literally.

        The escape character itself and the LIKE wildcards ``%`` and ``_``
        are escaped so that user input cannot act as a wildcard.

        Args:
            text: Raw search text.

        Returns:
            The escaped text wrapped in ``%`` on both sides.
        """
        esc = cls._LIKE_ESCAPE
        # The escape character must be doubled first, otherwise the escapes
        # added for "%" and "_" would themselves get escaped.
        escaped = (
            text.replace(esc, esc + esc)
            .replace("%", esc + "%")
            .replace("_", esc + "_")
        )
        return f"%{escaped}%"

    @classmethod
    def _apply_filters(
        cls,
        stmt: Any,
        *,
        query: str | None,
        category_id: int | None,
        location_id: int | None,
        status: ItemStatus | None,
        min_price: Decimal | None,
        max_price: Decimal | None,
    ) -> Any:
        """Add the optional search criteria to a statement.

        Shared by ``search`` and ``count_filtered`` so that a page of results
        and its total count are always computed with the same conditions.

        Args:
            stmt: Statement selecting from ``Item``.
            query: Text searched (case-insensitive, substring) in name,
                description, brand, model and notes; skipped when empty.
            category_id: Keep only items of this category.
            location_id: Keep only items at this location.
            status: Keep only items with this status.
            min_price: Inclusive lower bound on the price.
            max_price: Inclusive upper bound on the price.

        Returns:
            The statement with one WHERE clause per provided criterion.
        """
        if query:
            pattern = cls._contains_pattern(query)
            searchable = (
                Item.name,
                Item.description,
                Item.brand,
                Item.model,
                Item.notes,
            )
            stmt = stmt.where(
                or_(
                    *(
                        column.ilike(pattern, escape=cls._LIKE_ESCAPE)
                        for column in searchable
                    )
                )
            )

        if category_id is not None:
            stmt = stmt.where(Item.category_id == category_id)
        if location_id is not None:
            stmt = stmt.where(Item.location_id == location_id)
        if status is not None:
            stmt = stmt.where(Item.status == status)
        if min_price is not None:
            stmt = stmt.where(Item.price >= min_price)
        if max_price is not None:
            stmt = stmt.where(Item.price <= max_price)
        return stmt

    @staticmethod
    def _paginate(stmt: Any, skip: int, limit: int) -> Any:
        """Order a statement by name and apply offset/limit.

        ``Item.id`` is used as a tie-breaker: names are not guaranteed to be
        unique, and without a total order consecutive pages could overlap or
        skip rows.

        Args:
            stmt: Statement selecting ``Item`` rows.
            skip: Number of rows to skip.
            limit: Maximum number of rows to return.

        Returns:
            The ordered and paginated statement.
        """
        return stmt.order_by(Item.name, Item.id).offset(skip).limit(limit)

    async def get_with_relations(self, id: int) -> Item | None:
        """Fetch one item together with its category, location and documents.

        Args:
            id: Item ID.

        Returns:
            The item with its relations loaded, or None if no row matches.
        """
        stmt = (
            select(Item)
            .options(
                selectinload(Item.category),
                selectinload(Item.location),
                selectinload(Item.documents),
            )
            .where(Item.id == id)
        )
        result = await self.db.execute(stmt)
        return result.scalar_one_or_none()

    async def get_all_with_relations(
        self, skip: int = 0, limit: int = 100
    ) -> list[Item]:
        """Fetch a page of items with their category and location loaded.

        Args:
            skip: Number of rows to skip.
            limit: Maximum number of rows to return.

        Returns:
            Items ordered by name (then ID).
        """
        stmt = select(Item).options(
            selectinload(Item.category),
            selectinload(Item.location),
        )
        result = await self.db.execute(self._paginate(stmt, skip, limit))
        return list(result.scalars().all())

    async def search(
        self,
        query: str,
        category_id: int | None = None,
        location_id: int | None = None,
        status: ItemStatus | None = None,
        min_price: Decimal | None = None,
        max_price: Decimal | None = None,
        skip: int = 0,
        limit: int = 100,
    ) -> list[Item]:
        """Search items using free text and optional filters.

        Args:
            query: Text searched in name, description, brand, model and
                notes (case-insensitive substring match; ``%`` and ``_`` are
                matched literally). An empty string disables the text filter.
            category_id: Filter by category.
            location_id: Filter by location.
            status: Filter by status.
            min_price: Minimum price (inclusive).
            max_price: Maximum price (inclusive).
            skip: Number of rows to skip.
            limit: Maximum number of rows to return.

        Returns:
            Matching items, ordered by name (then ID), with category and
            location loaded.
        """
        stmt = select(Item).options(
            selectinload(Item.category),
            selectinload(Item.location),
        )
        stmt = self._apply_filters(
            stmt,
            query=query,
            category_id=category_id,
            location_id=location_id,
            status=status,
            min_price=min_price,
            max_price=max_price,
        )
        result = await self.db.execute(self._paginate(stmt, skip, limit))
        return list(result.scalars().all())

    async def count_filtered(
        self,
        query: str | None = None,
        category_id: int | None = None,
        location_id: int | None = None,
        status: ItemStatus | None = None,
        min_price: Decimal | None = None,
        max_price: Decimal | None = None,
    ) -> int:
        """Count the items matching the given filters.

        Accepts the same criteria as ``search`` and applies them through the
        same helper, so the count matches what ``search`` would return
        without pagination.

        Args:
            query: Text searched in name, description, brand, model and
                notes; skipped when empty or None.
            category_id: Filter by category.
            location_id: Filter by location.
            status: Filter by status.
            min_price: Minimum price (inclusive).
            max_price: Maximum price (inclusive).

        Returns:
            Number of matching items.
        """
        stmt = self._apply_filters(
            select(func.count(Item.id)),
            query=query,
            category_id=category_id,
            location_id=location_id,
            status=status,
            min_price=min_price,
            max_price=max_price,
        )
        result = await self.db.execute(stmt)
        return result.scalar_one()

    async def get_by_category(
        self, category_id: int, skip: int = 0, limit: int = 100
    ) -> list[Item]:
        """Fetch a page of the items belonging to a category.

        Args:
            category_id: Category ID.
            skip: Number of rows to skip.
            limit: Maximum number of rows to return.

        Returns:
            Items of the category, ordered by name (then ID).
        """
        stmt = select(Item).where(Item.category_id == category_id)
        result = await self.db.execute(self._paginate(stmt, skip, limit))
        return list(result.scalars().all())

    async def get_by_location(
        self, location_id: int, skip: int = 0, limit: int = 100
    ) -> list[Item]:
        """Fetch a page of the items stored at a location.

        Args:
            location_id: Location ID.
            skip: Number of rows to skip.
            limit: Maximum number of rows to return.

        Returns:
            Items at the location, ordered by name (then ID).
        """
        stmt = select(Item).where(Item.location_id == location_id)
        result = await self.db.execute(self._paginate(stmt, skip, limit))
        return list(result.scalars().all())

    async def get_by_status(
        self, status: ItemStatus, skip: int = 0, limit: int = 100
    ) -> list[Item]:
        """Fetch a page of the items having a given status.

        Args:
            status: Status to look for.
            skip: Number of rows to skip.
            limit: Maximum number of rows to return.

        Returns:
            Items with that status, ordered by name (then ID).
        """
        stmt = select(Item).where(Item.status == status)
        result = await self.db.execute(self._paginate(stmt, skip, limit))
        return list(result.scalars().all())

    async def get_by_serial_number(self, serial_number: str) -> Item | None:
        """Fetch an item by its serial number.

        Args:
            serial_number: Serial number to match exactly.

        Returns:
            The matching item, or None if there is none. ``scalar_one_or_none``
            raises if more than one row matches.
        """
        result = await self.db.execute(
            select(Item).where(Item.serial_number == serial_number)
        )
        return result.scalar_one_or_none()