Files
home_stock/backend/app/repositories/item.py
2026-02-01 01:45:51 +01:00

255 lines
7.8 KiB
Python

"""Repository pour les objets d'inventaire."""
from decimal import Decimal
from typing import Any
from sqlalchemy import or_, select, text
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.models.item import Item, ItemStatus
from app.repositories.base import BaseRepository
class ItemRepository(BaseRepository[Item]):
"""Repository pour les opérations sur les objets."""
def __init__(self, db: AsyncSession) -> None:
"""Initialise le repository."""
super().__init__(Item, db)
async def get_with_relations(self, id: int) -> Item | None:
"""Récupère un objet avec ses relations (catégorie, emplacement, documents).
Args:
id: ID de l'objet
Returns:
L'objet avec ses relations ou None
"""
result = await self.db.execute(
select(Item)
.options(
selectinload(Item.category),
selectinload(Item.location),
selectinload(Item.documents),
selectinload(Item.parent_item),
)
.where(Item.id == id)
)
return result.scalar_one_or_none()
async def get_all_with_relations(
self, skip: int = 0, limit: int = 100
) -> list[Item]:
"""Récupère tous les objets avec leurs relations.
Args:
skip: Offset
limit: Limite
Returns:
Liste des objets avec relations
"""
result = await self.db.execute(
select(Item)
.options(
selectinload(Item.category),
selectinload(Item.location),
selectinload(Item.documents),
selectinload(Item.parent_item),
)
.offset(skip)
.limit(limit)
.order_by(Item.name)
)
return list(result.scalars().all())
async def search(
self,
query: str,
category_id: int | None = None,
location_id: int | None = None,
status: ItemStatus | None = None,
min_price: Decimal | None = None,
max_price: Decimal | None = None,
skip: int = 0,
limit: int = 100,
) -> list[Item]:
"""Recherche des objets avec filtres.
Args:
query: Texte de recherche (nom, description, marque, modèle)
category_id: Filtre par catégorie
location_id: Filtre par emplacement
status: Filtre par statut
min_price: Prix minimum
max_price: Prix maximum
skip: Offset
limit: Limite
Returns:
Liste des objets correspondants
"""
stmt = select(Item).options(
selectinload(Item.category),
selectinload(Item.location),
selectinload(Item.documents),
selectinload(Item.parent_item),
)
# Recherche full-text via FTS5
if query:
# Échapper les caractères spéciaux FTS5 et ajouter le préfixe *
safe_query = query.replace('"', '""').strip()
if safe_query:
# Recherche par préfixe pour résultats en temps réel
fts_terms = " ".join(f'"{word}"*' for word in safe_query.split() if word)
stmt = stmt.where(
Item.id.in_(
select(text("rowid")).select_from(text("fts_items")).where(
text("fts_items MATCH :fts_query")
)
)
).params(fts_query=fts_terms)
# Filtres
if category_id is not None:
stmt = stmt.where(Item.category_id == category_id)
if location_id is not None:
stmt = stmt.where(Item.location_id == location_id)
if status is not None:
stmt = stmt.where(Item.status == status)
if min_price is not None:
stmt = stmt.where(Item.price >= min_price)
if max_price is not None:
stmt = stmt.where(Item.price <= max_price)
stmt = stmt.offset(skip).limit(limit).order_by(Item.name)
result = await self.db.execute(stmt)
return list(result.scalars().all())
async def count_filtered(
self,
query: str | None = None,
category_id: int | None = None,
location_id: int | None = None,
status: ItemStatus | None = None,
min_price: Decimal | None = None,
max_price: Decimal | None = None,
) -> int:
"""Compte les objets avec filtres.
Returns:
Nombre d'objets correspondants
"""
from sqlalchemy import func
stmt = select(func.count(Item.id))
if query:
safe_query = query.replace('"', '""').strip()
if safe_query:
fts_terms = " ".join(f'"{word}"*' for word in safe_query.split() if word)
stmt = stmt.where(
Item.id.in_(
select(text("rowid")).select_from(text("fts_items")).where(
text("fts_items MATCH :fts_query")
)
)
).params(fts_query=fts_terms)
if category_id is not None:
stmt = stmt.where(Item.category_id == category_id)
if location_id is not None:
stmt = stmt.where(Item.location_id == location_id)
if status is not None:
stmt = stmt.where(Item.status == status)
if min_price is not None:
stmt = stmt.where(Item.price >= min_price)
if max_price is not None:
stmt = stmt.where(Item.price <= max_price)
result = await self.db.execute(stmt)
return result.scalar_one()
async def get_by_category(
self, category_id: int, skip: int = 0, limit: int = 100
) -> list[Item]:
"""Récupère les objets d'une catégorie.
Args:
category_id: ID de la catégorie
skip: Offset
limit: Limite
Returns:
Liste des objets
"""
result = await self.db.execute(
select(Item)
.where(Item.category_id == category_id)
.offset(skip)
.limit(limit)
.order_by(Item.name)
)
return list(result.scalars().all())
async def get_by_location(
self, location_id: int, skip: int = 0, limit: int = 100
) -> list[Item]:
"""Récupère les objets d'un emplacement.
Args:
location_id: ID de l'emplacement
skip: Offset
limit: Limite
Returns:
Liste des objets
"""
result = await self.db.execute(
select(Item)
.where(Item.location_id == location_id)
.offset(skip)
.limit(limit)
.order_by(Item.name)
)
return list(result.scalars().all())
async def get_by_status(
self, status: ItemStatus, skip: int = 0, limit: int = 100
) -> list[Item]:
"""Récupère les objets par statut.
Args:
status: Statut recherché
skip: Offset
limit: Limite
Returns:
Liste des objets
"""
result = await self.db.execute(
select(Item)
.where(Item.status == status)
.offset(skip)
.limit(limit)
.order_by(Item.name)
)
return list(result.scalars().all())
async def get_by_serial_number(self, serial_number: str) -> Item | None:
"""Récupère un objet par son numéro de série.
Args:
serial_number: Numéro de série
Returns:
L'objet trouvé ou None
"""
result = await self.db.execute(
select(Item).where(Item.serial_number == serial_number)
)
return result.scalar_one_or_none()