""" Store Amazon - Parsing de produits Amazon.fr et Amazon.com. Supporte l'extraction de: titre, prix, ASIN, images, specs, etc. """ import json import re from html import unescape from datetime import datetime from pathlib import Path from typing import Optional from urllib.parse import urlparse from bs4 import BeautifulSoup from pricewatch.app.core.logging import get_logger from pricewatch.app.core.schema import ( DebugInfo, DebugStatus, FetchMethod, ProductSnapshot, StockStatus, ) from pricewatch.app.stores.base import BaseStore from pricewatch.app.stores.price_parser import parse_price_text logger = get_logger("stores.amazon") class AmazonStore(BaseStore): """Store pour Amazon.fr et Amazon.com.""" def __init__(self): """Initialise le store Amazon avec ses sélecteurs.""" selectors_path = Path(__file__).parent / "selectors.yml" super().__init__(store_id="amazon", selectors_path=selectors_path) def match(self, url: str) -> float: """ Détecte si l'URL est Amazon. Returns: 0.9 pour amazon.fr 0.8 pour amazon.com et autres domaines amazon 0.0 sinon """ if not url: return 0.0 url_lower = url.lower() if "amazon.fr" in url_lower: return 0.9 elif "amazon.com" in url_lower or "amazon.co" in url_lower: return 0.8 elif "amazon." in url_lower: return 0.7 return 0.0 def canonicalize(self, url: str) -> str: """ Normalise l'URL Amazon vers /dp/{ASIN}. Exemples: https://www.amazon.fr/product-name/dp/B08N5WRWNW/ref=... → https://www.amazon.fr/dp/B08N5WRWNW Justification: L'ASIN est l'identifiant unique, le reste est superflu. """ if not url: return url # Extraire l'ASIN asin = self.extract_reference(url) if not asin: # Si pas d'ASIN trouvé, retourner l'URL sans query params parsed = urlparse(url) return f"{parsed.scheme}://{parsed.netloc}{parsed.path}" # Reconstruire l'URL canonique parsed = urlparse(url) return f"{parsed.scheme}://{parsed.netloc}/dp/{asin}" def extract_reference(self, url: str) -> Optional[str]: """ Extrait l'ASIN depuis l'URL. L'ASIN est généralement après /dp/ ou /gp/product/. L'ASIN doit avoir exactement 10 caractères alphanumériques. Exemples: /dp/B08N5WRWNW → B08N5WRWNW /gp/product/B08N5WRWNW → B08N5WRWNW """ if not url: return None # Pattern: /dp/{ASIN} ou /gp/product/{ASIN} # L'ASIN doit être suivi de /, ?, #, ou fin de string match = re.search(r"/(?:dp|gp/product)/([A-Z0-9]{10})(?:/|\?|#|$)", url) if match: return match.group(1) return None def parse(self, html: str, url: str) -> ProductSnapshot: """ Parse le HTML Amazon vers ProductSnapshot. Utilise BeautifulSoup et les sélecteurs du fichier YAML. 
""" soup = BeautifulSoup(html, "lxml") debug_info = DebugInfo( method=FetchMethod.HTTP, # Sera mis à jour par l'appelant status=DebugStatus.SUCCESS, errors=[], notes=[], ) # Vérifier si captcha/robot check if self._detect_captcha(soup): debug_info.errors.append("Captcha ou robot check détecté") debug_info.status = DebugStatus.FAILED logger.warning(f"[Amazon] Captcha détecté pour: {url}") # Extraction des champs title = self._extract_title(soup, debug_info) price = self._extract_price(soup, debug_info) currency = self._extract_currency(soup, debug_info) stock_status = self._extract_stock(soup, debug_info) images = self._extract_images(soup, debug_info) category = self._extract_category(soup, debug_info) specs = self._extract_specs(soup, debug_info) description = self._extract_description(soup, debug_info) msrp = self._extract_msrp(soup, debug_info) reference = self.extract_reference(url) or self._extract_asin_from_html(soup) # Déterminer le statut final (ne pas écraser FAILED) if debug_info.status != DebugStatus.FAILED: if not title or price is None: debug_info.status = DebugStatus.PARTIAL debug_info.notes.append("Parsing incomplet: titre ou prix manquant") snapshot = ProductSnapshot( source=self.store_id, url=self.canonicalize(url), fetched_at=datetime.now(), title=title, price=price, currency=currency or "EUR", shipping_cost=None, # Difficile à extraire stock_status=stock_status, reference=reference, category=category, description=description, images=images, specs=specs, msrp=msrp, debug=debug_info, ) logger.info( f"[Amazon] Parsing {'réussi' if snapshot.is_complete() else 'partiel'}: " f"title={bool(title)}, price={price is not None}" ) return snapshot def _detect_captcha(self, soup: BeautifulSoup) -> bool: """Détecte si la page contient un captcha/robot check.""" captcha_selectors = self.get_selector("captcha_indicators", []) if isinstance(captcha_selectors, str): captcha_selectors = [captcha_selectors] for selector in captcha_selectors: if soup.select(selector): return True # Vérifier dans le texte text = soup.get_text().lower() if "captcha" in text or "robot check" in text or "sorry" in text: return True return False def _extract_title(self, soup: BeautifulSoup, debug: DebugInfo) -> Optional[str]: """Extrait le titre du produit.""" selectors = self.get_selector("title", []) if isinstance(selectors, str): selectors = [selectors] for selector in selectors: element = soup.select_one(selector) if element: title = element.get_text(strip=True) if title: return title debug.errors.append("Titre non trouvé") return None def _extract_description(self, soup: BeautifulSoup, debug: DebugInfo) -> Optional[str]: """Extrait la description (meta tags).""" meta = soup.find("meta", property="og:description") or soup.find( "meta", attrs={"name": "description"} ) if meta: description = meta.get("content", "").strip() if description: return description return None def _extract_price(self, soup: BeautifulSoup, debug: DebugInfo) -> Optional[float]: """Extrait le prix.""" selectors = self.get_selector("price", []) if isinstance(selectors, str): selectors = [selectors] for selector in selectors: elements = soup.select(selector) for element in elements: text = element.get_text(strip=True) price = parse_price_text(text) if price is not None: return price # Fallback: chercher les spans séparés a-price-whole et a-price-fraction whole = soup.select_one("span.a-price-whole") fraction = soup.select_one("span.a-price-fraction") if whole and fraction: whole_text = whole.get_text(strip=True) fraction_text = 
    def _extract_msrp(self, soup: BeautifulSoup, debug: DebugInfo) -> Optional[float]:
        """Extract the list price (MSRP)."""
        strike = soup.select_one("span.priceBlockStrikePriceString") or soup.select_one(
            "span.a-text-price span.a-offscreen"
        )
        if strike:
            price = parse_price_text(strike.get_text(strip=True))
            if price is not None:
                return price
        return None

    def _extract_currency(self, soup: BeautifulSoup, debug: DebugInfo) -> Optional[str]:
        """Extract the currency."""
        selectors = self.get_selector("currency", [])
        if isinstance(selectors, str):
            selectors = [selectors]

        for selector in selectors:
            element = soup.select_one(selector)
            if element:
                symbol = element.get_text(strip=True)
                # Map symbols to ISO codes
                currency_map = {"€": "EUR", "$": "USD", "£": "GBP"}
                return currency_map.get(symbol, "EUR")

        # Default to EUR (no domain-based detection yet)
        return "EUR"

    def _extract_stock(self, soup: BeautifulSoup, debug: DebugInfo) -> StockStatus:
        """Extract the stock status."""
        selectors = self.get_selector("stock_status", [])
        if isinstance(selectors, str):
            selectors = [selectors]

        for selector in selectors:
            element = soup.select_one(selector)
            if element:
                text = element.get_text(strip=True).lower()
                # Match both French (amazon.fr) and English wording
                if "en stock" in text or "available" in text or "in stock" in text:
                    return StockStatus.IN_STOCK
                elif (
                    "rupture" in text
                    or "indisponible" in text
                    or "out of stock" in text
                ):
                    return StockStatus.OUT_OF_STOCK

        return StockStatus.UNKNOWN

    def _extract_images(self, soup: BeautifulSoup, debug: DebugInfo) -> list[str]:
        """Extract image URLs."""
        images = []
        seen = set()
        selectors = self.get_selector("images", [])
        if isinstance(selectors, str):
            selectors = [selectors]

        for selector in selectors:
            elements = soup.select(selector)
            for element in elements:
                # src, data-src or data-old-hires attribute
                url = (
                    element.get("src")
                    or element.get("data-src")
                    or element.get("data-old-hires")
                )
                if url and url.startswith("http"):
                    if self._is_product_image(url) and url not in seen:
                        images.append(url)
                        seen.add(url)
                dynamic = element.get("data-a-dynamic-image")
                if dynamic:
                    urls = self._extract_dynamic_images(dynamic)
                    for dyn_url in urls:
                        if self._is_product_image(dyn_url) and dyn_url not in seen:
                            images.append(dyn_url)
                            seen.add(dyn_url)

        # Fallback: scan every img tag if nothing was found
        if not images:
            all_imgs = soup.find_all("img")
            for img in all_imgs:
                url = img.get("src") or img.get("data-src")
                if url and url.startswith("http") and self._is_product_image(url):
                    if url not in seen:
                        images.append(url)
                        seen.add(url)

        return images

    def _extract_dynamic_images(self, raw: str) -> list[str]:
        """Extract URLs from the data-a-dynamic-image JSON attribute."""
        try:
            data = json.loads(unescape(raw))
        except (TypeError, json.JSONDecodeError):
            return []

        urls = []
        if isinstance(data, dict):
            candidates = []
            for url, dims in data.items():
                if not isinstance(url, str) or not url.startswith("http"):
                    continue
                size = dims[0] if isinstance(dims, list) and dims else 0
                candidates.append((size, url))
            candidates.sort(key=lambda item: item[0], reverse=True)
            for _, url in candidates:
                urls.append(url)
        return urls
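    # Shape of data-a-dynamic-image handled above (hypothetical values):
    #
    #     {"https://m.media-amazon.com/images/I/a.jpg": [679, 679],
    #      "https://m.media-amazon.com/images/I/b.jpg": [385, 385]}
    #
    # Keys are image URLs, values are [width, height]; the method returns
    # the URLs sorted by width, largest first.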
breadcrumbs.""" selectors = self.get_selector("category", []) if isinstance(selectors, str): selectors = [selectors] for selector in selectors: element = soup.select_one(selector) if element: # Prendre le dernier élément du breadcrumb links = element.select("a") if links: return links[-1].get_text(strip=True) return None def _extract_specs(self, soup: BeautifulSoup, debug: DebugInfo) -> dict[str, str]: """Extrait les caractéristiques techniques.""" specs = {} selectors = self.get_selector("specs_table", []) if isinstance(selectors, str): selectors = [selectors] for selector in selectors: table = soup.select_one(selector) if table: # Parser table