185 lines
6.1 KiB
Python
185 lines
6.1 KiB
Python
"""
|
|
Tests pour ScrapingScheduler avec mocks Redis/RQ.
|
|
"""
|
|
|
|
from dataclasses import dataclass
|
|
|
|
import pytest
|
|
from redis.exceptions import ConnectionError as RedisConnectionError
|
|
|
|
from pricewatch.app.tasks.scheduler import (
|
|
RedisUnavailableError,
|
|
ScheduledJobInfo,
|
|
ScrapingScheduler,
|
|
check_redis_connection,
|
|
)
|
|
|
|
|
|
@dataclass
|
|
class FakeRedis:
|
|
url: str
|
|
|
|
def ping(self):
|
|
"""Simule un ping reussi."""
|
|
return True
|
|
|
|
|
|
class FakeRedisConnectionError:
|
|
"""FakeRedis qui leve une erreur a la connexion."""
|
|
|
|
def __init__(self, url: str):
|
|
self.url = url
|
|
|
|
def ping(self):
|
|
raise RedisConnectionError("Connection refused")
|
|
|
|
|
|
class DummyQueue:
|
|
def __init__(self, name: str, connection=None) -> None:
|
|
self.name = name
|
|
self.connection = connection
|
|
self.enqueued = []
|
|
|
|
def enqueue(self, func, *args, **kwargs):
|
|
job = type("Job", (), {"id": "job-123"})()
|
|
self.enqueued.append((func, args, kwargs))
|
|
return job
|
|
|
|
|
|
class DummyScheduler:
|
|
def __init__(self, queue=None, connection=None) -> None:
|
|
self.queue = queue
|
|
self.connection = connection
|
|
self.scheduled = []
|
|
|
|
def schedule(self, scheduled_time, func, args=None, kwargs=None, interval=None, repeat=None):
|
|
job = type("Job", (), {"id": "job-456"})()
|
|
self.scheduled.append((scheduled_time, func, args, kwargs, interval, repeat))
|
|
return job
|
|
|
|
|
|
@dataclass
|
|
class FakeRedisConfig:
|
|
url: str
|
|
|
|
|
|
@dataclass
|
|
class FakeAppConfig:
|
|
redis: FakeRedisConfig
|
|
|
|
|
|
def test_scheduler_enqueue_immediate(monkeypatch):
|
|
"""Enqueue immediate utilise la queue RQ."""
|
|
config = FakeAppConfig(redis=FakeRedisConfig(url="redis://localhost:6379/0"))
|
|
|
|
monkeypatch.setattr("pricewatch.app.tasks.scheduler.redis.from_url", lambda url: FakeRedis(url))
|
|
monkeypatch.setattr("pricewatch.app.tasks.scheduler.Queue", DummyQueue)
|
|
monkeypatch.setattr("pricewatch.app.tasks.scheduler.Scheduler", DummyScheduler)
|
|
|
|
scheduler = ScrapingScheduler(config=config, queue_name="default")
|
|
job = scheduler.enqueue_immediate("https://example.com/product")
|
|
|
|
assert job.id == "job-123"
|
|
assert len(scheduler.queue.enqueued) == 1
|
|
|
|
|
|
def test_scheduler_schedule_product(monkeypatch):
|
|
"""Schedule product cree un job recurrent."""
|
|
config = FakeAppConfig(redis=FakeRedisConfig(url="redis://localhost:6379/0"))
|
|
|
|
monkeypatch.setattr("pricewatch.app.tasks.scheduler.redis.from_url", lambda url: FakeRedis(url))
|
|
monkeypatch.setattr("pricewatch.app.tasks.scheduler.Queue", DummyQueue)
|
|
monkeypatch.setattr("pricewatch.app.tasks.scheduler.Scheduler", DummyScheduler)
|
|
|
|
scheduler = ScrapingScheduler(config=config, queue_name="default")
|
|
info = scheduler.schedule_product("https://example.com/product", interval_hours=1)
|
|
|
|
assert isinstance(info, ScheduledJobInfo)
|
|
assert info.job_id == "job-456"
|
|
assert len(scheduler.scheduler.scheduled) == 1
|
|
|
|
|
|
# ============================================================================
|
|
# Tests gestion erreurs Redis
|
|
# ============================================================================
|
|
|
|
|
|
def test_scheduler_redis_connection_error(monkeypatch):
|
|
"""Leve RedisUnavailableError quand Redis n'est pas accessible."""
|
|
config = FakeAppConfig(redis=FakeRedisConfig(url="redis://localhost:6379/0"))
|
|
|
|
monkeypatch.setattr(
|
|
"pricewatch.app.tasks.scheduler.redis.from_url",
|
|
lambda url: FakeRedisConnectionError(url),
|
|
)
|
|
monkeypatch.setattr("pricewatch.app.tasks.scheduler.Queue", DummyQueue)
|
|
monkeypatch.setattr("pricewatch.app.tasks.scheduler.Scheduler", DummyScheduler)
|
|
|
|
scheduler = ScrapingScheduler(config=config, queue_name="default")
|
|
|
|
with pytest.raises(RedisUnavailableError) as exc_info:
|
|
scheduler.enqueue_immediate("https://example.com/product")
|
|
|
|
assert "Redis" in str(exc_info.value.message)
|
|
assert exc_info.value.cause is not None
|
|
|
|
|
|
def test_scheduler_lazy_connection(monkeypatch):
|
|
"""La connexion Redis n'est etablie qu'au premier appel."""
|
|
config = FakeAppConfig(redis=FakeRedisConfig(url="redis://localhost:6379/0"))
|
|
connection_calls = []
|
|
|
|
def track_from_url(url):
|
|
connection_calls.append(url)
|
|
return FakeRedis(url)
|
|
|
|
monkeypatch.setattr("pricewatch.app.tasks.scheduler.redis.from_url", track_from_url)
|
|
monkeypatch.setattr("pricewatch.app.tasks.scheduler.Queue", DummyQueue)
|
|
monkeypatch.setattr("pricewatch.app.tasks.scheduler.Scheduler", DummyScheduler)
|
|
|
|
scheduler = ScrapingScheduler(config=config, queue_name="default")
|
|
|
|
# Pas de connexion a la creation
|
|
assert len(connection_calls) == 0
|
|
|
|
# Connexion au premier appel
|
|
scheduler.enqueue_immediate("https://example.com/product")
|
|
assert len(connection_calls) == 1
|
|
|
|
# Pas de nouvelle connexion au deuxieme appel
|
|
scheduler.enqueue_immediate("https://example.com/product2")
|
|
assert len(connection_calls) == 1
|
|
|
|
|
|
def test_check_redis_connection_success(monkeypatch):
|
|
"""check_redis_connection retourne True si Redis repond."""
|
|
monkeypatch.setattr("pricewatch.app.tasks.scheduler.redis.from_url", FakeRedis)
|
|
|
|
assert check_redis_connection("redis://localhost:6379/0") is True
|
|
|
|
|
|
def test_check_redis_connection_failure(monkeypatch):
|
|
"""check_redis_connection retourne False si Redis ne repond pas."""
|
|
monkeypatch.setattr(
|
|
"pricewatch.app.tasks.scheduler.redis.from_url", FakeRedisConnectionError
|
|
)
|
|
|
|
assert check_redis_connection("redis://localhost:6379/0") is False
|
|
|
|
|
|
def test_scheduler_schedule_redis_error(monkeypatch):
|
|
"""schedule_product leve RedisUnavailableError si Redis down."""
|
|
config = FakeAppConfig(redis=FakeRedisConfig(url="redis://localhost:6379/0"))
|
|
|
|
monkeypatch.setattr(
|
|
"pricewatch.app.tasks.scheduler.redis.from_url",
|
|
lambda url: FakeRedisConnectionError(url),
|
|
)
|
|
monkeypatch.setattr("pricewatch.app.tasks.scheduler.Queue", DummyQueue)
|
|
monkeypatch.setattr("pricewatch.app.tasks.scheduler.Scheduler", DummyScheduler)
|
|
|
|
scheduler = ScrapingScheduler(config=config, queue_name="default")
|
|
|
|
with pytest.raises(RedisUnavailableError):
|
|
scheduler.schedule_product("https://example.com/product", interval_hours=24)
|