"""Scrape test harness for the Amazon product parser.

Loads test URLs from samples/scrape_test.json, opens each product page with
Playwright, extracts product data, evaluates required/optional fields, and
writes the outcome to samples/scrap_result.json.
"""

from __future__ import annotations

import json
import os
import random
import re
import sys
import time
from pathlib import Path
from urllib.parse import urlparse

PROJECT_ROOT = Path(__file__).resolve().parents[3]
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))

from loguru import logger  # noqa: E402
from playwright.sync_api import sync_playwright  # noqa: E402

from backend.app.core.config import load_config  # noqa: E402
from backend.app.scraper.amazon.parser import extract_product_data  # noqa: E402

SAMPLES_DIR = Path(__file__).resolve().parent.parent / "samples"
TESTS_PATH = SAMPLES_DIR / "scrape_test.json"
RESULTS_PATH = SAMPLES_DIR / "scrap_result.json"
FIELDS_PATH = SAMPLES_DIR / "scrape_fields.json"
STORAGE_STATE_PATH = SAMPLES_DIR / "storage_state.json"
DEBUG_DIR = SAMPLES_DIR / "debug"

DEFAULT_REQUIRED_FIELDS = ("titre", "prix_actuel")
DEFAULT_OPTIONAL_FIELDS = (
    "prix_conseille",
    "prix_min_30j",
    "etat_stock",
    "en_stock",
    "note",
    "nombre_avis",
    "prime",
    "choix_amazon",
    "offre_limitee",
    "exclusivite_amazon",
)


def load_fields_config() -> tuple[tuple[str, ...], tuple[str, ...]]:
    """Return the (required, optional) field names, from scrape_fields.json if present."""
    if not FIELDS_PATH.exists():
        return DEFAULT_REQUIRED_FIELDS, DEFAULT_OPTIONAL_FIELDS
    payload = json.loads(FIELDS_PATH.read_text(encoding="utf-8"))
    required = tuple(payload.get("required", DEFAULT_REQUIRED_FIELDS))
    optional = tuple(payload.get("optional", DEFAULT_OPTIONAL_FIELDS))
    return required, optional


def canonicalize_url(url: str) -> str:
    """Strip query string and fragment, keeping only scheme, host and path."""
    if not url:
        return url
    parsed = urlparse(url)
    return f"{parsed.scheme}://{parsed.netloc}{parsed.path}"


def extract_reference(url: str) -> str | None:
    """Extract the 10-character ASIN from a /dp/ product URL, if any."""
    if not url:
        return None
    match = re.search(r"/dp/([A-Z0-9]{10})", url)
    if match:
        return match.group(1)
    return None


def build_debug(
    statut: str,
    erreurs: list[str] | None = None,
    notes: list[str] | None = None,
) -> dict:
    return {
        "statut": statut,
        "erreurs": erreurs or [],
        "notes": notes or [],
    }


def build_result(
    test_id: str,
    url: str,
    statut: str,
    data: dict | None = None,
    debug: dict | None = None,
) -> dict:
    return {
        "id": test_id,
        "url": url,
        "url_canonique": canonicalize_url(url),
        "reference": extract_reference(url),
        "statut": statut,
        "donnees": data,
        "debug": debug,
    }


def save_debug_artifacts(page, test_id: str, suffix: str) -> dict:
    """Save a full-page screenshot and the page HTML to DEBUG_DIR; return the file paths."""
    debug_files = {}
    try:
        screenshot_path = DEBUG_DIR / f"{test_id}_{suffix}.png"
        html_path = DEBUG_DIR / f"{test_id}_{suffix}.html"
        page.screenshot(path=str(screenshot_path), full_page=True)
        html_path.write_text(page.content(), encoding="utf-8")
        debug_files = {
            "screenshot": str(screenshot_path),
            "html": str(html_path),
        }
        logger.info("Debug artifacts: {}", debug_files)
    except Exception:
        logger.warning("Could not generate debug artifacts.")
    return debug_files


def evaluate_data(
    data: dict,
    required_fields: tuple[str, ...],
    optional_fields: tuple[str, ...],
) -> tuple[str, dict]:
    """Classify scraped data as "ok" or "partiel" and build the matching debug payload."""
    missing_required = [field for field in required_fields if not data.get(field)]
    missing_optional = [field for field in optional_fields if data.get(field) is None]
    if missing_required:
        notes = []
        if missing_optional:
            notes.append(f"Missing optional fields: {', '.join(missing_optional)}")
        return "partiel", build_debug(
            "partiel",
            erreurs=[f"Missing required fields: {', '.join(missing_required)}"],
            notes=notes,
        )
    if missing_optional:
        return "ok", build_debug(
            "succes",
            notes=[f"Missing optional fields: {', '.join(missing_optional)}"],
        )
    return "ok", build_debug("succes")
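# The two JSON inputs read by this script are expected to look roughly like the
# sketches below. Values are illustrative only (hypothetical test id, placeholder
# ASIN and pause); the keys are the ones actually read above and in main().
#
#   scrape_fields.json:
#     {"required": ["titre", "prix_actuel"], "optional": ["note", "nombre_avis"]}
#
#   scrape_test.json:
#     {"tests": [{"id": "tv-55", "url": "https://www.amazon.fr/dp/XXXXXXXXXX", "pause_s": 10}]}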
def main() -> None:
    logger.remove()
    logger.add(sys.stdout, level="INFO")
    DEBUG_DIR.mkdir(parents=True, exist_ok=True)
    payload = json.loads(TESTS_PATH.read_text(encoding="utf-8"))
    tests = payload.get("tests", [])
    if not tests:
        logger.warning("No tests found in {}", TESTS_PATH)
        return
    config = load_config()
    required_fields, optional_fields = load_fields_config()
    min_delay = int(os.getenv("SCRAPE_TEST_MIN_DELAY", "1"))
    max_delay = int(os.getenv("SCRAPE_TEST_MAX_DELAY", "5"))
    max_tests = int(os.getenv("SCRAPE_TEST_MAX", "0"))
    headful_on_block = os.getenv("SCRAPE_TEST_HEADFUL_ON_BLOCK", "0") == "1"
    wait_on_block = int(os.getenv("SCRAPE_TEST_WAIT_ON_BLOCK", "60"))
    # Allow forcing debug artifacts in tests even when disabled in the config.
    debug_enabled = os.getenv(
        "SCRAPE_TEST_DEBUG_ENABLED", str(config.scrape.debug_enabled)
    ).lower() in ("1", "true")
    results = []
    with sync_playwright() as playwright:
        browser = playwright.chromium.launch(headless=config.scrape.headless)
        context_kwargs = {
            "locale": config.scrape.locale,
            "timezone_id": config.scrape.timezone,
            "user_agent": config.scrape.user_agent,
            "viewport": config.scrape.viewport,
        }
        if STORAGE_STATE_PATH.exists():
            context_kwargs["storage_state"] = str(STORAGE_STATE_PATH)
            logger.info("Persisted session loaded: {}", STORAGE_STATE_PATH)
        context = browser.new_context(**context_kwargs)
        page = context.new_page()
        page.set_default_timeout(config.scrape.timeout_ms)
        try:
            for index, test in enumerate(tests, start=1):
                if max_tests > 0 and index > max_tests:
                    logger.info("Limit reached ({} tests), stopping the session.", max_tests)
                    break
                test_id = test.get("id")
                url = test.get("url")
                pause_s = test.get("pause_s", 0)
                if not url:
                    logger.warning("Test {} has no URL", test_id)
                    continue
                logger.info("Scraping {} ({})", test_id, url)
                page.goto(url, wait_until="domcontentloaded", timeout=config.scrape.timeout_ms)
                debug_files = {}
                if debug_enabled:
                    debug_files = save_debug_artifacts(page, test_id, "capture")
                data = extract_product_data(page, url)
                if not data.get("titre"):
                    logger.warning("Title missing, suspected block for {}", test_id)
                    if headful_on_block:
                        # Re-open the page in a visible browser so a human can solve the captcha.
                        logger.info("Opening a headful browser for manual resolution.")
                        manual_browser = playwright.chromium.launch(headless=False)
                        manual_context_kwargs = dict(context_kwargs)
                        manual_context = manual_browser.new_context(**manual_context_kwargs)
                        manual_page = manual_context.new_page()
                        manual_page.goto(
                            url, wait_until="domcontentloaded", timeout=config.scrape.timeout_ms
                        )
                        if debug_enabled:
                            save_debug_artifacts(manual_page, test_id, "manual")
                        logger.info("Solve the captcha, then press Enter.")
                        try:
                            input()
                        except EOFError:
                            logger.info("No input available, waiting {}s.", wait_on_block)
                            time.sleep(wait_on_block)
                        data = extract_product_data(manual_page, url)
                        if not data.get("titre"):
                            results.append(
                                build_result(
                                    test_id,
                                    url,
                                    "bloque",
                                    debug=build_debug("bloque", notes=[f"debug={debug_files}"]),
                                )
                            )
                        else:
                            status, debug = evaluate_data(data, required_fields, optional_fields)
                            if status == "partiel":
                                logger.warning("Missing fields: {}", debug.get("erreurs"))
                            debug["notes"].append(f"debug={debug_files}")
                            results.append(build_result(test_id, url, status, data=data, debug=debug))
                            logger.info("OK {} (titre={})", test_id, data.get("titre"))
                        try:
                            manual_context.storage_state(path=str(STORAGE_STATE_PATH))
                            logger.info("Persisted session saved: {}", STORAGE_STATE_PATH)
                        except Exception:
                            logger.warning("Could not save the persisted session.")
                        manual_context.close()
                        manual_browser.close()
                    else:
                        results.append(
                            build_result(
                                test_id,
                                url,
                                "bloque",
                                debug=build_debug("bloque", notes=[f"debug={debug_files}"]),
                            )
                        )
                else:
                    status, debug = evaluate_data(data, required_fields, optional_fields)
                    if status == "partiel":
                        logger.warning("Missing fields: {}", debug.get("erreurs"))
                    debug["notes"].append(f"debug={debug_files}")
                    results.append(build_result(test_id, url, status, data=data, debug=debug))
                    logger.info("OK {} (titre={})", test_id, data.get("titre"))
                if pause_s:
                    logger.info("Pause {}s", pause_s)
                    time.sleep(pause_s)
                # Extra randomized delay between pages to reduce the chance of being blocked.
                jitter = random.uniform(min_delay, max_delay)
                logger.info("Anti-block delay: {:.1f}s", jitter)
                time.sleep(jitter)
        finally:
            try:
                context.storage_state(path=str(STORAGE_STATE_PATH))
                logger.info("Persisted session saved: {}", STORAGE_STATE_PATH)
            except Exception:
                logger.warning("Could not save the persisted session.")
            context.close()
            browser.close()
    RESULTS_PATH.write_text(
        json.dumps({"results": results}, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )
    logger.info("Results saved to {}", RESULTS_PATH)


if __name__ == "__main__":
    main()
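# Usage sketch (assumed invocation; the script path and values below are
# illustrative, only the environment variable names come from the code above):
#
#   SCRAPE_TEST_MAX=3 \
#   SCRAPE_TEST_MIN_DELAY=2 SCRAPE_TEST_MAX_DELAY=8 \
#   SCRAPE_TEST_HEADFUL_ON_BLOCK=1 SCRAPE_TEST_DEBUG_ENABLED=1 \
#   python <path/to/this_script>.py
#
# Results are written to samples/scrap_result.json; debug captures, when enabled,
# go to samples/debug/.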