Compare commits

6 Commits

Author SHA1 Message Date
Gilles Soulier
e18976ad51 before maj scrap 2026-01-18 07:38:37 +01:00
Gilles Soulier
740c3d7516 before claude 2026-01-18 06:26:17 +01:00
Gilles Soulier
dc19315e5d claudec3 2026-01-17 15:58:01 +01:00
Gilles Soulier
1f7f7da0c3 claude 2026-01-17 14:48:14 +01:00
Gilles Soulier
152c2724fc feat: improve SPA scraping and increase test coverage
- Add SPA support for Playwright with wait_for_network_idle and extra_wait_ms
- Add BaseStore.get_spa_config() and requires_playwright() methods
- Implement AliExpress SPA config with JSON price extraction patterns
- Fix Amazon price parsing to prioritize whole+fraction combination
- Fix AliExpress regex patterns (remove double backslashes)
- Add CLI tests: detect, doctor, fetch, parse, run commands
- Add API tests: auth, logs, products, scraping_logs, webhooks

Tests: 417 passed, 85% coverage

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-17 14:46:55 +01:00
Gilles Soulier
cf7c415e22 before claude 2026-01-17 13:40:26 +01:00
5736 changed files with 1674725 additions and 391 deletions

7
.claude/settings.json Normal file
View File

@@ -0,0 +1,7 @@
{
"permissions": {
"allow": [
"Bash(sort:*)"
]
}
}

BIN
.coverage

Binary file not shown.

View File

@@ -53,11 +53,14 @@ Le format est basé sur [Keep a Changelog](https://keepachangelog.com/fr/1.0.0/)
- Web UI: popup ajout produit central + favicon
- API: logs Uvicorn exposes pour l UI
- Parsing prix: gestion des separateurs de milliers (espace, NBSP, point)
- API/DB: description + msrp + images/specs exposes, reduction calculee
- API/DB: exposition des champs Amazon enrichis (note, badge, stock texte, modele)
- Web UI: carte produit analytique avec resume, historique plein format et actions compactes
- Web UI: slider colonnes responsive + modal ajout produit scrollable avec footer sticky
### Corrigé
- Migration Alembic: down_revision aligne sur 20260114_02
- Amazon: extraction images via data-a-dynamic-image + filtrage logos
- API: suppression du calcul automatique des reductions (valeurs explicites uniquement)
---

Binary file not shown.

After

Width:  |  Height:  |  Size: 361 KiB

BIN
Image collée (5).png Executable file

Binary file not shown.

After

Width:  |  Height:  |  Size: 29 KiB

View File

@@ -152,6 +152,8 @@ Guide de migration JSON -> DB: `MIGRATION_GUIDE.md`
L'API est protegee par un token simple.
Note: l endpoint `/products` expose des champs Amazon explicites (asin, note, badge Choix d Amazon, stock_text/in_stock, model_number/model_name, main_image/gallery_images). Les reductions ne sont plus calculees cote API.
```bash
export PW_API_TOKEN=change_me
docker compose up -d api

View File

@@ -170,6 +170,7 @@ Liste des tâches priorisées pour le développement de PriceWatch.
- [x] Tests performance (100+ produits)
- [x] CRUD produits
- [x] Historique prix
- [ ] Ajouter migration DB pour les nouveaux champs Amazon (note, badge, stock texte, modele)
### Documentation
- [x] Migration guide (JSON -> DB)

15
analytics-ui/Dockerfile Normal file
View File

@@ -0,0 +1,15 @@
# Image for the PriceWatch analytics/debug UI (single-file Flask app).
FROM python:3.12-slim
WORKDIR /app
# No .pyc files; unbuffered stdout so logs show up immediately in `docker logs`.
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1
# Install dependencies first so this layer stays cached when only app.py changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY app.py .
EXPOSE 80
CMD ["python", "app.py"]

705
analytics-ui/app.py Normal file
View File

@@ -0,0 +1,705 @@
import os
from contextlib import closing
from decimal import Decimal
from typing import Any, Dict, List, Optional, Tuple

import psycopg2
import redis
from flask import Flask, jsonify, render_template_string
from psycopg2.extras import RealDictCursor
app = Flask(__name__)
def _env_int(name: str, default: int) -> int:
    """Read environment variable *name* as an int, falling back to *default*.

    An unset/empty variable or a non-numeric value both yield *default*.
    """
    raw = os.getenv(name, "")
    if not raw:
        return default
    try:
        return int(raw)
    except ValueError:
        return default
def get_db_connection():
    """Open a new PostgreSQL connection configured from PW_DB_* env vars."""
    params = {
        "host": os.getenv("PW_DB_HOST", "postgres"),
        "port": _env_int("PW_DB_PORT", 5432),
        "dbname": os.getenv("PW_DB_NAME", "pricewatch"),
        "user": os.getenv("PW_DB_USER", "pricewatch"),
        "password": os.getenv("PW_DB_PASSWORD", "pricewatch"),
    }
    return psycopg2.connect(**params)
def fetch_db_metrics() -> Tuple[Dict[str, Any], Optional[str]]:
    """Collect table row counts and the 5 most recently updated products.

    Returns a ``(data, error)`` tuple: ``data`` always contains the keys
    ``counts`` and ``latest_products`` (possibly partially filled on failure);
    ``error`` is ``None`` on success, otherwise the exception text.
    """
    data: Dict[str, Any] = {"counts": {}, "latest_products": []}
    try:
        # closing() actually closes the connection. psycopg2's bare
        # ``with conn:`` only commits/rolls back the transaction, so the
        # original code leaked one connection per call.
        with closing(get_db_connection()) as conn, conn:
            with conn.cursor() as cur:
                # Fixed identifiers (not user input), so f-string interpolation is safe here.
                for table in ("products", "price_history", "scraping_logs"):
                    cur.execute(f"SELECT COUNT(*) FROM {table}")
                    data["counts"][table] = cur.fetchone()[0]
                cur.execute(
                    """
                    SELECT id, source, reference, title, last_updated_at
                    FROM products
                    ORDER BY last_updated_at DESC
                    LIMIT 5
                    """
                )
                data["latest_products"] = [
                    {
                        "id": row[0],
                        "source": row[1],
                        "reference": row[2],
                        "title": row[3] or "Sans titre",
                        "updated": row[4].strftime("%Y-%m-%d %H:%M:%S") if row[4] else "n/a",
                    }
                    for row in cur.fetchall()
                ]
        return data, None
    except Exception as exc:  # pragma: no cover (simple explorer)
        return data, str(exc)
def _serialize_decimal(value):
    """Convert a Decimal to float for JSON output; return anything else unchanged."""
    return float(value) if isinstance(value, Decimal) else value
def fetch_product_history(product_id: int) -> Tuple[List[Dict[str, Any]], Optional[str]]:
    """Return the full scrape history of one product, newest first.

    Each row is a JSON-safe dict (Decimal -> float, datetime -> string).
    Returns ``(rows, error)`` where ``error`` is ``None`` on success.
    """
    rows: List[Dict[str, Any]] = []
    try:
        # closing() guarantees the connection is closed; psycopg2's ``with conn:``
        # alone only ends the transaction and would leak a connection per call.
        with closing(get_db_connection()) as conn, conn:
            with conn.cursor(cursor_factory=RealDictCursor) as cur:
                cur.execute(
                    """
                    SELECT
                        ph.id,
                        ph.price,
                        ph.shipping_cost,
                        ph.stock_status,
                        ph.fetch_method,
                        ph.fetch_status,
                        ph.fetched_at
                    FROM price_history ph
                    WHERE ph.product_id = %s
                    ORDER BY ph.fetched_at DESC
                    """,
                    (product_id,),
                )
                for item in cur.fetchall():
                    serialized = {key: _serialize_decimal(value) for key, value in item.items()}
                    if serialized.get("fetched_at"):
                        serialized["fetched_at"] = serialized["fetched_at"].strftime("%Y-%m-%d %H:%M:%S")
                    rows.append(serialized)
        return rows, None
    except Exception as exc:
        return rows, str(exc)
def fetch_all_price_history(limit: int = 500) -> Tuple[List[Dict[str, Any]], Optional[str]]:
    """Return up to *limit* price_history rows with product info, newest first.

    Products are LEFT-joined so history rows whose product was deleted are
    still returned (with NULL product columns). Rows are JSON-safe dicts;
    returns ``(rows, error)`` where ``error`` is ``None`` on success.
    """
    rows: List[Dict[str, Any]] = []
    try:
        # closing() guarantees the connection is closed; psycopg2's ``with conn:``
        # alone only ends the transaction and would leak a connection per call.
        with closing(get_db_connection()) as conn, conn:
            with conn.cursor(cursor_factory=RealDictCursor) as cur:
                cur.execute(
                    """
                    SELECT
                        ph.id,
                        ph.product_id,
                        p.source,
                        p.reference,
                        p.title,
                        ph.price,
                        ph.shipping_cost,
                        ph.stock_status,
                        ph.fetch_method,
                        ph.fetch_status,
                        ph.fetched_at
                    FROM price_history ph
                    LEFT JOIN products p ON p.id = ph.product_id
                    ORDER BY ph.fetched_at DESC
                    LIMIT %s
                    """,
                    (limit,),
                )
                for item in cur.fetchall():
                    serialized = {key: _serialize_decimal(value) for key, value in item.items()}
                    if serialized.get("fetched_at"):
                        serialized["fetched_at"] = serialized["fetched_at"].strftime("%Y-%m-%d %H:%M:%S")
                    rows.append(serialized)
        return rows, None
    except Exception as exc:
        return rows, str(exc)
def fetch_products_list(limit: int = 200) -> Tuple[List[Dict[str, Any]], Optional[str]]:
    """Return up to *limit* products, each with its most recent scrape attached.

    A LATERAL join picks the latest price_history row per product (price,
    stock, status, method, timestamp). Rows are JSON-safe dicts; returns
    ``(rows, error)`` where ``error`` is ``None`` on success.
    """
    rows: List[Dict[str, Any]] = []
    try:
        # closing() guarantees the connection is closed; psycopg2's ``with conn:``
        # alone only ends the transaction and would leak a connection per call.
        with closing(get_db_connection()) as conn, conn:
            with conn.cursor(cursor_factory=RealDictCursor) as cur:
                cur.execute(
                    """
                    SELECT
                        p.id,
                        p.source,
                        p.reference,
                        p.title,
                        p.url,
                        p.category,
                        p.description,
                        p.currency,
                        p.msrp,
                        p.last_updated_at,
                        ph.price,
                        ph.stock_status,
                        ph.fetch_status,
                        ph.fetch_method,
                        ph.fetched_at
                    FROM products p
                    LEFT JOIN LATERAL (
                        SELECT price, stock_status, fetch_status, fetch_method, fetched_at
                        FROM price_history
                        WHERE product_id = p.id
                        ORDER BY fetched_at DESC
                        LIMIT 1
                    ) ph ON true
                    ORDER BY p.last_updated_at DESC
                    LIMIT %s
                    """,
                    (limit,),
                )
                for item in cur.fetchall():
                    serialized = {key: _serialize_decimal(value) for key, value in item.items()}
                    # Both timestamp columns get the same JSON-friendly format.
                    for ts_field in ("last_updated_at", "fetched_at"):
                        if serialized.get(ts_field):
                            serialized[ts_field] = serialized[ts_field].strftime("%Y-%m-%d %H:%M:%S")
                    rows.append(serialized)
        return rows, None
    except Exception as exc:
        return rows, str(exc)
def get_redis_client() -> redis.Redis:
    """Build a Redis client from PW_REDIS_* env vars (2 s connect/read timeouts)."""
    host = os.getenv("PW_REDIS_HOST", "redis")
    port = _env_int("PW_REDIS_PORT", 6379)
    db = _env_int("PW_REDIS_DB", 0)
    return redis.Redis(
        host=host,
        port=port,
        db=db,
        socket_connect_timeout=2,
        socket_timeout=2,
    )
def check_redis() -> Tuple[str, Optional[str]]:
    """Ping Redis; return ("OK", None) on success or ("KO", error text)."""
    try:
        get_redis_client().ping()
    except Exception as exc:
        return "KO", str(exc)
    return "OK", None
# Single-page dashboard template (Jinja2 + inline CSS/JS), rendered by root().
# The inline script drives three JSON endpoints: /products.json (product
# browser), /product/<id>/history.json (per-product scrape history) and
# /price_history.json (raw price_history browser). The string content is
# runtime behavior — do not edit text without checking the JS element IDs
# and data-field attributes it relies on.
TEMPLATE = """
<!doctype html>
<html lang="fr">
<head>
<meta charset="utf-8" />
<title>PriceWatch Analytics UI</title>
<style>
body { font-family: "JetBrains Mono", system-ui, monospace; background:#1f1f1b; color:#ebe0c8; margin:0; padding:32px; }
main { max-width: 960px; margin: 0 auto; }
h1 { margin-bottom: 0; }
section { margin-top: 24px; background:#282828; border:1px solid rgba(255,255,255,0.08); padding:16px; border-radius:14px; box-shadow:0 14px 30px rgba(0,0,0,0.35); }
table { width:100%; border-collapse:collapse; margin-top:12px; }
th, td { text-align:left; padding:6px 8px; border-bottom:1px solid rgba(255,255,255,0.08); }
.status { display:inline-flex; align-items:center; gap:6px; font-size:14px; padding:4px 10px; border-radius:999px; background:rgba(255,255,255,0.05); }
.status.ok { background:rgba(184,187,38,0.15); }
.status.ko { background:rgba(251,73,52,0.2); }
.muted { color:rgba(255,255,255,0.5); font-size:13px; }
.browser-panel { margin-top: 16px; display: flex; flex-direction: column; gap: 12px; }
.browser-controls { display: flex; flex-wrap: wrap; gap: 8px; align-items: center; }
.browser-controls button { border-radius: 8px; border: 1px solid rgba(255,255,255,0.12); background: rgba(255,255,255,0.04); color: inherit; padding: 6px 12px; cursor: pointer; transition: transform 0.15s ease; }
.browser-controls button:hover { transform: translateY(-1px); }
.browser-display { padding: 12px; border-radius: 12px; background: rgba(255,255,255,0.02); border: 1px solid rgba(255,255,255,0.08); min-height: 150px; font-size: 0.85rem; }
.browser-display dt { font-weight: 700; }
.browser-display dd { margin: 0 0 8px 0; }
.browser-indicator { font-size: 0.9rem; }
</style>
</head>
<body>
<main>
<header>
<h1>PriceWatch Analytics UI</h1>
<p class="muted">PostgreSQL : {{ db_status }} · Redis : {{ redis_status }}</p>
</header>
<section>
<h2>Vue rapide</h2>
<div class="status {{ 'ok' if db_error is none else 'ko' }}">
Base : {{ db_status }}
</div>
<div class="status {{ 'ok' if redis_status == 'OK' else 'ko' }}">
Redis : {{ redis_status }}
</div>
{% if db_error or redis_error %}
<p class="muted">Erreurs : {{ db_error or '' }} {{ redis_error or '' }}</p>
{% endif %}
</section>
<section>
<h2>Stats métier</h2>
<table>
<tr><th>Produits</th><td>{{ metrics.counts.products }}</td></tr>
<tr><th>Historique prix</th><td>{{ metrics.counts.price_history }}</td></tr>
<tr><th>Logs de scraping</th><td>{{ metrics.counts.scraping_logs }}</td></tr>
</table>
</section>
<section>
<h2>Produits récemment mis à jour</h2>
{% if metrics.latest_products %}
<table>
<thead>
<tr><th>ID</th><th>Store</th><th>Référence</th><th>Révision</th><th>Mis à jour</th></tr>
</thead>
<tbody>
{% for item in metrics.latest_products %}
<tr>
<td>{{ item.id }}</td>
<td>{{ item.source }}</td>
<td>{{ item.reference }}</td>
<td>{{ item.title[:40] }}{% if item.title|length > 40 %}…{% endif %}</td>
<td>{{ item.updated }}</td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<p class="muted">Aucun produit enregistré.</p>
{% endif %}
</section>
<section>
<h2>Parcourir la base (produits)</h2>
<div class="browser-panel">
<div class="browser-controls">
<button id="load-products">Charger les produits</button>
<button id="product-prev" disabled>Précédent</button>
<button id="product-next" disabled>Suivant</button>
<strong class="browser-indicator" id="product-indicator">0 / 0</strong>
<span class="muted" id="product-message"></span>
</div>
<dl class="browser-display" id="product-details">
<dt data-field="title">Titre</dt>
<dd id="product-title">-</dd>
<dt data-field="store">Store</dt>
<dd data-field="store">-</dd>
<dt data-field="reference">Référence</dt>
<dd data-field="reference">-</dd>
<dt data-field="price">Dernier prix</dt>
<dd data-field="price">-</dd>
<dt data-field="currency">Devise</dt>
<dd data-field="currency">-</dd>
<dt data-field="msrp">Prix conseillé</dt>
<dd data-field="msrp">-</dd>
<dt data-field="stock_status">Stock</dt>
<dd data-field="stock_status">-</dd>
<dt data-field="category">Catégorie</dt>
<dd data-field="category">-</dd>
<dt data-field="description">Description</dt>
<dd data-field="description">-</dd>
<dt data-field="last_updated_at">Dernière mise à jour</dt>
<dd data-field="last_updated_at">-</dd>
<dt data-field="fetched_at">Historique dernier scrap</dt>
<dd data-field="fetched_at">-</dd>
</dl>
</div>
</section>
<section>
<h2>Historique complet des scraps</h2>
<div class="browser-panel">
<div class="browser-controls">
<button id="load-history">Charger l'historique du produit sélectionné</button>
<span class="muted" id="history-message"></span>
</div>
<div class="history-table-container" style="max-height: 400px; overflow-y: auto; margin-top: 12px;">
<table id="history-table">
<thead>
<tr>
<th>Date</th>
<th>Prix</th>
<th>Frais port</th>
<th>Stock</th>
<th>Méthode</th>
<th>Statut</th>
</tr>
</thead>
<tbody id="history-body">
<tr><td colspan="6" class="muted">Sélectionnez un produit puis cliquez sur "Charger l'historique"</td></tr>
</tbody>
</table>
</div>
</div>
</section>
<section>
<h2>Parcourir la table price_history</h2>
<div class="browser-panel">
<div class="browser-controls">
<button id="load-price-history">Charger price_history</button>
<button id="ph-prev" disabled>Précédent</button>
<button id="ph-next" disabled>Suivant</button>
<strong class="browser-indicator" id="ph-indicator">0 / 0</strong>
<span class="muted" id="ph-message"></span>
</div>
<dl class="browser-display" id="ph-details">
<dt>ID</dt>
<dd id="ph-id">-</dd>
<dt>Product ID</dt>
<dd id="ph-product-id">-</dd>
<dt>Store</dt>
<dd id="ph-source">-</dd>
<dt>Référence</dt>
<dd id="ph-reference">-</dd>
<dt>Titre produit</dt>
<dd id="ph-title">-</dd>
<dt>Prix</dt>
<dd id="ph-price">-</dd>
<dt>Frais de port</dt>
<dd id="ph-shipping">-</dd>
<dt>Stock</dt>
<dd id="ph-stock">-</dd>
<dt>Méthode</dt>
<dd id="ph-method">-</dd>
<dt>Statut</dt>
<dd id="ph-status">-</dd>
<dt>Date scraping</dt>
<dd id="ph-fetched-at">-</dd>
</dl>
</div>
</section>
</main>
<script>
document.addEventListener("DOMContentLoaded", () => {
const loadBtn = document.getElementById("load-products");
const prevBtn = document.getElementById("product-prev");
const nextBtn = document.getElementById("product-next");
const indicator = document.getElementById("product-indicator");
const message = document.getElementById("product-message");
const titleEl = document.getElementById("product-title");
const fields = Array.from(document.querySelectorAll("[data-field]")).reduce((acc, el) => {
acc[el.getAttribute("data-field")] = el;
return acc;
}, {});
let products = [];
let cursor = 0;
const setStatus = (text) => {
message.textContent = text || "";
};
const renderProduct = () => {
if (!products.length) {
indicator.textContent = "0 / 0";
titleEl.textContent = "-";
Object.values(fields).forEach((el) => (el.textContent = "-"));
prevBtn.disabled = true;
nextBtn.disabled = true;
return;
}
const current = products[cursor];
indicator.textContent = `${cursor + 1} / ${products.length}`;
titleEl.textContent = current.title || "Sans titre";
const mapField = {
store: current.source,
reference: current.reference,
price: current.price !== null && current.price !== undefined ? current.price : "n/a",
currency: current.currency || "EUR",
msrp: current.msrp || "-",
stock_status: current.stock_status || "n/a",
category: current.category || "n/a",
description: (current.description || "n/a").slice(0, 200),
last_updated_at: current.last_updated_at || "n/a",
fetched_at: current.fetched_at || "n/a",
};
Object.entries(mapField).forEach(([key, value]) => {
if (fields[key]) {
fields[key].textContent = value;
}
});
prevBtn.disabled = cursor === 0;
nextBtn.disabled = cursor >= products.length - 1;
};
const fetchProducts = async () => {
setStatus("Chargement…");
try {
const response = await fetch("/products.json");
if (!response.ok) {
throw new Error(`HTTP ${response.status}`);
}
const data = await response.json();
if (!Array.isArray(data)) {
throw new Error("Réponse invalide");
}
products = data;
cursor = 0;
setStatus(`Chargé ${products.length} produit(s)`);
renderProduct();
} catch (err) {
setStatus(`Erreur: ${err.message}`);
products = [];
renderProduct();
}
};
loadBtn.addEventListener("click", fetchProducts);
prevBtn.addEventListener("click", () => {
if (cursor > 0) {
cursor -= 1;
renderProduct();
}
});
nextBtn.addEventListener("click", () => {
if (cursor + 1 < products.length) {
cursor += 1;
renderProduct();
}
});
// Historique des scraps
const loadHistoryBtn = document.getElementById("load-history");
const historyMessage = document.getElementById("history-message");
const historyBody = document.getElementById("history-body");
const setHistoryStatus = (text) => {
historyMessage.textContent = text || "";
};
const formatStock = (status) => {
const stockMap = {
"in_stock": "✓ En stock",
"out_of_stock": "✗ Rupture",
"limited": "⚠ Limité",
"preorder": "⏳ Précommande",
"unknown": "? Inconnu"
};
return stockMap[status] || status || "-";
};
const formatMethod = (method) => {
return method === "playwright" ? "🎭 Playwright" : "📡 HTTP";
};
const formatStatus = (status) => {
const statusMap = {
"success": "✓ Succès",
"partial": "⚠ Partiel",
"failed": "✗ Échec"
};
return statusMap[status] || status || "-";
};
const renderHistory = (history) => {
if (!history.length) {
historyBody.innerHTML = '<tr><td colspan="6" class="muted">Aucun historique disponible pour ce produit.</td></tr>';
return;
}
historyBody.innerHTML = history.map(entry => `
<tr>
<td>${entry.fetched_at || "-"}</td>
<td>${entry.price !== null ? entry.price + "" : "-"}</td>
<td>${entry.shipping_cost !== null ? entry.shipping_cost + "" : "-"}</td>
<td>${formatStock(entry.stock_status)}</td>
<td>${formatMethod(entry.fetch_method)}</td>
<td>${formatStatus(entry.fetch_status)}</td>
</tr>
`).join("");
};
const fetchHistory = async () => {
if (!products.length) {
setHistoryStatus("Chargez d'abord les produits.");
return;
}
const current = products[cursor];
if (!current || !current.id) {
setHistoryStatus("Aucun produit sélectionné.");
return;
}
setHistoryStatus(`Chargement de l'historique pour le produit #${current.id}…`);
try {
const response = await fetch(`/product/${current.id}/history.json`);
if (!response.ok) {
throw new Error(`HTTP ${response.status}`);
}
const data = await response.json();
if (!Array.isArray(data)) {
throw new Error("Réponse invalide");
}
setHistoryStatus(`${data.length} entrée(s) pour "${(current.title || "Sans titre").slice(0, 30)}…"`);
renderHistory(data);
} catch (err) {
setHistoryStatus(`Erreur: ${err.message}`);
historyBody.innerHTML = '<tr><td colspan="6" class="muted">Erreur lors du chargement.</td></tr>';
}
};
loadHistoryBtn.addEventListener("click", fetchHistory);
// Parcourir price_history
const loadPhBtn = document.getElementById("load-price-history");
const phPrevBtn = document.getElementById("ph-prev");
const phNextBtn = document.getElementById("ph-next");
const phIndicator = document.getElementById("ph-indicator");
const phMessage = document.getElementById("ph-message");
let priceHistoryData = [];
let phCursor = 0;
const setPhStatus = (text) => {
phMessage.textContent = text || "";
};
const renderPriceHistory = () => {
const els = {
id: document.getElementById("ph-id"),
productId: document.getElementById("ph-product-id"),
source: document.getElementById("ph-source"),
reference: document.getElementById("ph-reference"),
title: document.getElementById("ph-title"),
price: document.getElementById("ph-price"),
shipping: document.getElementById("ph-shipping"),
stock: document.getElementById("ph-stock"),
method: document.getElementById("ph-method"),
status: document.getElementById("ph-status"),
fetchedAt: document.getElementById("ph-fetched-at"),
};
if (!priceHistoryData.length) {
phIndicator.textContent = "0 / 0";
Object.values(els).forEach((el) => (el.textContent = "-"));
phPrevBtn.disabled = true;
phNextBtn.disabled = true;
return;
}
const current = priceHistoryData[phCursor];
phIndicator.textContent = `${phCursor + 1} / ${priceHistoryData.length}`;
els.id.textContent = current.id || "-";
els.productId.textContent = current.product_id || "-";
els.source.textContent = current.source || "-";
els.reference.textContent = current.reference || "-";
els.title.textContent = current.title ? (current.title.length > 60 ? current.title.slice(0, 60) + "" : current.title) : "-";
els.price.textContent = current.price !== null ? current.price + "" : "-";
els.shipping.textContent = current.shipping_cost !== null ? current.shipping_cost + "" : "-";
els.stock.textContent = formatStock(current.stock_status);
els.method.textContent = formatMethod(current.fetch_method);
els.status.textContent = formatStatus(current.fetch_status);
els.fetchedAt.textContent = current.fetched_at || "-";
phPrevBtn.disabled = phCursor === 0;
phNextBtn.disabled = phCursor >= priceHistoryData.length - 1;
};
const fetchPriceHistory = async () => {
setPhStatus("Chargement…");
try {
const response = await fetch("/price_history.json");
if (!response.ok) {
throw new Error(`HTTP ${response.status}`);
}
const data = await response.json();
if (!Array.isArray(data)) {
throw new Error("Réponse invalide");
}
priceHistoryData = data;
phCursor = 0;
setPhStatus(`Chargé ${priceHistoryData.length} entrée(s)`);
renderPriceHistory();
} catch (err) {
setPhStatus(`Erreur: ${err.message}`);
priceHistoryData = [];
renderPriceHistory();
}
};
loadPhBtn.addEventListener("click", fetchPriceHistory);
phPrevBtn.addEventListener("click", () => {
if (phCursor > 0) {
phCursor -= 1;
renderPriceHistory();
}
});
phNextBtn.addEventListener("click", () => {
if (phCursor + 1 < priceHistoryData.length) {
phCursor += 1;
renderPriceHistory();
}
});
});
</script>
</body>
</html>
"""
@app.route("/")
def root():
    """Render the dashboard page with DB metrics and Redis connectivity status."""
    metrics, db_error = fetch_db_metrics()
    redis_status, redis_error = check_redis()
    db_status = "connecté" if db_error is None else "erreur"
    return render_template_string(
        TEMPLATE,
        metrics=metrics,
        db_status=db_status,
        db_error=db_error,
        redis_status=redis_status,
        redis_error=redis_error,
    )
@app.route("/products.json")
def products_json():
    """Serve the product list (with latest scrape) as JSON; 500 on DB error."""
    items, error = fetch_products_list()
    if error:
        return jsonify({"error": error}), 500
    return jsonify(items)
@app.route("/product/<int:product_id>/history.json")
def product_history_json(product_id: int):
    """Serve one product's scrape history as JSON; 500 on DB error."""
    entries, error = fetch_product_history(product_id)
    if error:
        return jsonify({"error": error}), 500
    return jsonify(entries)
