Compare commits
2 Commits
cf7c415e22
...
1f7f7da0c3
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1f7f7da0c3 | ||
|
|
152c2724fc |
7
.claude/settings.json
Normal file
7
.claude/settings.json
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
{
|
||||||
|
"permissions": {
|
||||||
|
"allow": [
|
||||||
|
"Bash(sort:*)"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
BIN
pricewatch/app/core/__pycache__/io.cpython-313.pyc
Executable file → Normal file
BIN
pricewatch/app/core/__pycache__/io.cpython-313.pyc
Executable file → Normal file
Binary file not shown.
Binary file not shown.
Binary file not shown.
BIN
pricewatch/app/scraping/__pycache__/pw_fetch.cpython-313.pyc
Executable file → Normal file
BIN
pricewatch/app/scraping/__pycache__/pw_fetch.cpython-313.pyc
Executable file → Normal file
Binary file not shown.
@@ -45,6 +45,8 @@ def fetch_playwright(
|
|||||||
timeout_ms: int = 60000,
|
timeout_ms: int = 60000,
|
||||||
save_screenshot: bool = False,
|
save_screenshot: bool = False,
|
||||||
wait_for_selector: Optional[str] = None,
|
wait_for_selector: Optional[str] = None,
|
||||||
|
wait_for_network_idle: bool = False,
|
||||||
|
extra_wait_ms: int = 0,
|
||||||
) -> PlaywrightFetchResult:
|
) -> PlaywrightFetchResult:
|
||||||
"""
|
"""
|
||||||
Récupère une page avec Playwright.
|
Récupère une page avec Playwright.
|
||||||
@@ -55,6 +57,8 @@ def fetch_playwright(
|
|||||||
timeout_ms: Timeout en millisecondes
|
timeout_ms: Timeout en millisecondes
|
||||||
save_screenshot: Prendre un screenshot
|
save_screenshot: Prendre un screenshot
|
||||||
wait_for_selector: Attendre un sélecteur CSS avant de récupérer
|
wait_for_selector: Attendre un sélecteur CSS avant de récupérer
|
||||||
|
wait_for_network_idle: Attendre que le réseau soit inactif (pour SPA)
|
||||||
|
extra_wait_ms: Délai supplémentaire après chargement (pour JS lent)
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
PlaywrightFetchResult avec HTML, screenshot (optionnel), ou erreur
|
PlaywrightFetchResult avec HTML, screenshot (optionnel), ou erreur
|
||||||
@@ -65,6 +69,8 @@ def fetch_playwright(
|
|||||||
- Headful disponible pour debug visuel
|
- Headful disponible pour debug visuel
|
||||||
- Screenshot optionnel pour diagnostiquer les échecs
|
- Screenshot optionnel pour diagnostiquer les échecs
|
||||||
- wait_for_selector permet d'attendre le chargement dynamique
|
- wait_for_selector permet d'attendre le chargement dynamique
|
||||||
|
- wait_for_network_idle utile pour les SPA qui chargent via AJAX
|
||||||
|
- extra_wait_ms pour les sites avec JS lent après DOM ready
|
||||||
"""
|
"""
|
||||||
if not url or not url.strip():
|
if not url or not url.strip():
|
||||||
logger.error("URL vide fournie")
|
logger.error("URL vide fournie")
|
||||||
@@ -101,7 +107,8 @@ def fetch_playwright(
|
|||||||
|
|
||||||
# Naviguer vers la page
|
# Naviguer vers la page
|
||||||
logger.debug(f"[Playwright] Navigation vers {url}")
|
logger.debug(f"[Playwright] Navigation vers {url}")
|
||||||
response = page.goto(url, wait_until="domcontentloaded")
|
wait_until = "networkidle" if wait_for_network_idle else "domcontentloaded"
|
||||||
|
response = page.goto(url, wait_until=wait_until)
|
||||||
|
|
||||||
if not response:
|
if not response:
|
||||||
raise Exception("Pas de réponse du serveur")
|
raise Exception("Pas de réponse du serveur")
|
||||||
@@ -116,6 +123,11 @@ def fetch_playwright(
|
|||||||
f"[Playwright] Timeout en attendant le sélecteur: {wait_for_selector}"
|
f"[Playwright] Timeout en attendant le sélecteur: {wait_for_selector}"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Délai supplémentaire pour JS lent (SPA)
|
||||||
|
if extra_wait_ms > 0:
|
||||||
|
logger.debug(f"[Playwright] Attente supplémentaire: {extra_wait_ms}ms")
|
||||||
|
page.wait_for_timeout(extra_wait_ms)
|
||||||
|
|
||||||
# Récupérer le HTML
|
# Récupérer le HTML
|
||||||
html = page.content()
|
html = page.content()
|
||||||
|
|
||||||
|
|||||||
BIN
pricewatch/app/stores/__pycache__/base.cpython-313.pyc
Executable file → Normal file
BIN
pricewatch/app/stores/__pycache__/base.cpython-313.pyc
Executable file → Normal file
Binary file not shown.
BIN
pricewatch/app/stores/aliexpress/__pycache__/store.cpython-313.pyc
Executable file → Normal file
BIN
pricewatch/app/stores/aliexpress/__pycache__/store.cpython-313.pyc
Executable file → Normal file
Binary file not shown.
@@ -29,13 +29,39 @@ logger = get_logger("stores.aliexpress")
|
|||||||
|
|
||||||
|
|
||||||
class AliexpressStore(BaseStore):
|
class AliexpressStore(BaseStore):
|
||||||
"""Store pour AliExpress.com (marketplace chinois)."""
|
"""Store pour AliExpress.com (marketplace chinois).
|
||||||
|
|
||||||
|
AliExpress est une SPA (Single Page Application) qui charge
|
||||||
|
le contenu via JavaScript/AJAX. Nécessite Playwright avec
|
||||||
|
attente du chargement dynamique.
|
||||||
|
"""
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
"""Initialise le store AliExpress avec ses sélecteurs."""
|
"""Initialise le store AliExpress avec ses sélecteurs."""
|
||||||
selectors_path = Path(__file__).parent / "selectors.yml"
|
selectors_path = Path(__file__).parent / "selectors.yml"
|
||||||
super().__init__(store_id="aliexpress", selectors_path=selectors_path)
|
super().__init__(store_id="aliexpress", selectors_path=selectors_path)
|
||||||
|
|
||||||
|
def get_spa_config(self) -> dict:
|
||||||
|
"""
|
||||||
|
Configuration SPA pour AliExpress.
|
||||||
|
|
||||||
|
AliExpress charge les données produit (prix, titre) via AJAX.
|
||||||
|
Il faut attendre que le réseau soit inactif ET ajouter un délai
|
||||||
|
pour laisser le JS terminer le rendu.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Configuration Playwright pour SPA
|
||||||
|
"""
|
||||||
|
return {
|
||||||
|
"wait_for_network_idle": True,
|
||||||
|
"wait_for_selector": "h1", # Titre du produit
|
||||||
|
"extra_wait_ms": 2000, # 2s pour le rendu JS
|
||||||
|
}
|
||||||
|
|
||||||
|
def requires_playwright(self) -> bool:
|
||||||
|
"""AliExpress nécessite Playwright pour le rendu SPA."""
|
||||||
|
return True
|
||||||
|
|
||||||
def match(self, url: str) -> float:
|
def match(self, url: str) -> float:
|
||||||
"""
|
"""
|
||||||
Détecte si l'URL est AliExpress.
|
Détecte si l'URL est AliExpress.
|
||||||
@@ -206,28 +232,71 @@ class AliexpressStore(BaseStore):
|
|||||||
Extrait le prix.
|
Extrait le prix.
|
||||||
|
|
||||||
AliExpress n'a PAS de sélecteur CSS stable pour le prix.
|
AliExpress n'a PAS de sélecteur CSS stable pour le prix.
|
||||||
On utilise regex sur le HTML brut.
|
Stratégie multi-niveaux:
|
||||||
|
1. Chercher dans les données JSON embarquées
|
||||||
|
2. Chercher dans les spans avec classes contenant "price"
|
||||||
|
3. Regex sur le HTML brut
|
||||||
|
4. Meta tags og:price
|
||||||
"""
|
"""
|
||||||
# Pattern 1: Prix avant € (ex: "136,69 €")
|
# Priorité 1: Extraire depuis JSON embarqué (skuActivityAmount, formattedActivityPrice)
|
||||||
match = re.search(r"([0-9][0-9\\s.,\\u00a0\\u202f\\u2009]*)\\s*€", html)
|
json_patterns = [
|
||||||
|
r'"skuActivityAmount"\s*:\s*\{\s*"value"\s*:\s*(\d+(?:\.\d+)?)', # {"value": 123.45}
|
||||||
|
r'"formattedActivityPrice"\s*:\s*"([0-9,.\s]+)\s*€"', # "123,45 €"
|
||||||
|
r'"formattedActivityPrice"\s*:\s*"€\s*([0-9,.\s]+)"', # "€ 123.45"
|
||||||
|
r'"minPrice"\s*:\s*"([0-9,.\s]+)"', # "minPrice": "123.45"
|
||||||
|
r'"price"\s*:\s*"([0-9,.\s]+)"', # "price": "123.45"
|
||||||
|
r'"activityAmount"\s*:\s*\{\s*"value"\s*:\s*(\d+(?:\.\d+)?)', # activityAmount.value
|
||||||
|
]
|
||||||
|
for pattern in json_patterns:
|
||||||
|
match = re.search(pattern, html)
|
||||||
|
if match:
|
||||||
|
price = parse_price_text(match.group(1))
|
||||||
|
if price is not None and price > 0:
|
||||||
|
debug.notes.append(f"Prix extrait depuis JSON: {price}")
|
||||||
|
return price
|
||||||
|
|
||||||
|
# Priorité 2: Chercher dans les spans/divs avec classes contenant "price"
|
||||||
|
price_selectors = [
|
||||||
|
'span[class*="price--current"]',
|
||||||
|
'span[class*="price--sale"]',
|
||||||
|
'div[class*="price--current"]',
|
||||||
|
'span[class*="product-price"]',
|
||||||
|
'span[class*="Price_Price"]',
|
||||||
|
'div[class*="es--wrap"]', # Structure AliExpress spécifique
|
||||||
|
]
|
||||||
|
for selector in price_selectors:
|
||||||
|
elements = soup.select(selector)
|
||||||
|
for elem in elements:
|
||||||
|
text = elem.get_text(strip=True)
|
||||||
|
# Chercher un prix dans le texte
|
||||||
|
price_match = re.search(r'(\d+[,.\s]*\d*)\s*€|€\s*(\d+[,.\s]*\d*)', text)
|
||||||
|
if price_match:
|
||||||
|
price_str = price_match.group(1) or price_match.group(2)
|
||||||
|
price = parse_price_text(price_str)
|
||||||
|
if price is not None and price > 0:
|
||||||
|
debug.notes.append(f"Prix extrait depuis sélecteur {selector}")
|
||||||
|
return price
|
||||||
|
|
||||||
|
# Priorité 3: Prix avant € (ex: "136,69€" ou "136,69 €")
|
||||||
|
match = re.search(r'(\d+[,.\s\u00a0\u202f\u2009]*\d*)\s*€', html)
|
||||||
if match:
|
if match:
|
||||||
price = parse_price_text(match.group(1))
|
price = parse_price_text(match.group(1))
|
||||||
if price is not None:
|
if price is not None and price > 0:
|
||||||
return price
|
return price
|
||||||
|
|
||||||
# Pattern 2: € avant prix (ex: "€ 136.69")
|
# Priorité 4: € avant prix (ex: "€136.69" ou "€ 136.69")
|
||||||
match = re.search(r"€\\s*([0-9][0-9\\s.,\\u00a0\\u202f\\u2009]*)", html)
|
match = re.search(r'€\s*(\d+[,.\s\u00a0\u202f\u2009]*\d*)', html)
|
||||||
if match:
|
if match:
|
||||||
price = parse_price_text(match.group(1))
|
price = parse_price_text(match.group(1))
|
||||||
if price is not None:
|
if price is not None and price > 0:
|
||||||
return price
|
return price
|
||||||
|
|
||||||
# Pattern 3: Chercher dans meta tags (moins fiable)
|
# Priorité 5: Chercher dans meta tags (moins fiable)
|
||||||
og_price = soup.find("meta", property="og:price:amount")
|
og_price = soup.find("meta", property="og:price:amount")
|
||||||
if og_price:
|
if og_price:
|
||||||
price_str = og_price.get("content", "")
|
price_str = og_price.get("content", "")
|
||||||
price = parse_price_text(price_str)
|
price = parse_price_text(price_str)
|
||||||
if price is not None:
|
if price is not None and price > 0:
|
||||||
return price
|
return price
|
||||||
|
|
||||||
debug.errors.append("Prix non trouvé")
|
debug.errors.append("Prix non trouvé")
|
||||||
@@ -235,7 +304,7 @@ class AliexpressStore(BaseStore):
|
|||||||
|
|
||||||
def _extract_msrp(self, html: str, debug: DebugInfo) -> Optional[float]:
|
def _extract_msrp(self, html: str, debug: DebugInfo) -> Optional[float]:
|
||||||
"""Extrait le prix conseille si present."""
|
"""Extrait le prix conseille si present."""
|
||||||
match = re.search(r"originalPrice\"\\s*:\\s*\"([0-9\\s.,]+)\"", html)
|
match = re.search(r'originalPrice"\s*:\s*"([0-9\s.,]+)"', html)
|
||||||
if match:
|
if match:
|
||||||
price = parse_price_text(match.group(1))
|
price = parse_price_text(match.group(1))
|
||||||
if price is not None:
|
if price is not None:
|
||||||
|
|||||||
Binary file not shown.
@@ -215,6 +215,19 @@ class AmazonStore(BaseStore):
|
|||||||
|
|
||||||
def _extract_price(self, soup: BeautifulSoup, debug: DebugInfo) -> Optional[float]:
|
def _extract_price(self, soup: BeautifulSoup, debug: DebugInfo) -> Optional[float]:
|
||||||
"""Extrait le prix."""
|
"""Extrait le prix."""
|
||||||
|
# Priorité 1: combiner les spans séparés a-price-whole et a-price-fraction
|
||||||
|
# C'est le format le plus courant sur Amazon pour les prix avec centimes séparés
|
||||||
|
whole = soup.select_one("span.a-price-whole")
|
||||||
|
fraction = soup.select_one("span.a-price-fraction")
|
||||||
|
if whole and fraction:
|
||||||
|
whole_text = whole.get_text(strip=True).rstrip(",.")
|
||||||
|
fraction_text = fraction.get_text(strip=True)
|
||||||
|
if whole_text and fraction_text:
|
||||||
|
price = parse_price_text(f"{whole_text}.{fraction_text}")
|
||||||
|
if price is not None:
|
||||||
|
return price
|
||||||
|
|
||||||
|
# Priorité 2: essayer les sélecteurs (incluant a-price-whole seul avec prix complet)
|
||||||
selectors = self.get_selector("price", [])
|
selectors = self.get_selector("price", [])
|
||||||
if isinstance(selectors, str):
|
if isinstance(selectors, str):
|
||||||
selectors = [selectors]
|
selectors = [selectors]
|
||||||
@@ -227,16 +240,6 @@ class AmazonStore(BaseStore):
|
|||||||
if price is not None:
|
if price is not None:
|
||||||
return price
|
return price
|
||||||
|
|
||||||
# Fallback: chercher les spans séparés a-price-whole et a-price-fraction
|
|
||||||
whole = soup.select_one("span.a-price-whole")
|
|
||||||
fraction = soup.select_one("span.a-price-fraction")
|
|
||||||
if whole and fraction:
|
|
||||||
whole_text = whole.get_text(strip=True)
|
|
||||||
fraction_text = fraction.get_text(strip=True)
|
|
||||||
price = parse_price_text(f"{whole_text}.{fraction_text}")
|
|
||||||
if price is not None:
|
|
||||||
return price
|
|
||||||
|
|
||||||
debug.errors.append("Prix non trouvé")
|
debug.errors.append("Prix non trouvé")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|||||||
BIN
pricewatch/app/stores/backmarket/__pycache__/store.cpython-313.pyc
Executable file → Normal file
BIN
pricewatch/app/stores/backmarket/__pycache__/store.cpython-313.pyc
Executable file → Normal file
Binary file not shown.
@@ -152,5 +152,32 @@ class BaseStore(ABC):
|
|||||||
"""
|
"""
|
||||||
return self.selectors.get(key, default)
|
return self.selectors.get(key, default)
|
||||||
|
|
||||||
|
def get_spa_config(self) -> Optional[dict]:
|
||||||
|
"""
|
||||||
|
Retourne la configuration SPA pour Playwright si ce store est une SPA.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
dict avec les options Playwright ou None si pas une SPA:
|
||||||
|
- wait_for_selector: Sélecteur CSS à attendre avant scraping
|
||||||
|
- wait_for_network_idle: Attendre que le réseau soit inactif
|
||||||
|
- extra_wait_ms: Délai supplémentaire après chargement
|
||||||
|
|
||||||
|
Par défaut retourne None (pas de config SPA spécifique).
|
||||||
|
Les stores SPA doivent surcharger cette méthode.
|
||||||
|
"""
|
||||||
|
return None
|
||||||
|
|
||||||
|
def requires_playwright(self) -> bool:
|
||||||
|
"""
|
||||||
|
Indique si ce store nécessite obligatoirement Playwright.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True si Playwright est requis, False sinon
|
||||||
|
|
||||||
|
Par défaut False. Les stores avec anti-bot agressif ou
|
||||||
|
rendu SPA obligatoire doivent surcharger cette méthode.
|
||||||
|
"""
|
||||||
|
return False
|
||||||
|
|
||||||
def __repr__(self) -> str:
|
def __repr__(self) -> str:
|
||||||
return f"<{self.__class__.__name__} id={self.store_id}>"
|
return f"<{self.__class__.__name__} id={self.store_id}>"
|
||||||
|
|||||||
1
scraped/amazon_B08N5WRWNW.html
Normal file
1
scraped/amazon_B08N5WRWNW.html
Normal file
@@ -0,0 +1 @@
|
|||||||
|
<html><body>content</body></html>
|
||||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
53
tests/api/test_auth_simple.py
Normal file
53
tests/api/test_auth_simple.py
Normal file
@@ -0,0 +1,53 @@
|
|||||||
|
"""Tests simples pour l'authentification API."""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from fastapi import HTTPException
|
||||||
|
|
||||||
|
from pricewatch.app.api.main import require_token
|
||||||
|
|
||||||
|
|
||||||
|
class FakeConfig:
|
||||||
|
api_token = "valid-token"
|
||||||
|
|
||||||
|
|
||||||
|
class FakeConfigNoToken:
|
||||||
|
api_token = None
|
||||||
|
|
||||||
|
|
||||||
|
def test_require_token_valid(monkeypatch):
|
||||||
|
"""Token valide ne leve pas d'exception."""
|
||||||
|
monkeypatch.setattr("pricewatch.app.api.main.get_config", lambda: FakeConfig())
|
||||||
|
# Ne doit pas lever d'exception
|
||||||
|
require_token("Bearer valid-token")
|
||||||
|
|
||||||
|
|
||||||
|
def test_require_token_missing(monkeypatch):
|
||||||
|
"""Token manquant leve 401."""
|
||||||
|
monkeypatch.setattr("pricewatch.app.api.main.get_config", lambda: FakeConfig())
|
||||||
|
with pytest.raises(HTTPException) as exc_info:
|
||||||
|
require_token(None)
|
||||||
|
assert exc_info.value.status_code == 401
|
||||||
|
|
||||||
|
|
||||||
|
def test_require_token_invalid_format(monkeypatch):
|
||||||
|
"""Token sans Bearer leve 401."""
|
||||||
|
monkeypatch.setattr("pricewatch.app.api.main.get_config", lambda: FakeConfig())
|
||||||
|
with pytest.raises(HTTPException) as exc_info:
|
||||||
|
require_token("invalid-format")
|
||||||
|
assert exc_info.value.status_code == 401
|
||||||
|
|
||||||
|
|
||||||
|
def test_require_token_wrong_value(monkeypatch):
|
||||||
|
"""Mauvais token leve 403."""
|
||||||
|
monkeypatch.setattr("pricewatch.app.api.main.get_config", lambda: FakeConfig())
|
||||||
|
with pytest.raises(HTTPException) as exc_info:
|
||||||
|
require_token("Bearer wrong-token")
|
||||||
|
assert exc_info.value.status_code == 403
|
||||||
|
|
||||||
|
|
||||||
|
def test_require_token_not_configured(monkeypatch):
|
||||||
|
"""Token non configure leve 500."""
|
||||||
|
monkeypatch.setattr("pricewatch.app.api.main.get_config", lambda: FakeConfigNoToken())
|
||||||
|
with pytest.raises(HTTPException) as exc_info:
|
||||||
|
require_token("Bearer any-token")
|
||||||
|
assert exc_info.value.status_code == 500
|
||||||
26
tests/api/test_logs_endpoints.py
Normal file
26
tests/api/test_logs_endpoints.py
Normal file
@@ -0,0 +1,26 @@
|
|||||||
|
"""Tests pour les endpoints de logs API."""
|
||||||
|
|
||||||
|
from pricewatch.app.api.main import list_backend_logs, BACKEND_LOGS
|
||||||
|
from pricewatch.app.api.schemas import BackendLogEntry
|
||||||
|
|
||||||
|
|
||||||
|
def test_list_backend_logs_empty():
|
||||||
|
"""Liste des logs backend vide."""
|
||||||
|
BACKEND_LOGS.clear()
|
||||||
|
result = list_backend_logs()
|
||||||
|
assert result == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_list_backend_logs_with_entries():
|
||||||
|
"""Liste des logs backend avec entrees."""
|
||||||
|
from datetime import datetime
|
||||||
|
BACKEND_LOGS.clear()
|
||||||
|
entry = BackendLogEntry(level="INFO", message="Test log", time=datetime(2026, 1, 17, 12, 0, 0))
|
||||||
|
BACKEND_LOGS.append(entry)
|
||||||
|
|
||||||
|
result = list_backend_logs()
|
||||||
|
assert len(result) == 1
|
||||||
|
assert result[0].message == "Test log"
|
||||||
|
assert result[0].level == "INFO"
|
||||||
|
|
||||||
|
BACKEND_LOGS.clear()
|
||||||
267
tests/api/test_products_funcs.py
Normal file
267
tests/api/test_products_funcs.py
Normal file
@@ -0,0 +1,267 @@
|
|||||||
|
"""Tests fonctions API produits avec mocks."""
|
||||||
|
|
||||||
|
from datetime import datetime
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from fastapi import HTTPException
|
||||||
|
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
|
||||||
|
|
||||||
|
from pricewatch.app.api.main import (
|
||||||
|
create_product,
|
||||||
|
get_product,
|
||||||
|
update_product,
|
||||||
|
delete_product,
|
||||||
|
list_prices,
|
||||||
|
create_price,
|
||||||
|
update_price,
|
||||||
|
delete_price,
|
||||||
|
)
|
||||||
|
from pricewatch.app.api.schemas import ProductCreate, ProductUpdate, PriceHistoryCreate, PriceHistoryUpdate
|
||||||
|
|
||||||
|
|
||||||
|
class MockProduct:
|
||||||
|
"""Mock Product model."""
|
||||||
|
def __init__(self, **kwargs):
|
||||||
|
self.id = kwargs.get("id", 1)
|
||||||
|
self.source = kwargs.get("source", "amazon")
|
||||||
|
self.reference = kwargs.get("reference", "REF123")
|
||||||
|
self.url = kwargs.get("url", "https://example.com")
|
||||||
|
self.title = kwargs.get("title", "Test Product")
|
||||||
|
self.category = kwargs.get("category")
|
||||||
|
self.description = kwargs.get("description")
|
||||||
|
self.currency = kwargs.get("currency", "EUR")
|
||||||
|
self.msrp = kwargs.get("msrp")
|
||||||
|
self.first_seen_at = kwargs.get("first_seen_at", datetime.now())
|
||||||
|
self.last_updated_at = kwargs.get("last_updated_at", datetime.now())
|
||||||
|
|
||||||
|
|
||||||
|
class MockPrice:
|
||||||
|
"""Mock PriceHistory model."""
|
||||||
|
def __init__(self, **kwargs):
|
||||||
|
self.id = kwargs.get("id", 1)
|
||||||
|
self.product_id = kwargs.get("product_id", 1)
|
||||||
|
self.price = kwargs.get("price", 99.99)
|
||||||
|
self.shipping_cost = kwargs.get("shipping_cost")
|
||||||
|
self.stock_status = kwargs.get("stock_status", "in_stock")
|
||||||
|
self.fetch_method = kwargs.get("fetch_method", "http")
|
||||||
|
self.fetch_status = kwargs.get("fetch_status", "success")
|
||||||
|
self.fetched_at = kwargs.get("fetched_at", datetime.now())
|
||||||
|
|
||||||
|
|
||||||
|
class TestCreateProduct:
|
||||||
|
"""Tests create_product."""
|
||||||
|
|
||||||
|
def test_create_success(self):
|
||||||
|
"""Cree un produit avec succes."""
|
||||||
|
session = MagicMock()
|
||||||
|
session.add = MagicMock()
|
||||||
|
session.commit = MagicMock()
|
||||||
|
session.refresh = MagicMock()
|
||||||
|
|
||||||
|
payload = ProductCreate(
|
||||||
|
source="amazon",
|
||||||
|
reference="NEW123",
|
||||||
|
url="https://amazon.fr/dp/NEW123",
|
||||||
|
title="New Product",
|
||||||
|
currency="EUR",
|
||||||
|
)
|
||||||
|
|
||||||
|
with patch("pricewatch.app.api.main.Product") as MockProductClass:
|
||||||
|
mock_product = MockProduct(reference="NEW123")
|
||||||
|
MockProductClass.return_value = mock_product
|
||||||
|
|
||||||
|
with patch("pricewatch.app.api.main._product_to_out") as mock_to_out:
|
||||||
|
mock_to_out.return_value = MagicMock()
|
||||||
|
result = create_product(payload, session)
|
||||||
|
|
||||||
|
session.add.assert_called_once()
|
||||||
|
session.commit.assert_called_once()
|
||||||
|
|
||||||
|
def test_create_duplicate(self):
|
||||||
|
"""Cree un produit duplique leve 409."""
|
||||||
|
session = MagicMock()
|
||||||
|
session.add = MagicMock()
|
||||||
|
session.commit = MagicMock(side_effect=IntegrityError("duplicate", {}, None))
|
||||||
|
session.rollback = MagicMock()
|
||||||
|
|
||||||
|
payload = ProductCreate(
|
||||||
|
source="amazon",
|
||||||
|
reference="DUPE",
|
||||||
|
url="https://amazon.fr/dp/DUPE",
|
||||||
|
title="Duplicate",
|
||||||
|
currency="EUR",
|
||||||
|
)
|
||||||
|
|
||||||
|
with patch("pricewatch.app.api.main.Product"):
|
||||||
|
with pytest.raises(HTTPException) as exc_info:
|
||||||
|
create_product(payload, session)
|
||||||
|
assert exc_info.value.status_code == 409
|
||||||
|
|
||||||
|
def test_create_db_error(self):
|
||||||
|
"""Erreur DB leve 500."""
|
||||||
|
session = MagicMock()
|
||||||
|
session.add = MagicMock()
|
||||||
|
session.commit = MagicMock(side_effect=SQLAlchemyError("db error"))
|
||||||
|
session.rollback = MagicMock()
|
||||||
|
|
||||||
|
payload = ProductCreate(
|
||||||
|
source="amazon",
|
||||||
|
reference="ERR",
|
||||||
|
url="https://amazon.fr/dp/ERR",
|
||||||
|
title="Error",
|
||||||
|
currency="EUR",
|
||||||
|
)
|
||||||
|
|
||||||
|
with patch("pricewatch.app.api.main.Product"):
|
||||||
|
with pytest.raises(HTTPException) as exc_info:
|
||||||
|
create_product(payload, session)
|
||||||
|
assert exc_info.value.status_code == 500
|
||||||
|
|
||||||
|
|
||||||
|
class TestGetProduct:
|
||||||
|
"""Tests get_product."""
|
||||||
|
|
||||||
|
def test_get_not_found(self):
|
||||||
|
"""Produit non trouve leve 404."""
|
||||||
|
session = MagicMock()
|
||||||
|
mock_query = MagicMock()
|
||||||
|
mock_query.filter.return_value.one_or_none.return_value = None
|
||||||
|
session.query.return_value = mock_query
|
||||||
|
|
||||||
|
with pytest.raises(HTTPException) as exc_info:
|
||||||
|
get_product(99999, session)
|
||||||
|
assert exc_info.value.status_code == 404
|
||||||
|
|
||||||
|
|
||||||
|
class TestUpdateProduct:
|
||||||
|
"""Tests update_product."""
|
||||||
|
|
||||||
|
def test_update_not_found(self):
|
||||||
|
"""Update produit non trouve leve 404."""
|
||||||
|
session = MagicMock()
|
||||||
|
mock_query = MagicMock()
|
||||||
|
mock_query.filter.return_value.one_or_none.return_value = None
|
||||||
|
session.query.return_value = mock_query
|
||||||
|
|
||||||
|
payload = ProductUpdate(title="Updated")
|
||||||
|
|
||||||
|
with pytest.raises(HTTPException) as exc_info:
|
||||||
|
update_product(99999, payload, session)
|
||||||
|
assert exc_info.value.status_code == 404
|
||||||
|
|
||||||
|
def test_update_db_error(self):
|
||||||
|
"""Erreur DB lors d'update leve 500."""
|
||||||
|
session = MagicMock()
|
||||||
|
mock_product = MockProduct()
|
||||||
|
mock_query = MagicMock()
|
||||||
|
mock_query.filter.return_value.one_or_none.return_value = mock_product
|
||||||
|
session.query.return_value = mock_query
|
||||||
|
session.commit = MagicMock(side_effect=SQLAlchemyError("error"))
|
||||||
|
session.rollback = MagicMock()
|
||||||
|
|
||||||
|
payload = ProductUpdate(title="Updated")
|
||||||
|
|
||||||
|
with pytest.raises(HTTPException) as exc_info:
|
||||||
|
update_product(1, payload, session)
|
||||||
|
assert exc_info.value.status_code == 500
|
||||||
|
|
||||||
|
|
||||||
|
class TestDeleteProduct:
|
||||||
|
"""Tests delete_product."""
|
||||||
|
|
||||||
|
def test_delete_not_found(self):
|
||||||
|
"""Delete produit non trouve leve 404."""
|
||||||
|
session = MagicMock()
|
||||||
|
mock_query = MagicMock()
|
||||||
|
mock_query.filter.return_value.one_or_none.return_value = None
|
||||||
|
session.query.return_value = mock_query
|
||||||
|
|
||||||
|
with pytest.raises(HTTPException) as exc_info:
|
||||||
|
delete_product(99999, session)
|
||||||
|
assert exc_info.value.status_code == 404
|
||||||
|
|
||||||
|
def test_delete_success(self):
|
||||||
|
"""Delete produit avec succes."""
|
||||||
|
session = MagicMock()
|
||||||
|
mock_product = MockProduct()
|
||||||
|
mock_query = MagicMock()
|
||||||
|
mock_query.filter.return_value.one_or_none.return_value = mock_product
|
||||||
|
session.query.return_value = mock_query
|
||||||
|
session.delete = MagicMock()
|
||||||
|
session.commit = MagicMock()
|
||||||
|
|
||||||
|
result = delete_product(1, session)
|
||||||
|
assert result == {"status": "deleted"}
|
||||||
|
session.delete.assert_called_once()
|
||||||
|
|
||||||
|
def test_delete_db_error(self):
|
||||||
|
"""Erreur DB lors de delete leve 500."""
|
||||||
|
session = MagicMock()
|
||||||
|
mock_product = MockProduct()
|
||||||
|
mock_query = MagicMock()
|
||||||
|
mock_query.filter.return_value.one_or_none.return_value = mock_product
|
||||||
|
session.query.return_value = mock_query
|
||||||
|
session.delete = MagicMock()
|
||||||
|
session.commit = MagicMock(side_effect=SQLAlchemyError("error"))
|
||||||
|
session.rollback = MagicMock()
|
||||||
|
|
||||||
|
with pytest.raises(HTTPException) as exc_info:
|
||||||
|
delete_product(1, session)
|
||||||
|
assert exc_info.value.status_code == 500
|
||||||
|
|
||||||
|
|
||||||
|
class TestCreatePrice:
|
||||||
|
"""Tests create_price."""
|
||||||
|
|
||||||
|
def test_create_price_db_error(self):
|
||||||
|
"""Erreur DB lors de creation prix."""
|
||||||
|
session = MagicMock()
|
||||||
|
session.add = MagicMock()
|
||||||
|
session.commit = MagicMock(side_effect=SQLAlchemyError("error"))
|
||||||
|
session.rollback = MagicMock()
|
||||||
|
|
||||||
|
payload = PriceHistoryCreate(
|
||||||
|
product_id=1,
|
||||||
|
price=99.99,
|
||||||
|
fetch_method="http",
|
||||||
|
fetch_status="success",
|
||||||
|
fetched_at=datetime.now(),
|
||||||
|
)
|
||||||
|
|
||||||
|
with patch("pricewatch.app.api.main.PriceHistory"):
|
||||||
|
with pytest.raises(HTTPException) as exc_info:
|
||||||
|
create_price(payload, session)
|
||||||
|
assert exc_info.value.status_code == 500
|
||||||
|
|
||||||
|
|
||||||
|
class TestUpdatePrice:
|
||||||
|
"""Tests update_price."""
|
||||||
|
|
||||||
|
def test_update_price_not_found(self):
|
||||||
|
"""Update prix non trouve leve 404."""
|
||||||
|
session = MagicMock()
|
||||||
|
mock_query = MagicMock()
|
||||||
|
mock_query.filter.return_value.one_or_none.return_value = None
|
||||||
|
session.query.return_value = mock_query
|
||||||
|
|
||||||
|
payload = PriceHistoryUpdate(price=149.99)
|
||||||
|
|
||||||
|
with pytest.raises(HTTPException) as exc_info:
|
||||||
|
update_price(99999, payload, session)
|
||||||
|
assert exc_info.value.status_code == 404
|
||||||
|
|
||||||
|
|
||||||
|
class TestDeletePrice:
|
||||||
|
"""Tests delete_price."""
|
||||||
|
|
||||||
|
def test_delete_price_not_found(self):
|
||||||
|
"""Delete prix non trouve leve 404."""
|
||||||
|
session = MagicMock()
|
||||||
|
mock_query = MagicMock()
|
||||||
|
mock_query.filter.return_value.one_or_none.return_value = None
|
||||||
|
session.query.return_value = mock_query
|
||||||
|
|
||||||
|
with pytest.raises(HTTPException) as exc_info:
|
||||||
|
delete_price(99999, session)
|
||||||
|
assert exc_info.value.status_code == 404
|
||||||
135
tests/api/test_scraping_logs.py
Normal file
135
tests/api/test_scraping_logs.py
Normal file
@@ -0,0 +1,135 @@
|
|||||||
|
"""Tests API endpoints scraping logs."""
|
||||||
|
|
||||||
|
from datetime import datetime
|
||||||
|
from unittest.mock import MagicMock
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from fastapi import HTTPException
|
||||||
|
from sqlalchemy.exc import SQLAlchemyError
|
||||||
|
|
||||||
|
from pricewatch.app.api.main import create_log, update_log, delete_log
|
||||||
|
from pricewatch.app.api.schemas import ScrapingLogCreate, ScrapingLogUpdate
|
||||||
|
|
||||||
|
|
||||||
|
class MockScrapingLog:
|
||||||
|
"""Mock ScrapingLog model."""
|
||||||
|
def __init__(self, **kwargs):
|
||||||
|
self.id = kwargs.get("id", 1)
|
||||||
|
self.product_id = kwargs.get("product_id")
|
||||||
|
self.url = kwargs.get("url", "https://example.com")
|
||||||
|
self.source = kwargs.get("source", "amazon")
|
||||||
|
self.reference = kwargs.get("reference", "REF123")
|
||||||
|
self.fetch_method = kwargs.get("fetch_method", "http")
|
||||||
|
self.fetch_status = kwargs.get("fetch_status", "success")
|
||||||
|
self.fetched_at = kwargs.get("fetched_at", datetime.now())
|
||||||
|
self.duration_ms = kwargs.get("duration_ms", 1500)
|
||||||
|
self.html_size_bytes = kwargs.get("html_size_bytes", 50000)
|
||||||
|
self.errors = kwargs.get("errors", [])
|
||||||
|
self.notes = kwargs.get("notes", [])
|
||||||
|
|
||||||
|
|
||||||
|
class TestCreateLog:
|
||||||
|
"""Tests create_log endpoint."""
|
||||||
|
|
||||||
|
def test_create_log_db_error(self):
|
||||||
|
"""Erreur DB lors de creation log leve 500."""
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
session = MagicMock()
|
||||||
|
session.add = MagicMock()
|
||||||
|
session.commit = MagicMock(side_effect=SQLAlchemyError("error"))
|
||||||
|
session.rollback = MagicMock()
|
||||||
|
|
||||||
|
payload = ScrapingLogCreate(
|
||||||
|
url="https://amazon.fr/dp/TEST",
|
||||||
|
source="amazon",
|
||||||
|
reference="TEST123",
|
||||||
|
fetch_method="http",
|
||||||
|
fetch_status="success",
|
||||||
|
fetched_at=datetime.now(),
|
||||||
|
)
|
||||||
|
|
||||||
|
with patch("pricewatch.app.api.main.ScrapingLog"):
|
||||||
|
with pytest.raises(HTTPException) as exc_info:
|
||||||
|
create_log(payload, session)
|
||||||
|
assert exc_info.value.status_code == 500
|
||||||
|
|
||||||
|
|
||||||
|
class TestUpdateLog:
|
||||||
|
"""Tests update_log endpoint."""
|
||||||
|
|
||||||
|
def test_update_log_not_found(self):
|
||||||
|
"""Update log non trouve leve 404."""
|
||||||
|
session = MagicMock()
|
||||||
|
mock_query = MagicMock()
|
||||||
|
mock_query.filter.return_value.one_or_none.return_value = None
|
||||||
|
session.query.return_value = mock_query
|
||||||
|
|
||||||
|
payload = ScrapingLogUpdate(fetch_status="failed")
|
||||||
|
|
||||||
|
with pytest.raises(HTTPException) as exc_info:
|
||||||
|
update_log(99999, payload, session)
|
||||||
|
assert exc_info.value.status_code == 404
|
||||||
|
|
||||||
|
def test_update_log_db_error(self):
|
||||||
|
"""Erreur DB lors d'update log leve 500."""
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
session = MagicMock()
|
||||||
|
mock_log = MockScrapingLog()
|
||||||
|
mock_query = MagicMock()
|
||||||
|
mock_query.filter.return_value.one_or_none.return_value = mock_log
|
||||||
|
session.query.return_value = mock_query
|
||||||
|
session.commit = MagicMock(side_effect=SQLAlchemyError("error"))
|
||||||
|
session.rollback = MagicMock()
|
||||||
|
|
||||||
|
payload = ScrapingLogUpdate(fetch_status="failed")
|
||||||
|
|
||||||
|
with patch("pricewatch.app.api.main._log_to_out"):
|
||||||
|
with pytest.raises(HTTPException) as exc_info:
|
||||||
|
update_log(1, payload, session)
|
||||||
|
assert exc_info.value.status_code == 500
|
||||||
|
|
||||||
|
|
||||||
|
class TestDeleteLog:
|
||||||
|
"""Tests delete_log endpoint."""
|
||||||
|
|
||||||
|
def test_delete_log_not_found(self):
|
||||||
|
"""Delete log non trouve leve 404."""
|
||||||
|
session = MagicMock()
|
||||||
|
mock_query = MagicMock()
|
||||||
|
mock_query.filter.return_value.one_or_none.return_value = None
|
||||||
|
session.query.return_value = mock_query
|
||||||
|
|
||||||
|
with pytest.raises(HTTPException) as exc_info:
|
||||||
|
delete_log(99999, session)
|
||||||
|
assert exc_info.value.status_code == 404
|
||||||
|
|
||||||
|
def test_delete_log_success(self):
|
||||||
|
"""Delete log avec succes."""
|
||||||
|
session = MagicMock()
|
||||||
|
mock_log = MockScrapingLog()
|
||||||
|
mock_query = MagicMock()
|
||||||
|
mock_query.filter.return_value.one_or_none.return_value = mock_log
|
||||||
|
session.query.return_value = mock_query
|
||||||
|
session.delete = MagicMock()
|
||||||
|
session.commit = MagicMock()
|
||||||
|
|
||||||
|
result = delete_log(1, session)
|
||||||
|
assert result == {"status": "deleted"}
|
||||||
|
session.delete.assert_called_once()
|
||||||
|
|
||||||
|
def test_delete_log_db_error(self):
|
||||||
|
"""Erreur DB lors de delete log leve 500."""
|
||||||
|
session = MagicMock()
|
||||||
|
mock_log = MockScrapingLog()
|
||||||
|
mock_query = MagicMock()
|
||||||
|
mock_query.filter.return_value.one_or_none.return_value = mock_log
|
||||||
|
session.query.return_value = mock_query
|
||||||
|
session.delete = MagicMock()
|
||||||
|
session.commit = MagicMock(side_effect=SQLAlchemyError("error"))
|
||||||
|
session.rollback = MagicMock()
|
||||||
|
|
||||||
|
with pytest.raises(HTTPException) as exc_info:
|
||||||
|
delete_log(1, session)
|
||||||
|
assert exc_info.value.status_code == 500
|
||||||
159
tests/api/test_webhooks_funcs.py
Normal file
159
tests/api/test_webhooks_funcs.py
Normal file
@@ -0,0 +1,159 @@
|
|||||||
|
"""Tests API endpoints webhooks."""
|
||||||
|
|
||||||
|
from datetime import datetime
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from fastapi import HTTPException
|
||||||
|
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
|
||||||
|
|
||||||
|
from pricewatch.app.api.main import (
|
||||||
|
list_webhooks,
|
||||||
|
create_webhook,
|
||||||
|
update_webhook,
|
||||||
|
delete_webhook,
|
||||||
|
)
|
||||||
|
from pricewatch.app.api.schemas import WebhookCreate, WebhookUpdate
|
||||||
|
|
||||||
|
|
||||||
|
class MockWebhook:
|
||||||
|
"""Mock Webhook model."""
|
||||||
|
def __init__(self, **kwargs):
|
||||||
|
self.id = kwargs.get("id", 1)
|
||||||
|
self.url = kwargs.get("url", "https://example.com/webhook")
|
||||||
|
self.events = kwargs.get("events", ["price_change", "stock_change"])
|
||||||
|
self.active = kwargs.get("active", True)
|
||||||
|
self.created_at = kwargs.get("created_at", datetime.now())
|
||||||
|
self.last_triggered_at = kwargs.get("last_triggered_at")
|
||||||
|
|
||||||
|
|
||||||
|
class TestListWebhooks:
|
||||||
|
"""Tests list_webhooks endpoint."""
|
||||||
|
|
||||||
|
def test_list_webhooks_empty(self):
|
||||||
|
"""Liste vide de webhooks."""
|
||||||
|
session = MagicMock()
|
||||||
|
mock_query = MagicMock()
|
||||||
|
mock_query.all.return_value = []
|
||||||
|
session.query.return_value = mock_query
|
||||||
|
|
||||||
|
with patch("pricewatch.app.api.main._webhook_to_out") as mock_to_out:
|
||||||
|
result = list_webhooks(session=session)
|
||||||
|
assert result == []
|
||||||
|
|
||||||
|
|
||||||
|
class TestCreateWebhook:
|
||||||
|
"""Tests create_webhook endpoint."""
|
||||||
|
|
||||||
|
def test_create_webhook_integrity_error(self):
|
||||||
|
"""Erreur d'integrite lors de creation webhook leve 500."""
|
||||||
|
# Note: le code actuel ne distingue pas IntegrityError de SQLAlchemyError
|
||||||
|
session = MagicMock()
|
||||||
|
session.add = MagicMock()
|
||||||
|
session.commit = MagicMock(side_effect=IntegrityError("duplicate", {}, None))
|
||||||
|
session.rollback = MagicMock()
|
||||||
|
|
||||||
|
payload = WebhookCreate(
|
||||||
|
event="price_change",
|
||||||
|
url="https://example.com/webhook",
|
||||||
|
)
|
||||||
|
|
||||||
|
with patch("pricewatch.app.api.main.Webhook"):
|
||||||
|
with pytest.raises(HTTPException) as exc_info:
|
||||||
|
create_webhook(payload, session)
|
||||||
|
assert exc_info.value.status_code == 500
|
||||||
|
|
||||||
|
def test_create_webhook_db_error(self):
|
||||||
|
"""Erreur DB lors de creation webhook leve 500."""
|
||||||
|
session = MagicMock()
|
||||||
|
session.add = MagicMock()
|
||||||
|
session.commit = MagicMock(side_effect=SQLAlchemyError("error"))
|
||||||
|
session.rollback = MagicMock()
|
||||||
|
|
||||||
|
payload = WebhookCreate(
|
||||||
|
event="price_change",
|
||||||
|
url="https://example.com/webhook",
|
||||||
|
)
|
||||||
|
|
||||||
|
with patch("pricewatch.app.api.main.Webhook"):
|
||||||
|
with pytest.raises(HTTPException) as exc_info:
|
||||||
|
create_webhook(payload, session)
|
||||||
|
assert exc_info.value.status_code == 500
|
||||||
|
|
||||||
|
|
||||||
|
class TestUpdateWebhook:
|
||||||
|
"""Tests update_webhook endpoint."""
|
||||||
|
|
||||||
|
def test_update_webhook_not_found(self):
|
||||||
|
"""Update webhook non trouve leve 404."""
|
||||||
|
session = MagicMock()
|
||||||
|
mock_query = MagicMock()
|
||||||
|
mock_query.filter.return_value.one_or_none.return_value = None
|
||||||
|
session.query.return_value = mock_query
|
||||||
|
|
||||||
|
payload = WebhookUpdate(active=False)
|
||||||
|
|
||||||
|
with pytest.raises(HTTPException) as exc_info:
|
||||||
|
update_webhook(99999, payload, session)
|
||||||
|
assert exc_info.value.status_code == 404
|
||||||
|
|
||||||
|
def test_update_webhook_db_error(self):
|
||||||
|
"""Erreur DB lors d'update webhook leve 500."""
|
||||||
|
session = MagicMock()
|
||||||
|
mock_webhook = MockWebhook()
|
||||||
|
mock_query = MagicMock()
|
||||||
|
mock_query.filter.return_value.one_or_none.return_value = mock_webhook
|
||||||
|
session.query.return_value = mock_query
|
||||||
|
session.commit = MagicMock(side_effect=SQLAlchemyError("error"))
|
||||||
|
session.rollback = MagicMock()
|
||||||
|
|
||||||
|
payload = WebhookUpdate(active=False)
|
||||||
|
|
||||||
|
with patch("pricewatch.app.api.main._webhook_to_out"):
|
||||||
|
with pytest.raises(HTTPException) as exc_info:
|
||||||
|
update_webhook(1, payload, session)
|
||||||
|
assert exc_info.value.status_code == 500
|
||||||
|
|
||||||
|
|
||||||
|
class TestDeleteWebhook:
|
||||||
|
"""Tests delete_webhook endpoint."""
|
||||||
|
|
||||||
|
def test_delete_webhook_not_found(self):
|
||||||
|
"""Delete webhook non trouve leve 404."""
|
||||||
|
session = MagicMock()
|
||||||
|
mock_query = MagicMock()
|
||||||
|
mock_query.filter.return_value.one_or_none.return_value = None
|
||||||
|
session.query.return_value = mock_query
|
||||||
|
|
||||||
|
with pytest.raises(HTTPException) as exc_info:
|
||||||
|
delete_webhook(99999, session)
|
||||||
|
assert exc_info.value.status_code == 404
|
||||||
|
|
||||||
|
def test_delete_webhook_success(self):
|
||||||
|
"""Delete webhook avec succes."""
|
||||||
|
session = MagicMock()
|
||||||
|
mock_webhook = MockWebhook()
|
||||||
|
mock_query = MagicMock()
|
||||||
|
mock_query.filter.return_value.one_or_none.return_value = mock_webhook
|
||||||
|
session.query.return_value = mock_query
|
||||||
|
session.delete = MagicMock()
|
||||||
|
session.commit = MagicMock()
|
||||||
|
|
||||||
|
result = delete_webhook(1, session)
|
||||||
|
assert result == {"status": "deleted"}
|
||||||
|
session.delete.assert_called_once()
|
||||||
|
|
||||||
|
def test_delete_webhook_db_error(self):
|
||||||
|
"""Erreur DB lors de delete webhook leve 500."""
|
||||||
|
session = MagicMock()
|
||||||
|
mock_webhook = MockWebhook()
|
||||||
|
mock_query = MagicMock()
|
||||||
|
mock_query.filter.return_value.one_or_none.return_value = mock_webhook
|
||||||
|
session.query.return_value = mock_query
|
||||||
|
session.delete = MagicMock()
|
||||||
|
session.commit = MagicMock(side_effect=SQLAlchemyError("error"))
|
||||||
|
session.rollback = MagicMock()
|
||||||
|
|
||||||
|
with pytest.raises(HTTPException) as exc_info:
|
||||||
|
delete_webhook(1, session)
|
||||||
|
assert exc_info.value.status_code == 500
|
||||||
BIN
tests/cli/__pycache__/test_detect.cpython-313-pytest-9.0.2.pyc
Normal file
BIN
tests/cli/__pycache__/test_detect.cpython-313-pytest-9.0.2.pyc
Normal file
Binary file not shown.
BIN
tests/cli/__pycache__/test_doctor.cpython-313-pytest-9.0.2.pyc
Normal file
BIN
tests/cli/__pycache__/test_doctor.cpython-313-pytest-9.0.2.pyc
Normal file
Binary file not shown.
BIN
tests/cli/__pycache__/test_fetch.cpython-313-pytest-9.0.2.pyc
Normal file
BIN
tests/cli/__pycache__/test_fetch.cpython-313-pytest-9.0.2.pyc
Normal file
Binary file not shown.
BIN
tests/cli/__pycache__/test_parse.cpython-313-pytest-9.0.2.pyc
Normal file
BIN
tests/cli/__pycache__/test_parse.cpython-313-pytest-9.0.2.pyc
Normal file
Binary file not shown.
Binary file not shown.
42
tests/cli/test_detect.py
Normal file
42
tests/cli/test_detect.py
Normal file
@@ -0,0 +1,42 @@
|
|||||||
|
"""Tests pour la commande CLI detect."""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from typer.testing import CliRunner
|
||||||
|
|
||||||
|
from pricewatch.app.cli.main import app
|
||||||
|
|
||||||
|
runner = CliRunner()
|
||||||
|
|
||||||
|
|
||||||
|
class TestDetectCommand:
|
||||||
|
"""Tests pour la commande detect."""
|
||||||
|
|
||||||
|
def test_detect_amazon_url(self):
|
||||||
|
"""Detect doit identifier une URL Amazon."""
|
||||||
|
result = runner.invoke(app, ["detect", "https://www.amazon.fr/dp/B08N5WRWNW"])
|
||||||
|
assert result.exit_code == 0
|
||||||
|
assert "amazon" in result.stdout.lower()
|
||||||
|
assert "B08N5WRWNW" in result.stdout
|
||||||
|
|
||||||
|
def test_detect_cdiscount_url(self):
|
||||||
|
"""Detect doit identifier une URL Cdiscount."""
|
||||||
|
result = runner.invoke(
|
||||||
|
app,
|
||||||
|
[
|
||||||
|
"detect",
|
||||||
|
"https://www.cdiscount.com/informatique/f-10709-tuf608umrv004.html",
|
||||||
|
],
|
||||||
|
)
|
||||||
|
assert result.exit_code == 0
|
||||||
|
assert "cdiscount" in result.stdout.lower()
|
||||||
|
|
||||||
|
def test_detect_unknown_url(self):
|
||||||
|
"""Detect doit echouer pour une URL inconnue."""
|
||||||
|
result = runner.invoke(app, ["detect", "https://www.unknown-store.com/product"])
|
||||||
|
assert result.exit_code == 1
|
||||||
|
assert "aucun store" in result.stdout.lower()
|
||||||
|
|
||||||
|
def test_detect_invalid_url(self):
|
||||||
|
"""Detect doit echouer pour une URL invalide."""
|
||||||
|
result = runner.invoke(app, ["detect", "not-a-valid-url"])
|
||||||
|
assert result.exit_code == 1
|
||||||
36
tests/cli/test_doctor.py
Normal file
36
tests/cli/test_doctor.py
Normal file
@@ -0,0 +1,36 @@
|
|||||||
|
"""Tests pour la commande CLI doctor."""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from typer.testing import CliRunner
|
||||||
|
|
||||||
|
from pricewatch.app.cli.main import app
|
||||||
|
|
||||||
|
runner = CliRunner()
|
||||||
|
|
||||||
|
|
||||||
|
class TestDoctorCommand:
|
||||||
|
"""Tests pour la commande doctor."""
|
||||||
|
|
||||||
|
def test_doctor_success(self):
|
||||||
|
"""Doctor doit afficher le statut de l'installation."""
|
||||||
|
result = runner.invoke(app, ["doctor"])
|
||||||
|
assert result.exit_code == 0
|
||||||
|
assert "PriceWatch Doctor" in result.stdout
|
||||||
|
assert "Python" in result.stdout
|
||||||
|
# "prêt" avec accent
|
||||||
|
assert "prêt" in result.stdout.lower() or "ready" in result.stdout.lower()
|
||||||
|
|
||||||
|
def test_doctor_shows_dependencies(self):
|
||||||
|
"""Doctor doit lister les dependances."""
|
||||||
|
result = runner.invoke(app, ["doctor"])
|
||||||
|
assert result.exit_code == 0
|
||||||
|
assert "typer" in result.stdout.lower()
|
||||||
|
assert "pydantic" in result.stdout.lower()
|
||||||
|
assert "playwright" in result.stdout.lower()
|
||||||
|
|
||||||
|
def test_doctor_shows_stores(self):
|
||||||
|
"""Doctor doit lister les stores disponibles."""
|
||||||
|
result = runner.invoke(app, ["doctor"])
|
||||||
|
assert result.exit_code == 0
|
||||||
|
assert "amazon" in result.stdout.lower()
|
||||||
|
assert "cdiscount" in result.stdout.lower()
|
||||||
99
tests/cli/test_fetch.py
Normal file
99
tests/cli/test_fetch.py
Normal file
@@ -0,0 +1,99 @@
|
|||||||
|
"""Tests pour la commande CLI fetch."""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from unittest.mock import patch, MagicMock
|
||||||
|
from typer.testing import CliRunner
|
||||||
|
|
||||||
|
from pricewatch.app.cli.main import app
|
||||||
|
|
||||||
|
runner = CliRunner()
|
||||||
|
|
||||||
|
|
||||||
|
class TestFetchCommand:
|
||||||
|
"""Tests pour la commande fetch."""
|
||||||
|
|
||||||
|
def test_fetch_conflicting_options(self):
|
||||||
|
"""Fetch doit echouer si --http et --playwright sont specifies."""
|
||||||
|
result = runner.invoke(
|
||||||
|
app, ["fetch", "https://example.com", "--http", "--playwright"]
|
||||||
|
)
|
||||||
|
assert result.exit_code == 1
|
||||||
|
assert "impossible" in result.stdout.lower()
|
||||||
|
|
||||||
|
@patch("pricewatch.app.cli.main.fetch_http")
|
||||||
|
def test_fetch_http_success(self, mock_fetch: MagicMock):
|
||||||
|
"""Fetch HTTP doit afficher le resultat."""
|
||||||
|
mock_result = MagicMock()
|
||||||
|
mock_result.success = True
|
||||||
|
mock_result.html = "<html>test</html>"
|
||||||
|
mock_result.status_code = 200
|
||||||
|
mock_result.duration_ms = 150
|
||||||
|
mock_fetch.return_value = mock_result
|
||||||
|
|
||||||
|
result = runner.invoke(app, ["fetch", "https://example.com", "--http"])
|
||||||
|
assert result.exit_code == 0
|
||||||
|
assert "Succes" in result.stdout or "✓" in result.stdout
|
||||||
|
assert "150" in result.stdout
|
||||||
|
|
||||||
|
@patch("pricewatch.app.cli.main.fetch_http")
|
||||||
|
def test_fetch_http_failure(self, mock_fetch: MagicMock):
|
||||||
|
"""Fetch HTTP doit signaler l'echec."""
|
||||||
|
mock_result = MagicMock()
|
||||||
|
mock_result.success = False
|
||||||
|
mock_result.error = "Connection refused"
|
||||||
|
mock_fetch.return_value = mock_result
|
||||||
|
|
||||||
|
result = runner.invoke(app, ["fetch", "https://example.com", "--http"])
|
||||||
|
assert result.exit_code == 1
|
||||||
|
assert "Connection refused" in result.stdout
|
||||||
|
|
||||||
|
@patch("pricewatch.app.cli.main.fetch_playwright")
|
||||||
|
def test_fetch_playwright_success(self, mock_fetch: MagicMock):
|
||||||
|
"""Fetch Playwright doit afficher le resultat."""
|
||||||
|
mock_result = MagicMock()
|
||||||
|
mock_result.success = True
|
||||||
|
mock_result.html = "<html>test playwright</html>"
|
||||||
|
mock_result.duration_ms = 2500
|
||||||
|
mock_fetch.return_value = mock_result
|
||||||
|
|
||||||
|
result = runner.invoke(app, ["fetch", "https://example.com", "--playwright"])
|
||||||
|
assert result.exit_code == 0
|
||||||
|
assert "Succes" in result.stdout or "✓" in result.stdout
|
||||||
|
assert "2500" in result.stdout
|
||||||
|
|
||||||
|
@patch("pricewatch.app.cli.main.fetch_playwright")
|
||||||
|
def test_fetch_playwright_failure(self, mock_fetch: MagicMock):
|
||||||
|
"""Fetch Playwright doit signaler l'echec."""
|
||||||
|
mock_result = MagicMock()
|
||||||
|
mock_result.success = False
|
||||||
|
mock_result.error = "Timeout waiting for page"
|
||||||
|
mock_fetch.return_value = mock_result
|
||||||
|
|
||||||
|
result = runner.invoke(app, ["fetch", "https://example.com", "--playwright"])
|
||||||
|
assert result.exit_code == 1
|
||||||
|
assert "Timeout" in result.stdout
|
||||||
|
|
||||||
|
@patch("pricewatch.app.cli.main.fetch_playwright")
|
||||||
|
def test_fetch_default_is_playwright(self, mock_fetch: MagicMock):
|
||||||
|
"""Fetch sans option utilise Playwright par defaut."""
|
||||||
|
mock_result = MagicMock()
|
||||||
|
mock_result.success = True
|
||||||
|
mock_result.html = "<html>test</html>"
|
||||||
|
mock_result.duration_ms = 1000
|
||||||
|
mock_fetch.return_value = mock_result
|
||||||
|
|
||||||
|
result = runner.invoke(app, ["fetch", "https://example.com"])
|
||||||
|
assert result.exit_code == 0
|
||||||
|
mock_fetch.assert_called_once()
|
||||||
|
|
||||||
|
@patch("pricewatch.app.cli.main.fetch_playwright")
|
||||||
|
def test_fetch_with_debug(self, mock_fetch: MagicMock):
|
||||||
|
"""Fetch doit fonctionner avec --debug."""
|
||||||
|
mock_result = MagicMock()
|
||||||
|
mock_result.success = True
|
||||||
|
mock_result.html = "<html>test</html>"
|
||||||
|
mock_result.duration_ms = 1000
|
||||||
|
mock_fetch.return_value = mock_result
|
||||||
|
|
||||||
|
result = runner.invoke(app, ["fetch", "https://example.com", "--debug"])
|
||||||
|
assert result.exit_code == 0
|
||||||
99
tests/cli/test_parse.py
Normal file
99
tests/cli/test_parse.py
Normal file
@@ -0,0 +1,99 @@
|
|||||||
|
"""Tests pour la commande CLI parse."""
|
||||||
|
|
||||||
|
import tempfile
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from typer.testing import CliRunner
|
||||||
|
|
||||||
|
from pricewatch.app.cli.main import app
|
||||||
|
|
||||||
|
runner = CliRunner()
|
||||||
|
|
||||||
|
|
||||||
|
class TestParseCommand:
|
||||||
|
"""Tests pour la commande parse."""
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def amazon_html_file(self, tmp_path: Path) -> Path:
|
||||||
|
"""Cree un fichier HTML Amazon temporaire."""
|
||||||
|
html = """
|
||||||
|
<html>
|
||||||
|
<body>
|
||||||
|
<span id="productTitle">Test Product</span>
|
||||||
|
<span class="a-price-whole">299,99 €</span>
|
||||||
|
<div id="availability">
|
||||||
|
<span>En stock</span>
|
||||||
|
</div>
|
||||||
|
</body>
|
||||||
|
</html>
|
||||||
|
"""
|
||||||
|
file_path = tmp_path / "amazon_test.html"
|
||||||
|
file_path.write_text(html, encoding="utf-8")
|
||||||
|
return file_path
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def cdiscount_html_file(self, tmp_path: Path) -> Path:
|
||||||
|
"""Cree un fichier HTML Cdiscount temporaire."""
|
||||||
|
html = """
|
||||||
|
<html>
|
||||||
|
<head>
|
||||||
|
<script type="application/ld+json">
|
||||||
|
{
|
||||||
|
"@type": "Product",
|
||||||
|
"name": "Produit Cdiscount",
|
||||||
|
"offers": {"price": "199.99", "priceCurrency": "EUR"}
|
||||||
|
}
|
||||||
|
</script>
|
||||||
|
</head>
|
||||||
|
<body>
|
||||||
|
<h1 data-e2e="title">Produit Cdiscount</h1>
|
||||||
|
</body>
|
||||||
|
</html>
|
||||||
|
"""
|
||||||
|
file_path = tmp_path / "cdiscount_test.html"
|
||||||
|
file_path.write_text(html, encoding="utf-8")
|
||||||
|
return file_path
|
||||||
|
|
||||||
|
def test_parse_amazon_success(self, amazon_html_file: Path):
|
||||||
|
"""Parse doit extraire les donnees d'un HTML Amazon."""
|
||||||
|
result = runner.invoke(
|
||||||
|
app, ["parse", "amazon", "--in", str(amazon_html_file)]
|
||||||
|
)
|
||||||
|
assert result.exit_code == 0
|
||||||
|
assert "Test Product" in result.stdout
|
||||||
|
assert "299" in result.stdout
|
||||||
|
|
||||||
|
def test_parse_cdiscount_success(self, cdiscount_html_file: Path):
|
||||||
|
"""Parse doit extraire les donnees d'un HTML Cdiscount."""
|
||||||
|
result = runner.invoke(
|
||||||
|
app, ["parse", "cdiscount", "--in", str(cdiscount_html_file)]
|
||||||
|
)
|
||||||
|
assert result.exit_code == 0
|
||||||
|
assert "Produit Cdiscount" in result.stdout
|
||||||
|
assert "199" in result.stdout
|
||||||
|
|
||||||
|
def test_parse_unknown_store(self, amazon_html_file: Path):
|
||||||
|
"""Parse doit echouer pour un store inconnu."""
|
||||||
|
result = runner.invoke(
|
||||||
|
app, ["parse", "unknown_store", "--in", str(amazon_html_file)]
|
||||||
|
)
|
||||||
|
assert result.exit_code == 1
|
||||||
|
assert "inconnu" in result.stdout.lower()
|
||||||
|
|
||||||
|
def test_parse_with_debug(self, amazon_html_file: Path):
|
||||||
|
"""Parse doit fonctionner avec --debug."""
|
||||||
|
result = runner.invoke(
|
||||||
|
app, ["parse", "amazon", "--in", str(amazon_html_file), "--debug"]
|
||||||
|
)
|
||||||
|
assert result.exit_code == 0
|
||||||
|
|
||||||
|
def test_parse_shows_fields(self, amazon_html_file: Path):
|
||||||
|
"""Parse doit afficher les champs extraits."""
|
||||||
|
result = runner.invoke(
|
||||||
|
app, ["parse", "amazon", "--in", str(amazon_html_file)]
|
||||||
|
)
|
||||||
|
assert result.exit_code == 0
|
||||||
|
assert "Titre" in result.stdout
|
||||||
|
assert "Prix" in result.stdout
|
||||||
|
assert "Stock" in result.stdout
|
||||||
258
tests/cli/test_run_command.py
Normal file
258
tests/cli/test_run_command.py
Normal file
@@ -0,0 +1,258 @@
|
|||||||
|
"""Tests pour la commande CLI run."""
|
||||||
|
|
||||||
|
import tempfile
|
||||||
|
from pathlib import Path
|
||||||
|
from unittest.mock import patch, MagicMock
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from typer.testing import CliRunner
|
||||||
|
|
||||||
|
from pricewatch.app.cli.main import app
|
||||||
|
from pricewatch.app.core.schema import ProductSnapshot, DebugInfo, DebugStatus, FetchMethod
|
||||||
|
|
||||||
|
runner = CliRunner()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def yaml_config(tmp_path: Path) -> Path:
|
||||||
|
"""Cree un fichier YAML de config temporaire."""
|
||||||
|
yaml_content = """
|
||||||
|
urls:
|
||||||
|
- "https://www.amazon.fr/dp/B08N5WRWNW"
|
||||||
|
options:
|
||||||
|
use_playwright: false
|
||||||
|
force_playwright: false
|
||||||
|
headful: false
|
||||||
|
save_html: false
|
||||||
|
save_screenshot: false
|
||||||
|
timeout_ms: 30000
|
||||||
|
"""
|
||||||
|
file_path = tmp_path / "test_config.yaml"
|
||||||
|
file_path.write_text(yaml_content, encoding="utf-8")
|
||||||
|
return file_path
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def output_json(tmp_path: Path) -> Path:
|
||||||
|
"""Chemin pour le fichier JSON de sortie."""
|
||||||
|
return tmp_path / "output.json"
|
||||||
|
|
||||||
|
|
||||||
|
class TestRunCommand:
|
||||||
|
"""Tests pour la commande run."""
|
||||||
|
|
||||||
|
@patch("pricewatch.app.cli.main.fetch_http")
|
||||||
|
def test_run_http_success(self, mock_fetch, yaml_config, output_json):
|
||||||
|
"""Run avec HTTP reussi."""
|
||||||
|
# Mock HTTP fetch
|
||||||
|
mock_result = MagicMock()
|
||||||
|
mock_result.success = True
|
||||||
|
mock_result.html = """
|
||||||
|
<html><body>
|
||||||
|
<span id="productTitle">Test Product</span>
|
||||||
|
<span class="a-price-whole">299,99 €</span>
|
||||||
|
</body></html>
|
||||||
|
"""
|
||||||
|
mock_result.error = None
|
||||||
|
mock_fetch.return_value = mock_result
|
||||||
|
|
||||||
|
result = runner.invoke(
|
||||||
|
app,
|
||||||
|
["run", "--yaml", str(yaml_config), "--out", str(output_json), "--no-db"],
|
||||||
|
)
|
||||||
|
|
||||||
|
assert result.exit_code == 0
|
||||||
|
assert output_json.exists()
|
||||||
|
|
||||||
|
@patch("pricewatch.app.cli.main.fetch_http")
|
||||||
|
@patch("pricewatch.app.cli.main.fetch_playwright")
|
||||||
|
def test_run_http_fail_playwright_fallback(
|
||||||
|
self, mock_pw, mock_http, yaml_config, output_json
|
||||||
|
):
|
||||||
|
"""Run avec fallback Playwright quand HTTP echoue."""
|
||||||
|
# Mock HTTP fail
|
||||||
|
mock_http_result = MagicMock()
|
||||||
|
mock_http_result.success = False
|
||||||
|
mock_http_result.error = "403 Forbidden"
|
||||||
|
mock_http.return_value = mock_http_result
|
||||||
|
|
||||||
|
# Mock Playwright success
|
||||||
|
mock_pw_result = MagicMock()
|
||||||
|
mock_pw_result.success = True
|
||||||
|
mock_pw_result.html = """
|
||||||
|
<html><body>
|
||||||
|
<span id="productTitle">Playwright Product</span>
|
||||||
|
<span class="a-price-whole">199,99 €</span>
|
||||||
|
</body></html>
|
||||||
|
"""
|
||||||
|
mock_pw_result.screenshot = None
|
||||||
|
mock_pw.return_value = mock_pw_result
|
||||||
|
|
||||||
|
# Modifier config pour activer playwright
|
||||||
|
yaml_content = """
|
||||||
|
urls:
|
||||||
|
- "https://www.amazon.fr/dp/B08N5WRWNW"
|
||||||
|
options:
|
||||||
|
use_playwright: true
|
||||||
|
force_playwright: false
|
||||||
|
headful: false
|
||||||
|
save_html: false
|
||||||
|
save_screenshot: false
|
||||||
|
timeout_ms: 30000
|
||||||
|
"""
|
||||||
|
yaml_config.write_text(yaml_content, encoding="utf-8")
|
||||||
|
|
||||||
|
result = runner.invoke(
|
||||||
|
app,
|
||||||
|
["run", "--yaml", str(yaml_config), "--out", str(output_json), "--no-db"],
|
||||||
|
)
|
||||||
|
|
||||||
|
assert result.exit_code == 0
|
||||||
|
mock_pw.assert_called()
|
||||||
|
|
||||||
|
@patch("pricewatch.app.cli.main.fetch_http")
|
||||||
|
def test_run_http_fail_no_playwright(self, mock_http, yaml_config, output_json):
|
||||||
|
"""Run avec HTTP echoue sans Playwright."""
|
||||||
|
mock_result = MagicMock()
|
||||||
|
mock_result.success = False
|
||||||
|
mock_result.error = "Connection refused"
|
||||||
|
mock_http.return_value = mock_result
|
||||||
|
|
||||||
|
result = runner.invoke(
|
||||||
|
app,
|
||||||
|
["run", "--yaml", str(yaml_config), "--out", str(output_json), "--no-db"],
|
||||||
|
)
|
||||||
|
|
||||||
|
# Doit quand meme creer le fichier JSON (avec snapshot failed)
|
||||||
|
assert result.exit_code == 0
|
||||||
|
assert output_json.exists()
|
||||||
|
|
||||||
|
def test_run_invalid_yaml(self, tmp_path, output_json):
|
||||||
|
"""Run avec YAML invalide echoue."""
|
||||||
|
yaml_file = tmp_path / "invalid.yaml"
|
||||||
|
yaml_file.write_text("invalid: [yaml: content", encoding="utf-8")
|
||||||
|
|
||||||
|
result = runner.invoke(
|
||||||
|
app,
|
||||||
|
["run", "--yaml", str(yaml_file), "--out", str(output_json)],
|
||||||
|
)
|
||||||
|
|
||||||
|
assert result.exit_code == 1
|
||||||
|
|
||||||
|
def test_run_with_debug(self, yaml_config, output_json):
|
||||||
|
"""Run avec --debug active les logs."""
|
||||||
|
with patch("pricewatch.app.cli.main.fetch_http") as mock_fetch:
|
||||||
|
mock_result = MagicMock()
|
||||||
|
mock_result.success = True
|
||||||
|
mock_result.html = "<html><body>Test</body></html>"
|
||||||
|
mock_fetch.return_value = mock_result
|
||||||
|
|
||||||
|
result = runner.invoke(
|
||||||
|
app,
|
||||||
|
[
|
||||||
|
"run",
|
||||||
|
"--yaml",
|
||||||
|
str(yaml_config),
|
||||||
|
"--out",
|
||||||
|
str(output_json),
|
||||||
|
"--debug",
|
||||||
|
"--no-db",
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
assert result.exit_code == 0
|
||||||
|
|
||||||
|
@patch("pricewatch.app.cli.main.fetch_playwright")
|
||||||
|
def test_run_force_playwright(self, mock_pw, tmp_path, output_json):
|
||||||
|
"""Run avec force_playwright skip HTTP."""
|
||||||
|
yaml_content = """
|
||||||
|
urls:
|
||||||
|
- "https://www.amazon.fr/dp/B08N5WRWNW"
|
||||||
|
options:
|
||||||
|
use_playwright: true
|
||||||
|
force_playwright: true
|
||||||
|
headful: false
|
||||||
|
save_html: false
|
||||||
|
save_screenshot: false
|
||||||
|
timeout_ms: 30000
|
||||||
|
"""
|
||||||
|
yaml_file = tmp_path / "force_pw.yaml"
|
||||||
|
yaml_file.write_text(yaml_content, encoding="utf-8")
|
||||||
|
|
||||||
|
mock_result = MagicMock()
|
||||||
|
mock_result.success = True
|
||||||
|
mock_result.html = "<html><body>PW content</body></html>"
|
||||||
|
mock_result.screenshot = None
|
||||||
|
mock_pw.return_value = mock_result
|
||||||
|
|
||||||
|
with patch("pricewatch.app.cli.main.fetch_http") as mock_http:
|
||||||
|
result = runner.invoke(
|
||||||
|
app,
|
||||||
|
["run", "--yaml", str(yaml_file), "--out", str(output_json), "--no-db"],
|
||||||
|
)
|
||||||
|
|
||||||
|
# HTTP ne doit pas etre appele
|
||||||
|
mock_http.assert_not_called()
|
||||||
|
mock_pw.assert_called()
|
||||||
|
assert result.exit_code == 0
|
||||||
|
|
||||||
|
@patch("pricewatch.app.cli.main.fetch_http")
|
||||||
|
def test_run_unknown_store(self, mock_fetch, tmp_path, output_json):
|
||||||
|
"""Run avec URL de store inconnu."""
|
||||||
|
yaml_content = """
|
||||||
|
urls:
|
||||||
|
- "https://www.unknown-store.com/product/123"
|
||||||
|
options:
|
||||||
|
use_playwright: false
|
||||||
|
"""
|
||||||
|
yaml_file = tmp_path / "unknown.yaml"
|
||||||
|
yaml_file.write_text(yaml_content, encoding="utf-8")
|
||||||
|
|
||||||
|
result = runner.invoke(
|
||||||
|
app,
|
||||||
|
["run", "--yaml", str(yaml_file), "--out", str(output_json), "--no-db"],
|
||||||
|
)
|
||||||
|
|
||||||
|
# Doit continuer sans crash
|
||||||
|
assert result.exit_code == 0
|
||||||
|
# HTTP ne doit pas etre appele (store non trouve)
|
||||||
|
mock_fetch.assert_not_called()
|
||||||
|
|
||||||
|
@patch("pricewatch.app.cli.main.fetch_http")
|
||||||
|
@patch("pricewatch.app.cli.main.fetch_playwright")
|
||||||
|
def test_run_with_save_screenshot(self, mock_pw, mock_http, tmp_path, output_json):
|
||||||
|
"""Run avec save_screenshot."""
|
||||||
|
yaml_content = """
|
||||||
|
urls:
|
||||||
|
- "https://www.amazon.fr/dp/B08N5WRWNW"
|
||||||
|
options:
|
||||||
|
use_playwright: true
|
||||||
|
force_playwright: false
|
||||||
|
save_screenshot: true
|
||||||
|
timeout_ms: 30000
|
||||||
|
"""
|
||||||
|
yaml_file = tmp_path / "screenshot.yaml"
|
||||||
|
yaml_file.write_text(yaml_content, encoding="utf-8")
|
||||||
|
|
||||||
|
# HTTP fail
|
||||||
|
mock_http_result = MagicMock()
|
||||||
|
mock_http_result.success = False
|
||||||
|
mock_http_result.error = "blocked"
|
||||||
|
mock_http.return_value = mock_http_result
|
||||||
|
|
||||||
|
# PW success avec screenshot
|
||||||
|
mock_pw_result = MagicMock()
|
||||||
|
mock_pw_result.success = True
|
||||||
|
mock_pw_result.html = "<html><body>content</body></html>"
|
||||||
|
mock_pw_result.screenshot = b"fake_png_data"
|
||||||
|
mock_pw.return_value = mock_pw_result
|
||||||
|
|
||||||
|
with patch("pricewatch.app.core.io.save_debug_screenshot") as mock_save:
|
||||||
|
result = runner.invoke(
|
||||||
|
app,
|
||||||
|
["run", "--yaml", str(yaml_file), "--out", str(output_json), "--no-db"],
|
||||||
|
)
|
||||||
|
|
||||||
|
assert result.exit_code == 0
|
||||||
|
# Le screenshot doit etre sauvegarde si present
|
||||||
|
mock_save.assert_called()
|
||||||
Reference in New Issue
Block a user