This commit is contained in:
Gilles Soulier
2026-01-05 16:08:01 +01:00
parent dcba044cd6
commit c67befc549
2215 changed files with 26743 additions and 329 deletions

0
backend/app/__init__.py Normal file → Executable file
View File

0
backend/app/api/__init__.py Normal file → Executable file
View File

0
backend/app/api/benchmark.py Normal file → Executable file
View File

0
backend/app/api/devices.py Normal file → Executable file
View File

0
backend/app/api/docs.py Normal file → Executable file
View File

View File

@@ -0,0 +1,7 @@
"""
Linux BenchTools - API Endpoints
"""
from . import peripherals, locations
__all__ = ["peripherals", "locations"]

View File

@@ -0,0 +1,303 @@
"""
Linux BenchTools - Locations API Endpoints
"""
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File
from sqlalchemy.orm import Session
from typing import List, Optional
import os
import shutil
from app.db.session import get_peripherals_db
from app.services.peripheral_service import LocationService
from app.schemas.peripheral import (
LocationCreate, LocationUpdate, LocationSchema, LocationTreeNode
)
from app.models.location import Location
from app.utils.image_processor import ImageProcessor
from app.utils.qr_generator import QRCodeGenerator
from app.core.config import settings
router = APIRouter()
# ========================================
# LOCATION CRUD
# ========================================
@router.post("/", response_model=LocationSchema, status_code=201)
def create_location(
    location: LocationCreate,
    db: Session = Depends(get_peripherals_db)
):
    """Create a new location.

    Raises 404 if the referenced parent does not exist and 400 if a
    location with the same name already exists.
    """
    # Validate the referenced parent, if one was given.
    if location.parent_id:
        parent_row = db.query(Location).filter(Location.id == location.parent_id).first()
        if parent_row is None:
            raise HTTPException(status_code=404, detail="Parent location not found")
    # Location names are unique across the whole hierarchy.
    duplicate = db.query(Location).filter(Location.nom == location.nom).first()
    if duplicate is not None:
        raise HTTPException(status_code=400, detail="Location with this name already exists")
    new_location = Location(**location.model_dump())
    db.add(new_location)
    db.commit()
    db.refresh(new_location)
    return new_location
@router.get("/", response_model=List[LocationSchema])
def list_locations(
    parent_id: Optional[int] = None,
    db: Session = Depends(get_peripherals_db)
):
    """List locations, optionally restricted to the children of one parent."""
    locations = db.query(Location)
    if parent_id is not None:
        locations = locations.filter(Location.parent_id == parent_id)
    # Stable ordering: explicit display order first, then name.
    return locations.order_by(Location.ordre_affichage, Location.nom).all()
@router.get("/tree", response_model=List[dict])
def get_location_tree(db: Session = Depends(get_peripherals_db)):
    """Return the whole location hierarchy as a nested tree structure."""
    tree = LocationService.get_location_tree(db)
    return tree
@router.get("/{location_id}", response_model=LocationSchema)
def get_location(
    location_id: int,
    db: Session = Depends(get_peripherals_db)
):
    """Fetch a single location by its primary key (404 if absent)."""
    found = db.query(Location).filter(Location.id == location_id).first()
    if found is None:
        raise HTTPException(status_code=404, detail="Location not found")
    return found
@router.get("/{location_id}/path", response_model=List[LocationSchema])
def get_location_path(
    location_id: int,
    db: Session = Depends(get_peripherals_db)
):
    """Return the chain of locations from the root down to *location_id*."""
    ancestors = LocationService.get_location_path(db, location_id)
    # An empty path means the service could not resolve the location.
    if not ancestors:
        raise HTTPException(status_code=404, detail="Location not found")
    return ancestors
@router.put("/{location_id}", response_model=LocationSchema)
def update_location(
    location_id: int,
    location_data: LocationUpdate,
    db: Session = Depends(get_peripherals_db)
):
    """Update a location.

    Validates that a new parent exists, that re-parenting does not create
    a cycle anywhere in the hierarchy (previously only a *direct*
    self-parent was rejected; moving a location under one of its own
    descendants still created a cycle), and that a renamed location does
    not collide with an existing name.

    Raises:
        HTTPException 404: location or new parent not found.
        HTTPException 400: cycle detected, or duplicate name.
    """
    location = db.query(Location).filter(Location.id == location_id).first()
    if not location:
        raise HTTPException(status_code=404, detail="Location not found")
    update_dict = location_data.model_dump(exclude_unset=True)
    # Check parent exists if being changed
    if "parent_id" in update_dict and update_dict["parent_id"]:
        parent = db.query(Location).filter(Location.id == update_dict["parent_id"]).first()
        if not parent:
            raise HTTPException(status_code=404, detail="Parent location not found")
        # Prevent circular reference: walk from the new parent up to the
        # root; if this location appears anywhere on that chain, the move
        # would make the location (directly or transitively) its own parent.
        ancestor = parent
        while ancestor is not None:
            if ancestor.id == location_id:
                raise HTTPException(status_code=400, detail="Location cannot be its own parent")
            if ancestor.parent_id is None:
                break
            ancestor = db.query(Location).filter(Location.id == ancestor.parent_id).first()
    # Check for duplicate name if name is being changed
    if "nom" in update_dict and update_dict["nom"] != location.nom:
        existing = db.query(Location).filter(Location.nom == update_dict["nom"]).first()
        if existing:
            raise HTTPException(status_code=400, detail="Location with this name already exists")
    # Apply the validated fields.
    for key, value in update_dict.items():
        setattr(location, key, value)
    db.commit()
    db.refresh(location)
    return location
@router.delete("/{location_id}", status_code=204)
def delete_location(
    location_id: int,
    db: Session = Depends(get_peripherals_db)
):
    """Delete a location.

    Refused (400) while the location still has child locations or
    peripherals. Any image / QR-code files on disk are removed first.
    """
    location = db.query(Location).filter(Location.id == location_id).first()
    if location is None:
        raise HTTPException(status_code=404, detail="Location not found")
    # Only leaf locations may be removed.
    child_count = db.query(Location).filter(Location.parent_id == location_id).count()
    if child_count > 0:
        raise HTTPException(status_code=400, detail="Cannot delete location with children")
    # ...and only when nothing is stored in them.
    if LocationService.count_peripherals_in_location(db, location_id) > 0:
        raise HTTPException(status_code=400, detail="Cannot delete location with peripherals")
    # Clean up any files stored for this location before dropping the row.
    for stored_file in (location.image_path, location.qr_code_path):
        if stored_file and os.path.exists(stored_file):
            os.remove(stored_file)
    db.delete(location)
    db.commit()
@router.get("/{location_id}/count")
def count_peripherals(
    location_id: int,
    recursive: bool = False,
    db: Session = Depends(get_peripherals_db)
):
    """Count peripherals in a location (optionally including descendants)."""
    if db.query(Location).filter(Location.id == location_id).first() is None:
        raise HTTPException(status_code=404, detail="Location not found")
    total = LocationService.count_peripherals_in_location(db, location_id, recursive)
    return {"location_id": location_id, "count": total, "recursive": recursive}
# ========================================
# LOCATION IMAGES
# ========================================
@router.post("/{location_id}/image", response_model=LocationSchema)
async def upload_location_image(
    location_id: int,
    file: UploadFile = File(...),
    db: Session = Depends(get_peripherals_db)
):
    """Upload an image for a location.

    The upload is written to a private temporary file created by
    ``tempfile`` — never to a path built from the client-supplied
    filename, which previously allowed collisions/overwrites (and path
    traversal via '../') under ``/tmp``. It is then validated, processed
    into the upload directory, and recorded on the location; any previous
    image file is deleted. The scratch file is removed on every path.

    Raises:
        HTTPException 404: location not found.
        HTTPException 400: the uploaded file is not a valid image.
    """
    import tempfile
    location = db.query(Location).filter(Location.id == location_id).first()
    if not location:
        raise HTTPException(status_code=404, detail="Location not found")
    # Keep the original extension so the processor can sniff the format,
    # but let tempfile choose a unique, safe basename.
    suffix = os.path.splitext(file.filename or "")[1]
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
        temp_path = tmp.name
        shutil.copyfileobj(file.file, tmp)
    try:
        if not ImageProcessor.is_valid_image(temp_path):
            raise HTTPException(status_code=400, detail="Invalid image file")
        # Create upload directory
        upload_dir = os.path.join(settings.PERIPHERALS_UPLOAD_DIR, "locations", "images")
        os.makedirs(upload_dir, exist_ok=True)
        # Resize/compress into the permanent upload directory.
        processed_path, _ = ImageProcessor.process_image(
            temp_path,
            upload_dir,
            max_width=800,
            max_height=600
        )
        # Delete old image if exists
        if location.image_path and os.path.exists(location.image_path):
            os.remove(location.image_path)
        # Update location
        location.image_path = processed_path
        db.commit()
        db.refresh(location)
        return location
    finally:
        # Always remove the scratch file, on success and on every error path.
        if os.path.exists(temp_path):
            os.remove(temp_path)
@router.delete("/{location_id}/image", status_code=204)
def delete_location_image(
    location_id: int,
    db: Session = Depends(get_peripherals_db)
):
    """Remove a location's image file from disk (if any) and clear its path."""
    location = db.query(Location).filter(Location.id == location_id).first()
    if location is None:
        raise HTTPException(status_code=404, detail="Location not found")
    image_path = location.image_path
    if image_path and os.path.exists(image_path):
        os.remove(image_path)
    location.image_path = None
    db.commit()
# ========================================
# LOCATION QR CODES
# ========================================
@router.post("/{location_id}/qr-code", response_model=LocationSchema)
def generate_qr_code(
    location_id: int,
    base_url: str,
    db: Session = Depends(get_peripherals_db)
):
    """(Re)generate the QR code image for a location.

    Any previously generated QR file is removed from disk and the stored
    path is replaced by the new one.
    """
    location = db.query(Location).filter(Location.id == location_id).first()
    if location is None:
        raise HTTPException(status_code=404, detail="Location not found")
    # Make sure the destination directory exists before writing into it.
    qr_dir = os.path.join(settings.PERIPHERALS_UPLOAD_DIR, "locations", "qrcodes")
    os.makedirs(qr_dir, exist_ok=True)
    new_qr_path = QRCodeGenerator.generate_location_qr(
        location_id=location.id,
        location_name=location.nom,
        base_url=base_url,
        output_dir=qr_dir
    )
    # Drop the previous QR file, if any, before pointing at the new one.
    previous = location.qr_code_path
    if previous and os.path.exists(previous):
        os.remove(previous)
    location.qr_code_path = new_qr_path
    db.commit()
    db.refresh(location)
    return location
@router.delete("/{location_id}/qr-code", status_code=204)
def delete_qr_code(
    location_id: int,
    db: Session = Depends(get_peripherals_db)
):
    """Remove a location's QR code file from disk (if any) and clear its path."""
    location = db.query(Location).filter(Location.id == location_id).first()
    if location is None:
        raise HTTPException(status_code=404, detail="Location not found")
    qr_path = location.qr_code_path
    if qr_path and os.path.exists(qr_path):
        os.remove(qr_path)
    location.qr_code_path = None
    db.commit()

File diff suppressed because it is too large Load Diff

0
backend/app/api/links.py Normal file → Executable file
View File

0
backend/app/core/__init__.py Normal file → Executable file
View File

25
backend/app/core/config.py Normal file → Executable file
View File

@@ -13,13 +13,29 @@ class Settings(BaseSettings):
API_TOKEN: str = os.getenv("API_TOKEN", "CHANGE_ME_INSECURE_DEFAULT")
API_PREFIX: str = "/api"
# Database
# Database - Main (Benchmarks)
DATABASE_URL: str = os.getenv("DATABASE_URL", "sqlite:///./backend/data/data.db")
# Database - Peripherals (Separate DB)
PERIPHERALS_DB_URL: str = os.getenv("PERIPHERALS_DB_URL", "sqlite:///./backend/data/peripherals.db")
# Module Peripherals
PERIPHERALS_MODULE_ENABLED: bool = os.getenv("PERIPHERALS_MODULE_ENABLED", "true").lower() == "true"
# Upload configuration
UPLOAD_DIR: str = os.getenv("UPLOAD_DIR", "./uploads")
PERIPHERALS_UPLOAD_DIR: str = os.getenv("PERIPHERALS_UPLOAD_DIR", "./uploads/peripherals")
MAX_UPLOAD_SIZE: int = 50 * 1024 * 1024 # 50 MB
# Image compression
IMAGE_COMPRESSION_ENABLED: bool = True
IMAGE_COMPRESSION_QUALITY: int = 85
IMAGE_MAX_WIDTH: int = 1920
IMAGE_MAX_HEIGHT: int = 1080
THUMBNAIL_SIZE: int = 48
THUMBNAIL_QUALITY: int = 75
THUMBNAIL_FORMAT: str = "webp"
# CORS
CORS_ORIGINS: list = ["*"] # For local network access
@@ -29,10 +45,11 @@ class Settings(BaseSettings):
APP_DESCRIPTION: str = "Self-hosted benchmarking and hardware inventory for Linux machines"
# Score weights for global score calculation
SCORE_WEIGHT_CPU: float = 0.30
# CPU weight is double the base weight (0.40 vs 0.20)
SCORE_WEIGHT_CPU: float = 0.40
SCORE_WEIGHT_MEMORY: float = 0.20
SCORE_WEIGHT_DISK: float = 0.25
SCORE_WEIGHT_NETWORK: float = 0.15
SCORE_WEIGHT_DISK: float = 0.20
SCORE_WEIGHT_NETWORK: float = 0.10
SCORE_WEIGHT_GPU: float = 0.10
class Config:

0
backend/app/core/security.py Normal file → Executable file
View File

0
backend/app/db/__init__.py Normal file → Executable file
View File

8
backend/app/db/base.py Normal file → Executable file
View File

@@ -4,12 +4,20 @@ Linux BenchTools - Database Base
from sqlalchemy.ext.declarative import declarative_base
# Base for main database (benchmarks, devices)
Base = declarative_base()
# Base for peripherals database (separate)
BasePeripherals = declarative_base()
# Import all models here for Alembic/migrations
# Main DB models
from app.models.device import Device # noqa
from app.models.hardware_snapshot import HardwareSnapshot # noqa
from app.models.benchmark import Benchmark # noqa
from app.models.disk_smart import DiskSMART # noqa
from app.models.manufacturer_link import ManufacturerLink # noqa
from app.models.document import Document # noqa
# Peripherals DB models (imported when module enabled)
# Will be imported in init_db.py

48
backend/app/db/init_db.py Normal file → Executable file
View File

