scrap/tests/tasks/test_worker_end_to_end.py

"""
End-to-end test: enqueue -> worker -> DB via Redis.
"""
from dataclasses import dataclass
from datetime import datetime

import pytest
import redis
from rq import Queue
from rq.worker import SimpleWorker

from pricewatch.app.core.registry import get_registry
from pricewatch.app.core.schema import DebugInfo, DebugStatus, FetchMethod, ProductSnapshot
from pricewatch.app.db.connection import get_session, init_db, reset_engine
from pricewatch.app.db.models import Product, ScrapingLog
from pricewatch.app.stores.base import BaseStore
from pricewatch.app.tasks import scrape as scrape_task
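

# Lightweight stand-ins for the application config consumed by the DB layer and the scrape task.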
@dataclass
class FakeDbConfig:
    url: str


@dataclass
class FakeAppConfig:
    db: FakeDbConfig
    debug: bool = False
    enable_db: bool = True
    default_use_playwright: bool = False
    default_playwright_timeout: int = 1000
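

# Minimal store stub: matches example.com URLs and always returns the same snapshot.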
class DummyStore(BaseStore):
    def __init__(self) -> None:
        super().__init__(store_id="dummy")

    def match(self, url: str) -> float:
        return 1.0 if "example.com" in url else 0.0

    def canonicalize(self, url: str) -> str:
        return url

    def extract_reference(self, url: str) -> str | None:
        return "REF-WORKER"

    def parse(self, html: str, url: str) -> ProductSnapshot:
        return ProductSnapshot(
            source=self.store_id,
            url=url,
            fetched_at=datetime(2026, 1, 14, 11, 0, 0),
            title="Produit worker",
            price=29.99,
            currency="EUR",
            reference="REF-WORKER",
            debug=DebugInfo(method=FetchMethod.HTTP, status=DebugStatus.SUCCESS),
        )
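

# Fake fetch result mimicking a successful HTTP response.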
class DummyFetchResult:
    def __init__(self, html: str) -> None:
        self.success = True
        self.html = html
        self.error = None
        self.duration_ms = 50
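

# Used by the skipif marker below: only run the test when a local Redis answers PING.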
def _redis_available(redis_url: str) -> bool:
    try:
        conn = redis.from_url(redis_url)
        conn.ping()
        return True
    except Exception:
        return False
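

# End-to-end path: enqueue on a real Redis queue, process with an RQ SimpleWorker
# in burst mode, then check that the snapshot was persisted to SQLite.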
@pytest.mark.skipif(not _redis_available("redis://localhost:6379/0"), reason="Redis unavailable")
def test_enqueue_worker_persists_db(tmp_path, monkeypatch):
    """The enqueued job is processed by the worker and persisted to the DB."""
    reset_engine()
    db_path = tmp_path / "worker.db"
    config = FakeAppConfig(db=FakeDbConfig(url=f"sqlite:///{db_path}"))
    init_db(config)
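    # Swap the registered stores for the dummy store so the task resolves it.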
    registry = get_registry()
    previous_stores = list(registry._stores)
    registry._stores = []
    registry.register(DummyStore())
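    # Point the task at the fake config and stub out store setup and HTTP fetching.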
monkeypatch.setattr(scrape_task, "get_config", lambda: config)
monkeypatch.setattr(scrape_task, "setup_stores", lambda: None)
monkeypatch.setattr(scrape_task, "fetch_http", lambda url: DummyFetchResult("<html></html>"))
    redis_conn = redis.from_url("redis://localhost:6379/0")
    queue = Queue("default", connection=redis_conn)
    try:
        job = queue.enqueue(scrape_task.scrape_product, "https://example.com/product", save_db=True)
        worker = SimpleWorker([queue], connection=redis_conn)
        worker.work(burst=True)
    finally:
        registry._stores = previous_stores
        reset_engine()
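    # The job finished and exactly one product plus one scraping log were written.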
    assert job.is_finished
    with get_session(config) as session:
        assert session.query(Product).count() == 1
        assert session.query(ScrapingLog).count() == 1