Files
scrap/pricewatch/app/stores/amazon/store.py
Gilles Soulier 740c3d7516 before claude
2026-01-18 06:26:17 +01:00

566 lines
21 KiB
Python
Executable File

"""
Store Amazon - Parsing de produits Amazon.fr et Amazon.com.
Supporte l'extraction de: titre, prix, ASIN, images, specs, etc.
"""
import json
import re
from html import unescape
from datetime import datetime
from pathlib import Path
from typing import Optional
from urllib.parse import urlparse
from bs4 import BeautifulSoup
from pricewatch.app.core.logging import get_logger
from pricewatch.app.core.schema import (
DebugInfo,
DebugStatus,
FetchMethod,
ProductSnapshot,
StockStatus,
)
from pricewatch.app.stores.base import BaseStore
from pricewatch.app.stores.price_parser import parse_price_text
logger = get_logger("stores.amazon")
class AmazonStore(BaseStore):
    """Store for Amazon.fr and Amazon.com product pages.

    Parses a product page's HTML into a ProductSnapshot: title, price,
    ASIN, images, specs, rating, Amazon's Choice badge, discount text,
    etc. CSS selectors are loaded from the adjacent ``selectors.yml``.
    Parsing never raises on missing fields; instead the snapshot's debug
    info records a PARTIAL or FAILED status.
    """

    def __init__(self):
        """Initialize the Amazon store with its selector file."""
        selectors_path = Path(__file__).parent / "selectors.yml"
        super().__init__(store_id="amazon", selectors_path=selectors_path)

    def _selector_list(self, key: str) -> list[str]:
        """Return the configured selectors for *key*, normalized to a list.

        The YAML file may declare a single selector as a plain string;
        every caller iterates, so wrap a lone string in a list.
        """
        selectors = self.get_selector(key, [])
        if isinstance(selectors, str):
            return [selectors]
        return selectors

    def match(self, url: str) -> float:
        """Score confidence that *url* belongs to Amazon.

        Returns:
            0.9 for amazon.fr,
            0.8 for amazon.com and amazon.co.* domains,
            0.7 for any other amazon.* domain,
            0.0 otherwise.
        """
        if not url:
            return 0.0
        url_lower = url.lower()
        if "amazon.fr" in url_lower:
            return 0.9
        if "amazon.com" in url_lower or "amazon.co" in url_lower:
            return 0.8
        if "amazon." in url_lower:
            return 0.7
        return 0.0

    def canonicalize(self, url: str) -> str:
        """Normalize an Amazon URL to the canonical ``/dp/{ASIN}`` form.

        Example:
            https://www.amazon.fr/product-name/dp/B08N5WRWNW/ref=...
            -> https://www.amazon.fr/dp/B08N5WRWNW

        Rationale: the ASIN is the unique product identifier; the slug,
        ``ref`` segments and query parameters are tracking noise.
        """
        if not url:
            return url
        asin = self.extract_reference(url)
        parsed = urlparse(url)
        if not asin:
            # No ASIN found: at least drop the query string and fragment.
            return f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
        return f"{parsed.scheme}://{parsed.netloc}/dp/{asin}"

    def extract_reference(self, url: str) -> Optional[str]:
        """Extract the ASIN from *url*, or None when absent.

        The ASIN follows ``/dp/`` or ``/gp/product/`` and is exactly 10
        uppercase alphanumeric characters.

        Examples:
            /dp/B08N5WRWNW -> B08N5WRWNW
            /gp/product/B08N5WRWNW -> B08N5WRWNW
        """
        if not url:
            return None
        # The ASIN must be terminated by /, ?, # or end-of-string so that
        # a longer token is not mistaken for one.
        match = re.search(r"/(?:dp|gp/product)/([A-Z0-9]{10})(?:/|\?|#|$)", url)
        if match:
            return match.group(1)
        return None

    def parse(self, html: str, url: str) -> ProductSnapshot:
        """Parse Amazon product HTML into a ProductSnapshot.

        Uses BeautifulSoup plus the selectors from the YAML file. Field
        extraction failures are recorded in the returned snapshot's
        debug info rather than raised.
        """
        soup = BeautifulSoup(html, "lxml")
        debug_info = DebugInfo(
            method=FetchMethod.HTTP,  # Updated by the caller when needed
            status=DebugStatus.SUCCESS,
            errors=[],
            notes=[],
        )
        # Detect captcha / robot-check interstitials up front; parsing
        # still proceeds so partial data (if any) is not discarded.
        if self._detect_captcha(soup):
            debug_info.errors.append("Captcha ou robot check détecté")
            debug_info.status = DebugStatus.FAILED
            logger.warning(f"[Amazon] Captcha détecté pour: {url}")
        # Field extraction (each helper is best-effort and returns None
        # or an empty container on failure).
        title = self._extract_title(soup, debug_info)
        price = self._extract_price(soup, debug_info)
        currency = self._extract_currency(soup, debug_info)
        stock_status, stock_text, in_stock = self._extract_stock_details(soup, debug_info)
        main_image, gallery_images, images = self._extract_images(soup, debug_info)
        category = self._extract_category(soup, debug_info)
        specs = self._extract_specs(soup, debug_info)
        description = self._extract_description(soup, debug_info)
        msrp = self._extract_msrp(soup, debug_info)
        reference = self.extract_reference(url) or self._extract_asin_from_html(soup)
        rating_value = self._extract_rating_value(soup, debug_info)
        rating_count = self._extract_rating_count(soup, debug_info)
        amazon_choice, amazon_choice_label = self._extract_amazon_choice(soup, debug_info)
        discount_text = self._extract_discount_text(soup, debug_info)
        model_number, model_name = self._extract_model_details(specs)
        asin = reference
        # Determine the final status (never downgrade FAILED).
        if debug_info.status != DebugStatus.FAILED:
            if not title or price is None:
                debug_info.status = DebugStatus.PARTIAL
                debug_info.notes.append("Parsing incomplet: titre ou prix manquant")
        snapshot = ProductSnapshot(
            source=self.store_id,
            url=self.canonicalize(url),
            # NOTE(review): naive local timestamp — consider a
            # timezone-aware datetime.now(tz=...); confirm with callers.
            fetched_at=datetime.now(),
            title=title,
            price=price,
            currency=currency or "EUR",
            shipping_cost=None,  # Hard to extract reliably from the page
            stock_status=stock_status,
            stock_text=stock_text,
            in_stock=in_stock,
            reference=reference,
            asin=asin,
            category=category,
            description=description,
            images=images,
            main_image=main_image,
            gallery_images=gallery_images,
            specs=specs,
            msrp=msrp,
            rating_value=rating_value,
            rating_count=rating_count,
            amazon_choice=amazon_choice,
            amazon_choice_label=amazon_choice_label,
            discount_text=discount_text,
            model_number=model_number,
            model_name=model_name,
            debug=debug_info,
        )
        logger.info(
            f"[Amazon] Parsing {'réussi' if snapshot.is_complete() else 'partiel'}: "
            f"title={bool(title)}, price={price is not None}"
        )
        return snapshot

    def _detect_captcha(self, soup: BeautifulSoup) -> bool:
        """Return True when the page looks like a captcha / robot check."""
        for selector in self._selector_list("captcha_indicators"):
            if soup.select(selector):
                return True
        # Only check "sorry" in the <title>: matching it anywhere in the
        # body flagged legitimate pages (reviews/Q&A can contain the
        # word), while Amazon's interstitial page titles contain "Sorry".
        title_tag = soup.find("title")
        if title_tag and "sorry" in title_tag.get_text().lower():
            return True
        text = soup.get_text().lower()
        return "captcha" in text or "robot check" in text

    def _extract_title(self, soup: BeautifulSoup, debug: DebugInfo) -> Optional[str]:
        """Extract the product title, recording an error when absent."""
        for selector in self._selector_list("title"):
            element = soup.select_one(selector)
            if element:
                title = element.get_text(strip=True)
                if title:
                    return title
        debug.errors.append("Titre non trouvé")
        return None

    def _extract_description(self, soup: BeautifulSoup, debug: DebugInfo) -> Optional[str]:
        """Extract the description from the product-detail bullet points.

        Prefers one line per <li> bullet; falls back to the element's
        flattened text when no bullets are present.
        """
        for selector in self._selector_list("description"):
            element = soup.select_one(selector)
            if not element:
                continue
            items = [
                item.get_text(" ", strip=True)
                for item in element.select("li")
                if item.get_text(strip=True)
            ]
            if items:
                return "\n".join(items)
            text = " ".join(element.stripped_strings)
            if text:
                return text
        return None

    def _extract_price(self, soup: BeautifulSoup, debug: DebugInfo) -> Optional[float]:
        """Extract the current price.

        Priority 1: combine the split ``a-price-whole`` / ``a-price-fraction``
        spans — the most common Amazon markup, where euros and cents live
        in separate elements. Priority 2: the configured selectors
        (including ``a-price-whole`` alone holding a full price).
        """
        whole = soup.select_one("span.a-price-whole")
        fraction = soup.select_one("span.a-price-fraction")
        if whole and fraction:
            # The whole part often ends with the decimal separator; strip it
            # before recombining with a dot.
            whole_text = whole.get_text(strip=True).rstrip(",.")
            fraction_text = fraction.get_text(strip=True)
            if whole_text and fraction_text:
                price = parse_price_text(f"{whole_text}.{fraction_text}")
                if price is not None:
                    return price
        for selector in self._selector_list("price"):
            for element in soup.select(selector):
                price = parse_price_text(element.get_text(strip=True))
                if price is not None:
                    return price
        debug.errors.append("Prix non trouvé")
        return None

    def _extract_msrp(self, soup: BeautifulSoup, debug: DebugInfo) -> Optional[float]:
        """Extract the list (strike-through) price, when displayed."""
        strike = soup.select_one("span.priceBlockStrikePriceString") or soup.select_one(
            "span.a-text-price span.a-offscreen"
        )
        if strike:
            price = parse_price_text(strike.get_text(strip=True))
            if price is not None:
                return price
        return None

    def _extract_currency(self, soup: BeautifulSoup, debug: DebugInfo) -> Optional[str]:
        """Extract the currency as an ISO 4217 code (defaults to EUR)."""
        # Map currency symbols to ISO codes. The euro key was previously an
        # empty string (mis-encoded "€"); restored here.
        currency_map = {"€": "EUR", "$": "USD", "£": "GBP"}
        for selector in self._selector_list("currency"):
            element = soup.select_one(selector)
            if element:
                symbol = element.get_text(strip=True)
                return currency_map.get(symbol, "EUR")
        # Default; assumes a .fr domain — confirm for other marketplaces.
        return "EUR"

    def _extract_stock_details(
        self, soup: BeautifulSoup, debug: DebugInfo
    ) -> tuple[StockStatus, Optional[str], Optional[bool]]:
        """Extract the stock status along with the raw availability text.

        Returns:
            (status, raw text, in_stock flag); (UNKNOWN, None, None)
            when no availability element matches a known phrase.
        """
        for selector in self._selector_list("stock_status"):
            element = soup.select_one(selector)
            if not element:
                continue
            text = element.get_text(strip=True)
            normalized = text.lower()
            if (
                "en stock" in normalized
                or "available" in normalized
                or "in stock" in normalized
            ):
                return StockStatus.IN_STOCK, text, True
            if (
                "rupture" in normalized
                or "indisponible" in normalized
                or "out of stock" in normalized
            ):
                return StockStatus.OUT_OF_STOCK, text, False
        return StockStatus.UNKNOWN, None, None

    def _extract_images(
        self, soup: BeautifulSoup, debug: DebugInfo
    ) -> tuple[Optional[str], list[str], list[str]]:
        """Extract the main image and the gallery.

        Returns:
            (main_image, gallery_images, all_images). The first image
            encountered becomes the main image; the gallery is capped at
            15 entries and excludes the main image.
        """
        images: list[str] = []
        seen: set[str] = set()
        main_image: Optional[str] = None
        max_gallery = 15
        for selector in self._selector_list("images"):
            for element in soup.select(selector):
                # Plain URL attributes, in decreasing order of preference.
                url = (
                    element.get("src")
                    or element.get("data-src")
                    or element.get("data-old-hires")
                )
                if url and url.startswith("http"):
                    if self._is_product_image(url) and url not in seen:
                        images.append(url)
                        seen.add(url)
                        if main_image is None:
                            main_image = url
                # JSON map of URL -> dimensions (largest first after sort).
                dynamic = element.get("data-a-dynamic-image")
                if dynamic:
                    for dyn_url in self._extract_dynamic_images(dynamic):
                        if self._is_product_image(dyn_url) and dyn_url not in seen:
                            images.append(dyn_url)
                            seen.add(dyn_url)
                            if main_image is None:
                                main_image = dyn_url
        # Fallback: scan every <img> tag when the selectors found nothing.
        if not images:
            for img in soup.find_all("img"):
                url = img.get("src") or img.get("data-src")
                if url and url.startswith("http") and self._is_product_image(url):
                    if url not in seen:
                        images.append(url)
                        seen.add(url)
                        if main_image is None:
                            main_image = url
        if main_image is None and images:
            main_image = images[0]
        gallery_images = [url for url in images if url != main_image]
        gallery_images = gallery_images[:max_gallery]
        final_images = [main_image] + gallery_images if main_image else gallery_images
        return main_image, gallery_images, final_images

    def _extract_dynamic_images(self, raw: str) -> list[str]:
        """Extract URLs from the ``data-a-dynamic-image`` JSON attribute.

        The attribute maps image URL -> [width, height]; URLs are
        returned largest-first. Invalid JSON yields an empty list.
        """
        try:
            data = json.loads(unescape(raw))
        except (TypeError, json.JSONDecodeError):
            return []
        if not isinstance(data, dict):
            return []
        candidates = []
        for url, dims in data.items():
            if not isinstance(url, str) or not url.startswith("http"):
                continue
            size = dims[0] if isinstance(dims, list) and dims else 0
            candidates.append((size, url))
        candidates.sort(key=lambda item: item[0], reverse=True)
        return [url for _, url in candidates]

    def _is_product_image(self, url: str) -> bool:
        """Basic filter to reject logos and CSS sprites."""
        lowered = url.lower()
        return "prime_logo" not in lowered and "sprite" not in lowered

    def _extract_category(self, soup: BeautifulSoup, debug: DebugInfo) -> Optional[str]:
        """Extract the category from the breadcrumb (last link wins)."""
        for selector in self._selector_list("category"):
            element = soup.select_one(selector)
            if element:
                links = element.select("a")
                if links:
                    # The deepest (most specific) breadcrumb entry.
                    return links[-1].get_text(strip=True)
        return None

    def _extract_specs(self, soup: BeautifulSoup, debug: DebugInfo) -> dict[str, str]:
        """Extract technical specifications.

        Combines the <th>/<td> spec tables from the configured selectors
        with the "detail bullets" list; table entries win on key clashes.
        """
        specs: dict[str, str] = {}
        for selector in self._selector_list("specs_table"):
            table = soup.select_one(selector)
            if not table:
                continue
            for row in table.select("tr"):
                th = row.select_one("th")
                td = row.select_one("td")
                if th and td:
                    key = th.get_text(strip=True)
                    value = td.get_text(strip=True)
                    if key and value:
                        specs[key] = value
        # "Product details" rendered as "key : value" bullet items.
        for item in soup.select("#detailBullets_feature_div li"):
            text = item.get_text(" ", strip=True)
            if ":" not in text:
                continue
            key, value = text.split(":", 1)
            key = key.strip()
            value = value.strip()
            if key and value and key not in specs:
                specs[key] = value
        return specs

    def _extract_rating_value(self, soup: BeautifulSoup, debug: DebugInfo) -> Optional[float]:
        """Extract the average star rating as a float (e.g. 4.5)."""
        for selector in self._selector_list("rating_value"):
            element = soup.select_one(selector)
            if not element:
                continue
            text = element.get_text(" ", strip=True) or element.get("title", "").strip()
            match = re.search(r"([\d.,]+)", text)
            if match:
                # French pages use a comma as the decimal separator.
                value = match.group(1).replace(",", ".")
                try:
                    return float(value)
                except ValueError:
                    continue
        return None

    def _extract_rating_count(self, soup: BeautifulSoup, debug: DebugInfo) -> Optional[int]:
        """Extract the number of ratings as an int."""
        for selector in self._selector_list("rating_count"):
            element = soup.select_one(selector)
            if not element:
                continue
            text = element.get_text(" ", strip=True)
            # Digits possibly grouped with regular/narrow/no-break spaces
            # (French thousands separators).
            match = re.search(r"([\d\s\u202f\u00a0]+)", text)
            if match:
                numeric = re.sub(r"[^\d]", "", match.group(1))
                if numeric:
                    return int(numeric)
        return None

    def _extract_amazon_choice(
        self, soup: BeautifulSoup, debug: DebugInfo
    ) -> tuple[Optional[bool], Optional[str]]:
        """Extract the "Amazon's Choice" badge.

        Returns:
            (True, label) when a badge element matches (label may be
            None when the element carries no text), (None, None) when
            no badge is present.
        """
        for selector in self._selector_list("amazon_choice"):
            element = soup.select_one(selector)
            if element:
                label_candidates = [
                    element.get_text(" ", strip=True),
                    element.get("aria-label", "").strip(),
                    element.get("title", "").strip(),
                    element.get("data-a-badge-label", "").strip(),
                ]
                # First non-empty candidate; the selector match alone is
                # enough to assert the badge, so the flag is always True.
                label = next((item for item in label_candidates if item), "")
                return True, (label or None)
        return None, None

    def _extract_discount_text(self, soup: BeautifulSoup, debug: DebugInfo) -> Optional[str]:
        """Extract the explicit discount text (e.g. "-20 %")."""
        for selector in self._selector_list("discount_text"):
            element = soup.select_one(selector)
            if not element:
                continue
            text = element.get_text(" ", strip=True)
            if text:
                return text
        return None

    def _extract_model_details(self, specs: dict[str, str]) -> tuple[Optional[str], Optional[str]]:
        """Extract the model number and model name from the specs dict.

        Matches the French spec labels in both accented and
        accent-stripped form (spec keys vary across pages).
        """
        model_number = None
        model_name = None
        for key, value in specs.items():
            normalized = key.lower()
            if (
                "numéro du modèle de l'article" in normalized
                or "numero du modele de l'article" in normalized
            ):
                model_number = value
            if "nom du modèle" in normalized or "nom du modele" in normalized:
                model_name = value
        return model_number, model_name

    def _extract_asin_from_html(self, soup: BeautifulSoup) -> Optional[str]:
        """Extract the ASIN from the HTML (fallback when absent from URL)."""
        for selector in self._selector_list("asin"):
            element = soup.select_one(selector)
            if not element:
                continue
            if element.name == "input":
                # Hidden <input> carrying the ASIN in its value attribute.
                value = element.get("value")
                if value:
                    return value
            else:
                # e.g. a <td> inside the product-details table.
                text = element.get_text(strip=True)
                if text:
                    return text
        return None