""" Store Amazon - Parsing de produits Amazon.fr et Amazon.com. Supporte l'extraction de: titre, prix, ASIN, images, specs, etc. """ import re from datetime import datetime from pathlib import Path from typing import Optional from urllib.parse import urlparse from bs4 import BeautifulSoup from pricewatch.app.core.logging import get_logger from pricewatch.app.core.schema import ( DebugInfo, DebugStatus, FetchMethod, ProductSnapshot, StockStatus, ) from pricewatch.app.stores.base import BaseStore logger = get_logger("stores.amazon") class AmazonStore(BaseStore): """Store pour Amazon.fr et Amazon.com.""" def __init__(self): """Initialise le store Amazon avec ses sélecteurs.""" selectors_path = Path(__file__).parent / "selectors.yml" super().__init__(store_id="amazon", selectors_path=selectors_path) def match(self, url: str) -> float: """ Détecte si l'URL est Amazon. Returns: 0.9 pour amazon.fr 0.8 pour amazon.com et autres domaines amazon 0.0 sinon """ if not url: return 0.0 url_lower = url.lower() if "amazon.fr" in url_lower: return 0.9 elif "amazon.com" in url_lower or "amazon.co" in url_lower: return 0.8 elif "amazon." in url_lower: return 0.7 return 0.0 def canonicalize(self, url: str) -> str: """ Normalise l'URL Amazon vers /dp/{ASIN}. Exemples: https://www.amazon.fr/product-name/dp/B08N5WRWNW/ref=... → https://www.amazon.fr/dp/B08N5WRWNW Justification: L'ASIN est l'identifiant unique, le reste est superflu. """ if not url: return url # Extraire l'ASIN asin = self.extract_reference(url) if not asin: # Si pas d'ASIN trouvé, retourner l'URL sans query params parsed = urlparse(url) return f"{parsed.scheme}://{parsed.netloc}{parsed.path}" # Reconstruire l'URL canonique parsed = urlparse(url) return f"{parsed.scheme}://{parsed.netloc}/dp/{asin}" def extract_reference(self, url: str) -> Optional[str]: """ Extrait l'ASIN depuis l'URL. L'ASIN est généralement après /dp/ ou /gp/product/. L'ASIN doit avoir exactement 10 caractères alphanumériques. Exemples: /dp/B08N5WRWNW → B08N5WRWNW /gp/product/B08N5WRWNW → B08N5WRWNW """ if not url: return None # Pattern: /dp/{ASIN} ou /gp/product/{ASIN} # L'ASIN doit être suivi de /, ?, #, ou fin de string match = re.search(r"/(?:dp|gp/product)/([A-Z0-9]{10})(?:/|\?|#|$)", url) if match: return match.group(1) return None def parse(self, html: str, url: str) -> ProductSnapshot: """ Parse le HTML Amazon vers ProductSnapshot. Utilise BeautifulSoup et les sélecteurs du fichier YAML. 
""" soup = BeautifulSoup(html, "lxml") debug_info = DebugInfo( method=FetchMethod.HTTP, # Sera mis à jour par l'appelant status=DebugStatus.SUCCESS, errors=[], notes=[], ) # Vérifier si captcha/robot check if self._detect_captcha(soup): debug_info.errors.append("Captcha ou robot check détecté") debug_info.status = DebugStatus.FAILED logger.warning(f"[Amazon] Captcha détecté pour: {url}") # Extraction des champs title = self._extract_title(soup, debug_info) price = self._extract_price(soup, debug_info) currency = self._extract_currency(soup, debug_info) stock_status = self._extract_stock(soup, debug_info) images = self._extract_images(soup, debug_info) category = self._extract_category(soup, debug_info) specs = self._extract_specs(soup, debug_info) reference = self.extract_reference(url) or self._extract_asin_from_html(soup) # Déterminer le statut final (ne pas écraser FAILED) if debug_info.status != DebugStatus.FAILED: if not title or price is None: debug_info.status = DebugStatus.PARTIAL debug_info.notes.append("Parsing incomplet: titre ou prix manquant") snapshot = ProductSnapshot( source=self.store_id, url=self.canonicalize(url), fetched_at=datetime.now(), title=title, price=price, currency=currency or "EUR", shipping_cost=None, # Difficile à extraire stock_status=stock_status, reference=reference, category=category, images=images, specs=specs, debug=debug_info, ) logger.info( f"[Amazon] Parsing {'réussi' if snapshot.is_complete() else 'partiel'}: " f"title={bool(title)}, price={price is not None}" ) return snapshot def _detect_captcha(self, soup: BeautifulSoup) -> bool: """Détecte si la page contient un captcha/robot check.""" captcha_selectors = self.get_selector("captcha_indicators", []) if isinstance(captcha_selectors, str): captcha_selectors = [captcha_selectors] for selector in captcha_selectors: if soup.select(selector): return True # Vérifier dans le texte text = soup.get_text().lower() if "captcha" in text or "robot check" in text or "sorry" in text: return True return False def _extract_title(self, soup: BeautifulSoup, debug: DebugInfo) -> Optional[str]: """Extrait le titre du produit.""" selectors = self.get_selector("title", []) if isinstance(selectors, str): selectors = [selectors] for selector in selectors: element = soup.select_one(selector) if element: title = element.get_text(strip=True) if title: return title debug.errors.append("Titre non trouvé") return None def _extract_price(self, soup: BeautifulSoup, debug: DebugInfo) -> Optional[float]: """Extrait le prix.""" selectors = self.get_selector("price", []) if isinstance(selectors, str): selectors = [selectors] for selector in selectors: elements = soup.select(selector) for element in elements: text = element.get_text(strip=True) # Extraire nombre (format: "299,99" ou "299.99") match = re.search(r"(\d+)[.,](\d+)", text) if match: price_str = f"{match.group(1)}.{match.group(2)}" try: return float(price_str) except ValueError: continue # Fallback: chercher les spans séparés a-price-whole et a-price-fraction whole = soup.select_one("span.a-price-whole") fraction = soup.select_one("span.a-price-fraction") if whole and fraction: whole_text = whole.get_text(strip=True) fraction_text = fraction.get_text(strip=True) try: price_str = f"{whole_text}.{fraction_text}" return float(price_str) except ValueError: pass debug.errors.append("Prix non trouvé") return None def _extract_currency(self, soup: BeautifulSoup, debug: DebugInfo) -> Optional[str]: """Extrait la devise.""" selectors = self.get_selector("currency", []) if 
            selectors = [selectors]

        for selector in selectors:
            element = soup.select_one(selector)
            if element:
                symbol = element.get_text(strip=True)
                # Map symbols to ISO codes
                currency_map = {"€": "EUR", "$": "USD", "£": "GBP"}
                return currency_map.get(symbol, "EUR")

        # Default when no selector matches
        return "EUR"

    def _extract_stock(self, soup: BeautifulSoup, debug: DebugInfo) -> StockStatus:
        """Extract the stock status."""
        selectors = self.get_selector("stock_status", [])
        if isinstance(selectors, str):
            selectors = [selectors]

        for selector in selectors:
            element = soup.select_one(selector)
            if element:
                text = element.get_text(strip=True).lower()
                # Check out-of-stock phrases first: "unavailable" contains
                # "available", so the order of these checks matters.
                if (
                    "rupture" in text
                    or "indisponible" in text
                    or "out of stock" in text
                    or "unavailable" in text
                ):
                    return StockStatus.OUT_OF_STOCK
                if "en stock" in text or "in stock" in text or "available" in text:
                    return StockStatus.IN_STOCK

        return StockStatus.UNKNOWN

    def _extract_images(self, soup: BeautifulSoup, debug: DebugInfo) -> list[str]:
        """Extract image URLs."""
        images = []
        selectors = self.get_selector("images", [])
        if isinstance(selectors, str):
            selectors = [selectors]

        for selector in selectors:
            for element in soup.select(selector):
                # src or data-src attribute (lazy-loaded images)
                url = element.get("src") or element.get("data-src")
                if url and url.startswith("http"):
                    images.append(url)

        # Fallback: scan every <img> tag if nothing matched
        if not images:
            for img in soup.find_all("img"):
                url = img.get("src") or img.get("data-src")
                if url and url.startswith("http"):
                    images.append(url)

        # Deduplicate while preserving order (the first image is usually
        # the main product shot)
        return list(dict.fromkeys(images))

    def _extract_category(
        self, soup: BeautifulSoup, debug: DebugInfo
    ) -> Optional[str]:
        """Extract the category from the breadcrumbs."""
        selectors = self.get_selector("category", [])
        if isinstance(selectors, str):
            selectors = [selectors]

        for selector in selectors:
            element = soup.select_one(selector)
            if element:
                # Take the last (deepest) breadcrumb entry
                links = element.select("a")
                if links:
                    return links[-1].get_text(strip=True)
        return None

    def _extract_specs(self, soup: BeautifulSoup, debug: DebugInfo) -> dict[str, str]:
        """Extract the technical specifications."""
        specs: dict[str, str] = {}
        selectors = self.get_selector("specs_table", [])
        if isinstance(selectors, str):
            selectors = [selectors]

        for selector in selectors:
            table = soup.select_one(selector)
            if table:
                # Parse table rows: <th> holds the key, <td> the value
                for row in table.select("tr"):
                    th = row.select_one("th")
                    td = row.select_one("td")
                    if th and td:
                        key = th.get_text(strip=True)
                        value = td.get_text(strip=True)
                        if key and value:
                            specs[key] = value
        return specs

    def _extract_asin_from_html(self, soup: BeautifulSoup) -> Optional[str]:
        """Extract the ASIN from the HTML (fallback)."""
        selectors = self.get_selector("asin", [])
        if isinstance(selectors, str):
            selectors = [selectors]

        for selector in selectors:
            element = soup.select_one(selector)
            if element:
                # <input> carrying the ASIN in its value attribute
                if element.name == "input":
                    return element.get("value")
                # Otherwise plain text content (e.g. a <td> in a details table)
                return element.get_text(strip=True)
        return None
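

# ---------------------------------------------------------------------------
# Minimal manual check - a sketch, not part of the public API. It only
# exercises the pure URL helpers; instantiating the store assumes that
# selectors.yml sits next to this module, as BaseStore expects. The module
# path below is inferred from the imports and may differ in practice:
#     python -m pricewatch.app.stores.amazon
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    store = AmazonStore()

    sample = "https://www.amazon.fr/product-name/dp/B08N5WRWNW/ref=sr_1_1?k=x"
    print(store.match(sample))              # 0.9 (amazon.fr)
    print(store.extract_reference(sample))  # B08N5WRWNW
    print(store.canonicalize(sample))       # https://www.amazon.fr/dp/B08N5WRWNW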