Compare commits

..

2 Commits

Author SHA1 Message Date
Gilles Soulier
1f7f7da0c3 claude 2026-01-17 14:48:14 +01:00
Gilles Soulier
152c2724fc feat: improve SPA scraping and increase test coverage
- Add SPA support for Playwright with wait_for_network_idle and extra_wait_ms
- Add BaseStore.get_spa_config() and requires_playwright() methods
- Implement AliExpress SPA config with JSON price extraction patterns
- Fix Amazon price parsing to prioritize whole+fraction combination
- Fix AliExpress regex patterns (remove double backslashes)
- Add CLI tests: detect, doctor, fetch, parse, run commands
- Add API tests: auth, logs, products, scraping_logs, webhooks

Tests: 417 passed, 85% coverage

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-17 14:46:55 +01:00
40 changed files with 1315 additions and 22 deletions

7
.claude/settings.json Normal file
View File

@@ -0,0 +1,7 @@
{
"permissions": {
"allow": [
"Bash(sort:*)"
]
}
}

BIN
.coverage

Binary file not shown.

BIN
pricewatch/app/core/__pycache__/io.cpython-313.pyc Executable file → Normal file

Binary file not shown.

Binary file not shown.

View File

@@ -45,6 +45,8 @@ def fetch_playwright(
timeout_ms: int = 60000,
save_screenshot: bool = False,
wait_for_selector: Optional[str] = None,
wait_for_network_idle: bool = False,
extra_wait_ms: int = 0,
) -> PlaywrightFetchResult:
"""
Récupère une page avec Playwright.
@@ -55,6 +57,8 @@ def fetch_playwright(
timeout_ms: Timeout en millisecondes
save_screenshot: Prendre un screenshot
wait_for_selector: Attendre un sélecteur CSS avant de récupérer
wait_for_network_idle: Attendre que le réseau soit inactif (pour SPA)
extra_wait_ms: Délai supplémentaire après chargement (pour JS lent)
Returns:
PlaywrightFetchResult avec HTML, screenshot (optionnel), ou erreur
@@ -65,6 +69,8 @@ def fetch_playwright(
- Headful disponible pour debug visuel
- Screenshot optionnel pour diagnostiquer les échecs
- wait_for_selector permet d'attendre le chargement dynamique
- wait_for_network_idle utile pour les SPA qui chargent via AJAX
- extra_wait_ms pour les sites avec JS lent après DOM ready
"""
if not url or not url.strip():
logger.error("URL vide fournie")
@@ -101,7 +107,8 @@ def fetch_playwright(
# Naviguer vers la page
logger.debug(f"[Playwright] Navigation vers {url}")
response = page.goto(url, wait_until="domcontentloaded")
wait_until = "networkidle" if wait_for_network_idle else "domcontentloaded"
response = page.goto(url, wait_until=wait_until)
if not response:
raise Exception("Pas de réponse du serveur")
@@ -116,6 +123,11 @@ def fetch_playwright(
f"[Playwright] Timeout en attendant le sélecteur: {wait_for_selector}"
)
# Délai supplémentaire pour JS lent (SPA)
if extra_wait_ms > 0:
logger.debug(f"[Playwright] Attente supplémentaire: {extra_wait_ms}ms")
page.wait_for_timeout(extra_wait_ms)
# Récupérer le HTML
html = page.content()

BIN
pricewatch/app/stores/__pycache__/base.cpython-313.pyc Executable file → Normal file

Binary file not shown.

Binary file not shown.

View File

@@ -29,13 +29,39 @@ logger = get_logger("stores.aliexpress")
class AliexpressStore(BaseStore):
"""Store pour AliExpress.com (marketplace chinois)."""
"""Store pour AliExpress.com (marketplace chinois).
AliExpress est une SPA (Single Page Application) qui charge
le contenu via JavaScript/AJAX. Nécessite Playwright avec
attente du chargement dynamique.
"""
def __init__(self):
"""Initialise le store AliExpress avec ses sélecteurs."""
selectors_path = Path(__file__).parent / "selectors.yml"
super().__init__(store_id="aliexpress", selectors_path=selectors_path)
def get_spa_config(self) -> dict:
"""
Configuration SPA pour AliExpress.
AliExpress charge les données produit (prix, titre) via AJAX.
Il faut attendre que le réseau soit inactif ET ajouter un délai
pour laisser le JS terminer le rendu.
Returns:
Configuration Playwright pour SPA
"""
return {
"wait_for_network_idle": True,
"wait_for_selector": "h1", # Titre du produit
"extra_wait_ms": 2000, # 2s pour le rendu JS
}
def requires_playwright(self) -> bool:
"""AliExpress nécessite Playwright pour le rendu SPA."""
return True
def match(self, url: str) -> float:
"""
Détecte si l'URL est AliExpress.
@@ -206,28 +232,71 @@ class AliexpressStore(BaseStore):
Extrait le prix.
AliExpress n'a PAS de sélecteur CSS stable pour le prix.
On utilise regex sur le HTML brut.
Stratégie multi-niveaux:
1. Chercher dans les données JSON embarquées
2. Chercher dans les spans avec classes contenant "price"
3. Regex sur le HTML brut
4. Meta tags og:price
"""
# Pattern 1: Prix avant € (ex: "136,69 €")
match = re.search(r"([0-9][0-9\\s.,\\u00a0\\u202f\\u2009]*)\\s*€", html)
# Priorité 1: Extraire depuis JSON embarqué (skuActivityAmount, formattedActivityPrice)
json_patterns = [
r'"skuActivityAmount"\s*:\s*\{\s*"value"\s*:\s*(\d+(?:\.\d+)?)', # {"value": 123.45}
r'"formattedActivityPrice"\s*:\s*"([0-9,.\s]+)\s*€"', # "123,45 €"
r'"formattedActivityPrice"\s*:\s*"\s*([0-9,.\s]+)"', # "€ 123.45"
r'"minPrice"\s*:\s*"([0-9,.\s]+)"', # "minPrice": "123.45"
r'"price"\s*:\s*"([0-9,.\s]+)"', # "price": "123.45"
r'"activityAmount"\s*:\s*\{\s*"value"\s*:\s*(\d+(?:\.\d+)?)', # activityAmount.value
]
for pattern in json_patterns:
match = re.search(pattern, html)
if match:
price = parse_price_text(match.group(1))
if price is not None:
if price is not None and price > 0:
debug.notes.append(f"Prix extrait depuis JSON: {price}")
return price
# Pattern 2: € avant prix (ex: "€ 136.69")
match = re.search(r"\\s*([0-9][0-9\\s.,\\u00a0\\u202f\\u2009]*)", html)
# Priorité 2: Chercher dans les spans/divs avec classes contenant "price"
price_selectors = [
'span[class*="price--current"]',
'span[class*="price--sale"]',
'div[class*="price--current"]',
'span[class*="product-price"]',
'span[class*="Price_Price"]',
'div[class*="es--wrap"]', # Structure AliExpress spécifique
]
for selector in price_selectors:
elements = soup.select(selector)
for elem in elements:
text = elem.get_text(strip=True)
# Chercher un prix dans le texte
price_match = re.search(r'(\d+[,.\s]*\d*)\s*€|€\s*(\d+[,.\s]*\d*)', text)
if price_match:
price_str = price_match.group(1) or price_match.group(2)
price = parse_price_text(price_str)
if price is not None and price > 0:
debug.notes.append(f"Prix extrait depuis sélecteur {selector}")
return price
# Priorité 3: Prix avant € (ex: "136,69€" ou "136,69 €")
match = re.search(r'(\d+[,.\s\u00a0\u202f\u2009]*\d*)\s*€', html)
if match:
price = parse_price_text(match.group(1))
if price is not None:
if price is not None and price > 0:
return price
# Pattern 3: Chercher dans meta tags (moins fiable)
# Priorité 4: € avant prix (ex: "€136.69" ou "€ 136.69")
match = re.search(r'\s*(\d+[,.\s\u00a0\u202f\u2009]*\d*)', html)
if match:
price = parse_price_text(match.group(1))
if price is not None and price > 0:
return price
# Priorité 5: Chercher dans meta tags (moins fiable)
og_price = soup.find("meta", property="og:price:amount")
if og_price:
price_str = og_price.get("content", "")
price = parse_price_text(price_str)
if price is not None:
if price is not None and price > 0:
return price
debug.errors.append("Prix non trouvé")
@@ -235,7 +304,7 @@ class AliexpressStore(BaseStore):
def _extract_msrp(self, html: str, debug: DebugInfo) -> Optional[float]:
"""Extrait le prix conseille si present."""
match = re.search(r"originalPrice\"\\s*:\\s*\"([0-9\\s.,]+)\"", html)
match = re.search(r'originalPrice"\s*:\s*"([0-9\s.,]+)"', html)
if match:
price = parse_price_text(match.group(1))
if price is not None:

View File

@@ -215,6 +215,19 @@ class AmazonStore(BaseStore):
def _extract_price(self, soup: BeautifulSoup, debug: DebugInfo) -> Optional[float]:
"""Extrait le prix."""
# Priorité 1: combiner les spans séparés a-price-whole et a-price-fraction
# C'est le format le plus courant sur Amazon pour les prix avec centimes séparés
whole = soup.select_one("span.a-price-whole")
fraction = soup.select_one("span.a-price-fraction")
if whole and fraction:
whole_text = whole.get_text(strip=True).rstrip(",.")
fraction_text = fraction.get_text(strip=True)
if whole_text and fraction_text:
price = parse_price_text(f"{whole_text}.{fraction_text}")
if price is not None:
return price
# Priorité 2: essayer les sélecteurs (incluant a-price-whole seul avec prix complet)
selectors = self.get_selector("price", [])
if isinstance(selectors, str):
selectors = [selectors]
@@ -227,16 +240,6 @@ class AmazonStore(BaseStore):
if price is not None:
return price
# Fallback: chercher les spans séparés a-price-whole et a-price-fraction
whole = soup.select_one("span.a-price-whole")
fraction = soup.select_one("span.a-price-fraction")
if whole and fraction:
whole_text = whole.get_text(strip=True)
fraction_text = fraction.get_text(strip=True)
price = parse_price_text(f"{whole_text}.{fraction_text}")
if price is not None:
return price
debug.errors.append("Prix non trouvé")
return None

Binary file not shown.

View File

@@ -152,5 +152,32 @@ class BaseStore(ABC):
"""
return self.selectors.get(key, default)
def get_spa_config(self) -> Optional[dict]:
"""
Retourne la configuration SPA pour Playwright si ce store est une SPA.
Returns:
dict avec les options Playwright ou None si pas une SPA:
- wait_for_selector: Sélecteur CSS à attendre avant scraping
- wait_for_network_idle: Attendre que le réseau soit inactif
- extra_wait_ms: Délai supplémentaire après chargement
Par défaut retourne None (pas de config SPA spécifique).
Les stores SPA doivent surcharger cette méthode.
"""
return None
def requires_playwright(self) -> bool:
"""
Indique si ce store nécessite obligatoirement Playwright.
Returns:
True si Playwright est requis, False sinon
Par défaut False. Les stores avec anti-bot agressif ou
rendu SPA obligatoire doivent surcharger cette méthode.
"""
return False
def __repr__(self) -> str:
return f"<{self.__class__.__name__} id={self.store_id}>"

View File

@@ -0,0 +1 @@
<html><body>content</body></html>

View File

@@ -0,0 +1,53 @@
"""Tests simples pour l'authentification API."""
import pytest
from fastapi import HTTPException
from pricewatch.app.api.main import require_token
class FakeConfig:
api_token = "valid-token"
class FakeConfigNoToken:
api_token = None
def test_require_token_valid(monkeypatch):
"""Token valide ne leve pas d'exception."""
monkeypatch.setattr("pricewatch.app.api.main.get_config", lambda: FakeConfig())
# Ne doit pas lever d'exception
require_token("Bearer valid-token")
def test_require_token_missing(monkeypatch):
"""Token manquant leve 401."""
monkeypatch.setattr("pricewatch.app.api.main.get_config", lambda: FakeConfig())
with pytest.raises(HTTPException) as exc_info:
require_token(None)
assert exc_info.value.status_code == 401
def test_require_token_invalid_format(monkeypatch):
"""Token sans Bearer leve 401."""
monkeypatch.setattr("pricewatch.app.api.main.get_config", lambda: FakeConfig())
with pytest.raises(HTTPException) as exc_info:
require_token("invalid-format")
assert exc_info.value.status_code == 401
def test_require_token_wrong_value(monkeypatch):
"""Mauvais token leve 403."""
monkeypatch.setattr("pricewatch.app.api.main.get_config", lambda: FakeConfig())
with pytest.raises(HTTPException) as exc_info:
require_token("Bearer wrong-token")
assert exc_info.value.status_code == 403
def test_require_token_not_configured(monkeypatch):
"""Token non configure leve 500."""
monkeypatch.setattr("pricewatch.app.api.main.get_config", lambda: FakeConfigNoToken())
with pytest.raises(HTTPException) as exc_info:
require_token("Bearer any-token")
assert exc_info.value.status_code == 500

View File

@@ -0,0 +1,26 @@
"""Tests pour les endpoints de logs API."""
from pricewatch.app.api.main import list_backend_logs, BACKEND_LOGS
from pricewatch.app.api.schemas import BackendLogEntry
def test_list_backend_logs_empty():
"""Liste des logs backend vide."""
BACKEND_LOGS.clear()
result = list_backend_logs()
assert result == []
def test_list_backend_logs_with_entries():
"""Liste des logs backend avec entrees."""
from datetime import datetime
BACKEND_LOGS.clear()
entry = BackendLogEntry(level="INFO", message="Test log", time=datetime(2026, 1, 17, 12, 0, 0))
BACKEND_LOGS.append(entry)
result = list_backend_logs()
assert len(result) == 1
assert result[0].message == "Test log"
assert result[0].level == "INFO"
BACKEND_LOGS.clear()

View File

@@ -0,0 +1,267 @@
"""Tests fonctions API produits avec mocks."""
from datetime import datetime
from unittest.mock import MagicMock, patch
import pytest
from fastapi import HTTPException
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
from pricewatch.app.api.main import (
create_product,
get_product,
update_product,
delete_product,
list_prices,
create_price,
update_price,
delete_price,
)
from pricewatch.app.api.schemas import ProductCreate, ProductUpdate, PriceHistoryCreate, PriceHistoryUpdate
class MockProduct:
"""Mock Product model."""
def __init__(self, **kwargs):
self.id = kwargs.get("id", 1)
self.source = kwargs.get("source", "amazon")
self.reference = kwargs.get("reference", "REF123")
self.url = kwargs.get("url", "https://example.com")
self.title = kwargs.get("title", "Test Product")
self.category = kwargs.get("category")
self.description = kwargs.get("description")
self.currency = kwargs.get("currency", "EUR")
self.msrp = kwargs.get("msrp")
self.first_seen_at = kwargs.get("first_seen_at", datetime.now())
self.last_updated_at = kwargs.get("last_updated_at", datetime.now())
class MockPrice:
"""Mock PriceHistory model."""
def __init__(self, **kwargs):
self.id = kwargs.get("id", 1)
self.product_id = kwargs.get("product_id", 1)
self.price = kwargs.get("price", 99.99)
self.shipping_cost = kwargs.get("shipping_cost")
self.stock_status = kwargs.get("stock_status", "in_stock")
self.fetch_method = kwargs.get("fetch_method", "http")
self.fetch_status = kwargs.get("fetch_status", "success")
self.fetched_at = kwargs.get("fetched_at", datetime.now())
class TestCreateProduct:
"""Tests create_product."""
def test_create_success(self):
"""Cree un produit avec succes."""
session = MagicMock()
session.add = MagicMock()
session.commit = MagicMock()
session.refresh = MagicMock()
payload = ProductCreate(
source="amazon",
reference="NEW123",
url="https://amazon.fr/dp/NEW123",
title="New Product",
currency="EUR",
)
with patch("pricewatch.app.api.main.Product") as MockProductClass:
mock_product = MockProduct(reference="NEW123")
MockProductClass.return_value = mock_product
with patch("pricewatch.app.api.main._product_to_out") as mock_to_out:
mock_to_out.return_value = MagicMock()
result = create_product(payload, session)
session.add.assert_called_once()
session.commit.assert_called_once()
def test_create_duplicate(self):
"""Cree un produit duplique leve 409."""
session = MagicMock()
session.add = MagicMock()
session.commit = MagicMock(side_effect=IntegrityError("duplicate", {}, None))
session.rollback = MagicMock()
payload = ProductCreate(
source="amazon",
reference="DUPE",
url="https://amazon.fr/dp/DUPE",
title="Duplicate",
currency="EUR",
)
with patch("pricewatch.app.api.main.Product"):
with pytest.raises(HTTPException) as exc_info:
create_product(payload, session)
assert exc_info.value.status_code == 409
def test_create_db_error(self):
"""Erreur DB leve 500."""
session = MagicMock()
session.add = MagicMock()
session.commit = MagicMock(side_effect=SQLAlchemyError("db error"))
session.rollback = MagicMock()
payload = ProductCreate(
source="amazon",
reference="ERR",
url="https://amazon.fr/dp/ERR",
title="Error",
currency="EUR",
)
with patch("pricewatch.app.api.main.Product"):
with pytest.raises(HTTPException) as exc_info:
create_product(payload, session)
assert exc_info.value.status_code == 500
class TestGetProduct:
"""Tests get_product."""
def test_get_not_found(self):
"""Produit non trouve leve 404."""
session = MagicMock()
mock_query = MagicMock()
mock_query.filter.return_value.one_or_none.return_value = None
session.query.return_value = mock_query
with pytest.raises(HTTPException) as exc_info:
get_product(99999, session)
assert exc_info.value.status_code == 404
class TestUpdateProduct:
"""Tests update_product."""
def test_update_not_found(self):
"""Update produit non trouve leve 404."""
session = MagicMock()
mock_query = MagicMock()
mock_query.filter.return_value.one_or_none.return_value = None
session.query.return_value = mock_query
payload = ProductUpdate(title="Updated")
with pytest.raises(HTTPException) as exc_info:
update_product(99999, payload, session)
assert exc_info.value.status_code == 404
def test_update_db_error(self):
"""Erreur DB lors d'update leve 500."""
session = MagicMock()
mock_product = MockProduct()
mock_query = MagicMock()
mock_query.filter.return_value.one_or_none.return_value = mock_product
session.query.return_value = mock_query
session.commit = MagicMock(side_effect=SQLAlchemyError("error"))
session.rollback = MagicMock()
payload = ProductUpdate(title="Updated")
with pytest.raises(HTTPException) as exc_info:
update_product(1, payload, session)
assert exc_info.value.status_code == 500
class TestDeleteProduct:
"""Tests delete_product."""
def test_delete_not_found(self):
"""Delete produit non trouve leve 404."""
session = MagicMock()
mock_query = MagicMock()
mock_query.filter.return_value.one_or_none.return_value = None
session.query.return_value = mock_query
with pytest.raises(HTTPException) as exc_info:
delete_product(99999, session)
assert exc_info.value.status_code == 404
def test_delete_success(self):
"""Delete produit avec succes."""
session = MagicMock()
mock_product = MockProduct()
mock_query = MagicMock()
mock_query.filter.return_value.one_or_none.return_value = mock_product
session.query.return_value = mock_query
session.delete = MagicMock()
session.commit = MagicMock()
result = delete_product(1, session)
assert result == {"status": "deleted"}
session.delete.assert_called_once()
def test_delete_db_error(self):
"""Erreur DB lors de delete leve 500."""
session = MagicMock()
mock_product = MockProduct()
mock_query = MagicMock()
mock_query.filter.return_value.one_or_none.return_value = mock_product
session.query.return_value = mock_query
session.delete = MagicMock()
session.commit = MagicMock(side_effect=SQLAlchemyError("error"))
session.rollback = MagicMock()
with pytest.raises(HTTPException) as exc_info:
delete_product(1, session)
assert exc_info.value.status_code == 500
class TestCreatePrice:
"""Tests create_price."""
def test_create_price_db_error(self):
"""Erreur DB lors de creation prix."""
session = MagicMock()
session.add = MagicMock()
session.commit = MagicMock(side_effect=SQLAlchemyError("error"))
session.rollback = MagicMock()
payload = PriceHistoryCreate(
product_id=1,
price=99.99,
fetch_method="http",
fetch_status="success",
fetched_at=datetime.now(),
)
with patch("pricewatch.app.api.main.PriceHistory"):
with pytest.raises(HTTPException) as exc_info:
create_price(payload, session)
assert exc_info.value.status_code == 500
class TestUpdatePrice:
"""Tests update_price."""
def test_update_price_not_found(self):
"""Update prix non trouve leve 404."""
session = MagicMock()
mock_query = MagicMock()
mock_query.filter.return_value.one_or_none.return_value = None
session.query.return_value = mock_query
payload = PriceHistoryUpdate(price=149.99)
with pytest.raises(HTTPException) as exc_info:
update_price(99999, payload, session)
assert exc_info.value.status_code == 404
class TestDeletePrice:
"""Tests delete_price."""
def test_delete_price_not_found(self):
"""Delete prix non trouve leve 404."""
session = MagicMock()
mock_query = MagicMock()
mock_query.filter.return_value.one_or_none.return_value = None
session.query.return_value = mock_query
with pytest.raises(HTTPException) as exc_info:
delete_price(99999, session)
assert exc_info.value.status_code == 404

View File

@@ -0,0 +1,135 @@
"""Tests API endpoints scraping logs."""
from datetime import datetime
from unittest.mock import MagicMock
import pytest
from fastapi import HTTPException
from sqlalchemy.exc import SQLAlchemyError
from pricewatch.app.api.main import create_log, update_log, delete_log
from pricewatch.app.api.schemas import ScrapingLogCreate, ScrapingLogUpdate
class MockScrapingLog:
"""Mock ScrapingLog model."""
def __init__(self, **kwargs):
self.id = kwargs.get("id", 1)
self.product_id = kwargs.get("product_id")
self.url = kwargs.get("url", "https://example.com")
self.source = kwargs.get("source", "amazon")
self.reference = kwargs.get("reference", "REF123")
self.fetch_method = kwargs.get("fetch_method", "http")
self.fetch_status = kwargs.get("fetch_status", "success")
self.fetched_at = kwargs.get("fetched_at", datetime.now())
self.duration_ms = kwargs.get("duration_ms", 1500)
self.html_size_bytes = kwargs.get("html_size_bytes", 50000)
self.errors = kwargs.get("errors", [])
self.notes = kwargs.get("notes", [])
class TestCreateLog:
"""Tests create_log endpoint."""
def test_create_log_db_error(self):
"""Erreur DB lors de creation log leve 500."""
from unittest.mock import patch
session = MagicMock()
session.add = MagicMock()
session.commit = MagicMock(side_effect=SQLAlchemyError("error"))
session.rollback = MagicMock()
payload = ScrapingLogCreate(
url="https://amazon.fr/dp/TEST",
source="amazon",
reference="TEST123",
fetch_method="http",
fetch_status="success",
fetched_at=datetime.now(),
)
with patch("pricewatch.app.api.main.ScrapingLog"):
with pytest.raises(HTTPException) as exc_info:
create_log(payload, session)
assert exc_info.value.status_code == 500
class TestUpdateLog:
"""Tests update_log endpoint."""
def test_update_log_not_found(self):
"""Update log non trouve leve 404."""
session = MagicMock()
mock_query = MagicMock()
mock_query.filter.return_value.one_or_none.return_value = None
session.query.return_value = mock_query
payload = ScrapingLogUpdate(fetch_status="failed")
with pytest.raises(HTTPException) as exc_info:
update_log(99999, payload, session)
assert exc_info.value.status_code == 404
def test_update_log_db_error(self):
"""Erreur DB lors d'update log leve 500."""
from unittest.mock import patch
session = MagicMock()
mock_log = MockScrapingLog()
mock_query = MagicMock()
mock_query.filter.return_value.one_or_none.return_value = mock_log
session.query.return_value = mock_query
session.commit = MagicMock(side_effect=SQLAlchemyError("error"))
session.rollback = MagicMock()
payload = ScrapingLogUpdate(fetch_status="failed")
with patch("pricewatch.app.api.main._log_to_out"):
with pytest.raises(HTTPException) as exc_info:
update_log(1, payload, session)
assert exc_info.value.status_code == 500
class TestDeleteLog:
"""Tests delete_log endpoint."""
def test_delete_log_not_found(self):
"""Delete log non trouve leve 404."""
session = MagicMock()
mock_query = MagicMock()
mock_query.filter.return_value.one_or_none.return_value = None
session.query.return_value = mock_query
with pytest.raises(HTTPException) as exc_info:
delete_log(99999, session)
assert exc_info.value.status_code == 404
def test_delete_log_success(self):
"""Delete log avec succes."""
session = MagicMock()
mock_log = MockScrapingLog()
mock_query = MagicMock()
mock_query.filter.return_value.one_or_none.return_value = mock_log
session.query.return_value = mock_query
session.delete = MagicMock()
session.commit = MagicMock()
result = delete_log(1, session)
assert result == {"status": "deleted"}
session.delete.assert_called_once()
def test_delete_log_db_error(self):
"""Erreur DB lors de delete log leve 500."""
session = MagicMock()
mock_log = MockScrapingLog()
mock_query = MagicMock()
mock_query.filter.return_value.one_or_none.return_value = mock_log
session.query.return_value = mock_query
session.delete = MagicMock()
session.commit = MagicMock(side_effect=SQLAlchemyError("error"))
session.rollback = MagicMock()
with pytest.raises(HTTPException) as exc_info:
delete_log(1, session)
assert exc_info.value.status_code == 500

View File

@@ -0,0 +1,159 @@
"""Tests API endpoints webhooks."""
from datetime import datetime
from unittest.mock import MagicMock, patch
import pytest
from fastapi import HTTPException
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
from pricewatch.app.api.main import (
list_webhooks,
create_webhook,
update_webhook,
delete_webhook,
)
from pricewatch.app.api.schemas import WebhookCreate, WebhookUpdate
class MockWebhook:
"""Mock Webhook model."""
def __init__(self, **kwargs):
self.id = kwargs.get("id", 1)
self.url = kwargs.get("url", "https://example.com/webhook")
self.events = kwargs.get("events", ["price_change", "stock_change"])
self.active = kwargs.get("active", True)
self.created_at = kwargs.get("created_at", datetime.now())
self.last_triggered_at = kwargs.get("last_triggered_at")
class TestListWebhooks:
"""Tests list_webhooks endpoint."""
def test_list_webhooks_empty(self):
"""Liste vide de webhooks."""
session = MagicMock()
mock_query = MagicMock()
mock_query.all.return_value = []
session.query.return_value = mock_query
with patch("pricewatch.app.api.main._webhook_to_out") as mock_to_out:
result = list_webhooks(session=session)
assert result == []
class TestCreateWebhook:
"""Tests create_webhook endpoint."""
def test_create_webhook_integrity_error(self):
"""Erreur d'integrite lors de creation webhook leve 500."""
# Note: le code actuel ne distingue pas IntegrityError de SQLAlchemyError
session = MagicMock()
session.add = MagicMock()
session.commit = MagicMock(side_effect=IntegrityError("duplicate", {}, None))
session.rollback = MagicMock()
payload = WebhookCreate(
event="price_change",
url="https://example.com/webhook",
)
with patch("pricewatch.app.api.main.Webhook"):
with pytest.raises(HTTPException) as exc_info:
create_webhook(payload, session)
assert exc_info.value.status_code == 500
def test_create_webhook_db_error(self):
"""Erreur DB lors de creation webhook leve 500."""
session = MagicMock()
session.add = MagicMock()
session.commit = MagicMock(side_effect=SQLAlchemyError("error"))
session.rollback = MagicMock()
payload = WebhookCreate(
event="price_change",
url="https://example.com/webhook",
)
with patch("pricewatch.app.api.main.Webhook"):
with pytest.raises(HTTPException) as exc_info:
create_webhook(payload, session)
assert exc_info.value.status_code == 500
class TestUpdateWebhook:
"""Tests update_webhook endpoint."""
def test_update_webhook_not_found(self):
"""Update webhook non trouve leve 404."""
session = MagicMock()
mock_query = MagicMock()
mock_query.filter.return_value.one_or_none.return_value = None
session.query.return_value = mock_query
payload = WebhookUpdate(active=False)
with pytest.raises(HTTPException) as exc_info:
update_webhook(99999, payload, session)
assert exc_info.value.status_code == 404
def test_update_webhook_db_error(self):
"""Erreur DB lors d'update webhook leve 500."""
session = MagicMock()
mock_webhook = MockWebhook()
mock_query = MagicMock()
mock_query.filter.return_value.one_or_none.return_value = mock_webhook
session.query.return_value = mock_query
session.commit = MagicMock(side_effect=SQLAlchemyError("error"))
session.rollback = MagicMock()
payload = WebhookUpdate(active=False)
with patch("pricewatch.app.api.main._webhook_to_out"):
with pytest.raises(HTTPException) as exc_info:
update_webhook(1, payload, session)
assert exc_info.value.status_code == 500
class TestDeleteWebhook:
"""Tests delete_webhook endpoint."""
def test_delete_webhook_not_found(self):
"""Delete webhook non trouve leve 404."""
session = MagicMock()
mock_query = MagicMock()
mock_query.filter.return_value.one_or_none.return_value = None
session.query.return_value = mock_query
with pytest.raises(HTTPException) as exc_info:
delete_webhook(99999, session)
assert exc_info.value.status_code == 404
def test_delete_webhook_success(self):
"""Delete webhook avec succes."""
session = MagicMock()
mock_webhook = MockWebhook()
mock_query = MagicMock()
mock_query.filter.return_value.one_or_none.return_value = mock_webhook
session.query.return_value = mock_query
session.delete = MagicMock()
session.commit = MagicMock()
result = delete_webhook(1, session)
assert result == {"status": "deleted"}
session.delete.assert_called_once()
def test_delete_webhook_db_error(self):
"""Erreur DB lors de delete webhook leve 500."""
session = MagicMock()
mock_webhook = MockWebhook()
mock_query = MagicMock()
mock_query.filter.return_value.one_or_none.return_value = mock_webhook
session.query.return_value = mock_query
session.delete = MagicMock()
session.commit = MagicMock(side_effect=SQLAlchemyError("error"))
session.rollback = MagicMock()
with pytest.raises(HTTPException) as exc_info:
delete_webhook(1, session)
assert exc_info.value.status_code == 500

42
tests/cli/test_detect.py Normal file
View File

@@ -0,0 +1,42 @@
"""Tests pour la commande CLI detect."""
import pytest
from typer.testing import CliRunner
from pricewatch.app.cli.main import app
runner = CliRunner()
class TestDetectCommand:
"""Tests pour la commande detect."""
def test_detect_amazon_url(self):
"""Detect doit identifier une URL Amazon."""
result = runner.invoke(app, ["detect", "https://www.amazon.fr/dp/B08N5WRWNW"])
assert result.exit_code == 0
assert "amazon" in result.stdout.lower()
assert "B08N5WRWNW" in result.stdout
def test_detect_cdiscount_url(self):
"""Detect doit identifier une URL Cdiscount."""
result = runner.invoke(
app,
[
"detect",
"https://www.cdiscount.com/informatique/f-10709-tuf608umrv004.html",
],
)
assert result.exit_code == 0
assert "cdiscount" in result.stdout.lower()
def test_detect_unknown_url(self):
"""Detect doit echouer pour une URL inconnue."""
result = runner.invoke(app, ["detect", "https://www.unknown-store.com/product"])
assert result.exit_code == 1
assert "aucun store" in result.stdout.lower()
def test_detect_invalid_url(self):
"""Detect doit echouer pour une URL invalide."""
result = runner.invoke(app, ["detect", "not-a-valid-url"])
assert result.exit_code == 1

36
tests/cli/test_doctor.py Normal file
View File

@@ -0,0 +1,36 @@
"""Tests pour la commande CLI doctor."""
import pytest
from typer.testing import CliRunner
from pricewatch.app.cli.main import app
runner = CliRunner()
class TestDoctorCommand:
"""Tests pour la commande doctor."""
def test_doctor_success(self):
"""Doctor doit afficher le statut de l'installation."""
result = runner.invoke(app, ["doctor"])
assert result.exit_code == 0
assert "PriceWatch Doctor" in result.stdout
assert "Python" in result.stdout
# "prêt" avec accent
assert "prêt" in result.stdout.lower() or "ready" in result.stdout.lower()
def test_doctor_shows_dependencies(self):
"""Doctor doit lister les dependances."""
result = runner.invoke(app, ["doctor"])
assert result.exit_code == 0
assert "typer" in result.stdout.lower()
assert "pydantic" in result.stdout.lower()
assert "playwright" in result.stdout.lower()
def test_doctor_shows_stores(self):
"""Doctor doit lister les stores disponibles."""
result = runner.invoke(app, ["doctor"])
assert result.exit_code == 0
assert "amazon" in result.stdout.lower()
assert "cdiscount" in result.stdout.lower()

99
tests/cli/test_fetch.py Normal file
View File

@@ -0,0 +1,99 @@
"""Tests pour la commande CLI fetch."""
import pytest
from unittest.mock import patch, MagicMock
from typer.testing import CliRunner
from pricewatch.app.cli.main import app
runner = CliRunner()
class TestFetchCommand:
"""Tests pour la commande fetch."""
def test_fetch_conflicting_options(self):
"""Fetch doit echouer si --http et --playwright sont specifies."""
result = runner.invoke(
app, ["fetch", "https://example.com", "--http", "--playwright"]
)
assert result.exit_code == 1
assert "impossible" in result.stdout.lower()
@patch("pricewatch.app.cli.main.fetch_http")
def test_fetch_http_success(self, mock_fetch: MagicMock):
"""Fetch HTTP doit afficher le resultat."""
mock_result = MagicMock()
mock_result.success = True
mock_result.html = "<html>test</html>"
mock_result.status_code = 200
mock_result.duration_ms = 150
mock_fetch.return_value = mock_result
result = runner.invoke(app, ["fetch", "https://example.com", "--http"])
assert result.exit_code == 0
assert "Succes" in result.stdout or "" in result.stdout
assert "150" in result.stdout
@patch("pricewatch.app.cli.main.fetch_http")
def test_fetch_http_failure(self, mock_fetch: MagicMock):
"""Fetch HTTP doit signaler l'echec."""
mock_result = MagicMock()
mock_result.success = False
mock_result.error = "Connection refused"
mock_fetch.return_value = mock_result
result = runner.invoke(app, ["fetch", "https://example.com", "--http"])
assert result.exit_code == 1
assert "Connection refused" in result.stdout
@patch("pricewatch.app.cli.main.fetch_playwright")
def test_fetch_playwright_success(self, mock_fetch: MagicMock):
"""Fetch Playwright doit afficher le resultat."""
mock_result = MagicMock()
mock_result.success = True
mock_result.html = "<html>test playwright</html>"
mock_result.duration_ms = 2500
mock_fetch.return_value = mock_result
result = runner.invoke(app, ["fetch", "https://example.com", "--playwright"])
assert result.exit_code == 0
assert "Succes" in result.stdout or "" in result.stdout
assert "2500" in result.stdout
@patch("pricewatch.app.cli.main.fetch_playwright")
def test_fetch_playwright_failure(self, mock_fetch: MagicMock):
"""Fetch Playwright doit signaler l'echec."""
mock_result = MagicMock()
mock_result.success = False
mock_result.error = "Timeout waiting for page"
mock_fetch.return_value = mock_result
result = runner.invoke(app, ["fetch", "https://example.com", "--playwright"])
assert result.exit_code == 1
assert "Timeout" in result.stdout
@patch("pricewatch.app.cli.main.fetch_playwright")
def test_fetch_default_is_playwright(self, mock_fetch: MagicMock):
"""Fetch sans option utilise Playwright par defaut."""
mock_result = MagicMock()
mock_result.success = True
mock_result.html = "<html>test</html>"
mock_result.duration_ms = 1000
mock_fetch.return_value = mock_result
result = runner.invoke(app, ["fetch", "https://example.com"])
assert result.exit_code == 0
mock_fetch.assert_called_once()
@patch("pricewatch.app.cli.main.fetch_playwright")
def test_fetch_with_debug(self, mock_fetch: MagicMock):
"""Fetch doit fonctionner avec --debug."""
mock_result = MagicMock()
mock_result.success = True
mock_result.html = "<html>test</html>"
mock_result.duration_ms = 1000
mock_fetch.return_value = mock_result
result = runner.invoke(app, ["fetch", "https://example.com", "--debug"])
assert result.exit_code == 0

99
tests/cli/test_parse.py Normal file
View File

@@ -0,0 +1,99 @@
"""Tests pour la commande CLI parse."""
import tempfile
from pathlib import Path
import pytest
from typer.testing import CliRunner
from pricewatch.app.cli.main import app
runner = CliRunner()
class TestParseCommand:
"""Tests pour la commande parse."""
@pytest.fixture
def amazon_html_file(self, tmp_path: Path) -> Path:
"""Cree un fichier HTML Amazon temporaire."""
html = """
<html>
<body>
<span id="productTitle">Test Product</span>
<span class="a-price-whole">299,99 €</span>
<div id="availability">
<span>En stock</span>
</div>
</body>
</html>
"""
file_path = tmp_path / "amazon_test.html"
file_path.write_text(html, encoding="utf-8")
return file_path
@pytest.fixture
def cdiscount_html_file(self, tmp_path: Path) -> Path:
"""Cree un fichier HTML Cdiscount temporaire."""
html = """
<html>
<head>
<script type="application/ld+json">
{
"@type": "Product",
"name": "Produit Cdiscount",
"offers": {"price": "199.99", "priceCurrency": "EUR"}
}
</script>
</head>
<body>
<h1 data-e2e="title">Produit Cdiscount</h1>
</body>
</html>
"""
file_path = tmp_path / "cdiscount_test.html"
file_path.write_text(html, encoding="utf-8")
return file_path
def test_parse_amazon_success(self, amazon_html_file: Path):
"""Parse doit extraire les donnees d'un HTML Amazon."""
result = runner.invoke(
app, ["parse", "amazon", "--in", str(amazon_html_file)]
)
assert result.exit_code == 0
assert "Test Product" in result.stdout
assert "299" in result.stdout
def test_parse_cdiscount_success(self, cdiscount_html_file: Path):
"""Parse doit extraire les donnees d'un HTML Cdiscount."""
result = runner.invoke(
app, ["parse", "cdiscount", "--in", str(cdiscount_html_file)]
)
assert result.exit_code == 0
assert "Produit Cdiscount" in result.stdout
assert "199" in result.stdout
def test_parse_unknown_store(self, amazon_html_file: Path):
"""Parse doit echouer pour un store inconnu."""
result = runner.invoke(
app, ["parse", "unknown_store", "--in", str(amazon_html_file)]
)
assert result.exit_code == 1
assert "inconnu" in result.stdout.lower()
def test_parse_with_debug(self, amazon_html_file: Path):
"""Parse doit fonctionner avec --debug."""
result = runner.invoke(
app, ["parse", "amazon", "--in", str(amazon_html_file), "--debug"]
)
assert result.exit_code == 0
def test_parse_shows_fields(self, amazon_html_file: Path):
"""Parse doit afficher les champs extraits."""
result = runner.invoke(
app, ["parse", "amazon", "--in", str(amazon_html_file)]
)
assert result.exit_code == 0
assert "Titre" in result.stdout
assert "Prix" in result.stdout
assert "Stock" in result.stdout

View File

@@ -0,0 +1,258 @@
"""Tests pour la commande CLI run."""
import tempfile
from pathlib import Path
from unittest.mock import patch, MagicMock
import pytest
from typer.testing import CliRunner
from pricewatch.app.cli.main import app
from pricewatch.app.core.schema import ProductSnapshot, DebugInfo, DebugStatus, FetchMethod
runner = CliRunner()
@pytest.fixture
def yaml_config(tmp_path: Path) -> Path:
"""Cree un fichier YAML de config temporaire."""
yaml_content = """
urls:
- "https://www.amazon.fr/dp/B08N5WRWNW"
options:
use_playwright: false
force_playwright: false
headful: false
save_html: false
save_screenshot: false
timeout_ms: 30000
"""
file_path = tmp_path / "test_config.yaml"
file_path.write_text(yaml_content, encoding="utf-8")
return file_path
@pytest.fixture
def output_json(tmp_path: Path) -> Path:
"""Chemin pour le fichier JSON de sortie."""
return tmp_path / "output.json"
class TestRunCommand:
"""Tests pour la commande run."""
@patch("pricewatch.app.cli.main.fetch_http")
def test_run_http_success(self, mock_fetch, yaml_config, output_json):
"""Run avec HTTP reussi."""
# Mock HTTP fetch
mock_result = MagicMock()
mock_result.success = True
mock_result.html = """
<html><body>
<span id="productTitle">Test Product</span>
<span class="a-price-whole">299,99 €</span>
</body></html>
"""
mock_result.error = None
mock_fetch.return_value = mock_result
result = runner.invoke(
app,
["run", "--yaml", str(yaml_config), "--out", str(output_json), "--no-db"],
)
assert result.exit_code == 0
assert output_json.exists()
@patch("pricewatch.app.cli.main.fetch_http")
@patch("pricewatch.app.cli.main.fetch_playwright")
def test_run_http_fail_playwright_fallback(
self, mock_pw, mock_http, yaml_config, output_json
):
"""Run avec fallback Playwright quand HTTP echoue."""
# Mock HTTP fail
mock_http_result = MagicMock()
mock_http_result.success = False
mock_http_result.error = "403 Forbidden"
mock_http.return_value = mock_http_result
# Mock Playwright success
mock_pw_result = MagicMock()
mock_pw_result.success = True
mock_pw_result.html = """
<html><body>
<span id="productTitle">Playwright Product</span>
<span class="a-price-whole">199,99 €</span>
</body></html>
"""
mock_pw_result.screenshot = None
mock_pw.return_value = mock_pw_result
# Modifier config pour activer playwright
yaml_content = """
urls:
- "https://www.amazon.fr/dp/B08N5WRWNW"
options:
use_playwright: true
force_playwright: false
headful: false
save_html: false
save_screenshot: false
timeout_ms: 30000
"""
yaml_config.write_text(yaml_content, encoding="utf-8")
result = runner.invoke(
app,
["run", "--yaml", str(yaml_config), "--out", str(output_json), "--no-db"],
)
assert result.exit_code == 0
mock_pw.assert_called()
@patch("pricewatch.app.cli.main.fetch_http")
def test_run_http_fail_no_playwright(self, mock_http, yaml_config, output_json):
"""Run avec HTTP echoue sans Playwright."""
mock_result = MagicMock()
mock_result.success = False
mock_result.error = "Connection refused"
mock_http.return_value = mock_result
result = runner.invoke(
app,
["run", "--yaml", str(yaml_config), "--out", str(output_json), "--no-db"],
)
# Doit quand meme creer le fichier JSON (avec snapshot failed)
assert result.exit_code == 0
assert output_json.exists()
def test_run_invalid_yaml(self, tmp_path, output_json):
"""Run avec YAML invalide echoue."""
yaml_file = tmp_path / "invalid.yaml"
yaml_file.write_text("invalid: [yaml: content", encoding="utf-8")
result = runner.invoke(
app,
["run", "--yaml", str(yaml_file), "--out", str(output_json)],
)
assert result.exit_code == 1
def test_run_with_debug(self, yaml_config, output_json):
"""Run avec --debug active les logs."""
with patch("pricewatch.app.cli.main.fetch_http") as mock_fetch:
mock_result = MagicMock()
mock_result.success = True
mock_result.html = "<html><body>Test</body></html>"
mock_fetch.return_value = mock_result
result = runner.invoke(
app,
[
"run",
"--yaml",
str(yaml_config),
"--out",
str(output_json),
"--debug",
"--no-db",
],
)
assert result.exit_code == 0
@patch("pricewatch.app.cli.main.fetch_playwright")
def test_run_force_playwright(self, mock_pw, tmp_path, output_json):
"""Run avec force_playwright skip HTTP."""
yaml_content = """
urls:
- "https://www.amazon.fr/dp/B08N5WRWNW"
options:
use_playwright: true
force_playwright: true
headful: false
save_html: false
save_screenshot: false
timeout_ms: 30000
"""
yaml_file = tmp_path / "force_pw.yaml"
yaml_file.write_text(yaml_content, encoding="utf-8")
mock_result = MagicMock()
mock_result.success = True
mock_result.html = "<html><body>PW content</body></html>"
mock_result.screenshot = None
mock_pw.return_value = mock_result
with patch("pricewatch.app.cli.main.fetch_http") as mock_http:
result = runner.invoke(
app,
["run", "--yaml", str(yaml_file), "--out", str(output_json), "--no-db"],
)
# HTTP ne doit pas etre appele
mock_http.assert_not_called()
mock_pw.assert_called()
assert result.exit_code == 0
@patch("pricewatch.app.cli.main.fetch_http")
def test_run_unknown_store(self, mock_fetch, tmp_path, output_json):
"""Run avec URL de store inconnu."""
yaml_content = """
urls:
- "https://www.unknown-store.com/product/123"
options:
use_playwright: false
"""
yaml_file = tmp_path / "unknown.yaml"
yaml_file.write_text(yaml_content, encoding="utf-8")
result = runner.invoke(
app,
["run", "--yaml", str(yaml_file), "--out", str(output_json), "--no-db"],
)
# Doit continuer sans crash
assert result.exit_code == 0
# HTTP ne doit pas etre appele (store non trouve)
mock_fetch.assert_not_called()
@patch("pricewatch.app.cli.main.fetch_http")
@patch("pricewatch.app.cli.main.fetch_playwright")
def test_run_with_save_screenshot(self, mock_pw, mock_http, tmp_path, output_json):
"""Run avec save_screenshot."""
yaml_content = """
urls:
- "https://www.amazon.fr/dp/B08N5WRWNW"
options:
use_playwright: true
force_playwright: false
save_screenshot: true
timeout_ms: 30000
"""
yaml_file = tmp_path / "screenshot.yaml"
yaml_file.write_text(yaml_content, encoding="utf-8")
# HTTP fail
mock_http_result = MagicMock()
mock_http_result.success = False
mock_http_result.error = "blocked"
mock_http.return_value = mock_http_result
# PW success avec screenshot
mock_pw_result = MagicMock()
mock_pw_result.success = True
mock_pw_result.html = "<html><body>content</body></html>"
mock_pw_result.screenshot = b"fake_png_data"
mock_pw.return_value = mock_pw_result
with patch("pricewatch.app.core.io.save_debug_screenshot") as mock_save:
result = runner.invoke(
app,
["run", "--yaml", str(yaml_file), "--out", str(output_json), "--no-db"],
)
assert result.exit_code == 0
# Le screenshot doit etre sauvegarde si present
mock_save.assert_called()