Files
scrap/pricewatch/app/stores/cdiscount/store.py
Gilles Soulier d0b73b9319 codex2
2026-01-14 21:54:55 +01:00

415 lines
14 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Store Cdiscount - Parsing de produits Cdiscount.com.
Supporte l'extraction de: titre, prix, SKU, images, specs, etc.
"""
import json
import re
from datetime import datetime
from pathlib import Path
from typing import Optional
from urllib.parse import urlparse
from bs4 import BeautifulSoup
from pricewatch.app.core.logging import get_logger
from pricewatch.app.core.schema import (
DebugInfo,
DebugStatus,
FetchMethod,
ProductSnapshot,
StockStatus,
)
from pricewatch.app.stores.base import BaseStore
from pricewatch.app.stores.price_parser import parse_price_text
# Module-level logger scoped to this store ("stores.cdiscount").
logger = get_logger("stores.cdiscount")
class CdiscountStore(BaseStore):
    """Store for Cdiscount.com product pages.

    Parses product HTML into a ``ProductSnapshot``: title, price, SKU,
    images, specs, etc. CSS selectors are loaded from ``selectors.yml``;
    schema.org JSON-LD data is used as a fallback source for description,
    images and specifications.
    """

    def __init__(self):
        """Initialize the Cdiscount store with its selector file."""
        selectors_path = Path(__file__).parent / "selectors.yml"
        super().__init__(store_id="cdiscount", selectors_path=selectors_path)

    def match(self, url: str) -> float:
        """Score how confident we are that *url* belongs to Cdiscount.

        Returns:
            0.9 for cdiscount.com URLs, 0.0 otherwise.
        """
        if not url:
            return 0.0
        if "cdiscount.com" in url.lower():
            return 0.9
        return 0.0

    def canonicalize(self, url: str) -> str:
        """Normalize a Cdiscount URL.

        Cdiscount product URLs generally look like:
        ``https://www.cdiscount.com/category/product-name/f-{ID}-{SKU}.html``

        Query parameters and fragment are stripped; scheme, host and path
        are preserved unchanged.
        """
        if not url:
            return url
        parsed = urlparse(url)
        return f"{parsed.scheme}://{parsed.netloc}{parsed.path}"

    def extract_reference(self, url: str) -> Optional[str]:
        """Extract the SKU from the URL.

        Typical format: ``/f-{ID}-{SKU}.html``
        Example: ``/f-1070123-example.html`` -> ``"1070123-example"``
        """
        if not url:
            return None
        # Strict pattern first: /f-{digits}-{slug}.html
        match = re.search(r"/f-(\d+-[\w-]+)\.html", url)
        if match:
            return match.group(1)
        # Looser fallback: anything after /f-
        match = re.search(r"/f-([\w-]+)", url)
        if match:
            return match.group(1)
        return None

    def parse(self, html: str, url: str) -> ProductSnapshot:
        """Parse Cdiscount HTML into a ``ProductSnapshot``.

        Uses BeautifulSoup with the YAML selectors, plus JSON-LD fallbacks
        for description, images and specs. The snapshot status is downgraded
        to PARTIAL when title or price is missing.
        """
        soup = BeautifulSoup(html, "lxml")
        debug_info = DebugInfo(
            method=FetchMethod.HTTP,  # Updated by the caller if needed
            status=DebugStatus.SUCCESS,
            errors=[],
            notes=[],
        )
        # Field extraction (each helper records errors in debug_info).
        title = self._extract_title(soup, debug_info)
        price = self._extract_price(soup, debug_info)
        currency = self._extract_currency(soup, debug_info)
        stock_status = self._extract_stock(soup, debug_info)
        images = self._extract_images(soup, debug_info)
        category = self._extract_category(soup, debug_info)
        specs = self._extract_specs(soup, debug_info)
        description = self._extract_description(soup, debug_info)
        msrp = self._extract_msrp(soup, debug_info)
        reference = self.extract_reference(url) or self._extract_sku_from_html(soup)
        # Downgrade status when the two essential fields are missing.
        if not title or price is None:
            debug_info.status = DebugStatus.PARTIAL
            debug_info.notes.append("Parsing incomplet: titre ou prix manquant")
        snapshot = ProductSnapshot(
            source=self.store_id,
            url=self.canonicalize(url),
            # NOTE(review): naive local timestamp — confirm whether the schema
            # expects timezone-aware datetimes.
            fetched_at=datetime.now(),
            title=title,
            price=price,
            currency=currency or "EUR",
            shipping_cost=None,
            stock_status=stock_status,
            reference=reference,
            category=category,
            description=description,
            images=images,
            specs=specs,
            msrp=msrp,
            debug=debug_info,
        )
        logger.info(
            f"[Cdiscount] Parsing {'réussi' if snapshot.is_complete() else 'partiel'}: "
            f"title={bool(title)}, price={price is not None}"
        )
        return snapshot

    def _selector_list(self, key: str) -> list[str]:
        """Return the selectors configured for *key* as a list.

        The YAML file may declare a single selector as a plain string;
        normalize it to a one-element list so callers can always iterate.
        """
        selectors = self.get_selector(key, [])
        if isinstance(selectors, str):
            return [selectors]
        return selectors

    def _extract_title(self, soup: BeautifulSoup, debug: DebugInfo) -> Optional[str]:
        """Extract the product title; records an error when not found."""
        for selector in self._selector_list("title"):
            element = soup.select_one(selector)
            if element:
                title = element.get_text(strip=True)
                if title:
                    return title
        debug.errors.append("Titre non trouvé")
        return None

    def _extract_description(self, soup: BeautifulSoup, debug: DebugInfo) -> Optional[str]:
        """Extract the description from meta tags, then JSON-LD as fallback."""
        meta = soup.find("meta", property="og:description") or soup.find(
            "meta", attrs={"name": "description"}
        )
        if meta:
            description = meta.get("content", "").strip()
            if description:
                return description
        # JSON-LD fallback: Product.description
        product_ld = self._find_product_ld(soup)
        desc_ld = product_ld.get("description") if product_ld else None
        if isinstance(desc_ld, str) and desc_ld.strip():
            return desc_ld.strip()
        return None

    def _extract_price(self, soup: BeautifulSoup, debug: DebugInfo) -> Optional[float]:
        """Extract the current price; records an error when not found."""
        for selector in self._selector_list("price"):
            for element in soup.select(selector):
                # schema.org "content" attribute first, else visible text.
                price_text = element.get("content") or element.get_text(strip=True)
                price = parse_price_text(price_text)
                if price is not None:
                    return price
        debug.errors.append("Prix non trouvé")
        return None

    def _extract_msrp(self, soup: BeautifulSoup, debug: DebugInfo) -> Optional[float]:
        """Extract the strike-through (list/MSRP) price, if displayed."""
        # NOTE(review): these selectors are hard-coded here rather than in
        # selectors.yml like the other fields — consider moving them there.
        selectors = [
            ".jsStrikePrice",
            ".price__old",
            ".c-price__strike",
            ".price-strike",
        ]
        for selector in selectors:
            element = soup.select_one(selector)
            if element:
                price = parse_price_text(element.get_text(strip=True))
                if price is not None:
                    return price
        return None

    def _extract_currency(self, soup: BeautifulSoup, debug: DebugInfo) -> Optional[str]:
        """Extract the currency code; Cdiscount defaults to EUR."""
        for selector in self._selector_list("currency"):
            element = soup.select_one(selector)
            if element:
                # schema.org "content" attribute; strip to reject
                # whitespace-only values.
                currency = (element.get("content") or "").strip()
                if currency:
                    return currency.upper()
        # Default for Cdiscount (French marketplace).
        return "EUR"

    def _extract_stock(self, soup: BeautifulSoup, debug: DebugInfo) -> StockStatus:
        """Extract stock status from schema.org href values or visible text."""
        for selector in self._selector_list("stock_status"):
            element = soup.select_one(selector)
            if element:
                # schema.org availability is often an href such as
                # "https://schema.org/InStock"; also scan the visible text.
                href = element.get("href", "").lower()
                text = element.get_text(strip=True).lower()
                combined = f"{href} {text}"
                if "instock" in combined or "en stock" in combined:
                    return StockStatus.IN_STOCK
                if (
                    "outofstock" in combined
                    or "rupture" in combined
                    or "indisponible" in combined
                ):
                    return StockStatus.OUT_OF_STOCK
        return StockStatus.UNKNOWN

    def _extract_images(self, soup: BeautifulSoup, debug: DebugInfo) -> list[str]:
        """Extract image URLs from the DOM, then merge JSON-LD images."""
        images: list[str] = []
        for selector in self._selector_list("images"):
            for element in soup.select(selector):
                # src, lazy-loading data-src, or schema.org content attribute.
                src = (
                    element.get("src")
                    or element.get("data-src")
                    or element.get("content")
                )
                # Accept only absolute or protocol-relative URLs (the old
                # `"http" in url` test also matched junk paths that merely
                # contained the substring "http").
                if src and src.startswith(("http://", "https://", "//")):
                    if src.startswith("//"):
                        src = f"https:{src}"
                    images.append(src)
        for src in self._extract_ld_images(self._find_product_ld(soup)):
            if not src:
                continue
            if src.startswith("//"):
                src = f"https:{src}"
            if src not in images:
                images.append(src)
        return list(dict.fromkeys(images))  # de-duplicate, preserve order

    def _extract_category(self, soup: BeautifulSoup, debug: DebugInfo) -> Optional[str]:
        """Extract the category from the breadcrumb (deepest element)."""
        for selector in self._selector_list("category"):
            element = soup.select_one(selector)
            if element:
                # Take the last breadcrumb link (deepest category).
                links = element.select("a")
                if links:
                    return links[-1].get_text(strip=True)
                # Fallback: split raw text on ">" and take the last part.
                text = element.get_text(strip=True)
                if text:
                    parts = [p.strip() for p in text.split(">")]
                    if parts:
                        return parts[-1]
        return None

    def _extract_json_ld_entries(self, soup: BeautifulSoup) -> list[dict]:
        """Parse all JSON-LD scripts and return the contained objects.

        Top-level lists are flattened, and ``@graph`` containers are
        unwrapped so that Product objects nested in a graph can be found.
        Malformed JSON blocks are skipped silently.
        """
        entries = []
        for script in soup.find_all("script", type="application/ld+json"):
            raw = script.string or script.text
            if not raw:
                continue
            try:
                payload = json.loads(raw.strip())
            except (json.JSONDecodeError, TypeError):
                continue  # ignore malformed JSON-LD
            if isinstance(payload, list):
                entries.extend(payload)
                continue
            entries.append(payload)
            # JSON-LD frequently wraps objects in a top-level @graph array.
            graph = payload.get("@graph") if isinstance(payload, dict) else None
            if isinstance(graph, list):
                entries.extend(graph)
        return entries

    def _find_product_ld(self, soup: BeautifulSoup) -> dict:
        """Return the JSON-LD Product object if present, else an empty dict."""
        for entry in self._extract_json_ld_entries(soup):
            if not isinstance(entry, dict):
                continue
            type_field = entry.get("@type") or entry.get("type")
            # JSON-LD allows @type to be a list, e.g. ["Product", "Thing"].
            if isinstance(type_field, list):
                type_field = " ".join(str(t) for t in type_field)
            if isinstance(type_field, str) and "product" in type_field.lower():
                return entry
        return {}

    def _extract_ld_images(self, product_ld: dict) -> list[str]:
        """Collect the image URLs listed in a JSON-LD Product object."""
        if not product_ld:
            return []
        images = product_ld.get("image") or product_ld.get("images")
        if not images:
            return []
        if isinstance(images, str):
            images = [images]
        extracted = []
        for item in images:
            if isinstance(item, str):
                extracted.append(item)
            elif isinstance(item, dict):
                # ImageObject form: {"@type": "ImageObject", "url": "..."}
                url = item.get("url")
                if isinstance(url, str):
                    extracted.append(url)
        return extracted

    def _extract_specs(self, soup: BeautifulSoup, debug: DebugInfo) -> dict[str, str]:
        """Extract technical specifications as a key -> value mapping."""
        specs: dict[str, str] = {}
        for selector in self._selector_list("specs_table"):
            container = soup.select_one(selector)
            if not container:
                continue
            # Rows are usually divs or li elements; scan the flattened text
            # for "Key: Value" (or "Key : Value") pairs.
            for line in container.get_text(separator="\n").split("\n"):
                if ":" not in line:
                    continue
                key, _, value = line.partition(":")
                key = key.strip()
                value = value.strip()
                if key and value:
                    specs[key] = value
        # Merge JSON-LD additionalProperty entries (PropertyValue objects).
        product_ld = self._find_product_ld(soup)
        additional = product_ld.get("additionalProperty") if product_ld else None
        if isinstance(additional, dict):
            additional = [additional]
        if isinstance(additional, list):
            for item in additional:
                if not isinstance(item, dict):
                    continue
                key = item.get("name") or item.get("propertyID")
                value = item.get("value") or item.get("valueReference")
                if key and value:
                    # Coerce to str: JSON-LD values may be numbers/booleans,
                    # while this method's contract is dict[str, str].
                    specs[str(key)] = str(value)
        return specs

    def _extract_sku_from_html(self, soup: BeautifulSoup) -> Optional[str]:
        """Extract the SKU from the HTML (fallback when absent from the URL)."""
        for selector in self._selector_list("sku"):
            element = soup.select_one(selector)
            if element:
                # schema.org "content" attribute first, else visible text.
                sku = element.get("content") or element.get_text(strip=True)
                if sku:
                    return sku
        return None