@@ -3,8 +3,8 @@ Linux BenchTools - Database Initialization
"""
import os
from app.db.base import Base
from app.db.session import engine
from app.db.base import Base, BasePeripherals
from app.db.session import engine, engine_peripherals
from app.core.config import settings
@@ -24,8 +24,48 @@ def init_db():
if db_dir:
os.makedirs(db_dir, exist_ok=True)
# Create all tables
# Create all tables for main database
Base.metadata.create_all(bind=engine)
print(f"Database initialized: {settings.DATABASE_URL}")
print(f"Main database initialized: {settings.DATABASE_URL}")
print(f"✅ Upload directory created: {settings.UPLOAD_DIR}")
# Initialize peripherals database if module is enabled
if settings.PERIPHERALS_MODULE_ENABLED:
init_peripherals_db()
def init_peripherals_db():
"""
Initialize peripherals database:
- Create all tables
- Create upload directories
- Import peripheral models
"""
# Import models to register them
from app.models.peripheral import (
Peripheral, PeripheralPhoto, PeripheralDocument,
PeripheralLink, PeripheralLoan
)
from app.models.location import Location
from app.models.peripheral_history import PeripheralLocationHistory
# Create peripherals upload directories
os.makedirs(settings.PERIPHERALS_UPLOAD_DIR, exist_ok=True)
os.makedirs(os.path.join(settings.PERIPHERALS_UPLOAD_DIR, "photos"), exist_ok=True)
os.makedirs(os.path.join(settings.PERIPHERALS_UPLOAD_DIR, "documents"), exist_ok=True)
os.makedirs(os.path.join(settings.PERIPHERALS_UPLOAD_DIR, "locations", "images"), exist_ok=True)
os.makedirs(os.path.join(settings.PERIPHERALS_UPLOAD_DIR, "locations", "qrcodes"), exist_ok=True)
# Create database directory if using SQLite
if "sqlite" in settings.PERIPHERALS_DB_URL:
db_path = settings.PERIPHERALS_DB_URL.replace("sqlite:///", "")
db_dir = os.path.dirname(db_path)
if db_dir:
os.makedirs(db_dir, exist_ok=True)
# Create all tables for peripherals database
BasePeripherals.metadata.create_all(bind=engine_peripherals)
print(f"✅ Peripherals database initialized: {settings.PERIPHERALS_DB_URL}")
print(f"✅ Peripherals upload directories created: {settings.PERIPHERALS_UPLOAD_DIR}")

62
backend/app/db/session.py Normal file → Executable file
View File

@@ -1,28 +1,70 @@
"""
Linux BenchTools - Database Session
Linux BenchTools - Database Sessions
"""
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import sessionmaker, Session
from app.core.config import settings
# Create engine
engine = create_engine(
# ========================================
# DATABASE PRINCIPALE (Benchmarks)
# ========================================
# Create main engine
engine_main = create_engine(
settings.DATABASE_URL,
connect_args={"check_same_thread": False} if "sqlite" in settings.DATABASE_URL else {},
echo=False, # Set to True for SQL query logging during development
)
# Create SessionLocal class
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Create SessionLocal class for main DB
SessionLocalMain = sessionmaker(autocommit=False, autoflush=False, bind=engine_main)
# Backward compatibility
engine = engine_main
SessionLocal = SessionLocalMain
# Dependency to get DB session
def get_db():
# ========================================
# DATABASE PÉRIPHÉRIQUES
# ========================================
# Create peripherals engine
engine_peripherals = create_engine(
settings.PERIPHERALS_DB_URL,
connect_args={"check_same_thread": False} if "sqlite" in settings.PERIPHERALS_DB_URL else {},
echo=False,
)
# Create SessionLocal class for peripherals DB
SessionLocalPeripherals = sessionmaker(
autocommit=False,
autoflush=False,
bind=engine_peripherals
)
# ========================================
# DEPENDENCY INJECTION
# ========================================
def get_db() -> Session:
"""
Database session dependency for FastAPI
Main database session dependency for FastAPI (benchmarks, devices)
"""
db = SessionLocal()
db = SessionLocalMain()
try:
yield db
finally:
db.close()
def get_peripherals_db() -> Session:
"""
Peripherals database session dependency for FastAPI
"""
db = SessionLocalPeripherals()
try:
yield db
finally:

56
backend/app/main.py Normal file → Executable file
View File

@@ -6,11 +6,15 @@ from fastapi import FastAPI, Depends
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
from sqlalchemy.orm import Session
from datetime import datetime
import os
import shutil
from app.core.config import settings
from app.db.init_db import init_db
from app.db.session import get_db
from app.api import benchmark, devices, links, docs
from app.api.endpoints import peripherals, locations
@asynccontextmanager
@@ -48,6 +52,11 @@ app.include_router(devices.router, prefix=settings.API_PREFIX, tags=["Devices"])
app.include_router(links.router, prefix=settings.API_PREFIX, tags=["Links"])
app.include_router(docs.router, prefix=settings.API_PREFIX, tags=["Documents"])
# Peripherals module (if enabled)
if settings.PERIPHERALS_MODULE_ENABLED:
app.include_router(peripherals.router, prefix=f"{settings.API_PREFIX}/peripherals", tags=["Peripherals"])
app.include_router(locations.router, prefix=f"{settings.API_PREFIX}/locations", tags=["Locations"])
# Root endpoint
@app.get("/")
@@ -100,7 +109,52 @@ async def get_config():
"""Get frontend configuration (API token, server URLs, etc.)"""
return {
"api_token": settings.API_TOKEN,
"iperf_server": "10.0.1.97"
"iperf_server": "10.0.0.50"
}
def _sqlite_path(url: str) -> str:
if url.startswith("sqlite:////"):
return url.replace("sqlite:////", "/")
if url.startswith("sqlite:///"):
return url.replace("sqlite:///", "")
return ""
@app.post(f"{settings.API_PREFIX}/backup")
async def backup_databases():
"""Create timestamped backups of the main and peripherals databases."""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backups = []
main_db = _sqlite_path(settings.DATABASE_URL)
peripherals_db = _sqlite_path(settings.PERIPHERALS_DB_URL)
db_paths = {
"main": main_db,
"peripherals": peripherals_db
}
# Use main DB directory for backups
base_dir = os.path.dirname(main_db) if main_db else "/app/data"
backup_dir = os.path.join(base_dir, "backups")
os.makedirs(backup_dir, exist_ok=True)
for key, path in db_paths.items():
if not path or not os.path.exists(path):
continue
filename = f"{key}_backup_{timestamp}.db"
dest = os.path.join(backup_dir, filename)
shutil.copy2(path, dest)
backups.append({
"name": key,
"source": path,
"destination": dest,
"filename": filename
})
return {
"success": True,
"timestamp": timestamp,
"backup_dir": backup_dir,
"backups": backups
}

0
backend/app/models/__init__.py Normal file → Executable file
View File

0
backend/app/models/benchmark.py Normal file → Executable file
View File

0
backend/app/models/device.py Normal file → Executable file
View File

0
backend/app/models/disk_smart.py Normal file → Executable file
View File

0
backend/app/models/document.py Normal file → Executable file
View File

0
backend/app/models/hardware_snapshot.py Normal file → Executable file
View File

26
backend/app/models/location.py Executable file
View File

@@ -0,0 +1,26 @@
"""
Linux BenchTools - Location Models
"""
from sqlalchemy import Column, Integer, String, Text
from app.db.base import BasePeripherals
class Location(BasePeripherals):
    """
    Physical locations (rooms, closets, drawers, shelves).

    Hierarchical structure for organizing peripherals: each row may point
    at a parent location via ``parent_id``. No SQL foreign key is declared;
    the parent/child relationship is enforced at the application level.
    """
    __tablename__ = "locations"
    id = Column(Integer, primary_key=True, index=True)
    # Display name; unique across the whole hierarchy, not just per parent.
    nom = Column(String(255), nullable=False, unique=True)
    type = Column(String(50), nullable=False, index=True)  # root, piece, placard, tiroir, etagere, meuble, boite
    parent_id = Column(Integer, index=True)  # Hierarchical relationship (logical reference to locations.id)
    description = Column(Text)
    image_path = Column(String(500))  # Photo of the location
    qr_code_path = Column(String(500))  # QR code for quick access
    ordre_affichage = Column(Integer, default=0)  # Sort order among siblings
    def __repr__(self):
        return f"<Location(id={self.id}, nom='{self.nom}', type='{self.type}')>"

0
backend/app/models/manufacturer_link.py Normal file → Executable file
View File

234
backend/app/models/peripheral.py Executable file
View File

@@ -0,0 +1,234 @@
"""
Linux BenchTools - Peripheral Models
"""
from sqlalchemy import Column, Integer, String, Float, Boolean, Date, DateTime, Text, JSON
from sqlalchemy.sql import func
from app.db.base import BasePeripherals
class Peripheral(BasePeripherals):
    """
    Peripheral model - Main table for all peripherals.

    NOTE: cross-database references (``linked_device_id``, ``device_id``,
    ``location_id``, ``pret_actuel_id``) are logical only — no SQL foreign
    keys are declared because the referenced tables live in other bases.

    Fix: ``notes`` was previously declared twice (once in the metadata
    section, once in the documentation section); the later declaration
    silently shadowed the earlier one. It is now declared exactly once.
    """
    __tablename__ = "peripherals"
    # ========================================
    # IDENTIFICATION
    # ========================================
    id = Column(Integer, primary_key=True, index=True)
    nom = Column(String(255), nullable=False, index=True)
    type_principal = Column(String(100), nullable=False, index=True)
    sous_type = Column(String(100), index=True)
    marque = Column(String(100), index=True)
    modele = Column(String(255))
    fabricant = Column(String(255))  # iManufacturer (USB manufacturer string)
    produit = Column(String(255))  # iProduct (USB product string)
    numero_serie = Column(String(255))
    ean_upc = Column(String(50))
    # ========================================
    # PURCHASE
    # ========================================
    boutique = Column(String(255))
    date_achat = Column(Date)
    prix = Column(Float)
    devise = Column(String(10), default="EUR")
    garantie_duree_mois = Column(Integer)
    garantie_expiration = Column(Date)
    # ========================================
    # RATING
    # ========================================
    rating = Column(Float, default=0.0)  # 0-5 stars
    # ========================================
    # STOCK
    # ========================================
    quantite_totale = Column(Integer, default=1)
    quantite_disponible = Column(Integer, default=1)
    seuil_alerte = Column(Integer, default=0)  # low-stock alert threshold
    # ========================================
    # METADATA
    # ========================================
    date_creation = Column(DateTime, server_default=func.now())
    date_modification = Column(DateTime, onupdate=func.now())
    etat = Column(String(50), default="Neuf", index=True)  # Neuf, Bon, Usagé, Défectueux, Retiré
    localisation = Column(String(255))
    proprietaire = Column(String(100))
    tags = Column(Text)  # JSON array
    # (duplicate ``notes`` column removed — see DOCUMENTATION section)
    # ========================================
    # LINUX IDENTIFICATION
    # ========================================
    device_path = Column(String(255))
    sysfs_path = Column(String(500))
    vendor_id = Column(String(20))
    product_id = Column(String(20))
    usb_device_id = Column(String(20))  # idVendor:idProduct (e.g. 1d6b:0003)
    iManufacturer = Column(Text)  # USB manufacturer string from lsusb
    iProduct = Column(Text)  # USB product string from lsusb
    class_id = Column(String(20))
    driver_utilise = Column(String(100))
    modules_kernel = Column(Text)  # JSON
    udev_rules = Column(Text)
    identifiant_systeme = Column(Text)
    # ========================================
    # INSTALLATION
    # ========================================
    installation_auto = Column(Boolean, default=False)
    driver_requis = Column(Text)
    firmware_requis = Column(Text)
    paquets_necessaires = Column(Text)  # JSON
    commandes_installation = Column(Text)
    problemes_connus = Column(Text)
    solutions = Column(Text)
    compatibilite_noyau = Column(String(100))
    # ========================================
    # CONNECTIVITY
    # ========================================
    interface_connexion = Column(String(100))
    connecte_a = Column(String(255))
    consommation_electrique_w = Column(Float)
    # ========================================
    # PHYSICAL LOCATION
    # ========================================
    location_id = Column(Integer)  # logical FK to locations
    location_details = Column(String(500))
    location_auto = Column(Boolean, default=True)
    # ========================================
    # LOAN
    # ========================================
    en_pret = Column(Boolean, default=False, index=True)
    pret_actuel_id = Column(Integer)  # logical FK to peripheral_loans
    prete_a = Column(String(255))
    # ========================================
    # COMPLETE DEVICE
    # ========================================
    is_complete_device = Column(Boolean, default=False, index=True)
    device_type = Column(String(50))  # desktop, laptop, tablet, smartphone, server, console
    # ========================================
    # LINK TO MAIN DB (logical, not a SQL FK)
    # ========================================
    linked_device_id = Column(Integer, index=True)  # -> devices.id in data.db (benchmarks)
    device_id = Column(Integer, index=True)  # -> devices.id in data.db (current assignment)
    # ========================================
    # DOCUMENTATION
    # ========================================
    description = Column(Text)  # Short description of the peripheral
    synthese = Column(Text)  # Full synthesis from the imported markdown file
    cli = Column(Text)  # DEPRECATED: CLI output (lsusb -v) - use cli_yaml + cli_raw instead
    cli_yaml = Column(Text)  # Structured CLI data in YAML format
    cli_raw = Column(Text)  # Raw CLI output (lsusb -v, lshw, etc.) in Markdown format
    specifications = Column(Text)  # Technical specifications (Markdown) - raw content imported from .md
    notes = Column(Text)  # Free-form notes (Markdown format)
    # ========================================
    # TYPE-SPECIFIC DATA
    # ========================================
    caracteristiques_specifiques = Column(JSON)  # Flexible JSON per type
    def __repr__(self):
        return f"<Peripheral(id={self.id}, nom='{self.nom}', type='{self.type_principal}')>"
class PeripheralPhoto(BasePeripherals):
    """Photos of peripherals.

    ``peripheral_id`` is a logical reference to ``peripherals.id`` (no SQL
    FK declared). At most one photo per peripheral is expected to carry
    ``is_primary`` — presumably enforced by the service layer; TODO confirm.
    """
    __tablename__ = "peripheral_photos"
    id = Column(Integer, primary_key=True)
    peripheral_id = Column(Integer, nullable=False, index=True)
    filename = Column(String(255), nullable=False)  # original upload name
    stored_path = Column(String(500), nullable=False)  # path on disk
    thumbnail_path = Column(String(500))  # Path to thumbnail image
    mime_type = Column(String(100))
    size_bytes = Column(Integer)
    uploaded_at = Column(DateTime, server_default=func.now())
    description = Column(Text)
    is_primary = Column(Boolean, default=False)  # main photo shown in listings
    def __repr__(self):
        return f"<PeripheralPhoto(id={self.id}, peripheral_id={self.peripheral_id})>"
class PeripheralDocument(BasePeripherals):
    """Documents attached to peripherals (manuals, warranties, invoices, etc.).

    ``peripheral_id`` is a logical reference to ``peripherals.id`` (no SQL
    FK declared).
    """
    __tablename__ = "peripheral_documents"
    id = Column(Integer, primary_key=True)
    peripheral_id = Column(Integer, nullable=False, index=True)
    doc_type = Column(String(50), nullable=False, index=True)  # manual, warranty, invoice, datasheet, other
    filename = Column(String(255), nullable=False)  # original upload name
    stored_path = Column(String(500), nullable=False)  # path on disk
    mime_type = Column(String(100))
    size_bytes = Column(Integer)
    uploaded_at = Column(DateTime, server_default=func.now())
    description = Column(Text)
    def __repr__(self):
        return f"<PeripheralDocument(id={self.id}, type='{self.doc_type}')>"
class PeripheralLink(BasePeripherals):
    """Links related to peripherals (manufacturer, support, drivers, etc.).

    ``peripheral_id`` is a logical reference to ``peripherals.id`` (no SQL
    FK declared).
    """
    __tablename__ = "peripheral_links"
    id = Column(Integer, primary_key=True)
    peripheral_id = Column(Integer, nullable=False, index=True)
    link_type = Column(String(50), nullable=False)  # manufacturer, support, drivers, documentation, custom
    label = Column(String(255), nullable=False)  # human-readable link title
    url = Column(Text, nullable=False)
    def __repr__(self):
        return f"<PeripheralLink(id={self.id}, label='{self.label}')>"
class PeripheralLoan(BasePeripherals):
    """Loan/borrow tracking for peripherals.

    One row per loan of a peripheral. ``peripheral_id`` is a logical
    reference to ``peripherals.id`` (no SQL FK declared).
    """
    __tablename__ = "peripheral_loans"
    id = Column(Integer, primary_key=True)
    peripheral_id = Column(Integer, nullable=False, index=True)
    # Borrower
    emprunte_par = Column(String(255), nullable=False, index=True)  # borrower's name
    email_emprunteur = Column(String(255))  # borrower's email
    telephone = Column(String(50))  # borrower's phone number
    # Dates
    date_pret = Column(Date, nullable=False)  # loan start date
    date_retour_prevue = Column(Date, nullable=False, index=True)  # expected return date
    date_retour_effectif = Column(Date)  # actual return date (None while loaned out)
    # Status
    statut = Column(String(50), nullable=False, default="en_cours", index=True)  # en_cours, retourne, en_retard
    # Deposit
    caution_montant = Column(Float)  # deposit amount held
    caution_rendue = Column(Boolean, default=False)  # whether the deposit was returned
    # Condition
    etat_depart = Column(String(50))  # condition when lent out
    etat_retour = Column(String(50))  # condition when returned
    problemes_retour = Column(Text)  # issues noted at return time
    # Information
    raison_pret = Column(Text)  # reason for the loan
    notes = Column(Text)
    created_by = Column(String(100))
    # Reminders
    rappel_envoye = Column(Boolean, default=False)  # reminder already sent
    date_rappel = Column(DateTime)  # when the reminder was sent
    def __repr__(self):
        return f"<PeripheralLoan(id={self.id}, emprunte_par='{self.emprunte_par}', statut='{self.statut}')>"

View File

@@ -0,0 +1,34 @@
"""
Linux BenchTools - Peripheral History Models
"""
from sqlalchemy import Column, Integer, String, DateTime, Text
from sqlalchemy.sql import func
from app.db.base import BasePeripherals
class PeripheralLocationHistory(BasePeripherals):
    """
    History of peripheral movements (location changes, assignments).

    Append-only audit log: each row records one transition. ``from_*`` /
    ``to_*`` columns are logical references (no SQL FKs declared) and may
    be NULL when the transition has no source or destination.
    """
    __tablename__ = "peripheral_location_history"
    id = Column(Integer, primary_key=True, index=True)
    peripheral_id = Column(Integer, nullable=False, index=True)
    # Location changes (logical references to locations.id)
    from_location_id = Column(Integer)
    to_location_id = Column(Integer)
    # Device assignments (logical references to devices.id in the main DB)
    from_device_id = Column(Integer)
    to_device_id = Column(Integer)
    # Action details
    action = Column(String(50), nullable=False)  # moved, assigned, unassigned, stored
    timestamp = Column(DateTime, server_default=func.now())  # set by the DB at insert
    notes = Column(Text)
    user = Column(String(100))  # who performed the action
    def __repr__(self):
        return f"<PeripheralLocationHistory(id={self.id}, action='{self.action}')>"

0
backend/app/schemas/__init__.py Normal file → Executable file
View File

16
backend/app/schemas/benchmark.py Normal file → Executable file
View File

@@ -13,15 +13,15 @@ class CPUResults(BaseModel):
events_per_sec_single: Optional[float] = Field(None, ge=0) # Monocore
events_per_sec_multi: Optional[float] = Field(None, ge=0) # Multicore
duration_s: Optional[float] = Field(None, ge=0)
score: Optional[float] = Field(None, ge=0, le=10000)
score_single: Optional[float] = Field(None, ge=0, le=10000) # Monocore score
score_multi: Optional[float] = Field(None, ge=0, le=10000) # Multicore score
score: Optional[float] = Field(None, ge=0, le=100000)
score_single: Optional[float] = Field(None, ge=0, le=50000) # Monocore score
score_multi: Optional[float] = Field(None, ge=0, le=100000) # Multicore score
class MemoryResults(BaseModel):
"""Memory benchmark results"""
throughput_mib_s: Optional[float] = Field(None, ge=0)
score: Optional[float] = Field(None, ge=0, le=10000)
score: Optional[float] = Field(None, ge=0, le=100000)
class DiskResults(BaseModel):
@@ -31,7 +31,7 @@ class DiskResults(BaseModel):
iops_read: Optional[int] = Field(None, ge=0)
iops_write: Optional[int] = Field(None, ge=0)
latency_ms: Optional[float] = Field(None, ge=0)
score: Optional[float] = Field(None, ge=0, le=10000)
score: Optional[float] = Field(None, ge=0, le=50000)
class NetworkResults(BaseModel):
@@ -41,13 +41,13 @@ class NetworkResults(BaseModel):
ping_ms: Optional[float] = Field(None, ge=0)
jitter_ms: Optional[float] = Field(None, ge=0)
packet_loss_percent: Optional[float] = Field(None, ge=0, le=100)
score: Optional[float] = Field(None, ge=0, le=10000)
score: Optional[float] = Field(None, ge=0, le=100000)
class GPUResults(BaseModel):
"""GPU benchmark results"""
glmark2_score: Optional[int] = Field(None, ge=0)
score: Optional[float] = Field(None, ge=0, le=10000)
score: Optional[float] = Field(None, ge=0, le=50000)
class BenchmarkResults(BaseModel):
@@ -57,7 +57,7 @@ class BenchmarkResults(BaseModel):
disk: Optional[DiskResults] = None
network: Optional[NetworkResults] = None
gpu: Optional[GPUResults] = None
global_score: float = Field(..., ge=0, le=10000, description="Global score (0-10000)")
global_score: float = Field(..., ge=0, le=100000, description="Global score (weighted average of component scores)")
class BenchmarkPayload(BaseModel):

0
backend/app/schemas/device.py Normal file → Executable file
View File

0
backend/app/schemas/document.py Normal file → Executable file
View File

0
backend/app/schemas/hardware.py Normal file → Executable file
View File

0
backend/app/schemas/link.py Normal file → Executable file
View File

392
backend/app/schemas/peripheral.py Executable file
View File

@@ -0,0 +1,392 @@
"""
Linux BenchTools - Peripheral Schemas
"""
from pydantic import BaseModel, Field
from typing import Optional, List, Dict, Any
from datetime import date, datetime
# ========================================
# BASE SCHEMAS
# ========================================
class PeripheralBase(BaseModel):
    """Shared field set for peripheral create/read schemas.

    Field names are in French to mirror the underlying database columns.
    """
    # Identity
    nom: str = Field(..., min_length=1, max_length=255)
    type_principal: str = Field(..., min_length=1, max_length=100)
    sous_type: Optional[str] = Field(None, max_length=100)
    marque: Optional[str] = Field(None, max_length=100)
    modele: Optional[str] = Field(None, max_length=255)
    fabricant: Optional[str] = Field(None, max_length=255)
    produit: Optional[str] = Field(None, max_length=255)
    numero_serie: Optional[str] = Field(None, max_length=255)
    ean_upc: Optional[str] = Field(None, max_length=50)
    # Purchase information
    boutique: Optional[str] = Field(None, max_length=255)
    date_achat: Optional[date] = None
    prix: Optional[float] = Field(None, ge=0)
    devise: Optional[str] = Field("EUR", max_length=10)
    garantie_duree_mois: Optional[int] = Field(None, ge=0)
    garantie_expiration: Optional[date] = None
    # Rating (0-5 stars)
    rating: Optional[float] = Field(0.0, ge=0, le=5)
    # Stock management
    quantite_totale: Optional[int] = Field(1, ge=0)
    quantite_disponible: Optional[int] = Field(1, ge=0)
    seuil_alerte: Optional[int] = Field(0, ge=0)
    # Metadata
    etat: Optional[str] = Field("Neuf", max_length=50)
    localisation: Optional[str] = Field(None, max_length=255)
    proprietaire: Optional[str] = Field(None, max_length=100)
    tags: Optional[str] = None  # JSON-encoded list stored as a string
    # Documentation
    description: Optional[str] = None  # short description
    synthese: Optional[str] = None  # full markdown synthesis
    cli: Optional[str] = None  # DEPRECATED: filtered CLI output (lsusb -v)
    cli_yaml: Optional[str] = None  # structured CLI data in YAML format
    cli_raw: Optional[str] = None  # raw CLI output (Markdown)
    specifications: Optional[str] = None  # technical specifications (Markdown)
    notes: Optional[str] = None  # free-form notes (Markdown)
    # Linux device identification
    device_path: Optional[str] = Field(None, max_length=255)
    sysfs_path: Optional[str] = Field(None, max_length=500)
    vendor_id: Optional[str] = Field(None, max_length=20)
    product_id: Optional[str] = Field(None, max_length=20)
    usb_device_id: Optional[str] = Field(None, max_length=20)
    # NOTE: camelCase preserved to mirror the USB descriptor field names.
    iManufacturer: Optional[str] = None  # USB manufacturer string
    iProduct: Optional[str] = None  # USB product string
    class_id: Optional[str] = Field(None, max_length=20)
    driver_utilise: Optional[str] = Field(None, max_length=100)
    modules_kernel: Optional[str] = None  # JSON-encoded list stored as a string
    udev_rules: Optional[str] = None
    identifiant_systeme: Optional[str] = None
    # Installation
    installation_auto: Optional[bool] = False
    driver_requis: Optional[str] = None
    firmware_requis: Optional[str] = None
    paquets_necessaires: Optional[str] = None  # JSON-encoded list stored as a string
    commandes_installation: Optional[str] = None
    problemes_connus: Optional[str] = None
    solutions: Optional[str] = None
    compatibilite_noyau: Optional[str] = Field(None, max_length=100)
    # Connectivity
    interface_connexion: Optional[str] = Field(None, max_length=100)
    connecte_a: Optional[str] = Field(None, max_length=255)
    consommation_electrique_w: Optional[float] = Field(None, ge=0)
    # Physical location (logical FK into the locations table)
    location_id: Optional[int] = None
    location_details: Optional[str] = Field(None, max_length=500)
    location_auto: Optional[bool] = True
    # Complete-device linkage
    is_complete_device: Optional[bool] = False
    device_type: Optional[str] = Field(None, max_length=50)
    linked_device_id: Optional[int] = None
    device_id: Optional[int] = None
    # Type-specific characteristics (free-form dict)
    caracteristiques_specifiques: Optional[Dict[str, Any]] = None
class PeripheralCreate(PeripheralBase):
    """Payload accepted when creating a peripheral; inherits every field from PeripheralBase."""
class PeripheralUpdate(BaseModel):
    """Partial-update payload for a peripheral: every field is optional.

    Only the fields provided by the caller are applied (the service uses
    ``model_dump(exclude_unset=True)``).
    """
    nom: Optional[str] = Field(None, min_length=1, max_length=255)
    type_principal: Optional[str] = Field(None, min_length=1, max_length=100)
    sous_type: Optional[str] = Field(None, max_length=100)
    marque: Optional[str] = Field(None, max_length=100)
    modele: Optional[str] = Field(None, max_length=255)
    fabricant: Optional[str] = Field(None, max_length=255)
    produit: Optional[str] = Field(None, max_length=255)
    numero_serie: Optional[str] = Field(None, max_length=255)
    ean_upc: Optional[str] = Field(None, max_length=50)
    boutique: Optional[str] = Field(None, max_length=255)
    date_achat: Optional[date] = None
    prix: Optional[float] = Field(None, ge=0)
    devise: Optional[str] = Field(None, max_length=10)
    garantie_duree_mois: Optional[int] = Field(None, ge=0)
    garantie_expiration: Optional[date] = None
    rating: Optional[float] = Field(None, ge=0, le=5)
    quantite_totale: Optional[int] = Field(None, ge=0)
    quantite_disponible: Optional[int] = Field(None, ge=0)
    seuil_alerte: Optional[int] = Field(None, ge=0)
    etat: Optional[str] = Field(None, max_length=50)
    localisation: Optional[str] = Field(None, max_length=255)
    proprietaire: Optional[str] = Field(None, max_length=100)
    tags: Optional[str] = None
    notes: Optional[str] = None
    device_path: Optional[str] = Field(None, max_length=255)
    vendor_id: Optional[str] = Field(None, max_length=20)
    product_id: Optional[str] = Field(None, max_length=20)
    usb_device_id: Optional[str] = Field(None, max_length=20)
    # camelCase mirrors the USB descriptor field names (see PeripheralBase).
    iManufacturer: Optional[str] = None
    iProduct: Optional[str] = None
    connecte_a: Optional[str] = Field(None, max_length=255)
    location_id: Optional[int] = None
    location_details: Optional[str] = Field(None, max_length=500)
    is_complete_device: Optional[bool] = None
    device_type: Optional[str] = Field(None, max_length=50)
    linked_device_id: Optional[int] = None
    device_id: Optional[int] = None
    caracteristiques_specifiques: Optional[Dict[str, Any]] = None
class PeripheralSummary(BaseModel):
    """Compact representation used in paginated peripheral lists."""
    id: int
    nom: str
    type_principal: str
    sous_type: Optional[str]
    marque: Optional[str]
    modele: Optional[str]
    etat: str
    rating: float
    prix: Optional[float]
    en_pret: bool  # True while the peripheral is on loan
    is_complete_device: bool
    quantite_disponible: int
    # URL of the primary photo's thumbnail, if one exists.
    thumbnail_url: Optional[str] = None
    class Config:
        # Allow construction from ORM objects (pydantic v2 "from_attributes").
        from_attributes = True
class PeripheralDetail(PeripheralBase):
    """Full peripheral representation: all base fields plus server-managed state."""
    id: int
    date_creation: datetime
    date_modification: Optional[datetime]
    en_pret: bool  # True while on loan
    pret_actuel_id: Optional[int]  # id of the active loan, if any
    prete_a: Optional[str]  # name of the current borrower, if any
    class Config:
        from_attributes = True
class PeripheralListResponse(BaseModel):
    """Envelope for a paginated list of peripheral summaries."""
    items: List[PeripheralSummary]
    total: int  # total matching rows across all pages
    page: int  # 1-based page index
    page_size: int
    total_pages: int
# ========================================
# PHOTO SCHEMAS
# ========================================
class PeripheralPhotoBase(BaseModel):
    """Base schema for peripheral photos."""
    description: Optional[str] = None
    # The primary photo is the one used as the list thumbnail.
    is_primary: Optional[bool] = False
class PeripheralPhotoCreate(PeripheralPhotoBase):
    """Schema for registering an uploaded photo."""
    peripheral_id: int
    filename: str  # original upload filename
    stored_path: str  # path where the file was saved on disk
    mime_type: Optional[str]
    size_bytes: Optional[int]
class PeripheralPhotoSchema(PeripheralPhotoBase):
    """Full photo representation returned by the API."""
    id: int
    peripheral_id: int
    filename: str
    stored_path: str
    thumbnail_path: Optional[str]  # generated thumbnail, may be absent
    mime_type: Optional[str]
    size_bytes: Optional[int]
    uploaded_at: datetime
    class Config:
        from_attributes = True
# ========================================
# DOCUMENT SCHEMAS
# ========================================
class PeripheralDocumentBase(BaseModel):
    """Base schema for peripheral documents."""
    # One of: manual, warranty, invoice, datasheet, other.
    doc_type: str = Field(..., max_length=50)
    description: Optional[str] = None
class PeripheralDocumentCreate(PeripheralDocumentBase):
    """Schema for registering an uploaded document."""
    peripheral_id: int
    filename: str  # original upload filename
    stored_path: str  # path where the file was saved on disk
    mime_type: Optional[str]
    size_bytes: Optional[int]
class PeripheralDocumentSchema(PeripheralDocumentBase):
    """Full document representation returned by the API."""
    id: int
    peripheral_id: int
    filename: str
    stored_path: str
    mime_type: Optional[str]
    size_bytes: Optional[int]
    uploaded_at: datetime
    class Config:
        from_attributes = True
# ========================================
# LINK SCHEMAS
# ========================================
class PeripheralLinkBase(BaseModel):
    """Base schema for external links attached to a peripheral."""
    # One of: manufacturer, support, drivers, documentation, custom.
    link_type: str = Field(..., max_length=50)
    label: str = Field(..., min_length=1, max_length=255)
    url: str
class PeripheralLinkCreate(PeripheralLinkBase):
    """Schema for attaching a link to a peripheral."""
    peripheral_id: int
class PeripheralLinkSchema(PeripheralLinkBase):
    """Full link representation returned by the API."""
    id: int
    peripheral_id: int
    class Config:
        from_attributes = True
# ========================================
# LOAN SCHEMAS
# ========================================
class LoanBase(BaseModel):
    """Base schema for loans (peripheral lending)."""
    emprunte_par: str = Field(..., min_length=1, max_length=255)  # borrower name
    email_emprunteur: Optional[str] = Field(None, max_length=255)
    telephone: Optional[str] = Field(None, max_length=50)
    date_pret: date  # loan start date
    date_retour_prevue: date  # expected return date
    caution_montant: Optional[float] = Field(None, ge=0)  # deposit amount
    etat_depart: Optional[str] = Field(None, max_length=50)  # condition at handover
    raison_pret: Optional[str] = None
    notes: Optional[str] = None
class LoanCreate(LoanBase):
    """Schema for opening a loan on a peripheral."""
    peripheral_id: int
class LoanReturn(BaseModel):
    """Schema for closing (returning) a loan."""
    date_retour_effectif: date  # actual return date
    etat_retour: Optional[str] = Field(None, max_length=50)  # condition at return
    problemes_retour: Optional[str] = None  # issues observed at return
    caution_rendue: bool = True  # whether the deposit was given back
    notes: Optional[str] = None  # appended to the loan's existing notes
class LoanSchema(LoanBase):
    """Full loan representation returned by the API."""
    id: int
    peripheral_id: int
    date_retour_effectif: Optional[date]
    statut: str  # "en_cours" while active, "retourne" once returned
    caution_rendue: bool
    etat_retour: Optional[str]
    problemes_retour: Optional[str]
    created_by: Optional[str]
    rappel_envoye: bool  # whether a return reminder has been sent
    date_rappel: Optional[datetime]
    class Config:
        from_attributes = True
# ========================================
# LOCATION SCHEMAS
# ========================================
class LocationBase(BaseModel):
    """Base schema for physical storage locations."""
    nom: str = Field(..., min_length=1, max_length=255)
    # One of: root, piece, placard, tiroir, etagere, meuble, boite.
    type: str = Field(..., max_length=50)
    parent_id: Optional[int] = None  # None for a root location
    description: Optional[str] = None
    ordre_affichage: Optional[int] = 0  # sort key within siblings
class LocationCreate(LocationBase):
    """Payload accepted when creating a location; inherits every field from LocationBase."""
class LocationUpdate(BaseModel):
    """Partial-update payload for a location: every field is optional."""
    nom: Optional[str] = Field(None, min_length=1, max_length=255)
    type: Optional[str] = Field(None, max_length=50)
    parent_id: Optional[int] = None
    description: Optional[str] = None
    ordre_affichage: Optional[int] = None
class LocationSchema(LocationBase):
    """Full location representation returned by the API."""
    id: int
    image_path: Optional[str]  # optional photo of the location
    qr_code_path: Optional[str]  # generated QR-code image, if any
    class Config:
        from_attributes = True
class LocationTreeNode(LocationSchema):
    """Location plus its nested children, for hierarchical tree views."""
    # Self-referencing forward ref; pydantic copies the mutable default per instance.
    children: List['LocationTreeNode'] = []
    class Config:
        from_attributes = True
# ========================================
# HISTORY SCHEMAS
# ========================================
class PeripheralHistorySchema(BaseModel):
    """One entry in a peripheral's location/assignment history."""
    id: int
    peripheral_id: int
    from_location_id: Optional[int]
    to_location_id: Optional[int]
    from_device_id: Optional[int]
    to_device_id: Optional[int]
    action: str  # e.g. "created", "moved", "assigned", "unassigned"
    timestamp: datetime
    notes: Optional[str]
    user: Optional[str]  # who performed the action, if known
    class Config:
        from_attributes = True

