"""Amazon product scraping orchestration.

Drives Playwright page visits, saves raw JSON and debug artifacts, and
records results as ScrapeRun / ProductSnapshot rows.
"""

from __future__ import annotations

import json
import random
import time
from datetime import datetime
from pathlib import Path
from typing import Iterable

from loguru import logger
from playwright.sync_api import sync_playwright
from sqlalchemy.orm import Session

from backend.app.core.config import load_config
from backend.app.db import database, models
from backend.app.scraper.amazon.parser import detect_blocked, extract_product_data


def _create_run(session: Session) -> models.ScrapeRun:
    """Open a new scrape run in the 'en_cours' state."""
    run = models.ScrapeRun(demarre_le=datetime.utcnow(), statut="en_cours")
    session.add(run)
    session.commit()
    session.refresh(run)
    return run


def _finalize_run(run: models.ScrapeRun, session: Session, status: str) -> None:
    """Close the run with its final status and completion timestamp."""
    run.statut = status
    run.termine_le = datetime.utcnow()
    session.add(run)
    session.commit()


def _save_raw_json(payload: dict, product_id: int) -> Path:
    """Persist the raw scraped payload under data/raw/<YYYY-MM-DD>/."""
    base_dir = Path(__file__).resolve().parent.parent.parent / "data" / "raw"
    folder = base_dir / datetime.utcnow().strftime("%Y-%m-%d")
    folder.mkdir(parents=True, exist_ok=True)
    filename = f"{product_id}_{datetime.utcnow().strftime('%H%M%S')}.json"
    path = folder / filename
    path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
    return path


def _save_debug_artifacts(page, product_id: int) -> tuple[Path, Path]:
    """Capture a full-page screenshot and the raw HTML of a blocked page."""
    base_dir = Path(__file__).resolve().parent.parent.parent / "data" / "screenshots"
    base_dir.mkdir(parents=True, exist_ok=True)
    stamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
    screenshot_path = base_dir / f"{product_id}_{stamp}.png"
    html_path = base_dir / f"{product_id}_{stamp}.html"
    page.screenshot(path=str(screenshot_path), full_page=True)
    html_path.write_text(page.content(), encoding="utf-8")
    return screenshot_path, html_path


def _update_product_from_scrape(
    session: Session,
    product: models.Product,
    data: dict,
) -> None:
    """Backfill the product's title and image from scraped data when missing."""
    if data.get("titre") and not product.titre:
        product.titre = data["titre"]
    if data.get("url_image_principale") and not product.url_image:
        product.url_image = data["url_image_principale"]
    session.add(product)
    session.commit()


def _create_snapshot(
    session: Session,
    product: models.Product,
    run: models.ScrapeRun,
    data: dict,
    status: str,
    raw_json_path: Path | None,
    error_message: str | None = None,
) -> None:
    """Record one scrape result as a ProductSnapshot row."""
    # Backfill title/image on the product if they are missing.
    _update_product_from_scrape(session, product, data)
    snapshot = models.ProductSnapshot(
        produit_id=product.id,
        run_scrap_id=run.id,
        prix_actuel=data.get("prix_actuel"),
        prix_conseille=data.get("prix_conseille"),
        prix_min_30j=data.get("prix_min_30j"),
        etat_stock=data.get("etat_stock"),
        en_stock=data.get("en_stock"),
        note=data.get("note"),
        nombre_avis=data.get("nombre_avis"),
        prime=data.get("prime"),
        choix_amazon=data.get("choix_amazon"),
        offre_limitee=data.get("offre_limitee"),
        exclusivite_amazon=data.get("exclusivite_amazon"),
        chemin_json_brut=str(raw_json_path) if raw_json_path else None,
        statut_scrap=status,
        message_erreur=error_message,
    )
    session.add(snapshot)
    session.commit()
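
# For reference, the payload consumed by _create_snapshot() carries the keys
# read above. The values below are purely illustrative; the real shape is
# whatever extract_product_data() returns:
#
#     {
#         "titre": "Example product title",
#         "url_image_principale": "https://example.com/image.jpg",
#         "prix_actuel": 19.99,
#         "prix_conseille": 24.99,
#         "prix_min_30j": 18.49,
#         "etat_stock": "En stock",
#         "en_stock": True,
#         "note": 4.5,
#         "nombre_avis": 1234,
#         "prime": True,
#         "choix_amazon": False,
#         "offre_limitee": False,
#         "exclusivite_amazon": False,
#     }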

def scrape_product(product_id: int) -> None:
    """Scrape a single product and record the outcome in its own run."""
    # loguru uses str.format-style placeholders ({}), not %s.
    logger.info("Starting scrape for product {}", product_id)
    session = database.SessionLocal()
    run = _create_run(session)
    try:
        product = session.get(models.Product, product_id)
        if not product:
            logger.warning("Product {} not found", product_id)
            _finalize_run(run, session, "echec")
            return
        config = load_config()
        run.nb_total = 1
        session.commit()
        with sync_playwright() as playwright:
            browser = playwright.chromium.launch(headless=config.scrape.headless)
            context = browser.new_context(
                locale=config.scrape.locale,
                timezone_id=config.scrape.timezone,
                user_agent=config.scrape.user_agent,
                viewport=config.scrape.viewport,
            )
            page = context.new_page()
            page.set_default_timeout(config.scrape.timeout_ms)
            try:
                page.goto(product.url, wait_until="domcontentloaded", timeout=config.scrape.timeout_ms)
                html = page.content()
                if detect_blocked(html):
                    screenshot_path, html_path = _save_debug_artifacts(page, product.id)
                    data = {"url": product.url, "asin": product.asin, "bloque": True}
                    raw_path = _save_raw_json(data, product.id)
                    _create_snapshot(
                        session,
                        product,
                        run,
                        data,
                        status="bloque",
                        raw_json_path=raw_path,
                        error_message=f"Bloque: {screenshot_path.name} / {html_path.name}",
                    )
                    run.nb_echec = 1
                    _finalize_run(run, session, "partiel")
                    return
                data = extract_product_data(page, product.url)
                raw_path = _save_raw_json(data, product.id)
                required = ["titre", "prix_actuel", "note"]
                missing = [field for field in required if not data.get(field)]
                status = "champs_manquants" if missing else "ok"
                _create_snapshot(
                    session,
                    product,
                    run,
                    data,
                    status=status,
                    raw_json_path=raw_path,
                    error_message=", ".join(missing) if missing else None,
                )
                run.nb_ok = 1 if not missing else 0
                run.nb_echec = 0 if not missing else 1
                _finalize_run(run, session, "succes" if not missing else "partiel")
                # Randomized delay to avoid hammering the site.
                delay_min, delay_max = config.scrape.delay_range_ms
                time.sleep(random.uniform(delay_min, delay_max) / 1000.0)
            finally:
                # Close the browser cleanly.
                context.close()
                browser.close()
    except Exception:  # pragma: no cover
        logger.exception("Error while scraping product {}", product_id)
        _finalize_run(run, session, "erreur")
    finally:
        session.close()
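
# Fields this module reads from load_config().scrape. The values below are
# illustrative assumptions, not the project's actual defaults:
#
#     headless: true
#     locale: "fr-FR"
#     timezone: "Europe/Paris"
#     user_agent: "Mozilla/5.0 (...)"
#     viewport: {"width": 1280, "height": 800}
#     timeout_ms: 30000
#     delay_range_ms: [2000, 5000]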

def scrape_all(product_ids: Iterable[int] | None = None) -> None:
    """Scrape every product (or the given subset) within a single run."""
    logger.info("Starting global scrape")
    session = database.SessionLocal()
    run = _create_run(session)
    try:
        config = load_config()
        products = session.query(models.Product).all()
        if product_ids:
            # Materialize once: a one-shot iterator would otherwise be
            # exhausted by the first membership test.
            wanted = set(product_ids)
            products = [product for product in products if product.id in wanted]
        run.nb_total = len(products)
        session.commit()
        with sync_playwright() as playwright:
            browser = playwright.chromium.launch(headless=config.scrape.headless)
            context = browser.new_context(
                locale=config.scrape.locale,
                timezone_id=config.scrape.timezone,
                user_agent=config.scrape.user_agent,
                viewport=config.scrape.viewport,
            )
            page = context.new_page()
            page.set_default_timeout(config.scrape.timeout_ms)
            nb_ok = 0
            nb_echec = 0
            try:
                for product in products:
                    page.goto(product.url, wait_until="domcontentloaded", timeout=config.scrape.timeout_ms)
                    html = page.content()
                    if detect_blocked(html):
                        screenshot_path, html_path = _save_debug_artifacts(page, product.id)
                        data = {"url": product.url, "asin": product.asin, "bloque": True}
                        raw_path = _save_raw_json(data, product.id)
                        _create_snapshot(
                            session,
                            product,
                            run,
                            data,
                            status="bloque",
                            raw_json_path=raw_path,
                            error_message=f"Bloque: {screenshot_path.name} / {html_path.name}",
                        )
                        nb_echec += 1
                        continue
                    data = extract_product_data(page, product.url)
                    raw_path = _save_raw_json(data, product.id)
                    required = ["titre", "prix_actuel", "note"]
                    missing = [field for field in required if not data.get(field)]
                    status = "champs_manquants" if missing else "ok"
                    _create_snapshot(
                        session,
                        product,
                        run,
                        data,
                        status=status,
                        raw_json_path=raw_path,
                        error_message=", ".join(missing) if missing else None,
                    )
                    if missing:
                        nb_echec += 1
                    else:
                        nb_ok += 1
                    # Randomized delay between products to avoid hammering the site.
                    delay_min, delay_max = config.scrape.delay_range_ms
                    time.sleep(random.uniform(delay_min, delay_max) / 1000.0)
                run.nb_ok = nb_ok
                run.nb_echec = nb_echec
                _finalize_run(run, session, "succes" if nb_echec == 0 else "partiel")
            finally:
                # Close the browser cleanly.
                context.close()
                browser.close()
    except Exception:  # pragma: no cover
        logger.exception("Error during global scrape")
        _finalize_run(run, session, "erreur")
    finally:
        session.close()
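
if __name__ == "__main__":
    # Minimal manual entry point: a hypothetical sketch for local testing,
    # not part of the service's real CLI. Assumes the database schema exists.
    import sys

    if len(sys.argv) > 1:
        # e.g. `python -m backend.app.scraper.amazon.runner 42`
        scrape_product(int(sys.argv[1]))
    else:
        scrape_all()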