from __future__ import annotations from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import Session from backend.app.db import models, schemas """ CRUD minimal pour manipuler les produits dans la base SQLite. """ def get_product(db: Session, product_id: int) -> models.Product | None: return db.query(models.Product).filter(models.Product.id == product_id).first() def list_products(db: Session, skip: int = 0, limit: int = 100) -> list[models.Product]: return db.query(models.Product).offset(skip).limit(limit).all() def create_product(db: Session, data: schemas.ProductCreate) -> models.Product: # Convertir les HttpUrl en strings pour SQLite data_dict = data.model_dump() if data_dict.get("url"): data_dict["url"] = str(data_dict["url"]) if data_dict.get("url_image"): data_dict["url_image"] = str(data_dict["url_image"]) product = models.Product(**data_dict) db.add(product) try: db.commit() except IntegrityError: db.rollback() raise db.refresh(product) return product def update_product(db: Session, product: models.Product, changes: schemas.ProductUpdate) -> models.Product: for field, value in changes.dict(exclude_unset=True).items(): setattr(product, field, value) db.add(product) db.commit() db.refresh(product) return product def remove_product(db: Session, product: models.Product) -> None: db.delete(product) db.commit() def list_snapshots(db: Session, product_id: int, limit: int = 30) -> list[models.ProductSnapshot]: return ( db.query(models.ProductSnapshot) .filter(models.ProductSnapshot.produit_id == product_id) .order_by(models.ProductSnapshot.scrape_le.desc()) .limit(limit) .all() ) def get_latest_snapshot(db: Session, product_id: int) -> models.ProductSnapshot | None: return ( db.query(models.ProductSnapshot) .filter(models.ProductSnapshot.produit_id == product_id) .order_by(models.ProductSnapshot.scrape_le.desc()) .first() ) def get_product_with_snapshot(db: Session, product_id: int) -> dict | None: """Retourne un produit enrichi avec les données du dernier snapshot.""" product = get_product(db, product_id) if not product: return None return _enrich_product_with_snapshot(db, product) def list_products_with_snapshots(db: Session, skip: int = 0, limit: int = 100) -> list[dict]: """Retourne la liste des produits enrichis avec leurs derniers snapshots.""" products = list_products(db, skip=skip, limit=limit) return [_enrich_product_with_snapshot(db, p) for p in products] def _enrich_product_with_snapshot(db: Session, product: models.Product) -> dict: """Ajoute les données du dernier snapshot au produit.""" snapshot = get_latest_snapshot(db, product.id) result = { "id": product.id, "boutique": product.boutique, "url": str(product.url), "asin": product.asin, "titre": product.titre, "url_image": str(product.url_image) if product.url_image else None, "categorie": product.categorie, "type": product.type, "actif": product.actif, "cree_le": product.cree_le, "modifie_le": product.modifie_le, } if snapshot: # Calcul de la réduction en pourcentage reduction = None if snapshot.prix_actuel and snapshot.prix_conseille: reduction = round((1 - snapshot.prix_actuel / snapshot.prix_conseille) * 100) result.update( { "prix_actuel": snapshot.prix_actuel, "prix_conseille": snapshot.prix_conseille, "prix_min_30j": snapshot.prix_min_30j, "reduction_pourcent": reduction, "etat_stock": snapshot.etat_stock, "en_stock": snapshot.en_stock, "note": snapshot.note, "nombre_avis": snapshot.nombre_avis, "prime": snapshot.prime, "choix_amazon": snapshot.choix_amazon, "offre_limitee": snapshot.offre_limitee, "exclusivite_amazon": snapshot.exclusivite_amazon, "dernier_scrape": snapshot.scrape_le, "statut_scrap": snapshot.statut_scrap, } ) return result