View File

@@ -0,0 +1,510 @@
"""
Linux BenchTools - Peripheral Service
Handles business logic and cross-database operations
"""
from typing import Optional, List, Dict, Any, Tuple
from sqlalchemy.orm import Session
from sqlalchemy import and_, or_, func, desc
from datetime import date, datetime, timedelta
from app.models.peripheral import (
Peripheral, PeripheralPhoto, PeripheralDocument,
PeripheralLink, PeripheralLoan
)
from app.models.location import Location
from app.models.peripheral_history import PeripheralLocationHistory
from app.schemas.peripheral import (
PeripheralCreate, PeripheralUpdate, PeripheralSummary,
PeripheralDetail, PeripheralListResponse,
LoanCreate, LoanReturn
)
class PeripheralService:
    """Business-logic layer for peripherals: CRUD, device assignment,
    loans, statistics, and location/assignment history.

    Methods commit their own transactions on the provided session.
    """

    @staticmethod
    def create_peripheral(
        db: Session,
        peripheral_data: PeripheralCreate,
        user: Optional[str] = None
    ) -> Peripheral:
        """Create a new peripheral and, if it is placed somewhere, a history entry."""
        peripheral = Peripheral(**peripheral_data.model_dump())
        db.add(peripheral)
        db.commit()
        db.refresh(peripheral)
        # Record the initial placement only when the peripheral starts out
        # attached to a location or device.
        if peripheral.location_id or peripheral.device_id:
            PeripheralService._create_history(
                db=db,
                peripheral_id=peripheral.id,
                action="created",
                to_location_id=peripheral.location_id,
                to_device_id=peripheral.device_id,
                user=user
            )
        return peripheral

    @staticmethod
    def get_peripheral(db: Session, peripheral_id: int) -> Optional[Peripheral]:
        """Return the peripheral with the given id, or None."""
        return db.query(Peripheral).filter(Peripheral.id == peripheral_id).first()

    @staticmethod
    def update_peripheral(
        db: Session,
        peripheral_id: int,
        peripheral_data: PeripheralUpdate,
        user: Optional[str] = None
    ) -> Optional[Peripheral]:
        """Apply a partial update; logs a history entry when location or device changes.

        Returns the updated peripheral, or None if it does not exist.
        """
        peripheral = PeripheralService.get_peripheral(db, peripheral_id)
        if not peripheral:
            return None
        # Snapshot placement before the update so we can detect moves.
        old_location_id = peripheral.location_id
        old_device_id = peripheral.device_id
        # Apply only the fields the caller actually provided.
        update_data = peripheral_data.model_dump(exclude_unset=True)
        for key, value in update_data.items():
            setattr(peripheral, key, value)
        db.commit()
        db.refresh(peripheral)
        # Record a history entry if the peripheral was moved or (re)assigned.
        new_location_id = peripheral.location_id
        new_device_id = peripheral.device_id
        if old_location_id != new_location_id or old_device_id != new_device_id:
            # Location change wins the label; a pure device change is "assigned".
            action = "moved" if old_location_id != new_location_id else "assigned"
            PeripheralService._create_history(
                db=db,
                peripheral_id=peripheral.id,
                action=action,
                from_location_id=old_location_id,
                to_location_id=new_location_id,
                from_device_id=old_device_id,
                to_device_id=new_device_id,
                user=user
            )
        return peripheral

    @staticmethod
    def delete_peripheral(db: Session, peripheral_id: int) -> bool:
        """Delete a peripheral and all related rows; returns False if not found.

        Child rows are deleted explicitly (manual cascade).
        NOTE(review): files on disk (photos/documents) are not removed here —
        confirm cleanup happens elsewhere.
        """
        peripheral = PeripheralService.get_peripheral(db, peripheral_id)
        if not peripheral:
            return False
        # Manual cascade over all dependent tables.
        db.query(PeripheralPhoto).filter(PeripheralPhoto.peripheral_id == peripheral_id).delete()
        db.query(PeripheralDocument).filter(PeripheralDocument.peripheral_id == peripheral_id).delete()
        db.query(PeripheralLink).filter(PeripheralLink.peripheral_id == peripheral_id).delete()
        db.query(PeripheralLoan).filter(PeripheralLoan.peripheral_id == peripheral_id).delete()
        db.query(PeripheralLocationHistory).filter(PeripheralLocationHistory.peripheral_id == peripheral_id).delete()
        # Delete the peripheral itself.
        db.delete(peripheral)
        db.commit()
        return True

    @staticmethod
    def list_peripherals(
        db: Session,
        page: int = 1,
        page_size: int = 50,
        type_filter: Optional[str] = None,
        search: Optional[str] = None,
        location_id: Optional[int] = None,
        device_id: Optional[int] = None,
        en_pret: Optional[bool] = None,
        is_complete_device: Optional[bool] = None,
        sort_by: str = "date_creation",
        sort_order: str = "desc"
    ) -> PeripheralListResponse:
        """List peripherals with pagination, filtering, and sorting.

        ``search`` does a case-insensitive substring match over name, brand,
        model, and serial number. Unknown ``sort_by`` values fall back to
        ``date_creation``.
        """
        # Base query
        query = db.query(Peripheral)
        # Apply filters
        if type_filter:
            query = query.filter(Peripheral.type_principal == type_filter)
        if search:
            search_pattern = f"%{search}%"
            query = query.filter(
                or_(
                    Peripheral.nom.ilike(search_pattern),
                    Peripheral.marque.ilike(search_pattern),
                    Peripheral.modele.ilike(search_pattern),
                    Peripheral.numero_serie.ilike(search_pattern)
                )
            )
        if location_id is not None:
            query = query.filter(Peripheral.location_id == location_id)
        if device_id is not None:
            query = query.filter(Peripheral.device_id == device_id)
        if en_pret is not None:
            query = query.filter(Peripheral.en_pret == en_pret)
        if is_complete_device is not None:
            query = query.filter(Peripheral.is_complete_device == is_complete_device)
        # Count total before pagination so total_pages is accurate.
        total = query.count()
        # Apply sorting; getattr falls back to date_creation for unknown columns.
        sort_column = getattr(Peripheral, sort_by, Peripheral.date_creation)
        if sort_order == "desc":
            query = query.order_by(desc(sort_column))
        else:
            query = query.order_by(sort_column)
        # Apply pagination
        offset = (page - 1) * page_size
        peripherals = query.offset(offset).limit(page_size).all()
        # Import PeripheralPhoto here to avoid circular import
        # (NOTE: also imported at module level; this local import is redundant but harmless.)
        from app.models.peripheral import PeripheralPhoto
        # Convert to summary rows.
        items = []
        for p in peripherals:
            # Look up the primary photo for the thumbnail.
            # NOTE(review): one query per row (N+1); acceptable at page_size=50,
            # but a joined load would scale better.
            thumbnail_url = None
            primary_photo = db.query(PeripheralPhoto).filter(
                PeripheralPhoto.peripheral_id == p.id,
                PeripheralPhoto.is_primary == True
            ).first()
            if primary_photo and primary_photo.thumbnail_path:
                # Convert the container file path to a URL path.
                thumbnail_url = primary_photo.thumbnail_path.replace('/app/uploads/', '/uploads/')
            items.append(PeripheralSummary(
                id=p.id,
                nom=p.nom,
                type_principal=p.type_principal,
                sous_type=p.sous_type,
                marque=p.marque,
                modele=p.modele,
                etat=p.etat or "Inconnu",
                rating=p.rating or 0.0,
                prix=p.prix,
                en_pret=p.en_pret or False,
                is_complete_device=p.is_complete_device or False,
                quantite_disponible=p.quantite_disponible or 0,
                thumbnail_url=thumbnail_url
            ))
        # Ceiling division for the page count.
        total_pages = (total + page_size - 1) // page_size
        return PeripheralListResponse(
            items=items,
            total=total,
            page=page,
            page_size=page_size,
            total_pages=total_pages
        )

    @staticmethod
    def get_peripherals_by_device(
        db: Session,
        device_id: int
    ) -> List[Peripheral]:
        """Get all peripherals assigned to a device (cross-database logical FK)."""
        return db.query(Peripheral).filter(Peripheral.device_id == device_id).all()

    @staticmethod
    def get_peripherals_by_linked_device(
        db: Session,
        linked_device_id: int
    ) -> List[Peripheral]:
        """Get all peripherals that are part of a complete device."""
        return db.query(Peripheral).filter(Peripheral.linked_device_id == linked_device_id).all()

    @staticmethod
    def assign_to_device(
        db: Session,
        peripheral_id: int,
        device_id: int,
        user: Optional[str] = None
    ) -> Optional[Peripheral]:
        """Assign a peripheral to a device and record a history entry."""
        peripheral = PeripheralService.get_peripheral(db, peripheral_id)
        if not peripheral:
            return None
        old_device_id = peripheral.device_id
        peripheral.device_id = device_id
        db.commit()
        db.refresh(peripheral)
        # Record the assignment (previous device kept for traceability).
        PeripheralService._create_history(
            db=db,
            peripheral_id=peripheral.id,
            action="assigned",
            from_device_id=old_device_id,
            to_device_id=device_id,
            user=user
        )
        return peripheral

    @staticmethod
    def unassign_from_device(
        db: Session,
        peripheral_id: int,
        user: Optional[str] = None
    ) -> Optional[Peripheral]:
        """Detach a peripheral from its device and record a history entry."""
        peripheral = PeripheralService.get_peripheral(db, peripheral_id)
        if not peripheral:
            return None
        old_device_id = peripheral.device_id
        peripheral.device_id = None
        db.commit()
        db.refresh(peripheral)
        # Record the detachment.
        PeripheralService._create_history(
            db=db,
            peripheral_id=peripheral.id,
            action="unassigned",
            from_device_id=old_device_id,
            to_device_id=None,
            user=user
        )
        return peripheral

    @staticmethod
    def create_loan(
        db: Session,
        loan_data: LoanCreate,
        user: Optional[str] = None
    ) -> Optional[PeripheralLoan]:
        """Open a loan on a peripheral.

        Returns None if the peripheral does not exist or is already on loan.
        Uses two commits: the loan row must be flushed first to obtain its id,
        which is then written back onto the peripheral.
        """
        peripheral = PeripheralService.get_peripheral(db, loan_data.peripheral_id)
        if not peripheral or peripheral.en_pret:
            return None
        # Create the loan in the active state.
        loan = PeripheralLoan(
            **loan_data.model_dump(),
            statut="en_cours",
            created_by=user
        )
        db.add(loan)
        # Mark the peripheral as lent out.
        peripheral.en_pret = True
        peripheral.pret_actuel_id = None  # Will be set after commit
        peripheral.prete_a = loan_data.emprunte_par
        db.commit()
        db.refresh(loan)
        # Second commit: now that the loan has an id, link it to the peripheral.
        peripheral.pret_actuel_id = loan.id
        db.commit()
        db.refresh(peripheral)
        return loan

    @staticmethod
    def return_loan(
        db: Session,
        loan_id: int,
        return_data: LoanReturn
    ) -> Optional[PeripheralLoan]:
        """Close a loan; returns None if it does not exist or is not active."""
        loan = db.query(PeripheralLoan).filter(PeripheralLoan.id == loan_id).first()
        if not loan or loan.statut != "en_cours":
            return None
        # Record the return details on the loan.
        loan.date_retour_effectif = return_data.date_retour_effectif
        loan.etat_retour = return_data.etat_retour
        loan.problemes_retour = return_data.problemes_retour
        loan.caution_rendue = return_data.caution_rendue
        loan.statut = "retourne"
        if return_data.notes:
            # Append rather than overwrite any existing notes.
            loan.notes = (loan.notes or "") + "\n" + return_data.notes
        # Clear the loan state on the peripheral.
        peripheral = PeripheralService.get_peripheral(db, loan.peripheral_id)
        if peripheral:
            peripheral.en_pret = False
            peripheral.pret_actuel_id = None
            peripheral.prete_a = None
        db.commit()
        db.refresh(loan)
        return loan

    @staticmethod
    def get_overdue_loans(db: Session) -> List[PeripheralLoan]:
        """Return active loans whose expected return date is in the past."""
        today = date.today()
        return db.query(PeripheralLoan).filter(
            and_(
                PeripheralLoan.statut == "en_cours",
                PeripheralLoan.date_retour_prevue < today
            )
        ).all()

    @staticmethod
    def get_upcoming_returns(db: Session, days: int = 7) -> List[PeripheralLoan]:
        """Return active loans due between today and today + ``days`` (inclusive)."""
        today = date.today()
        future = today + timedelta(days=days)
        return db.query(PeripheralLoan).filter(
            and_(
                PeripheralLoan.statut == "en_cours",
                PeripheralLoan.date_retour_prevue.between(today, future)
            )
        ).all()

    @staticmethod
    def get_statistics(db: Session) -> Dict[str, Any]:
        """Aggregate counters for the dashboard (totals, loans, breakdowns, low stock)."""
        total = db.query(Peripheral).count()
        en_pret = db.query(Peripheral).filter(Peripheral.en_pret == True).count()
        complete_devices = db.query(Peripheral).filter(Peripheral.is_complete_device == True).count()
        # Breakdown by main type.
        by_type = db.query(
            Peripheral.type_principal,
            func.count(Peripheral.id).label('count')
        ).group_by(Peripheral.type_principal).all()
        # Breakdown by condition.
        by_etat = db.query(
            Peripheral.etat,
            func.count(Peripheral.id).label('count')
        ).group_by(Peripheral.etat).all()
        # Items whose available quantity reached the alert threshold.
        low_stock = db.query(Peripheral).filter(
            Peripheral.quantite_disponible <= Peripheral.seuil_alerte
        ).count()
        return {
            "total_peripherals": total,
            "en_pret": en_pret,
            "disponible": total - en_pret,
            "complete_devices": complete_devices,
            "low_stock_count": low_stock,
            "by_type": [{"type": t, "count": c} for t, c in by_type],
            "by_etat": [{"etat": e or "Inconnu", "count": c} for e, c in by_etat]
        }

    @staticmethod
    def _create_history(
        db: Session,
        peripheral_id: int,
        action: str,
        from_location_id: Optional[int] = None,
        to_location_id: Optional[int] = None,
        from_device_id: Optional[int] = None,
        to_device_id: Optional[int] = None,
        user: Optional[str] = None,
        notes: Optional[str] = None
    ) -> PeripheralLocationHistory:
        """Insert and commit one location/assignment history row."""
        history = PeripheralLocationHistory(
            peripheral_id=peripheral_id,
            action=action,
            from_location_id=from_location_id,
            to_location_id=to_location_id,
            from_device_id=from_device_id,
            to_device_id=to_device_id,
            user=user,
            notes=notes
        )
        db.add(history)
        db.commit()
        return history
