"""Repository pour les catégories.""" from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from app.models.category import Category from app.repositories.base import BaseRepository class CategoryRepository(BaseRepository[Category]): """Repository pour les opérations sur les catégories.""" def __init__(self, db: AsyncSession) -> None: """Initialise le repository.""" super().__init__(Category, db) async def get_by_name(self, name: str) -> Category | None: """Récupère une catégorie par son nom. Args: name: Nom de la catégorie Returns: La catégorie trouvée ou None """ result = await self.db.execute( select(Category).where(Category.name == name) ) return result.scalar_one_or_none() async def get_with_item_count(self, id: int) -> tuple[Category, int] | None: """Récupère une catégorie avec le nombre d'objets. Args: id: ID de la catégorie Returns: Tuple (catégorie, nombre d'objets) ou None """ result = await self.db.execute( select(Category).options(selectinload(Category.items)).where(Category.id == id) ) category = result.scalar_one_or_none() if category is None: return None return category, len(category.items) async def get_all_with_item_count( self, skip: int = 0, limit: int = 100 ) -> list[tuple[Category, int]]: """Récupère toutes les catégories avec le nombre d'objets. Args: skip: Offset limit: Limite Returns: Liste de tuples (catégorie, nombre d'objets) """ result = await self.db.execute( select(Category) .options(selectinload(Category.items)) .offset(skip) .limit(limit) .order_by(Category.name) ) categories = result.scalars().all() return [(cat, len(cat.items)) for cat in categories] async def name_exists(self, name: str, exclude_id: int | None = None) -> bool: """Vérifie si un nom de catégorie existe déjà. Args: name: Nom à vérifier exclude_id: ID à exclure (pour les mises à jour) Returns: True si le nom existe déjà """ query = select(func.count(Category.id)).where(Category.name == name) if exclude_id is not None: query = query.where(Category.id != exclude_id) result = await self.db.execute(query) return result.scalar_one() > 0