- Add the a_propos, description, carateristique, and details fields to the ProductSnapshot model
- JSON serialization for lists and dictionaries
- Update the CRUD layer to store/read the extended data
- Update the runner to pass the data during scraping
- AddProductModal sends the extended data on creation
- The SQLite database must be recreated (delete suivi.db)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
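
The model and CRUD changes described above live in models.py and the CRUD layer, which are not part of this file. As a rough illustration only, here is a minimal sketch (assumed, not the project's actual code) of what the new ProductSnapshot columns and the JSON round-trip might look like; the column types, the table name, and the deserialize_extended helper are hypothetical:

import json

from sqlalchemy import Column, Integer, Text
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class ProductSnapshot(Base):
    """Hypothetical, simplified model: only the new columns are shown."""
    __tablename__ = "produit_snapshots"
    id = Column(Integer, primary_key=True)
    a_propos = Column(Text, nullable=True)        # JSON-encoded list of "About this item" bullets
    description = Column(Text, nullable=True)     # plain text, stored as-is
    carateristique = Column(Text, nullable=True)  # JSON-encoded dict (field name kept as in the code)
    details = Column(Text, nullable=True)         # JSON-encoded dict


def deserialize_extended(snapshot: ProductSnapshot) -> dict:
    """Read side of the CRUD change: decode the JSON columns back into Python objects."""
    def loads(value):
        return json.loads(value) if value else None

    return {
        "a_propos": loads(snapshot.a_propos),
        "description": snapshot.description,
        "carateristique": loads(snapshot.carateristique),
        "details": loads(snapshot.details),
    }

The write side of this round-trip is what _create_snapshot in the runner below does, serializing the list/dict fields with json.dumps(..., ensure_ascii=False) before handing them to models.ProductSnapshot.
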
from __future__ import annotations

import json
import random
import time
from datetime import datetime
from pathlib import Path
from typing import Iterable

from loguru import logger
from playwright.sync_api import sync_playwright
from sqlalchemy.orm import Session

from backend.app.core.config import load_config
from backend.app.db import database, models
from backend.app.scraper.amazon.parser import extract_product_data

# Storage directories
SAMPLES_DIR = Path(__file__).resolve().parent.parent / "samples"
DEBUG_DIR = SAMPLES_DIR / "debug"
STORAGE_STATE_PATH = SAMPLES_DIR / "storage_state.json"
RAW_DATA_DIR = Path(__file__).resolve().parent.parent.parent / "data" / "raw"

def _create_run(session: Session) -> models.ScrapeRun:
    """Create a new ScrapeRun row marked as in progress."""
    run = models.ScrapeRun(demarre_le=datetime.utcnow(), statut="en_cours")
    session.add(run)
    session.commit()
    session.refresh(run)
    return run

def _finalize_run(run: models.ScrapeRun, session: Session, status: str) -> None:
    """Mark the run as finished with the given status."""
    run.statut = status
    run.termine_le = datetime.utcnow()
    session.add(run)
    session.commit()

def _save_raw_json(payload: dict, product_id: int) -> Path:
    """Persist the raw scraped payload as JSON under data/raw/<date>/."""
    timestamp = datetime.utcnow().strftime("%Y-%m-%d")
    folder = RAW_DATA_DIR / timestamp
    folder.mkdir(parents=True, exist_ok=True)
    filename = f"{product_id}_{datetime.utcnow().strftime('%H%M%S')}.json"
    path = folder / filename
    path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
    return path

def _save_debug_artifacts(page, product_id: int, suffix: str = "capture") -> dict:
    """Save a screenshot and the page HTML to the debug directory."""
    DEBUG_DIR.mkdir(parents=True, exist_ok=True)
    stamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
    debug_files = {}
    try:
        screenshot_path = DEBUG_DIR / f"{product_id}_{stamp}_{suffix}.png"
        html_path = DEBUG_DIR / f"{product_id}_{stamp}_{suffix}.html"
        page.screenshot(path=str(screenshot_path), full_page=True)
        html_path.write_text(page.content(), encoding="utf-8")
        debug_files = {
            "screenshot": str(screenshot_path),
            "html": str(html_path),
        }
        logger.info("Artifacts debug sauvegardés: screenshot={}, html={}", screenshot_path.name, html_path.name)
    except Exception as e:
        logger.warning("Impossible de générer les artifacts de debug: {}", e)
    return debug_files

def _update_product_from_scrape(
    session: Session,
    product: models.Product,
    data: dict,
) -> None:
    """Backfill the product's title and main image from the scraped data."""
    if data.get("titre") and not product.titre:
        product.titre = data["titre"]
    if data.get("url_image_principale") and not product.url_image:
        product.url_image = data["url_image_principale"]
    session.add(product)
    session.commit()

def _create_snapshot(
    session: Session,
    product: models.Product,
    run: models.ScrapeRun,
    data: dict,
    status: str,
    raw_json_path: Path | None,
    error_message: str | None = None,
) -> None:
    """Store a ProductSnapshot for this run, serializing the extended fields to JSON."""
    # Update the product with title/image if they are missing
    _update_product_from_scrape(session, product, data)

    # Serialize the extended data (lists/dicts) to JSON strings
    a_propos = data.get("a_propos")
    if a_propos is not None:
        a_propos = json.dumps(a_propos, ensure_ascii=False)

    carateristique = data.get("carateristique")
    if carateristique is not None:
        carateristique = json.dumps(carateristique, ensure_ascii=False)

    details = data.get("details")
    if details is not None:
        details = json.dumps(details, ensure_ascii=False)

    snapshot = models.ProductSnapshot(
        produit_id=product.id,
        run_scrap_id=run.id,
        prix_actuel=data.get("prix_actuel"),
        prix_conseille=data.get("prix_conseille"),
        prix_min_30j=data.get("prix_min_30j"),
        etat_stock=data.get("etat_stock"),
        en_stock=data.get("en_stock"),
        note=data.get("note"),
        nombre_avis=data.get("nombre_avis"),
        prime=data.get("prime"),
        choix_amazon=data.get("choix_amazon"),
        offre_limitee=data.get("offre_limitee"),
        exclusivite_amazon=data.get("exclusivite_amazon"),
        a_propos=a_propos,
        description=data.get("description"),
        carateristique=carateristique,
        details=details,
        chemin_json_brut=str(raw_json_path) if raw_json_path else None,
        statut_scrap=status,
        message_erreur=error_message,
    )
    session.add(snapshot)
    session.commit()

def _create_browser_context(playwright, config):
    """Create a browser context, reusing the persisted storage_state when available."""
    browser = playwright.chromium.launch(headless=config.scrape.headless)
    context_kwargs = {
        "locale": config.scrape.locale,
        "timezone_id": config.scrape.timezone,
        "user_agent": config.scrape.user_agent,
        "viewport": config.scrape.viewport,
    }
    # Load the persisted session if available
    if STORAGE_STATE_PATH.exists():
        context_kwargs["storage_state"] = str(STORAGE_STATE_PATH)
        logger.info("Session persistée chargée: {}", STORAGE_STATE_PATH)

    context = browser.new_context(**context_kwargs)
    return browser, context

def _save_storage_state(context) -> None:
    """Persist the session state (cookies, local storage) for reuse."""
    try:
        context.storage_state(path=str(STORAGE_STATE_PATH))
        logger.info("Session persistée sauvegardée: {}", STORAGE_STATE_PATH)
    except Exception as e:
        logger.warning("Impossible de sauvegarder la session: {}", e)

def _process_product(
    page,
    session: Session,
    product: models.Product,
    run: models.ScrapeRun,
    config,
) -> tuple[bool, dict]:
    """Scrape a single product and return (success, data)."""
    logger.info("Scraping produit {} ({})", product.id, product.url)

    page.goto(product.url, wait_until="domcontentloaded", timeout=config.scrape.timeout_ms)

    # Always save debug artifacts
    debug_files = _save_debug_artifacts(page, product.id, "capture")

    # Extract the data
    data = extract_product_data(page, product.url)

    # Check for blocking (a missing title usually means Amazon blocked the request)
    if not data.get("titre"):
        logger.warning("Titre absent pour produit {}, probable blocage Amazon", product.id)
        data["bloque"] = True
        data["debug_files"] = debug_files
        raw_path = _save_raw_json(data, product.id)
        _create_snapshot(
            session,
            product,
            run,
            data,
            status="bloque",
            raw_json_path=raw_path,
            error_message=f"Blocage détecté - debug: {debug_files.get('screenshot', 'N/A')}",
        )
        return False, data

    # Success or partial success
    data["debug_files"] = debug_files
    raw_path = _save_raw_json(data, product.id)
    required = ["titre", "prix_actuel"]
    missing = [field for field in required if not data.get(field)]
    status = "champs_manquants" if missing else "ok"

    _create_snapshot(
        session,
        product,
        run,
        data,
        status=status,
        raw_json_path=raw_path,
        error_message=", ".join(missing) if missing else None,
    )

    if missing:
        logger.warning("Champs manquants pour {}: {}", product.id, missing)
        return False, data

    logger.info("Scraping OK pour {} (titre={})", product.id, data.get("titre", "")[:50])
    return True, data

def scrape_product(product_id: int) -> None:
    """Scrape a single product by id and record the run in the database."""
    logger.info("Déclenchement du scraping pour le produit {}", product_id)
    session = database.SessionLocal()
    run = _create_run(session)
    try:
        product = session.get(models.Product, product_id)
        if not product:
            logger.warning("Produit {} introuvable", product_id)
            _finalize_run(run, session, "echec")
            return

        config = load_config()
        run.nb_total = 1
        session.commit()

        with sync_playwright() as playwright:
            browser, context = _create_browser_context(playwright, config)
            page = context.new_page()
            page.set_default_timeout(config.scrape.timeout_ms)

            try:
                success, _ = _process_product(page, session, product, run, config)
                run.nb_ok = 1 if success else 0
                run.nb_echec = 0 if success else 1
                _finalize_run(run, session, "succes" if success else "partiel")

                # Persist the session for reuse
                _save_storage_state(context)

                # Anti-blocking delay
                delay_min, delay_max = config.scrape.delay_range_ms
                time.sleep(random.uniform(delay_min, delay_max) / 1000.0)
            finally:
                context.close()
                browser.close()
    except Exception as e:
        logger.exception("Erreur pendant le scraping de {}: {}", product_id, e)
        _finalize_run(run, session, "erreur")
    finally:
        session.close()

def scrape_preview(url: str) -> dict:
    """
    Scrape an Amazon URL without persisting anything to the database.

    Returns the extracted data for preview purposes.
    """
    logger.info("Prévisualisation scrape pour URL: {}", url)
    config = load_config()

    result = {
        "url": url,
        "success": False,
        "data": {},
        "error": None,
    }

    try:
        with sync_playwright() as playwright:
            browser, context = _create_browser_context(playwright, config)
            page = context.new_page()
            page.set_default_timeout(config.scrape.timeout_ms)

            try:
                page.goto(url, wait_until="domcontentloaded", timeout=config.scrape.timeout_ms)

                # Extract the data
                data = extract_product_data(page, url)

                # Check for blocking
                if not data.get("titre"):
                    result["error"] = "Blocage Amazon détecté ou produit introuvable"
                    result["data"] = data
                else:
                    result["success"] = True
                    result["data"] = data

                # Persist the session
                _save_storage_state(context)

            finally:
                context.close()
                browser.close()

    except Exception as e:
        logger.exception("Erreur prévisualisation scrape: {}", e)
        result["error"] = str(e)

    return result

def scrape_all(product_ids: Iterable[int] | None = None) -> None:
    """Scrape every active product (optionally restricted to product_ids) in a single run."""
    logger.info("Déclenchement du scraping global")
    session = database.SessionLocal()
    run = _create_run(session)
    try:
        config = load_config()
        products = session.query(models.Product).filter(models.Product.actif.is_(True)).all()
        if product_ids:
            wanted = set(product_ids)  # materialize so membership tests are cheap and repeatable
            products = [product for product in products if product.id in wanted]
        run.nb_total = len(products)
        session.commit()

        if not products:
            logger.info("Aucun produit actif à scraper")
            _finalize_run(run, session, "succes")
            return

        with sync_playwright() as playwright:
            browser, context = _create_browser_context(playwright, config)
            page = context.new_page()
            page.set_default_timeout(config.scrape.timeout_ms)

            nb_ok = 0
            nb_echec = 0

            try:
                for product in products:
                    try:
                        success, _ = _process_product(page, session, product, run, config)
                        if success:
                            nb_ok += 1
                        else:
                            nb_echec += 1
                    except Exception as e:
                        logger.error("Erreur scraping produit {}: {}", product.id, e)
                        nb_echec += 1

                    # Anti-blocking delay between products
                    delay_min, delay_max = config.scrape.delay_range_ms
                    time.sleep(random.uniform(delay_min, delay_max) / 1000.0)

                run.nb_ok = nb_ok
                run.nb_echec = nb_echec
                _finalize_run(run, session, "succes" if nb_echec == 0 else "partiel")

                # Persist the session for reuse
                _save_storage_state(context)
            finally:
                context.close()
                browser.close()
    except Exception as e:
        logger.exception("Erreur du scraping global: {}", e)
        _finalize_run(run, session, "erreur")
    finally:
        session.close()