class LocationService:
    """Service layer for physical-location queries: tree, path, and counts."""

    @staticmethod
    def get_location_tree(db: Session) -> List[Dict[str, Any]]:
        """Return the full location hierarchy as a list of nested dicts."""
        def subtree(parent_id: Optional[int] = None) -> List[Dict[str, Any]]:
            rows = (
                db.query(Location)
                .filter(Location.parent_id == parent_id)
                .order_by(Location.ordre_affichage, Location.nom)
                .all()
            )
            nodes: List[Dict[str, Any]] = []
            for row in rows:
                nodes.append({
                    "id": row.id,
                    "nom": row.nom,
                    "type": row.type,
                    "description": row.description,
                    "image_path": row.image_path,
                    "qr_code_path": row.qr_code_path,
                    "children": subtree(row.id),
                })
            return nodes

        return subtree(None)

    @staticmethod
    def get_location_path(db: Session, location_id: int) -> List[Location]:
        """Return the chain of locations from the root down to *location_id*."""
        chain: List[Location] = []
        cursor = location_id
        while cursor:
            node = db.query(Location).filter(Location.id == cursor).first()
            if node is None:
                break
            chain.append(node)
            cursor = node.parent_id
        # Collected child-first; reverse so the root comes first.
        chain.reverse()
        return chain

    @staticmethod
    def count_peripherals_in_location(
        db: Session,
        location_id: int,
        recursive: bool = False
    ) -> int:
        """Count peripherals at a location, optionally including all descendants."""
        if not recursive:
            return db.query(Peripheral).filter(Peripheral.location_id == location_id).count()
        # Gather the location itself plus every descendant id (iterative DFS).
        ids: List[int] = []
        stack = [location_id]
        while stack:
            current = stack.pop()
            ids.append(current)
            child_rows = db.query(Location.id).filter(Location.parent_id == current).all()
            stack.extend(row[0] for row in child_rows)
        return db.query(Peripheral).filter(Peripheral.location_id.in_(ids)).count()

0
backend/app/utils/__init__.py Normal file → Executable file
View File

View File

@@ -0,0 +1,395 @@
"""
Device classifier - Intelligent detection of peripheral type and subtype
Analyzes CLI output and markdown content to automatically determine device category
"""
import re
from typing import Dict, Optional, Tuple
class DeviceClassifier:
"""
Intelligent classifier for USB/Bluetooth/Network devices
Analyzes content to determine type_principal and sous_type
"""
# Keywords mapping for type detection
TYPE_KEYWORDS = {
# WiFi adapters
("USB", "Adaptateur WiFi"): [
r"wi[-]?fi",
r"wireless",
r"802\.11[a-z]",
r"rtl81\d+", # Realtek WiFi chips
r"mt76\d+", # MediaTek WiFi chips
r"atheros",
r"qualcomm.*wireless",
r"broadcom.*wireless",
r"wlan",
r"wireless\s+adapter",
],
# Bluetooth
("Bluetooth", "Autre"): [
r"bluetooth",
r"bcm20702", # Broadcom BT chips
r"bt\s+adapter",
],
# USB Flash Drive / Clé USB
("Stockage", "Clé USB"): [
r"flash\s+drive",
r"usb\s+stick",
r"cruzer", # SanDisk Cruzer series
r"datatraveler", # Kingston DataTraveler
r"usb.*flash",
r"clé\s+usb",
r"pendrive",
],
# External HDD/SSD
("Stockage", "Disque dur externe"): [
r"external\s+hdd",
r"external\s+ssd",
r"portable\s+ssd",
r"portable\s+drive",
r"disk\s+drive",
r"disque\s+dur\s+externe",
r"my\s+passport", # WD My Passport
r"expansion", # Seagate Expansion
r"backup\s+plus", # Seagate Backup Plus
r"elements", # WD Elements
r"touro", # Hitachi Touro
r"adata.*hd\d+", # ADATA external drives
],
# Card Reader
("Stockage", "Lecteur de carte"): [
r"card\s+reader",
r"lecteur.*carte",
r"sd.*reader",
r"microsd.*reader",
r"multi.*card",
r"cf.*reader",
],
# USB Hub
("USB", "Hub"): [
r"usb\s+hub",
r"hub\s+controller",
r"multi[-]?port",
],
# USB Keyboard
("USB", "Clavier"): [
r"keyboard",
r"clavier",
r"hid.*keyboard",
],
# USB Mouse
("USB", "Souris"): [
r"mouse",
r"souris",
r"hid.*mouse",
r"optical\s+mouse",
],
# Logitech Unifying (can be keyboard or mouse)
("USB", "Autre"): [
r"unifying\s+receiver",
r"logitech.*receiver",
],
# ZigBee dongle
("USB", "ZigBee"): [
r"zigbee",
r"conbee",
r"cc2531", # Texas Instruments ZigBee chip
r"cc2652", # TI newer ZigBee chip
r"dresden\s+elektronik",
r"zigbee.*gateway",
r"zigbee.*coordinator",
r"thread.*border",
],
# Fingerprint reader
("USB", "Lecteur biométrique"): [
r"fingerprint",
r"fingprint", # Common typo (CS9711Fingprint)
r"empreinte",
r"biometric",
r"biométrique",
r"validity.*sensor",
r"synaptics.*fingerprint",
r"goodix.*fingerprint",
r"elan.*fingerprint",
],
# USB Webcam
("Video", "Webcam"): [
r"webcam",
r"camera",
r"video\s+capture",
r"uvc", # USB Video Class
],
# Ethernet
("Réseau", "Ethernet"): [
r"ethernet",
r"gigabit",
r"network\s+adapter",
r"lan\s+adapter",
r"rtl81\d+.*ethernet",
],
# Network WiFi (non-USB)
("Réseau", "Wi-Fi"): [
r"wireless.*network",
r"wi[-]?fi.*card",
r"wlan.*card",
],
}
# INTERFACE class codes (from USB spec)
# CRITICAL: Mass Storage is determined by bInterfaceClass, not bDeviceClass
USB_INTERFACE_CLASS_MAPPING = {
8: ("Stockage", "Clé USB"), # Mass Storage (refined by keywords to distinguish flash/HDD/card reader)
3: ("USB", "Clavier"), # HID (could be keyboard/mouse, refined by keywords)
14: ("Video", "Webcam"), # Video (0x0e)
9: ("USB", "Hub"), # Hub
224: ("Bluetooth", "Autre"), # Wireless Controller (0xe0)
255: ("USB", "Autre"), # Vendor Specific - requires firmware
}
# Device class codes (less reliable than interface class for Mass Storage)
USB_DEVICE_CLASS_MAPPING = {
"08": ("Stockage", "Clé USB"), # Mass Storage (fallback only)
"03": ("USB", "Clavier"), # HID (could be keyboard/mouse, refined by keywords)
"0e": ("Video", "Webcam"), # Video
"09": ("USB", "Hub"), # Hub
"e0": ("Bluetooth", "Autre"), # Wireless Controller
}
@staticmethod
def normalize_text(text: str) -> str:
"""Normalize text for matching (lowercase, remove accents)"""
if not text:
return ""
return text.lower().strip()
@staticmethod
def detect_from_keywords(content: str) -> Optional[Tuple[str, str]]:
"""
Detect device type from keywords in content
Args:
content: Text content to analyze (CLI output or markdown)
Returns:
Tuple of (type_principal, sous_type) or None
"""
normalized = DeviceClassifier.normalize_text(content)
# Score each type based on keyword matches
scores = {}
for (type_principal, sous_type), patterns in DeviceClassifier.TYPE_KEYWORDS.items():
score = 0
for pattern in patterns:
matches = re.findall(pattern, normalized, re.IGNORECASE)
score += len(matches)
if score > 0:
scores[(type_principal, sous_type)] = score
if not scores:
return None
# Return the type with highest score
best_match = max(scores.items(), key=lambda x: x[1])
return best_match[0]
@staticmethod
def detect_from_usb_interface_class(interface_classes: Optional[list]) -> Optional[Tuple[str, str]]:
"""
Detect device type from USB interface class codes
CRITICAL: This is the normative way to detect Mass Storage (class 08)
Args:
interface_classes: List of interface class info dicts with 'code' and 'name'
e.g., [{"code": 8, "name": "Mass Storage"}]
Returns:
Tuple of (type_principal, sous_type) or None
"""
if not interface_classes:
return None
# Check all interfaces for known types
# Priority: Mass Storage (8) > others
for interface in interface_classes:
class_code = interface.get("code")
if class_code in DeviceClassifier.USB_INTERFACE_CLASS_MAPPING:
return DeviceClassifier.USB_INTERFACE_CLASS_MAPPING[class_code]
return None
@staticmethod
def detect_from_usb_device_class(device_class: Optional[str]) -> Optional[Tuple[str, str]]:
"""
Detect device type from USB device class code (FALLBACK ONLY)
NOTE: For Mass Storage, bInterfaceClass is normative, not bDeviceClass
Args:
device_class: USB bDeviceClass (e.g., "08", "03")
Returns:
Tuple of (type_principal, sous_type) or None
"""
if not device_class:
return None
# Normalize class code
device_class = device_class.strip().lower().lstrip("0x")
return DeviceClassifier.USB_DEVICE_CLASS_MAPPING.get(device_class)
@staticmethod
def detect_from_vendor_product(vendor_id: Optional[str], product_id: Optional[str],
manufacturer: Optional[str], product: Optional[str]) -> Optional[Tuple[str, str]]:
"""
Detect device type from vendor/product IDs and strings
Args:
vendor_id: USB vendor ID (e.g., "0x0781")
product_id: USB product ID
manufacturer: Manufacturer string
product: Product string
Returns:
Tuple of (type_principal, sous_type) or None
"""
# Build a searchable string from all identifiers
search_text = " ".join(filter(None, [
manufacturer or "",
product or "",
vendor_id or "",
product_id or "",
]))
return DeviceClassifier.detect_from_keywords(search_text)
@staticmethod
def classify_device(cli_content: Optional[str] = None,
                    synthese_content: Optional[str] = None,
                    device_info: Optional[Dict] = None) -> Tuple[str, str]:
    """
    Classify a device using all available information.

    Strategies are tried in decreasing order of reliability:
      1. USB interface class codes (normative, e.g. Mass Storage = 8)
      2. USB device class code (fallback, less reliable)
      3. Vendor/product identifiers and descriptive strings
      4. Keyword analysis of the raw CLI content
      5. Keyword analysis of the markdown synthesis

    Args:
        cli_content: Raw CLI output (lsusb -v, lshw, etc.)
        synthese_content: Markdown synthesis content
        device_info: Parsed device info dict (vendor_id, product_id, interface_classes, etc.)

    Returns:
        Tuple of (type_principal, sous_type) - defaults to ("USB", "Autre") if unknown
    """
    device_info = device_info or {}

    def _refine_hid(result: Tuple[str, str]) -> Tuple[str, str]:
        # HID devices (class 03) map to "Clavier" by default; use the raw
        # content to detect mice and reclassify. Extracted here because the
        # identical refinement was duplicated in strategies 1 and 2.
        if result == ("USB", "Clavier"):
            content = " ".join(filter(None, [cli_content, synthese_content]))
            if re.search(r"mouse|souris", content, re.IGNORECASE):
                return ("USB", "Souris")
        return result

    # Strategy 1: CRITICAL - USB INTERFACE class (normative for Mass Storage)
    if device_info.get("interface_classes"):
        result = DeviceClassifier.detect_from_usb_interface_class(device_info["interface_classes"])
        if result:
            return _refine_hid(result)

    # Strategy 2: fallback to device class (less reliable)
    if device_info.get("device_class"):
        result = DeviceClassifier.detect_from_usb_device_class(device_info["device_class"])
        if result:
            return _refine_hid(result)

    # Strategy 3: analyze vendor/product info
    result = DeviceClassifier.detect_from_vendor_product(
        device_info.get("vendor_id"),
        device_info.get("product_id"),
        device_info.get("manufacturer"),
        device_info.get("product"),
    )
    if result:
        return result

    # Strategy 4: analyze full CLI content
    if cli_content:
        result = DeviceClassifier.detect_from_keywords(cli_content)
        if result:
            return result

    # Strategy 5: analyze markdown synthesis
    if synthese_content:
        result = DeviceClassifier.detect_from_keywords(synthese_content)
        if result:
            return result

    # Nothing matched: generic USB device
    return ("USB", "Autre")
@staticmethod
def refine_bluetooth_subtype(content: str) -> str:
    """
    Refine the Bluetooth subtype based on free-form content.

    Args:
        content: Combined content to analyze

    Returns:
        Refined sous_type (Clavier, Souris, Audio, or Autre)
    """
    normalized = DeviceClassifier.normalize_text(content)
    # Ordered rules: the first matching category wins.
    # NOTE(review): accented patterns ("écouteur") assume normalize_text
    # preserves accents - confirm against its implementation.
    rules = (
        (r"keyboard|clavier", "Clavier"),
        (r"mouse|souris", "Souris"),
        (r"headset|audio|speaker|écouteur|casque", "Audio"),
    )
    for pattern, subtype in rules:
        if re.search(pattern, normalized):
            return subtype
    return "Autre"
@staticmethod
def refine_storage_subtype(content: str) -> str:
    """
    Refine the Storage subtype based on free-form content.

    Distinguishes between USB flash drives, external HDD/SSD, and card readers.

    Args:
        content: Combined content to analyze

    Returns:
        Refined sous_type (Clé USB, Disque dur externe, Lecteur de carte)
    """
    normalized = DeviceClassifier.normalize_text(content)
    # Ordered most-specific first: card readers, then external disks,
    # then explicit flash-drive wording.
    rules = (
        ("Lecteur de carte",
         r"card\s+reader|lecteur.*carte|sd.*reader|multi.*card"),
        ("Disque dur externe",
         r"external\s+(hdd|ssd|disk)|portable\s+(ssd|drive)|disque\s+dur|"
         r"my\s+passport|expansion|backup\s+plus|elements|touro"),
        ("Clé USB",
         r"flash\s+drive|usb\s+stick|cruzer|datatraveler|pendrive|clé\s+usb"),
    )
    for subtype, pattern in rules:
        if re.search(pattern, normalized):
            return subtype
    # Mass storage with no other hint: assume a plain USB flash drive.
    return "Clé USB"

View File

