"""
|
|
Store Amazon - Parsing de produits Amazon.fr et Amazon.com.
|
|
|
|
Supporte l'extraction de: titre, prix, ASIN, images, specs, etc.
|
|
"""
|
|
|
|
import json
|
|
import re
|
|
from html import unescape
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
from urllib.parse import urlparse
|
|
|
|
from bs4 import BeautifulSoup
|
|
|
|
from pricewatch.app.core.logging import get_logger
|
|
from pricewatch.app.core.schema import (
|
|
DebugInfo,
|
|
DebugStatus,
|
|
FetchMethod,
|
|
ProductSnapshot,
|
|
StockStatus,
|
|
)
|
|
from pricewatch.app.stores.base import BaseStore
|
|
from pricewatch.app.stores.price_parser import parse_price_text
|
|
|
|
logger = get_logger("stores.amazon")
|
|
|
|
|
|


class AmazonStore(BaseStore):
    """Store for Amazon.fr and Amazon.com."""

    def __init__(self):
        """Initialise the Amazon store with its selectors."""
        selectors_path = Path(__file__).parent / "selectors.yml"
        super().__init__(store_id="amazon", selectors_path=selectors_path)
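
    # selectors.yml (same directory) maps field names to lists of CSS
    # selectors. Illustrative shape only -- the real file is the source of
    # truth:
    #   title:
    #     - "#productTitle"
    #   price:
    #     - "span.a-price > span.a-offscreen"
    #   captcha_indicators:
    #     - "form[action='/errors/validateCaptcha']"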

    def match(self, url: str) -> float:
        """
        Detect whether the URL is an Amazon URL.

        Returns:
            0.9 for amazon.fr
            0.8 for amazon.com and other amazon.co* domains
            0.7 for any other amazon.* domain
            0.0 otherwise
        """
        if not url:
            return 0.0

        url_lower = url.lower()

        if "amazon.fr" in url_lower:
            return 0.9
        elif "amazon.com" in url_lower or "amazon.co" in url_lower:
            return 0.8
        elif "amazon." in url_lower:
            return 0.7

        return 0.0
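
    # Illustrative usage (scores follow the table above):
    #   AmazonStore().match("https://www.amazon.fr/dp/B08N5WRWNW")   # 0.9
    #   AmazonStore().match("https://www.amazon.com/dp/B08N5WRWNW")  # 0.8
    #   AmazonStore().match("https://example.com/item")              # 0.0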

    def canonicalize(self, url: str) -> str:
        """
        Normalise an Amazon URL to /dp/{ASIN}.

        Example:
            https://www.amazon.fr/product-name/dp/B08N5WRWNW/ref=...
            → https://www.amazon.fr/dp/B08N5WRWNW

        Rationale: the ASIN is the unique identifier; everything else is noise.
        """
        if not url:
            return url

        # Extract the ASIN
        asin = self.extract_reference(url)
        if not asin:
            # No ASIN found: return the URL stripped of query parameters
            parsed = urlparse(url)
            return f"{parsed.scheme}://{parsed.netloc}{parsed.path}"

        # Rebuild the canonical URL
        parsed = urlparse(url)
        return f"{parsed.scheme}://{parsed.netloc}/dp/{asin}"
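
    # Illustrative examples: tracking segments and query strings are dropped.
    #   canonicalize("https://www.amazon.fr/x/dp/B08N5WRWNW/ref=sr_1?tag=t")
    #     -> "https://www.amazon.fr/dp/B08N5WRWNW"
    #   canonicalize("https://www.amazon.fr/gp/help")  # no ASIN in the URL
    #     -> "https://www.amazon.fr/gp/help"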

    def extract_reference(self, url: str) -> Optional[str]:
        """
        Extract the ASIN from the URL.

        The ASIN usually follows /dp/ or /gp/product/ and is exactly
        10 alphanumeric characters.

        Examples:
            /dp/B08N5WRWNW → B08N5WRWNW
            /gp/product/B08N5WRWNW → B08N5WRWNW
        """
        if not url:
            return None

        # Pattern: /dp/{ASIN} or /gp/product/{ASIN}
        # The ASIN must be followed by /, ?, # or the end of the string
        match = re.search(r"/(?:dp|gp/product)/([A-Z0-9]{10})(?:/|\?|#|$)", url)
        if match:
            return match.group(1)

        return None
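
    # Note: shortened links (e.g. amzn.to/...) and pages without a /dp/ or
    # /gp/product/ segment carry no ASIN in the URL, so this returns None and
    # canonicalize() falls back to stripping the query string only.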

    def parse(self, html: str, url: str) -> ProductSnapshot:
        """
        Parse Amazon HTML into a ProductSnapshot.

        Uses BeautifulSoup and the selectors from the YAML file.
        """
        soup = BeautifulSoup(html, "lxml")

        debug_info = DebugInfo(
            method=FetchMethod.HTTP,  # Updated by the caller
            status=DebugStatus.SUCCESS,
            errors=[],
            notes=[],
        )

        # Check for a captcha / robot check
        if self._detect_captcha(soup):
            debug_info.errors.append("Captcha ou robot check détecté")
            debug_info.status = DebugStatus.FAILED
            logger.warning(f"[Amazon] Captcha détecté pour: {url}")

        # Field extraction
        title = self._extract_title(soup, debug_info)
        price = self._extract_price(soup, debug_info)
        currency = self._extract_currency(soup, debug_info)
        stock_status = self._extract_stock(soup, debug_info)
        images = self._extract_images(soup, debug_info)
        category = self._extract_category(soup, debug_info)
        specs = self._extract_specs(soup, debug_info)
        description = self._extract_description(soup, debug_info)
        msrp = self._extract_msrp(soup, debug_info)
        reference = self.extract_reference(url) or self._extract_asin_from_html(soup)

        # Determine the final status (never downgrade FAILED)
        if debug_info.status != DebugStatus.FAILED:
            if not title or price is None:
                debug_info.status = DebugStatus.PARTIAL
                debug_info.notes.append("Parsing incomplet: titre ou prix manquant")

        snapshot = ProductSnapshot(
            source=self.store_id,
            url=self.canonicalize(url),
            fetched_at=datetime.now(),
            title=title,
            price=price,
            currency=currency or "EUR",
            shipping_cost=None,  # Hard to extract reliably
            stock_status=stock_status,
            reference=reference,
            category=category,
            description=description,
            images=images,
            specs=specs,
            msrp=msrp,
            debug=debug_info,
        )

        logger.info(
            f"[Amazon] Parsing {'réussi' if snapshot.is_complete() else 'partiel'}: "
            f"title={bool(title)}, price={price is not None}"
        )

        return snapshot
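
    # Sketch of a typical call (the HTML is fetched elsewhere):
    #   store = AmazonStore()
    #   snap = store.parse(html, "https://www.amazon.fr/dp/B08N5WRWNW")
    #   snap.debug.status  # SUCCESS, PARTIAL (title/price missing)
    #                      # or FAILED (captcha detected)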

    def _detect_captcha(self, soup: BeautifulSoup) -> bool:
        """Detect whether the page is a captcha / robot check."""
        captcha_selectors = self.get_selector("captcha_indicators", [])
        if isinstance(captcha_selectors, str):
            captcha_selectors = [captcha_selectors]

        for selector in captcha_selectors:
            if soup.select(selector):
                return True

        # Fall back to checking the visible text
        text = soup.get_text().lower()
        if "captcha" in text or "robot check" in text or "sorry" in text:
            return True

        return False
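
    # Caveat: the text fallback above is deliberately broad; any page whose
    # visible text contains "sorry" is flagged, so the selector-based
    # indicators from selectors.yml remain the primary signal.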

    def _extract_title(self, soup: BeautifulSoup, debug: DebugInfo) -> Optional[str]:
        """Extract the product title."""
        selectors = self.get_selector("title", [])
        if isinstance(selectors, str):
            selectors = [selectors]

        for selector in selectors:
            element = soup.select_one(selector)
            if element:
                title = element.get_text(strip=True)
                if title:
                    return title

        debug.errors.append("Titre non trouvé")
        return None

    def _extract_description(self, soup: BeautifulSoup, debug: DebugInfo) -> Optional[str]:
        """Extract the description (from meta tags)."""
        meta = soup.find("meta", property="og:description") or soup.find(
            "meta", attrs={"name": "description"}
        )
        if meta:
            description = meta.get("content", "").strip()
            if description:
                return description
        return None

    def _extract_price(self, soup: BeautifulSoup, debug: DebugInfo) -> Optional[float]:
        """Extract the price."""
        # Priority 1: combine the separate a-price-whole and a-price-fraction
        # spans. This is Amazon's most common markup when cents are split out.
        whole = soup.select_one("span.a-price-whole")
        fraction = soup.select_one("span.a-price-fraction")
        if whole and fraction:
            whole_text = whole.get_text(strip=True).rstrip(",.")
            fraction_text = fraction.get_text(strip=True)
            if whole_text and fraction_text:
                price = parse_price_text(f"{whole_text}.{fraction_text}")
                if price is not None:
                    return price

        # Priority 2: try the selectors (including a-price-whole alone when it
        # already holds the full price)
        selectors = self.get_selector("price", [])
        if isinstance(selectors, str):
            selectors = [selectors]

        for selector in selectors:
            elements = soup.select(selector)
            for element in elements:
                text = element.get_text(strip=True)
                price = parse_price_text(text)
                if price is not None:
                    return price

        debug.errors.append("Prix non trouvé")
        return None
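
    # The split-price markup targeted above typically looks like (simplified):
    #   <span class="a-price-whole">1 299<span class="a-price-decimal">,</span></span>
    #   <span class="a-price-fraction">99</span>
    # Joining the two as "1 299.99" lets parse_price_text() normalise it;
    # reading a-price-whole alone would drop the cents.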

    def _extract_msrp(self, soup: BeautifulSoup, debug: DebugInfo) -> Optional[float]:
        """Extract the list price (MSRP)."""
        strike = soup.select_one("span.priceBlockStrikePriceString") or soup.select_one(
            "span.a-text-price span.a-offscreen"
        )
        if strike:
            price = parse_price_text(strike.get_text(strip=True))
            if price is not None:
                return price
        return None

    def _extract_currency(self, soup: BeautifulSoup, debug: DebugInfo) -> Optional[str]:
        """Extract the currency."""
        selectors = self.get_selector("currency", [])
        if isinstance(selectors, str):
            selectors = [selectors]

        for selector in selectors:
            element = soup.select_one(selector)
            if element:
                symbol = element.get_text(strip=True)
                # Map symbols to ISO codes
                currency_map = {"€": "EUR", "$": "USD", "£": "GBP"}
                return currency_map.get(symbol, "EUR")

        # Default: EUR (no domain-based detection yet, so a .com page with no
        # currency element is also reported as EUR)
        return "EUR"

    def _extract_stock(self, soup: BeautifulSoup, debug: DebugInfo) -> StockStatus:
        """Extract the stock status."""
        selectors = self.get_selector("stock_status", [])
        if isinstance(selectors, str):
            selectors = [selectors]

        for selector in selectors:
            element = soup.select_one(selector)
            if element:
                text = element.get_text(strip=True).lower()
                if "en stock" in text or "available" in text or "in stock" in text:
                    return StockStatus.IN_STOCK
                elif (
                    "rupture" in text
                    or "indisponible" in text
                    or "out of stock" in text
                ):
                    return StockStatus.OUT_OF_STOCK

        return StockStatus.UNKNOWN
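
    # The keyword lists above cover French (amazon.fr) and English (amazon.com)
    # availability strings; other locales fall through to UNKNOWN.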

    def _extract_images(self, soup: BeautifulSoup, debug: DebugInfo) -> list[str]:
        """Extract image URLs."""
        images = []
        seen = set()
        selectors = self.get_selector("images", [])
        if isinstance(selectors, str):
            selectors = [selectors]

        for selector in selectors:
            elements = soup.select(selector)
            for element in elements:
                # src, data-src or data-old-hires attribute
                url = (
                    element.get("src")
                    or element.get("data-src")
                    or element.get("data-old-hires")
                )
                if url and url.startswith("http"):
                    if self._is_product_image(url) and url not in seen:
                        images.append(url)
                        seen.add(url)
                dynamic = element.get("data-a-dynamic-image")
                if dynamic:
                    urls = self._extract_dynamic_images(dynamic)
                    for dyn_url in urls:
                        if self._is_product_image(dyn_url) and dyn_url not in seen:
                            images.append(dyn_url)
                            seen.add(dyn_url)

        # Fallback: scan every <img> tag if nothing was found
        if not images:
            all_imgs = soup.find_all("img")
            for img in all_imgs:
                url = img.get("src") or img.get("data-src")
                if url and url.startswith("http") and self._is_product_image(url):
                    if url not in seen:
                        images.append(url)
                        seen.add(url)

        return images
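
    # Ordering note: selector-matched images come first (with the largest
    # data-a-dynamic-image renditions appended per element), and the page-wide
    # <img> scan only runs as a last resort when the selectors matched nothing.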

    def _extract_dynamic_images(self, raw: str) -> list[str]:
        """Extract URLs from the data-a-dynamic-image JSON."""
        try:
            data = json.loads(unescape(raw))
        except (TypeError, json.JSONDecodeError):
            return []

        urls = []
        if isinstance(data, dict):
            candidates = []
            for url, dims in data.items():
                if not isinstance(url, str) or not url.startswith("http"):
                    continue
                size = dims[0] if isinstance(dims, list) and dims else 0
                candidates.append((size, url))
            candidates.sort(key=lambda item: item[0], reverse=True)
            for _, url in candidates:
                urls.append(url)
        return urls
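
    # data-a-dynamic-image holds a JSON map of URL -> [width, height], e.g.
    #   {"https://m.media-amazon.com/images/I/81abc.jpg": [679, 679], ...}
    # so sorting on the first dimension yields the largest rendition first.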

    def _is_product_image(self, url: str) -> bool:
        """Basic filter to skip logos and sprites."""
        lowered = url.lower()
        if "prime_logo" in lowered or "sprite" in lowered:
            return False
        return True

    def _extract_category(self, soup: BeautifulSoup, debug: DebugInfo) -> Optional[str]:
        """Extract the category from the breadcrumbs."""
        selectors = self.get_selector("category", [])
        if isinstance(selectors, str):
            selectors = [selectors]

        for selector in selectors:
            element = soup.select_one(selector)
            if element:
                # Take the last breadcrumb entry (the most specific category)
                links = element.select("a")
                if links:
                    return links[-1].get_text(strip=True)

        return None

    def _extract_specs(self, soup: BeautifulSoup, debug: DebugInfo) -> dict[str, str]:
        """Extract the technical specifications."""
        specs = {}
        selectors = self.get_selector("specs_table", [])
        if isinstance(selectors, str):
            selectors = [selectors]

        for selector in selectors:
            table = soup.select_one(selector)
            if table:
                # Parse <th>/<td> table rows
                rows = table.select("tr")
                for row in rows:
                    th = row.select_one("th")
                    td = row.select_one("td")
                    if th and td:
                        key = th.get_text(strip=True)
                        value = td.get_text(strip=True)
                        if key and value:
                            specs[key] = value

        return specs
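
    # Expects the classic <tr><th>label</th><td>value</td></tr> layout of
    # Amazon's technical-details tables (the selector list lives in
    # selectors.yml); definition-list markup is not handled here.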

    def _extract_asin_from_html(self, soup: BeautifulSoup) -> Optional[str]:
        """Extract the ASIN from the HTML (fallback)."""
        selectors = self.get_selector("asin", [])
        if isinstance(selectors, str):
            selectors = [selectors]

        for selector in selectors:
            element = soup.select_one(selector)
            if element:
                # <input> with a value attribute
                if element.name == "input":
                    return element.get("value")
                # otherwise a <td> inside a table
                else:
                    return element.get_text(strip=True)

        return None