Files
serv_benchmark/backend/app/api/endpoints/peripherals.py
Gilles Soulier c67befc549 addon
2026-01-05 16:08:01 +01:00

1337 lines
48 KiB
Python
Executable File

"""
Linux BenchTools - Peripherals API Endpoints
"""
from fastapi import APIRouter, Depends, HTTPException, Query, UploadFile, File, Form
from sqlalchemy.orm import Session
from typing import List, Optional
from datetime import date
from app.db.session import get_peripherals_db, get_db
from app.services.peripheral_service import PeripheralService, LocationService
from app.models.device import Device
from app.schemas.peripheral import (
PeripheralCreate, PeripheralUpdate, PeripheralDetail,
PeripheralListResponse, PeripheralSummary,
PeripheralPhotoSchema, PeripheralDocumentSchema, PeripheralLinkSchema,
LoanCreate, LoanReturn, LoanSchema,
LocationSchema, LocationCreate, LocationUpdate, LocationTreeNode,
PeripheralHistorySchema,
PeripheralPhotoCreate, PeripheralDocumentCreate, PeripheralLinkCreate
)
from app.models.peripheral import PeripheralPhoto, PeripheralDocument, PeripheralLink, PeripheralLoan
from app.models.peripheral_history import PeripheralLocationHistory
from app.utils.image_processor import ImageProcessor
from app.utils.qr_generator import QRCodeGenerator
from app.utils.usb_parser import parse_lsusb_verbose, create_device_name
from app.utils.md_parser import parse_md_specification, extract_usb_ids_from_filename
from app.utils.lsusb_parser import detect_usb_devices, extract_device_section, parse_device_info
from app.utils.device_classifier import DeviceClassifier
from app.utils.usb_info_parser import parse_structured_usb_info, create_full_cli_section
from app.utils.yaml_loader import yaml_loader
from app.core.config import settings
import os
import shutil
from pathlib import Path
router = APIRouter()
def _build_usb_device_id(vendor_id: Optional[str], product_id: Optional[str]) -> Optional[str]:
if not vendor_id or not product_id:
return None
v = vendor_id.lower().replace("0x", "")
p = product_id.lower().replace("0x", "")
return f"{v}:{p}"
# ========================================
# CONFIGURATION
# ========================================
@router.get("/config/types", response_model=dict)
def get_peripheral_types():
    """
    Return the peripheral types read from the YAML configuration.

    The response groups the distinct ``sous_type`` values under their
    ``type_principal`` (key ``types``) and also carries the unmodified
    entries (key ``full_types``). Any failure is reported as HTTP 500.
    """
    try:
        raw_types = yaml_loader.get_peripheral_types()
        grouped = {}
        for entry in raw_types:
            category = entry.get("type_principal")
            subtype = entry.get("sous_type")
            if not category:
                continue
            # One list per category, created on first sight.
            known_subtypes = grouped.setdefault(category, [])
            if subtype and subtype not in known_subtypes:
                known_subtypes.append(subtype)
        return {
            "success": True,
            "types": grouped,
            "full_types": raw_types,  # Complete data if needed
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to load peripheral types: {str(e)}")
@router.get("/config/location-types", response_model=dict)
def get_location_types():
    """
    Return the location types provided by the YAML loader.

    The loader result is passed through under the ``location_types`` key;
    any failure is reported as HTTP 500.
    """
    try:
        payload = {
            "success": True,
            "location_types": yaml_loader.get_location_types(),
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to load location types: {str(e)}")
    return payload
@router.get("/config/stockage-locations", response_model=dict)
def get_stockage_locations():
    """
    Return the storage locations provided by the YAML loader.

    The loader result is passed through under the ``locations`` key;
    any failure is reported as HTTP 500.
    """
    try:
        payload = {
            "success": True,
            "locations": yaml_loader.get_stockage_locations(),
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to load storage locations: {str(e)}")
    return payload
@router.get("/config/boutiques", response_model=dict)
def get_boutiques():
    """
    Return the boutiques list provided by the YAML loader.

    The loader result is passed through under the ``boutiques`` key;
    any failure is reported as HTTP 500.
    """
    try:
        payload = {
            "success": True,
            "boutiques": yaml_loader.get_boutiques(),
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to load boutiques: {str(e)}")
    return payload
@router.get("/config/hosts", response_model=dict)
def get_hosts():
    """
    Return the hosts list provided by the YAML loader.

    The loader result is passed through under the ``hosts`` key;
    any failure is reported as HTTP 500.
    """
    try:
        payload = {
            "success": True,
            "hosts": yaml_loader.get_hosts(),
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to load hosts: {str(e)}")
    return payload
@router.get("/config/devices", response_model=dict)
def get_devices_for_dropdown(db: Session = Depends(get_db)):
    """
    List every device, ordered by hostname, for dropdown selection.

    Each entry exposes ``id``, ``hostname``, ``location`` and
    ``description``; missing location/description become empty strings.
    Any failure is reported as HTTP 500.
    """
    try:
        rows = db.query(Device).order_by(Device.hostname).all()
        entries = [
            {
                "id": row.id,
                "hostname": row.hostname,
                "location": row.location or "",
                "description": row.description or "",
            }
            for row in rows
        ]
        return {
            "success": True,
            "devices": entries,
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to load devices: {str(e)}")
# ========================================
# PERIPHERAL CRUD
# ========================================
@router.post("/", response_model=PeripheralDetail, status_code=201)
def create_peripheral(
    peripheral: PeripheralCreate,
    db: Session = Depends(get_peripherals_db)
):
    """Create a peripheral by delegating to ``PeripheralService.create_peripheral``."""
    created = PeripheralService.create_peripheral(db, peripheral)
    return created
@router.get("/", response_model=PeripheralListResponse)
def list_peripherals(
    page: int = Query(1, ge=1),
    page_size: int = Query(50, ge=1, le=100),
    type_filter: Optional[str] = None,
    search: Optional[str] = None,
    location_id: Optional[int] = None,
    device_id: Optional[int] = None,
    en_pret: Optional[bool] = None,
    is_complete_device: Optional[bool] = None,
    sort_by: str = "date_creation",
    sort_order: str = "desc",
    db: Session = Depends(get_peripherals_db)
):
    """
    List peripherals with pagination, filters and sorting.

    All query parameters are forwarded unchanged, by keyword, to
    ``PeripheralService.list_peripherals``.
    """
    service_kwargs = {
        "db": db,
        "page": page,
        "page_size": page_size,
        "type_filter": type_filter,
        "search": search,
        "location_id": location_id,
        "device_id": device_id,
        "en_pret": en_pret,
        "is_complete_device": is_complete_device,
        "sort_by": sort_by,
        "sort_order": sort_order,
    }
    return PeripheralService.list_peripherals(**service_kwargs)
@router.get("/{peripheral_id}", response_model=PeripheralDetail)
def get_peripheral(
    peripheral_id: int,
    db: Session = Depends(get_peripherals_db)
):
    """Return the peripheral with the given ID, or HTTP 404 when the service finds none."""
    found = PeripheralService.get_peripheral(db, peripheral_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Peripheral not found")
@router.put("/{peripheral_id}", response_model=PeripheralDetail)
def update_peripheral(
    peripheral_id: int,
    peripheral_data: PeripheralUpdate,
    db: Session = Depends(get_peripherals_db)
):
    """Apply ``peripheral_data`` to a peripheral; HTTP 404 when the service returns nothing."""
    updated = PeripheralService.update_peripheral(db, peripheral_id, peripheral_data)
    if updated:
        return updated
    raise HTTPException(status_code=404, detail="Peripheral not found")
@router.delete("/{peripheral_id}", status_code=204)
def delete_peripheral(
    peripheral_id: int,
    db: Session = Depends(get_peripherals_db)
):
    """Delete a peripheral; HTTP 404 when the service reports a falsy result."""
    was_deleted = PeripheralService.delete_peripheral(db, peripheral_id)
    if was_deleted:
        return
    raise HTTPException(status_code=404, detail="Peripheral not found")
@router.get("/statistics/summary")
def get_statistics(db: Session = Depends(get_peripherals_db)):
    """Return whatever ``PeripheralService.get_statistics`` computes for this database session."""
    summary = PeripheralService.get_statistics(db)
    return summary
# ========================================
# DEVICE ASSIGNMENT
# ========================================
@router.post("/{peripheral_id}/assign/{device_id}", response_model=PeripheralDetail)
def assign_to_device(
    peripheral_id: int,
    device_id: int,
    db: Session = Depends(get_peripherals_db)
):
    """Assign a peripheral to a device; HTTP 404 when the service returns nothing."""
    assigned = PeripheralService.assign_to_device(db, peripheral_id, device_id)
    if assigned:
        return assigned
    raise HTTPException(status_code=404, detail="Peripheral not found")
@router.post("/{peripheral_id}/unassign", response_model=PeripheralDetail)
def unassign_from_device(
    peripheral_id: int,
    db: Session = Depends(get_peripherals_db)
):
    """Detach a peripheral from its device; HTTP 404 when the service returns nothing."""
    released = PeripheralService.unassign_from_device(db, peripheral_id)
    if released:
        return released
    raise HTTPException(status_code=404, detail="Peripheral not found")
@router.get("/by-device/{device_id}", response_model=List[PeripheralSummary])
def get_peripherals_by_device(
    device_id: int,
    db: Session = Depends(get_peripherals_db)
):
    """
    Return summaries of the peripherals assigned to a device.

    Falsy ``etat``, ``rating``, ``en_pret``, ``is_complete_device`` and
    ``quantite_disponible`` values are replaced by "Inconnu", 0.0, False,
    False and 0 respectively.
    """
    summaries = []
    for item in PeripheralService.get_peripherals_by_device(db, device_id):
        summary = PeripheralSummary(
            id=item.id,
            nom=item.nom,
            type_principal=item.type_principal,
            sous_type=item.sous_type,
            marque=item.marque,
            modele=item.modele,
            etat=item.etat or "Inconnu",
            rating=item.rating or 0.0,
            prix=item.prix,
            en_pret=item.en_pret or False,
            is_complete_device=item.is_complete_device or False,
            quantite_disponible=item.quantite_disponible or 0,
        )
        summaries.append(summary)
    return summaries
# ========================================
# PHOTOS
# ========================================
@router.post("/{peripheral_id}/photos", response_model=PeripheralPhotoSchema)
async def upload_photo(
    peripheral_id: int,
    file: UploadFile = File(...),
    description: Optional[str] = Form(None),
    is_primary: bool = Form(False),
    db: Session = Depends(get_peripherals_db)
):
    """
    Upload a photo for a peripheral.

    The upload is staged in a private temporary directory, validated,
    processed (main image at "medium" compression with the original kept,
    plus a thumbnail) into the peripheral's photo directory, and recorded
    as a ``PeripheralPhoto`` row. When ``is_primary`` is true, every
    existing photo of the peripheral is demoted first.

    Raises:
        HTTPException 404: the peripheral does not exist.
        HTTPException 400: the uploaded file is not a valid image.
    """
    from tempfile import mkdtemp

    # Check peripheral exists
    peripheral = PeripheralService.get_peripheral(db, peripheral_id)
    if not peripheral:
        raise HTTPException(status_code=404, detail="Peripheral not found")

    # The filename is client-controlled: keep only its last component so it
    # cannot escape the staging directory (e.g. "../../etc/passwd").
    safe_name = os.path.basename(file.filename or "")
    if safe_name in ("", ".", ".."):
        safe_name = "upload"

    # A fresh directory per request avoids collisions between concurrent
    # uploads that share the same filename.
    temp_dir = mkdtemp(prefix="peripheral_photo_")
    temp_path = os.path.join(temp_dir, safe_name)
    try:
        with open(temp_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        # Validate image
        if not ImageProcessor.is_valid_image(temp_path):
            raise HTTPException(status_code=400, detail="Invalid image file")

        # Create upload directory
        upload_dir = os.path.join(settings.PERIPHERALS_UPLOAD_DIR, "photos", str(peripheral_id))
        os.makedirs(upload_dir, exist_ok=True)

        # Process main image with level configuration
        processed_path, file_size, _original_path = ImageProcessor.process_image_with_level(
            image_path=temp_path,
            output_dir=upload_dir,
            compression_level="medium",  # Use medium by default
            save_original=True
        )
        mime_type = ImageProcessor.get_mime_type(processed_path)

        # Generate thumbnail
        thumbnail_path, _thumbnail_size = ImageProcessor.create_thumbnail_with_level(
            image_path=temp_path,
            output_dir=upload_dir,
            compression_level="medium"
        )

        # Demote existing primary photos BEFORE the new row is added: the new
        # photo has no id until it is flushed, so filtering on
        # "id != photo.id" would compare against NULL and could also demote
        # the photo being created.
        if is_primary:
            db.query(PeripheralPhoto).filter(
                PeripheralPhoto.peripheral_id == peripheral_id
            ).update({"is_primary": False})

        # Create database entry
        photo = PeripheralPhoto(
            peripheral_id=peripheral_id,
            filename=os.path.basename(processed_path),
            stored_path=processed_path,
            thumbnail_path=thumbnail_path,
            mime_type=mime_type,
            size_bytes=file_size,
            description=description,
            is_primary=is_primary
        )
        db.add(photo)
        db.commit()
        db.refresh(photo)
        return photo
    finally:
        # Always drop the staged copy, whatever happened above.
        shutil.rmtree(temp_dir, ignore_errors=True)
@router.post("/{peripheral_id}/photos/from-url", response_model=PeripheralPhotoSchema)
async def upload_photo_from_url(
    peripheral_id: int,
    image_url: str = Form(...),
    description: Optional[str] = Form(None),
    is_primary: bool = Form(False),
    db: Session = Depends(get_peripherals_db)
):
    """
    Download an image from a URL and attach it to a peripheral as a photo.

    The image is streamed into a temporary file (at most 10 MB), validated,
    processed (main image + thumbnail) and recorded as a ``PeripheralPhoto``
    row. When ``is_primary`` is true, every existing photo of the peripheral
    is demoted first.

    Raises:
        HTTPException 404: the peripheral does not exist.
        HTTPException 400: the URL scheme is not http/https, the download
            fails, the response is not an image, the image exceeds 10 MB,
            or the downloaded file is not a valid image.
    """
    from tempfile import NamedTemporaryFile
    from urllib.error import URLError
    from urllib.request import Request, urlopen

    # Check peripheral exists
    peripheral = PeripheralService.get_peripheral(db, peripheral_id)
    if not peripheral:
        raise HTTPException(status_code=404, detail="Peripheral not found")
    if not image_url.lower().startswith(("http://", "https://")):
        raise HTTPException(status_code=400, detail="URL must start with http:// or https://")

    # NOTE(security): the server fetches a caller-supplied URL, which can be
    # pointed at internal hosts (SSRF). Consider a host allow-list.
    # NOTE: urlopen is blocking and runs inside an async handler.
    max_bytes = 10 * 1024 * 1024  # 10 MB
    temp_file = None
    try:
        req = Request(image_url, headers={"User-Agent": "LinuxBenchTools/1.0"})
        try:
            with urlopen(req, timeout=10) as response:
                content_type = response.headers.get("Content-Type", "")
                if content_type and not content_type.startswith("image/"):
                    raise HTTPException(status_code=400, detail="URL does not point to an image")
                temp_file = NamedTemporaryFile(delete=False, dir="/tmp", suffix=".img")
                total = 0
                while True:
                    chunk = response.read(1024 * 1024)
                    if not chunk:
                        break
                    total += len(chunk)
                    if total > max_bytes:
                        raise HTTPException(status_code=400, detail="Image is too large (max 10 MB)")
                    temp_file.write(chunk)
        except (URLError, TimeoutError) as exc:
            # Unreachable host, HTTP error status, timeout, ...: report a
            # client error instead of an unhandled 500.
            raise HTTPException(status_code=400, detail=f"Failed to download image: {exc}") from exc

        temp_path = temp_file.name
        temp_file.close()
        if not ImageProcessor.is_valid_image(temp_path):
            raise HTTPException(status_code=400, detail="Invalid image file")

        # Create upload directory
        upload_dir = os.path.join(settings.PERIPHERALS_UPLOAD_DIR, "photos", str(peripheral_id))
        os.makedirs(upload_dir, exist_ok=True)

        # Process image (main + thumbnail)
        processed_path, file_size, _original_path = ImageProcessor.process_image_with_level(
            image_path=temp_path,
            output_dir=upload_dir,
            compression_level="medium",
            save_original=True
        )
        mime_type = ImageProcessor.get_mime_type(processed_path)

        # Generate thumbnail
        thumbnail_path, _thumbnail_size = ImageProcessor.create_thumbnail_with_level(
            image_path=temp_path,
            output_dir=upload_dir,
            compression_level="medium"
        )

        # Demote existing primary photos BEFORE the new row is added: the new
        # photo has no id until it is flushed, so filtering on
        # "id != photo.id" would compare against NULL and could also demote
        # the photo being created.
        if is_primary:
            db.query(PeripheralPhoto).filter(
                PeripheralPhoto.peripheral_id == peripheral_id
            ).update({"is_primary": False})

        # Create database entry
        photo = PeripheralPhoto(
            peripheral_id=peripheral_id,
            filename=os.path.basename(processed_path),
            stored_path=processed_path,
            thumbnail_path=thumbnail_path,
            mime_type=mime_type,
            size_bytes=file_size,
            description=description,
            is_primary=is_primary
        )
        db.add(photo)
        db.commit()
        db.refresh(photo)
        return photo
    finally:
        if temp_file is not None:
            # close() is idempotent; this releases the handle when an error
            # interrupted the download before the explicit close above.
            temp_file.close()
            if os.path.exists(temp_file.name):
                os.remove(temp_file.name)
@router.get("/{peripheral_id}/photos")
def get_photos(
    peripheral_id: int,
    db: Session = Depends(get_peripherals_db)
):
    """
    Return every photo of a peripheral as a list of plain dicts.

    Stored and thumbnail paths that start with ``/app/uploads/`` are
    rewritten to ``/uploads/``; other paths are returned as stored.
    """
    rows = db.query(PeripheralPhoto).filter(
        PeripheralPhoto.peripheral_id == peripheral_id
    ).all()
    payload = []
    for row in rows:
        # Convert stored paths to web-accessible URLs
        stored = row.stored_path
        if stored.startswith('/app/uploads/'):
            stored = stored.replace('/app/uploads/', '/uploads/')
        thumbnail = row.thumbnail_path
        if thumbnail and thumbnail.startswith('/app/uploads/'):
            thumbnail = thumbnail.replace('/app/uploads/', '/uploads/')
        payload.append({
            "id": row.id,
            "peripheral_id": row.peripheral_id,
            "filename": row.filename,
            "stored_path": stored,
            "thumbnail_path": thumbnail,
            "mime_type": row.mime_type,
            "size_bytes": row.size_bytes,
            "description": row.description,
            "is_primary": row.is_primary,
            "uploaded_at": row.uploaded_at,
        })
    return payload
@router.post("/{peripheral_id}/photos/{photo_id}/set-primary", status_code=200)
def set_primary_photo(
    peripheral_id: int,
    photo_id: int,
    db: Session = Depends(get_peripherals_db)
):
    """
    Mark one photo of a peripheral as primary and demote all the others.

    Raises HTTP 404 when no photo matches both ``photo_id`` and
    ``peripheral_id``.
    """
    same_peripheral = PeripheralPhoto.peripheral_id == peripheral_id
    target = (
        db.query(PeripheralPhoto)
        .filter(PeripheralPhoto.id == photo_id, same_peripheral)
        .first()
    )
    if target is None:
        raise HTTPException(status_code=404, detail="Photo not found")

    # Demote the siblings first, then promote the requested photo.
    siblings = db.query(PeripheralPhoto).filter(
        same_peripheral,
        PeripheralPhoto.id != photo_id
    )
    siblings.update({"is_primary": False})
    target.is_primary = True
    db.commit()
    return {"message": "Photo set as primary", "photo_id": photo_id}
@router.delete("/photos/{photo_id}", status_code=204)
def delete_photo(
    photo_id: int,
    db: Session = Depends(get_peripherals_db)
):
    """
    Delete a photo row together with its files on disk.

    Both the stored image and its thumbnail are removed when present, so
    no thumbnail is left orphaned in the upload directory.

    Raises HTTP 404 when the photo does not exist.
    """
    photo = db.query(PeripheralPhoto).filter(PeripheralPhoto.id == photo_id).first()
    if not photo:
        raise HTTPException(status_code=404, detail="Photo not found")
    # Delete files (a path may be unset, e.g. a photo without thumbnail)
    for path in (photo.stored_path, photo.thumbnail_path):
        if path and os.path.exists(path):
            os.remove(path)
    db.delete(photo)
    db.commit()
# ========================================
# DOCUMENTS
# ========================================
@router.post("/{peripheral_id}/documents", response_model=PeripheralDocumentSchema)
async def upload_document(
    peripheral_id: int,
    file: UploadFile = File(...),
    doc_type: str = Form(...),
    description: Optional[str] = Form(None),
    db: Session = Depends(get_peripherals_db)
):
    """
    Upload a document for a peripheral.

    The file is saved in the peripheral's document directory as
    ``<doc_type>_<original stem><original suffix>`` and recorded as a
    ``PeripheralDocument`` row. Path separators and NUL characters in the
    generated name are replaced by ``_`` so that the client-supplied
    ``doc_type`` cannot redirect the write outside the upload directory;
    the ``doc_type`` stored in the database is left unchanged.

    Raises HTTP 404 when the peripheral does not exist.
    """
    # Check peripheral exists
    peripheral = PeripheralService.get_peripheral(db, peripheral_id)
    if not peripheral:
        raise HTTPException(status_code=404, detail="Peripheral not found")

    # Create upload directory
    upload_dir = os.path.join(settings.PERIPHERALS_UPLOAD_DIR, "documents", str(peripheral_id))
    os.makedirs(upload_dir, exist_ok=True)

    # Build the stored filename from untrusted parts (the filename may be absent)
    original_name = Path(file.filename or "")
    raw_name = f"{doc_type}_{original_name.stem}{original_name.suffix}"
    safe_filename = raw_name.translate(str.maketrans({"/": "_", "\\": "_", "\x00": "_"}))
    file_path = os.path.join(upload_dir, safe_filename)

    # Save file
    with open(file_path, "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)
    file_size = os.path.getsize(file_path)

    # Create database entry
    document = PeripheralDocument(
        peripheral_id=peripheral_id,
        doc_type=doc_type,
        filename=safe_filename,
        stored_path=file_path,
        mime_type=file.content_type,
        size_bytes=file_size,
        description=description
    )
    db.add(document)
    db.commit()
    db.refresh(document)
    return document
@router.get("/{peripheral_id}/documents")
def get_documents(
    peripheral_id: int,
    db: Session = Depends(get_peripherals_db)
):
    """
    Return every document of a peripheral as a list of plain dicts.

    Stored paths that start with ``/app/uploads/`` are rewritten to
    ``/uploads/``; other paths are returned as stored.
    """
    rows = db.query(PeripheralDocument).filter(
        PeripheralDocument.peripheral_id == peripheral_id
    ).all()
    payload = []
    for row in rows:
        # Convert stored paths to web-accessible URLs
        stored = row.stored_path
        if stored.startswith('/app/uploads/'):
            stored = stored.replace('/app/uploads/', '/uploads/')
        payload.append({
            "id": row.id,
            "peripheral_id": row.peripheral_id,
            "filename": row.filename,
            "stored_path": stored,
            "doc_type": row.doc_type,
            "size_bytes": row.size_bytes,
            "description": row.description,
            "uploaded_at": row.uploaded_at,
        })
    return payload
@router.delete("/documents/{document_id}", status_code=204)
def delete_document(
    document_id: int,
    db: Session = Depends(get_peripherals_db)
):
    """Delete a document row and its stored file; HTTP 404 when the row does not exist."""
    record = (
        db.query(PeripheralDocument)
        .filter(PeripheralDocument.id == document_id)
        .first()
    )
    if not record:
        raise HTTPException(status_code=404, detail="Document not found")
    # Remove the file from disk before dropping the row.
    stored = record.stored_path
    if os.path.exists(stored):
        os.remove(stored)
    db.delete(record)
    db.commit()
# ========================================
# LINKS
# ========================================
@router.post("/{peripheral_id}/links", response_model=PeripheralLinkSchema)
def create_link(
    peripheral_id: int,
    link: PeripheralLinkCreate,
    db: Session = Depends(get_peripherals_db)
):
    """Attach a new link to a peripheral; HTTP 404 when the peripheral does not exist."""
    if not PeripheralService.get_peripheral(db, peripheral_id):
        raise HTTPException(status_code=404, detail="Peripheral not found")
    link_fields = link.model_dump()
    record = PeripheralLink(**link_fields, peripheral_id=peripheral_id)
    db.add(record)
    db.commit()
    db.refresh(record)
    return record
@router.get("/{peripheral_id}/links", response_model=List[PeripheralLinkSchema])
def get_links(
    peripheral_id: int,
    db: Session = Depends(get_peripherals_db)
):
    """Return every link whose ``peripheral_id`` matches the given peripheral."""
    belongs_to_peripheral = PeripheralLink.peripheral_id == peripheral_id
    links = db.query(PeripheralLink).filter(belongs_to_peripheral).all()
    return links
@router.delete("/links/{link_id}", status_code=204)
def delete_link(
    link_id: int,
    db: Session = Depends(get_peripherals_db)
):
    """Delete a link by ID; HTTP 404 when it does not exist."""
    record = db.query(PeripheralLink).filter(PeripheralLink.id == link_id).first()
    if record:
        db.delete(record)
        db.commit()
        return
    raise HTTPException(status_code=404, detail="Link not found")
# ========================================
# LOANS
# ========================================
@router.post("/loans", response_model=LoanSchema)
def create_loan(
    loan: LoanCreate,
    db: Session = Depends(get_peripherals_db)
):
    """Create a loan through ``PeripheralService``; HTTP 400 when the service returns nothing."""
    created = PeripheralService.create_loan(db, loan)
    if created:
        return created
    raise HTTPException(status_code=400, detail="Cannot create loan (peripheral not found or already on loan)")
@router.post("/loans/{loan_id}/return", response_model=LoanSchema)
def return_loan(
    loan_id: int,
    return_data: LoanReturn,
    db: Session = Depends(get_peripherals_db)
):
    """Register the return of a loan; HTTP 404 when the service returns nothing."""
    returned = PeripheralService.return_loan(db, loan_id, return_data)
    if returned:
        return returned
    raise HTTPException(status_code=404, detail="Loan not found or already returned")
@router.get("/loans/overdue", response_model=List[LoanSchema])
def get_overdue_loans(db: Session = Depends(get_peripherals_db)):
    """Return the result of ``PeripheralService.get_overdue_loans``."""
    overdue = PeripheralService.get_overdue_loans(db)
    return overdue
@router.get("/loans/upcoming", response_model=List[LoanSchema])
def get_upcoming_returns(
    days: int = Query(7, ge=1, le=30),
    db: Session = Depends(get_peripherals_db)
):
    """
    Return the result of ``PeripheralService.get_upcoming_returns``.

    ``days`` (1-30, default 7) is forwarded unchanged to the service.
    """
    upcoming = PeripheralService.get_upcoming_returns(db, days)
    return upcoming
@router.get("/{peripheral_id}/loans", response_model=List[LoanSchema])
def get_peripheral_loans(
    peripheral_id: int,
    db: Session = Depends(get_peripherals_db)
):
    """Return every loan whose ``peripheral_id`` matches the given peripheral."""
    belongs_to_peripheral = PeripheralLoan.peripheral_id == peripheral_id
    loans = db.query(PeripheralLoan).filter(belongs_to_peripheral).all()
    return loans
# ========================================
# HISTORY
# ========================================
@router.get("/{peripheral_id}/history", response_model=List[PeripheralHistorySchema])
def get_peripheral_history(
    peripheral_id: int,
    db: Session = Depends(get_peripherals_db)
):
    """Return the location-history rows of a peripheral, most recent timestamp first."""
    history_query = db.query(PeripheralLocationHistory).filter(
        PeripheralLocationHistory.peripheral_id == peripheral_id
    )
    newest_first = history_query.order_by(PeripheralLocationHistory.timestamp.desc())
    return newest_first.all()
# ========================================
# USB IMPORT
# ========================================
@router.post("/import/usb", response_model=dict)
def import_usb_info(
    lsusb_output: str = Form(...),
    db: Session = Depends(get_peripherals_db)
):
    """
    Parse ``lsusb -v`` output and suggest peripheral form data.

    Returns the parser result (``parsed_data``) and a pre-filled
    ``suggested_peripheral`` dict. Any failure is reported as HTTP 400.
    """
    try:
        parsed = parse_lsusb_verbose(lsusb_output)
        lookup = parsed.get

        device_name = create_device_name(parsed)
        # French keys take precedence; English keys are the fallback.
        manufacturer = lookup("fabricant") or lookup("manufacturer")
        product = lookup("produit") or lookup("product")
        vendor_id = lookup("vendor_id")
        product_id = lookup("product_id")

        # Create suggested peripheral data
        suggested = {
            "nom": device_name,
            "type_principal": lookup("type_principal", "USB"),
            "sous_type": lookup("sous_type"),
            "marque": lookup("marque"),
            "modele": lookup("modele"),
            "fabricant": manufacturer,
            "produit": product,
            "numero_serie": lookup("numero_serie"),
            "iManufacturer": manufacturer,
            "iProduct": product,
            "vendor_id": vendor_id,
            "product_id": product_id,
            "usb_device_id": lookup("usb_device_id") or _build_usb_device_id(vendor_id, product_id),
            "class_id": lookup("raw_info", {}).get("device_class_code"),
            "caracteristiques_specifiques": {
                "usb_version": lookup("usb_version"),
                "device_class": lookup("device_class"),
                "max_power_ma": lookup("max_power_ma"),
                "speed": lookup("speed"),
                "interfaces": lookup("interfaces", []),
            },
        }
        return {
            "success": True,
            "parsed_data": parsed,
            "suggested_peripheral": suggested,
        }
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Failed to parse USB info: {str(e)}")
@router.post("/import/markdown", response_model=dict)
async def import_markdown_specification(
    file: UploadFile = File(...),
    db: Session = Depends(get_peripherals_db)
):
    """
    Import a peripheral specification from a markdown (.md) file.

    The file is decoded as UTF-8 and parsed; USB IDs extracted from the
    filename are used as a fallback; the device type is classified when the
    parser did not provide both type and subtype. If a peripheral with the
    same vendor_id and product_id already exists, it is reported; otherwise
    suggested data for pre-filling the creation form is returned.

    Raises:
        HTTPException 400: the file is not a ``.md`` file, is not valid
            UTF-8, or parsing fails.
    """
    try:
        # Validate file type (the filename is client-supplied and may be absent)
        filename = file.filename or ""
        if not filename.endswith('.md'):
            raise HTTPException(
                status_code=400,
                detail="Only markdown (.md) files are supported"
            )

        # Read file content
        content = await file.read()
        md_content = content.decode('utf-8')

        # Parse markdown
        parsed_data = parse_md_specification(md_content)

        # Try to extract IDs from filename as fallback
        filename_ids = extract_usb_ids_from_filename(filename)
        if filename_ids and "caracteristiques_specifiques" in parsed_data:
            # Add IDs from filename if not already present
            if "vendor_id" not in parsed_data["caracteristiques_specifiques"]:
                parsed_data["caracteristiques_specifiques"].update(filename_ids)
        elif filename_ids:
            parsed_data["caracteristiques_specifiques"] = filename_ids

        # Use the classifier only if the parser did not detect the type
        type_principal = parsed_data.get("type_principal")
        sous_type = parsed_data.get("sous_type")
        if not type_principal or not sous_type:
            # Build device_info from parsed data
            specifics = parsed_data.get("caracteristiques_specifiques", {})
            device_info = {
                "vendor_id": specifics.get("vendor_id"),
                "product_id": specifics.get("product_id"),
                "manufacturer": parsed_data.get("marque"),
                "product": parsed_data.get("modele"),
                "device_class": specifics.get("device_class"),
            }
            detected_type_principal, detected_sous_type = DeviceClassifier.classify_device(
                cli_content=None,
                synthese_content=md_content,
                device_info=device_info
            )
            # Use detected values if not already present
            if not type_principal:
                type_principal = detected_type_principal
            if not sous_type:
                sous_type = detected_sous_type
            # Refine subtypes if needed
            # NOTE(review): refinement is applied only when the classifier
            # ran; confirm this nesting against the original file.
            if type_principal == "Stockage" and md_content:
                sous_type = DeviceClassifier.refine_storage_subtype(md_content)
            elif type_principal == "Bluetooth" and sous_type == "Autre" and md_content:
                sous_type = DeviceClassifier.refine_bluetooth_subtype(md_content)

        # Build suggested peripheral data
        suggested = {
            "nom": parsed_data.get("nom", "Périphérique importé"),
            "type_principal": type_principal,  # Detected or from parser
            "sous_type": sous_type,  # Detected or from parser
            "marque": parsed_data.get("marque"),
            "modele": parsed_data.get("modele"),
            "numero_serie": parsed_data.get("numero_serie"),
            "description": parsed_data.get("description"),
            "synthese": md_content,  # Store the full markdown content in synthese field
            "notes": parsed_data.get("notes"),
            "caracteristiques_specifiques": parsed_data.get("caracteristiques_specifiques", {}),
            "etat": "Neuf",  # Default state
            "quantite_totale": 1,
            "quantite_disponible": 1
        }
        # Clean up None values
        suggested = {k: v for k, v in suggested.items() if v is not None}

        # Check for existing peripheral with same vendor_id and product_id
        existing_peripheral = None
        vendor_id = suggested.get("caracteristiques_specifiques", {}).get("vendor_id")
        product_id = suggested.get("caracteristiques_specifiques", {}).get("product_id")
        if vendor_id and product_id:
            # The IDs live inside a JSON field, so every row is loaded and
            # compared in Python (linear scan over all peripherals).
            from app.models.peripheral import Peripheral
            all_peripherals = db.query(Peripheral).all()
            for periph in all_peripherals:
                if periph.caracteristiques_specifiques:
                    p_vendor = periph.caracteristiques_specifiques.get("vendor_id")
                    p_product = periph.caracteristiques_specifiques.get("product_id")
                    if p_vendor == vendor_id and p_product == product_id:
                        existing_peripheral = periph
                        break

        if existing_peripheral:
            # Peripheral already exists
            return {
                "success": True,
                "already_exists": True,
                "existing_peripheral_id": existing_peripheral.id,
                "existing_peripheral": {
                    "id": existing_peripheral.id,
                    "nom": existing_peripheral.nom,
                    "type_principal": existing_peripheral.type_principal,
                    "marque": existing_peripheral.marque,
                    "modele": existing_peripheral.modele,
                    "quantite_totale": existing_peripheral.quantite_totale,
                    "quantite_disponible": existing_peripheral.quantite_disponible
                },
                "filename": filename,
                "message": f"Un périphérique avec vendor_id={vendor_id} et product_id={product_id} existe déjà"
            }
        else:
            # New peripheral - return suggested data for form pre-fill
            return {
                "success": True,
                "already_exists": False,
                "filename": filename,
                "parsed_data": parsed_data,
                "suggested_peripheral": suggested
            }
    except HTTPException:
        # Deliberate HTTP errors (e.g. wrong extension) must reach the client
        # unchanged instead of being re-wrapped by the generic handler below.
        raise
    except UnicodeDecodeError:
        raise HTTPException(
            status_code=400,
            detail="File encoding error. Please ensure the file is UTF-8 encoded."
        )
    except Exception as e:
        raise HTTPException(
            status_code=400,
            detail=f"Failed to parse markdown file: {str(e)}"
        )
@router.post("/import/usb-cli/detect", response_model=dict)
def detect_usb_devices_from_cli(
    lsusb_output: str = Form(...),
    db: Session = Depends(get_peripherals_db)
):
    """
    Detect all USB devices in 'lsusb -v' output.

    Returns the detected devices and their count so the user can pick one.

    Raises:
        HTTPException 400: no device is found, or detection fails.
    """
    try:
        devices = detect_usb_devices(lsusb_output)
        if not devices:
            raise HTTPException(
                status_code=400,
                detail="No USB devices found in the provided output. Please ensure you're pasting the output from 'lsusb -v' command."
            )
        return {
            "success": True,
            "devices": devices,
            "total_devices": len(devices)
        }
    except HTTPException:
        # Keep the explicit "no devices" error intact rather than wrapping
        # it in the generic failure message below.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=400,
            detail=f"Failed to detect USB devices: {str(e)}"
        )
@router.post("/import/usb-cli/extract", response_model=dict)
def extract_usb_device_from_cli(
    lsusb_output: str = Form(...),
    bus: str = Form(...),
    device: str = Form(...),
    db: Session = Depends(get_peripherals_db)
):
    """
    Extract a specific USB device from 'lsusb -v' output.

    Filters the CLI output down to the selected device, parses it, classifies
    the device type and returns data suitable for pre-filling a peripheral form.

    Args:
        lsusb_output: Full text of an 'lsusb -v' run (form field).
        bus: Bus number of the device to extract (form field).
        device: Device number of the device to extract (form field).
        db: Peripherals database session, used only to look for an existing
            peripheral with the same vendor_id/product_id.

    Returns:
        A dict with "success" and "already_exists". When a peripheral with the
        same vendor_id/product_id is already stored, a summary of it is
        returned under "existing_peripheral"; otherwise the extracted section,
        the parsed data and a "suggested_peripheral" dict are returned.

    Raises:
        HTTPException: 404 when the requested bus/device section is not found
            in the output; 400 when any other error occurs during extraction.
    """
    try:
        # Extract the device section
        device_section = extract_device_section(lsusb_output, bus, device)
        if not device_section:
            raise HTTPException(
                status_code=404,
                detail=f"Device Bus {bus} Device {device} not found in the provided output"
            )

        # Parse device info
        device_info = parse_device_info(device_section)

        # The verbose parser only enriches the result, so it is best-effort:
        # a failure here must not abort the extraction. Catch Exception rather
        # than using a bare except so KeyboardInterrupt/SystemExit propagate.
        try:
            detailed_info = parse_lsusb_verbose(device_section) or {}
        except Exception:
            detailed_info = {}

        vendor_id = device_info.get("vendor_id")
        product_id = device_info.get("product_id")
        manufacturer = device_info.get("manufacturer")
        product = device_info.get("product")

        # Format CLI output as markdown (raw). Built from explicit pieces so the
        # emitted text does not depend on how this function is indented.
        cli_raw = (
            "# Sortie sudo lsusb -v\n"
            f"Bus {bus} Device {device}\n"
            "```\n"
            f"{device_section}\n"
            "```\n"
        )

        # Generate YAML structured data
        cli_yaml_data = {
            "identification": {
                "bus": bus,
                "device": device,
                "vendor_id": vendor_id,
                "product_id": product_id,
                "manufacturer": manufacturer,
                "product": product,
                "serial": device_info.get("serial"),
            },
            "usb": {
                "version_declared": device_info.get("usb_version"),
                "type": device_info.get("usb_type"),
                "negotiated_speed": device_info.get("speed"),
            },
            "classes": {
                "device_class": device_info.get("device_class"),
                "interface_classes": device_info.get("interface_classes"),
            },
            "power": {
                "max_power_ma": device_info.get("max_power"),
                "is_bus_powered": device_info.get("is_bus_powered"),
                "is_self_powered": device_info.get("is_self_powered"),
                "power_sufficient": device_info.get("power_sufficient"),
            },
            "firmware": {
                "requires_firmware": device_info.get("requires_firmware"),
            }
        }

        def clean_dict(d):
            """Recursively drop keys whose value is None from nested dicts."""
            if isinstance(d, dict):
                return {k: clean_dict(v) for k, v in d.items() if v is not None}
            return d

        cli_yaml_data = clean_dict(cli_yaml_data)

        # Convert to YAML string (local import, as in the rest of this endpoint)
        import yaml
        cli_yaml = yaml.dump(cli_yaml_data, default_flow_style=False, allow_unicode=True, sort_keys=False)

        # Intelligent classification of device type
        type_principal, sous_type = DeviceClassifier.classify_device(
            cli_content=device_section,
            synthese_content=None,
            device_info=device_info
        )

        # Refine subtypes if needed
        if type_principal == "Bluetooth" and sous_type == "Autre":
            sous_type = DeviceClassifier.refine_bluetooth_subtype(device_section)
        elif type_principal == "Stockage":
            # Always refine storage subtype to distinguish between flash/HDD/card reader
            sous_type = DeviceClassifier.refine_storage_subtype(device_section)

        # Fallback display name. Uses .get-derived values so a parser result
        # without vendor/product IDs does not raise KeyError.
        if vendor_id and product_id:
            fallback_name = f"USB Device {vendor_id}:{product_id}"
        else:
            fallback_name = "USB Device"

        # USB-specific characteristics; None values are dropped up front.
        characteristics = {
            "vendor_id": vendor_id,  # idVendor (hex ID)
            "product_id": product_id,  # idProduct (hex ID)
            "fabricant": manufacturer,  # iManufacturer / Vendor name
            "produit": product,  # iProduct string
            "usb_version_declared": device_info.get("usb_version"),  # bcdUSB (declared, not definitive)
            "usb_type": device_info.get("usb_type"),  # Actual USB type from negotiated speed
            "negotiated_speed": device_info.get("speed"),  # e.g., "High Speed", "SuperSpeed"
            "device_class": device_info.get("device_class"),  # bDeviceClass
            "interface_classes": device_info.get("interface_classes"),  # CRITICAL: bInterfaceClass (normative)
            "requires_firmware": device_info.get("requires_firmware"),  # True if class 255 (Vendor Specific)
            "max_power_ma": device_info.get("max_power"),  # MaxPower in mA
            "is_bus_powered": device_info.get("is_bus_powered"),
            "is_self_powered": device_info.get("is_self_powered"),
            "power_sufficient": device_info.get("power_sufficient"),  # Based on port capacity
        }
        characteristics = {k: v for k, v in characteristics.items() if v is not None}

        # Build suggested peripheral data
        # Field mappings per technical specs:
        # - marque = Vendor string (idVendor line 3rd column)
        # - modele = Product string (idProduct line 3rd column)
        # - fabricant = iManufacturer (manufacturer string)
        # - produit = iProduct (product string)
        suggested = {
            "nom": product or detailed_info.get("product_string") or fallback_name,
            "type_principal": type_principal,  # Intelligently detected
            "sous_type": sous_type,  # Intelligently detected
            "marque": detailed_info.get("marque") or manufacturer,
            "modele": detailed_info.get("modele") or product,
            "fabricant": manufacturer,  # iManufacturer
            "produit": product,  # iProduct
            "numero_serie": device_info.get("serial"),
            "cli_yaml": cli_yaml,  # Structured data in YAML format
            "cli_raw": cli_raw,  # Raw CLI output formatted as markdown
            "iManufacturer": manufacturer,
            "iProduct": product,
            "usb_device_id": _build_usb_device_id(vendor_id, product_id),
            "caracteristiques_specifiques": characteristics,
        }
        # Clean up None values
        suggested = {k: v for k, v in suggested.items() if v is not None}

        # Check for existing peripheral with same vendor_id and product_id.
        # The IDs live inside the caracteristiques_specifiques mapping of each
        # stored peripheral, so the comparison is done in Python.
        existing_peripheral = None
        if vendor_id and product_id:
            from app.models.peripheral import Peripheral
            for periph in db.query(Peripheral).all():
                stored = periph.caracteristiques_specifiques
                if not stored:
                    continue
                if stored.get("vendor_id") == vendor_id and stored.get("product_id") == product_id:
                    existing_peripheral = periph
                    break

        if existing_peripheral:
            # Peripheral already exists
            return {
                "success": True,
                "already_exists": True,
                "existing_peripheral_id": existing_peripheral.id,
                "existing_peripheral": {
                    "id": existing_peripheral.id,
                    "nom": existing_peripheral.nom,
                    "type_principal": existing_peripheral.type_principal,
                    "marque": existing_peripheral.marque,
                    "modele": existing_peripheral.modele,
                    "quantite_totale": existing_peripheral.quantite_totale,
                    "quantite_disponible": existing_peripheral.quantite_disponible
                },
                "message": f"Un périphérique avec vendor_id={vendor_id} et product_id={product_id} existe déjà"
            }

        # New peripheral - return suggested data for form pre-fill
        return {
            "success": True,
            "already_exists": False,
            "device_section": device_section,
            "parsed_data": device_info,
            "detailed_data": detailed_info,
            "suggested_peripheral": suggested
        }
    except HTTPException:
        # Let deliberate HTTP errors (e.g. the 404 above) through unchanged.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=400,
            detail=f"Failed to extract device information: {str(e)}"
        ) from e
@router.post("/import/usb-structured", response_model=dict)
def import_structured_usb_info(
    usb_info: str = Form(...),
    db: Session = Depends(get_peripherals_db)
):
    """
    Import structured USB information (from GUI tools or formatted text).

    Parses the information and returns suggested peripheral data with
    YAML-formatted CLI.

    Args:
        usb_info: Structured USB information text (French format supported)
        db: Peripherals database session, used only to look for an existing
            peripheral with the same vendor_id/product_id.

    Returns:
        A dict with "success" and "already_exists". When a peripheral with the
        same vendor_id/product_id is already stored, a summary of it is
        returned under "existing_peripheral"; otherwise the suggested
        peripheral data (general fields and YAML CLI section) and the parsed
        data are returned.

    Raises:
        HTTPException: 400 when the information cannot be parsed or processed.
    """
    try:
        # Parse structured USB information
        parsed = parse_structured_usb_info(usb_info)
        general = parsed["general"]
        # Drop None values once; every later lookup uses .get, so a missing key
        # and a None value are treated the same way.
        characteristics = {
            k: v for k, v in parsed["caracteristiques_specifiques"].items() if v is not None
        }
        vendor_id = characteristics.get("vendor_id")
        product_id = characteristics.get("product_id")

        # Generate YAML structured data (local import, as elsewhere in this file)
        import yaml
        cli_yaml = yaml.dump(parsed["cli_yaml"], default_flow_style=False, allow_unicode=True, sort_keys=False)

        # Format raw output as markdown. Built from explicit pieces so the
        # emitted text does not depend on how this function is indented.
        cli_raw = (
            "# Sortie USB structurée\n"
            "```\n"
            f"{usb_info.strip()}\n"
            "```\n"
        )

        # Build device_info for classification (include interface_classes!)
        device_info = {
            "vendor_id": vendor_id,
            "product_id": product_id,
            "manufacturer": characteristics.get("fabricant"),
            "product": characteristics.get("produit"),
            "device_class": characteristics.get("device_class"),
            "interface_classes": characteristics.get("interface_classes"),  # CRITICAL
        }

        # Intelligent classification
        type_principal, sous_type = DeviceClassifier.classify_device(
            cli_content=usb_info,
            synthese_content=None,
            device_info=device_info
        )

        # Refine subtypes if needed
        if type_principal == "Bluetooth" and sous_type == "Autre":
            sous_type = DeviceClassifier.refine_bluetooth_subtype(usb_info)
        elif type_principal == "Stockage":
            sous_type = DeviceClassifier.refine_storage_subtype(usb_info)

        # Build suggested peripheral data
        # Field mappings per technical specs:
        # - marque = Vendor string (idVendor line 3rd column)
        # - modele = Product string (idProduct line 3rd column)
        # - fabricant = iManufacturer (manufacturer string)
        # - produit = iProduct (product string)
        suggested = {
            # "or" rather than a .get default: also falls back when the parser
            # returns the key with a None/empty value.
            "nom": general.get("nom") or "Périphérique USB",
            "type_principal": type_principal,
            "sous_type": sous_type,
            "marque": general.get("marque"),
            "modele": general.get("modele"),
            "fabricant": general.get("fabricant"),  # iManufacturer
            "produit": general.get("produit"),  # iProduct
            "numero_serie": general.get("numero_serie"),
            "cli_yaml": cli_yaml,  # Structured data in YAML format
            "cli_raw": cli_raw,  # Raw output formatted as markdown
            "iManufacturer": general.get("fabricant"),
            "iProduct": general.get("produit"),
            "usb_device_id": _build_usb_device_id(vendor_id, product_id),
            "caracteristiques_specifiques": characteristics,
        }
        # Clean up None values
        suggested = {k: v for k, v in suggested.items() if v is not None}

        # Check for existing peripheral with same vendor_id and product_id.
        # The IDs live inside the caracteristiques_specifiques mapping of each
        # stored peripheral, so the comparison is done in Python.
        existing_peripheral = None
        if vendor_id and product_id:
            from app.models.peripheral import Peripheral
            for periph in db.query(Peripheral).all():
                stored = periph.caracteristiques_specifiques
                if not stored:
                    continue
                if stored.get("vendor_id") == vendor_id and stored.get("product_id") == product_id:
                    existing_peripheral = periph
                    break

        if existing_peripheral:
            # Same payload shape as the /import/usb-cli/extract endpoint so
            # clients can handle both responses identically.
            return {
                "success": True,
                "already_exists": True,
                "existing_peripheral_id": existing_peripheral.id,
                "existing_peripheral": {
                    "id": existing_peripheral.id,
                    "nom": existing_peripheral.nom,
                    "type_principal": existing_peripheral.type_principal,
                    "marque": existing_peripheral.marque,
                    "modele": existing_peripheral.modele,
                    "quantite_totale": existing_peripheral.quantite_totale,
                    "quantite_disponible": existing_peripheral.quantite_disponible
                },
                "message": f"Un périphérique avec vendor_id={vendor_id} et product_id={product_id} existe déjà"
            }

        # New peripheral
        return {
            "success": True,
            "already_exists": False,
            "suggested_peripheral": suggested,
            "parsed_data": parsed["cli_yaml"]
        }
    except HTTPException:
        # Keep deliberate HTTP errors intact instead of rewrapping them as 400.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=400,
            detail=f"Failed to parse structured USB information: {str(e)}"
        ) from e