"""
|
|
Planification des jobs de scraping via RQ Scheduler.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timedelta, timezone
|
|
import hashlib
|
|
from typing import Optional
|
|
|
|
import redis
|
|
from redis.exceptions import ConnectionError as RedisConnectionError
|
|
from redis.exceptions import RedisError, TimeoutError as RedisTimeoutError
|
|
from rq import Queue
|
|
from rq_scheduler import Scheduler
|
|
|
|
from pricewatch.app.core.config import AppConfig, get_config
|
|
from pricewatch.app.core.logging import get_logger
|
|
from pricewatch.app.tasks.scrape import scrape_product
|
|
|
|
logger = get_logger("tasks.scheduler")


class RedisUnavailableError(Exception):
    """Raised when Redis is not available."""

    def __init__(self, message: str = "Redis unavailable", cause: Optional[Exception] = None):
        self.message = message
        self.cause = cause
        super().__init__(self.message)


@dataclass
class ScheduledJobInfo:
    """Return information for a scheduled job."""

    job_id: str
    next_run: datetime


def check_redis_connection(redis_url: str) -> bool:
    """
    Check whether Redis is reachable.

    Returns:
        True if Redis responds, False otherwise.
    """
    try:
        conn = redis.from_url(redis_url)
        conn.ping()
        return True
    except (RedisConnectionError, RedisTimeoutError, RedisError) as e:
        logger.debug(f"Redis ping failed: {e}")
        return False
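
# Usage sketch (illustrative only; the URL below is a placeholder, not part of
# the project configuration):
#
#     if not check_redis_connection("redis://localhost:6379/0"):
#         raise RedisUnavailableError("Redis is required for scheduling")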


class ScrapingScheduler:
    """Scheduler for scraping jobs with RQ."""

    def __init__(self, config: Optional[AppConfig] = None, queue_name: str = "default") -> None:
        self.config = config or get_config()
        self._queue_name = queue_name
        self._redis: Optional[redis.Redis] = None
        self._queue: Optional[Queue] = None
        self._scheduler: Optional[Scheduler] = None

    def _ensure_connected(self) -> None:
        """Establish the Redis connection if needed; raise RedisUnavailableError on failure."""
        if self._redis is not None:
            return

        try:
            self._redis = redis.from_url(self.config.redis.url)
            # Ping to verify the connection is actually usable
            self._redis.ping()
            self._queue = Queue(self._queue_name, connection=self._redis)
            self._scheduler = Scheduler(queue=self._queue, connection=self._redis)
            logger.debug(f"Redis connection established: {self.config.redis.url}")
        except (RedisConnectionError, RedisTimeoutError) as e:
            self._redis = None
            msg = f"Cannot connect to Redis ({self.config.redis.url}): {e}"
            logger.error(msg)
            raise RedisUnavailableError(msg, cause=e) from e
        except RedisError as e:
            self._redis = None
            msg = f"Redis error: {e}"
            logger.error(msg)
            raise RedisUnavailableError(msg, cause=e) from e

    @property
    def redis(self) -> redis.Redis:
        """Lazy access to the Redis connection."""
        self._ensure_connected()
        return self._redis  # type: ignore

    @property
    def queue(self) -> Queue:
        """Lazy access to the RQ queue."""
        self._ensure_connected()
        return self._queue  # type: ignore

    @property
    def scheduler(self) -> Scheduler:
        """Lazy access to the RQ scheduler."""
        self._ensure_connected()
        return self._scheduler  # type: ignore
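
    # Note: the properties above connect on first use, so RedisUnavailableError
    # surfaces at first access rather than in __init__. A minimal sketch of the
    # deferred-failure pattern (illustrative only):
    #
    #     s = ScrapingScheduler()          # no connection yet
    #     try:
    #         s.queue                      # first access pings Redis
    #     except RedisUnavailableError:
    #         ...                          # degrade gracefully, e.g. skip scheduling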

    def enqueue_immediate(
        self,
        url: str,
        use_playwright: Optional[bool] = None,
        save_db: bool = True,
    ) -> Job:
        """Enqueue an immediate scraping job."""
        job = self.queue.enqueue(
            scrape_product,
            url,
            use_playwright=use_playwright,
            save_db=save_db,
        )
        logger.info(f"Job enqueued: {job.id}")
        return job
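
    # Usage sketch (the product URL is a placeholder):
    #
    #     scheduler = ScrapingScheduler()
    #     job = scheduler.enqueue_immediate("https://example.com/product/123")
    #     logger.info(f"queued {job.id}")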

    def schedule_product(
        self,
        url: str,
        interval_hours: int = 24,
        use_playwright: Optional[bool] = None,
        save_db: bool = True,
        job_id: Optional[str] = None,
    ) -> ScheduledJobInfo:
        """Schedule a recurring scrape (interval expressed in hours)."""
        interval_seconds = int(timedelta(hours=interval_hours).total_seconds())
        next_run = datetime.now(timezone.utc) + timedelta(seconds=interval_seconds)

        resolved_job_id = job_id or self._job_id_for_url(url)
        job = self.scheduler.schedule(
            scheduled_time=next_run,
            func=scrape_product,
            args=[url],
            kwargs={"use_playwright": use_playwright, "save_db": save_db},
            interval=interval_seconds,
            repeat=None,  # with an interval set, None means repeat indefinitely
            id=resolved_job_id,
        )
        logger.info(f"Job scheduled: {job.id}, next run: {next_run.isoformat()}")
        return ScheduledJobInfo(job_id=job.id, next_run=next_run)
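
    # Usage sketch (placeholder URL; an rqscheduler process and an RQ worker
    # must be running for the jobs to actually execute):
    #
    #     info = ScrapingScheduler().schedule_product(
    #         "https://example.com/product/123", interval_hours=6
    #     )
    #     logger.info(f"{info.job_id} next runs at {info.next_run.isoformat()}")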

    @staticmethod
    def _job_id_for_url(url: str) -> str:
        """Generate a stable job_id so the same URL is never scheduled twice."""
        fingerprint = hashlib.sha1(url.strip().lower().encode("utf-8")).hexdigest()
        return f"scrape_{fingerprint}"
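

if __name__ == "__main__":
    # Minimal smoke test: a sketch, not part of the module's public API.
    # Assumes Redis is reachable at the configured URL; the product URL is a
    # placeholder.
    cfg = get_config()
    if check_redis_connection(cfg.redis.url):
        info = ScrapingScheduler(cfg).schedule_product(
            "https://example.com/product/123", interval_hours=12
        )
        print(f"Scheduled {info.job_id}; next run {info.next_run.isoformat()}")
    else:
        print("Redis unreachable; nothing scheduled")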