from __future__ import annotations

import json
import random
import time
from datetime import datetime
from pathlib import Path
from typing import Iterable

from loguru import logger
from playwright.sync_api import sync_playwright
from sqlalchemy.orm import Session

from backend.app.core.config import load_config
from backend.app.db import database, models
from backend.app.scraper.amazon.parser import extract_product_data

# Storage directories
SAMPLES_DIR = Path(__file__).resolve().parent.parent / "samples"
DEBUG_DIR = SAMPLES_DIR / "debug"
STORAGE_STATE_PATH = SAMPLES_DIR / "storage_state.json"
RAW_DATA_DIR = Path(__file__).resolve().parent.parent.parent / "data" / "raw"


def _create_run(session: Session) -> models.ScrapeRun:
    """Create a new ScrapeRun row marked as in progress."""
    run = models.ScrapeRun(demarre_le=datetime.utcnow(), statut="en_cours")
    session.add(run)
    session.commit()
    session.refresh(run)
    return run


def _finalize_run(run: models.ScrapeRun, session: Session, status: str) -> None:
    """Set the run's final status and completion timestamp."""
    run.statut = status
    run.termine_le = datetime.utcnow()
    session.add(run)
    session.commit()


def _save_raw_json(payload: dict, product_id: int) -> Path:
    """Write the raw scrape payload to data/raw/<date>/<product_id>_<time>.json."""
    timestamp = datetime.utcnow().strftime("%Y-%m-%d")
    folder = RAW_DATA_DIR / timestamp
    folder.mkdir(parents=True, exist_ok=True)
    filename = f"{product_id}_{datetime.utcnow().strftime('%H%M%S')}.json"
    path = folder / filename
    path.write_text(json.dumps(payload, ensure_ascii=False, indent=2))
    return path


def _save_debug_artifacts(page, product_id: int, suffix: str = "capture") -> dict:
    """Save a screenshot and the page HTML to the debug directory."""
    DEBUG_DIR.mkdir(parents=True, exist_ok=True)
    stamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
    debug_files = {}
    try:
        screenshot_path = DEBUG_DIR / f"{product_id}_{stamp}_{suffix}.png"
        html_path = DEBUG_DIR / f"{product_id}_{stamp}_{suffix}.html"
        page.screenshot(path=str(screenshot_path), full_page=True)
        html_path.write_text(page.content(), encoding="utf-8")
        debug_files = {
            "screenshot": str(screenshot_path),
            "html": str(html_path),
        }
        logger.info(
            "Debug artifacts saved: screenshot={}, html={}",
            screenshot_path.name,
            html_path.name,
        )
    except Exception as e:
        logger.warning("Could not generate debug artifacts: {}", e)
    return debug_files


def _update_product_from_scrape(
    session: Session,
    product: models.Product,
    data: dict,
) -> None:
    """Update the product with scraped data (title, image) when missing."""
    if data.get("titre") and not product.titre:
        product.titre = data["titre"]
    if data.get("url_image_principale") and not product.url_image:
        product.url_image = data["url_image_principale"]
    session.add(product)
    session.commit()


def _create_snapshot(
    session: Session,
    product: models.Product,
    run: models.ScrapeRun,
    data: dict,
    status: str,
    raw_json_path: Path | None,
    error_message: str | None = None,
) -> None:
    """Persist one ProductSnapshot row for this run."""
    # Fill in the product's title/image if they are missing
    _update_product_from_scrape(session, product, data)

    # Serialize the extended fields as JSON strings
    a_propos = data.get("a_propos")
    if a_propos is not None:
        a_propos = json.dumps(a_propos, ensure_ascii=False)
    # "carateristique" (sic) matches the model/DB column name
    carateristique = data.get("carateristique")
    if carateristique is not None:
        carateristique = json.dumps(carateristique, ensure_ascii=False)
    details = data.get("details")
    if details is not None:
        details = json.dumps(details, ensure_ascii=False)

    snapshot = models.ProductSnapshot(
        produit_id=product.id,
        run_scrap_id=run.id,
        prix_actuel=data.get("prix_actuel"),
        prix_conseille=data.get("prix_conseille"),
        prix_min_30j=data.get("prix_min_30j"),
        etat_stock=data.get("etat_stock"),
        en_stock=data.get("en_stock"),
        note=data.get("note"),
        nombre_avis=data.get("nombre_avis"),
        prime=data.get("prime"),
        choix_amazon=data.get("choix_amazon"),
        offre_limitee=data.get("offre_limitee"),
        exclusivite_amazon=data.get("exclusivite_amazon"),
        a_propos=a_propos,
        description=data.get("description"),
        carateristique=carateristique,
        details=details,
        chemin_json_brut=str(raw_json_path) if raw_json_path else None,
        statut_scrap=status,
        message_erreur=error_message,
    )
    session.add(snapshot)
    session.commit()
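
# Hedged sketch (not called elsewhere in this module): the three columns
# serialized above (a_propos, carateristique, details) are stored as JSON
# strings, so a reader of ProductSnapshot rows needs the inverse operation.
# The helper name below is hypothetical, not part of the original API.
def _decode_snapshot_json(value: str | None):
    """Decode a JSON-serialized snapshot column back into Python data."""
    return json.loads(value) if value is not None else None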

def _create_browser_context(playwright, config):
    """Create a browser context, loading the persisted storage_state if available."""
    browser = playwright.chromium.launch(headless=config.scrape.headless)
    context_kwargs = {
        "locale": config.scrape.locale,
        "timezone_id": config.scrape.timezone,
        "user_agent": config.scrape.user_agent,
        "viewport": config.scrape.viewport,
    }
    # Load the persisted session if available
    if STORAGE_STATE_PATH.exists():
        context_kwargs["storage_state"] = str(STORAGE_STATE_PATH)
        logger.info("Persisted session loaded: {}", STORAGE_STATE_PATH)
    context = browser.new_context(**context_kwargs)
    return browser, context


def _save_storage_state(context) -> None:
    """Persist the session state for reuse across runs."""
    try:
        context.storage_state(path=str(STORAGE_STATE_PATH))
        logger.info("Persisted session saved: {}", STORAGE_STATE_PATH)
    except Exception as e:
        logger.warning("Could not save the session: {}", e)


def _process_product(
    page,
    session: Session,
    product: models.Product,
    run: models.ScrapeRun,
    config,
) -> tuple[bool, dict]:
    """Scrape one product and return (success, data)."""
    logger.info("Scraping product {} ({})", product.id, product.url)
    page.goto(product.url, wait_until="domcontentloaded", timeout=config.scrape.timeout_ms)

    # Always save debug artifacts
    debug_files = _save_debug_artifacts(page, product.id, "capture")

    # Extract the data
    data = extract_product_data(page, product.url)

    # Check for blocking (no title = likely blocked)
    if not data.get("titre"):
        logger.warning("Missing title for product {}, Amazon likely blocked the request", product.id)
        data["bloque"] = True
        data["debug_files"] = debug_files
        raw_path = _save_raw_json(data, product.id)
        _create_snapshot(
            session,
            product,
            run,
            data,
            status="bloque",
            raw_json_path=raw_path,
            error_message=f"Block detected - debug: {debug_files.get('screenshot', 'N/A')}",
        )
        return False, data

    # Success or partial result
    data["debug_files"] = debug_files
    raw_path = _save_raw_json(data, product.id)
    required = ["titre", "prix_actuel"]
    missing = [field for field in required if not data.get(field)]
    status = "champs_manquants" if missing else "ok"
    _create_snapshot(
        session,
        product,
        run,
        data,
        status=status,
        raw_json_path=raw_path,
        error_message=", ".join(missing) if missing else None,
    )
    if missing:
        logger.warning("Missing fields for {}: {}", product.id, missing)
        return False, data
    logger.info("Scraping OK for {} (titre={})", product.id, data.get("titre", "")[:50])
    return True, data
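
# Hedged sketch (not wired into the runners below): a jittered-retry wrapper
# around _process_product for callers that want a second attempt after an
# Amazon block. The attempt count and backoff window are assumptions, and
# note that each attempt records its own snapshot.
def _process_product_with_retry(
    page,
    session: Session,
    product: models.Product,
    run: models.ScrapeRun,
    config,
    attempts: int = 2,
) -> tuple[bool, dict]:
    success, data = False, {}
    for attempt in range(1, attempts + 1):
        success, data = _process_product(page, session, product, run, config)
        if success or attempt == attempts:
            break
        backoff_s = random.uniform(2.0, 5.0) * attempt  # assumed jitter window
        logger.info(
            "Retrying product {} in {:.1f}s (attempt {}/{})",
            product.id,
            backoff_s,
            attempt,
            attempts,
        )
        time.sleep(backoff_s)
    return success, data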

def scrape_product(product_id: int) -> None:
    """Scrape a single product by id and record the run in the database."""
    logger.info("Triggering scrape for product {}", product_id)
    session = database.SessionLocal()
    run = _create_run(session)
    try:
        product = session.get(models.Product, product_id)
        if not product:
            logger.warning("Product {} not found", product_id)
            _finalize_run(run, session, "echec")
            return
        config = load_config()
        run.nb_total = 1
        session.commit()
        with sync_playwright() as playwright:
            browser, context = _create_browser_context(playwright, config)
            page = context.new_page()
            page.set_default_timeout(config.scrape.timeout_ms)
            try:
                success, _ = _process_product(page, session, product, run, config)
                run.nb_ok = 1 if success else 0
                run.nb_echec = 0 if success else 1
                _finalize_run(run, session, "succes" if success else "partiel")

                # Save the session for reuse
                _save_storage_state(context)

                # Anti-blocking delay
                delay_min, delay_max = config.scrape.delay_range_ms
                time.sleep(random.uniform(delay_min, delay_max) / 1000.0)
            finally:
                context.close()
                browser.close()
    except Exception as e:
        logger.exception("Error while scraping {}: {}", product_id, e)
        _finalize_run(run, session, "erreur")
    finally:
        session.close()


def scrape_preview(url: str) -> dict:
    """
    Scrape an Amazon URL without writing to the database.

    Return the extracted data for preview.
    """
    logger.info("Preview scrape for URL: {}", url)
    config = load_config()
    result = {
        "url": url,
        "success": False,
        "data": {},
        "error": None,
    }
    try:
        with sync_playwright() as playwright:
            browser, context = _create_browser_context(playwright, config)
            page = context.new_page()
            page.set_default_timeout(config.scrape.timeout_ms)
            try:
                page.goto(url, wait_until="domcontentloaded", timeout=config.scrape.timeout_ms)

                # Extract the data
                data = extract_product_data(page, url)

                # Check for blocking
                if not data.get("titre"):
                    result["error"] = "Amazon block detected or product not found"
                    result["data"] = data
                else:
                    result["success"] = True
                    result["data"] = data

                # Save the session
                _save_storage_state(context)
            finally:
                context.close()
                browser.close()
    except Exception as e:
        logger.exception("Preview scrape error: {}", e)
        result["error"] = str(e)
    return result


def scrape_all(product_ids: Iterable[int] | None = None) -> None:
    """Scrape every active product, optionally restricted to product_ids."""
    logger.info("Triggering global scrape")
    session = database.SessionLocal()
    run = _create_run(session)
    try:
        config = load_config()
        products = session.query(models.Product).filter(models.Product.actif.is_(True)).all()
        if product_ids is not None:
            # Materialize into a set: a one-shot iterator would be exhausted
            # after the first membership test
            wanted = set(product_ids)
            products = [product for product in products if product.id in wanted]
        run.nb_total = len(products)
        session.commit()
        if not products:
            logger.info("No active products to scrape")
            _finalize_run(run, session, "succes")
            return
        with sync_playwright() as playwright:
            browser, context = _create_browser_context(playwright, config)
            page = context.new_page()
            page.set_default_timeout(config.scrape.timeout_ms)
            nb_ok = 0
            nb_echec = 0
            try:
                for product in products:
                    try:
                        success, _ = _process_product(page, session, product, run, config)
                        if success:
                            nb_ok += 1
                        else:
                            nb_echec += 1
                    except Exception as e:
                        logger.error("Error scraping product {}: {}", product.id, e)
                        nb_echec += 1

                    # Anti-blocking delay between products
                    delay_min, delay_max = config.scrape.delay_range_ms
                    time.sleep(random.uniform(delay_min, delay_max) / 1000.0)

                run.nb_ok = nb_ok
                run.nb_echec = nb_echec
                _finalize_run(run, session, "succes" if nb_echec == 0 else "partiel")

                # Save the session for reuse
                _save_storage_state(context)
            finally:
                context.close()
                browser.close()
    except Exception as e:
        logger.exception("Global scrape error: {}", e)
        _finalize_run(run, session, "erreur")
    finally:
        session.close()
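
# Hedged usage sketch: these entry points are normally invoked by the API
# layer; the ad-hoc CLI below is an assumption for local testing only.
if __name__ == "__main__":
    import sys

    if len(sys.argv) > 1:
        scrape_product(int(sys.argv[1]))  # scrape a single product by id
    else:
        scrape_all()  # scrape every active product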