""" Test end-to-end: enqueue -> worker -> DB via Redis. """ from dataclasses import dataclass from datetime import datetime import pytest import redis from rq import Queue from rq.worker import SimpleWorker from pricewatch.app.core.registry import get_registry from pricewatch.app.core.schema import DebugInfo, DebugStatus, FetchMethod, ProductSnapshot from pricewatch.app.db.connection import get_session, init_db, reset_engine from pricewatch.app.db.models import Product, ScrapingLog from pricewatch.app.stores.base import BaseStore from pricewatch.app.tasks import scrape as scrape_task @dataclass class FakeDbConfig: url: str @dataclass class FakeAppConfig: db: FakeDbConfig debug: bool = False enable_db: bool = True default_use_playwright: bool = False default_playwright_timeout: int = 1000 class DummyStore(BaseStore): def __init__(self) -> None: super().__init__(store_id="dummy") def match(self, url: str) -> float: return 1.0 if "example.com" in url else 0.0 def canonicalize(self, url: str) -> str: return url def extract_reference(self, url: str) -> str | None: return "REF-WORKER" def parse(self, html: str, url: str) -> ProductSnapshot: return ProductSnapshot( source=self.store_id, url=url, fetched_at=datetime(2026, 1, 14, 11, 0, 0), title="Produit worker", price=29.99, currency="EUR", reference="REF-WORKER", debug=DebugInfo(method=FetchMethod.HTTP, status=DebugStatus.SUCCESS), ) class DummyFetchResult: def __init__(self, html: str) -> None: self.success = True self.html = html self.error = None self.duration_ms = 50 def _redis_available(redis_url: str) -> bool: try: conn = redis.from_url(redis_url) conn.ping() return True except Exception: return False @pytest.mark.skipif(not _redis_available("redis://localhost:6379/0"), reason="Redis indisponible") def test_enqueue_worker_persists_db(tmp_path, monkeypatch): """Le job enqueued est traite par le worker et persiste en DB.""" reset_engine() db_path = tmp_path / "worker.db" config = FakeAppConfig(db=FakeDbConfig(url=f"sqlite:///{db_path}")) init_db(config) registry = get_registry() previous_stores = list(registry._stores) registry._stores = [] registry.register(DummyStore()) monkeypatch.setattr(scrape_task, "get_config", lambda: config) monkeypatch.setattr(scrape_task, "setup_stores", lambda: None) monkeypatch.setattr(scrape_task, "fetch_http", lambda url: DummyFetchResult("")) redis_conn = redis.from_url("redis://localhost:6379/0") queue = Queue("default", connection=redis_conn) try: job = queue.enqueue(scrape_task.scrape_product, "https://example.com/product", save_db=True) worker = SimpleWorker([queue], connection=redis_conn) worker.work(burst=True) finally: registry._stores = previous_stores reset_engine() assert job.is_finished with get_session(config) as session: assert session.query(Product).count() == 1 assert session.query(ScrapingLog).count() == 1