@app.route("/price_history.json")
def all_price_history_json():
    """Serve the raw price_history table (joined with products) as JSON."""
    entries, error = fetch_all_price_history()
    if error:
        return jsonify({"error": error}), 500
    return jsonify(entries)
if __name__ == "__main__":
    # NOTE(review): Flask's built-in dev server on 0.0.0.0:80 — acceptable for
    # an internal debug/analytics UI behind docker-compose, not for production
    # exposure.
    app.run(host="0.0.0.0", port=80)

View File

@@ -0,0 +1,3 @@
Flask==3.0.0
psycopg2-binary==2.9.11
redis==5.0.0

View File

@@ -33,6 +33,19 @@ services:
depends_on:
- postgres
- redis
worker:
build: .
command: python -m pricewatch.app.cli.main worker
env_file:
- .env
environment:
PW_DB_HOST: postgres
PW_REDIS_HOST: redis
TZ: Europe/Paris
depends_on:
- postgres
- redis
frontend:
build: ./webui
@@ -40,9 +53,58 @@ services:
- "3000:80"
environment:
TZ: Europe/Paris
VITE_API_TOKEN: ${API_TOKEN:-}
env_file:
- .env
depends_on:
- api
analytics-ui:
build: ./analytics-ui
ports:
- "8070:80"
environment:
TZ: Europe/Paris
PW_DB_HOST: postgres
PW_DB_PORT: 5432
PW_DB_NAME: pricewatch
PW_DB_USER: pricewatch
PW_DB_PASSWORD: pricewatch
PW_REDIS_HOST: redis
PW_REDIS_PORT: 6379
PW_REDIS_DB: 0
env_file:
- .env
depends_on:
- postgres
- redis
adminer:
image: adminer
ports:
- "8071:8080"
environment:
TZ: Europe/Paris
depends_on:
- postgres
pgadmin:
image: dpage/pgadmin4:latest
ports:
- "8072:80"
environment:
TZ: Europe/Paris
PGADMIN_DEFAULT_EMAIL: admin@pricewatch.dev
PGADMIN_DEFAULT_PASSWORD: pricewatch
PGADMIN_CONFIG_SERVER_MODE: "False"
PGADMIN_CONFIG_MASTER_PASSWORD_REQUIRED: "False"
volumes:
- pricewatch_pgadmin:/var/lib/pgadmin
- ./pgadmin-servers.json:/pgadmin4/servers.json:ro
depends_on:
- postgres
volumes:
pricewatch_pgdata:
pricewatch_redisdata:
pricewatch_pgadmin:

View File

@@ -0,0 +1,50 @@
## Objectif
Améliorer la clarté et la lisibilité de l'interface (catalogue, filtres, détails produit) **sans modifier la palette de couleurs existante**.
## Contraintes strictes
- Interdit : changement de couleurs (fond, accent, badges, etc.)
- Autorisé : typographie, espacements, hiérarchie, mise en page, libellés, tooltips, états, comportements hover/focus, clamp.
---
## Tâches
### Cartes produit (catalogue)
- [ ] Titre : line-clamp 2 lignes + ellipse
- [ ] Tooltip titre complet (survol + clavier)
- [ ] Prix : taille 1820px, bold (prix = focal n°1)
- [ ] Delta : format standard ▲/▼ + % (sinon afficher —)
- [ ] Statuts : remplacer `unknown/n/a` par `En stock / Rupture / Inconnu / Erreur scrape`
- [ ] Badges statuts homogènes (sans changer couleurs)
- [ ] Actions : 1 action primaire visible, secondaires au hover ou menu “...”
- [ ] Tooltips obligatoires sur toutes les icônes + aria-label
### Panneau Détails (colonne droite)
- [ ] Découper en sections : Résumé / Prix / Historique / Source / Actions
- [ ] Prix dominant visuellement + espacement vertical accru
- [ ] URL cliquable + bouton copier + ASIN visible
- [ ] Actions regroupées en bas
### Filtres (colonne gauche)
- [ ] Afficher compteur `X affichés / Y`
- [ ] Chips filtres actifs (cliquables pour retirer)
- [ ] Bouton Reset filtres toujours visible
- [ ] Labels cohérents + placeholders explicites
### Comparaison
- [ ] Message guidage : “Sélectionnez 2 à 4 produits…”
- [ ] Afficher compteur de sélection (`2 sélectionnés`, etc.)
### Accessibilité
- [ ] Focus clavier visible
- [ ] Navigation clavier : Tab sur cartes, Enter ouvre détails
- [ ] Icônes avec aria-label + tooltips accessibles
---
## Critères d'acceptation
- Prix clairement dominant sur cartes et détails
- Titres non envahissants (2 lignes max)
- Statuts compréhensibles (plus de unknown/n/a)
- Filtres : X/Y + chips + reset
- Aucune couleur modifiée

26
fonctionnement.md Normal file
View File

@@ -0,0 +1,26 @@
## Fonctionnement général de PriceWatch
Lorsqu'un utilisateur colle une URL dans la web UI et déclenche l'ajout/déclenchement d'un scrap, voici le cheminement principal entre le **frontend Vue** et le **backend FastAPI** :
1. **Entrée utilisateur / validation**
* Le popup "Ajouter un produit" envoie `POST /scrape/preview` avec lURL + le mode (HTTP ou Playwright).
* Les boutons "Ajouter" et "Enregistrer" sont accessibles après que la preview a renvoyé un `ProductSnapshot`, sinon une erreur est affichée dans le popup.
2. **Backend (API)**
* Lendpoint `/scrape/preview` reçoit lURL, détermine le store (via `pricewatch/app/core/registry.py`) et utilise un parser adapté (`pricewatch/app/stores/<store>/`) pour extraire titre, prix, images, description, caractéristiques, stock, etc.
* Si la page nécessite un navigateur, la stratégie Playwright (avec `pricewatch/app/scraping/playwright.py`) est déclenchée, sinon le fetch HTTP simple (`pricewatch/app/scraping/http.py`) suffit.
* Le snapshot structuré `ProductSnapshot` contient les métadonnées, la liste dimages (jpg/webp) et les champs `msrp`, `discount`, `categories`, `specs`, etc.
* En cas de succès, la preview renvoie un JSON que le frontend affiche dans le popup. En cas derreur (404, 401, scraping bloqué), lutilisateur voit directement le message retourné.
3. **Confirmation / persist**
* Quand lutilisateur clique sur "Enregistrer", la web UI déclenche `POST /scrape/commit` avec lobjet snapshot.
* Le backend réinsère les données dans la base (`pricewatch/app/core/io.py`) et lAPI `/products` ou `/enqueue` peut ensuite réafficher ou re-scraper ce produit.
4. **Cycle de rafraîchissement**
* Le frontend peut aussi appeler `/enqueue` pour forcer un nouveau scrap dune URL existante (bouton refresh dans la carte ou le détail).
* Le backend place la requête dans Redis (via `pricewatch/app/core/queue.py`), un worker la consomme, met à jour la base, et le frontend récupère les nouvelles données via `GET /products`.
5. **Observabilité / logs**
* Les étapes critiques (preview, commit, enqueue) génèrent des logs (backend/uvicorn) disponibles dans la web UI via les boutons logs. Les erreurs sont mises en rouge et peuvent être copiées pour diagnostic.
Ce flux respecte les contraintes : la web UI déroule les interactions, le backend orchestre le scraping (HTTP vs Playwright), applique la logique store et diffuse le résultat via les endpoints REST existants.

BIN
image.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 8.5 KiB

14
pgadmin-servers.json Normal file
View File

@@ -0,0 +1,14 @@
{
"Servers": {
"1": {
"Name": "PriceWatch PostgreSQL",
"Group": "Servers",
"Host": "postgres",
"Port": 5432,
"MaintenanceDB": "pricewatch",
"Username": "pricewatch",
"PassFile": "/pgadmin4/pgpass",
"SSLMode": "prefer"
}
}
}

View File

