1er
BIN backend/app/scraper/__pycache__/normalize.cpython-313.pyc Normal file
Binary file not shown.
BIN backend/app/scraper/amazon/__pycache__/parser.cpython-313.pyc Normal file
Binary file not shown.
438 backend/app/scraper/amazon/parser.py Normal file
@@ -0,0 +1,438 @@
from __future__ import annotations

import re
from typing import Any

from bs4 import BeautifulSoup
from loguru import logger
from playwright.sync_api import Page

from backend.app.scraper.normalize import (
    parse_price_fr,
    parse_rating_count,
    parse_rating_value,
    parse_stock_status,
)

def detect_blocked(html: str) -> bool:
    # simple detection of blocking / captcha pages
    lowered = html.lower()
    if "captcha" in lowered or "robot" in lowered:
        return True
    if "saisissez les caractères" in lowered or "vérification" in lowered:
        return True
    return False

def _safe_text(page: Page, selector: str) -> str | None:
    try:
        locator = page.locator(selector)
        if locator.count() == 0:
            return None
        value = locator.first.inner_text().strip()
        return value or None
    except Exception:
        return None


def _safe_attr(page: Page, selector: str, attr: str) -> str | None:
    try:
        locator = page.locator(selector)
        if locator.count() == 0:
            return None
        return locator.first.get_attribute(attr)
    except Exception:
        return None


def _extract_asin_from_url(url: str) -> str | None:
    match = re.search(r"/dp/([A-Z0-9]{10})", url)
    if match:
        return match.group(1)
    return None


def _safe_text_soup(soup: BeautifulSoup, selector: str) -> str | None:
    node = soup.select_one(selector)
    if not node:
        return None
    value = node.get_text(strip=True)
    return value or None


def _safe_attr_soup(soup: BeautifulSoup, selector: str, attr: str) -> str | None:
    node = soup.select_one(selector)
    if not node:
        return None
    return node.get(attr)


def _has_selector_soup(soup: BeautifulSoup, selector: str) -> bool:
    return soup.select_one(selector) is not None


def _compose_price_from_parts(whole: str | None, fraction: str | None, symbol: str | None) -> str | None:
    if not whole:
        return None
    whole_digits = re.sub(r"[^\d]", "", whole)
    if not whole_digits:
        return None
    fraction_digits = re.sub(r"[^\d]", "", fraction or "")
    if not fraction_digits:
        fraction_digits = "00"
    fraction_digits = fraction_digits[:2].ljust(2, "0")
    symbol = (symbol or "€").strip()
    return f"{whole_digits},{fraction_digits} {symbol}"


def _extract_lowest_30d_text_soup(soup: BeautifulSoup) -> str | None:
    containers = []
    container = soup.select_one("#priceBadging_feature_div")
    if container:
        containers.append(container)
    containers.extend(soup.select(".basisPrice"))
    for node in containers:
        text = node.get_text(" ", strip=True)
        if text and re.search(r"prix.+(30|trente).+jour", text.lower()):
            price_node = node.select_one(".a-offscreen")
            if price_node:
                price_text = price_node.get_text(" ", strip=True)
                if price_text:
                    return price_text
            return text
    return None


def _extract_about_bullets(soup: BeautifulSoup) -> list[str] | None:
    container = soup.select_one("#feature-bullets")
    if not container:
        return None
    items = []
    for node in container.select("ul li span.a-list-item"):
        text = node.get_text(" ", strip=True)
        if text:
            items.append(text)
    return items or None


def _extract_description(soup: BeautifulSoup) -> str | None:
    node = soup.select_one("#productDescription")
    if not node:
        return None
    text = node.get_text(" ", strip=True)
    return text or None


def _extract_table_kv(table) -> dict[str, str]:
    data: dict[str, str] = {}
    for row in table.select("tr"):
        key = row.select_one("th")
        value = row.select_one("td")
        if not key or not value:
            continue
        key_text = key.get_text(" ", strip=True)
        value_text = value.get_text(" ", strip=True)
        if key_text and value_text:
            data[key_text] = value_text
    return data


def _extract_tables_from_selector(soup: BeautifulSoup, selector: str) -> list:
    section = soup.select_one(selector)
    if not section:
        return []
    if section.name == "table":
        return [section]
    return section.select("table")


def _extract_carateristique(soup: BeautifulSoup) -> dict[str, str] | None:
    selectors = [
        "[data-csa-c-content-id='voyager-expander-btn']",
        "#productDetails_techSpec_section_1",
        "#productDetails_techSpec_section_2",
    ]
    specs: dict[str, str] = {}
    for selector in selectors:
        tables = _extract_tables_from_selector(soup, selector)
        for table in tables:
            specs.update(_extract_table_kv(table))
    return specs or None


def _extract_details(soup: BeautifulSoup) -> dict[str, str] | None:
    container = soup.select_one("[data-csa-c-content-id='voyager-expander-btn']")
    carateristique_tables = set(container.select("table")) if container else set()
    selectors = [
        "#productDetails_techSpec_section_1",
        "#productDetails_detailBullets_sections1",
        "#productDetails_detailBullets_sections2",
        "#productDetails",
    ]
    details: dict[str, str] = {}
    seen_tables = set()
    for selector in selectors:
        for table in _extract_tables_from_selector(soup, selector):
            if table in carateristique_tables or table in seen_tables:
                continue
            seen_tables.add(table)
            details.update(_extract_table_kv(table))
    return details or None


def _parse_percent(text: str | None) -> int | None:
    if not text:
        return None
    match = re.search(r"(-?\d+)", text.replace("\u00a0", " "))
    if not match:
        return None
    try:
        return int(match.group(1))
    except ValueError:
        return None

def extract_product_data_from_html(html: str, url: str) -> dict[str, Any]:
    soup = BeautifulSoup(html, "html.parser")
    title = _safe_text_soup(soup, "#productTitle")

    image_main_url = _safe_attr_soup(soup, "#landingImage", "src")
    if not image_main_url:
        image_main_url = _safe_attr_soup(soup, "#imgTagWrapperId img", "src")

    price_text = _safe_text_soup(soup, "#corePriceDisplay_desktop_feature_div .a-offscreen")
    if not price_text:
        price_text = _safe_text_soup(soup, "#priceblock_ourprice")
    if not price_text:
        price_text = _safe_text_soup(soup, "#priceblock_dealprice")
    if not price_text:
        whole = _safe_text_soup(soup, ".a-price .a-price-whole")
        fraction = _safe_text_soup(soup, ".a-price .a-price-fraction")
        symbol = _safe_text_soup(soup, ".a-price .a-price-symbol")
        price_text = _compose_price_from_parts(whole, fraction, symbol)
    if not price_text:
        price_text = _safe_attr_soup(soup, "#twister-plus-price-data-price", "value")

    price_list_text = _safe_text_soup(
        soup, "#corePriceDisplay_desktop_feature_div .a-text-price span.a-offscreen"
    )
    if not price_list_text:
        price_list_text = _safe_text_soup(soup, "#priceblock_strikeprice")
    if not price_list_text:
        price_list_text = _safe_text_soup(soup, ".srpPriceBlock .a-offscreen")
    if not price_list_text:
        price_list_text = _safe_text_soup(soup, ".srpPriceBlockAUI .a-offscreen")

    stock_text = _safe_text_soup(soup, "#availability span")
    if not stock_text:
        stock_text = _safe_text_soup(soup, "#availability")

    in_stock, stock_text = parse_stock_status(stock_text)

    rating_text = _safe_text_soup(soup, "#acrPopover .a-icon-alt")
    rating_count_text = _safe_text_soup(soup, "#acrCustomerReviewText")

    amazon_choice = _safe_text_soup(soup, "#acBadge_feature_div")
    limited_time_deal = _safe_text_soup(soup, "#dealBadge_feature_div")
    prime_selectors = (
        "#primeBadge",
        "#priceBadging_feature_div #prime-badge",
        "#priceBadging_feature_div i.a-icon-prime",
        "#corePriceDisplay_desktop_feature_div i.a-icon-prime",
        "i#prime-badge",
        "i.a-icon-prime[aria-label*='prime']",
    )
    prime_eligible = None
    if any(_has_selector_soup(soup, selector) for selector in prime_selectors):
        prime_eligible = True
    amazon_exclusive = "Exclusivité Amazon" if "Exclusivité Amazon" in soup.get_text() else None

    lowest_30d_text = _extract_lowest_30d_text_soup(soup)
    lowest_30d_price = None
    if lowest_30d_text:
        lowest_30d_price = parse_price_fr(lowest_30d_text)
    if lowest_30d_price is not None:
        candidate_list = parse_price_fr(price_list_text)
        if candidate_list == lowest_30d_price:
            price_list_text = None
            if not price_list_text:
                price_list_text = _safe_text_soup(soup, ".srpPriceBlock .a-offscreen")
            if not price_list_text:
                price_list_text = _safe_text_soup(soup, ".srpPriceBlockAUI .a-offscreen")

    reduction_savings_text = _safe_text_soup(
        soup, "#corePriceDisplay_desktop_feature_div .savingsPercentage"
    )
    reduction_conseille_text = _safe_text_soup(soup, ".srpSavingsPercentageBlock")
    reduction_min_30j = _parse_percent(reduction_savings_text)
    reduction_conseille = _parse_percent(reduction_conseille_text)

    a_propos = _extract_about_bullets(soup)
    description = _extract_description(soup)
    carateristique = _extract_carateristique(soup)
    details = _extract_details(soup)

    asin = _safe_attr_soup(soup, "input#ASIN", "value") or _extract_asin_from_url(url)

    data = {
        "url": url,
        "asin": asin,
        "titre": title,
        "url_image_principale": image_main_url,
        "prix_actuel": parse_price_fr(price_text),
        "prix_conseille": parse_price_fr(price_list_text),
        "prix_min_30j": lowest_30d_price,
        "prix_conseille_reduction": reduction_conseille,
        "prix_min_30j_reduction": reduction_min_30j,
        "etat_stock": stock_text,
        "en_stock": in_stock,
        "note": parse_rating_value(rating_text),
        "nombre_avis": parse_rating_count(rating_count_text),
        "choix_amazon": bool(amazon_choice) if amazon_choice is not None else None,
        "offre_limitee": bool(limited_time_deal) if limited_time_deal is not None else None,
        "prime": True if prime_eligible else None,
        "exclusivite_amazon": bool(amazon_exclusive) if amazon_exclusive is not None else None,
        "a_propos": a_propos,
        "description": description,
        "carateristique": carateristique,
        "details": details,
    }

    missing = [key for key in ("titre", "prix_actuel", "note") if not data.get(key)]
    if missing:
        logger.warning("Champs manquants (html): {}", ", ".join(missing))

    return data

def extract_product_data(page: Page, url: str) -> dict[str, Any]:
    # title field
    title = _safe_text(page, "#productTitle")

    # main image
    image_main_url = _safe_attr(page, "#landingImage", "src")
    if not image_main_url:
        image_main_url = _safe_attr(page, "#imgTagWrapperId img", "src")

    # current price
    price_text = _safe_text(page, "#corePriceDisplay_desktop_feature_div .a-offscreen")
    if not price_text:
        price_text = _safe_text(page, "#priceblock_ourprice")
    if not price_text:
        price_text = _safe_text(page, "#priceblock_dealprice")
    if not price_text:
        whole = _safe_text(page, ".a-price .a-price-whole")
        fraction = _safe_text(page, ".a-price .a-price-fraction")
        symbol = _safe_text(page, ".a-price .a-price-symbol")
        price_text = _compose_price_from_parts(whole, fraction, symbol)
    if not price_text:
        price_text = _safe_attr(page, "#twister-plus-price-data-price", "value")

    # strikethrough / list price
    price_list_text = _safe_text(page, "#corePriceDisplay_desktop_feature_div .a-text-price span.a-offscreen")
    if not price_list_text:
        price_list_text = _safe_text(page, "#priceblock_strikeprice")
    if not price_list_text:
        price_list_text = _safe_text(page, ".srpPriceBlock .a-offscreen")
    if not price_list_text:
        price_list_text = _safe_text(page, ".srpPriceBlockAUI .a-offscreen")

    # stock
    stock_text = _safe_text(page, "#availability span")
    if not stock_text:
        stock_text = _safe_text(page, "#availability")

    in_stock, stock_text = parse_stock_status(stock_text)

    # rating
    rating_text = _safe_text(page, "#acrPopover .a-icon-alt")
    rating_count_text = _safe_text(page, "#acrCustomerReviewText")

    # badges
    amazon_choice = _safe_text(page, "#acBadge_feature_div")
    limited_time_deal = _safe_text(page, "#dealBadge_feature_div")
    prime_selectors = (
        "#primeBadge",
        "#priceBadging_feature_div #prime-badge",
        "#priceBadging_feature_div i.a-icon-prime",
        "#corePriceDisplay_desktop_feature_div i.a-icon-prime",
        "i#prime-badge",
        "i.a-icon-prime[aria-label*='prime']",
    )
    prime_eligible = None
    if any(page.locator(selector).count() > 0 for selector in prime_selectors):
        prime_eligible = True

    amazon_exclusive = _safe_text(page, "text=Exclusivité Amazon")

    # lowest price over the last 30 days
    lowest_30d_text = None
    if page.locator(".basisPrice").count() > 0:
        basis_text = page.locator(".basisPrice").first.inner_text()
        if basis_text and re.search(r"prix.+(30|trente).+jour", basis_text.lower()):
            lowest_30d_text = _safe_text(page, ".basisPrice .a-offscreen") or basis_text
    if not lowest_30d_text and page.locator("#priceBadging_feature_div").count() > 0:
        badging_text = page.locator("#priceBadging_feature_div").first.inner_text()
        if badging_text and re.search(r"prix.+(30|trente).+jour", badging_text.lower()):
            lowest_30d_text = _safe_text(page, "#priceBadging_feature_div .a-offscreen") or badging_text
    if lowest_30d_text and not re.search(r"prix.+(30|trente).+jour", lowest_30d_text.lower()):
        lowest_30d_text = None
    lowest_30d_price = None
    if lowest_30d_text and "prix" in lowest_30d_text.lower():
        lowest_30d_price = parse_price_fr(lowest_30d_text)
    if lowest_30d_price is not None:
        candidate_list = parse_price_fr(price_list_text)
        if candidate_list == lowest_30d_price:
            price_list_text = None
            if not price_list_text:
                price_list_text = _safe_text(page, ".srpPriceBlock .a-offscreen")
            if not price_list_text:
                price_list_text = _safe_text(page, ".srpPriceBlockAUI .a-offscreen")

    reduction_savings_text = _safe_text(page, "#corePriceDisplay_desktop_feature_div .savingsPercentage")
    reduction_conseille_text = _safe_text(page, ".srpSavingsPercentageBlock")
    reduction_min_30j = _parse_percent(reduction_savings_text)
    reduction_conseille = _parse_percent(reduction_conseille_text)

    asin = _safe_attr(page, "input#ASIN", "value") or _extract_asin_from_url(url)

    soup = BeautifulSoup(page.content(), "html.parser")
    a_propos = _extract_about_bullets(soup)
    description = _extract_description(soup)
    carateristique = _extract_carateristique(soup)
    details = _extract_details(soup)

    data = {
        "url": url,
        "asin": asin,
        "titre": title,
        "url_image_principale": image_main_url,
        "prix_actuel": parse_price_fr(price_text),
        "prix_conseille": parse_price_fr(price_list_text),
        "prix_min_30j": lowest_30d_price,
        "prix_conseille_reduction": reduction_conseille,
        "prix_min_30j_reduction": reduction_min_30j,
        "etat_stock": stock_text,
        "en_stock": in_stock,
        "note": parse_rating_value(rating_text),
        "nombre_avis": parse_rating_count(rating_count_text),
        "choix_amazon": bool(amazon_choice) if amazon_choice is not None else None,
        "offre_limitee": bool(limited_time_deal) if limited_time_deal is not None else None,
        "prime": True if prime_eligible else None,
        "exclusivite_amazon": bool(amazon_exclusive) if amazon_exclusive is not None else None,
        "a_propos": a_propos,
        "description": description,
        "carateristique": carateristique,
        "details": details,
    }

    return data
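A minimal offline usage sketch for the HTML parser above (not part of the commit): it assumes a product page saved earlier by the debug tooling, and the file path and ASIN below are placeholders.

from pathlib import Path

from backend.app.scraper.amazon.parser import detect_blocked, extract_product_data_from_html

# hypothetical artifact captured by the test harness / debug helpers
html = Path("backend/app/samples/debug/test_capture.html").read_text(encoding="utf-8")
url = "https://www.amazon.fr/dp/B000000000"  # placeholder ASIN

if detect_blocked(html):
    print("page looks like a captcha / robot check")
else:
    data = extract_product_data_from_html(html, url)
    print(data["titre"], data["prix_actuel"], data["note"])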
23 backend/app/scraper/amazon/selectors.md Normal file
@@ -0,0 +1,23 @@
# Amazon selectors (FR)

## Stable identifiers
- `#productTitle`
- `#acrCustomerReviewText`
- `#availability`

## Prices
- `#corePriceDisplay_desktop_feature_div .a-offscreen`
- `#priceblock_ourprice`
- `#priceblock_dealprice`
- `#corePriceDisplay_desktop_feature_div .a-text-price span.a-offscreen`
- `#priceblock_strikeprice`

## Images
- `#landingImage`
- `#imgTagWrapperId img`

## Badges
- `#acBadge_feature_div`
- `#dealBadge_feature_div`
- `#primeBadge`
- `text=Exclusivité Amazon`
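To sanity-check the stable selectors listed above against a saved page, a small sketch (the `text=Exclusivité Amazon` entry is a Playwright text selector, so it is skipped here):

from bs4 import BeautifulSoup

def probe_selectors(html: str) -> dict[str, bool]:
    # report which of the documented CSS selectors are present in a saved page
    soup = BeautifulSoup(html, "html.parser")
    selectors = ["#productTitle", "#acrCustomerReviewText", "#availability", "#landingImage"]
    return {selector: soup.select_one(selector) is not None for selector in selectors}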
23 backend/app/scraper/browser.py Normal file
@@ -0,0 +1,23 @@
from __future__ import annotations

from pathlib import Path

from playwright.async_api import async_playwright, Browser, BrowserContext


async def build_browser_context(headless: bool, viewport: dict[str, int], locale: str, timezone: str) -> BrowserContext:
    playwright = await async_playwright().start()
    browser: Browser = await playwright.chromium.launch(headless=headless)
    context = await browser.new_context(
        viewport=viewport,
        locale=locale,
        timezone_id=timezone,
    )
    context.set_default_timeout(30000)
    # keep a handle on the Playwright driver so close_context can stop it later
    context._playwright = playwright
    return context


async def close_context(context: BrowserContext) -> None:
    await context.close()
    await context.browser.close()
    await context._playwright.stop()
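A minimal sketch of how these async helpers could be driven end to end; the viewport, locale, and timezone values are illustrative:

import asyncio

from backend.app.scraper.browser import build_browser_context, close_context

async def main() -> None:
    context = await build_browser_context(
        headless=True,
        viewport={"width": 1280, "height": 800},
        locale="fr-FR",
        timezone="Europe/Paris",
    )
    page = await context.new_page()
    await page.goto("https://www.amazon.fr")
    print(await page.title())
    await close_context(context)

asyncio.run(main())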
61 backend/app/scraper/normalize.py Normal file
@@ -0,0 +1,61 @@
from __future__ import annotations

import re
from typing import Optional


def parse_price_fr(text: str | None) -> Optional[float]:
    if not text:
        return None
    # Example: "1 249,99 €" -> 1249.99 (handles non-breaking spaces)
    match = re.search(r"([0-9][0-9\s\.\u00a0\u202f]*(?:[,.][0-9]{2})?)", text)
    if not match:
        return None
    cleaned = match.group(1).replace(" ", "").replace("\u00a0", "").replace("\u202f", "")
    if "," in cleaned:
        cleaned = cleaned.replace(".", "").replace(",", ".")
    elif cleaned.count(".") == 1 and len(cleaned.split(".")[-1]) == 2:
        # keep the dot as the decimal separator
        pass
    else:
        cleaned = cleaned.replace(".", "")
    try:
        return float(cleaned)
    except ValueError:
        return None


def parse_rating_value(text: str | None) -> Optional[float]:
    if not text:
        return None
    match = re.search(r"([0-9]+(?:[\.,][0-9]+)?)", text)
    if not match:
        return None
    try:
        return float(match.group(1).replace(",", "."))
    except ValueError:
        return None


def parse_rating_count(text: str | None) -> Optional[int]:
    if not text:
        return None
    digits = re.sub(r"[^0-9]", "", text)
    if not digits:
        return None
    try:
        return int(digits)
    except ValueError:
        return None


def parse_stock_status(text: str | None) -> tuple[Optional[bool], Optional[str]]:
    if not text:
        return None, None
    cleaned = " ".join(text.split())
    lowered = cleaned.lower()
    # check unavailability first: "indisponible" also contains the substring "disponible"
    if "indisponible" in lowered or "rupture" in lowered:
        return False, cleaned
    if "en stock" in lowered or "disponible" in lowered:
        return True, cleaned
    return None, cleaned
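A few illustrative expectations for these normalizers (a sketch, not part of the commit; the input strings are made-up examples in the same format as the comment above):

from backend.app.scraper.normalize import (
    parse_price_fr,
    parse_rating_count,
    parse_rating_value,
    parse_stock_status,
)

assert parse_price_fr("1 249,99 €") == 1249.99
assert parse_rating_value("4,5 sur 5 étoiles") == 4.5
assert parse_rating_count("1 234 évaluations") == 1234
assert parse_stock_status("Actuellement indisponible.") == (False, "Actuellement indisponible.")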
270 backend/app/scraper/run_scrape_tests.py Normal file
@@ -0,0 +1,270 @@
from __future__ import annotations

import json
import os
import random
import re
import sys
import time
from pathlib import Path
from urllib.parse import urlparse

PROJECT_ROOT = Path(__file__).resolve().parents[3]
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))

from loguru import logger
from playwright.sync_api import sync_playwright

from backend.app.core.config import load_config
from backend.app.scraper.amazon.parser import extract_product_data

SAMPLES_DIR = Path(__file__).resolve().parent.parent / "samples"
TESTS_PATH = SAMPLES_DIR / "scrape_test.json"
RESULTS_PATH = SAMPLES_DIR / "scrap_result.json"
FIELDS_PATH = SAMPLES_DIR / "scrape_fields.json"
STORAGE_STATE_PATH = SAMPLES_DIR / "storage_state.json"
DEBUG_DIR = SAMPLES_DIR / "debug"

DEFAULT_REQUIRED_FIELDS = ("titre", "prix_actuel")
DEFAULT_OPTIONAL_FIELDS = (
    "prix_conseille",
    "prix_min_30j",
    "etat_stock",
    "en_stock",
    "note",
    "nombre_avis",
    "prime",
    "choix_amazon",
    "offre_limitee",
    "exclusivite_amazon",
)

def load_fields_config() -> tuple[tuple[str, ...], tuple[str, ...]]:
    if not FIELDS_PATH.exists():
        return DEFAULT_REQUIRED_FIELDS, DEFAULT_OPTIONAL_FIELDS
    payload = json.loads(FIELDS_PATH.read_text(encoding="utf-8"))
    required = tuple(payload.get("required", DEFAULT_REQUIRED_FIELDS))
    optional = tuple(payload.get("optional", DEFAULT_OPTIONAL_FIELDS))
    return required, optional


def canonicalize_url(url: str) -> str:
    if not url:
        return url
    parsed = urlparse(url)
    return f"{parsed.scheme}://{parsed.netloc}{parsed.path}"


def extract_reference(url: str) -> str | None:
    if not url:
        return None
    match = re.search(r"/dp/([A-Z0-9]{10})", url)
    if match:
        return match.group(1)
    return None


def build_debug(statut: str, erreurs: list[str] | None = None, notes: list[str] | None = None) -> dict:
    return {
        "statut": statut,
        "erreurs": erreurs or [],
        "notes": notes or [],
    }


def build_result(
    test_id: str,
    url: str,
    statut: str,
    data: dict | None = None,
    debug: dict | None = None,
) -> dict:
    return {
        "id": test_id,
        "url": url,
        "url_canonique": canonicalize_url(url),
        "reference": extract_reference(url),
        "statut": statut,
        "donnees": data,
        "debug": debug,
    }


def save_debug_artifacts(page, test_id: str, suffix: str) -> dict:
    debug_files = {}
    try:
        screenshot_path = DEBUG_DIR / f"{test_id}_{suffix}.png"
        html_path = DEBUG_DIR / f"{test_id}_{suffix}.html"
        page.screenshot(path=str(screenshot_path), full_page=True)
        html_path.write_text(page.content(), encoding="utf-8")
        debug_files = {
            "screenshot": str(screenshot_path),
            "html": str(html_path),
        }
        logger.info("Artifacts debug: {}", debug_files)
    except Exception:
        logger.warning("Impossible de générer les artifacts de debug.")
    return debug_files


def evaluate_data(
    data: dict,
    required_fields: tuple[str, ...],
    optional_fields: tuple[str, ...],
) -> tuple[str, dict]:
    missing_required = [field for field in required_fields if not data.get(field)]
    missing_optional = [field for field in optional_fields if data.get(field) is None]

    if missing_required:
        notes = []
        if missing_optional:
            notes.append(f"Optionnels manquants: {', '.join(missing_optional)}")
        return "partiel", build_debug(
            "partiel",
            erreurs=[f"Obligatoires manquants: {', '.join(missing_required)}"],
            notes=notes,
        )

    if missing_optional:
        return "ok", build_debug(
            "succes",
            notes=[f"Optionnels manquants: {', '.join(missing_optional)}"],
        )

    return "ok", build_debug("succes")

def main() -> None:
    logger.remove()
    logger.add(sys.stdout, level="INFO")
    DEBUG_DIR.mkdir(parents=True, exist_ok=True)
    payload = json.loads(TESTS_PATH.read_text(encoding="utf-8"))
    tests = payload.get("tests", [])
    if not tests:
        logger.warning("Aucun test trouvé dans {}", TESTS_PATH)
        return

    config = load_config()
    required_fields, optional_fields = load_fields_config()
    min_delay = int(os.getenv("SCRAPE_TEST_MIN_DELAY", "1"))
    max_delay = int(os.getenv("SCRAPE_TEST_MAX_DELAY", "5"))
    max_tests = int(os.getenv("SCRAPE_TEST_MAX", "0"))
    headful_on_block = os.getenv("SCRAPE_TEST_HEADFUL_ON_BLOCK", "0") == "1"
    wait_on_block = int(os.getenv("SCRAPE_TEST_WAIT_ON_BLOCK", "60"))
    results = []

    with sync_playwright() as playwright:
        browser = playwright.chromium.launch(headless=config.scrape.headless)
        context_kwargs = {
            "locale": config.scrape.locale,
            "timezone_id": config.scrape.timezone,
            "user_agent": config.scrape.user_agent,
            "viewport": config.scrape.viewport,
        }
        if STORAGE_STATE_PATH.exists():
            context_kwargs["storage_state"] = str(STORAGE_STATE_PATH)
            logger.info("Session persistée chargée: {}", STORAGE_STATE_PATH)
        context = browser.new_context(**context_kwargs)
        page = context.new_page()
        page.set_default_timeout(config.scrape.timeout_ms)

        try:
            for index, test in enumerate(tests, start=1):
                if max_tests > 0 and index > max_tests:
                    logger.info("Limite atteinte ({} tests), arrêt de la session.", max_tests)
                    break

                test_id = test.get("id")
                url = test.get("url")
                pause_s = test.get("pause_s", 0)

                if not url:
                    logger.warning("Test {} sans URL", test_id)
                    continue

                logger.info("Scraping {} ({})", test_id, url)
                page.goto(url, wait_until="domcontentloaded", timeout=config.scrape.timeout_ms)
                debug_files = save_debug_artifacts(page, test_id, "capture")
                data = extract_product_data(page, url)
                if not data.get("titre"):
                    logger.warning("Titre absent, suspicion de blocage pour {}", test_id)
                    if headful_on_block:
                        logger.info("Ouverture headful pour résolution manuelle.")
                        manual_browser = playwright.chromium.launch(headless=False)
                        manual_context_kwargs = dict(context_kwargs)
                        manual_context = manual_browser.new_context(**manual_context_kwargs)
                        manual_page = manual_context.new_page()
                        manual_page.goto(url, wait_until="domcontentloaded", timeout=config.scrape.timeout_ms)
                        save_debug_artifacts(manual_page, test_id, "manual")
                        logger.info("Résoudre le captcha puis appuyer sur Entrée.")
                        try:
                            input()
                        except EOFError:
                            logger.info("Pas d'entrée disponible, attente {}s.", wait_on_block)
                            time.sleep(wait_on_block)
                        data = extract_product_data(manual_page, url)
                        if not data.get("titre"):
                            results.append(
                                build_result(
                                    test_id,
                                    url,
                                    "bloque",
                                    debug=build_debug("bloque", notes=[f"debug={debug_files}"]),
                                )
                            )
                        else:
                            status, debug = evaluate_data(data, required_fields, optional_fields)
                            if status == "partiel":
                                logger.warning("Champs manquants: {}", debug.get("erreurs"))
                            debug["notes"].append(f"debug={debug_files}")
                            results.append(build_result(test_id, url, status, data=data, debug=debug))
                            logger.info("OK {} (titre={})", test_id, data.get("titre"))
                        try:
                            manual_context.storage_state(path=str(STORAGE_STATE_PATH))
                            logger.info("Session persistée sauvegardée: {}", STORAGE_STATE_PATH)
                        except Exception:
                            logger.warning("Impossible de sauvegarder la session persistée.")
                        manual_context.close()
                        manual_browser.close()
                    else:
                        results.append(
                            build_result(
                                test_id,
                                url,
                                "bloque",
                                debug=build_debug("bloque", notes=[f"debug={debug_files}"]),
                            )
                        )
                else:
                    status, debug = evaluate_data(data, required_fields, optional_fields)
                    if status == "partiel":
                        logger.warning("Champs manquants: {}", debug.get("erreurs"))
                    debug["notes"].append(f"debug={debug_files}")
                    results.append(build_result(test_id, url, status, data=data, debug=debug))
                    logger.info("OK {} (titre={})", test_id, data.get("titre"))

                if pause_s:
                    logger.info("Pause {}s", pause_s)
                    time.sleep(pause_s)

                # extra delay between pages to limit blocking
                jitter = random.uniform(min_delay, max_delay)
                logger.info("Délai anti-blocage: {:.1f}s", jitter)
                time.sleep(jitter)
        finally:
            try:
                context.storage_state(path=str(STORAGE_STATE_PATH))
                logger.info("Session persistée sauvegardée: {}", STORAGE_STATE_PATH)
            except Exception:
                logger.warning("Impossible de sauvegarder la session persistée.")
            context.close()
            browser.close()

    RESULTS_PATH.write_text(json.dumps({"results": results}, ensure_ascii=False, indent=2), encoding="utf-8")
    logger.info("Résultats sauvegardés dans {}", RESULTS_PATH)


if __name__ == "__main__":
    main()
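For reference, a sketch of the sample files this harness reads; the exact contents are assumptions, but the keys mirror load_fields_config() and the test loop above (id, url, pause_s), and the ASIN is a placeholder:

import json
from pathlib import Path

samples = Path("backend/app/samples")
samples.mkdir(parents=True, exist_ok=True)

# illustrative field configuration read by load_fields_config()
(samples / "scrape_fields.json").write_text(json.dumps({
    "required": ["titre", "prix_actuel"],
    "optional": ["prix_conseille", "prix_min_30j", "note", "nombre_avis", "prime"],
}, ensure_ascii=False, indent=2), encoding="utf-8")

# illustrative test list read by main(); "pause_s" is the optional per-test pause
(samples / "scrape_test.json").write_text(json.dumps({
    "tests": [
        {"id": "test-001", "url": "https://www.amazon.fr/dp/B000000000", "pause_s": 2},
    ],
}, ensure_ascii=False, indent=2), encoding="utf-8")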
242 backend/app/scraper/runner.py Normal file
@@ -0,0 +1,242 @@
from __future__ import annotations

from datetime import datetime
from pathlib import Path
from typing import Iterable
import json
import random
import time

from loguru import logger
from playwright.sync_api import sync_playwright
from sqlalchemy.orm import Session

from backend.app.core.config import load_config
from backend.app.db import database, models
from backend.app.scraper.amazon.parser import detect_blocked, extract_product_data

def _create_run(session: Session) -> models.ScrapeRun:
    run = models.ScrapeRun(demarre_le=datetime.utcnow(), statut="en_cours")
    session.add(run)
    session.commit()
    session.refresh(run)
    return run


def _finalize_run(run: models.ScrapeRun, session: Session, status: str) -> None:
    run.statut = status
    run.termine_le = datetime.utcnow()
    session.add(run)
    session.commit()


def _save_raw_json(payload: dict, product_id: int) -> Path:
    base_dir = Path(__file__).resolve().parent.parent.parent / "data" / "raw"
    timestamp = datetime.utcnow().strftime("%Y-%m-%d")
    folder = base_dir / timestamp
    folder.mkdir(parents=True, exist_ok=True)
    filename = f"{product_id}_{datetime.utcnow().strftime('%H%M%S')}.json"
    path = folder / filename
    path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
    return path


def _save_debug_artifacts(page, product_id: int) -> tuple[Path, Path]:
    base_dir = Path(__file__).resolve().parent.parent.parent / "data" / "screenshots"
    base_dir.mkdir(parents=True, exist_ok=True)
    stamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
    screenshot_path = base_dir / f"{product_id}_{stamp}.png"
    html_path = base_dir / f"{product_id}_{stamp}.html"
    page.screenshot(path=str(screenshot_path), full_page=True)
    html_path.write_text(page.content(), encoding="utf-8")
    return screenshot_path, html_path


def _create_snapshot(
    session: Session,
    product: models.Product,
    run: models.ScrapeRun,
    data: dict,
    status: str,
    raw_json_path: Path | None,
    error_message: str | None = None,
) -> None:
    snapshot = models.ProductSnapshot(
        produit_id=product.id,
        run_scrap_id=run.id,
        prix_actuel=data.get("prix_actuel"),
        prix_conseille=data.get("prix_conseille"),
        prix_min_30j=data.get("prix_min_30j"),
        etat_stock=data.get("etat_stock"),
        en_stock=data.get("en_stock"),
        note=data.get("note"),
        nombre_avis=data.get("nombre_avis"),
        prime=data.get("prime"),
        choix_amazon=data.get("choix_amazon"),
        offre_limitee=data.get("offre_limitee"),
        exclusivite_amazon=data.get("exclusivite_amazon"),
        chemin_json_brut=str(raw_json_path) if raw_json_path else None,
        statut_scrap=status,
        message_erreur=error_message,
    )
    session.add(snapshot)
    session.commit()

def scrape_product(product_id: int) -> None:
    logger.info("Déclenchement du scraping pour le produit {}", product_id)
    session = database.SessionLocal()
    run = _create_run(session)
    try:
        product = session.get(models.Product, product_id)
        if not product:
            logger.warning("Produit {} introuvable", product_id)
            _finalize_run(run, session, "echec")
            return
        config = load_config()
        run.nb_total = 1
        session.commit()

        with sync_playwright() as playwright:
            browser = playwright.chromium.launch(headless=config.scrape.headless)
            context = browser.new_context(
                locale=config.scrape.locale,
                timezone_id=config.scrape.timezone,
                user_agent=config.scrape.user_agent,
                viewport=config.scrape.viewport,
            )
            page = context.new_page()
            page.set_default_timeout(config.scrape.timeout_ms)
            try:
                page.goto(product.url, wait_until="domcontentloaded", timeout=config.scrape.timeout_ms)

                html = page.content()
                if detect_blocked(html):
                    screenshot_path, html_path = _save_debug_artifacts(page, product.id)
                    data = {"url": product.url, "asin": product.asin, "bloque": True}
                    raw_path = _save_raw_json(data, product.id)
                    _create_snapshot(
                        session,
                        product,
                        run,
                        data,
                        status="bloque",
                        raw_json_path=raw_path,
                        error_message=f"Bloque: {screenshot_path.name} / {html_path.name}",
                    )
                    run.nb_echec = 1
                    _finalize_run(run, session, "partiel")
                    return

                data = extract_product_data(page, product.url)
                raw_path = _save_raw_json(data, product.id)
                required = ["titre", "prix_actuel", "note"]
                missing = [field for field in required if not data.get(field)]
                status = "champs_manquants" if missing else "ok"
                _create_snapshot(
                    session,
                    product,
                    run,
                    data,
                    status=status,
                    raw_json_path=raw_path,
                    error_message=", ".join(missing) if missing else None,
                )
                run.nb_ok = 1 if not missing else 0
                run.nb_echec = 0 if not missing else 1
                _finalize_run(run, session, "succes" if not missing else "partiel")

                delay_min, delay_max = config.scrape.delay_range_ms
                time.sleep(random.uniform(delay_min, delay_max) / 1000.0)
            finally:
                # close the browser cleanly
                context.close()
                browser.close()
    except Exception:  # pragma: no cover
        logger.exception("Erreur pendant le scraping de {}", product_id)
        _finalize_run(run, session, "erreur")
    finally:
        session.close()

def scrape_all(product_ids: Iterable[int] | None = None) -> None:
    logger.info("Déclenchement du scraping global")
    session = database.SessionLocal()
    run = _create_run(session)
    try:
        config = load_config()
        products = session.query(models.Product).all()
        if product_ids:
            products = [product for product in products if product.id in product_ids]
        run.nb_total = len(products)
        session.commit()

        with sync_playwright() as playwright:
            browser = playwright.chromium.launch(headless=config.scrape.headless)
            context = browser.new_context(
                locale=config.scrape.locale,
                timezone_id=config.scrape.timezone,
                user_agent=config.scrape.user_agent,
                viewport=config.scrape.viewport,
            )
            page = context.new_page()
            page.set_default_timeout(config.scrape.timeout_ms)

            nb_ok = 0
            nb_echec = 0

            try:
                for product in products:
                    page.goto(product.url, wait_until="domcontentloaded", timeout=config.scrape.timeout_ms)
                    html = page.content()
                    if detect_blocked(html):
                        screenshot_path, html_path = _save_debug_artifacts(page, product.id)
                        data = {"url": product.url, "asin": product.asin, "bloque": True}
                        raw_path = _save_raw_json(data, product.id)
                        _create_snapshot(
                            session,
                            product,
                            run,
                            data,
                            status="bloque",
                            raw_json_path=raw_path,
                            error_message=f"Bloque: {screenshot_path.name} / {html_path.name}",
                        )
                        nb_echec += 1
                        continue

                    data = extract_product_data(page, product.url)
                    raw_path = _save_raw_json(data, product.id)
                    required = ["titre", "prix_actuel", "note"]
                    missing = [field for field in required if not data.get(field)]
                    status = "champs_manquants" if missing else "ok"
                    _create_snapshot(
                        session,
                        product,
                        run,
                        data,
                        status=status,
                        raw_json_path=raw_path,
                        error_message=", ".join(missing) if missing else None,
                    )
                    if missing:
                        nb_echec += 1
                    else:
                        nb_ok += 1

                    delay_min, delay_max = config.scrape.delay_range_ms
                    time.sleep(random.uniform(delay_min, delay_max) / 1000.0)

                run.nb_ok = nb_ok
                run.nb_echec = nb_echec
                _finalize_run(run, session, "succes" if nb_echec == 0 else "partiel")
            finally:
                # close the browser cleanly
                context.close()
                browser.close()
    except Exception:  # pragma: no cover
        logger.exception("Erreur du scraping global")
        _finalize_run(run, session, "erreur")
    finally:
        session.close()
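A minimal way to trigger these runners (a sketch; it assumes the database, models and scraping config referenced above are already initialised, and the ids are illustrative):

from backend.app.scraper.runner import scrape_all, scrape_product

# scrape a single tracked product by its database id
scrape_product(1)

# or scrape every tracked product, optionally restricted to a set of ids
scrape_all()
scrape_all(product_ids=[1, 2, 3])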