128 lines
4.1 KiB
Python
128 lines
4.1 KiB
Python
"""
|
|
Tests pour la gestion des erreurs Redis dans le scheduler.
|
|
"""
|
|
|
|
import pytest
|
|
from redis.exceptions import ConnectionError as RedisConnectionError
|
|
from redis.exceptions import RedisError, TimeoutError as RedisTimeoutError
|
|
|
|
from pricewatch.app.tasks.scheduler import RedisUnavailableError, ScrapingScheduler, check_redis_connection
|
|
|
|
|
|
class DummyRedisOk:
|
|
def ping(self) -> bool:
|
|
return True
|
|
|
|
|
|
class DummyRedisError:
|
|
def __init__(self, exc: Exception) -> None:
|
|
self._exc = exc
|
|
|
|
def ping(self) -> None:
|
|
raise self._exc
|
|
|
|
|
|
class DummyQueue:
|
|
def __init__(self, name: str, connection=None) -> None:
|
|
self.name = name
|
|
self.connection = connection
|
|
|
|
|
|
class DummyScheduler:
|
|
def __init__(self, queue=None, connection=None) -> None:
|
|
self.queue = queue
|
|
self.connection = connection
|
|
|
|
def schedule(self, scheduled_time, func, args=None, kwargs=None, interval=None, repeat=None):
|
|
return type("Job", (), {"id": "job-redis"})()
|
|
|
|
|
|
class FakeRedisConfig:
|
|
def __init__(self, url: str) -> None:
|
|
self.url = url
|
|
|
|
|
|
class FakeAppConfig:
|
|
def __init__(self, redis_url: str) -> None:
|
|
self.redis = FakeRedisConfig(redis_url)
|
|
|
|
|
|
def test_check_redis_connection_success(monkeypatch):
|
|
"""Ping OK retourne True."""
|
|
monkeypatch.setattr("pricewatch.app.tasks.scheduler.redis.from_url", lambda url: DummyRedisOk())
|
|
assert check_redis_connection("redis://localhost:6379/0") is True
|
|
|
|
|
|
def test_check_redis_connection_failure_connection(monkeypatch):
|
|
"""Ping en echec retourne False."""
|
|
monkeypatch.setattr(
|
|
"pricewatch.app.tasks.scheduler.redis.from_url",
|
|
lambda url: DummyRedisError(RedisConnectionError("no")),
|
|
)
|
|
assert check_redis_connection("redis://localhost:6379/0") is False
|
|
|
|
|
|
def test_check_redis_connection_failure_timeout(monkeypatch):
|
|
"""Timeout Redis retourne False."""
|
|
monkeypatch.setattr(
|
|
"pricewatch.app.tasks.scheduler.redis.from_url",
|
|
lambda url: DummyRedisError(RedisTimeoutError("timeout")),
|
|
)
|
|
assert check_redis_connection("redis://localhost:6379/0") is False
|
|
|
|
|
|
def test_scheduler_lazy_connection(monkeypatch):
|
|
"""La connexion Redis est lazy."""
|
|
config = FakeAppConfig("redis://localhost:6379/0")
|
|
monkeypatch.setattr("pricewatch.app.tasks.scheduler.redis.from_url", lambda url: DummyRedisOk())
|
|
monkeypatch.setattr("pricewatch.app.tasks.scheduler.Queue", DummyQueue)
|
|
monkeypatch.setattr("pricewatch.app.tasks.scheduler.Scheduler", DummyScheduler)
|
|
|
|
scheduler = ScrapingScheduler(config=config)
|
|
assert scheduler._redis is None
|
|
|
|
_ = scheduler.queue
|
|
assert scheduler._redis is not None
|
|
|
|
|
|
def test_scheduler_redis_connection_error(monkeypatch):
|
|
"""Une erreur de connexion leve RedisUnavailableError."""
|
|
config = FakeAppConfig("redis://localhost:6379/0")
|
|
|
|
def raise_connection(url):
|
|
raise RedisConnectionError("no")
|
|
|
|
monkeypatch.setattr("pricewatch.app.tasks.scheduler.redis.from_url", raise_connection)
|
|
|
|
scheduler = ScrapingScheduler(config=config)
|
|
with pytest.raises(RedisUnavailableError):
|
|
_ = scheduler.queue
|
|
|
|
|
|
def test_scheduler_schedule_redis_error(monkeypatch):
|
|
"""Une erreur Redis leve RedisUnavailableError lors du schedule."""
|
|
config = FakeAppConfig("redis://localhost:6379/0")
|
|
|
|
monkeypatch.setattr(
|
|
"pricewatch.app.tasks.scheduler.redis.from_url",
|
|
lambda url: DummyRedisError(RedisError("boom")),
|
|
)
|
|
|
|
scheduler = ScrapingScheduler(config=config)
|
|
with pytest.raises(RedisUnavailableError):
|
|
scheduler.schedule_product("https://example.com/product", interval_hours=1)
|
|
|
|
|
|
def test_scheduler_enqueue_redis_error(monkeypatch):
|
|
"""Une erreur Redis leve RedisUnavailableError lors de l'enqueue."""
|
|
config = FakeAppConfig("redis://localhost:6379/0")
|
|
|
|
monkeypatch.setattr(
|
|
"pricewatch.app.tasks.scheduler.redis.from_url",
|
|
lambda url: DummyRedisError(RedisError("boom")),
|
|
)
|
|
|
|
scheduler = ScrapingScheduler(config=config)
|
|
with pytest.raises(RedisUnavailableError):
|
|
scheduler.enqueue_immediate("https://example.com/product")
|