@@ -0,0 +1,131 @@
"""
Image compression configuration loader
Loads compression levels from YAML configuration file
"""
import yaml
from pathlib import Path
from typing import Dict, Any, Optional
class ImageCompressionConfig:
    """Manages image compression configuration loaded from a YAML file.

    When the file is missing or unreadable the loader falls back to a
    built-in default configuration, so callers always get usable settings.
    """

    def __init__(self, config_path: Optional[str] = None):
        """
        Initialize configuration loader.

        Args:
            config_path: Path to YAML config file (optional). When omitted,
                resolves to config/image_compression.yaml at the project root.
        """
        if config_path is None:
            # Default path: config/image_compression.yaml (from project root)
            # Path from backend/app/utils/ -> up 3 levels to project root
            config_path = Path(__file__).parent.parent.parent.parent / "config" / "image_compression.yaml"
        self.config_path = Path(config_path)
        self.config = self._load_config()

    def _load_config(self) -> Dict[str, Any]:
        """Load configuration from the YAML file, degrading to defaults on any error."""
        if not self.config_path.exists():
            print(f"Warning: Image compression config not found at {self.config_path}")
            print("Using default configuration")
            return self._get_default_config()
        try:
            with open(self.config_path, 'r', encoding='utf-8') as f:
                config = yaml.safe_load(f)
            return config
        except Exception as e:
            # Parse or IO failure: keep the app running with defaults.
            print(f"Error loading image compression config: {e}")
            print("Using default configuration")
            return self._get_default_config()

    def _get_default_config(self) -> Dict[str, Any]:
        """Built-in configuration used when the YAML file is unavailable."""
        return {
            "default_level": "medium",
            "levels": {
                "medium": {
                    "enabled": True,
                    "quality": 85,
                    "max_width": 1920,
                    "max_height": 1080,
                    "thumbnail_size": 48,
                    "thumbnail_quality": 75,
                    "thumbnail_format": "webp",
                    "description": "Qualité moyenne - Usage général"
                }
            },
            "supported_formats": ["jpg", "jpeg", "png", "webp", "gif", "bmp"],
            "max_upload_size": 52428800,
            "auto_convert_to_webp": True,
            "keep_original": False,
            "compressed_prefix": "compressed_",
            "thumbnail_prefix": "thumb_"
        }

    def get_level(self, level_name: Optional[str] = None) -> Dict[str, Any]:
        """
        Get compression settings for a specific level.

        Args:
            level_name: Name of compression level (high, medium, low, minimal).
                If None, uses the configured default level.

        Returns:
            Dictionary with compression settings ({} when nothing matches)
        """
        if level_name is None:
            level_name = self.config.get("default_level", "medium")
        levels = self.config.get("levels", {})
        if level_name not in levels:
            print(f"Warning: Level '{level_name}' not found, using default")
            level_name = self.config.get("default_level", "medium")
        return levels.get(level_name, levels.get("medium", {}))

    def get_all_levels(self) -> Dict[str, Dict[str, Any]]:
        """Get all available compression levels."""
        return self.config.get("levels", {})

    def get_default_level_name(self) -> str:
        """Get name of the default compression level."""
        return self.config.get("default_level", "medium")

    def is_format_supported(self, format: str) -> bool:
        """Check if an image format is supported for input.

        BUGFIX: reads "supported_input_formats" first and falls back to the
        "supported_formats" key emitted by _get_default_config(); previously
        the default config's format list was silently ignored and the
        hard-coded fallback list was used instead.
        """
        supported = self.config.get(
            "supported_input_formats",
            self.config.get("supported_formats", ["jpg", "jpeg", "png", "webp"]),
        )
        return format.lower() in supported

    def get_output_format(self) -> str:
        """Get output format for resized images."""
        return self.config.get("output_format", "png")

    def get_folders(self) -> Dict[str, str]:
        """Get folder structure configuration."""
        return self.config.get("folders", {
            "original": "original",
            "thumbnail": "thumbnail"
        })

    def get_max_upload_size(self) -> int:
        """Get maximum upload size in bytes."""
        return self.config.get("max_upload_size", 52428800)

    def should_keep_original(self) -> bool:
        """Check if the original file should be kept."""
        return self.config.get("keep_original", True)

    def get_compressed_prefix(self) -> str:
        """Get prefix for compressed files."""
        return self.config.get("compressed_prefix", "")

    def get_thumbnail_prefix(self) -> str:
        """Get prefix for thumbnail files."""
        return self.config.get("thumbnail_prefix", "thumb_")
# Module-level singleton shared by the application: importing this module
# loads the YAML configuration (or the built-in defaults) exactly once.
image_compression_config = ImageCompressionConfig()

View File

@@ -0,0 +1,339 @@
"""
Linux BenchTools - Image Processor
Handles image compression, resizing and thumbnail generation
"""
import os
from pathlib import Path
from typing import Tuple, Optional
from PIL import Image
import hashlib
from datetime import datetime
from app.core.config import settings
from app.utils.image_config_loader import image_compression_config
class ImageProcessor:
    """Image processing utilities (resize, compress, thumbnails, hashing).

    All methods are static. BUGFIX: every image is now opened through a
    context manager so the underlying file handle is released deterministically
    (the previous code leaked handles in five methods).
    """

    @staticmethod
    def process_image_with_level(
        image_path: str,
        output_dir: str,
        compression_level: Optional[str] = None,
        output_format: Optional[str] = None,
        save_original: bool = True
    ) -> Tuple[str, int, Optional[str]]:
        """
        Process an image using a configured compression level.

        Saves the original in the original/ subdirectory (when configured)
        and the resized image in the main output directory.

        Args:
            image_path: Path to source image
            output_dir: Directory for output
            compression_level: Compression level (high, medium, low, minimal).
                If None, uses default from config.
            output_format: Output format (None = format from config)
            save_original: Save original file in original/ subdirectory

        Returns:
            Tuple of (output_path, file_size_bytes, original_path)
        """
        # Get compression settings and folders config
        level_config = image_compression_config.get_level(compression_level)
        folders = image_compression_config.get_folders()
        if output_format is None:
            output_format = image_compression_config.get_output_format()

        # Create subdirectories
        original_dir = os.path.join(output_dir, folders.get("original", "original"))
        os.makedirs(original_dir, exist_ok=True)
        os.makedirs(output_dir, exist_ok=True)

        # Keep a pristine copy of the upload when both caller and config ask for it
        original_path = None
        if save_original and image_compression_config.should_keep_original():
            import shutil
            original_filename = os.path.basename(image_path)
            original_path = os.path.join(original_dir, original_filename)
            shutil.copy2(image_path, original_path)

        # Process and resize image
        resized_path, file_size = ImageProcessor.process_image(
            image_path=image_path,
            output_dir=output_dir,
            max_width=level_config.get("max_width"),
            max_height=level_config.get("max_height"),
            quality=level_config.get("quality"),
            output_format=output_format
        )
        return resized_path, file_size, original_path

    @staticmethod
    def create_thumbnail_with_level(
        image_path: str,
        output_dir: str,
        compression_level: Optional[str] = None,
        output_format: Optional[str] = None
    ) -> Tuple[str, int]:
        """
        Create a thumbnail using a configured compression level.

        Saves in the thumbnail/ subdirectory.

        Args:
            image_path: Path to source image
            output_dir: Directory for output
            compression_level: Compression level (high, medium, low, minimal)
            output_format: Output format (None = format from config)

        Returns:
            Tuple of (output_path, file_size_bytes)
        """
        # Get compression settings and folders config
        level_config = image_compression_config.get_level(compression_level)
        folders = image_compression_config.get_folders()
        if output_format is None:
            output_format = image_compression_config.get_output_format()

        # Create thumbnail subdirectory
        thumbnail_dir = os.path.join(output_dir, folders.get("thumbnail", "thumbnail"))
        os.makedirs(thumbnail_dir, exist_ok=True)
        return ImageProcessor.create_thumbnail(
            image_path=image_path,
            output_dir=thumbnail_dir,
            size=level_config.get("thumbnail_size"),
            quality=level_config.get("thumbnail_quality"),
            output_format=output_format
        )

    @staticmethod
    def process_image(
        image_path: str,
        output_dir: str,
        max_width: Optional[int] = None,
        max_height: Optional[int] = None,
        quality: Optional[int] = None,
        output_format: str = "webp"
    ) -> Tuple[str, int]:
        """
        Process an image: resize and compress.

        Args:
            image_path: Path to source image
            output_dir: Directory for output
            max_width: Maximum width (None = use settings)
            max_height: Maximum height (None = use settings)
            quality: Compression quality 1-100 (None = use settings)
            output_format: Output format (webp, jpeg, png)

        Returns:
            Tuple of (output_path, file_size_bytes)
        """
        # Fall back to application-wide settings
        if max_width is None:
            max_width = settings.IMAGE_MAX_WIDTH
        if max_height is None:
            max_height = settings.IMAGE_MAX_HEIGHT
        if quality is None:
            quality = settings.IMAGE_COMPRESSION_QUALITY

        # BUGFIX: context manager closes the source file handle even on error
        with Image.open(image_path) as img:
            # Formats without alpha support: flatten RGBA onto white
            if img.mode == 'RGBA' and output_format.lower() in ['jpeg', 'jpg', 'webp']:
                background = Image.new('RGB', img.size, (255, 255, 255))
                background.paste(img, mask=img.split()[3])  # alpha channel as mask
                img = background

            # Downscale only when a dimension exceeds its limit; thumbnail()
            # preserves aspect ratio and never upscales.
            original_width, original_height = img.size
            if max_width and original_width > max_width or max_height and original_height > max_height:
                img.thumbnail((max_width or original_width, max_height or original_height),
                              Image.Resampling.LANCZOS)

            # Unique, timestamped output filename
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            original_name = Path(image_path).stem
            output_filename = f"{original_name}_{timestamp}.{output_format}"
            output_path = os.path.join(output_dir, output_filename)
            os.makedirs(output_dir, exist_ok=True)

            # Save with compression
            save_kwargs = {'quality': quality, 'optimize': True}
            if output_format.lower() == 'webp':
                save_kwargs['method'] = 6  # maximum WebP encoder effort (slower, smaller)
            elif output_format.lower() in ['jpeg', 'jpg']:
                save_kwargs['progressive'] = True
            img.save(output_path, format=output_format.upper(), **save_kwargs)

        return output_path, os.path.getsize(output_path)

    @staticmethod
    def create_thumbnail(
        image_path: str,
        output_dir: str,
        size: Optional[int] = None,
        quality: Optional[int] = None,
        output_format: Optional[str] = None
    ) -> Tuple[str, int]:
        """
        Create a thumbnail.

        Args:
            image_path: Path to source image
            output_dir: Directory for output
            size: Target thumbnail width in pixels (None = use settings)
            quality: Compression quality (None = use settings)
            output_format: Output format (None = use settings)

        Returns:
            Tuple of (output_path, file_size_bytes)
        """
        # Fall back to application-wide settings
        if size is None:
            size = settings.THUMBNAIL_SIZE
        if quality is None:
            quality = settings.THUMBNAIL_QUALITY
        if output_format is None:
            output_format = settings.THUMBNAIL_FORMAT

        # BUGFIX: context manager closes the source file handle even on error
        with Image.open(image_path) as img:
            # Formats without alpha support: flatten RGBA onto white
            if img.mode == 'RGBA' and output_format.lower() in ['jpeg', 'jpg', 'webp']:
                background = Image.new('RGB', img.size, (255, 255, 255))
                background.paste(img, mask=img.split()[3])
                img = background

            # "size" is the target width; height follows the aspect ratio
            width, height = img.size
            aspect_ratio = height / width
            new_width = size
            new_height = int(size * aspect_ratio)
            # thumbnail() preserves aspect ratio and never upscales
            img.thumbnail((new_width, new_height), Image.Resampling.LANCZOS)

            # Generate filename
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            original_name = Path(image_path).stem
            output_filename = f"{original_name}_thumb_{timestamp}.{output_format}"
            output_path = os.path.join(output_dir, output_filename)
            os.makedirs(output_dir, exist_ok=True)

            # Save
            save_kwargs = {'quality': quality, 'optimize': True}
            if output_format.lower() == 'webp':
                save_kwargs['method'] = 6
            elif output_format.lower() in ['jpeg', 'jpg']:
                save_kwargs['progressive'] = True
            img.save(output_path, format=output_format.upper(), **save_kwargs)

        return output_path, os.path.getsize(output_path)

    @staticmethod
    def get_image_hash(image_path: str) -> str:
        """
        Calculate the SHA256 hash of an image file.

        Args:
            image_path: Path to image

        Returns:
            SHA256 hash as hex string
        """
        sha256_hash = hashlib.sha256()
        with open(image_path, "rb") as f:
            # Read in chunks so large files don't load fully into memory
            for byte_block in iter(lambda: f.read(4096), b""):
                sha256_hash.update(byte_block)
        return sha256_hash.hexdigest()

    @staticmethod
    def get_image_info(image_path: str) -> dict:
        """
        Get image information.

        Args:
            image_path: Path to image

        Returns:
            Dictionary with width, height, format, mode, size_bytes and hash
        """
        # BUGFIX: context manager closes the file handle after reading metadata
        with Image.open(image_path) as img:
            return {
                "width": img.width,
                "height": img.height,
                "format": img.format,
                "mode": img.mode,
                "size_bytes": os.path.getsize(image_path),
                "hash": ImageProcessor.get_image_hash(image_path)
            }

    @staticmethod
    def is_valid_image(file_path: str) -> bool:
        """
        Check whether a file is a valid image.

        Args:
            file_path: Path to file

        Returns:
            True if valid image, False otherwise
        """
        try:
            # BUGFIX: close the handle; verify() only checks integrity and
            # leaves the image unusable afterwards, which is fine here.
            with Image.open(file_path) as img:
                img.verify()
            return True
        except Exception:
            return False

    @staticmethod
    def get_mime_type(file_path: str) -> Optional[str]:
        """
        Get the MIME type of an image file.

        Args:
            file_path: Path to image

        Returns:
            MIME type string, or None when the file cannot be identified
        """
        try:
            # BUGFIX: context manager closes the file handle
            with Image.open(file_path) as img:
                format_to_mime = {
                    'JPEG': 'image/jpeg',
                    'PNG': 'image/png',
                    'GIF': 'image/gif',
                    'BMP': 'image/bmp',
                    'WEBP': 'image/webp',
                    'TIFF': 'image/tiff'
                }
                return format_to_mime.get(img.format, f'image/{img.format.lower()}')
        except Exception:
            return None

246
backend/app/utils/lsusb_parser.py Executable file
View File

@@ -0,0 +1,246 @@
"""
lsusb output parser for USB device detection and extraction.
Parses output from 'lsusb -v' and extracts individual device information.
"""
import re
from typing import List, Dict, Any, Optional
def detect_usb_devices(lsusb_output: str) -> List[Dict[str, str]]:
    """
    Detect all USB devices from lsusb -v output.

    Returns a list of devices with their Bus line and basic info.

    Args:
        lsusb_output: Raw output from 'lsusb -v' command

    Returns:
        List of dicts with keys: bus_line, bus, device, id, vendor_id,
        product_id, description. Hex IDs are normalized to lowercase.

    Example:
        [
            {
                "bus_line": "Bus 002 Device 003: ID 0781:55ab SanDisk Corp. ...",
                "bus": "002",
                "device": "003",
                "id": "0781:55ab",
                "vendor_id": "0x0781",
                "product_id": "0x55ab",
                "description": "SanDisk Corp. ..."
            },
            ...
        ]
    """
    # Compile once instead of re-matching the pattern string on every line.
    # Format: "Bus 002 Device 003: ID 0781:55ab SanDisk Corp. ..."
    bus_line_re = re.compile(
        r'^Bus\s+(\d+)\s+Device\s+(\d+):\s+ID\s+([0-9a-fA-F]{4}):([0-9a-fA-F]{4})\s*(.*)$'
    )
    devices = []
    for raw_line in lsusb_output.strip().split('\n'):
        line_stripped = raw_line.strip()
        match = bus_line_re.match(line_stripped)
        if not match:
            continue
        bus, device_num, vendor_hex, product_hex, description = match.groups()
        vendor_id = vendor_hex.lower()
        product_id = product_hex.lower()
        devices.append({
            "bus_line": line_stripped,
            "bus": bus,
            "device": device_num,
            "id": f"{vendor_id}:{product_id}",
            "vendor_id": f"0x{vendor_id}",
            "product_id": f"0x{product_id}",
            "description": description.strip()
        })
    return devices
def extract_device_section(lsusb_output: str, bus: str, device: str) -> Optional[str]:
    """
    Extract the complete section for a specific device from lsusb -v output.

    Args:
        lsusb_output: Raw output from 'lsusb -v' command
        bus: Bus number (e.g., "002")
        device: Device number (e.g., "003")

    Returns:
        Complete section for the device, from its Bus line up to (but not
        including) the next Bus line, or None when the device is absent.
    """
    # Header line of the requested device
    header_re = re.compile(rf'^Bus\s+{bus}\s+Device\s+{device}:')
    collected = []
    capturing = False
    for line in lsusb_output.strip().split('\n'):
        if not capturing:
            if header_re.match(line):
                capturing = True
                collected.append(line)
            continue
        # A new "Bus " header marks the start of the next device
        if line.startswith('Bus '):
            break
        collected.append(line)
    return '\n'.join(collected) if collected else None
def parse_device_info(device_section: str) -> Dict[str, Any]:
    """
    Parse detailed information from a device section.

    Args:
        device_section: The complete lsusb output for a single device

    Returns:
        Dictionary with parsed device information including interface classes.
        Fields that could not be determined keep their None/False/[] defaults.
    """
    result = {
        "vendor_id": None,           # idVendor
        "product_id": None,          # idProduct
        "manufacturer": None,        # iManufacturer (fabricant)
        "product": None,             # iProduct (modele)
        "serial": None,
        "usb_version": None,         # bcdUSB (declared version)
        "device_class": None,        # bDeviceClass
        "device_subclass": None,
        "device_protocol": None,
        "interface_classes": [],     # CRITICAL: bInterfaceClass from all interfaces
        "max_power": None,           # MaxPower (in mA)
        "speed": None,               # Negotiated speed (determines actual USB type)
        "usb_type": None,            # Determined from negotiated speed
        "requires_firmware": False,  # True if any interface is Vendor Specific (255)
        "is_bus_powered": None,
        "is_self_powered": None,
        "power_sufficient": None     # Based on MaxPower vs port capacity
    }
    lines = device_section.split('\n')

    # Parse the first line (Bus line) - contains idVendor:idProduct and vendor name
    # Format: "Bus 002 Device 005: ID 0bda:8176 Realtek Semiconductor Corp."
    first_line = lines[0] if lines else ""
    bus_match = re.match(r'^Bus\s+\d+\s+Device\s+\d+:\s+ID\s+([0-9a-fA-F]{4}):([0-9a-fA-F]{4})\s*(.*)$', first_line)
    if bus_match:
        result["vendor_id"] = f"0x{bus_match.group(1).lower()}"
        result["product_id"] = f"0x{bus_match.group(2).lower()}"
        # Extract vendor name from first line (marque = text after IDs)
        vendor_name = bus_match.group(3).strip()
        if vendor_name:
            result["manufacturer"] = vendor_name

    # Parse detailed fields; descriptor strings override the Bus-line vendor
    # name when present (e.g. iManufacturer replaces the lsusb database name).
    for line in lines[1:]:
        line_stripped = line.strip()
        # iManufacturer (fabricant)
        mfg_match = re.search(r'iManufacturer\s+\d+\s+(.+?)$', line_stripped)
        if mfg_match:
            result["manufacturer"] = mfg_match.group(1).strip()
        # iProduct (modele)
        prod_match = re.search(r'iProduct\s+\d+\s+(.+?)$', line_stripped)
        if prod_match:
            result["product"] = prod_match.group(1).strip()
        # iSerial
        serial_match = re.search(r'iSerial\s+\d+\s+(.+?)$', line_stripped)
        if serial_match:
            result["serial"] = serial_match.group(1).strip()
        # bcdUSB (declared version, not definitive)
        usb_ver_match = re.search(r'bcdUSB\s+([\d.]+)', line_stripped)
        if usb_ver_match:
            result["usb_version"] = usb_ver_match.group(1).strip()
        # bDeviceClass
        class_match = re.search(r'bDeviceClass\s+(\d+)\s+(.+?)$', line_stripped)
        if class_match:
            result["device_class"] = class_match.group(1).strip()
        # bDeviceSubClass
        subclass_match = re.search(r'bDeviceSubClass\s+(\d+)', line_stripped)
        if subclass_match:
            result["device_subclass"] = subclass_match.group(1).strip()
        # bDeviceProtocol
        protocol_match = re.search(r'bDeviceProtocol\s+(\d+)', line_stripped)
        if protocol_match:
            result["device_protocol"] = protocol_match.group(1).strip()
        # MaxPower (extract numeric value in mA)
        power_match = re.search(r'MaxPower\s+(\d+)\s*mA', line_stripped)
        if power_match:
            result["max_power"] = power_match.group(1).strip()
        # bmAttributes (to determine Bus/Self powered)
        attr_match = re.search(r'bmAttributes\s+0x([0-9a-fA-F]+)', line_stripped)
        if attr_match:
            attrs = int(attr_match.group(1), 16)
            # Bit 6: Self Powered, Bit 5: Remote Wakeup
            result["is_self_powered"] = bool(attrs & 0x40)
            result["is_bus_powered"] = not result["is_self_powered"]
        # CRITICAL: bInterfaceClass (this determines Mass Storage, not bDeviceClass)
        interface_class_match = re.search(r'bInterfaceClass\s+(\d+)\s+(.+?)$', line_stripped)
        if interface_class_match:
            class_code = int(interface_class_match.group(1))
            class_name = interface_class_match.group(2).strip()
            result["interface_classes"].append({
                "code": class_code,
                "name": class_name
            })
            # Check for Vendor Specific (255) - requires firmware
            if class_code == 255:
                result["requires_firmware"] = True
        # Detect negotiated speed (determines actual USB type)
        # Format can be: "Device Qualifier (for other device speed):" or speed mentioned
        speed_patterns = [
            (r'1\.5\s*Mb(?:it)?/s|Low\s+Speed', 'Low Speed', 'USB 1.1'),
            (r'12\s*Mb(?:it)?/s|Full\s+Speed', 'Full Speed', 'USB 1.1'),
            (r'480\s*Mb(?:it)?/s|High\s+Speed', 'High Speed', 'USB 2.0'),
            (r'5000\s*Mb(?:it)?/s|5\s*Gb(?:it)?/s|SuperSpeed(?:\s+USB)?(?:\s+Gen\s*1)?', 'SuperSpeed', 'USB 3.0'),
            (r'10\s*Gb(?:it)?/s|SuperSpeed\s+USB\s+Gen\s*2|SuperSpeed\+', 'SuperSpeed+', 'USB 3.1'),
            (r'20\s*Gb(?:it)?/s|SuperSpeed\s+USB\s+Gen\s*2x2', 'SuperSpeed Gen 2x2', 'USB 3.2'),
        ]
        for pattern, speed_name, usb_type in speed_patterns:
            if re.search(pattern, line_stripped, re.IGNORECASE):
                result["speed"] = speed_name
                result["usb_type"] = usb_type
                break

    # Determine power sufficiency based on USB type and MaxPower
    if result["max_power"]:
        max_power_ma = int(result["max_power"])
        # BUGFIX: result["usb_type"] is initialised to None, so dict.get()
        # returned None here (the key exists) and the "in" membership test
        # below raised TypeError whenever no speed line was seen. Using "or"
        # applies the default for both a missing key and a None value.
        usb_type = result.get("usb_type") or "USB 2.0"  # Default to USB 2.0
        # Normative port capacities
        if "USB 3" in usb_type:
            port_capacity = 900  # USB 3.x: 900 mA @ 5V = 4.5W
        else:
            port_capacity = 500  # USB 2.0: 500 mA @ 5V = 2.5W
        result["power_sufficient"] = max_power_ma <= port_capacity
    return result

