""" Test end-to-end: CLI enqueue -> worker -> DB via Redis. """ from dataclasses import dataclass from datetime import datetime import pytest import redis from rq import Queue from rq.worker import SimpleWorker from typer.testing import CliRunner from pricewatch.app.cli import main as cli_main from pricewatch.app.core.registry import get_registry from pricewatch.app.core.schema import DebugInfo, DebugStatus, FetchMethod, ProductSnapshot from pricewatch.app.db.connection import get_session, init_db, reset_engine from pricewatch.app.db.models import Product, ScrapingLog from pricewatch.app.stores.base import BaseStore from pricewatch.app.tasks import scrape as scrape_task @dataclass class FakeDbConfig: url: str @dataclass class FakeRedisConfig: url: str @dataclass class FakeAppConfig: db: FakeDbConfig redis: FakeRedisConfig debug: bool = False enable_db: bool = True default_use_playwright: bool = False default_playwright_timeout: int = 1000 class DummyStore(BaseStore): def __init__(self) -> None: super().__init__(store_id="dummy") def match(self, url: str) -> float: return 1.0 if "example.com" in url else 0.0 def canonicalize(self, url: str) -> str: return url def extract_reference(self, url: str) -> str | None: return "REF-CLI" def parse(self, html: str, url: str) -> ProductSnapshot: return ProductSnapshot( source=self.store_id, url=url, fetched_at=datetime(2026, 1, 14, 15, 0, 0), title="Produit cli", price=49.99, currency="EUR", reference="REF-CLI", debug=DebugInfo(method=FetchMethod.HTTP, status=DebugStatus.SUCCESS), ) class DummyFetchResult: def __init__(self, html: str) -> None: self.success = True self.html = html self.error = None self.duration_ms = 20 def _redis_available(redis_url: str) -> bool: try: conn = redis.from_url(redis_url) conn.ping() return True except Exception: return False @pytest.mark.skipif(not _redis_available("redis://localhost:6379/0"), reason="Redis indisponible") def test_cli_enqueue_worker_persists_db(tmp_path, monkeypatch): """Enqueue via CLI, execution worker, persistence DB.""" reset_engine() db_path = tmp_path / "cli-worker.db" redis_url = "redis://localhost:6379/0" config = FakeAppConfig( db=FakeDbConfig(url=f"sqlite:///{db_path}"), redis=FakeRedisConfig(url=redis_url), ) init_db(config) registry = get_registry() previous_stores = list(registry._stores) registry._stores = [] registry.register(DummyStore()) monkeypatch.setattr(cli_main, "get_config", lambda: config) monkeypatch.setattr(scrape_task, "get_config", lambda: config) monkeypatch.setattr(scrape_task, "setup_stores", lambda: None) monkeypatch.setattr(scrape_task, "fetch_http", lambda url: DummyFetchResult("")) queue_name = "test-cli" redis_conn = redis.from_url(redis_url) queue = Queue(queue_name, connection=redis_conn) queue.empty() runner = CliRunner() try: result = runner.invoke( cli_main.app, ["enqueue", "https://example.com/product", "--queue", queue_name, "--save-db"], ) assert result.exit_code == 0 worker = SimpleWorker([queue], connection=redis_conn) worker.work(burst=True) finally: queue.empty() registry._stores = previous_stores reset_engine() with get_session(config) as session: assert session.query(Product).count() == 1 assert session.query(ScrapingLog).count() == 1