""" Tests pour ScrapingScheduler avec mocks Redis/RQ. """ from dataclasses import dataclass import pytest from redis.exceptions import ConnectionError as RedisConnectionError from pricewatch.app.tasks.scheduler import ( RedisUnavailableError, ScheduledJobInfo, ScrapingScheduler, check_redis_connection, ) @dataclass class FakeRedis: url: str def ping(self): """Simule un ping reussi.""" return True class FakeRedisConnectionError: """FakeRedis qui leve une erreur a la connexion.""" def __init__(self, url: str): self.url = url def ping(self): raise RedisConnectionError("Connection refused") class DummyQueue: def __init__(self, name: str, connection=None) -> None: self.name = name self.connection = connection self.enqueued = [] def enqueue(self, func, *args, **kwargs): job = type("Job", (), {"id": "job-123"})() self.enqueued.append((func, args, kwargs)) return job class DummyScheduler: def __init__(self, queue=None, connection=None) -> None: self.queue = queue self.connection = connection self.scheduled = [] def schedule(self, scheduled_time, func, args=None, kwargs=None, interval=None, repeat=None): job = type("Job", (), {"id": "job-456"})() self.scheduled.append((scheduled_time, func, args, kwargs, interval, repeat)) return job @dataclass class FakeRedisConfig: url: str @dataclass class FakeAppConfig: redis: FakeRedisConfig def test_scheduler_enqueue_immediate(monkeypatch): """Enqueue immediate utilise la queue RQ.""" config = FakeAppConfig(redis=FakeRedisConfig(url="redis://localhost:6379/0")) monkeypatch.setattr("pricewatch.app.tasks.scheduler.redis.from_url", lambda url: FakeRedis(url)) monkeypatch.setattr("pricewatch.app.tasks.scheduler.Queue", DummyQueue) monkeypatch.setattr("pricewatch.app.tasks.scheduler.Scheduler", DummyScheduler) scheduler = ScrapingScheduler(config=config, queue_name="default") job = scheduler.enqueue_immediate("https://example.com/product") assert job.id == "job-123" assert len(scheduler.queue.enqueued) == 1 def test_scheduler_schedule_product(monkeypatch): """Schedule product cree un job recurrent.""" config = FakeAppConfig(redis=FakeRedisConfig(url="redis://localhost:6379/0")) monkeypatch.setattr("pricewatch.app.tasks.scheduler.redis.from_url", lambda url: FakeRedis(url)) monkeypatch.setattr("pricewatch.app.tasks.scheduler.Queue", DummyQueue) monkeypatch.setattr("pricewatch.app.tasks.scheduler.Scheduler", DummyScheduler) scheduler = ScrapingScheduler(config=config, queue_name="default") info = scheduler.schedule_product("https://example.com/product", interval_hours=1) assert isinstance(info, ScheduledJobInfo) assert info.job_id == "job-456" assert len(scheduler.scheduler.scheduled) == 1 # ============================================================================ # Tests gestion erreurs Redis # ============================================================================ def test_scheduler_redis_connection_error(monkeypatch): """Leve RedisUnavailableError quand Redis n'est pas accessible.""" config = FakeAppConfig(redis=FakeRedisConfig(url="redis://localhost:6379/0")) monkeypatch.setattr( "pricewatch.app.tasks.scheduler.redis.from_url", lambda url: FakeRedisConnectionError(url), ) monkeypatch.setattr("pricewatch.app.tasks.scheduler.Queue", DummyQueue) monkeypatch.setattr("pricewatch.app.tasks.scheduler.Scheduler", DummyScheduler) scheduler = ScrapingScheduler(config=config, queue_name="default") with pytest.raises(RedisUnavailableError) as exc_info: scheduler.enqueue_immediate("https://example.com/product") assert "Redis" in str(exc_info.value.message) assert exc_info.value.cause is not None def test_scheduler_lazy_connection(monkeypatch): """La connexion Redis n'est etablie qu'au premier appel.""" config = FakeAppConfig(redis=FakeRedisConfig(url="redis://localhost:6379/0")) connection_calls = [] def track_from_url(url): connection_calls.append(url) return FakeRedis(url) monkeypatch.setattr("pricewatch.app.tasks.scheduler.redis.from_url", track_from_url) monkeypatch.setattr("pricewatch.app.tasks.scheduler.Queue", DummyQueue) monkeypatch.setattr("pricewatch.app.tasks.scheduler.Scheduler", DummyScheduler) scheduler = ScrapingScheduler(config=config, queue_name="default") # Pas de connexion a la creation assert len(connection_calls) == 0 # Connexion au premier appel scheduler.enqueue_immediate("https://example.com/product") assert len(connection_calls) == 1 # Pas de nouvelle connexion au deuxieme appel scheduler.enqueue_immediate("https://example.com/product2") assert len(connection_calls) == 1 def test_check_redis_connection_success(monkeypatch): """check_redis_connection retourne True si Redis repond.""" monkeypatch.setattr("pricewatch.app.tasks.scheduler.redis.from_url", FakeRedis) assert check_redis_connection("redis://localhost:6379/0") is True def test_check_redis_connection_failure(monkeypatch): """check_redis_connection retourne False si Redis ne repond pas.""" monkeypatch.setattr( "pricewatch.app.tasks.scheduler.redis.from_url", FakeRedisConnectionError ) assert check_redis_connection("redis://localhost:6379/0") is False def test_scheduler_schedule_redis_error(monkeypatch): """schedule_product leve RedisUnavailableError si Redis down.""" config = FakeAppConfig(redis=FakeRedisConfig(url="redis://localhost:6379/0")) monkeypatch.setattr( "pricewatch.app.tasks.scheduler.redis.from_url", lambda url: FakeRedisConnectionError(url), ) monkeypatch.setattr("pricewatch.app.tasks.scheduler.Queue", DummyQueue) monkeypatch.setattr("pricewatch.app.tasks.scheduler.Scheduler", DummyScheduler) scheduler = ScrapingScheduler(config=config, queue_name="default") with pytest.raises(RedisUnavailableError): scheduler.schedule_product("https://example.com/product", interval_hours=24)