322
backend/app/utils/md_parser.py Executable file
View File

@@ -0,0 +1,322 @@
"""
Markdown specification file parser for peripherals.
Parses .md files containing USB device specifications.
"""
import re
from typing import Dict, Any, Optional
def parse_md_specification(md_content: str) -> Dict[str, Any]:
    """
    Parse a markdown specification file and extract peripheral information.

    Supports two formats:
    1. Simple format: Title + Description
    2. Detailed format: Full USB specification with vendor/product IDs, characteristics, etc.

    The parser walks the document line by line, tracking the current H2
    section header and applying section-specific regexes. Both English and
    French field labels are recognised. The complete markdown source is kept
    verbatim under the "synthese" key.

    Args:
        md_content: Raw markdown content

    Returns:
        Dictionary with peripheral data ready for database insertion. Keys
        whose value is None (and an empty "caracteristiques_specifiques"
        dict) are stripped before returning.
    """
    result = {
        "nom": None,
        "type_principal": "USB",
        "sous_type": None,
        "marque": None,
        "modele": None,
        "numero_serie": None,
        "description": None,
        "synthese": md_content,  # Store complete markdown content
        "caracteristiques_specifiques": {},
        "notes": None
    }
    lines = md_content.strip().split('\n')
    # Extract title (first H1)
    title_match = re.search(r'^#\s+(.+?)$', md_content, re.MULTILINE)
    if title_match:
        title = title_match.group(1).strip()
        # Extract USB IDs from title if present
        id_match = re.search(r'(?:ID\s+)?([0-9a-fA-F]{4})[_:]([0-9a-fA-F]{4})', title)
        if id_match:
            vendor_id = id_match.group(1).lower()
            product_id = id_match.group(2).lower()
            result["caracteristiques_specifiques"]["vendor_id"] = f"0x{vendor_id}"
            result["caracteristiques_specifiques"]["product_id"] = f"0x{product_id}"
    # Parse content line by line, dispatching on the current H2 section
    current_section = None
    description_lines = []
    notes_lines = []
    for line in lines:
        line = line.strip()
        # Section headers (H2)
        if line.startswith('## '):
            section_raw = line[3:].strip()
            # Remove numbering (e.g., "1. ", "2. ", "10. ")
            current_section = re.sub(r'^\d+\.\s*', '', section_raw)
            continue
        # Description section
        if current_section == "Description":
            if line and not line.startswith('#'):
                description_lines.append(line)
                # Try to extract device type from description
                # (first match wins: sous_type is only set once)
                if not result["sous_type"]:
                    # Common patterns
                    if re.search(r'souris|mouse', line, re.IGNORECASE):
                        result["sous_type"] = "Souris"
                    elif re.search(r'clavier|keyboard', line, re.IGNORECASE):
                        result["sous_type"] = "Clavier"
                    elif re.search(r'wi-?fi|wireless', line, re.IGNORECASE):
                        result["type_principal"] = "WiFi"
                        result["sous_type"] = "Adaptateur WiFi"
                    elif re.search(r'bluetooth', line, re.IGNORECASE):
                        result["type_principal"] = "Bluetooth"
                        result["sous_type"] = "Adaptateur Bluetooth"
                    elif re.search(r'usb\s+flash|clé\s+usb|flash\s+drive', line, re.IGNORECASE):
                        result["sous_type"] = "Clé USB"
                    elif re.search(r'dongle', line, re.IGNORECASE):
                        result["sous_type"] = "Dongle"
        # Identification section (support both "Identification" and "Identification USB")
        elif current_section in ["Identification", "Identification USB", "Identification générale"]:
            # Vendor ID (support multiple formats)
            vendor_match = re.search(r'\*\*Vendor\s+ID\*\*\s*:\s*0x([0-9a-fA-F]{4})\s*(?:\((.+?)\))?', line)
            if vendor_match:
                result["caracteristiques_specifiques"]["vendor_id"] = f"0x{vendor_match.group(1)}"
                if vendor_match.group(2):
                    result["marque"] = vendor_match.group(2).strip()
            # Product ID (support multiple formats)
            product_match = re.search(r'\*\*Product\s+ID\*\*\s*:\s*0x([0-9a-fA-F]{4})', line)
            if product_match:
                result["caracteristiques_specifiques"]["product_id"] = f"0x{product_match.group(1)}"
            # Commercial name or Désignation USB
            name_match = re.search(r'\*\*(?:Commercial\s+name|Désignation\s+USB)\*\*\s*:\s*(.+?)$', line, re.IGNORECASE)
            if name_match:
                result["nom"] = name_match.group(1).strip()
            # Manufacturer (only fills marque if not already set)
            mfg_match = re.search(r'\*\*Manufacturer\s+string\*\*:\s*(.+?)$', line)
            if mfg_match and not result["marque"]:
                result["marque"] = mfg_match.group(1).strip()
            # Product string (only fills nom if not already set)
            prod_match = re.search(r'\*\*Product\s+string\*\*:\s*(.+?)$', line)
            if prod_match and not result["nom"]:
                result["nom"] = prod_match.group(1).strip()
            # Serial number
            serial_match = re.search(r'\*\*Serial\s+number\*\*:\s*(.+?)$', line)
            if serial_match:
                result["numero_serie"] = serial_match.group(1).strip()
            # Catégorie (format FR)
            cat_match = re.search(r'\*\*Catégorie\*\*:\s*(.+?)$', line)
            if cat_match:
                cat_value = cat_match.group(1).strip()
                if 'réseau' in cat_value.lower():
                    result["type_principal"] = "Réseau"
            # Sous-catégorie (format FR)
            subcat_match = re.search(r'\*\*Sous-catégorie\*\*:\s*(.+?)$', line)
            if subcat_match:
                result["sous_type"] = subcat_match.group(1).strip()
            # Nom courant (format FR)
            common_match = re.search(r'\*\*Nom\s+courant\*\*\s*:\s*(.+?)$', line)
            if common_match and not result.get("modele"):
                result["modele"] = common_match.group(1).strip()
            # Version USB (from Identification USB section)
            version_match = re.search(r'\*\*Version\s+USB\*\*\s*:\s*(.+?)$', line)
            if version_match:
                result["caracteristiques_specifiques"]["usb_version"] = version_match.group(1).strip()
            # Vitesse négociée (from Identification USB section)
            speed_match2 = re.search(r'\*\*Vitesse\s+négociée\*\*\s*:\s*(.+?)$', line)
            if speed_match2:
                result["caracteristiques_specifiques"]["usb_speed"] = speed_match2.group(1).strip()
            # Consommation maximale (from Identification USB section)
            power_match2 = re.search(r'\*\*Consommation\s+maximale\*\*\s*:\s*(.+?)$', line)
            if power_match2:
                result["caracteristiques_specifiques"]["max_power"] = power_match2.group(1).strip()
        # USB Characteristics
        elif current_section == "USB Characteristics":
            # USB version (support both formats)
            usb_ver_match = re.search(r'\*\*(?:USB\s+version|Version\s+USB)\*\*:\s*(.+?)$', line, re.IGNORECASE)
            if usb_ver_match:
                result["caracteristiques_specifiques"]["usb_version"] = usb_ver_match.group(1).strip()
            # Speed (support both formats)
            speed_match = re.search(r'\*\*(?:Negotiated\s+speed|Vitesse\s+négociée)\*\*:\s*(.+?)$', line, re.IGNORECASE)
            if speed_match:
                result["caracteristiques_specifiques"]["usb_speed"] = speed_match.group(1).strip()
            # bcdUSB
            bcd_match = re.search(r'\*\*bcdUSB\*\*:\s*(.+?)$', line)
            if bcd_match:
                result["caracteristiques_specifiques"]["bcdUSB"] = bcd_match.group(1).strip()
            # Power (support both formats)
            power_match = re.search(r'\*\*(?:Max\s+power\s+draw|Consommation\s+maximale)\*\*:\s*(.+?)$', line, re.IGNORECASE)
            if power_match:
                result["caracteristiques_specifiques"]["max_power"] = power_match.group(1).strip()
        # Device Class (support both formats)
        elif current_section in ["Device Class", "Classe et interface USB"]:
            # Interface class (EN format: "code — name")
            class_match = re.search(r'\*\*Interface\s+class\*\*:\s*(\d+)\s*—\s*(.+?)$', line)
            if class_match:
                result["caracteristiques_specifiques"]["interface_class"] = class_match.group(1)
                result["caracteristiques_specifiques"]["interface_class_name"] = class_match.group(2).strip()
            # Classe USB (FR format: "name (code)")
            class_fr_match = re.search(r'\*\*Classe\s+USB\*\*\s*:\s*(.+?)\s*\((\d+)\)', line)
            if class_fr_match:
                result["caracteristiques_specifiques"]["interface_class"] = class_fr_match.group(2)
                result["caracteristiques_specifiques"]["interface_class_name"] = class_fr_match.group(1).strip()
            # Subclass (EN format)
            subclass_match = re.search(r'\*\*Subclass\*\*\s*:\s*(\d+)\s*—\s*(.+?)$', line)
            if subclass_match:
                result["caracteristiques_specifiques"]["interface_subclass"] = subclass_match.group(1)
                result["caracteristiques_specifiques"]["interface_subclass_name"] = subclass_match.group(2).strip()
            # Sous-classe (FR format)
            subclass_fr_match = re.search(r'\*\*Sous-classe\*\*\s*:\s*(.+?)\s*\((\d+)\)', line)
            if subclass_fr_match:
                result["caracteristiques_specifiques"]["interface_subclass"] = subclass_fr_match.group(2)
                result["caracteristiques_specifiques"]["interface_subclass_name"] = subclass_fr_match.group(1).strip()
            # Protocol (EN format)
            protocol_match = re.search(r'\*\*Protocol\*\*\s*:\s*(\d+|[0-9a-fA-F]{2})\s*—\s*(.+?)$', line)
            if protocol_match:
                result["caracteristiques_specifiques"]["interface_protocol"] = protocol_match.group(1)
                result["caracteristiques_specifiques"]["interface_protocol_name"] = protocol_match.group(2).strip()
            # Protocole (FR format)
            protocol_fr_match = re.search(r'\*\*Protocole\*\*\s*:\s*(.+?)\s*\((\d+)\)', line)
            if protocol_fr_match:
                result["caracteristiques_specifiques"]["interface_protocol"] = protocol_fr_match.group(2)
                result["caracteristiques_specifiques"]["interface_protocol_name"] = protocol_fr_match.group(1).strip()
        # Functional Role: bullet points become notes
        elif current_section == "Functional Role":
            if line.startswith('- '):
                notes_lines.append(line[2:])
        # Classification Summary
        elif current_section == "Classification Summary":
            # Category
            category_match = re.search(r'\*\*Category\*\*:\s*(.+?)$', line)
            if category_match:
                result["caracteristiques_specifiques"]["category"] = category_match.group(1).strip()
            # Subcategory
            subcategory_match = re.search(r'\*\*Subcategory\*\*:\s*(.+?)$', line)
            if subcategory_match:
                result["caracteristiques_specifiques"]["subcategory"] = subcategory_match.group(1).strip()
        # Wi-Fi characteristics (new section for wireless adapters)
        elif current_section == "Caractéristiques WiFi":
            # Norme Wi-Fi
            wifi_std_match = re.search(r'\*\*Norme\s+WiFi\*\*:\s*(.+?)$', line)
            if wifi_std_match:
                result["caracteristiques_specifiques"]["wifi_standard"] = wifi_std_match.group(1).strip()
            # Bande de fréquence
            freq_match = re.search(r'\*\*Bande\s+de\s+fréquence\*\*:\s*(.+?)$', line)
            if freq_match:
                result["caracteristiques_specifiques"]["wifi_frequency"] = freq_match.group(1).strip()
            # Débit théorique maximal
            speed_match = re.search(r'\*\*Débit\s+théorique\s+maximal\*\*:\s*(.+?)$', line)
            if speed_match:
                result["caracteristiques_specifiques"]["wifi_max_speed"] = speed_match.group(1).strip()
        # Collect other sections for notes (bullets, bold lines and quotes
        # are prefixed with the section name for traceability)
        elif current_section in ["Performance Notes", "Power & Stability Considerations",
                                 "Recommended USB Port Placement", "Typical Use Cases",
                                 "Operating System Support", "Pilotes et compatibilité système",
                                 "Contraintes et limitations", "Placement USB recommandé",
                                 "Cas d'usage typiques", "Fonction réseau", "Résumé synthétique"]:
            if line and not line.startswith('#'):
                if line.startswith('- '):
                    notes_lines.append(f"{current_section}: {line[2:]}")
                elif line.startswith('**'):
                    notes_lines.append(f"{current_section}: {line}")
                elif line.startswith('>'):
                    notes_lines.append(f"{current_section}: {line[1:].strip()}")
                elif current_section == "Résumé synthétique":
                    # Free-form summary lines are kept without a prefix
                    notes_lines.append(line)
    # Build description
    if description_lines:
        result["description"] = " ".join(description_lines)
    # Build notes
    if notes_lines:
        result["notes"] = "\n".join(notes_lines)
    # Fallback for nom if not found
    if not result["nom"]:
        if result["description"]:
            # Use first line/sentence of description as name
            first_line = result["description"].split('\n')[0]
            result["nom"] = first_line[:100] if len(first_line) > 100 else first_line
        elif title_match:
            # title is always bound here because title_match is truthy
            result["nom"] = title
        else:
            result["nom"] = "Périphérique importé"
    # Extract brand from description if not found
    if not result["marque"] and result["description"]:
        # Common brand patterns
        brands = ["Logitech", "SanDisk", "Ralink", "Broadcom", "ASUS", "Realtek",
                  "TP-Link", "Intel", "Samsung", "Kingston", "Corsair"]
        for brand in brands:
            if re.search(rf'\b{brand}\b', result["description"], re.IGNORECASE):
                result["marque"] = brand
                break
    # Clean up None values and empty dicts
    result = {k: v for k, v in result.items() if v is not None}
    if not result.get("caracteristiques_specifiques"):
        result.pop("caracteristiques_specifiques", None)
    return result
def extract_usb_ids_from_filename(filename: str) -> Optional[Dict[str, str]]:
    """
    Extract vendor_id and product_id from a filename.

    Examples:
        ID_0781_55ab.md -> {"vendor_id": "0x0781", "product_id": "0x55ab"}
        id_0b05_17cb.md -> {"vendor_id": "0x0b05", "product_id": "0x17cb"}

    Args:
        filename: Name of the file

    Returns:
        Dict with 0x-prefixed, lowercase vendor_id and product_id,
        or None if no ID pair is found.
    """
    # re.IGNORECASE generalizes the old (?:ID|id) alternation so mixed-case
    # prefixes such as "Id_" or "iD_" are accepted as well.
    match = re.search(
        r'ID[_\s]+([0-9a-fA-F]{4})[_:]([0-9a-fA-F]{4})',
        filename,
        re.IGNORECASE,
    )
    if match is None:
        return None
    return {
        "vendor_id": f"0x{match.group(1).lower()}",
        "product_id": f"0x{match.group(2).lower()}",
    }

187
backend/app/utils/qr_generator.py Executable file
View File

@@ -0,0 +1,187 @@
"""
Linux BenchTools - QR Code Generator
Generate QR codes for locations
"""
import os
from pathlib import Path
from typing import Optional
import qrcode
from qrcode.image.styledpil import StyledPilImage
from qrcode.image.styles.moduledrawers import RoundedModuleDrawer
class QRCodeGenerator:
    """QR Code generation utilities.

    All public methods render a rounded-module QR image and return the
    filesystem path of the PNG they wrote.
    """

    @staticmethod
    def _safe_filename_part(name: str) -> str:
        """Reduce *name* to a filesystem-safe token (alnum, '-', '_' only)."""
        safe = "".join(c for c in name if c.isalnum() or c in (' ', '-', '_')).strip()
        return safe.replace(' ', '_')

    @staticmethod
    def _render_qr(data: str, size: int, error_correction):
        """
        Render *data* as a rounded-module QR image resized to *size* px.

        Shared by all generate_* methods so the QR parameters (box size,
        border, styled drawer) stay in sync across them.
        """
        qr = qrcode.QRCode(
            version=1,  # starting version; fit=True lets the library grow it
            error_correction=error_correction,
            box_size=10,
            border=4,
        )
        qr.add_data(data)
        qr.make(fit=True)
        img = qr.make_image(
            image_factory=StyledPilImage,
            module_drawer=RoundedModuleDrawer()
        )
        return img.resize((size, size))

    @staticmethod
    def generate_location_qr(
        location_id: int,
        location_name: str,
        base_url: str,
        output_dir: str,
        size: int = 300
    ) -> str:
        """
        Generate QR code for a location.

        Args:
            location_id: Location ID
            location_name: Location name (for filename)
            base_url: Base URL of the application
            output_dir: Directory for output
            size: QR code size in pixels

        Returns:
            Path to generated QR code image
        """
        # URL points to the peripherals list filtered by this location.
        url = f"{base_url}/peripherals?location={location_id}"
        img = QRCodeGenerator._render_qr(url, size, qrcode.constants.ERROR_CORRECT_H)
        safe_name = QRCodeGenerator._safe_filename_part(location_name)
        output_path = os.path.join(output_dir, f"qr_location_{location_id}_{safe_name}.png")
        os.makedirs(output_dir, exist_ok=True)
        img.save(output_path)
        return output_path

    @staticmethod
    def generate_peripheral_qr(
        peripheral_id: int,
        peripheral_name: str,
        base_url: str,
        output_dir: str,
        size: int = 200
    ) -> str:
        """
        Generate QR code for a peripheral.

        Args:
            peripheral_id: Peripheral ID
            peripheral_name: Peripheral name (for filename)
            base_url: Base URL of the application
            output_dir: Directory for output
            size: QR code size in pixels

        Returns:
            Path to generated QR code image
        """
        # URL points to the peripheral detail page.
        url = f"{base_url}/peripheral/{peripheral_id}"
        img = QRCodeGenerator._render_qr(url, size, qrcode.constants.ERROR_CORRECT_H)
        safe_name = QRCodeGenerator._safe_filename_part(peripheral_name)
        output_path = os.path.join(output_dir, f"qr_peripheral_{peripheral_id}_{safe_name}.png")
        os.makedirs(output_dir, exist_ok=True)
        img.save(output_path)
        return output_path

    @staticmethod
    def generate_custom_qr(
        data: str,
        output_path: str,
        size: int = 300,
        error_correction: str = "H"
    ) -> str:
        """
        Generate a custom QR code.

        Args:
            data: Data to encode
            output_path: Full output path
            size: QR code size in pixels
            error_correction: Error correction level (L, M, Q, H)

        Returns:
            Path to generated QR code image
        """
        # Map the one-letter level to the library constant; default to H.
        ec_map = {
            "L": qrcode.constants.ERROR_CORRECT_L,
            "M": qrcode.constants.ERROR_CORRECT_M,
            "Q": qrcode.constants.ERROR_CORRECT_Q,
            "H": qrcode.constants.ERROR_CORRECT_H
        }
        ec = ec_map.get(error_correction.upper(), qrcode.constants.ERROR_CORRECT_H)
        img = QRCodeGenerator._render_qr(data, size, ec)
        # BUGFIX: os.path.dirname() returns "" for a bare filename and
        # os.makedirs("") raises FileNotFoundError — only create the
        # directory when there actually is one.
        parent = os.path.dirname(output_path)
        if parent:
            os.makedirs(parent, exist_ok=True)
        img.save(output_path)
        return output_path

