""" CLI PriceWatch - Interface en ligne de commande. Commandes disponibles: - run: Pipeline complet YAML → JSON - detect: Détection du store depuis une URL - fetch: Récupération d'une page (HTTP ou Playwright) - parse: Parsing d'un fichier HTML - doctor: Vérification de l'installation """ import sys from pathlib import Path from typing import Optional import typer from rich import print as rprint from rich.console import Console from rich.table import Table from pricewatch.app.core import logging as app_logging from pricewatch.app.core.io import read_yaml_config, write_json_results from pricewatch.app.core.logging import get_logger, set_level from pricewatch.app.core.registry import get_registry, register_store from pricewatch.app.core.schema import DebugInfo, DebugStatus, FetchMethod from pricewatch.app.scraping.http_fetch import fetch_http from pricewatch.app.scraping.pw_fetch import fetch_playwright from pricewatch.app.stores.amazon.store import AmazonStore from pricewatch.app.stores.cdiscount.store import CdiscountStore # Créer l'application Typer app = typer.Typer( name="pricewatch", help="Application de suivi de prix e-commerce", add_completion=False, ) console = Console() logger = get_logger("cli") def setup_stores(): """Enregistre tous les stores disponibles dans le registry.""" registry = get_registry() registry.register(AmazonStore()) registry.register(CdiscountStore()) @app.command() def run( yaml: Path = typer.Option( "scrap_url.yaml", "--yaml", "-y", help="Fichier YAML de configuration", exists=True, ), out: Path = typer.Option( "scraped_store.json", "--out", "-o", help="Fichier JSON de sortie", ), debug: bool = typer.Option( False, "--debug", "-d", help="Activer le mode debug", ), ): """ Pipeline complet: scrape toutes les URLs du YAML et génère le JSON. """ if debug: set_level("DEBUG") logger.info("=== Démarrage du pipeline PriceWatch ===") # Initialiser les stores setup_stores() registry = get_registry() logger.info(f"Stores enregistrés: {', '.join(registry.list_stores())}") # Lire la configuration try: config = read_yaml_config(yaml) except Exception as e: logger.error(f"Erreur lecture YAML: {e}") raise typer.Exit(code=1) logger.info(f"{len(config.urls)} URL(s) à scraper") # Scraper chaque URL snapshots = [] for i, url in enumerate(config.urls, 1): logger.info(f"[{i}/{len(config.urls)}] Traitement: {url}") # Détecter le store store = registry.detect_store(url) if not store: logger.error(f"Aucun store trouvé pour: {url}") continue # Canoniser l'URL canonical_url = store.canonicalize(url) logger.info(f"URL canonique: {canonical_url}") # Récupérer la page html = None fetch_method = FetchMethod.HTTP fetch_error = None # Tenter HTTP d'abord logger.info("Tentative HTTP...") http_result = fetch_http(canonical_url) if http_result.success: html = http_result.html fetch_method = FetchMethod.HTTP logger.info("✓ HTTP réussi") elif config.options.use_playwright: # Fallback Playwright logger.warning(f"HTTP échoué: {http_result.error}, fallback Playwright") pw_result = fetch_playwright( canonical_url, headless=not config.options.headful, timeout_ms=config.options.timeout_ms, save_screenshot=config.options.save_screenshot, ) if pw_result.success: html = pw_result.html fetch_method = FetchMethod.PLAYWRIGHT logger.info("✓ Playwright réussi") # Sauvegarder screenshot si demandé if config.options.save_screenshot and pw_result.screenshot: from pricewatch.app.core.io import save_debug_screenshot ref = store.extract_reference(canonical_url) or f"url_{i}" save_debug_screenshot(pw_result.screenshot, f"{store.store_id}_{ref}") else: fetch_error = pw_result.error logger.error(f"✗ Playwright échoué: {fetch_error}") else: fetch_error = http_result.error logger.error(f"✗ HTTP échoué: {fetch_error}") # Parser si on a du HTML if html: try: # Sauvegarder HTML si demandé if config.options.save_html: from pricewatch.app.core.io import save_debug_html ref = store.extract_reference(canonical_url) or f"url_{i}" save_debug_html(html, f"{store.store_id}_{ref}") snapshot = store.parse(html, canonical_url) snapshot.debug.method = fetch_method snapshots.append(snapshot) status_emoji = "✓" if snapshot.is_complete() else "⚠" logger.info( f"{status_emoji} Parsing: title={bool(snapshot.title)}, " f"price={snapshot.price is not None}" ) except Exception as e: logger.error(f"✗ Erreur parsing: {e}") # Créer un snapshot failed from pricewatch.app.core.schema import ProductSnapshot snapshot = ProductSnapshot( source=store.store_id, url=canonical_url, debug=DebugInfo( method=fetch_method, status=DebugStatus.FAILED, errors=[f"Parsing failed: {str(e)}"], ), ) snapshots.append(snapshot) else: # Pas de HTML récupéré from pricewatch.app.core.schema import ProductSnapshot snapshot = ProductSnapshot( source=store.store_id if store else "unknown", url=canonical_url, debug=DebugInfo( method=fetch_method, status=DebugStatus.FAILED, errors=[f"Fetch failed: {fetch_error or 'Unknown error'}"], ), ) snapshots.append(snapshot) # Écrire les résultats logger.info(f"Écriture de {len(snapshots)} snapshot(s) dans: {out}") try: write_json_results(snapshots, out) logger.info("✓ Pipeline terminé avec succès") except Exception as e: logger.error(f"✗ Erreur écriture JSON: {e}") raise typer.Exit(code=1) @app.command() def detect(url: str): """ Détecte le store correspondant à une URL. """ logger.info(f"Détection du store pour: {url}") setup_stores() registry = get_registry() store = registry.detect_store(url) if store: rprint(f"[green]✓ Store détecté: {store.store_id}[/green]") rprint(f" URL canonique: {store.canonicalize(url)}") rprint(f" Référence: {store.extract_reference(url)}") else: rprint("[red]✗ Aucun store trouvé[/red]") raise typer.Exit(code=1) @app.command() def fetch( url: str, http: bool = typer.Option(False, "--http", help="Forcer HTTP"), playwright: bool = typer.Option(False, "--playwright", help="Forcer Playwright"), headful: bool = typer.Option(False, "--headful", help="Mode Playwright visible"), debug: bool = typer.Option(False, "--debug", "-d", help="Mode debug"), ): """ Récupère une page via HTTP ou Playwright. """ if debug: set_level("DEBUG") if http and playwright: rprint("[red]✗ Impossible de spécifier --http et --playwright ensemble[/red]") raise typer.Exit(code=1) if playwright or (not http and not playwright): # Playwright par défaut ou explicite logger.info(f"Récupération via Playwright: {url}") result = fetch_playwright(url, headless=not headful) if result.success: rprint(f"[green]✓ Succès[/green]") rprint(f" Taille HTML: {len(result.html)} chars") rprint(f" Durée: {result.duration_ms}ms") else: rprint(f"[red]✗ Échec: {result.error}[/red]") raise typer.Exit(code=1) else: # HTTP explicite logger.info(f"Récupération via HTTP: {url}") result = fetch_http(url) if result.success: rprint(f"[green]✓ Succès[/green]") rprint(f" Taille HTML: {len(result.html)} chars") rprint(f" Status: {result.status_code}") rprint(f" Durée: {result.duration_ms}ms") else: rprint(f"[red]✗ Échec: {result.error}[/red]") raise typer.Exit(code=1) @app.command() def parse( store: str = typer.Argument(..., help="Store ID (amazon, cdiscount)"), html_file: Path = typer.Option( ..., "--in", "-i", help="Fichier HTML à parser", exists=True ), debug: bool = typer.Option(False, "--debug", "-d", help="Mode debug"), ): """ Parse un fichier HTML avec un store spécifique. """ if debug: set_level("DEBUG") setup_stores() registry = get_registry() store_obj = registry.get_store(store) if not store_obj: rprint(f"[red]✗ Store inconnu: {store}[/red]") rprint(f"Stores disponibles: {', '.join(registry.list_stores())}") raise typer.Exit(code=1) logger.info(f"Parsing avec {store}: {html_file}") with open(html_file, "r", encoding="utf-8") as f: html = f.read() try: snapshot = store_obj.parse(html, url="file://local") if snapshot.is_complete(): rprint("[green]✓ Parsing réussi[/green]") else: rprint("[yellow]⚠ Parsing partiel[/yellow]") rprint(f" Titre: {snapshot.title or 'N/A'}") rprint(f" Prix: {snapshot.price} {snapshot.currency}") rprint(f" Référence: {snapshot.reference or 'N/A'}") rprint(f" Stock: {snapshot.stock_status}") rprint(f" Images: {len(snapshot.images)}") rprint(f" Specs: {len(snapshot.specs)}") except Exception as e: rprint(f"[red]✗ Erreur parsing: {e}[/red]") raise typer.Exit(code=1) @app.command() def doctor(): """ Vérifie l'installation de PriceWatch. """ table = Table(title="PriceWatch Doctor") table.add_column("Composant", style="cyan") table.add_column("Statut", style="green") # Python version table.add_row("Python", f"{sys.version.split()[0]} ✓") # Dépendances deps = [ ("typer", "typer"), ("pydantic", "pydantic"), ("requests", "requests"), ("playwright", "playwright"), ("beautifulsoup4", "bs4"), ("pyyaml", "yaml"), ] for name, module in deps: try: __import__(module) table.add_row(name, "✓ Installé") except ImportError: table.add_row(name, "✗ Manquant") # Stores setup_stores() registry = get_registry() table.add_row("Stores", f"{len(registry)} enregistrés: {', '.join(registry.list_stores())}") console.print(table) rprint("\n[green]✓ PriceWatch est prêt![/green]") if __name__ == "__main__": app()