@@ -196,6 +196,8 @@ Guide de migration JSON -> DB: `MIGRATION_GUIDE.md`
L'API est protegee par un token simple.
Note: l endpoint `/products` expose des champs Amazon explicites (asin, note, badge Choix d Amazon, stock_text/in_stock, model_number/model_name, main_image/gallery_images). Les reductions ne sont plus calculees cote API.
```bash
export PW_API_TOKEN=change_me
docker compose up -d api
@@ -204,8 +206,54 @@ docker compose up -d api
Exemples:
```bash
curl -H "Authorization: Bearer $PW_API_TOKEN" http://localhost:8000/products
curl http://localhost:8000/health
curl -H "Authorization: Bearer $PW_API_TOKEN" http://localhost:8001/products
curl http://localhost:8001/health
```
Filtres (exemples rapides):
```bash
curl -H "Authorization: Bearer $PW_API_TOKEN" \\
"http://localhost:8001/products?price_min=100&stock_status=in_stock"
curl -H "Authorization: Bearer $PW_API_TOKEN" \\
"http://localhost:8001/products/1/prices?fetch_status=success&fetched_after=2026-01-14T00:00:00"
curl -H "Authorization: Bearer $PW_API_TOKEN" \\
"http://localhost:8001/logs?fetch_status=failed&fetched_before=2026-01-15T00:00:00"
```
Exports (CSV/JSON):
```bash
curl -H "Authorization: Bearer $PW_API_TOKEN" \\
"http://localhost:8001/products/export?format=csv"
curl -H "Authorization: Bearer $PW_API_TOKEN" \\
"http://localhost:8001/logs/export?format=json"
```
CRUD (exemples rapides):
```bash
curl -H "Authorization: Bearer $PW_API_TOKEN" -X POST http://localhost:8001/products \\
-H "Content-Type: application/json" \\
-d '{"source":"amazon","reference":"REF1","url":"https://example.com"}'
```
Webhooks (exemples rapides):
```bash
curl -H "Authorization: Bearer $PW_API_TOKEN" -X POST http://localhost:8001/webhooks \\
-H "Content-Type: application/json" \\
-d '{"event":"price_changed","url":"https://example.com/webhook","enabled":true}'
curl -H "Authorization: Bearer $PW_API_TOKEN" -X POST http://localhost:8001/webhooks/1/test
```
## Web UI (Phase 4)
Interface Vue 3 dense avec themes Gruvbox/Monokai, header fixe, sidebar filtres, et split compare.
```bash
docker compose up -d frontend
# Acces: http://localhost:3000
```
## Configuration (scrap_url.yaml)

View File

@@ -22,6 +22,7 @@ pricewatch/app/scraping/pipeline.py
pricewatch/app/scraping/pw_fetch.py
pricewatch/app/stores/__init__.py
pricewatch/app/stores/base.py
pricewatch/app/stores/price_parser.py
pricewatch/app/stores/amazon/__init__.py
pricewatch/app/stores/amazon/store.py
pricewatch/app/stores/cdiscount/__init__.py

View File

@@ -21,37 +21,43 @@ from sqlalchemy import and_, desc, func
from sqlalchemy.orm import Session
from pricewatch.app.api.schemas import (
BackendLogEntry,
ClassificationOptionsOut,
ClassificationRuleCreate,
ClassificationRuleOut,
ClassificationRuleUpdate,
EnqueueRequest,
EnqueueResponse,
HealthStatus,
PriceHistoryOut,
PriceHistoryCreate,
PriceHistoryOut,
PriceHistoryUpdate,
ProductOut,
ProductCreate,
ProductHistoryPoint,
ProductOut,
ProductUpdate,
ScheduleRequest,
ScheduleResponse,
ScrapingLogOut,
ScrapingLogCreate,
ScrapingLogUpdate,
ScrapePreviewRequest,
ScrapePreviewResponse,
ScrapeCommitRequest,
ScrapeCommitResponse,
VersionResponse,
BackendLogEntry,
ScrapePreviewRequest,
ScrapePreviewResponse,
ScrapingLogCreate,
ScrapingLogOut,
ScrapingLogUpdate,
UvicornLogEntry,
WebhookOut,
VersionResponse,
WebhookCreate,
WebhookUpdate,
WebhookOut,
WebhookTestResponse,
WebhookUpdate,
)
from pricewatch.app.core.config import get_config
from pricewatch.app.core.logging import get_logger
from pricewatch.app.core.schema import ProductSnapshot
from pricewatch.app.db.connection import check_db_connection, get_session
from pricewatch.app.db.models import PriceHistory, Product, ScrapingLog, Webhook
from pricewatch.app.db.models import ClassificationRule, PriceHistory, Product, ScrapingLog, Webhook
from pricewatch.app.db.repository import ProductRepository
from pricewatch.app.scraping.pipeline import ScrapingPipeline
from pricewatch.app.tasks.scrape import scrape_product
from pricewatch.app.tasks.scheduler import RedisUnavailableError, check_redis_connection, ScrapingScheduler
@@ -187,6 +193,7 @@ def create_product(
url=payload.url,
title=payload.title,
category=payload.category,
type=payload.type,
description=payload.description,
currency=payload.currency,
msrp=payload.msrp,
@@ -240,6 +247,129 @@ def update_product(
return _product_to_out(session, product)
@app.get(
    "/classification/rules",
    response_model=list[ClassificationRuleOut],
    dependencies=[Depends(require_token)],
)
def list_classification_rules(
    session: Session = Depends(get_db_session),
) -> list[ClassificationRuleOut]:
    """Return every classification rule, ordered by priority then id."""

    def _serialize(rule: ClassificationRule) -> ClassificationRuleOut:
        # Normalise keywords to a list so the schema never receives NULL.
        return ClassificationRuleOut(
            id=rule.id,
            category=rule.category,
            type=rule.type,
            keywords=rule.keywords or [],
            sort_order=rule.sort_order,
            is_active=rule.is_active,
        )

    ordered = session.query(ClassificationRule).order_by(
        ClassificationRule.sort_order, ClassificationRule.id
    )
    return [_serialize(rule) for rule in ordered.all()]
@app.post(
    "/classification/rules",
    response_model=ClassificationRuleOut,
    dependencies=[Depends(require_token)],
)
def create_classification_rule(
    payload: ClassificationRuleCreate,
    session: Session = Depends(get_db_session),
) -> ClassificationRuleOut:
    """Persist a new classification rule and return its stored form.

    ``sort_order`` falls back to 0 and ``is_active`` to True when the
    payload leaves them unset.
    """
    active = payload.is_active
    if active is None:
        active = True
    rule = ClassificationRule(
        category=payload.category,
        type=payload.type,
        keywords=payload.keywords,
        sort_order=payload.sort_order or 0,
        is_active=active,
    )
    session.add(rule)
    session.commit()
    # Refresh to pick up the database-generated id and server defaults.
    session.refresh(rule)
    return ClassificationRuleOut(
        id=rule.id,
        category=rule.category,
        type=rule.type,
        keywords=rule.keywords or [],
        sort_order=rule.sort_order,
        is_active=rule.is_active,
    )
@app.patch(
    "/classification/rules/{rule_id}",
    response_model=ClassificationRuleOut,
    dependencies=[Depends(require_token)],
)
def update_classification_rule(
    rule_id: int,
    payload: ClassificationRuleUpdate,
    session: Session = Depends(get_db_session),
) -> ClassificationRuleOut:
    """Apply a partial update to a classification rule.

    Only fields explicitly set in the payload are written; an unknown id
    yields a 404.
    """
    rule = (
        session.query(ClassificationRule)
        .filter(ClassificationRule.id == rule_id)
        .one_or_none()
    )
    if rule is None:
        raise HTTPException(status_code=404, detail="Regle non trouvee")
    # exclude_unset keeps omitted fields untouched (PATCH semantics).
    for field_name, field_value in payload.model_dump(exclude_unset=True).items():
        setattr(rule, field_name, field_value)
    session.commit()
    session.refresh(rule)
    return ClassificationRuleOut(
        id=rule.id,
        category=rule.category,
        type=rule.type,
        keywords=rule.keywords or [],
        sort_order=rule.sort_order,
        is_active=rule.is_active,
    )
@app.delete(
    "/classification/rules/{rule_id}",
    dependencies=[Depends(require_token)],
)
def delete_classification_rule(
    rule_id: int,
    session: Session = Depends(get_db_session),
) -> dict[str, str]:
    """Delete a classification rule; raise a 404 when the id is unknown."""
    rule = (
        session.query(ClassificationRule)
        .filter(ClassificationRule.id == rule_id)
        .one_or_none()
    )
    if rule is None:
        raise HTTPException(status_code=404, detail="Regle non trouvee")
    session.delete(rule)
    session.commit()
    return {"status": "deleted"}
@app.get(
    "/classification/options",
    response_model=ClassificationOptionsOut,
    dependencies=[Depends(require_token)],
)
def get_classification_options(
    session: Session = Depends(get_db_session),
) -> ClassificationOptionsOut:
    """Expose the distinct categories and types from active rules.

    Only active rules are considered; both lists are deduplicated and
    sorted alphabetically.
    """
    rules = (
        session.query(ClassificationRule)
        # .is_(True) generates the same SQL as `== True` but is the
        # idiomatic SQLAlchemy form and avoids the E712 lint violation.
        .filter(ClassificationRule.is_active.is_(True))
        .order_by(ClassificationRule.sort_order, ClassificationRule.id)
        .all()
    )
    categories = sorted({rule.category for rule in rules if rule.category})
    types = sorted({rule.type for rule in rules if rule.type})
    return ClassificationOptionsOut(categories=categories, types=types)
@app.delete("/products/{product_id}", dependencies=[Depends(require_token)])
def delete_product(
product_id: int,
@@ -702,6 +832,13 @@ def preview_scrape(payload: ScrapePreviewRequest) -> ScrapePreviewResponse:
if snapshot is None:
_add_backend_log("ERROR", f"Preview scraping KO: {payload.url}")
return ScrapePreviewResponse(success=False, snapshot=None, error=result.get("error"))
config = get_config()
if config.enable_db:
try:
with get_session(config) as session:
ProductRepository(session).apply_classification(snapshot)
except Exception as exc:
snapshot.add_note(f"Classification ignoree: {exc}")
return ScrapePreviewResponse(
success=bool(result.get("success")),
snapshot=snapshot.model_dump(mode="json"),
@@ -718,7 +855,9 @@ def commit_scrape(payload: ScrapeCommitRequest) -> ScrapeCommitResponse:
_add_backend_log("ERROR", "Commit scraping KO: snapshot invalide")
raise HTTPException(status_code=400, detail="Snapshot invalide") from exc
product_id = ScrapingPipeline(config=get_config()).process_snapshot(snapshot, save_to_db=True)
product_id = ScrapingPipeline(config=get_config()).process_snapshot(
snapshot, save_to_db=True, apply_classification=False
)
_add_backend_log("INFO", f"Commit scraping OK: product_id={product_id}")
return ScrapeCommitResponse(success=True, product_id=product_id)
@@ -794,6 +933,9 @@ def _read_uvicorn_lines(limit: int = 200) -> list[str]:
return []
PRODUCT_HISTORY_LIMIT = 12
def _product_to_out(session: Session, product: Product) -> ProductOut:
"""Helper pour mapper Product + dernier prix."""
latest = (
@@ -804,22 +946,42 @@ def _product_to_out(session: Session, product: Product) -> ProductOut:
)
images = [image.image_url for image in product.images]
specs = {spec.spec_key: spec.spec_value for spec in product.specs}
discount_amount = None
discount_percent = None
if latest and latest.price is not None and product.msrp:
discount_amount = float(product.msrp) - float(latest.price)
if product.msrp > 0:
discount_percent = (discount_amount / float(product.msrp)) * 100
main_image = images[0] if images else None
gallery_images = images[1:] if len(images) > 1 else []
asin = product.reference if product.source == "amazon" else None
history_rows = (
session.query(PriceHistory)
.filter(PriceHistory.product_id == product.id, PriceHistory.price != None)
.order_by(desc(PriceHistory.fetched_at))
.limit(PRODUCT_HISTORY_LIMIT)
.all()
)
history_points = [
ProductHistoryPoint(price=float(row.price), fetched_at=row.fetched_at)
for row in reversed(history_rows)
if row.price is not None
]
return ProductOut(
id=product.id,
source=product.source,
reference=product.reference,
asin=asin,
url=product.url,
title=product.title,
category=product.category,
type=product.type,
description=product.description,
currency=product.currency,
msrp=float(product.msrp) if product.msrp is not None else None,
rating_value=float(product.rating_value) if product.rating_value is not None else None,
rating_count=product.rating_count,
amazon_choice=product.amazon_choice,
amazon_choice_label=product.amazon_choice_label,
discount_text=product.discount_text,
stock_text=product.stock_text,
in_stock=product.in_stock,
model_number=product.model_number,
model_name=product.model_name,
first_seen_at=product.first_seen_at,
last_updated_at=product.last_updated_at,
latest_price=float(latest.price) if latest and latest.price is not None else None,
@@ -829,9 +991,12 @@ def _product_to_out(session: Session, product: Product) -> ProductOut:
latest_stock_status=latest.stock_status if latest else None,
latest_fetched_at=latest.fetched_at if latest else None,
images=images,
main_image=main_image,
gallery_images=gallery_images,
specs=specs,
discount_amount=discount_amount,
discount_percent=discount_percent,
discount_amount=None,
discount_percent=None,
history=history_points,
)

View File

@@ -13,16 +13,32 @@ class HealthStatus(BaseModel):
redis: bool
class ProductHistoryPoint(BaseModel):
    """Single price-history sample: a price and the timestamp it was fetched."""

    price: float
    fetched_at: datetime
class ProductOut(BaseModel):
id: int
source: str
reference: str
asin: Optional[str] = None
url: str
title: Optional[str] = None
category: Optional[str] = None
type: Optional[str] = None
description: Optional[str] = None
currency: Optional[str] = None
msrp: Optional[float] = None
rating_value: Optional[float] = None
rating_count: Optional[int] = None
amazon_choice: Optional[bool] = None
amazon_choice_label: Optional[str] = None
discount_text: Optional[str] = None
stock_text: Optional[str] = None
in_stock: Optional[bool] = None
model_number: Optional[str] = None
model_name: Optional[str] = None
first_seen_at: datetime
last_updated_at: datetime
latest_price: Optional[float] = None
@@ -30,9 +46,12 @@ class ProductOut(BaseModel):
latest_stock_status: Optional[str] = None
latest_fetched_at: Optional[datetime] = None
images: list[str] = []
main_image: Optional[str] = None
gallery_images: list[str] = []
specs: dict[str, str] = {}
discount_amount: Optional[float] = None
discount_percent: Optional[float] = None
history: list[ProductHistoryPoint] = Field(default_factory=list)
class ProductCreate(BaseModel):
@@ -41,6 +60,7 @@ class ProductCreate(BaseModel):
url: str
title: Optional[str] = None
category: Optional[str] = None
type: Optional[str] = None
description: Optional[str] = None
currency: Optional[str] = None
msrp: Optional[float] = None
@@ -50,6 +70,7 @@ class ProductUpdate(BaseModel):
url: Optional[str] = None
title: Optional[str] = None
category: Optional[str] = None
type: Optional[str] = None
description: Optional[str] = None
currency: Optional[str] = None
msrp: Optional[float] = None
@@ -202,6 +223,36 @@ class VersionResponse(BaseModel):
api_version: str
class ClassificationRuleOut(BaseModel):
    """API representation of a stored classification rule."""

    id: int
    category: Optional[str] = None
    type: Optional[str] = None
    keywords: list[str] = Field(default_factory=list)
    sort_order: int = 0
    is_active: bool = True
class ClassificationRuleCreate(BaseModel):
    """Payload for creating a classification rule; all fields optional."""

    category: Optional[str] = None
    type: Optional[str] = None
    keywords: list[str] = Field(default_factory=list)
    sort_order: Optional[int] = 0
    is_active: Optional[bool] = True
class ClassificationRuleUpdate(BaseModel):
    """Partial-update payload: fields left unset are not modified (PATCH)."""

    category: Optional[str] = None
    type: Optional[str] = None
    keywords: Optional[list[str]] = None
    sort_order: Optional[int] = None
    is_active: Optional[bool] = None
class ClassificationOptionsOut(BaseModel):
    """Distinct category and type values derived from the active rules."""

    categories: list[str] = Field(default_factory=list)
    types: list[str] = Field(default_factory=list)
class BackendLogEntry(BaseModel):
time: datetime
level: str

BIN
pricewatch/app/core/__pycache__/io.cpython-313.pyc Executable file → Normal file

Binary file not shown.

View File

@@ -93,13 +93,52 @@ class ProductSnapshot(BaseModel):
reference: Optional[str] = Field(
default=None, description="Référence produit (ASIN, SKU, etc.)"
)
asin: Optional[str] = Field(
default=None, description="ASIN Amazon si disponible"
)
category: Optional[str] = Field(default=None, description="Catégorie du produit")
type: Optional[str] = Field(default=None, description="Type du produit")
description: Optional[str] = Field(default=None, description="Description produit")
# Données Amazon explicites (si disponibles)
rating_value: Optional[float] = Field(
default=None, description="Note moyenne affichée"
)
rating_count: Optional[int] = Field(
default=None, description="Nombre d'évaluations"
)
amazon_choice: Optional[bool] = Field(
default=None, description="Badge Choix d'Amazon présent"
)
amazon_choice_label: Optional[str] = Field(
default=None, description="Libellé du badge Choix d'Amazon"
)
discount_text: Optional[str] = Field(
default=None, description="Texte de réduction affiché"
)
stock_text: Optional[str] = Field(
default=None, description="Texte brut de stock"
)
in_stock: Optional[bool] = Field(
default=None, description="Disponibilité dérivée"
)
model_number: Optional[str] = Field(
default=None, description="Numéro du modèle de l'article"
)
model_name: Optional[str] = Field(
default=None, description="Nom du modèle explicite"
)
# Médias
images: list[str] = Field(
default_factory=list, description="Liste des URLs d'images du produit"
)
main_image: Optional[str] = Field(
default=None, description="Image principale du produit"
)
gallery_images: list[str] = Field(
default_factory=list, description="Images de galerie dédoublonnées"
)
# Caractéristiques techniques
specs: dict[str, str] = Field(
@@ -134,6 +173,12 @@ class ProductSnapshot(BaseModel):
"""Filtre les URLs d'images vides."""
return [url.strip() for url in v if url and url.strip()]
@field_validator("gallery_images")
@classmethod
def validate_gallery_images(cls, v: list[str]) -> list[str]:
"""Filtre les URLs de galerie vides."""
return [url.strip() for url in v if url and url.strip()]
model_config = ConfigDict(
use_enum_values=True,
json_schema_extra={

View File

@@ -0,0 +1,350 @@
"""Ajout champs Amazon produit
Revision ID: 0014e51c4927
Revises: 20260115_02_product_details
Create Date: 2026-01-17 19:23:01.866891
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# Revision identifiers, used by Alembic.
revision = '0014e51c4927'
down_revision = '20260115_02_product_details'
branch_labels = None
depends_on = None
def upgrade() -> None:
    """Add the Amazon-specific ``products`` columns and attach SQL comments.

    Apart from the ``op.add_column`` calls, every statement only sets a
    column-level ``comment`` on an existing column (type and nullability
    are unchanged).
    """
    # ### commands auto generated by Alembic - please adjust! ###
    # price_history: document existing columns.
    op.alter_column('price_history', 'price',
        existing_type=sa.NUMERIC(precision=10, scale=2),
        comment='Product price',
        existing_nullable=True)
    op.alter_column('price_history', 'shipping_cost',
        existing_type=sa.NUMERIC(precision=10, scale=2),
        comment='Shipping cost',
        existing_nullable=True)
    op.alter_column('price_history', 'stock_status',
        existing_type=sa.VARCHAR(length=20),
        comment='Stock status (in_stock, out_of_stock, unknown)',
        existing_nullable=True)
    op.alter_column('price_history', 'fetch_method',
        existing_type=sa.VARCHAR(length=20),
        comment='Fetch method (http, playwright)',
        existing_nullable=False)
    op.alter_column('price_history', 'fetch_status',
        existing_type=sa.VARCHAR(length=20),
        comment='Fetch status (success, partial, failed)',
        existing_nullable=False)
    op.alter_column('price_history', 'fetched_at',
        existing_type=postgresql.TIMESTAMP(),
        comment='Scraping timestamp',
        existing_nullable=False)
    # product_images / product_specs: document existing columns.
    op.alter_column('product_images', 'image_url',
        existing_type=sa.TEXT(),
        comment='Image URL',
        existing_nullable=False)
    op.alter_column('product_images', 'position',
        existing_type=sa.INTEGER(),
        comment='Image position (0=main)',
        existing_nullable=False)
    op.alter_column('product_specs', 'spec_key',
        existing_type=sa.VARCHAR(length=200),
        comment="Specification key (e.g., 'Brand', 'Color')",
        existing_nullable=False)
    op.alter_column('product_specs', 'spec_value',
        existing_type=sa.TEXT(),
        comment='Specification value',
        existing_nullable=False)
    # products: new Amazon-centric columns (all nullable, so no backfill needed).
    op.add_column('products', sa.Column('rating_value', sa.Numeric(precision=3, scale=2), nullable=True, comment='Note moyenne'))
    op.add_column('products', sa.Column('rating_count', sa.Integer(), nullable=True, comment="Nombre d'evaluations"))
    op.add_column('products', sa.Column('amazon_choice', sa.Boolean(), nullable=True, comment="Badge Choix d'Amazon"))
    op.add_column('products', sa.Column('amazon_choice_label', sa.Text(), nullable=True, comment="Libelle Choix d'Amazon"))
    op.add_column('products', sa.Column('discount_text', sa.Text(), nullable=True, comment='Texte de reduction affiche'))
    op.add_column('products', sa.Column('stock_text', sa.Text(), nullable=True, comment='Texte brut du stock'))
    op.add_column('products', sa.Column('in_stock', sa.Boolean(), nullable=True, comment='Disponibilite derivee'))
    op.add_column('products', sa.Column('model_number', sa.Text(), nullable=True, comment='Numero du modele'))
    op.add_column('products', sa.Column('model_name', sa.Text(), nullable=True, comment='Nom du modele'))
    # products: document existing columns.
    op.alter_column('products', 'source',
        existing_type=sa.VARCHAR(length=50),
        comment='Store ID (amazon, cdiscount, etc.)',
        existing_nullable=False)
    op.alter_column('products', 'reference',
        existing_type=sa.VARCHAR(length=100),
        comment='Product reference (ASIN, SKU, etc.)',
        existing_nullable=False)
    op.alter_column('products', 'url',
        existing_type=sa.TEXT(),
        comment='Canonical product URL',
        existing_nullable=False)
    op.alter_column('products', 'title',
        existing_type=sa.TEXT(),
        comment='Product title',
        existing_nullable=True)
    op.alter_column('products', 'category',
        existing_type=sa.TEXT(),
        comment='Product category (breadcrumb)',
        existing_nullable=True)
    op.alter_column('products', 'description',
        existing_type=sa.TEXT(),
        comment='Product description',
        existing_nullable=True)
    op.alter_column('products', 'currency',
        existing_type=sa.VARCHAR(length=3),
        comment='Currency code (EUR, USD, GBP)',
        existing_nullable=True)
    op.alter_column('products', 'msrp',
        existing_type=sa.NUMERIC(precision=10, scale=2),
        comment='Recommended price',
        existing_nullable=True)
    op.alter_column('products', 'first_seen_at',
        existing_type=postgresql.TIMESTAMP(),
        comment='First scraping timestamp',
        existing_nullable=False)
    op.alter_column('products', 'last_updated_at',
        existing_type=postgresql.TIMESTAMP(),
        comment='Last metadata update',
        existing_nullable=False)
    # scraping_logs: document existing columns.
    op.alter_column('scraping_logs', 'url',
        existing_type=sa.TEXT(),
        comment='Scraped URL',
        existing_nullable=False)
    op.alter_column('scraping_logs', 'source',
        existing_type=sa.VARCHAR(length=50),
        comment='Store ID (amazon, cdiscount, etc.)',
        existing_nullable=False)
    op.alter_column('scraping_logs', 'reference',
        existing_type=sa.VARCHAR(length=100),
        comment='Product reference (if extracted)',
        existing_nullable=True)
    op.alter_column('scraping_logs', 'fetch_method',
        existing_type=sa.VARCHAR(length=20),
        comment='Fetch method (http, playwright)',
        existing_nullable=False)
    op.alter_column('scraping_logs', 'fetch_status',
        existing_type=sa.VARCHAR(length=20),
        comment='Fetch status (success, partial, failed)',
        existing_nullable=False)
    op.alter_column('scraping_logs', 'fetched_at',
        existing_type=postgresql.TIMESTAMP(),
        comment='Scraping timestamp',
        existing_nullable=False)
    op.alter_column('scraping_logs', 'duration_ms',
        existing_type=sa.INTEGER(),
        comment='Fetch duration in milliseconds',
        existing_nullable=True)
    op.alter_column('scraping_logs', 'html_size_bytes',
        existing_type=sa.INTEGER(),
        comment='HTML response size in bytes',
        existing_nullable=True)
    op.alter_column('scraping_logs', 'errors',
        existing_type=postgresql.JSONB(astext_type=sa.Text()),
        comment='Error messages (list of strings)',
        existing_nullable=True)
    op.alter_column('scraping_logs', 'notes',
        existing_type=postgresql.JSONB(astext_type=sa.Text()),
        comment='Debug notes (list of strings)',
        existing_nullable=True)
    # webhooks: document existing columns.
    op.alter_column('webhooks', 'event',
        existing_type=sa.VARCHAR(length=50),
        comment='Event name',
        existing_nullable=False)
    op.alter_column('webhooks', 'url',
        existing_type=sa.TEXT(),
        comment='Webhook URL',
        existing_nullable=False)
    op.alter_column('webhooks', 'secret',
        existing_type=sa.VARCHAR(length=200),
        comment='Secret optionnel',
        existing_nullable=True)
    op.alter_column('webhooks', 'created_at',
        existing_type=postgresql.TIMESTAMP(),
        comment='Creation timestamp',
        existing_nullable=False)
    # ### end Alembic commands ###
def downgrade() -> None:
    """Reverse the upgrade: drop the new ``products`` columns and strip the
    column comments that were added (the ``op.drop_column`` calls lose the
    Amazon data irreversibly)."""
    # ### commands auto generated by Alembic - please adjust! ###
    # webhooks: remove column comments.
    op.alter_column('webhooks', 'created_at',
        existing_type=postgresql.TIMESTAMP(),
        comment=None,
        existing_comment='Creation timestamp',
        existing_nullable=False)
    op.alter_column('webhooks', 'secret',
        existing_type=sa.VARCHAR(length=200),
        comment=None,
        existing_comment='Secret optionnel',
        existing_nullable=True)
    op.alter_column('webhooks', 'url',
        existing_type=sa.TEXT(),
        comment=None,
        existing_comment='Webhook URL',
        existing_nullable=False)
    op.alter_column('webhooks', 'event',
        existing_type=sa.VARCHAR(length=50),
        comment=None,
        existing_comment='Event name',
        existing_nullable=False)
    # scraping_logs: remove column comments.
    op.alter_column('scraping_logs', 'notes',
        existing_type=postgresql.JSONB(astext_type=sa.Text()),
        comment=None,
        existing_comment='Debug notes (list of strings)',
        existing_nullable=True)
    op.alter_column('scraping_logs', 'errors',
        existing_type=postgresql.JSONB(astext_type=sa.Text()),
        comment=None,
        existing_comment='Error messages (list of strings)',
        existing_nullable=True)
    op.alter_column('scraping_logs', 'html_size_bytes',
        existing_type=sa.INTEGER(),
        comment=None,
        existing_comment='HTML response size in bytes',
        existing_nullable=True)
    op.alter_column('scraping_logs', 'duration_ms',
        existing_type=sa.INTEGER(),
        comment=None,
        existing_comment='Fetch duration in milliseconds',
        existing_nullable=True)
    op.alter_column('scraping_logs', 'fetched_at',
        existing_type=postgresql.TIMESTAMP(),
        comment=None,
        existing_comment='Scraping timestamp',
        existing_nullable=False)
    op.alter_column('scraping_logs', 'fetch_status',
        existing_type=sa.VARCHAR(length=20),
        comment=None,
        existing_comment='Fetch status (success, partial, failed)',
        existing_nullable=False)
    op.alter_column('scraping_logs', 'fetch_method',
        existing_type=sa.VARCHAR(length=20),
        comment=None,
        existing_comment='Fetch method (http, playwright)',
        existing_nullable=False)
    op.alter_column('scraping_logs', 'reference',
        existing_type=sa.VARCHAR(length=100),
        comment=None,
        existing_comment='Product reference (if extracted)',
        existing_nullable=True)
    op.alter_column('scraping_logs', 'source',
        existing_type=sa.VARCHAR(length=50),
        comment=None,
        existing_comment='Store ID (amazon, cdiscount, etc.)',
        existing_nullable=False)
    op.alter_column('scraping_logs', 'url',
        existing_type=sa.TEXT(),
        comment=None,
        existing_comment='Scraped URL',
        existing_nullable=False)
    # products: remove column comments.
    op.alter_column('products', 'last_updated_at',
        existing_type=postgresql.TIMESTAMP(),
        comment=None,
        existing_comment='Last metadata update',
        existing_nullable=False)
    op.alter_column('products', 'first_seen_at',
        existing_type=postgresql.TIMESTAMP(),
        comment=None,
        existing_comment='First scraping timestamp',
        existing_nullable=False)
    op.alter_column('products', 'msrp',
        existing_type=sa.NUMERIC(precision=10, scale=2),
        comment=None,
        existing_comment='Recommended price',
        existing_nullable=True)
    op.alter_column('products', 'currency',
        existing_type=sa.VARCHAR(length=3),
        comment=None,
        existing_comment='Currency code (EUR, USD, GBP)',
        existing_nullable=True)
    op.alter_column('products', 'description',
        existing_type=sa.TEXT(),
        comment=None,
        existing_comment='Product description',
        existing_nullable=True)
    op.alter_column('products', 'category',
        existing_type=sa.TEXT(),
        comment=None,
        existing_comment='Product category (breadcrumb)',
        existing_nullable=True)
    op.alter_column('products', 'title',
        existing_type=sa.TEXT(),
        comment=None,
        existing_comment='Product title',
        existing_nullable=True)
    op.alter_column('products', 'url',
        existing_type=sa.TEXT(),
        comment=None,
        existing_comment='Canonical product URL',
        existing_nullable=False)
    op.alter_column('products', 'reference',
        existing_type=sa.VARCHAR(length=100),
        comment=None,
        existing_comment='Product reference (ASIN, SKU, etc.)',
        existing_nullable=False)
    op.alter_column('products', 'source',
        existing_type=sa.VARCHAR(length=50),
        comment=None,
        existing_comment='Store ID (amazon, cdiscount, etc.)',
        existing_nullable=False)
    # products: drop the Amazon-specific columns added by upgrade().
    op.drop_column('products', 'model_name')
    op.drop_column('products', 'model_number')
    op.drop_column('products', 'in_stock')
    op.drop_column('products', 'stock_text')
    op.drop_column('products', 'discount_text')
    op.drop_column('products', 'amazon_choice_label')
    op.drop_column('products', 'amazon_choice')
    op.drop_column('products', 'rating_count')
    op.drop_column('products', 'rating_value')
    # product_specs / product_images: remove column comments.
    op.alter_column('product_specs', 'spec_value',
        existing_type=sa.TEXT(),
        comment=None,
        existing_comment='Specification value',
        existing_nullable=False)
    op.alter_column('product_specs', 'spec_key',
        existing_type=sa.VARCHAR(length=200),
        comment=None,
        existing_comment="Specification key (e.g., 'Brand', 'Color')",
        existing_nullable=False)
    op.alter_column('product_images', 'position',
        existing_type=sa.INTEGER(),
        comment=None,
        existing_comment='Image position (0=main)',
        existing_nullable=False)
    op.alter_column('product_images', 'image_url',
        existing_type=sa.TEXT(),
        comment=None,
        existing_comment='Image URL',
        existing_nullable=False)
    # price_history: remove column comments.
    op.alter_column('price_history', 'fetched_at',
        existing_type=postgresql.TIMESTAMP(),
        comment=None,
        existing_comment='Scraping timestamp',
        existing_nullable=False)
    op.alter_column('price_history', 'fetch_status',
        existing_type=sa.VARCHAR(length=20),
        comment=None,
        existing_comment='Fetch status (success, partial, failed)',
        existing_nullable=False)
    op.alter_column('price_history', 'fetch_method',
        existing_type=sa.VARCHAR(length=20),
        comment=None,
        existing_comment='Fetch method (http, playwright)',
        existing_nullable=False)
    op.alter_column('price_history', 'stock_status',
        existing_type=sa.VARCHAR(length=20),
        comment=None,
        existing_comment='Stock status (in_stock, out_of_stock, unknown)',
        existing_nullable=True)
    op.alter_column('price_history', 'shipping_cost',
        existing_type=sa.NUMERIC(precision=10, scale=2),
        comment=None,
        existing_comment='Shipping cost',
        existing_nullable=True)
    op.alter_column('price_history', 'price',
        existing_type=sa.NUMERIC(precision=10, scale=2),
        comment=None,
        existing_comment='Product price',
        existing_nullable=True)
    # ### end Alembic commands ###

View File

@@ -0,0 +1,28 @@
"""Ajout champs Amazon produit
Revision ID: 1467e98fcbea
Revises: 3e68b0f0c9e4
Create Date: 2026-01-17 20:08:32.991650
"""
from alembic import op
import sqlalchemy as sa
# Revision identifiers, used by Alembic.
revision = '1467e98fcbea'
# NOTE(review): '20260117_03_classification_rules' also declares
# down_revision '3e68b0f0c9e4', so this empty revision creates a second
# Alembic head — confirm the intended revision graph (merge or re-parent).
down_revision = '3e68b0f0c9e4'
branch_labels = None
depends_on = None
def upgrade() -> None:
    """Intentional no-op: autogeneration found no schema changes to apply."""
    # ### commands auto generated by Alembic - please adjust! ###
    pass
    # ### end Alembic commands ###
def downgrade() -> None:
    """Intentional no-op: nothing to reverse for this empty revision."""
    # ### commands auto generated by Alembic - please adjust! ###
    pass
    # ### end Alembic commands ###

View File

@@ -0,0 +1,114 @@
"""Ajout classification rules et type produit
Revision ID: 20260117_03_classification_rules
Revises: 3e68b0f0c9e4
Create Date: 2026-01-17 20:05:00.000000
"""
from datetime import datetime, timezone
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# Revision identifiers, used by Alembic.
revision = "20260117_03_classification_rules"
down_revision = "3e68b0f0c9e4"
branch_labels = None
depends_on = None
def upgrade() -> None:
    """Add ``products.type`` and create the ``classification_rules`` table.

    Also seeds five default "Informatique" rules so classification works
    out of the box.
    """
    op.add_column(
        "products",
        sa.Column("type", sa.Text(), nullable=True, comment="Product type"),
    )
    op.create_table(
        "classification_rules",
        sa.Column("id", sa.Integer(), primary_key=True, autoincrement=True),
        sa.Column("category", sa.String(length=80), nullable=True, comment="Categorie cible"),
        sa.Column("type", sa.String(length=80), nullable=True, comment="Type cible"),
        sa.Column(
            "keywords",
            postgresql.JSONB(astext_type=sa.Text()),
            nullable=False,
            comment="Mots-cles de matching",
        ),
        sa.Column("sort_order", sa.Integer(), nullable=False, server_default="0"),
        sa.Column("is_active", sa.Boolean(), nullable=False, server_default=sa.text("true")),
        sa.Column(
            "created_at",
            sa.TIMESTAMP(),
            nullable=False,
            server_default=sa.text("CURRENT_TIMESTAMP"),
            comment="Creation timestamp",
        ),
    )
    op.create_index("ix_classification_rule_order", "classification_rules", ["sort_order"])
    op.create_index("ix_classification_rule_active", "classification_rules", ["is_active"])
    # Lightweight table object for bulk_insert (avoids importing ORM models,
    # which could drift from the schema at this revision).
    rules_table = sa.table(
        "classification_rules",
        sa.column("category", sa.String),
        sa.column("type", sa.String),
        sa.column("keywords", postgresql.JSONB),
        sa.column("sort_order", sa.Integer),
        sa.column("is_active", sa.Boolean),
        sa.column("created_at", sa.TIMESTAMP),
    )
    now = datetime.now(timezone.utc)
    # Seed rules; lower sort_order means higher matching priority.
    op.bulk_insert(
        rules_table,
        [
            {
                "category": "Informatique",
                "type": "Ecran",
                "keywords": ["ecran", "moniteur", "display"],
                "sort_order": 0,
                "is_active": True,
                "created_at": now,
            },
            {
                "category": "Informatique",
                "type": "PC portable",
                "keywords": ["pc portable", "ordinateur portable", "laptop", "notebook"],
                "sort_order": 1,
                "is_active": True,
                "created_at": now,
            },
            {
                "category": "Informatique",
                "type": "Unite centrale",
                "keywords": ["unite centrale", "tour", "desktop", "pc fixe"],
                "sort_order": 2,
                "is_active": True,
                "created_at": now,
            },
            {
                "category": "Informatique",
                "type": "Clavier",
                "keywords": ["clavier", "keyboard"],
                "sort_order": 3,
                "is_active": True,
                "created_at": now,
            },
            {
                "category": "Informatique",
                "type": "Souris",
                "keywords": ["souris", "mouse"],
                "sort_order": 4,
                "is_active": True,
                "created_at": now,
            },
        ],
    )
def downgrade() -> None:
    """Drop the classification_rules table (indexes first) and products.type."""
    op.drop_index("ix_classification_rule_active", table_name="classification_rules")
    op.drop_index("ix_classification_rule_order", table_name="classification_rules")
    op.drop_table("classification_rules")
    op.drop_column("products", "type")

View File

@@ -0,0 +1,28 @@
"""Ajout champs Amazon produit
Revision ID: 3e68b0f0c9e4
Revises: 0014e51c4927
Create Date: 2026-01-17 19:45:03.730218
"""
from alembic import op
import sqlalchemy as sa
# Revision identifiers, used by Alembic.
revision = '3e68b0f0c9e4'
down_revision = '0014e51c4927'
branch_labels = None
depends_on = None
def upgrade() -> None:
    """Intentional no-op: autogeneration found no schema changes to apply."""
    # ### commands auto generated by Alembic - please adjust! ###
    pass
    # ### end Alembic commands ###
def downgrade() -> None:
    """Intentional no-op: nothing to reverse for this empty revision."""
    # ### commands auto generated by Alembic - please adjust! ###
    pass
    # ### end Alembic commands ###

View File

@@ -84,6 +84,36 @@ class Product(Base):
msrp: Mapped[Optional[Decimal]] = mapped_column(
Numeric(10, 2), nullable=True, comment="Recommended price"
)
type: Mapped[Optional[str]] = mapped_column(
Text, nullable=True, comment="Product type"
)
rating_value: Mapped[Optional[Decimal]] = mapped_column(
Numeric(3, 2), nullable=True, comment="Note moyenne"
)
rating_count: Mapped[Optional[int]] = mapped_column(
Integer, nullable=True, comment="Nombre d'evaluations"
)
amazon_choice: Mapped[Optional[bool]] = mapped_column(
Boolean, nullable=True, comment="Badge Choix d'Amazon"
)
amazon_choice_label: Mapped[Optional[str]] = mapped_column(
Text, nullable=True, comment="Libelle Choix d'Amazon"
)
discount_text: Mapped[Optional[str]] = mapped_column(
Text, nullable=True, comment="Texte de reduction affiche"
)
stock_text: Mapped[Optional[str]] = mapped_column(
Text, nullable=True, comment="Texte brut du stock"
)
in_stock: Mapped[Optional[bool]] = mapped_column(
Boolean, nullable=True, comment="Disponibilite derivee"
)
model_number: Mapped[Optional[str]] = mapped_column(
Text, nullable=True, comment="Numero du modele"
)
model_name: Mapped[Optional[str]] = mapped_column(
Text, nullable=True, comment="Nom du modele"
)
# Timestamps
first_seen_at: Mapped[datetime] = mapped_column(
@@ -331,6 +361,45 @@ class ScrapingLog(Base):
return f"<ScrapingLog(id={self.id}, url={self.url}, status={self.fetch_status}, fetched_at={self.fetched_at})>"
class ClassificationRule(Base):
"""
Regles de classification categorie/type basees sur des mots-cles.
"""
__tablename__ = "classification_rules"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
category: Mapped[Optional[str]] = mapped_column(
String(80), nullable=True, comment="Categorie cible"
)
type: Mapped[Optional[str]] = mapped_column(
String(80), nullable=True, comment="Type cible"
)
keywords: Mapped[list[str]] = mapped_column(
JSON().with_variant(JSONB, "postgresql"),
nullable=False,
default=list,
comment="Mots-cles de matching",
)
sort_order: Mapped[int] = mapped_column(
Integer, nullable=False, default=0, comment="Ordre de priorite (0=haut)"
)
is_active: Mapped[bool] = mapped_column(
Boolean, nullable=False, default=True, comment="Regle active"
)
created_at: Mapped[datetime] = mapped_column(
TIMESTAMP, nullable=False, default=utcnow, comment="Creation timestamp"
)
__table_args__ = (
Index("ix_classification_rule_order", "sort_order"),
Index("ix_classification_rule_active", "is_active"),
)
def __repr__(self) -> str:
return f"<ClassificationRule(id={self.id}, category={self.category}, type={self.type})>"
class Webhook(Base):
"""
Webhooks pour notifications externes.

View File

@@ -13,7 +13,14 @@ from sqlalchemy.orm import Session
from pricewatch.app.core.logging import get_logger
from pricewatch.app.core.schema import ProductSnapshot
from pricewatch.app.db.models import PriceHistory, Product, ProductImage, ProductSpec, ScrapingLog
from pricewatch.app.db.models import (
ClassificationRule,
PriceHistory,
Product,
ProductImage,
ProductSpec,
ScrapingLog,
)
logger = get_logger("db.repository")
@@ -49,12 +56,58 @@ class ProductRepository:
product.title = snapshot.title
if snapshot.category:
product.category = snapshot.category
if snapshot.type:
product.type = snapshot.type
if snapshot.description:
product.description = snapshot.description
if snapshot.currency:
product.currency = snapshot.currency
if snapshot.msrp is not None:
product.msrp = snapshot.msrp
if snapshot.rating_value is not None:
product.rating_value = snapshot.rating_value
if snapshot.rating_count is not None:
product.rating_count = snapshot.rating_count
if snapshot.amazon_choice is not None:
product.amazon_choice = snapshot.amazon_choice
if snapshot.amazon_choice_label:
product.amazon_choice_label = snapshot.amazon_choice_label
if snapshot.discount_text:
product.discount_text = snapshot.discount_text
if snapshot.stock_text:
product.stock_text = snapshot.stock_text
if snapshot.in_stock is not None:
product.in_stock = snapshot.in_stock
if snapshot.model_number:
product.model_number = snapshot.model_number
if snapshot.model_name:
product.model_name = snapshot.model_name
def apply_classification(self, snapshot: ProductSnapshot) -> None:
"""Applique les regles de classification au snapshot."""
if not snapshot.title:
return
rules = (
self.session.query(ClassificationRule)
.filter(ClassificationRule.is_active == True)
.order_by(ClassificationRule.sort_order, ClassificationRule.id)
.all()
)
if not rules:
return
title = snapshot.title.lower()
for rule in rules:
keywords = rule.keywords or []
if isinstance(keywords, str):
keywords = [keywords]
if any(keyword and keyword.lower() in title for keyword in keywords):
if rule.category:
snapshot.category = rule.category
if rule.type:
snapshot.type = rule.type
return
def add_price_history(self, product: Product, snapshot: ProductSnapshot) -> Optional[PriceHistory]:
"""Ajoute une entree d'historique de prix si inexistante."""

Binary file not shown.

View File

@@ -25,7 +25,12 @@ class ScrapingPipeline:
def __init__(self, config: Optional[AppConfig] = None) -> None:
self.config = config
def process_snapshot(self, snapshot: ProductSnapshot, save_to_db: bool = True) -> Optional[int]:
def process_snapshot(
self,
snapshot: ProductSnapshot,
save_to_db: bool = True,
apply_classification: bool = True,
) -> Optional[int]:
"""
Persiste un snapshot en base si active.
@@ -39,6 +44,8 @@ class ScrapingPipeline:
try:
with get_session(app_config) as session:
repo = ProductRepository(session)
if apply_classification:
repo.apply_classification(snapshot)
product_id = repo.safe_save_snapshot(snapshot)
session.commit()
return product_id

View File

@@ -45,6 +45,8 @@ def fetch_playwright(
timeout_ms: int = 60000,
save_screenshot: bool = False,
wait_for_selector: Optional[str] = None,
wait_for_network_idle: bool = False,
extra_wait_ms: int = 0,
) -> PlaywrightFetchResult:
"""
Récupère une page avec Playwright.
@@ -55,6 +57,8 @@ def fetch_playwright(
timeout_ms: Timeout en millisecondes
save_screenshot: Prendre un screenshot
wait_for_selector: Attendre un sélecteur CSS avant de récupérer
wait_for_network_idle: Attendre que le réseau soit inactif (pour SPA)
extra_wait_ms: Délai supplémentaire après chargement (pour JS lent)
Returns:
PlaywrightFetchResult avec HTML, screenshot (optionnel), ou erreur
@@ -65,6 +69,8 @@ def fetch_playwright(
- Headful disponible pour debug visuel
- Screenshot optionnel pour diagnostiquer les échecs
- wait_for_selector permet d'attendre le chargement dynamique
- wait_for_network_idle utile pour les SPA qui chargent via AJAX
- extra_wait_ms pour les sites avec JS lent après DOM ready
"""
if not url or not url.strip():
logger.error("URL vide fournie")
@@ -101,7 +107,8 @@ def fetch_playwright(
# Naviguer vers la page
logger.debug(f"[Playwright] Navigation vers {url}")
response = page.goto(url, wait_until="domcontentloaded")
wait_until = "networkidle" if wait_for_network_idle else "domcontentloaded"
response = page.goto(url, wait_until=wait_until)
if not response:
raise Exception("Pas de réponse du serveur")
@@ -116,6 +123,11 @@ def fetch_playwright(
f"[Playwright] Timeout en attendant le sélecteur: {wait_for_selector}"
)
# Délai supplémentaire pour JS lent (SPA)
if extra_wait_ms > 0:
logger.debug(f"[Playwright] Attente supplémentaire: {extra_wait_ms}ms")
page.wait_for_timeout(extra_wait_ms)
# Récupérer le HTML
html = page.content()

BIN
pricewatch/app/stores/__pycache__/base.cpython-313.pyc Executable file → Normal file

Binary file not shown.

Binary file not shown.

View File

@@ -29,13 +29,39 @@ logger = get_logger("stores.aliexpress")
class AliexpressStore(BaseStore):
"""Store pour AliExpress.com (marketplace chinois)."""
"""Store pour AliExpress.com (marketplace chinois).
AliExpress est une SPA (Single Page Application) qui charge
le contenu via JavaScript/AJAX. Nécessite Playwright avec
attente du chargement dynamique.
"""
def __init__(self):
"""Initialise le store AliExpress avec ses sélecteurs."""
selectors_path = Path(__file__).parent / "selectors.yml"
super().__init__(store_id="aliexpress", selectors_path=selectors_path)
def get_spa_config(self) -> dict:
"""
Configuration SPA pour AliExpress.
AliExpress charge les données produit (prix, titre) via AJAX.
Il faut attendre que le réseau soit inactif ET ajouter un délai
pour laisser le JS terminer le rendu.
Returns:
Configuration Playwright pour SPA
"""
return {
"wait_for_network_idle": True,
"wait_for_selector": "h1", # Titre du produit
"extra_wait_ms": 2000, # 2s pour le rendu JS
}
def requires_playwright(self) -> bool:
"""AliExpress nécessite Playwright pour le rendu SPA."""
return True
def match(self, url: str) -> float:
"""
Détecte si l'URL est AliExpress.
@@ -206,28 +232,71 @@ class AliexpressStore(BaseStore):
Extrait le prix.
AliExpress n'a PAS de sélecteur CSS stable pour le prix.
On utilise regex sur le HTML brut.
Stratégie multi-niveaux:
1. Chercher dans les données JSON embarquées
2. Chercher dans les spans avec classes contenant "price"
3. Regex sur le HTML brut
4. Meta tags og:price
"""
# Pattern 1: Prix avant € (ex: "136,69 €")
match = re.search(r"([0-9][0-9\\s.,\\u00a0\\u202f\\u2009]*)\\s*€", html)
# Priorité 1: Extraire depuis JSON embarqué (skuActivityAmount, formattedActivityPrice)
json_patterns = [
r'"skuActivityAmount"\s*:\s*\{\s*"value"\s*:\s*(\d+(?:\.\d+)?)', # {"value": 123.45}
r'"formattedActivityPrice"\s*:\s*"([0-9,.\s]+)\s*€"', # "123,45 €"
r'"formattedActivityPrice"\s*:\s*"\s*([0-9,.\s]+)"', # "€ 123.45"
r'"minPrice"\s*:\s*"([0-9,.\s]+)"', # "minPrice": "123.45"
r'"price"\s*:\s*"([0-9,.\s]+)"', # "price": "123.45"
r'"activityAmount"\s*:\s*\{\s*"value"\s*:\s*(\d+(?:\.\d+)?)', # activityAmount.value
]
for pattern in json_patterns:
match = re.search(pattern, html)
if match:
price = parse_price_text(match.group(1))
if price is not None and price > 0:
debug.notes.append(f"Prix extrait depuis JSON: {price}")
return price
# Priorité 2: Chercher dans les spans/divs avec classes contenant "price"
price_selectors = [
'span[class*="price--current"]',
'span[class*="price--sale"]',
'div[class*="price--current"]',
'span[class*="product-price"]',
'span[class*="Price_Price"]',
'div[class*="es--wrap"]', # Structure AliExpress spécifique
]
for selector in price_selectors:
elements = soup.select(selector)
for elem in elements:
text = elem.get_text(strip=True)
# Chercher un prix dans le texte
price_match = re.search(r'(\d+[,.\s]*\d*)\s*€|€\s*(\d+[,.\s]*\d*)', text)
if price_match:
price_str = price_match.group(1) or price_match.group(2)
price = parse_price_text(price_str)
if price is not None and price > 0:
debug.notes.append(f"Prix extrait depuis sélecteur {selector}")
return price
# Priorité 3: Prix avant € (ex: "136,69€" ou "136,69 €")
match = re.search(r'(\d+[,.\s\u00a0\u202f\u2009]*\d*)\s*€', html)
if match:
price = parse_price_text(match.group(1))
if price is not None:
if price is not None and price > 0:
return price
# Pattern 2: € avant prix (ex: "€ 136.69")
match = re.search(r"\\s*([0-9][0-9\\s.,\\u00a0\\u202f\\u2009]*)", html)
# Priorité 4: € avant prix (ex: "€136.69" ou "€ 136.69")
match = re.search(r'\s*(\d+[,.\s\u00a0\u202f\u2009]*\d*)', html)
if match:
price = parse_price_text(match.group(1))
if price is not None:
if price is not None and price > 0:
return price
# Pattern 3: Chercher dans meta tags (moins fiable)
# Priorité 5: Chercher dans meta tags (moins fiable)
og_price = soup.find("meta", property="og:price:amount")
if og_price:
price_str = og_price.get("content", "")
price = parse_price_text(price_str)
if price is not None:
if price is not None and price > 0:
return price
debug.errors.append("Prix non trouvé")
@@ -235,7 +304,7 @@ class AliexpressStore(BaseStore):
def _extract_msrp(self, html: str, debug: DebugInfo) -> Optional[float]:
"""Extrait le prix conseille si present."""
match = re.search(r"originalPrice\"\\s*:\\s*\"([0-9\\s.,]+)\"", html)
match = re.search(r'originalPrice"\s*:\s*"([0-9\s.,]+)"', html)
if match:
price = parse_price_text(match.group(1))
if price is not None:

View File

@@ -15,6 +15,13 @@ price:
- "#priceblock_dealprice"
- ".a-price-range .a-price .a-offscreen"
# Texte de réduction explicite
discount_text:
- "#regularprice_savings"
- "#dealprice_savings"
- "#savingsPercentage"
- "span.savingsPercentage"
# Devise (généralement dans le symbole)
currency:
- "span.a-price-symbol"
@@ -32,6 +39,24 @@ stock_status:
- "#availability"
- ".a-declarative .a-size-medium"
# Note moyenne
rating_value:
- "#acrPopover"
- "#averageCustomerReviews .a-icon-alt"
- "#averageCustomerReviews span.a-icon-alt"
# Nombre d'évaluations
rating_count:
- "#acrCustomerReviewText"
- "#acrCustomerReviewLink"
# Badge Choix d'Amazon
amazon_choice:
- "#acBadge_feature_div"
- "#acBadge_feature_div .ac-badge"
- "#acBadge_feature_div .ac-badge-rectangle"
- "#acBadge_feature_div .ac-badge-rectangle-icon"
# Images produit
images:
- "#landingImage"
@@ -44,6 +69,13 @@ category:
- "#wayfinding-breadcrumbs_feature_div"
- ".a-breadcrumb"
# Description (détails de l'article)
description:
- "#detailBullets_feature_div"
- "#detailBulletsWrapper_feature_div"
- "#productDetails_detailBullets_sections1"
- "#feature-bullets"
# Caractéristiques techniques (table specs)
specs_table:
- "#productDetails_techSpec_section_1"

View File

@@ -130,13 +130,19 @@ class AmazonStore(BaseStore):
title = self._extract_title(soup, debug_info)
price = self._extract_price(soup, debug_info)
currency = self._extract_currency(soup, debug_info)
stock_status = self._extract_stock(soup, debug_info)
images = self._extract_images(soup, debug_info)
stock_status, stock_text, in_stock = self._extract_stock_details(soup, debug_info)
main_image, gallery_images, images = self._extract_images(soup, debug_info)
category = self._extract_category(soup, debug_info)
specs = self._extract_specs(soup, debug_info)
description = self._extract_description(soup, debug_info)
msrp = self._extract_msrp(soup, debug_info)
reference = self.extract_reference(url) or self._extract_asin_from_html(soup)
rating_value = self._extract_rating_value(soup, debug_info)
rating_count = self._extract_rating_count(soup, debug_info)
amazon_choice, amazon_choice_label = self._extract_amazon_choice(soup, debug_info)
discount_text = self._extract_discount_text(soup, debug_info)
model_number, model_name = self._extract_model_details(specs)
asin = reference
# Déterminer le statut final (ne pas écraser FAILED)
if debug_info.status != DebugStatus.FAILED:
@@ -153,12 +159,24 @@ class AmazonStore(BaseStore):
currency=currency or "EUR",
shipping_cost=None, # Difficile à extraire
stock_status=stock_status,
stock_text=stock_text,
in_stock=in_stock,
reference=reference,
asin=asin,
category=category,
description=description,
images=images,
main_image=main_image,
gallery_images=gallery_images,
specs=specs,
msrp=msrp,
rating_value=rating_value,
rating_count=rating_count,
amazon_choice=amazon_choice,
amazon_choice_label=amazon_choice_label,
discount_text=discount_text,
model_number=model_number,
model_name=model_name,
debug=debug_info,
)
@@ -203,18 +221,43 @@ class AmazonStore(BaseStore):
return None
def _extract_description(self, soup: BeautifulSoup, debug: DebugInfo) -> Optional[str]:
"""Extrait la description (meta tags)."""
meta = soup.find("meta", property="og:description") or soup.find(
"meta", attrs={"name": "description"}
)
if meta:
description = meta.get("content", "").strip()
if description:
return description
"""Extrait la description depuis les détails de l'article."""
selectors = self.get_selector("description", [])
if isinstance(selectors, str):
selectors = [selectors]
for selector in selectors:
element = soup.select_one(selector)
if not element:
continue
items = [
item.get_text(" ", strip=True)
for item in element.select("li")
if item.get_text(strip=True)
]
if items:
return "\n".join(items)
text = " ".join(element.stripped_strings)
if text:
return text
return None
def _extract_price(self, soup: BeautifulSoup, debug: DebugInfo) -> Optional[float]:
"""Extrait le prix."""
# Priorité 1: combiner les spans séparés a-price-whole et a-price-fraction
# C'est le format le plus courant sur Amazon pour les prix avec centimes séparés
whole = soup.select_one("span.a-price-whole")
fraction = soup.select_one("span.a-price-fraction")
if whole and fraction:
whole_text = whole.get_text(strip=True).rstrip(",.")
fraction_text = fraction.get_text(strip=True)
if whole_text and fraction_text:
price = parse_price_text(f"{whole_text}.{fraction_text}")
if price is not None:
return price
# Priorité 2: essayer les sélecteurs (incluant a-price-whole seul avec prix complet)
selectors = self.get_selector("price", [])
if isinstance(selectors, str):
selectors = [selectors]
@@ -227,16 +270,6 @@ class AmazonStore(BaseStore):
if price is not None:
return price
# Fallback: chercher les spans séparés a-price-whole et a-price-fraction
whole = soup.select_one("span.a-price-whole")
fraction = soup.select_one("span.a-price-fraction")
if whole and fraction:
whole_text = whole.get_text(strip=True)
fraction_text = fraction.get_text(strip=True)
price = parse_price_text(f"{whole_text}.{fraction_text}")
if price is not None:
return price
debug.errors.append("Prix non trouvé")
return None
@@ -268,8 +301,10 @@ class AmazonStore(BaseStore):
# Défaut basé sur le domaine
return "EUR"
def _extract_stock(self, soup: BeautifulSoup, debug: DebugInfo) -> StockStatus:
"""Extrait le statut de stock."""
def _extract_stock_details(
self, soup: BeautifulSoup, debug: DebugInfo
) -> tuple[StockStatus, Optional[str], Optional[bool]]:
"""Extrait le statut de stock avec texte brut."""
selectors = self.get_selector("stock_status", [])
if isinstance(selectors, str):
selectors = [selectors]
@@ -277,22 +312,27 @@ class AmazonStore(BaseStore):
for selector in selectors:
element = soup.select_one(selector)
if element:
text = element.get_text(strip=True).lower()
if "en stock" in text or "available" in text or "in stock" in text:
return StockStatus.IN_STOCK
text = element.get_text(strip=True)
normalized = text.lower()
if "en stock" in normalized or "available" in normalized or "in stock" in normalized:
return StockStatus.IN_STOCK, text, True
elif (
"rupture" in text
or "indisponible" in text
or "out of stock" in text
"rupture" in normalized
or "indisponible" in normalized
or "out of stock" in normalized
):
return StockStatus.OUT_OF_STOCK
return StockStatus.OUT_OF_STOCK, text, False
return StockStatus.UNKNOWN
return StockStatus.UNKNOWN, None, None
def _extract_images(self, soup: BeautifulSoup, debug: DebugInfo) -> list[str]:
"""Extrait les URLs d'images."""
images = []
seen = set()
def _extract_images(
self, soup: BeautifulSoup, debug: DebugInfo
) -> tuple[Optional[str], list[str], list[str]]:
"""Extrait l'image principale et la galerie."""
images: list[str] = []
seen: set[str] = set()
main_image: Optional[str] = None
max_gallery = 15
selectors = self.get_selector("images", [])
if isinstance(selectors, str):
selectors = [selectors]
@@ -306,6 +346,8 @@ class AmazonStore(BaseStore):
if self._is_product_image(url) and url not in seen:
images.append(url)
seen.add(url)
if main_image is None:
main_image = url
dynamic = element.get("data-a-dynamic-image")
if dynamic:
urls = self._extract_dynamic_images(dynamic)
@@ -313,6 +355,8 @@ class AmazonStore(BaseStore):
if self._is_product_image(dyn_url) and dyn_url not in seen:
images.append(dyn_url)
seen.add(dyn_url)
if main_image is None:
main_image = dyn_url
# Fallback: chercher tous les img tags si aucune image trouvée
if not images:
@@ -323,8 +367,15 @@ class AmazonStore(BaseStore):
if url not in seen:
images.append(url)
seen.add(url)
if main_image is None:
main_image = url
return images
if main_image is None and images:
main_image = images[0]
gallery_images = [url for url in images if url != main_image]
gallery_images = gallery_images[:max_gallery]
final_images = [main_image] + gallery_images if main_image else gallery_images
return main_image, gallery_images, final_images
def _extract_dynamic_images(self, raw: str) -> list[str]:
"""Extrait les URLs du JSON data-a-dynamic-image."""
@@ -390,8 +441,111 @@ class AmazonStore(BaseStore):
if key and value:
specs[key] = value
# Détails de l'article sous forme de liste
detail_list = soup.select("#detailBullets_feature_div li")
for item in detail_list:
text = item.get_text(" ", strip=True)
if ":" not in text:
continue
key, value = text.split(":", 1)
key = key.strip()
value = value.strip()
if key and value and key not in specs:
specs[key] = value
return specs
def _extract_rating_value(self, soup: BeautifulSoup, debug: DebugInfo) -> Optional[float]:
"""Extrait la note moyenne."""
selectors = self.get_selector("rating_value", [])
if isinstance(selectors, str):
selectors = [selectors]
for selector in selectors:
element = soup.select_one(selector)
if not element:
continue
text = element.get_text(" ", strip=True) or element.get("title", "").strip()
match = re.search(r"([\d.,]+)", text)
if match:
value = match.group(1).replace(",", ".")
try:
return float(value)
except ValueError:
continue
return None
def _extract_rating_count(self, soup: BeautifulSoup, debug: DebugInfo) -> Optional[int]:
"""Extrait le nombre d'évaluations."""
selectors = self.get_selector("rating_count", [])
if isinstance(selectors, str):
selectors = [selectors]
for selector in selectors:
element = soup.select_one(selector)
if not element:
continue
text = element.get_text(" ", strip=True)
match = re.search(r"([\d\s\u202f\u00a0]+)", text)
if match:
numeric = re.sub(r"[^\d]", "", match.group(1))
if numeric:
return int(numeric)
return None
def _extract_amazon_choice(
self, soup: BeautifulSoup, debug: DebugInfo
) -> tuple[Optional[bool], Optional[str]]:
"""Extrait le badge Choix d'Amazon."""
selectors = self.get_selector("amazon_choice", [])
if isinstance(selectors, str):
selectors = [selectors]
for selector in selectors:
element = soup.select_one(selector)
if element:
label_candidates = [
element.get_text(" ", strip=True),
element.get("aria-label", "").strip(),
element.get("title", "").strip(),
element.get("data-a-badge-label", "").strip(),
]
label = next((item for item in label_candidates if item), "")
normalized = label.lower()
if "choix d'amazon" in normalized or "amazon's choice" in normalized:
return True, label
if label:
return True, label
return True, None
return None, None
def _extract_discount_text(self, soup: BeautifulSoup, debug: DebugInfo) -> Optional[str]:
"""Extrait le texte de réduction explicite."""
selectors = self.get_selector("discount_text", [])
if isinstance(selectors, str):
selectors = [selectors]
for selector in selectors:
element = soup.select_one(selector)
if not element:
continue
text = element.get_text(" ", strip=True)
if text:
return text
return None
def _extract_model_details(self, specs: dict[str, str]) -> tuple[Optional[str], Optional[str]]:
"""Extrait le numero et le nom du modele depuis les specs."""
model_number = None
model_name = None
for key, value in specs.items():
normalized = key.lower()
if "numéro du modèle de l'article" in normalized or "numero du modele de l'article" in normalized:
model_number = value
if "nom du modèle" in normalized or "nom du modele" in normalized:
model_name = value
return model_number, model_name
def _extract_asin_from_html(self, soup: BeautifulSoup) -> Optional[str]:
"""Extrait l'ASIN depuis le HTML (fallback)."""
selectors = self.get_selector("asin", [])

Binary file not shown.

View File

@@ -152,5 +152,32 @@ class BaseStore(ABC):
"""
return self.selectors.get(key, default)
def get_spa_config(self) -> Optional[dict]:
"""
Retourne la configuration SPA pour Playwright si ce store est une SPA.
Returns:
dict avec les options Playwright ou None si pas une SPA:
- wait_for_selector: Sélecteur CSS à attendre avant scraping
- wait_for_network_idle: Attendre que le réseau soit inactif
- extra_wait_ms: Délai supplémentaire après chargement
Par défaut retourne None (pas de config SPA spécifique).
Les stores SPA doivent surcharger cette méthode.
"""
return None
def requires_playwright(self) -> bool:
"""
Indique si ce store nécessite obligatoirement Playwright.
Returns:
True si Playwright est requis, False sinon
Par défaut False. Les stores avec anti-bot agressif ou
rendu SPA obligatoire doivent surcharger cette méthode.
"""
return False
def __repr__(self) -> str:
return f"<{self.__class__.__name__} id={self.store_id}>"

Binary file not shown.

View File

@@ -112,7 +112,7 @@ class CdiscountStore(BaseStore):
currency = self._extract_currency(soup, debug_info)
stock_status = self._extract_stock(soup, debug_info)
images = self._extract_images(soup, debug_info)
category = self._extract_category(soup, debug_info)
category = self._extract_category(soup, debug_info, url)
specs = self._extract_specs(soup, debug_info)
description = self._extract_description(soup, debug_info)
msrp = self._extract_msrp(soup, debug_info)
@@ -180,7 +180,7 @@ class CdiscountStore(BaseStore):
return None
def _extract_price(self, soup: BeautifulSoup, debug: DebugInfo) -> Optional[float]:
"""Extrait le prix."""
"""Extrait le prix (DOM puis JSON-LD)."""
selectors = self.get_selector("price", [])
if isinstance(selectors, str):
selectors = [selectors]
@@ -188,16 +188,33 @@ class CdiscountStore(BaseStore):
for selector in selectors:
elements = soup.select(selector)
for element in elements:
# Attribut content (schema.org) ou texte
price_text = element.get("content") or element.get_text(strip=True)
price = parse_price_text(price_text)
if price is not None:
return price
price = self._extract_price_from_json_ld(soup)
if price is not None:
return price
debug.errors.append("Prix non trouvé")
return None
def _extract_price_from_json_ld(self, soup: BeautifulSoup) -> Optional[float]:
"""Extrait le prix depuis les scripts JSON-LD."""
product_ld = self._find_product_ld(soup)
offers = product_ld.get("offers")
if isinstance(offers, list):
offers = offers[0] if offers else None
if isinstance(offers, dict):
price = offers.get("price")
if isinstance(price, str):
return parse_price_text(price)
if isinstance(price, (int, float)):
# convert to float but maintain decimals
return float(price)
return None
def _extract_msrp(self, soup: BeautifulSoup, debug: DebugInfo) -> Optional[float]:
"""Extrait le prix conseille."""
selectors = [
@@ -205,6 +222,8 @@ class CdiscountStore(BaseStore):
".price__old",
".c-price__strike",
".price-strike",
"div[data-e2e='strikedPrice']",
"div.SecondaryPrice-price",
]
for selector in selectors:
element = soup.select_one(selector)
@@ -212,6 +231,19 @@ class CdiscountStore(BaseStore):
price = parse_price_text(element.get_text(strip=True))
if price is not None:
return price
# Fallback: JSON-LD (offers price + promotions)
product_ld = self._find_product_ld(soup)
offer = product_ld.get("offers")
if isinstance(offer, dict):
price = offer.get("price")
if isinstance(price, str):
candidate = parse_price_text(price)
elif isinstance(price, (int, float)):
candidate = float(price)
else:
candidate = None
if candidate is not None:
return candidate
return None
def _extract_currency(self, soup: BeautifulSoup, debug: DebugInfo) -> Optional[str]:
@@ -288,7 +320,7 @@ class CdiscountStore(BaseStore):
return list(dict.fromkeys(images)) # Préserver lordre
def _extract_category(self, soup: BeautifulSoup, debug: DebugInfo) -> Optional[str]:
def _extract_category(self, soup: BeautifulSoup, debug: DebugInfo, url: str) -> Optional[str]:
"""Extrait la catégorie depuis les breadcrumbs."""
selectors = self.get_selector("category", [])
if isinstance(selectors, str):
@@ -310,6 +342,54 @@ class CdiscountStore(BaseStore):
if parts:
return parts[-1]
if title := self._extract_category_from_breadcrumbs(soup):
return title
return self._extract_category_from_url(url)
def _extract_category_from_breadcrumbs(self, soup: BeautifulSoup) -> Optional[str]:
"""Cherche un breadcrumb via JSON-LD (BreadcrumbList) et retourne l'avant-dernier item."""
entries = self._extract_json_ld_entries(soup)
for entry in entries:
if not isinstance(entry, dict):
continue
if entry.get("@type") != "BreadcrumbList":
continue
items = entry.get("itemListElement", [])
if not isinstance(items, list):
continue
positions = [
element.get("position")
for element in items
if isinstance(element, dict) and isinstance(element.get("position"), int)
]
max_pos = max(positions) if positions else None
for element in reversed(items):
if not isinstance(element, dict):
continue
position = element.get("position")
if max_pos is not None and position == max_pos:
continue
item = element.get("item", {})
name = item.get("name")
if name and isinstance(name, str):
title = name.strip()
if title:
return title
return None
def _extract_category_from_url(self, url: str) -> Optional[str]:
"""Déduit la catégorie via l'URL /informatique/.../f-..."""
if not url:
return None
parsed = urlparse(url)
segments = [seg for seg in parsed.path.split("/") if seg]
breadcrumb = []
for segment in segments:
if segment.startswith("f-") or segment.startswith("p-"):
break
breadcrumb.append(segment)
if breadcrumb:
return breadcrumb[-1].replace("-", " ").title()
return None
def _extract_json_ld_entries(self, soup: BeautifulSoup) -> list[dict]:

View File

@@ -17,6 +17,18 @@ def parse_price_text(text: str) -> Optional[float]:
if not text:
return None
euro_suffix = re.search(r"([0-9 .,]+)\s*€\s*(\d{2})\b", text)
if euro_suffix:
integer_part = euro_suffix.group(1)
decimal_part = euro_suffix.group(2)
integer_clean = re.sub(r"[^\d]", "", integer_part)
if integer_clean:
cleaned_decimal = f"{integer_clean}.{decimal_part}"
try:
return float(cleaned_decimal)
except ValueError:
pass
# Fallback to original replacement if suffix logic fails
text = re.sub(r"(\d)\s*€\s*(\d)", r"\1,\2", text)
cleaned = text.replace("\u00a0", " ").replace("\u202f", " ").replace("\u2009", " ")
cleaned = "".join(ch for ch in cleaned if ch.isdigit() or ch in ".,")

View File

@@ -6,6 +6,7 @@ from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
import hashlib
from typing import Optional
import redis
@@ -127,11 +128,13 @@ class ScrapingScheduler:
interval_hours: int = 24,
use_playwright: Optional[bool] = None,
save_db: bool = True,
job_id: Optional[str] = None,
) -> ScheduledJobInfo:
"""Planifie un scraping recurrent (intervalle en heures)."""
interval_seconds = int(timedelta(hours=interval_hours).total_seconds())
next_run = datetime.now(timezone.utc) + timedelta(seconds=interval_seconds)
resolved_job_id = job_id or self._job_id_for_url(url)
job = self.scheduler.schedule(
scheduled_time=next_run,
func=scrape_product,
@@ -139,6 +142,13 @@ class ScrapingScheduler:
kwargs={"use_playwright": use_playwright, "save_db": save_db},
interval=interval_seconds,
repeat=None,
id=resolved_job_id,
)
logger.info(f"Job planifie: {job.id}, prochaine execution: {next_run.isoformat()}")
return ScheduledJobInfo(job_id=job.id, next_run=next_run)
@staticmethod
def _job_id_for_url(url: str) -> str:
"""Genere un job_id stable pour eviter les doublons."""
fingerprint = hashlib.sha1(url.strip().lower().encode("utf-8")).hexdigest()
return f"scrape_{fingerprint}"

View File

@@ -157,6 +157,36 @@ def scrape_product(
)
success = False
fetch_error = str(exc)
# Si captcha detecte via HTTP, forcer une tentative Playwright.
if (
fetch_method == FetchMethod.HTTP
and use_playwright
and snapshot.debug.errors
and any("captcha" in error.lower() for error in snapshot.debug.errors)
):
logger.info("[FETCH] Captcha detecte, tentative Playwright")
pw_result = fetch_playwright(
canonical_url,
headless=not headful,
timeout_ms=timeout_ms,
save_screenshot=save_screenshot,
)
if pw_result.success and pw_result.html:
try:
snapshot = store.parse(pw_result.html, canonical_url)
snapshot.debug.method = FetchMethod.PLAYWRIGHT
snapshot.debug.duration_ms = pw_result.duration_ms
snapshot.debug.html_size_bytes = len(pw_result.html.encode("utf-8"))
snapshot.add_note("Captcha detecte via HTTP, fallback Playwright")
success = snapshot.debug.status != DebugStatus.FAILED
except Exception as exc:
snapshot.add_note(f"Fallback Playwright echoue: {exc}")
logger.error(f"[PARSE] Exception fallback Playwright: {exc}")
fetch_error = str(exc)
else:
error = pw_result.error or "Erreur Playwright"
snapshot.add_note(f"Fallback Playwright echoue: {error}")
fetch_error = error
else:
snapshot = ProductSnapshot(
source=store.store_id,

10304
scraped/amazon_B07RW6Z692.html Normal file

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1 @@
<html><body>content</body></html>

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

10584
scraped/amazon_B0DWFLPMM5.html Normal file

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,121 @@
import os
from typing import Dict, Optional
import psycopg2
from psycopg2.extras import RealDictCursor
def _env_str(name: str, default: str) -> str:
    """Return the environment variable *name*, or *default* when unset."""
    return os.getenv(name, default)
def _env_int(name: str, default: int) -> int:
    """Return the environment variable *name* as an int.

    Falls back to *default* when the variable is unset or not a valid
    integer literal.
    """
    raw_value = os.environ.get(name, default)
    try:
        return int(raw_value)
    except ValueError:
        return default
def get_connection():
    """Open a PostgreSQL connection configured via PW_DB_* env overrides."""
    settings = {
        "host": _env_str("PW_DB_HOST", "localhost"),
        "port": _env_int("PW_DB_PORT", 5432),
        "dbname": _env_str("PW_DB_NAME", "pricewatch"),
        "user": _env_str("PW_DB_USER", "pricewatch"),
        "password": _env_str("PW_DB_PASSWORD", "pricewatch"),
    }
    return psycopg2.connect(**settings)
def gather(limit: Optional[int] = None):
    """Return products joined with their most recent price-history row.

    Args:
        limit: Optional cap on the number of returned rows. Bound as a
            query parameter instead of being interpolated into the SQL
            text (the original f-string interpolation was injection-prone
            if a non-int ever reached this function).

    Returns:
        List of dict-like rows (RealDictCursor), newest products first.
    """
    query = """
    SELECT
        COALESCE(p.source, 'unknown') AS source,
        p.id,
        p.reference,
        p.title,
        p.description,
        p.category,
        p.msrp,
        EXISTS (
            SELECT 1 FROM product_images WHERE product_id = p.id LIMIT 1
        ) AS has_image,
        EXISTS (
            SELECT 1 FROM product_specs WHERE product_id = p.id LIMIT 1
        ) AS has_specs,
        ph.price,
        ph.stock_status
    FROM products p
    LEFT JOIN LATERAL (
        SELECT price, stock_status
        FROM price_history
        WHERE product_id = p.id
        ORDER BY fetched_at DESC
        LIMIT 1
    ) ph ON TRUE
    ORDER BY p.last_updated_at DESC
    """
    params: tuple = ()
    if limit:
        # Parameterized LIMIT: psycopg2 binds the value server-side.
        query += " LIMIT %s"
        params = (int(limit),)
    with get_connection() as conn:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute(query, params)
            return cur.fetchall()
def summarize(rows):
    """Aggregate per-store completeness statistics over gathered product rows.

    Returns a (fields, stores) pair: ``fields`` is the ordered list of
    (field_name, French label) tuples checked, ``stores`` maps each store
    to a total count and, per field, the list of products missing it.
    """
    fields = [
        ("price", "Prix absent"),
        ("stock_status", "Statut stock manquant"),
        ("description", "Description manquante"),
        ("category", "Catégorie manquante"),
        ("msrp", "Prix conseillé absent"),
        ("has_image", "Images absentes"),
        ("has_specs", "Caractéristiques absentes"),
    ]
    # Boolean EXISTS columns: "missing" means falsy, not None/empty.
    boolean_fields = {"has_image", "has_specs"}
    stores: Dict[str, Dict[str, object]] = {}
    for row in rows:
        store_key = row["source"] or "unknown"
        if store_key not in stores:
            stores[store_key] = {
                "total": 0,
                "details": {name: [] for name, _ in fields},
            }
        bucket = stores[store_key]
        bucket["total"] += 1
        for name, _label in fields:
            value = row.get(name)
            if name in boolean_fields:
                is_missing = not value
            else:
                is_missing = value in (None, "", [])
            if is_missing:
                bucket["details"][name].append(
                    {
                        "id": row["id"],
                        "reference": row["reference"],
                        "title": row["title"] or "Sans titre",
                    }
                )
    return fields, stores
def pretty_print(fields, stores):
    """Print a human-readable completeness report, showing at most 5 samples per field."""
    for store_name, summary in stores.items():
        print(f"\n=== Store: {store_name} ({summary['total']} produits) ===")
        for field_name, label in fields:
            missing_items = summary["details"][field_name]
            print(f" {label}: {len(missing_items)}")
            # Cap the sample list so huge gaps stay readable.
            for sample in missing_items[:5]:
                print(f" - [{sample['id']}] {sample['reference']} · {sample['title']}")
def main():
    """Entry point: audit data completeness for up to 1000 products."""
    field_defs, per_store = summarize(gather(limit=1000))
    pretty_print(field_defs, per_store)


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,53 @@
"""Tests simples pour l'authentification API."""
import pytest
from fastapi import HTTPException
from pricewatch.app.api.main import require_token
class FakeConfig:
    """Config stub whose API token matches the tests' valid bearer value."""

    api_token = "valid-token"
class FakeConfigNoToken:
    """Config stub simulating a deployment with no API token configured."""

    api_token = None
def test_require_token_valid(monkeypatch):
    """A matching bearer token passes without raising."""
    monkeypatch.setattr("pricewatch.app.api.main.get_config", lambda: FakeConfig())
    require_token("Bearer valid-token")  # must not raise
def test_require_token_missing(monkeypatch):
    """An absent Authorization header is rejected with 401."""
    monkeypatch.setattr("pricewatch.app.api.main.get_config", lambda: FakeConfig())
    with pytest.raises(HTTPException) as excinfo:
        require_token(None)
    assert excinfo.value.status_code == 401
def test_require_token_invalid_format(monkeypatch):
    """A header without the Bearer prefix is rejected with 401."""
    monkeypatch.setattr("pricewatch.app.api.main.get_config", lambda: FakeConfig())
    with pytest.raises(HTTPException) as excinfo:
        require_token("invalid-format")
    assert excinfo.value.status_code == 401
def test_require_token_wrong_value(monkeypatch):
    """A well-formed but wrong token is rejected with 403."""
    monkeypatch.setattr("pricewatch.app.api.main.get_config", lambda: FakeConfig())
    with pytest.raises(HTTPException) as excinfo:
        require_token("Bearer wrong-token")
    assert excinfo.value.status_code == 403
def test_require_token_not_configured(monkeypatch):
    """A server missing its token configuration answers 500."""
    monkeypatch.setattr("pricewatch.app.api.main.get_config", lambda: FakeConfigNoToken())
    with pytest.raises(HTTPException) as excinfo:
        require_token("Bearer any-token")
    assert excinfo.value.status_code == 500

View File

@@ -0,0 +1,26 @@
"""Tests pour les endpoints de logs API."""
from pricewatch.app.api.main import list_backend_logs, BACKEND_LOGS
from pricewatch.app.api.schemas import BackendLogEntry
def test_list_backend_logs_empty():
    """With no buffered entries, the endpoint returns an empty list."""
    BACKEND_LOGS.clear()
    assert list_backend_logs() == []
def test_list_backend_logs_with_entries():
    """Buffered entries are returned unchanged."""
    from datetime import datetime

    BACKEND_LOGS.clear()
    BACKEND_LOGS.append(
        BackendLogEntry(level="INFO", message="Test log", time=datetime(2026, 1, 17, 12, 0, 0))
    )
    entries = list_backend_logs()
    assert len(entries) == 1
    assert entries[0].message == "Test log"
    assert entries[0].level == "INFO"
    BACKEND_LOGS.clear()  # leave the shared buffer clean for other tests

View File

@@ -0,0 +1,267 @@
"""Tests fonctions API produits avec mocks."""
from datetime import datetime
from unittest.mock import MagicMock, patch
import pytest
from fastapi import HTTPException
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
from pricewatch.app.api.main import (
create_product,
get_product,
update_product,
delete_product,
list_prices,
create_price,
update_price,
delete_price,
)
from pricewatch.app.api.schemas import ProductCreate, ProductUpdate, PriceHistoryCreate, PriceHistoryUpdate
class MockProduct:
    """Lightweight stand-in for the ORM Product model used by the API layer."""

    def __init__(self, **kwargs):
        # Defaults built per call so timestamps are fresh for each instance.
        defaults = {
            "id": 1,
            "source": "amazon",
            "reference": "REF123",
            "url": "https://example.com",
            "title": "Test Product",
            "category": None,
            "description": None,
            "currency": "EUR",
            "msrp": None,
            "first_seen_at": datetime.now(),
            "last_updated_at": datetime.now(),
        }
        for field, fallback in defaults.items():
            setattr(self, field, kwargs.get(field, fallback))
class MockPrice:
    """Lightweight stand-in for the ORM PriceHistory model."""

    def __init__(self, **kwargs):
        # Defaults built per call so the timestamp is fresh for each instance.
        defaults = {
            "id": 1,
            "product_id": 1,
            "price": 99.99,
            "shipping_cost": None,
            "stock_status": "in_stock",
            "fetch_method": "http",
            "fetch_status": "success",
            "fetched_at": datetime.now(),
        }
        for field, fallback in defaults.items():
            setattr(self, field, kwargs.get(field, fallback))
class TestCreateProduct:
    """Tests for the create_product endpoint function."""

    def test_create_success(self):
        """A valid payload is added, committed and serialized."""
        session = MagicMock()
        session.add = MagicMock()
        session.commit = MagicMock()
        session.refresh = MagicMock()
        payload = ProductCreate(
            source="amazon",
            reference="NEW123",
            url="https://amazon.fr/dp/NEW123",
            title="New Product",
            currency="EUR",
        )
        with patch("pricewatch.app.api.main.Product") as product_cls:
            product_cls.return_value = MockProduct(reference="NEW123")
            with patch("pricewatch.app.api.main._product_to_out") as to_out:
                to_out.return_value = MagicMock()
                create_product(payload, session)
        session.add.assert_called_once()
        session.commit.assert_called_once()

    def test_create_duplicate(self):
        """A duplicate product (IntegrityError on commit) maps to HTTP 409."""
        session = MagicMock()
        session.add = MagicMock()
        session.commit = MagicMock(side_effect=IntegrityError("duplicate", {}, None))
        session.rollback = MagicMock()
        payload = ProductCreate(
            source="amazon",
            reference="DUPE",
            url="https://amazon.fr/dp/DUPE",
            title="Duplicate",
            currency="EUR",
        )
        with patch("pricewatch.app.api.main.Product"):
            with pytest.raises(HTTPException) as excinfo:
                create_product(payload, session)
        assert excinfo.value.status_code == 409

    def test_create_db_error(self):
        """A generic SQLAlchemyError on commit maps to HTTP 500."""
        session = MagicMock()
        session.add = MagicMock()
        session.commit = MagicMock(side_effect=SQLAlchemyError("db error"))
        session.rollback = MagicMock()
        payload = ProductCreate(
            source="amazon",
            reference="ERR",
            url="https://amazon.fr/dp/ERR",
            title="Error",
            currency="EUR",
        )
        with patch("pricewatch.app.api.main.Product"):
            with pytest.raises(HTTPException) as excinfo:
                create_product(payload, session)
        assert excinfo.value.status_code == 500
class TestGetProduct:
    """Tests for the get_product endpoint function."""

    def test_get_not_found(self):
        """An unknown product id raises HTTP 404."""
        session = MagicMock()
        query = MagicMock()
        query.filter.return_value.one_or_none.return_value = None
        session.query.return_value = query
        with pytest.raises(HTTPException) as excinfo:
            get_product(99999, session)
        assert excinfo.value.status_code == 404
class TestUpdateProduct:
    """Tests for the update_product endpoint function."""

    def test_update_not_found(self):
        """Updating an unknown product id raises HTTP 404."""
        session = MagicMock()
        query = MagicMock()
        query.filter.return_value.one_or_none.return_value = None
        session.query.return_value = query
        with pytest.raises(HTTPException) as excinfo:
            update_product(99999, ProductUpdate(title="Updated"), session)
        assert excinfo.value.status_code == 404

    def test_update_db_error(self):
        """A SQLAlchemyError on commit maps to HTTP 500."""
        session = MagicMock()
        query = MagicMock()
        query.filter.return_value.one_or_none.return_value = MockProduct()
        session.query.return_value = query
        session.commit = MagicMock(side_effect=SQLAlchemyError("error"))
        session.rollback = MagicMock()
        with pytest.raises(HTTPException) as excinfo:
            update_product(1, ProductUpdate(title="Updated"), session)
        assert excinfo.value.status_code == 500
class TestDeleteProduct:
    """Tests for the delete_product endpoint function."""

    def test_delete_not_found(self):
        """Deleting an unknown product id raises HTTP 404."""
        session = MagicMock()
        query = MagicMock()
        query.filter.return_value.one_or_none.return_value = None
        session.query.return_value = query
        with pytest.raises(HTTPException) as excinfo:
            delete_product(99999, session)
        assert excinfo.value.status_code == 404

    def test_delete_success(self):
        """A found product is deleted and a status dict returned."""
        session = MagicMock()
        query = MagicMock()
        query.filter.return_value.one_or_none.return_value = MockProduct()
        session.query.return_value = query
        session.delete = MagicMock()
        session.commit = MagicMock()
        assert delete_product(1, session) == {"status": "deleted"}
        session.delete.assert_called_once()

    def test_delete_db_error(self):
        """A SQLAlchemyError on commit maps to HTTP 500."""
        session = MagicMock()
        query = MagicMock()
        query.filter.return_value.one_or_none.return_value = MockProduct()
        session.query.return_value = query
        session.delete = MagicMock()
        session.commit = MagicMock(side_effect=SQLAlchemyError("error"))
        session.rollback = MagicMock()
        with pytest.raises(HTTPException) as excinfo:
            delete_product(1, session)
        assert excinfo.value.status_code == 500
class TestCreatePrice:
    """Tests for the create_price endpoint function."""

    def test_create_price_db_error(self):
        """A SQLAlchemyError on commit maps to HTTP 500."""
        session = MagicMock()
        session.add = MagicMock()
        session.commit = MagicMock(side_effect=SQLAlchemyError("error"))
        session.rollback = MagicMock()
        payload = PriceHistoryCreate(
            product_id=1,
            price=99.99,
            fetch_method="http",
            fetch_status="success",
            fetched_at=datetime.now(),
        )
        with patch("pricewatch.app.api.main.PriceHistory"):
            with pytest.raises(HTTPException) as excinfo:
                create_price(payload, session)
        assert excinfo.value.status_code == 500
class TestUpdatePrice:
    """Tests for the update_price endpoint function."""

    def test_update_price_not_found(self):
        """Updating an unknown price id raises HTTP 404."""
        session = MagicMock()
        query = MagicMock()
        query.filter.return_value.one_or_none.return_value = None
        session.query.return_value = query
        with pytest.raises(HTTPException) as excinfo:
            update_price(99999, PriceHistoryUpdate(price=149.99), session)
        assert excinfo.value.status_code == 404
class TestDeletePrice:
    """Tests for the delete_price endpoint function."""

    def test_delete_price_not_found(self):
        """Deleting an unknown price id raises HTTP 404."""
        session = MagicMock()
        query = MagicMock()
        query.filter.return_value.one_or_none.return_value = None
        session.query.return_value = query
        with pytest.raises(HTTPException) as excinfo:
            delete_price(99999, session)
        assert excinfo.value.status_code == 404

View File

@@ -0,0 +1,135 @@
"""Tests API endpoints scraping logs."""
from datetime import datetime
from unittest.mock import MagicMock
import pytest
from fastapi import HTTPException
from sqlalchemy.exc import SQLAlchemyError
from pricewatch.app.api.main import create_log, update_log, delete_log
from pricewatch.app.api.schemas import ScrapingLogCreate, ScrapingLogUpdate
class MockScrapingLog:
    """Lightweight stand-in for the ORM ScrapingLog model."""

    def __init__(self, **kwargs):
        # Defaults built per call so each instance gets fresh lists and timestamp
        # (matching the original per-call `kwargs.get(..., [])` semantics).
        defaults = {
            "id": 1,
            "product_id": None,
            "url": "https://example.com",
            "source": "amazon",
            "reference": "REF123",
            "fetch_method": "http",
            "fetch_status": "success",
            "fetched_at": datetime.now(),
            "duration_ms": 1500,
            "html_size_bytes": 50000,
            "errors": [],
            "notes": [],
        }
        for field, fallback in defaults.items():
            setattr(self, field, kwargs.get(field, fallback))
class TestCreateLog:
    """Tests for the create_log endpoint function."""

    def test_create_log_db_error(self):
        """A SQLAlchemyError on commit maps to HTTP 500."""
        from unittest.mock import patch

        session = MagicMock()
        session.add = MagicMock()
        session.commit = MagicMock(side_effect=SQLAlchemyError("error"))
        session.rollback = MagicMock()
        payload = ScrapingLogCreate(
            url="https://amazon.fr/dp/TEST",
            source="amazon",
            reference="TEST123",
            fetch_method="http",
            fetch_status="success",
            fetched_at=datetime.now(),
        )
        with patch("pricewatch.app.api.main.ScrapingLog"):
            with pytest.raises(HTTPException) as excinfo:
                create_log(payload, session)
        assert excinfo.value.status_code == 500
class TestUpdateLog:
    """Tests for the update_log endpoint function."""

    def test_update_log_not_found(self):
        """Updating an unknown log id raises HTTP 404."""
        session = MagicMock()
        query = MagicMock()
        query.filter.return_value.one_or_none.return_value = None
        session.query.return_value = query
        with pytest.raises(HTTPException) as excinfo:
            update_log(99999, ScrapingLogUpdate(fetch_status="failed"), session)
        assert excinfo.value.status_code == 404

    def test_update_log_db_error(self):
        """A SQLAlchemyError on commit maps to HTTP 500."""
        from unittest.mock import patch

        session = MagicMock()
        query = MagicMock()
        query.filter.return_value.one_or_none.return_value = MockScrapingLog()
        session.query.return_value = query
        session.commit = MagicMock(side_effect=SQLAlchemyError("error"))
        session.rollback = MagicMock()
        with patch("pricewatch.app.api.main._log_to_out"):
            with pytest.raises(HTTPException) as excinfo:
                update_log(1, ScrapingLogUpdate(fetch_status="failed"), session)
        assert excinfo.value.status_code == 500
class TestDeleteLog:
    """Tests for the delete_log endpoint function."""

    def test_delete_log_not_found(self):
        """Deleting an unknown log id raises HTTP 404."""
        session = MagicMock()
        query = MagicMock()
        query.filter.return_value.one_or_none.return_value = None
        session.query.return_value = query
        with pytest.raises(HTTPException) as excinfo:
            delete_log(99999, session)
        assert excinfo.value.status_code == 404

    def test_delete_log_success(self):
        """A found log row is deleted and a status dict returned."""
        session = MagicMock()
        query = MagicMock()
        query.filter.return_value.one_or_none.return_value = MockScrapingLog()
        session.query.return_value = query
        session.delete = MagicMock()
        session.commit = MagicMock()
        assert delete_log(1, session) == {"status": "deleted"}
        session.delete.assert_called_once()

    def test_delete_log_db_error(self):
        """A SQLAlchemyError on commit maps to HTTP 500."""
        session = MagicMock()
        query = MagicMock()
        query.filter.return_value.one_or_none.return_value = MockScrapingLog()
        session.query.return_value = query
        session.delete = MagicMock()
        session.commit = MagicMock(side_effect=SQLAlchemyError("error"))
        session.rollback = MagicMock()
        with pytest.raises(HTTPException) as excinfo:
            delete_log(1, session)
        assert excinfo.value.status_code == 500

View File

@@ -0,0 +1,159 @@
"""Tests API endpoints webhooks."""
from datetime import datetime
from unittest.mock import MagicMock, patch
import pytest
from fastapi import HTTPException
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
from pricewatch.app.api.main import (
list_webhooks,
create_webhook,
update_webhook,
delete_webhook,
)
from pricewatch.app.api.schemas import WebhookCreate, WebhookUpdate
class MockWebhook:
    """Lightweight stand-in for the ORM Webhook model."""

    def __init__(self, **kwargs):
        # Defaults built per call so the events list is never shared between instances.
        defaults = {
            "id": 1,
            "url": "https://example.com/webhook",
            "events": ["price_change", "stock_change"],
            "active": True,
            "created_at": datetime.now(),
            "last_triggered_at": None,
        }
        for field, fallback in defaults.items():
            setattr(self, field, kwargs.get(field, fallback))
class TestListWebhooks:
    """Tests for the list_webhooks endpoint function."""

    def test_list_webhooks_empty(self):
        """No stored webhooks yields an empty list."""
        session = MagicMock()
        query = MagicMock()
        query.all.return_value = []
        session.query.return_value = query
        with patch("pricewatch.app.api.main._webhook_to_out"):
            assert list_webhooks(session=session) == []
class TestCreateWebhook:
    """Tests for the create_webhook endpoint function."""

    def test_create_webhook_integrity_error(self):
        """An IntegrityError on commit maps to HTTP 500.

        NOTE: the current endpoint does not distinguish IntegrityError
        from a generic SQLAlchemyError.
        """
        session = MagicMock()
        session.add = MagicMock()
        session.commit = MagicMock(side_effect=IntegrityError("duplicate", {}, None))
        session.rollback = MagicMock()
        payload = WebhookCreate(
            event="price_change",
            url="https://example.com/webhook",
        )
        with patch("pricewatch.app.api.main.Webhook"):
            with pytest.raises(HTTPException) as excinfo:
                create_webhook(payload, session)
        assert excinfo.value.status_code == 500

    def test_create_webhook_db_error(self):
        """A generic SQLAlchemyError on commit maps to HTTP 500."""
        session = MagicMock()
        session.add = MagicMock()
        session.commit = MagicMock(side_effect=SQLAlchemyError("error"))
        session.rollback = MagicMock()
        payload = WebhookCreate(
            event="price_change",
            url="https://example.com/webhook",
        )
        with patch("pricewatch.app.api.main.Webhook"):
            with pytest.raises(HTTPException) as excinfo:
                create_webhook(payload, session)
        assert excinfo.value.status_code == 500
class TestUpdateWebhook:
    """Tests for the update_webhook endpoint function."""

    def test_update_webhook_not_found(self):
        """Updating an unknown webhook id raises HTTP 404."""
        session = MagicMock()
        query = MagicMock()
        query.filter.return_value.one_or_none.return_value = None
        session.query.return_value = query
        with pytest.raises(HTTPException) as excinfo:
            update_webhook(99999, WebhookUpdate(active=False), session)
        assert excinfo.value.status_code == 404

    def test_update_webhook_db_error(self):
        """A SQLAlchemyError on commit maps to HTTP 500."""
        session = MagicMock()
        query = MagicMock()
        query.filter.return_value.one_or_none.return_value = MockWebhook()
        session.query.return_value = query
        session.commit = MagicMock(side_effect=SQLAlchemyError("error"))
        session.rollback = MagicMock()
        with patch("pricewatch.app.api.main._webhook_to_out"):
            with pytest.raises(HTTPException) as excinfo:
                update_webhook(1, WebhookUpdate(active=False), session)
        assert excinfo.value.status_code == 500
class TestDeleteWebhook:
    """Tests for the delete_webhook endpoint function."""

    def test_delete_webhook_not_found(self):
        """Deleting an unknown webhook id raises HTTP 404."""
        session = MagicMock()
        query = MagicMock()
        query.filter.return_value.one_or_none.return_value = None
        session.query.return_value = query
        with pytest.raises(HTTPException) as excinfo:
            delete_webhook(99999, session)
        assert excinfo.value.status_code == 404

    def test_delete_webhook_success(self):
        """A found webhook is deleted and a status dict returned."""
        session = MagicMock()
        query = MagicMock()
        query.filter.return_value.one_or_none.return_value = MockWebhook()
        session.query.return_value = query
        session.delete = MagicMock()
        session.commit = MagicMock()
        assert delete_webhook(1, session) == {"status": "deleted"}
        session.delete.assert_called_once()

    def test_delete_webhook_db_error(self):
        """A SQLAlchemyError on commit maps to HTTP 500."""
        session = MagicMock()
        query = MagicMock()
        query.filter.return_value.one_or_none.return_value = MockWebhook()
        session.query.return_value = query
        session.delete = MagicMock()
        session.commit = MagicMock(side_effect=SQLAlchemyError("error"))
        session.rollback = MagicMock()
        with pytest.raises(HTTPException) as excinfo:
            delete_webhook(1, session)
        assert excinfo.value.status_code == 500

42
tests/cli/test_detect.py Normal file
View File

@@ -0,0 +1,42 @@
"""Tests pour la commande CLI detect."""
import pytest
from typer.testing import CliRunner
from pricewatch.app.cli.main import app
runner = CliRunner()
class TestDetectCommand:
    """Tests for the detect command."""

    def test_detect_amazon_url(self):
        """An Amazon URL is recognised and its ASIN is reported."""
        outcome = runner.invoke(app, ["detect", "https://www.amazon.fr/dp/B08N5WRWNW"])
        assert outcome.exit_code == 0
        assert "amazon" in outcome.stdout.lower()
        assert "B08N5WRWNW" in outcome.stdout

    def test_detect_cdiscount_url(self):
        """A Cdiscount URL is recognised."""
        outcome = runner.invoke(
            app,
            [
                "detect",
                "https://www.cdiscount.com/informatique/f-10709-tuf608umrv004.html",
            ],
        )
        assert outcome.exit_code == 0
        assert "cdiscount" in outcome.stdout.lower()

    def test_detect_unknown_url(self):
        """An unsupported shop URL exits with code 1."""
        outcome = runner.invoke(app, ["detect", "https://www.unknown-store.com/product"])
        assert outcome.exit_code == 1
        assert "aucun store" in outcome.stdout.lower()

    def test_detect_invalid_url(self):
        """A malformed URL exits with code 1."""
        outcome = runner.invoke(app, ["detect", "not-a-valid-url"])
        assert outcome.exit_code == 1

36
tests/cli/test_doctor.py Normal file
View File

@@ -0,0 +1,36 @@
"""Tests pour la commande CLI doctor."""
import pytest
from typer.testing import CliRunner
from pricewatch.app.cli.main import app
runner = CliRunner()
class TestDoctorCommand:
    """Tests for the doctor command."""

    def test_doctor_success(self):
        """Doctor reports the overall installation status."""
        outcome = runner.invoke(app, ["doctor"])
        assert outcome.exit_code == 0
        assert "PriceWatch Doctor" in outcome.stdout
        assert "Python" in outcome.stdout
        # "prêt" carries the accent in the CLI output.
        assert "prêt" in outcome.stdout.lower() or "ready" in outcome.stdout.lower()

    def test_doctor_shows_dependencies(self):
        """Doctor lists the key dependencies."""
        outcome = runner.invoke(app, ["doctor"])
        assert outcome.exit_code == 0
        for dependency in ("typer", "pydantic", "playwright"):
            assert dependency in outcome.stdout.lower()

    def test_doctor_shows_stores(self):
        """Doctor lists the available stores."""
        outcome = runner.invoke(app, ["doctor"])
        assert outcome.exit_code == 0
        for store in ("amazon", "cdiscount"):
            assert store in outcome.stdout.lower()

99
tests/cli/test_fetch.py Normal file
View File

@@ -0,0 +1,99 @@
"""Tests pour la commande CLI fetch."""
import pytest
from unittest.mock import patch, MagicMock
from typer.testing import CliRunner
from pricewatch.app.cli.main import app
runner = CliRunner()
class TestFetchCommand:
    """Tests for the fetch command."""

    def test_fetch_conflicting_options(self):
        """--http combined with --playwright is rejected."""
        outcome = runner.invoke(
            app, ["fetch", "https://example.com", "--http", "--playwright"]
        )
        assert outcome.exit_code == 1
        assert "impossible" in outcome.stdout.lower()

    @patch("pricewatch.app.cli.main.fetch_http")
    def test_fetch_http_success(self, mock_fetch: MagicMock):
        """A successful HTTP fetch reports status and duration."""
        ok = MagicMock()
        ok.success = True
        ok.html = "<html>test</html>"
        ok.status_code = 200
        ok.duration_ms = 150
        mock_fetch.return_value = ok
        outcome = runner.invoke(app, ["fetch", "https://example.com", "--http"])
        assert outcome.exit_code == 0
        # NOTE(review): '"" in s' is always True, so this or-clause can never
        # fail — the second substring likely lost a glyph in transit; confirm.
        assert "Succes" in outcome.stdout or "" in outcome.stdout
        assert "150" in outcome.stdout

    @patch("pricewatch.app.cli.main.fetch_http")
    def test_fetch_http_failure(self, mock_fetch: MagicMock):
        """A failed HTTP fetch reports the error and exits 1."""
        failed = MagicMock()
        failed.success = False
        failed.error = "Connection refused"
        mock_fetch.return_value = failed
        outcome = runner.invoke(app, ["fetch", "https://example.com", "--http"])
        assert outcome.exit_code == 1
        assert "Connection refused" in outcome.stdout

    @patch("pricewatch.app.cli.main.fetch_playwright")
    def test_fetch_playwright_success(self, mock_fetch: MagicMock):
        """A successful Playwright fetch reports its duration."""
        ok = MagicMock()
        ok.success = True
        ok.html = "<html>test playwright</html>"
        ok.duration_ms = 2500
        mock_fetch.return_value = ok
        outcome = runner.invoke(app, ["fetch", "https://example.com", "--playwright"])
        assert outcome.exit_code == 0
        # NOTE(review): same vacuous '"" in s' clause as above.
        assert "Succes" in outcome.stdout or "" in outcome.stdout
        assert "2500" in outcome.stdout

    @patch("pricewatch.app.cli.main.fetch_playwright")
    def test_fetch_playwright_failure(self, mock_fetch: MagicMock):
        """A failed Playwright fetch reports the error and exits 1."""
        failed = MagicMock()
        failed.success = False
        failed.error = "Timeout waiting for page"
        mock_fetch.return_value = failed
        outcome = runner.invoke(app, ["fetch", "https://example.com", "--playwright"])
        assert outcome.exit_code == 1
        assert "Timeout" in outcome.stdout

    @patch("pricewatch.app.cli.main.fetch_playwright")
    def test_fetch_default_is_playwright(self, mock_fetch: MagicMock):
        """Without an explicit flag, fetch defaults to Playwright."""
        ok = MagicMock()
        ok.success = True
        ok.html = "<html>test</html>"
        ok.duration_ms = 1000
        mock_fetch.return_value = ok
        outcome = runner.invoke(app, ["fetch", "https://example.com"])
        assert outcome.exit_code == 0
        mock_fetch.assert_called_once()

    @patch("pricewatch.app.cli.main.fetch_playwright")
    def test_fetch_with_debug(self, mock_fetch: MagicMock):
        """The --debug flag does not break fetching."""
        ok = MagicMock()
        ok.success = True
        ok.html = "<html>test</html>"
        ok.duration_ms = 1000
        mock_fetch.return_value = ok
        outcome = runner.invoke(app, ["fetch", "https://example.com", "--debug"])
        assert outcome.exit_code == 0

99
tests/cli/test_parse.py Normal file
View File

@@ -0,0 +1,99 @@
"""Tests pour la commande CLI parse."""
import tempfile
from pathlib import Path
import pytest
from typer.testing import CliRunner
from pricewatch.app.cli.main import app
runner = CliRunner()
class TestParseCommand:
    """Tests for the parse command."""

    @pytest.fixture
    def amazon_html_file(self, tmp_path: Path) -> Path:
        """Write a minimal Amazon product HTML file and return its path."""
        markup = """
<html>
<body>
<span id="productTitle">Test Product</span>
<span class="a-price-whole">299,99 €</span>
<div id="availability">
<span>En stock</span>
</div>
</body>
</html>
"""
        target = tmp_path / "amazon_test.html"
        target.write_text(markup, encoding="utf-8")
        return target

    @pytest.fixture
    def cdiscount_html_file(self, tmp_path: Path) -> Path:
        """Write a minimal Cdiscount product HTML file (JSON-LD offer) and return its path."""
        markup = """
<html>
<head>
<script type="application/ld+json">
{
"@type": "Product",
"name": "Produit Cdiscount",
"offers": {"price": "199.99", "priceCurrency": "EUR"}
}
</script>
</head>
<body>
<h1 data-e2e="title">Produit Cdiscount</h1>
</body>
</html>
"""
        target = tmp_path / "cdiscount_test.html"
        target.write_text(markup, encoding="utf-8")
        return target

    def test_parse_amazon_success(self, amazon_html_file: Path):
        """Amazon HTML yields the expected title and price."""
        outcome = runner.invoke(app, ["parse", "amazon", "--in", str(amazon_html_file)])
        assert outcome.exit_code == 0
        assert "Test Product" in outcome.stdout
        assert "299" in outcome.stdout

    def test_parse_cdiscount_success(self, cdiscount_html_file: Path):
        """Cdiscount HTML yields the expected title and price."""
        outcome = runner.invoke(
            app, ["parse", "cdiscount", "--in", str(cdiscount_html_file)]
        )
        assert outcome.exit_code == 0
        assert "Produit Cdiscount" in outcome.stdout
        assert "199" in outcome.stdout

    def test_parse_unknown_store(self, amazon_html_file: Path):
        """An unknown store id is rejected with exit code 1."""
        outcome = runner.invoke(
            app, ["parse", "unknown_store", "--in", str(amazon_html_file)]
        )
        assert outcome.exit_code == 1
        assert "inconnu" in outcome.stdout.lower()

    def test_parse_with_debug(self, amazon_html_file: Path):
        """The --debug flag does not break parsing."""
        outcome = runner.invoke(
            app, ["parse", "amazon", "--in", str(amazon_html_file), "--debug"]
        )
        assert outcome.exit_code == 0

    def test_parse_shows_fields(self, amazon_html_file: Path):
        """The report lists the extracted field labels."""
        outcome = runner.invoke(app, ["parse", "amazon", "--in", str(amazon_html_file)])
        assert outcome.exit_code == 0
        assert "Titre" in outcome.stdout
        assert "Prix" in outcome.stdout
        assert "Stock" in outcome.stdout

View File

@@ -0,0 +1,258 @@
"""Tests pour la commande CLI run."""
import tempfile
from pathlib import Path
from unittest.mock import patch, MagicMock
import pytest
from typer.testing import CliRunner
from pricewatch.app.cli.main import app
from pricewatch.app.core.schema import ProductSnapshot, DebugInfo, DebugStatus, FetchMethod
runner = CliRunner()
@pytest.fixture
def yaml_config(tmp_path: Path) -> Path:
    """Write a minimal run-configuration YAML and return its path."""
    content = """
urls:
- "https://www.amazon.fr/dp/B08N5WRWNW"
options:
use_playwright: false
force_playwright: false
headful: false
save_html: false
save_screenshot: false
timeout_ms: 30000
"""
    target = tmp_path / "test_config.yaml"
    target.write_text(content, encoding="utf-8")
    return target
@pytest.fixture
def output_json(tmp_path: Path) -> Path:
    """Return the path where the run command should write its JSON output."""
    return tmp_path / "output.json"
class TestRunCommand:
"""Tests pour la commande run."""
@patch("pricewatch.app.cli.main.fetch_http")
def test_run_http_success(self, mock_fetch, yaml_config, output_json):
"""Run avec HTTP reussi."""
# Mock HTTP fetch
mock_result = MagicMock()
mock_result.success = True
mock_result.html = """
<html><body>
<span id="productTitle">Test Product</span>
<span class="a-price-whole">299,99 €</span>
</body></html>
"""
mock_result.error = None
mock_fetch.return_value = mock_result
result = runner.invoke(
app,
["run", "--yaml", str(yaml_config), "--out", str(output_json), "--no-db"],
)
assert result.exit_code == 0
assert output_json.exists()
@patch("pricewatch.app.cli.main.fetch_http")
@patch("pricewatch.app.cli.main.fetch_playwright")
def test_run_http_fail_playwright_fallback(
self, mock_pw, mock_http, yaml_config, output_json
):
"""Run avec fallback Playwright quand HTTP echoue."""
# Mock HTTP fail
mock_http_result = MagicMock()
mock_http_result.success = False
mock_http_result.error = "403 Forbidden"
mock_http.return_value = mock_http_result
# Mock Playwright success
mock_pw_result = MagicMock()
mock_pw_result.success = True
mock_pw_result.html = """
<html><body>
<span id="productTitle">Playwright Product</span>
<span class="a-price-whole">199,99 €</span>
</body></html>
"""
mock_pw_result.screenshot = None
mock_pw.return_value = mock_pw_result
# Modifier config pour activer playwright
yaml_content = """
urls:
- "https://www.amazon.fr/dp/B08N5WRWNW"
options:
use_playwright: true
force_playwright: false
headful: false
save_html: false
save_screenshot: false
timeout_ms: 30000
"""
yaml_config.write_text(yaml_content, encoding="utf-8")
result = runner.invoke(
app,
["run", "--yaml", str(yaml_config), "--out", str(output_json), "--no-db"],
)
assert result.exit_code == 0
mock_pw.assert_called()
@patch("pricewatch.app.cli.main.fetch_http")
def test_run_http_fail_no_playwright(self, mock_http, yaml_config, output_json):
"""Run avec HTTP echoue sans Playwright."""
mock_result = MagicMock()
mock_result.success = False
mock_result.error = "Connection refused"
mock_http.return_value = mock_result
result = runner.invoke(
app,
["run", "--yaml", str(yaml_config), "--out", str(output_json), "--no-db"],
)
# Doit quand meme creer le fichier JSON (avec snapshot failed)
assert result.exit_code == 0
assert output_json.exists()
def test_run_invalid_yaml(self, tmp_path, output_json):
    """The run command exits with code 1 when the YAML config is malformed."""
    broken_config = tmp_path / "invalid.yaml"
    # Unclosed flow sequence -> YAML parser error on load.
    broken_config.write_text("invalid: [yaml: content", encoding="utf-8")

    result = runner.invoke(
        app,
        ["run", "--yaml", str(broken_config), "--out", str(output_json)],
    )

    assert result.exit_code == 1
def test_run_with_debug(self, yaml_config, output_json):
    """The --debug flag enables verbose logging without breaking the run."""
    with patch("pricewatch.app.cli.main.fetch_http") as mock_fetch:
        fetch_result = MagicMock()
        fetch_result.success = True
        fetch_result.html = "<html><body>Test</body></html>"
        mock_fetch.return_value = fetch_result

        debug_args = [
            "run",
            "--yaml",
            str(yaml_config),
            "--out",
            str(output_json),
            "--debug",
            "--no-db",
        ]
        result = runner.invoke(app, debug_args)

        assert result.exit_code == 0
@patch("pricewatch.app.cli.main.fetch_playwright")
def test_run_force_playwright(self, mock_pw, tmp_path, output_json):
"""Run avec force_playwright skip HTTP."""
yaml_content = """
urls:
- "https://www.amazon.fr/dp/B08N5WRWNW"
options:
use_playwright: true
force_playwright: true
headful: false
save_html: false
save_screenshot: false
timeout_ms: 30000
"""
yaml_file = tmp_path / "force_pw.yaml"
yaml_file.write_text(yaml_content, encoding="utf-8")
mock_result = MagicMock()
mock_result.success = True
mock_result.html = "<html><body>PW content</body></html>"
mock_result.screenshot = None
mock_pw.return_value = mock_result
with patch("pricewatch.app.cli.main.fetch_http") as mock_http:
result = runner.invoke(
app,
["run", "--yaml", str(yaml_file), "--out", str(output_json), "--no-db"],
)
# HTTP ne doit pas etre appele
mock_http.assert_not_called()
mock_pw.assert_called()
assert result.exit_code == 0
@patch("pricewatch.app.cli.main.fetch_http")
def test_run_unknown_store(self, mock_fetch, tmp_path, output_json):
"""Run avec URL de store inconnu."""
yaml_content = """
urls:
- "https://www.unknown-store.com/product/123"
options:
use_playwright: false
"""
yaml_file = tmp_path / "unknown.yaml"
yaml_file.write_text(yaml_content, encoding="utf-8")
result = runner.invoke(
app,
["run", "--yaml", str(yaml_file), "--out", str(output_json), "--no-db"],
)
# Doit continuer sans crash
assert result.exit_code == 0
# HTTP ne doit pas etre appele (store non trouve)
mock_fetch.assert_not_called()
@patch("pricewatch.app.cli.main.fetch_http")
@patch("pricewatch.app.cli.main.fetch_playwright")
def test_run_with_save_screenshot(self, mock_pw, mock_http, tmp_path, output_json):
"""Run avec save_screenshot."""
yaml_content = """
urls:
- "https://www.amazon.fr/dp/B08N5WRWNW"
options:
use_playwright: true
force_playwright: false
save_screenshot: true
timeout_ms: 30000
"""
yaml_file = tmp_path / "screenshot.yaml"
yaml_file.write_text(yaml_content, encoding="utf-8")
# HTTP fail
mock_http_result = MagicMock()
mock_http_result.success = False
mock_http_result.error = "blocked"
mock_http.return_value = mock_http_result
# PW success avec screenshot
mock_pw_result = MagicMock()
mock_pw_result.success = True
mock_pw_result.html = "<html><body>content</body></html>"
mock_pw_result.screenshot = b"fake_png_data"
mock_pw.return_value = mock_pw_result
with patch("pricewatch.app.core.io.save_debug_screenshot") as mock_save:
result = runner.invoke(
app,
["run", "--yaml", str(yaml_file), "--out", str(output_json), "--no-db"],
)
assert result.exit_code == 0
# Le screenshot doit etre sauvegarde si present
mock_save.assert_called()

Binary file not shown.

View File

@@ -171,7 +171,25 @@ class TestCdiscountRealFixtures:
assert isinstance(snapshot.price, float)
assert snapshot.price > 0
# Le prix doit avoir maximum 2 décimales
assert snapshot.price == round(snapshot.price, 2)
assert snapshot.price == round(snapshot.price, 2)
def test_parse_tuf608umrv004_price_value(self, store, fixture_tuf608umrv004):
    """The parsed price for the ASUS TUF fixture must be 1199.99 EUR."""
    product_url = "https://www.cdiscount.com/informatique/.../f-10709-tuf608umrv004.html"
    parsed = store.parse(fixture_tuf608umrv004, product_url)
    assert parsed.price == 1199.99
def test_parse_tuf608umrv004_category_and_msrp(
    self, store, fixture_tuf608umrv004
):
    """The ASUS fixture must expose a category and a list price (MSRP)."""
    product_url = "https://www.cdiscount.com/informatique/.../f-10709-tuf608umrv004.html"
    parsed = store.parse(fixture_tuf608umrv004, product_url)

    assert parsed.category
    assert any(word in parsed.category for word in ("Ordinateur", "Portable"))
    assert parsed.msrp is not None
    # The list price can never be below the current selling price.
    if parsed.price:
        assert parsed.msrp >= parsed.price
def test_parse_a128902_price_format(self, store, fixture_a128902):
"""Parse fixture a128902 - le prix doit être un float valide."""

View File

@@ -27,3 +27,7 @@ def test_parse_price_without_decimal():
def test_parse_price_with_currency():
    """A currency prefix and a space thousands separator are both handled."""
    parsed_value = parse_price_text("EUR 1 259,00")
    assert parsed_value == 1259.00
def test_parse_price_with_cents_after_currency_symbol():
    """French retail format '1199 €99' places the cents after the euro sign."""
    parsed_value = parse_price_text("1199 €99")
    assert parsed_value == 1199.99

1341
verif_amazon.md Normal file

File diff suppressed because it is too large Load Diff

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Some files were not shown because too many files have changed in this diff Show More