65
backend/app/utils/scoring.py Normal file → Executable file
View File

@@ -1,12 +1,12 @@
"""
Linux BenchTools - Scoring Utilities
New normalized scoring formulas (0-100 scale):
- CPU: events_per_second / 100
- Memory: throughput_mib_s / 1000
- Disk: (read_mb_s + write_mb_s) / 20
- Network: (upload_mbps + download_mbps) / 20
- GPU: glmark2_score / 50
Raw benchmark scoring (no normalization):
- CPU: events_per_second (raw)
- Memory: throughput_mib_s (raw)
- Disk: read_mb_s + write_mb_s (raw)
- Network: upload_mbps + download_mbps (raw)
- GPU: glmark2_score (raw)
"""
from app.core.config import settings
def calculate_cpu_score(events_per_second: float = None) -> float:
    """
    Calculate CPU score from sysbench events per second.

    Formula: events_per_second (raw value)
    No normalization applied.

    Example: 3409.87 events/s → 3409.87 score

    Args:
        events_per_second: sysbench CPU events/second, or None if missing.

    Returns:
        The raw value, or 0.0 for missing/non-positive input.
    """
    if events_per_second is None or events_per_second <= 0:
        return 0.0
    return max(0.0, events_per_second)
def calculate_memory_score(throughput_mib_s: float = None) -> float:
    """
    Calculate Memory score from sysbench throughput.

    Formula: throughput_mib_s (raw value)
    No normalization applied.

    Example: 13806.03 MiB/s → 13806.03 score

    Args:
        throughput_mib_s: sysbench memory throughput in MiB/s, or None.

    Returns:
        The raw value, or 0.0 for missing/non-positive input.
    """
    if throughput_mib_s is None or throughput_mib_s <= 0:
        return 0.0
    return max(0.0, throughput_mib_s)
def calculate_disk_score(read_mb_s: float = None, write_mb_s: float = None) -> float:
    """
    Calculate Disk score from fio read/write bandwidth.

    Formula: read_mb_s + write_mb_s (raw value)
    No normalization applied.

    Example: (695 + 695) MB/s → 1390 score

    Args:
        read_mb_s: sequential read bandwidth in MB/s, or None.
        write_mb_s: sequential write bandwidth in MB/s, or None.

    Returns:
        Sum of the two bandwidths (missing/non-positive values count as 0),
        or 0.0 when both are None.
    """
    if read_mb_s is None and write_mb_s is None:
        return 0.0
    # A missing or non-positive side contributes zero, not an error.
    read = read_mb_s if read_mb_s is not None and read_mb_s > 0 else 0.0
    write = write_mb_s if write_mb_s is not None and write_mb_s > 0 else 0.0
    return max(0.0, read + write)
def calculate_network_score(upload_mbps: float = None, download_mbps: float = None) -> float:
    """
    Calculate Network score from iperf3 upload/download speeds.

    Formula: upload_mbps + download_mbps (raw value)
    No normalization applied.

    Example: (484.67 + 390.13) Mbps → 874.8 score

    Args:
        upload_mbps: upload throughput in Mbps, or None.
        download_mbps: download throughput in Mbps, or None.

    Returns:
        Sum of the two throughputs (missing/non-positive values count as 0),
        or 0.0 when both are None.
    """
    if upload_mbps is None and download_mbps is None:
        return 0.0
    # A missing or non-positive side contributes zero, not an error.
    upload = upload_mbps if upload_mbps is not None and upload_mbps > 0 else 0.0
    download = download_mbps if download_mbps is not None and download_mbps > 0 else 0.0
    return max(0.0, upload + download)
def calculate_gpu_score(glmark2_score: int = None) -> float:
    """
    Calculate GPU score from glmark2 benchmark.

    Formula: glmark2_score (raw value)
    No normalization applied.

    Example: 2500 glmark2 → 2500 score

    Args:
        glmark2_score: glmark2 benchmark result, or None if not run.

    Returns:
        The raw value as float, or 0.0 for missing/non-positive input.
    """
    if glmark2_score is None or glmark2_score <= 0:
        return 0.0
    # glmark2 reports an int; the scoring pipeline works with floats.
    return max(0.0, float(glmark2_score))
