198 lines
7.2 KiB
Python
198 lines
7.2 KiB
Python
"""
Repository pattern for SQLAlchemy persistence.

Centralizes the CRUD operations performed on the DB models from a
ProductSnapshot.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from typing import Optional
|
|
|
|
from sqlalchemy.exc import SQLAlchemyError
|
|
from sqlalchemy.orm import Session
|
|
|
|
from pricewatch.app.core.logging import get_logger
|
|
from pricewatch.app.core.schema import ProductSnapshot
|
|
from pricewatch.app.db.models import (
|
|
ClassificationRule,
|
|
PriceHistory,
|
|
Product,
|
|
ProductImage,
|
|
ProductSpec,
|
|
ScrapingLog,
|
|
)
|
|
|
|
# Module-level logger used by the repository methods in this file.
logger = get_logger("db.repository")
|
|
|
|
|
|
class ProductRepository:
    """Persistence repository that stores a ProductSnapshot through a Session.

    None of the methods in this class call ``commit`` or ``rollback`` on the
    session; transaction boundaries are left to the caller.
    """

    # Snapshot attributes copied to the product only when they are truthy,
    # so an empty string or None never overwrites a stored value.
    _TEXT_FIELDS = (
        "url",
        "title",
        "category",
        "type",
        "description",
        "currency",
        "amazon_choice_label",
        "discount_text",
        "stock_text",
        "model_number",
        "model_name",
    )

    # Snapshot attributes for which falsy values (0, False) are meaningful;
    # only None is treated as "not provided".
    _NULLABLE_FIELDS = (
        "msrp",
        "rating_value",
        "rating_count",
        "amazon_choice",
        "in_stock",
    )

    def __init__(self, session: Session) -> None:
        """Bind the repository to the session used for every operation."""
        self.session = session

    def get_or_create(self, source: str, reference: str, url: str) -> Product:
        """Fetch or create a product by its natural key (source, reference).

        Args:
            source: Source identifier the product belongs to.
            reference: Product reference within that source.
            url: URL stored on the product when it has to be created.

        Returns:
            The existing product, or a newly added and flushed one.
        """
        product = (
            self.session.query(Product)
            .filter(Product.source == source, Product.reference == reference)
            .one_or_none()
        )
        if product is not None:
            return product

        product = Product(source=source, reference=reference, url=url)
        self.session.add(product)
        # Flush right away so the INSERT is emitted before callers read
        # product.id (presumably a database-generated key).
        self.session.flush()
        return product

    def update_product_metadata(self, product: Product, snapshot: ProductSnapshot) -> None:
        """Copy the metadata available on the snapshot onto the product.

        Text-like fields are copied only when truthy; numeric and boolean
        fields are copied whenever they are not None, so that 0 and False
        are preserved.
        """
        for name in self._TEXT_FIELDS:
            value = getattr(snapshot, name)
            if value:
                setattr(product, name, value)

        for name in self._NULLABLE_FIELDS:
            value = getattr(snapshot, name)
            if value is not None:
                setattr(product, name, value)

    @staticmethod
    def _keywords_match(title: str, keywords: str | list[str] | None) -> bool:
        """Return True when any keyword occurs (case-insensitively) in title.

        ``title`` must already be lowercased. ``keywords`` may be None, a
        single string, or an iterable of strings; empty and non-string
        entries are ignored instead of raising.
        """
        if not keywords:
            return False
        if isinstance(keywords, str):
            keywords = [keywords]
        return any(
            isinstance(keyword, str) and keyword != "" and keyword.lower() in title
            for keyword in keywords
        )

    def apply_classification(self, snapshot: ProductSnapshot) -> None:
        """Apply the first matching active classification rule to the snapshot.

        Rules are evaluated in (sort_order, id) order. The first rule with a
        keyword contained in the snapshot title sets ``snapshot.category``
        and/or ``snapshot.type`` (each only when the rule defines it), and
        evaluation stops. A snapshot without a title is left untouched.
        """
        if not snapshot.title:
            return

        rules = (
            self.session.query(ClassificationRule)
            # "== True" builds a SQL expression here, not a Python comparison.
            .filter(ClassificationRule.is_active == True)  # noqa: E712
            .order_by(ClassificationRule.sort_order, ClassificationRule.id)
            .all()
        )

        title = snapshot.title.lower()
        for rule in rules:
            if not self._keywords_match(title, rule.keywords):
                continue
            if rule.category:
                snapshot.category = rule.category
            if rule.type:
                snapshot.type = rule.type
            # First matching rule wins.
            return

    def add_price_history(self, product: Product, snapshot: ProductSnapshot) -> Optional[PriceHistory]:
        """Add a price history entry unless one exists for the same fetch time.

        Returns:
            The existing entry for (product.id, snapshot.fetched_at) when
            there is one, otherwise the newly added entry.
        """
        existing = (
            self.session.query(PriceHistory)
            .filter(
                PriceHistory.product_id == product.id,
                PriceHistory.fetched_at == snapshot.fetched_at,
            )
            .one_or_none()
        )
        if existing is not None:
            return existing

        price_entry = PriceHistory(
            product_id=product.id,
            price=snapshot.price,
            shipping_cost=snapshot.shipping_cost,
            stock_status=snapshot.stock_status,
            fetch_method=snapshot.debug.method,
            fetch_status=snapshot.debug.status,
            fetched_at=snapshot.fetched_at,
        )
        self.session.add(price_entry)
        return price_entry

    @staticmethod
    def _new_image_entries(existing_urls, images) -> list[tuple[int, str]]:
        """Return (position, url) pairs for URLs that still have to be stored.

        ``position`` is the index of the first occurrence of the URL in
        ``images``. URLs already in ``existing_urls`` and repeated URLs
        within ``images`` are skipped. ``images`` may be None.
        """
        seen = set(existing_urls)
        entries = []
        for position, url in enumerate(images or ()):
            if url in seen:
                continue
            # Remember the URL so a repeat later in the same list is not
            # queued a second time.
            seen.add(url)
            entries.append((position, url))
        return entries

    def sync_images(self, product: Product, images: list[str]) -> None:
        """Add the images that the product does not have yet.

        Existing images are never removed or reordered. Each new URL is
        added once, even when it appears several times in ``images``.
        """
        existing_urls = {image.image_url for image in product.images}
        for position, url in self._new_image_entries(existing_urls, images):
            self.session.add(ProductImage(product_id=product.id, image_url=url, position=position))

    def sync_specs(self, product: Product, specs: dict[str, str]) -> None:
        """Upsert the product specs by key.

        Existing keys get their value replaced, unknown keys are added.
        Specs absent from ``specs`` are left untouched. ``specs`` may be None.
        """
        existing_specs = {spec.spec_key: spec for spec in product.specs}
        for key, value in (specs or {}).items():
            current = existing_specs.get(key)
            if current is not None:
                current.spec_value = value
            else:
                self.session.add(ProductSpec(product_id=product.id, spec_key=key, spec_value=value))

    def add_scraping_log(self, snapshot: ProductSnapshot, product_id: Optional[int]) -> ScrapingLog:
        """Add a scraping log entry built from the snapshot debug data.

        Args:
            snapshot: Snapshot whose identity and debug fields are logged.
            product_id: Id of the related product, or None when there is none.

        Returns:
            The log entry added to the session.
        """
        log_entry = ScrapingLog(
            product_id=product_id,
            url=snapshot.url,
            source=snapshot.source,
            reference=snapshot.reference,
            fetch_method=snapshot.debug.method,
            fetch_status=snapshot.debug.status,
            fetched_at=snapshot.fetched_at,
            duration_ms=snapshot.debug.duration_ms,
            html_size_bytes=snapshot.debug.html_size_bytes,
            # Empty collections are stored as None.
            errors=snapshot.debug.errors or None,
            notes=snapshot.debug.notes or None,
        )
        self.session.add(log_entry)
        return log_entry

    def save_snapshot(self, snapshot: ProductSnapshot) -> Optional[int]:
        """Persist a complete ProductSnapshot.

        Without a reference only a scraping log (with no product id) is
        recorded. Otherwise the product is fetched or created, then its
        metadata, price history, images, specs and scraping log are written.

        Returns:
            The product id, or None when the snapshot has no reference.
        """
        if not snapshot.reference:
            logger.warning("Reference absente: persistence ignoree")
            self.add_scraping_log(snapshot, product_id=None)
            return None

        product = self.get_or_create(snapshot.source, snapshot.reference, snapshot.url)
        self.update_product_metadata(product, snapshot)
        self.add_price_history(product, snapshot)
        self.sync_images(product, snapshot.images)
        self.sync_specs(product, snapshot.specs)
        self.add_scraping_log(snapshot, product_id=product.id)
        return product.id

    def safe_save_snapshot(self, snapshot: ProductSnapshot) -> Optional[int]:
        """Save the snapshot, logging any SQLAlchemy error before re-raising.

        Raises:
            SQLAlchemyError: Re-raised unchanged after being logged. The
                session is not rolled back here.
        """
        try:
            return self.save_snapshot(snapshot)
        except SQLAlchemyError as exc:
            logger.error(f"Erreur SQLAlchemy: {exc}")
            raise
|