def calculate_global_score(
@@ -146,8 +141,8 @@ def calculate_global_score(
weighted_sum = sum(score * weight for score, weight in zip(scores, weights))
global_score = weighted_sum / total_weight
# Clamp to 0-100 range
return max(0.0, min(100.0, global_score))
# Ensure non-negative
return max(0.0, global_score)
def validate_score(score: float) -> bool:
    """
    Validate a score value.

    Args:
        score: Score value to validate

    Returns:
        bool: True if score is valid (>= 0 or None)
    """
    # None is valid: it means the benchmark was not run.
    if score is None:
        return True
    # Raw scoring has no upper bound; only negative values are invalid.
    return score >= 0.0

View File

@@ -0,0 +1,372 @@
"""
Enhanced USB information parser
Parses structured USB device information (from lsusb -v or GUI tools)
Outputs YAML-formatted CLI section
"""
import re
import yaml
from typing import Dict, Any, Optional, List
def parse_structured_usb_info(text: str) -> Dict[str, Any]:
    """
    Parse structured USB information text (from 'lsusb -v' or GUI tools).

    Args:
        text: Raw USB information, one labelled field per line
              (French or English labels).

    Returns:
        Dict with three sections:
          - "general": fields mapped onto the peripheral model
            (marque, modele, nom, fabricant, produit, numero_serie)
          - "cli_yaml": full structured view used for the YAML CLI export
          - "caracteristiques_specifiques": flat characteristics dict
    """
    result = {
        "general": {},
        "cli_yaml": {},
        "caracteristiques_specifiques": {}
    }
    # Normalize text
    lines = text.strip().split('\n')
    # ===========================================
    # Fields common to every device (→ caracteristiques_specifiques)
    # Per technical specs:
    #   - marque    = Vendor string (3rd column of idVendor)
    #   - modele    = Product string (3rd column of idProduct)
    #   - fabricant = iManufacturer (manufacturer string)
    #   - produit   = iProduct (product string)
    # ===========================================
    for line in lines:
        line = line.strip()
        # Vendor ID - common
        if match := re.search(r'Vendor\s+ID\s*:\s*(0x[0-9a-fA-F]+)\s+(.+)', line):
            vid = match.group(1).lower()
            result["caracteristiques_specifiques"]["vendor_id"] = vid
            vendor_str = match.group(2).strip()
            if vendor_str and vendor_str != "0":
                result["general"]["marque"] = vendor_str
        # Product ID - common
        if match := re.search(r'Product\s+ID\s*:\s*(0x[0-9a-fA-F]+)\s+(.+)', line):
            pid = match.group(1).lower()
            result["caracteristiques_specifiques"]["product_id"] = pid
            product_str = match.group(2).strip()
            if product_str and product_str != "0":
                result["general"]["modele"] = product_str
        # Vendor string → marque
        if match := re.search(r'Vendor\s+string\s*:\s*(.+)', line):
            vendor = match.group(1).strip()
            if vendor and vendor != "0":
                result["general"]["marque"] = vendor
        # iManufacturer → fabricant
        if match := re.search(r'iManufacturer\s*:\s*(.+)', line):
            manufacturer = match.group(1).strip()
            if manufacturer and manufacturer != "0":
                result["caracteristiques_specifiques"]["fabricant"] = manufacturer
                result["general"]["fabricant"] = manufacturer
        # Product string → modele
        if match := re.search(r'Product\s+string\s*:\s*(.+)', line):
            product = match.group(1).strip()
            if product and product != "0":
                result["general"]["modele"] = product
                # Also use as nom if not already set
                if "nom" not in result["general"]:
                    result["general"]["nom"] = product
        # iProduct → produit
        if match := re.search(r'iProduct\s*:\s*(.+)', line):
            product = match.group(1).strip()
            if product and product != "0":
                result["caracteristiques_specifiques"]["produit"] = product
                result["general"]["produit"] = product
        # Serial number - sometimes absent → only stored when present
        if match := re.search(r'Numéro\s+de\s+série\s*:\s*(.+)', line):
            serial = match.group(1).strip()
            if serial and "non présent" not in serial.lower() and serial != "0":
                result["general"]["numero_serie"] = serial
        # USB version (bcdUSB) - DECLARED, not definitive
        if match := re.search(r'USB\s+([\d.]+).*bcdUSB\s+([\d.]+)', line):
            result["caracteristiques_specifiques"]["usb_version_declared"] = f"USB {match.group(2)}"
        # Negotiated speed - CRITICAL: determines actual USB type
        if match := re.search(r'Vitesse\s+négociée\s*:\s*(.+)', line):
            speed = match.group(1).strip()
            result["caracteristiques_specifiques"]["negotiated_speed"] = speed
            # Determine USB type from negotiated speed
            speed_lower = speed.lower()
            if 'low speed' in speed_lower or '1.5' in speed_lower:
                result["caracteristiques_specifiques"]["usb_type"] = "USB 1.1"
            elif 'full speed' in speed_lower or '12 mb' in speed_lower:
                result["caracteristiques_specifiques"]["usb_type"] = "USB 1.1"
            elif 'high speed' in speed_lower or '480 mb' in speed_lower:
                result["caracteristiques_specifiques"]["usb_type"] = "USB 2.0"
            elif 'superspeed+' in speed_lower or '10 gb' in speed_lower:
                result["caracteristiques_specifiques"]["usb_type"] = "USB 3.1"
            elif 'superspeed' in speed_lower or '5 gb' in speed_lower:
                result["caracteristiques_specifiques"]["usb_type"] = "USB 3.0"
        # Device class (bDeviceClass) - LESS RELIABLE than bInterfaceClass
        if match := re.search(r'Classe\s+périphérique\s*:\s*(\d+)\s*(?:→\s*(.+))?', line):
            class_code = match.group(1)
            class_name = match.group(2) if match.group(2) else ""
            result["caracteristiques_specifiques"]["device_class"] = class_code
            result["caracteristiques_specifiques"]["device_class_nom"] = class_name.strip()
        # Device subclass
        if match := re.search(r'Sous-classe\s+périphérique\s*:\s*(\d+)\s*(?:→\s*(.+))?', line):
            subclass_code = match.group(1)
            subclass_name = match.group(2) if match.group(2) else ""
            result["caracteristiques_specifiques"]["device_subclass"] = subclass_code
            result["caracteristiques_specifiques"]["device_subclass_nom"] = subclass_name.strip()
        # Device protocol
        if match := re.search(r'Protocole\s+périphérique\s*:\s*(\d+)\s*(?:→\s*(.+))?', line):
            protocol_code = match.group(1)
            protocol_name = match.group(2) if match.group(2) else ""
            result["caracteristiques_specifiques"]["device_protocol"] = protocol_code
            result["caracteristiques_specifiques"]["device_protocol_nom"] = protocol_name.strip()
        # Maximum power draw (MaxPower)
        if match := re.search(r'Puissance\s+maximale.*:\s*(\d+)\s*mA', line):
            power_ma = int(match.group(1))
            result["caracteristiques_specifiques"]["max_power_ma"] = power_ma
            # Determine power sufficiency based on USB type
            usb_type = result["caracteristiques_specifiques"].get("usb_type", "USB 2.0")
            if "USB 3" in usb_type:
                port_capacity = 900  # USB 3.x: 900 mA @ 5V = 4.5W
            else:
                port_capacity = 500  # USB 2.0: 500 mA @ 5V = 2.5W
            result["caracteristiques_specifiques"]["power_sufficient"] = power_ma <= port_capacity
        # Power mode (Bus Powered vs Self Powered)
        if match := re.search(r'Mode\s+d.alimentation\s*:\s*(.+)', line):
            power_mode = match.group(1).strip()
            result["caracteristiques_specifiques"]["power_mode"] = power_mode
            result["caracteristiques_specifiques"]["is_bus_powered"] = "bus" in power_mode.lower()
            result["caracteristiques_specifiques"]["is_self_powered"] = "self" in power_mode.lower()
    # ===========================================
    # Device-specific details (→ cli_yaml)
    # All fields also go into cli_yaml for a complete structured view
    # ===========================================
    # Bus & Device
    for line in lines:
        line = line.strip()
        if match := re.search(r'Bus\s*:\s*(\d+)', line):
            result["cli_yaml"]["bus"] = match.group(1)
        if match := re.search(r'Device\s*:\s*(\d+)', line):
            result["cli_yaml"]["device"] = match.group(1)
    # Copy the characteristics into the structured cli_yaml sections
    result["cli_yaml"]["identification"] = {
        "vendor_id": result["caracteristiques_specifiques"].get("vendor_id"),
        "product_id": result["caracteristiques_specifiques"].get("product_id"),
        "vendor_string": result["general"].get("marque"),
        "product_string": result["general"].get("modele") or result["general"].get("nom"),
        "numero_serie": result["general"].get("numero_serie"),
    }
    result["cli_yaml"]["usb"] = {
        # BUGFIX: the parser above stores these under "usb_version_declared"
        # and "negotiated_speed"; the old lookups ("usb_version",
        # "vitesse_negociee") never matched, so both exports were always None.
        "version": result["caracteristiques_specifiques"].get("usb_version_declared"),
        "vitesse_negociee": result["caracteristiques_specifiques"].get("negotiated_speed"),
    }
    result["cli_yaml"]["classe"] = {
        "device_class": result["caracteristiques_specifiques"].get("device_class"),
        "device_class_nom": result["caracteristiques_specifiques"].get("device_class_nom"),
        "device_subclass": result["caracteristiques_specifiques"].get("device_subclass"),
        "device_subclass_nom": result["caracteristiques_specifiques"].get("device_subclass_nom"),
        "device_protocol": result["caracteristiques_specifiques"].get("device_protocol"),
        "device_protocol_nom": result["caracteristiques_specifiques"].get("device_protocol_nom"),
    }
    result["cli_yaml"]["alimentation"] = {
        # BUGFIX: the stored key is "max_power_ma" (int, mA), not "max_power".
        "max_power": result["caracteristiques_specifiques"].get("max_power_ma"),
        "power_mode": result["caracteristiques_specifiques"].get("power_mode"),
    }
    # Extract interface information (CRITICAL for Mass Storage detection)
    interfaces = extract_interfaces(text)
    if interfaces:
        result["cli_yaml"]["interfaces"] = interfaces
        # Extract interface classes for classification
        interface_classes = []
        requires_firmware = False
        for iface in interfaces:
            if "classe" in iface:
                class_code = iface["classe"].get("code")
                class_name = iface["classe"].get("nom", "")
                interface_classes.append({
                    "code": class_code,
                    "name": class_name
                })
                # Vendor Specific (255) means a dedicated driver/firmware
                if class_code == 255:
                    requires_firmware = True
        result["caracteristiques_specifiques"]["interface_classes"] = interface_classes
        result["caracteristiques_specifiques"]["requires_firmware"] = requires_firmware
    # Extract endpoints
    endpoints = extract_endpoints(text)
    if endpoints:
        result["cli_yaml"]["endpoints"] = endpoints
    return result
def extract_interfaces(text: str) -> List[Dict[str, Any]]:
    """
    Extract interface descriptors from a raw USB information dump.

    CRITICAL: bInterfaceClass is normative for Mass Storage detection (class 08).

    Args:
        text: Raw USB information text (French labels).

    Returns:
        One dict per "Interface N" section, with optional keys
        alternate_setting, nombre_endpoints, classe, sous_classe, protocole.
    """
    collected: List[Dict[str, Any]] = []
    current: Optional[Dict[str, Any]] = None

    def code_name(m) -> Dict[str, Any]:
        # Shared shape for class / subclass / protocol entries.
        # The code is stored as int for the classifier.
        return {
            "code": int(m.group(1)),
            "nom": m.group(2).strip() if m.group(2) else "",
        }

    for raw in text.split('\n'):
        stripped = raw.strip()

        # A new "Interface N" header closes the previous descriptor.
        header = re.search(r'Interface\s+(\d+)', stripped)
        if header:
            if current:
                collected.append(current)
            current = {"numero": int(header.group(1))}
        if not current:
            continue

        if m := re.search(r'Alternate\s+setting\s*:\s*(\d+)', stripped):
            current["alternate_setting"] = int(m.group(1))
        if m := re.search(r'Nombre\s+d.endpoints\s*:\s*(\d+)', stripped):
            current["nombre_endpoints"] = int(m.group(1))
        if m := re.search(r'Classe\s+interface\s*:\s*(\d+)\s*(?:→\s*(.+))?', stripped):
            current["classe"] = code_name(m)
        if m := re.search(r'Sous-classe\s+interface\s*:\s*(\d+)\s*(?:→\s*(.+))?', stripped):
            current["sous_classe"] = code_name(m)
        if m := re.search(r'Protocole\s+interface\s*:\s*(\d+)\s*(?:→\s*(.+))?', stripped):
            current["protocole"] = code_name(m)

    if current:
        collected.append(current)
    return collected
def extract_endpoints(text: str) -> List[Dict[str, Any]]:
    """
    Extract endpoint information from a raw USB dump.

    Recognizes lines such as "EP 0x81 (IN)" followed by per-endpoint
    attribute lines (transfer type, max packet size, interval, burst),
    which are attached to the most recently seen endpoint.

    Args:
        text: Raw USB information text.

    Returns:
        List of endpoint dicts in the order they appear.
    """
    endpoints = []
    lines = text.split('\n')
    for line in lines:
        line = line.strip()
        # Endpoint line: EP 0x81 (IN)
        # BUGFIX: the direction must be an alternation (IN|OUT); the old
        # character class [IN|OUT]+ also accepted scrambled tokens such as
        # "(NI)", "(TUO)" or "(|)".
        if match := re.search(r'EP\s+(0x[0-9a-fA-F]+)\s*\((IN|OUT)\)', line):
            endpoint = {
                "adresse": match.group(1).lower(),
                "direction": match.group(2)
            }
            endpoints.append(endpoint)
            continue
        # Transfer type
        if endpoints and (match := re.search(r'Type(?:\s+de\s+transfert)?\s*:\s*(\w+)', line)):
            endpoints[-1]["type_transfert"] = match.group(1)
        # Max packet size
        if endpoints and (match := re.search(r'Taille\s+max\s+paquet\s*:\s*(\d+)\s*octets?', line)):
            endpoints[-1]["taille_max_paquet"] = int(match.group(1))
        # Polling interval
        if endpoints and (match := re.search(r'Intervalle\s*:\s*(\d+)', line)):
            endpoints[-1]["intervalle"] = int(match.group(1))
        # bMaxBurst
        if endpoints and (match := re.search(r'bMaxBurst\s*:\s*(\d+)', line)):
            endpoints[-1]["max_burst"] = int(match.group(1))
    return endpoints
def format_cli_as_yaml(cli_data: Dict[str, Any]) -> str:
    """
    Render parsed CLI data as a commented YAML string.

    Args:
        cli_data: Parsed CLI data (may be empty).

    Returns:
        YAML document prefixed with a header comment, or "" when there
        is nothing to render.
    """
    if not cli_data:
        return ""
    dumped = yaml.dump(
        cli_data,
        allow_unicode=True,
        sort_keys=False,
        indent=2,
        default_flow_style=False,
    )
    return "# Informations USB extraites\n\n" + dumped
def create_full_cli_section(text: str) -> str:
    """
    Build the complete Markdown CLI section: structured YAML + raw dump.

    Args:
        text: Raw USB information text.

    Returns:
        Markdown document with a YAML code block followed by the raw output.
    """
    parsed = parse_structured_usb_info(text)
    yaml_block = format_cli_as_yaml(parsed["cli_yaml"])
    # Assemble the section in one pass instead of repeated concatenation.
    parts = [
        "# Informations USB\n\n",
        "## Données structurées (YAML)\n\n",
        "```yaml\n",
        yaml_block,
        "```\n\n",
        "## Sortie brute\n\n",
        "```\n",
        text.strip(),
        "\n```\n",
    ]
    return "".join(parts)

348
backend/app/utils/usb_parser.py Executable file
View File

@@ -0,0 +1,348 @@
"""
Linux BenchTools - USB Device Parser
Parses output from 'lsusb -v' command
"""
import re
from typing import Dict, Any, Optional, List
def parse_lsusb_verbose(lsusb_output: str) -> Dict[str, Any]:
    """
    Parse the output of 'lsusb -v' command.

    Args:
        lsusb_output: Raw text output from 'lsusb -v' command

    Returns:
        Dictionary with parsed USB device information: identification
        (vendor/product ids and strings), declared USB version, class
        codes, power draw, detected speed, interface descriptors, and the
        derived "type_principal"/"sous_type" classification. Fields that
        could not be parsed are left as None.
    """
    result = {
        "vendor_id": None,
        "product_id": None,
        "usb_device_id": None,       # "vvvv:pppp" combined form
        "marque": None,              # brand (from the lsusb description)
        "modele": None,              # model (from the lsusb description)
        "fabricant": None,           # iManufacturer string
        "produit": None,             # iProduct string
        "numero_serie": None,        # iSerial string (when meaningful)
        "usb_version": None,         # bcdUSB as declared by the device
        "device_class": None,
        "device_subclass": None,
        "device_protocol": None,
        "max_power_ma": None,
        "speed": None,
        "manufacturer": None,
        "product": None,
        "interfaces": [],
        "raw_info": {}
    }
    lines = lsusb_output.strip().split('\n')
    # Tracks the interface descriptor currently being filled; None until
    # the first "Interface Descriptor:" line is seen.
    current_interface = None
    for line in lines:
        # Bus and Device info
        # Example: Bus 002 Device 003: ID 0781:5567 SanDisk Corp. Cruzer Blade
        match = re.match(r'Bus\s+(\d+)\s+Device\s+(\d+):\s+ID\s+([0-9a-f]{4}):([0-9a-f]{4})\s+(.*)', line)
        if match:
            result["raw_info"]["bus"] = match.group(1)
            result["raw_info"]["device"] = match.group(2)
            result["vendor_id"] = match.group(3)
            result["product_id"] = match.group(4)
            result["usb_device_id"] = f"{match.group(3)}:{match.group(4)}"
            # Parse manufacturer and product from the description
            # (first word is treated as brand, the rest as model).
            desc = match.group(5)
            parts = desc.split(' ', 1)
            if len(parts) == 2:
                result["marque"] = parts[0]
                result["modele"] = parts[1]
            else:
                result["modele"] = desc
            continue
        # idVendor — only fills fields not already set by the Bus line.
        match = re.search(r'idVendor\s+0x([0-9a-f]{4})\s+(.*)', line)
        if match:
            if not result["vendor_id"]:
                result["vendor_id"] = match.group(1)
            result["manufacturer"] = match.group(2).strip()
            if not result["marque"]:
                result["marque"] = result["manufacturer"]
            # Rebuild the combined id once both halves are known.
            if result.get("vendor_id") and result.get("product_id") and not result.get("usb_device_id"):
                result["usb_device_id"] = f"{result['vendor_id']}:{result['product_id']}"
            continue
        # idProduct — mirrors the idVendor handling above.
        match = re.search(r'idProduct\s+0x([0-9a-f]{4})\s+(.*)', line)
        if match:
            if not result["product_id"]:
                result["product_id"] = match.group(1)
            result["product"] = match.group(2).strip()
            if not result["modele"]:
                result["modele"] = result["product"]
            if result.get("vendor_id") and result.get("product_id") and not result.get("usb_device_id"):
                result["usb_device_id"] = f"{result['vendor_id']}:{result['product_id']}"
            continue
        # bcdUSB (USB version as declared by the device descriptor)
        match = re.search(r'bcdUSB\s+([\d.]+)', line)
        if match:
            result["usb_version"] = match.group(1)
            continue
        # bDeviceClass — keeps the human-readable name, code goes to raw_info.
        match = re.search(r'bDeviceClass\s+(\d+)\s+(.*)', line)
        if match:
            result["device_class"] = match.group(2).strip()
            result["raw_info"]["device_class_code"] = match.group(1)
            continue
        # bDeviceSubClass — falls back to the numeric code when unnamed.
        match = re.search(r'bDeviceSubClass\s+(\d+)\s*(.*)', line)
        if match:
            result["device_subclass"] = match.group(2).strip() if match.group(2) else match.group(1)
            continue
        # bDeviceProtocol — same fallback behavior as the subclass.
        match = re.search(r'bDeviceProtocol\s+(\d+)\s*(.*)', line)
        if match:
            result["device_protocol"] = match.group(2).strip() if match.group(2) else match.group(1)
            continue
        # MaxPower (configured maximum draw, in mA)
        match = re.search(r'MaxPower\s+(\d+)mA', line)
        if match:
            result["max_power_ma"] = int(match.group(1))
            continue
        # iManufacturer — only taken when idVendor gave no string.
        match = re.search(r'iManufacturer\s+\d+\s+(.*)', line)
        if match and not result["manufacturer"]:
            result["manufacturer"] = match.group(1).strip()
            if not result["fabricant"]:
                result["fabricant"] = result["manufacturer"]
            continue
        # iProduct — only taken when idProduct gave no string.
        match = re.search(r'iProduct\s+\d+\s+(.*)', line)
        if match and not result["product"]:
            result["product"] = match.group(1).strip()
            if not result["produit"]:
                result["produit"] = result["product"]
            continue
        # iSerial — "0" is the descriptor-absent marker and is ignored.
        match = re.search(r'iSerial\s+\d+\s+(.*)', line)
        if match:
            serial = match.group(1).strip()
            if serial and serial != "0":
                result["numero_serie"] = serial
            continue
        # Speed (from Device Descriptor or Status)
        match = re.search(r'Device Status:.*?Speed:\s*(\w+)', line)
        if match:
            result["speed"] = match.group(1)
            continue
        # Alternative speed detection
        # NOTE(review): this substring heuristic runs on every remaining
        # line, so a later line containing e.g. "12M" can overwrite an
        # earlier detection — confirm against real lsusb -v output.
        if "480M" in line or "high-speed" in line.lower() or "high speed" in line.lower():
            result["speed"] = "High Speed (480 Mbps)"
        elif "5000M" in line or "super-speed" in line.lower() or "super speed" in line.lower():
            result["speed"] = "Super Speed (5 Gbps)"
        elif "10000M" in line or "superspeed+" in line.lower():
            result["speed"] = "SuperSpeed+ (10 Gbps)"
        elif "12M" in line or "full-speed" in line.lower() or "full speed" in line.lower():
            result["speed"] = "Full Speed (12 Mbps)"
        elif "1.5M" in line or "low-speed" in line.lower() or "low speed" in line.lower():
            result["speed"] = "Low Speed (1.5 Mbps)"
        # Interface information — each descriptor starts a fresh dict that
        # the following bInterface* lines fill in.
        match = re.search(r'Interface Descriptor:', line)
        if match:
            current_interface = {}
            result["interfaces"].append(current_interface)
            continue
        if current_interface is not None:
            # bInterfaceClass
            match = re.search(r'bInterfaceClass\s+(\d+)\s+(.*)', line)
            if match:
                current_interface["class"] = match.group(2).strip()
                current_interface["class_code"] = match.group(1)
                continue
            # bInterfaceSubClass
            match = re.search(r'bInterfaceSubClass\s+(\d+)\s*(.*)', line)
            if match:
                current_interface["subclass"] = match.group(2).strip() if match.group(2) else match.group(1)
                continue
            # bInterfaceProtocol
            match = re.search(r'bInterfaceProtocol\s+(\d+)\s*(.*)', line)
            if match:
                current_interface["protocol"] = match.group(2).strip() if match.group(2) else match.group(1)
                continue
    # Clean up empty values ("" and "0" are treated as "not provided").
    for key in list(result.keys()):
        if result[key] == "" or result[key] == "0":
            result[key] = None
    # Determine peripheral type from class
    result["type_principal"] = _determine_peripheral_type(result)
    result["sous_type"] = _determine_peripheral_subtype(result)
    return result
def _determine_peripheral_type(usb_info: Dict[str, Any]) -> str:
    """Determine peripheral type from USB class information"""
    device_class = (usb_info.get("device_class") or "").lower()

    # bDeviceClass is often empty, "0", or vendor-specific; in those cases
    # the first interface class is the more reliable signal.
    use_interface = (
        not device_class
        or "vendor specific" in device_class
        or device_class == "0"
    )
    if use_interface:
        interfaces = usb_info.get("interfaces", [])
        effective_class = (interfaces[0].get("class") or "").lower() if interfaces else ""
    else:
        effective_class = device_class

    # Map USB classes to peripheral types; first matching keyword wins,
    # so the ordering below is significant.
    class_map = {
        "hub": "USB",
        "audio": "Audio",
        "hid": "USB",
        "human interface device": "USB",
        "printer": "Imprimante",
        "mass storage": "Stockage",
        "video": "Video",
        "wireless": "Sans-fil",
        "bluetooth": "Bluetooth",
        "smart card": "Securite",
        "application specific": "USB",
        "vendor specific": "USB"
    }
    return next(
        (ptype for key, ptype in class_map.items() if key in effective_class),
        "USB",  # default when nothing matched
    )
def _determine_peripheral_subtype(usb_info: Dict[str, Any]) -> Optional[str]:
    """Determine peripheral subtype from USB class information"""
    device_class = (usb_info.get("device_class") or "").lower()
    interfaces = usb_info.get("interfaces", [])
    if interfaces:
        iface_class = (interfaces[0].get("class") or "").lower()
        iface_sub = (interfaces[0].get("subclass") or "").lower()
    else:
        iface_class = ""
        iface_sub = ""
    # The model string helps disambiguate HID and storage devices.
    model = str(usb_info.get("modele", "")).lower()

    # HID devices: mouse / keyboard / gamepad, generic HID otherwise.
    is_hid = (
        "hid" in device_class
        or "hid" in iface_class
        or "human interface" in iface_class
    )
    if is_hid:
        if "mouse" in iface_sub or "mouse" in model:
            return "Souris"
        if "keyboard" in iface_sub or "keyboard" in model:
            return "Clavier"
        if "gamepad" in iface_sub or "joystick" in iface_sub:
            return "Manette"
        return "Peripherique HID"

    # Mass storage: card readers vs plain USB keys.
    if "mass storage" in iface_class:
        if "card reader" in model or "reader" in model:
            return "Lecteur de cartes"
        return "Cle USB"

    # Audio devices.
    if "audio" in iface_class:
        if "microphone" in iface_sub:
            return "Microphone"
        if "speaker" in iface_sub:
            return "Haut-parleur"
        return "Audio"

    # Video devices are assumed to be webcams.
    if "video" in iface_class:
        return "Webcam"

    # Wireless adapters.
    if "wireless" in iface_class or "bluetooth" in iface_class:
        return "Bluetooth" if "bluetooth" in iface_class else "Adaptateur sans-fil"

    # Printers.
    if "printer" in iface_class:
        return "Imprimante"

    # No recognized subtype.
    return None
def parse_lsusb_simple(lsusb_output: str) -> List[Dict[str, Any]]:
    """
    Parse the output of simple 'lsusb' command (without -v).

    Args:
        lsusb_output: Raw text output from 'lsusb' command

    Returns:
        List of dictionaries with basic USB device information
    """
    # Example: Bus 002 Device 003: ID 0781:5567 SanDisk Corp. Cruzer Blade
    pattern = re.compile(
        r'Bus\s+(\d+)\s+Device\s+(\d+):\s+ID\s+([0-9a-f]{4}):([0-9a-f]{4})\s+(.*)'
    )
    devices: List[Dict[str, Any]] = []
    for raw_line in lsusb_output.strip().split('\n'):
        m = pattern.match(raw_line)
        if not m:
            continue
        bus, dev, vid, pid, desc = m.groups()
        # First word of the description is the brand, the rest the model;
        # a single-word description serves as both.
        brand, sep, rest = desc.partition(' ')
        devices.append({
            "bus": bus,
            "device": dev,
            "vendor_id": vid,
            "product_id": pid,
            "marque": brand,
            "modele": rest if sep else desc,
            "type_principal": "USB",
            "sous_type": None,
        })
    return devices
def create_device_name(usb_info: Dict[str, Any]) -> str:
    """Generate a readable device name from USB info"""
    # Brand and model first, falling back to a generic label.
    pieces = [
        value
        for value in (usb_info.get("marque"), usb_info.get("modele"))
        if value
    ]
    if not pieces:
        pieces = ["Peripherique USB"]
    # Append the numeric id pair when both halves are known.
    vid = usb_info.get("vendor_id")
    pid = usb_info.get("product_id")
    if vid and pid:
        pieces.append(f"({vid}:{pid})")
    return " ".join(pieces)

263
backend/app/utils/yaml_loader.py Executable file
View File

@@ -0,0 +1,263 @@
"""
Linux BenchTools - YAML Configuration Loader
Load and manage YAML configuration files
"""
import os
import yaml
from typing import Dict, Any, List, Optional
from pathlib import Path
class YAMLConfigLoader:
"""YAML configuration file loader"""
def __init__(self, config_dir: str = "./config"):
    """
    Initialize YAML loader

    Args:
        config_dir: Directory containing YAML config files
    """
    # Base directory that load_config/save_config join filenames onto.
    self.config_dir = config_dir
    # Parsed YAML documents keyed by filename; filled lazily by load_config().
    self._cache = {}
def load_config(self, filename: str, force_reload: bool = False) -> Dict[str, Any]:
    """
    Load a YAML configuration file

    Args:
        filename: YAML filename (without path)
        force_reload: Force reload even if cached

    Returns:
        Parsed YAML data as dictionary ({} when the file does not exist)
    """
    # Serve from the cache unless a reload was explicitly requested.
    if filename in self._cache and not force_reload:
        return self._cache[filename]
    path = os.path.join(self.config_dir, filename)
    if not os.path.exists(path):
        return {}
    with open(path, 'r', encoding='utf-8') as handle:
        # An empty document parses to None; normalize it to {}.
        loaded = yaml.safe_load(handle) or {}
    self._cache[filename] = loaded
    return loaded
def save_config(self, filename: str, data: Dict[str, Any]) -> bool:
    """
    Save a YAML configuration file

    Args:
        filename: YAML filename (without path)
        data: Dictionary to save

    Returns:
        True if successful
    """
    target = os.path.join(self.config_dir, filename)
    # Make sure the destination directory exists before writing.
    os.makedirs(self.config_dir, exist_ok=True)
    try:
        with open(target, 'w', encoding='utf-8') as handle:
            yaml.safe_dump(data, handle, allow_unicode=True, sort_keys=False, indent=2)
    except Exception as e:
        # Best-effort persistence: report the failure and signal it to the
        # caller instead of raising.
        print(f"Error saving YAML config: {e}")
        return False
    # Keep the in-memory cache in sync with what was just written.
    self._cache[filename] = data
    return True
def get_peripheral_types(self) -> List[Dict[str, Any]]:
"""
Get peripheral types configuration
Returns:
List of peripheral type definitions
"""
config = self.load_config("peripheral_types.yaml")
return config.get("peripheral_types", [])
def get_peripheral_type(self, type_id: str) -> Optional[Dict[str, Any]]:
"""
Get specific peripheral type configuration
Args:
type_id: Peripheral type ID
Returns:
Peripheral type definition or None
"""
types = self.get_peripheral_types()
for ptype in types:
if ptype.get("id") == type_id:
return ptype
return None
def add_peripheral_type(self, type_data: Dict[str, Any]) -> bool:
"""
Add a new peripheral type
Args:
type_data: Peripheral type definition
Returns:
True if successful
"""
config = self.load_config("peripheral_types.yaml", force_reload=True)
if "peripheral_types" not in config:
config["peripheral_types"] = []
# Check if type already exists
existing_ids = [t.get("id") for t in config["peripheral_types"]]
if type_data.get("id") in existing_ids:
return False
config["peripheral_types"].append(type_data)
return self.save_config("peripheral_types.yaml", config)
def update_peripheral_type(self, type_id: str, type_data: Dict[str, Any]) -> bool:
"""
Update an existing peripheral type
Args:
type_id: Peripheral type ID to update
type_data: New peripheral type definition
Returns:
True if successful
"""
config = self.load_config("peripheral_types.yaml", force_reload=True)
if "peripheral_types" not in config:
return False
# Find and update
for i, ptype in enumerate(config["peripheral_types"]):
if ptype.get("id") == type_id:
config["peripheral_types"][i] = type_data
return self.save_config("peripheral_types.yaml", config)
return False
def delete_peripheral_type(self, type_id: str) -> bool:
"""
Delete a peripheral type
Args:
type_id: Peripheral type ID to delete
Returns:
True if successful
"""
config = self.load_config("peripheral_types.yaml", force_reload=True)
if "peripheral_types" not in config:
return False
# Filter out the type
original_count = len(config["peripheral_types"])
config["peripheral_types"] = [
t for t in config["peripheral_types"] if t.get("id") != type_id
]
if len(config["peripheral_types"]) < original_count:
return self.save_config("peripheral_types.yaml", config)
return False
def get_location_types(self) -> List[Dict[str, Any]]:
"""
Get location types configuration
Returns:
List of location type definitions
"""
config = self.load_config("locations.yaml")
return config.get("location_types", [])
def get_stockage_locations(self) -> List[str]:
"""
Get storage locations list (for non-used peripherals)
Returns:
List of storage location names
"""
config = self.load_config("locations.yaml")
locations = config.get("stockage_locations", [])
return [l for l in locations if isinstance(l, str)]
def get_image_processing_config(self) -> Dict[str, Any]:
"""
Get image processing configuration
Returns:
Image processing settings
"""
config = self.load_config("image_processing.yaml")
return config.get("image_processing", {})
def get_notification_config(self) -> Dict[str, Any]:
"""
Get notification configuration
Returns:
Notification settings
"""
config = self.load_config("notifications.yaml")
return config.get("notifications", {})
def get_boutiques(self) -> List[str]:
"""
Get boutique list configuration
Returns:
List of boutique names
"""
config = self.load_config("boutique.yaml")
boutiques = config.get("boutiques", [])
return [b for b in boutiques if isinstance(b, str)]
def get_hosts(self) -> List[Dict[str, str]]:
"""
Get hosts list configuration
Returns:
List of hosts with name and location
"""
config = self.load_config("host.yaml")
hosts = config.get("hosts", [])
result = []
for host in hosts:
if not isinstance(host, dict):
continue
name = host.get("nom")
location = host.get("localisation", "")
if isinstance(name, str) and name:
result.append({"nom": name, "localisation": location})
return result
def get_loan_reminder_days(self) -> int:
"""
Get number of days before loan return to send reminder
Returns:
Number of days
"""
config = self.get_notification_config()
return config.get("loan_reminder_days", 7)
def clear_cache(self):
"""Clear the configuration cache"""
self._cache = {}
# Shared module-level instance: import this rather than constructing a new
# YAMLConfigLoader so all callers see the same configuration cache.
yaml_loader = YAMLConfigLoader()