1337 lines
48 KiB
Python
Executable File
1337 lines
48 KiB
Python
Executable File
"""
|
|
Linux BenchTools - Peripherals API Endpoints
|
|
"""
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Query, UploadFile, File, Form
|
|
from sqlalchemy.orm import Session
|
|
from typing import List, Optional
|
|
from datetime import date
|
|
|
|
from app.db.session import get_peripherals_db, get_db
|
|
from app.services.peripheral_service import PeripheralService, LocationService
|
|
from app.models.device import Device
|
|
from app.schemas.peripheral import (
|
|
PeripheralCreate, PeripheralUpdate, PeripheralDetail,
|
|
PeripheralListResponse, PeripheralSummary,
|
|
PeripheralPhotoSchema, PeripheralDocumentSchema, PeripheralLinkSchema,
|
|
LoanCreate, LoanReturn, LoanSchema,
|
|
LocationSchema, LocationCreate, LocationUpdate, LocationTreeNode,
|
|
PeripheralHistorySchema,
|
|
PeripheralPhotoCreate, PeripheralDocumentCreate, PeripheralLinkCreate
|
|
)
|
|
from app.models.peripheral import PeripheralPhoto, PeripheralDocument, PeripheralLink, PeripheralLoan
|
|
from app.models.peripheral_history import PeripheralLocationHistory
|
|
from app.utils.image_processor import ImageProcessor
|
|
from app.utils.qr_generator import QRCodeGenerator
|
|
from app.utils.usb_parser import parse_lsusb_verbose, create_device_name
|
|
from app.utils.md_parser import parse_md_specification, extract_usb_ids_from_filename
|
|
from app.utils.lsusb_parser import detect_usb_devices, extract_device_section, parse_device_info
|
|
from app.utils.device_classifier import DeviceClassifier
|
|
from app.utils.usb_info_parser import parse_structured_usb_info, create_full_cli_section
|
|
from app.utils.yaml_loader import yaml_loader
|
|
from app.core.config import settings
|
|
import os
|
|
import shutil
|
|
from pathlib import Path
|
|
|
|
router = APIRouter()
|
|
|
|
def _build_usb_device_id(vendor_id: Optional[str], product_id: Optional[str]) -> Optional[str]:
|
|
if not vendor_id or not product_id:
|
|
return None
|
|
v = vendor_id.lower().replace("0x", "")
|
|
p = product_id.lower().replace("0x", "")
|
|
return f"{v}:{p}"
|
|
|
|
|
|
# ========================================
|
|
# CONFIGURATION
|
|
# ========================================
|
|
|
|
@router.get("/config/types", response_model=dict)
|
|
def get_peripheral_types():
|
|
"""
|
|
Get all peripheral types from YAML configuration.
|
|
Returns types organized by type_principal with their subtypes.
|
|
"""
|
|
try:
|
|
peripheral_types = yaml_loader.get_peripheral_types()
|
|
|
|
# Organize by type_principal for easier frontend usage
|
|
types_by_category = {}
|
|
|
|
for ptype in peripheral_types:
|
|
type_principal = ptype.get("type_principal")
|
|
sous_type = ptype.get("sous_type")
|
|
|
|
if type_principal:
|
|
if type_principal not in types_by_category:
|
|
types_by_category[type_principal] = []
|
|
|
|
if sous_type and sous_type not in types_by_category[type_principal]:
|
|
types_by_category[type_principal].append(sous_type)
|
|
|
|
return {
|
|
"success": True,
|
|
"types": types_by_category,
|
|
"full_types": peripheral_types # Complete data if needed
|
|
}
|
|
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=f"Failed to load peripheral types: {str(e)}")
|
|
|
|
|
|
@router.get("/config/location-types", response_model=dict)
|
|
def get_location_types():
|
|
"""
|
|
Get all location types from YAML configuration.
|
|
Returns location types with their hierarchy rules and icons.
|
|
"""
|
|
try:
|
|
location_types = yaml_loader.get_location_types()
|
|
|
|
return {
|
|
"success": True,
|
|
"location_types": location_types
|
|
}
|
|
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=f"Failed to load location types: {str(e)}")
|
|
|
|
|
|
@router.get("/config/stockage-locations", response_model=dict)
|
|
def get_stockage_locations():
|
|
"""
|
|
Get storage locations list from YAML configuration.
|
|
"""
|
|
try:
|
|
locations = yaml_loader.get_stockage_locations()
|
|
return {
|
|
"success": True,
|
|
"locations": locations
|
|
}
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=f"Failed to load storage locations: {str(e)}")
|
|
|
|
|
|
@router.get("/config/boutiques", response_model=dict)
|
|
def get_boutiques():
|
|
"""
|
|
Get boutiques list from YAML configuration.
|
|
"""
|
|
try:
|
|
boutiques = yaml_loader.get_boutiques()
|
|
return {
|
|
"success": True,
|
|
"boutiques": boutiques
|
|
}
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=f"Failed to load boutiques: {str(e)}")
|
|
|
|
|
|
@router.get("/config/hosts", response_model=dict)
|
|
def get_hosts():
|
|
"""
|
|
Get hosts list from YAML configuration.
|
|
"""
|
|
try:
|
|
hosts = yaml_loader.get_hosts()
|
|
return {
|
|
"success": True,
|
|
"hosts": hosts
|
|
}
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=f"Failed to load hosts: {str(e)}")
|
|
|
|
|
|
@router.get("/config/devices", response_model=dict)
|
|
def get_devices_for_dropdown(db: Session = Depends(get_db)):
|
|
"""
|
|
Get all devices (hosts) for dropdown selection in peripherals form.
|
|
Returns a simple list of devices with id, hostname, and location.
|
|
"""
|
|
try:
|
|
devices = db.query(Device).order_by(Device.hostname).all()
|
|
|
|
devices_list = []
|
|
for device in devices:
|
|
devices_list.append({
|
|
"id": device.id,
|
|
"hostname": device.hostname,
|
|
"location": device.location or "",
|
|
"description": device.description or ""
|
|
})
|
|
|
|
return {
|
|
"success": True,
|
|
"devices": devices_list
|
|
}
|
|
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=f"Failed to load devices: {str(e)}")
|
|
|
|
|
|
# ========================================
|
|
# PERIPHERAL CRUD
|
|
# ========================================
|
|
|
|
@router.post("/", response_model=PeripheralDetail, status_code=201)
|
|
def create_peripheral(
|
|
peripheral: PeripheralCreate,
|
|
db: Session = Depends(get_peripherals_db)
|
|
):
|
|
"""Create a new peripheral"""
|
|
return PeripheralService.create_peripheral(db, peripheral)
|
|
|
|
|
|
@router.get("/", response_model=PeripheralListResponse)
|
|
def list_peripherals(
|
|
page: int = Query(1, ge=1),
|
|
page_size: int = Query(50, ge=1, le=100),
|
|
type_filter: Optional[str] = None,
|
|
search: Optional[str] = None,
|
|
location_id: Optional[int] = None,
|
|
device_id: Optional[int] = None,
|
|
en_pret: Optional[bool] = None,
|
|
is_complete_device: Optional[bool] = None,
|
|
sort_by: str = "date_creation",
|
|
sort_order: str = "desc",
|
|
db: Session = Depends(get_peripherals_db)
|
|
):
|
|
"""List peripherals with pagination and filters"""
|
|
return PeripheralService.list_peripherals(
|
|
db=db,
|
|
page=page,
|
|
page_size=page_size,
|
|
type_filter=type_filter,
|
|
search=search,
|
|
location_id=location_id,
|
|
device_id=device_id,
|
|
en_pret=en_pret,
|
|
is_complete_device=is_complete_device,
|
|
sort_by=sort_by,
|
|
sort_order=sort_order
|
|
)
|
|
|
|
|
|
@router.get("/{peripheral_id}", response_model=PeripheralDetail)
|
|
def get_peripheral(
|
|
peripheral_id: int,
|
|
db: Session = Depends(get_peripherals_db)
|
|
):
|
|
"""Get a peripheral by ID"""
|
|
peripheral = PeripheralService.get_peripheral(db, peripheral_id)
|
|
if not peripheral:
|
|
raise HTTPException(status_code=404, detail="Peripheral not found")
|
|
return peripheral
|
|
|
|
|
|
@router.put("/{peripheral_id}", response_model=PeripheralDetail)
|
|
def update_peripheral(
|
|
peripheral_id: int,
|
|
peripheral_data: PeripheralUpdate,
|
|
db: Session = Depends(get_peripherals_db)
|
|
):
|
|
"""Update a peripheral"""
|
|
peripheral = PeripheralService.update_peripheral(db, peripheral_id, peripheral_data)
|
|
if not peripheral:
|
|
raise HTTPException(status_code=404, detail="Peripheral not found")
|
|
return peripheral
|
|
|
|
|
|
@router.delete("/{peripheral_id}", status_code=204)
|
|
def delete_peripheral(
|
|
peripheral_id: int,
|
|
db: Session = Depends(get_peripherals_db)
|
|
):
|
|
"""Delete a peripheral"""
|
|
if not PeripheralService.delete_peripheral(db, peripheral_id):
|
|
raise HTTPException(status_code=404, detail="Peripheral not found")
|
|
|
|
|
|
@router.get("/statistics/summary")
|
|
def get_statistics(db: Session = Depends(get_peripherals_db)):
|
|
"""Get peripheral statistics"""
|
|
return PeripheralService.get_statistics(db)
|
|
|
|
|
|
# ========================================
|
|
# DEVICE ASSIGNMENT
|
|
# ========================================
|
|
|
|
@router.post("/{peripheral_id}/assign/{device_id}", response_model=PeripheralDetail)
|
|
def assign_to_device(
|
|
peripheral_id: int,
|
|
device_id: int,
|
|
db: Session = Depends(get_peripherals_db)
|
|
):
|
|
"""Assign peripheral to a device"""
|
|
peripheral = PeripheralService.assign_to_device(db, peripheral_id, device_id)
|
|
if not peripheral:
|
|
raise HTTPException(status_code=404, detail="Peripheral not found")
|
|
return peripheral
|
|
|
|
|
|
@router.post("/{peripheral_id}/unassign", response_model=PeripheralDetail)
|
|
def unassign_from_device(
|
|
peripheral_id: int,
|
|
db: Session = Depends(get_peripherals_db)
|
|
):
|
|
"""Unassign peripheral from device"""
|
|
peripheral = PeripheralService.unassign_from_device(db, peripheral_id)
|
|
if not peripheral:
|
|
raise HTTPException(status_code=404, detail="Peripheral not found")
|
|
return peripheral
|
|
|
|
|
|
@router.get("/by-device/{device_id}", response_model=List[PeripheralSummary])
|
|
def get_peripherals_by_device(
|
|
device_id: int,
|
|
db: Session = Depends(get_peripherals_db)
|
|
):
|
|
"""Get all peripherals assigned to a device"""
|
|
peripherals = PeripheralService.get_peripherals_by_device(db, device_id)
|
|
return [
|
|
PeripheralSummary(
|
|
id=p.id,
|
|
nom=p.nom,
|
|
type_principal=p.type_principal,
|
|
sous_type=p.sous_type,
|
|
marque=p.marque,
|
|
modele=p.modele,
|
|
etat=p.etat or "Inconnu",
|
|
rating=p.rating or 0.0,
|
|
prix=p.prix,
|
|
en_pret=p.en_pret or False,
|
|
is_complete_device=p.is_complete_device or False,
|
|
quantite_disponible=p.quantite_disponible or 0
|
|
)
|
|
for p in peripherals
|
|
]
|
|
|
|
|
|
# ========================================
|
|
# PHOTOS
|
|
# ========================================
|
|
|
|
@router.post("/{peripheral_id}/photos", response_model=PeripheralPhotoSchema)
|
|
async def upload_photo(
|
|
peripheral_id: int,
|
|
file: UploadFile = File(...),
|
|
description: Optional[str] = Form(None),
|
|
is_primary: bool = Form(False),
|
|
db: Session = Depends(get_peripherals_db)
|
|
):
|
|
"""Upload a photo for a peripheral"""
|
|
# Check peripheral exists
|
|
peripheral = PeripheralService.get_peripheral(db, peripheral_id)
|
|
if not peripheral:
|
|
raise HTTPException(status_code=404, detail="Peripheral not found")
|
|
|
|
# Validate image
|
|
temp_path = f"/tmp/{file.filename}"
|
|
with open(temp_path, "wb") as buffer:
|
|
shutil.copyfileobj(file.file, buffer)
|
|
|
|
if not ImageProcessor.is_valid_image(temp_path):
|
|
os.remove(temp_path)
|
|
raise HTTPException(status_code=400, detail="Invalid image file")
|
|
|
|
# Create upload directory
|
|
upload_dir = os.path.join(settings.PERIPHERALS_UPLOAD_DIR, "photos", str(peripheral_id))
|
|
os.makedirs(upload_dir, exist_ok=True)
|
|
|
|
# Process image (main + thumbnail)
|
|
try:
|
|
# Process main image with level configuration
|
|
processed_path, file_size, original_path = ImageProcessor.process_image_with_level(
|
|
image_path=temp_path,
|
|
output_dir=upload_dir,
|
|
compression_level="medium", # Use medium by default
|
|
save_original=True
|
|
)
|
|
mime_type = ImageProcessor.get_mime_type(processed_path)
|
|
|
|
# Generate thumbnail
|
|
thumbnail_path, thumbnail_size = ImageProcessor.create_thumbnail_with_level(
|
|
image_path=temp_path,
|
|
output_dir=upload_dir,
|
|
compression_level="medium"
|
|
)
|
|
|
|
# Create database entry
|
|
photo = PeripheralPhoto(
|
|
peripheral_id=peripheral_id,
|
|
filename=os.path.basename(processed_path),
|
|
stored_path=processed_path,
|
|
thumbnail_path=thumbnail_path,
|
|
mime_type=mime_type,
|
|
size_bytes=file_size,
|
|
description=description,
|
|
is_primary=is_primary
|
|
)
|
|
db.add(photo)
|
|
|
|
# If primary, unset other primary photos
|
|
if is_primary:
|
|
db.query(PeripheralPhoto).filter(
|
|
PeripheralPhoto.peripheral_id == peripheral_id,
|
|
PeripheralPhoto.id != photo.id
|
|
).update({"is_primary": False})
|
|
|
|
db.commit()
|
|
db.refresh(photo)
|
|
|
|
return photo
|
|
|
|
finally:
|
|
if os.path.exists(temp_path):
|
|
os.remove(temp_path)
|
|
|
|
|
|
@router.post("/{peripheral_id}/photos/from-url", response_model=PeripheralPhotoSchema)
|
|
async def upload_photo_from_url(
|
|
peripheral_id: int,
|
|
image_url: str = Form(...),
|
|
description: Optional[str] = Form(None),
|
|
is_primary: bool = Form(False),
|
|
db: Session = Depends(get_peripherals_db)
|
|
):
|
|
"""Download and upload a photo from URL for a peripheral"""
|
|
# Check peripheral exists
|
|
peripheral = PeripheralService.get_peripheral(db, peripheral_id)
|
|
if not peripheral:
|
|
raise HTTPException(status_code=404, detail="Peripheral not found")
|
|
|
|
if not image_url.lower().startswith(("http://", "https://")):
|
|
raise HTTPException(status_code=400, detail="URL must start with http:// or https://")
|
|
|
|
# Download to temp file
|
|
from urllib.request import Request, urlopen
|
|
from tempfile import NamedTemporaryFile
|
|
|
|
max_bytes = 10 * 1024 * 1024 # 10 MB
|
|
temp_file = None
|
|
|
|
try:
|
|
req = Request(image_url, headers={"User-Agent": "LinuxBenchTools/1.0"})
|
|
with urlopen(req, timeout=10) as response:
|
|
content_type = response.headers.get("Content-Type", "")
|
|
if content_type and not content_type.startswith("image/"):
|
|
raise HTTPException(status_code=400, detail="URL does not point to an image")
|
|
|
|
temp_file = NamedTemporaryFile(delete=False, dir="/tmp", suffix=".img")
|
|
total = 0
|
|
while True:
|
|
chunk = response.read(1024 * 1024)
|
|
if not chunk:
|
|
break
|
|
total += len(chunk)
|
|
if total > max_bytes:
|
|
raise HTTPException(status_code=400, detail="Image is too large (max 10 MB)")
|
|
temp_file.write(chunk)
|
|
|
|
temp_path = temp_file.name
|
|
temp_file.close()
|
|
|
|
if not ImageProcessor.is_valid_image(temp_path):
|
|
os.remove(temp_path)
|
|
raise HTTPException(status_code=400, detail="Invalid image file")
|
|
|
|
# Create upload directory
|
|
upload_dir = os.path.join(settings.PERIPHERALS_UPLOAD_DIR, "photos", str(peripheral_id))
|
|
os.makedirs(upload_dir, exist_ok=True)
|
|
|
|
# Process image (main + thumbnail)
|
|
processed_path, file_size, original_path = ImageProcessor.process_image_with_level(
|
|
image_path=temp_path,
|
|
output_dir=upload_dir,
|
|
compression_level="medium",
|
|
save_original=True
|
|
)
|
|
mime_type = ImageProcessor.get_mime_type(processed_path)
|
|
|
|
# Generate thumbnail
|
|
thumbnail_path, thumbnail_size = ImageProcessor.create_thumbnail_with_level(
|
|
image_path=temp_path,
|
|
output_dir=upload_dir,
|
|
compression_level="medium"
|
|
)
|
|
|
|
# Create database entry
|
|
photo = PeripheralPhoto(
|
|
peripheral_id=peripheral_id,
|
|
filename=os.path.basename(processed_path),
|
|
stored_path=processed_path,
|
|
thumbnail_path=thumbnail_path,
|
|
mime_type=mime_type,
|
|
size_bytes=file_size,
|
|
description=description,
|
|
is_primary=is_primary
|
|
)
|
|
db.add(photo)
|
|
|
|
# If primary, unset other primary photos
|
|
if is_primary:
|
|
db.query(PeripheralPhoto).filter(
|
|
PeripheralPhoto.peripheral_id == peripheral_id,
|
|
PeripheralPhoto.id != photo.id
|
|
).update({"is_primary": False})
|
|
|
|
db.commit()
|
|
db.refresh(photo)
|
|
|
|
return photo
|
|
|
|
finally:
|
|
if temp_file and os.path.exists(temp_file.name):
|
|
os.remove(temp_file.name)
|
|
|
|
@router.get("/{peripheral_id}/photos")
|
|
def get_photos(
|
|
peripheral_id: int,
|
|
db: Session = Depends(get_peripherals_db)
|
|
):
|
|
"""Get all photos for a peripheral"""
|
|
photos = db.query(PeripheralPhoto).filter(
|
|
PeripheralPhoto.peripheral_id == peripheral_id
|
|
).all()
|
|
|
|
# Convert stored paths to web-accessible URLs
|
|
result = []
|
|
for photo in photos:
|
|
photo_dict = {
|
|
"id": photo.id,
|
|
"peripheral_id": photo.peripheral_id,
|
|
"filename": photo.filename,
|
|
"stored_path": photo.stored_path.replace('/app/uploads/', '/uploads/') if photo.stored_path.startswith('/app/uploads/') else photo.stored_path,
|
|
"thumbnail_path": photo.thumbnail_path.replace('/app/uploads/', '/uploads/') if photo.thumbnail_path and photo.thumbnail_path.startswith('/app/uploads/') else photo.thumbnail_path,
|
|
"mime_type": photo.mime_type,
|
|
"size_bytes": photo.size_bytes,
|
|
"description": photo.description,
|
|
"is_primary": photo.is_primary,
|
|
"uploaded_at": photo.uploaded_at
|
|
}
|
|
result.append(photo_dict)
|
|
|
|
return result
|
|
|
|
|
|
@router.post("/{peripheral_id}/photos/{photo_id}/set-primary", status_code=200)
|
|
def set_primary_photo(
|
|
peripheral_id: int,
|
|
photo_id: int,
|
|
db: Session = Depends(get_peripherals_db)
|
|
):
|
|
"""Set a photo as primary (thumbnail)"""
|
|
# Get the photo
|
|
photo = db.query(PeripheralPhoto).filter(
|
|
PeripheralPhoto.id == photo_id,
|
|
PeripheralPhoto.peripheral_id == peripheral_id
|
|
).first()
|
|
|
|
if not photo:
|
|
raise HTTPException(status_code=404, detail="Photo not found")
|
|
|
|
# Unset all other primary photos for this peripheral
|
|
db.query(PeripheralPhoto).filter(
|
|
PeripheralPhoto.peripheral_id == peripheral_id,
|
|
PeripheralPhoto.id != photo_id
|
|
).update({"is_primary": False})
|
|
|
|
# Set this photo as primary
|
|
photo.is_primary = True
|
|
db.commit()
|
|
|
|
return {"message": "Photo set as primary", "photo_id": photo_id}
|
|
|
|
|
|
@router.delete("/photos/{photo_id}", status_code=204)
|
|
def delete_photo(
|
|
photo_id: int,
|
|
db: Session = Depends(get_peripherals_db)
|
|
):
|
|
"""Delete a photo"""
|
|
photo = db.query(PeripheralPhoto).filter(PeripheralPhoto.id == photo_id).first()
|
|
if not photo:
|
|
raise HTTPException(status_code=404, detail="Photo not found")
|
|
|
|
# Delete file
|
|
if os.path.exists(photo.stored_path):
|
|
os.remove(photo.stored_path)
|
|
|
|
db.delete(photo)
|
|
db.commit()
|
|
|
|
|
|
# ========================================
|
|
# DOCUMENTS
|
|
# ========================================
|
|
|
|
@router.post("/{peripheral_id}/documents", response_model=PeripheralDocumentSchema)
|
|
async def upload_document(
|
|
peripheral_id: int,
|
|
file: UploadFile = File(...),
|
|
doc_type: str = Form(...),
|
|
description: Optional[str] = Form(None),
|
|
db: Session = Depends(get_peripherals_db)
|
|
):
|
|
"""Upload a document for a peripheral"""
|
|
# Check peripheral exists
|
|
peripheral = PeripheralService.get_peripheral(db, peripheral_id)
|
|
if not peripheral:
|
|
raise HTTPException(status_code=404, detail="Peripheral not found")
|
|
|
|
# Create upload directory
|
|
upload_dir = os.path.join(settings.PERIPHERALS_UPLOAD_DIR, "documents", str(peripheral_id))
|
|
os.makedirs(upload_dir, exist_ok=True)
|
|
|
|
# Save file
|
|
timestamp = Path(file.filename).stem
|
|
safe_filename = f"{doc_type}_{timestamp}{Path(file.filename).suffix}"
|
|
file_path = os.path.join(upload_dir, safe_filename)
|
|
|
|
with open(file_path, "wb") as buffer:
|
|
shutil.copyfileobj(file.file, buffer)
|
|
|
|
file_size = os.path.getsize(file_path)
|
|
|
|
# Create database entry
|
|
document = PeripheralDocument(
|
|
peripheral_id=peripheral_id,
|
|
doc_type=doc_type,
|
|
filename=safe_filename,
|
|
stored_path=file_path,
|
|
mime_type=file.content_type,
|
|
size_bytes=file_size,
|
|
description=description
|
|
)
|
|
db.add(document)
|
|
db.commit()
|
|
db.refresh(document)
|
|
|
|
return document
|
|
|
|
|
|
@router.get("/{peripheral_id}/documents")
|
|
def get_documents(
|
|
peripheral_id: int,
|
|
db: Session = Depends(get_peripherals_db)
|
|
):
|
|
"""Get all documents for a peripheral"""
|
|
documents = db.query(PeripheralDocument).filter(
|
|
PeripheralDocument.peripheral_id == peripheral_id
|
|
).all()
|
|
|
|
# Convert stored paths to web-accessible URLs
|
|
result = []
|
|
for doc in documents:
|
|
doc_dict = {
|
|
"id": doc.id,
|
|
"peripheral_id": doc.peripheral_id,
|
|
"filename": doc.filename,
|
|
"stored_path": doc.stored_path.replace('/app/uploads/', '/uploads/') if doc.stored_path.startswith('/app/uploads/') else doc.stored_path,
|
|
"doc_type": doc.doc_type,
|
|
"size_bytes": doc.size_bytes,
|
|
"description": doc.description,
|
|
"uploaded_at": doc.uploaded_at
|
|
}
|
|
result.append(doc_dict)
|
|
|
|
return result
|
|
|
|
|
|
@router.delete("/documents/{document_id}", status_code=204)
|
|
def delete_document(
|
|
document_id: int,
|
|
db: Session = Depends(get_peripherals_db)
|
|
):
|
|
"""Delete a document"""
|
|
document = db.query(PeripheralDocument).filter(PeripheralDocument.id == document_id).first()
|
|
if not document:
|
|
raise HTTPException(status_code=404, detail="Document not found")
|
|
|
|
# Delete file
|
|
if os.path.exists(document.stored_path):
|
|
os.remove(document.stored_path)
|
|
|
|
db.delete(document)
|
|
db.commit()
|
|
|
|
|
|
# ========================================
|
|
# LINKS
|
|
# ========================================
|
|
|
|
@router.post("/{peripheral_id}/links", response_model=PeripheralLinkSchema)
|
|
def create_link(
|
|
peripheral_id: int,
|
|
link: PeripheralLinkCreate,
|
|
db: Session = Depends(get_peripherals_db)
|
|
):
|
|
"""Create a link for a peripheral"""
|
|
# Check peripheral exists
|
|
peripheral = PeripheralService.get_peripheral(db, peripheral_id)
|
|
if not peripheral:
|
|
raise HTTPException(status_code=404, detail="Peripheral not found")
|
|
|
|
db_link = PeripheralLink(**link.model_dump(), peripheral_id=peripheral_id)
|
|
db.add(db_link)
|
|
db.commit()
|
|
db.refresh(db_link)
|
|
return db_link
|
|
|
|
|
|
@router.get("/{peripheral_id}/links", response_model=List[PeripheralLinkSchema])
|
|
def get_links(
|
|
peripheral_id: int,
|
|
db: Session = Depends(get_peripherals_db)
|
|
):
|
|
"""Get all links for a peripheral"""
|
|
return db.query(PeripheralLink).filter(
|
|
PeripheralLink.peripheral_id == peripheral_id
|
|
).all()
|
|
|
|
|
|
@router.delete("/links/{link_id}", status_code=204)
|
|
def delete_link(
|
|
link_id: int,
|
|
db: Session = Depends(get_peripherals_db)
|
|
):
|
|
"""Delete a link"""
|
|
link = db.query(PeripheralLink).filter(PeripheralLink.id == link_id).first()
|
|
if not link:
|
|
raise HTTPException(status_code=404, detail="Link not found")
|
|
|
|
db.delete(link)
|
|
db.commit()
|
|
|
|
|
|
# ========================================
|
|
# LOANS
|
|
# ========================================
|
|
|
|
@router.post("/loans", response_model=LoanSchema)
|
|
def create_loan(
|
|
loan: LoanCreate,
|
|
db: Session = Depends(get_peripherals_db)
|
|
):
|
|
"""Create a loan"""
|
|
db_loan = PeripheralService.create_loan(db, loan)
|
|
if not db_loan:
|
|
raise HTTPException(status_code=400, detail="Cannot create loan (peripheral not found or already on loan)")
|
|
return db_loan
|
|
|
|
|
|
@router.post("/loans/{loan_id}/return", response_model=LoanSchema)
|
|
def return_loan(
|
|
loan_id: int,
|
|
return_data: LoanReturn,
|
|
db: Session = Depends(get_peripherals_db)
|
|
):
|
|
"""Return a loan"""
|
|
loan = PeripheralService.return_loan(db, loan_id, return_data)
|
|
if not loan:
|
|
raise HTTPException(status_code=404, detail="Loan not found or already returned")
|
|
return loan
|
|
|
|
|
|
@router.get("/loans/overdue", response_model=List[LoanSchema])
|
|
def get_overdue_loans(db: Session = Depends(get_peripherals_db)):
|
|
"""Get all overdue loans"""
|
|
return PeripheralService.get_overdue_loans(db)
|
|
|
|
|
|
@router.get("/loans/upcoming", response_model=List[LoanSchema])
|
|
def get_upcoming_returns(
|
|
days: int = Query(7, ge=1, le=30),
|
|
db: Session = Depends(get_peripherals_db)
|
|
):
|
|
"""Get loans due within specified days"""
|
|
return PeripheralService.get_upcoming_returns(db, days)
|
|
|
|
|
|
@router.get("/{peripheral_id}/loans", response_model=List[LoanSchema])
|
|
def get_peripheral_loans(
|
|
peripheral_id: int,
|
|
db: Session = Depends(get_peripherals_db)
|
|
):
|
|
"""Get all loans for a peripheral"""
|
|
return db.query(PeripheralLoan).filter(
|
|
PeripheralLoan.peripheral_id == peripheral_id
|
|
).all()
|
|
|
|
|
|
# ========================================
|
|
# HISTORY
|
|
# ========================================
|
|
|
|
@router.get("/{peripheral_id}/history", response_model=List[PeripheralHistorySchema])
|
|
def get_peripheral_history(
|
|
peripheral_id: int,
|
|
db: Session = Depends(get_peripherals_db)
|
|
):
|
|
"""Get history for a peripheral"""
|
|
return db.query(PeripheralLocationHistory).filter(
|
|
PeripheralLocationHistory.peripheral_id == peripheral_id
|
|
).order_by(PeripheralLocationHistory.timestamp.desc()).all()
|
|
|
|
|
|
# ========================================
|
|
# USB IMPORT
|
|
# ========================================
|
|
|
|
@router.post("/import/usb", response_model=dict)
|
|
def import_usb_info(
|
|
lsusb_output: str = Form(...),
|
|
db: Session = Depends(get_peripherals_db)
|
|
):
|
|
"""Import USB device information from lsusb -v output"""
|
|
try:
|
|
usb_info = parse_lsusb_verbose(lsusb_output)
|
|
|
|
# Create suggested peripheral data
|
|
suggested = {
|
|
"nom": create_device_name(usb_info),
|
|
"type_principal": usb_info.get("type_principal", "USB"),
|
|
"sous_type": usb_info.get("sous_type"),
|
|
"marque": usb_info.get("marque"),
|
|
"modele": usb_info.get("modele"),
|
|
"fabricant": usb_info.get("fabricant") or usb_info.get("manufacturer"),
|
|
"produit": usb_info.get("produit") or usb_info.get("product"),
|
|
"numero_serie": usb_info.get("numero_serie"),
|
|
"iManufacturer": usb_info.get("fabricant") or usb_info.get("manufacturer"),
|
|
"iProduct": usb_info.get("produit") or usb_info.get("product"),
|
|
"vendor_id": usb_info.get("vendor_id"),
|
|
"product_id": usb_info.get("product_id"),
|
|
"usb_device_id": usb_info.get("usb_device_id") or _build_usb_device_id(usb_info.get("vendor_id"), usb_info.get("product_id")),
|
|
"class_id": usb_info.get("raw_info", {}).get("device_class_code"),
|
|
"caracteristiques_specifiques": {
|
|
"usb_version": usb_info.get("usb_version"),
|
|
"device_class": usb_info.get("device_class"),
|
|
"max_power_ma": usb_info.get("max_power_ma"),
|
|
"speed": usb_info.get("speed"),
|
|
"interfaces": usb_info.get("interfaces", [])
|
|
}
|
|
}
|
|
|
|
return {
|
|
"success": True,
|
|
"parsed_data": usb_info,
|
|
"suggested_peripheral": suggested
|
|
}
|
|
|
|
except Exception as e:
|
|
raise HTTPException(status_code=400, detail=f"Failed to parse USB info: {str(e)}")
|
|
|
|
|
|
@router.post("/import/markdown", response_model=dict)
|
|
async def import_markdown_specification(
|
|
file: UploadFile = File(...),
|
|
db: Session = Depends(get_peripherals_db)
|
|
):
|
|
"""
|
|
Import peripheral specification from a markdown (.md) file.
|
|
|
|
Supports two formats:
|
|
- Simple: Title + Description
|
|
- Detailed: Full USB specification with vendor/product IDs, characteristics
|
|
|
|
Checks if peripheral already exists (by vendor_id + product_id).
|
|
Returns suggested peripheral data for the frontend to pre-fill the form.
|
|
"""
|
|
try:
|
|
# Validate file type
|
|
if not file.filename.endswith('.md'):
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="Only markdown (.md) files are supported"
|
|
)
|
|
|
|
# Read file content
|
|
content = await file.read()
|
|
md_content = content.decode('utf-8')
|
|
|
|
# Parse markdown
|
|
parsed_data = parse_md_specification(md_content)
|
|
|
|
# Try to extract IDs from filename as fallback
|
|
filename_ids = extract_usb_ids_from_filename(file.filename)
|
|
if filename_ids and "caracteristiques_specifiques" in parsed_data:
|
|
# Add IDs from filename if not already present
|
|
if "vendor_id" not in parsed_data["caracteristiques_specifiques"]:
|
|
parsed_data["caracteristiques_specifiques"].update(filename_ids)
|
|
elif filename_ids:
|
|
parsed_data["caracteristiques_specifiques"] = filename_ids
|
|
|
|
# Intelligent classification of device type from markdown content
|
|
# Use classifier if type not already detected by parser
|
|
type_principal = parsed_data.get("type_principal")
|
|
sous_type = parsed_data.get("sous_type")
|
|
|
|
if not type_principal or not sous_type:
|
|
# Build device_info from parsed data
|
|
device_info = {
|
|
"vendor_id": parsed_data.get("caracteristiques_specifiques", {}).get("vendor_id"),
|
|
"product_id": parsed_data.get("caracteristiques_specifiques", {}).get("product_id"),
|
|
"manufacturer": parsed_data.get("marque"),
|
|
"product": parsed_data.get("modele"),
|
|
"device_class": parsed_data.get("caracteristiques_specifiques", {}).get("device_class"),
|
|
}
|
|
|
|
detected_type_principal, detected_sous_type = DeviceClassifier.classify_device(
|
|
cli_content=None,
|
|
synthese_content=md_content,
|
|
device_info=device_info
|
|
)
|
|
|
|
# Use detected values if not already present
|
|
if not type_principal:
|
|
type_principal = detected_type_principal
|
|
if not sous_type:
|
|
sous_type = detected_sous_type
|
|
|
|
# Refine subtypes if needed
|
|
if type_principal == "Stockage" and md_content:
|
|
sous_type = DeviceClassifier.refine_storage_subtype(md_content)
|
|
elif type_principal == "Bluetooth" and sous_type == "Autre" and md_content:
|
|
sous_type = DeviceClassifier.refine_bluetooth_subtype(md_content)
|
|
|
|
# Build suggested peripheral data
|
|
suggested = {
|
|
"nom": parsed_data.get("nom", "Périphérique importé"),
|
|
"type_principal": type_principal, # Intelligently detected or from parser
|
|
"sous_type": sous_type, # Intelligently detected or from parser
|
|
"marque": parsed_data.get("marque"),
|
|
"modele": parsed_data.get("modele"),
|
|
"numero_serie": parsed_data.get("numero_serie"),
|
|
"description": parsed_data.get("description"),
|
|
"synthese": md_content, # Store the full markdown content in synthese field
|
|
"notes": parsed_data.get("notes"),
|
|
"caracteristiques_specifiques": parsed_data.get("caracteristiques_specifiques", {}),
|
|
"etat": "Neuf", # Default state
|
|
"quantite_totale": 1,
|
|
"quantite_disponible": 1
|
|
}
|
|
|
|
# Clean up None values
|
|
suggested = {k: v for k, v in suggested.items() if v is not None}
|
|
|
|
# Check for existing peripheral with same vendor_id and product_id
|
|
existing_peripheral = None
|
|
vendor_id = suggested.get("caracteristiques_specifiques", {}).get("vendor_id")
|
|
product_id = suggested.get("caracteristiques_specifiques", {}).get("product_id")
|
|
|
|
if vendor_id and product_id:
|
|
# Search for peripheral with matching vendor_id and product_id in JSON field
|
|
from app.models.peripheral import Peripheral
|
|
|
|
all_peripherals = db.query(Peripheral).all()
|
|
for periph in all_peripherals:
|
|
if periph.caracteristiques_specifiques:
|
|
p_vendor = periph.caracteristiques_specifiques.get("vendor_id")
|
|
p_product = periph.caracteristiques_specifiques.get("product_id")
|
|
if p_vendor == vendor_id and p_product == product_id:
|
|
existing_peripheral = periph
|
|
break
|
|
|
|
if existing_peripheral:
|
|
# Peripheral already exists
|
|
return {
|
|
"success": True,
|
|
"already_exists": True,
|
|
"existing_peripheral_id": existing_peripheral.id,
|
|
"existing_peripheral": {
|
|
"id": existing_peripheral.id,
|
|
"nom": existing_peripheral.nom,
|
|
"type_principal": existing_peripheral.type_principal,
|
|
"marque": existing_peripheral.marque,
|
|
"modele": existing_peripheral.modele,
|
|
"quantite_totale": existing_peripheral.quantite_totale,
|
|
"quantite_disponible": existing_peripheral.quantite_disponible
|
|
},
|
|
"filename": file.filename,
|
|
"message": f"Un périphérique avec vendor_id={vendor_id} et product_id={product_id} existe déjà"
|
|
}
|
|
else:
|
|
# New peripheral - return suggested data for form pre-fill
|
|
return {
|
|
"success": True,
|
|
"already_exists": False,
|
|
"filename": file.filename,
|
|
"parsed_data": parsed_data,
|
|
"suggested_peripheral": suggested
|
|
}
|
|
|
|
except UnicodeDecodeError:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="File encoding error. Please ensure the file is UTF-8 encoded."
|
|
)
|
|
except Exception as e:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Failed to parse markdown file: {str(e)}"
|
|
)
|
|
|
|
|
|
@router.post("/import/usb-cli/detect", response_model=dict)
|
|
def detect_usb_devices_from_cli(
|
|
lsusb_output: str = Form(...),
|
|
db: Session = Depends(get_peripherals_db)
|
|
):
|
|
"""
|
|
Detect all USB devices from 'lsusb -v' output.
|
|
Returns a list of devices for the user to select from.
|
|
"""
|
|
try:
|
|
devices = detect_usb_devices(lsusb_output)
|
|
|
|
if not devices:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="No USB devices found in the provided output. Please ensure you're pasting the output from 'lsusb -v' command."
|
|
)
|
|
|
|
return {
|
|
"success": True,
|
|
"devices": devices,
|
|
"total_devices": len(devices)
|
|
}
|
|
|
|
except Exception as e:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Failed to detect USB devices: {str(e)}"
|
|
)
|
|
|
|
|
|
@router.post("/import/usb-cli/extract", response_model=dict)
|
|
def extract_usb_device_from_cli(
|
|
lsusb_output: str = Form(...),
|
|
bus: str = Form(...),
|
|
device: str = Form(...),
|
|
db: Session = Depends(get_peripherals_db)
|
|
):
|
|
"""
|
|
Extract a specific USB device from 'lsusb -v' output.
|
|
Filters the CLI output to only the selected device and returns parsed data.
|
|
"""
|
|
try:
|
|
# Extract the device section
|
|
device_section = extract_device_section(lsusb_output, bus, device)
|
|
|
|
if not device_section:
|
|
raise HTTPException(
|
|
status_code=404,
|
|
detail=f"Device Bus {bus} Device {device} not found in the provided output"
|
|
)
|
|
|
|
# Parse device info
|
|
device_info = parse_device_info(device_section)
|
|
|
|
# Also use existing parser for more complete data
|
|
try:
|
|
detailed_info = parse_lsusb_verbose(device_section)
|
|
except:
|
|
detailed_info = {}
|
|
|
|
# Format CLI output as markdown (raw)
|
|
cli_raw = f"""# Sortie sudo lsusb -v
|
|
|
|
Bus {bus} Device {device}
|
|
|
|
```
|
|
{device_section}
|
|
```
|
|
"""
|
|
|
|
# Generate YAML structured data
|
|
cli_yaml_data = {
|
|
"identification": {
|
|
"bus": bus,
|
|
"device": device,
|
|
"vendor_id": device_info.get("vendor_id"),
|
|
"product_id": device_info.get("product_id"),
|
|
"manufacturer": device_info.get("manufacturer"),
|
|
"product": device_info.get("product"),
|
|
"serial": device_info.get("serial"),
|
|
},
|
|
"usb": {
|
|
"version_declared": device_info.get("usb_version"),
|
|
"type": device_info.get("usb_type"),
|
|
"negotiated_speed": device_info.get("speed"),
|
|
},
|
|
"classes": {
|
|
"device_class": device_info.get("device_class"),
|
|
"interface_classes": device_info.get("interface_classes"),
|
|
},
|
|
"power": {
|
|
"max_power_ma": device_info.get("max_power"),
|
|
"is_bus_powered": device_info.get("is_bus_powered"),
|
|
"is_self_powered": device_info.get("is_self_powered"),
|
|
"power_sufficient": device_info.get("power_sufficient"),
|
|
},
|
|
"firmware": {
|
|
"requires_firmware": device_info.get("requires_firmware"),
|
|
}
|
|
}
|
|
|
|
# Remove None values from YAML data
|
|
def clean_dict(d):
|
|
if isinstance(d, dict):
|
|
return {k: clean_dict(v) for k, v in d.items() if v is not None}
|
|
return d
|
|
|
|
cli_yaml_data = clean_dict(cli_yaml_data)
|
|
|
|
# Convert to YAML string
|
|
import yaml
|
|
cli_yaml = yaml.dump(cli_yaml_data, default_flow_style=False, allow_unicode=True, sort_keys=False)
|
|
|
|
# Intelligent classification of device type
|
|
type_principal, sous_type = DeviceClassifier.classify_device(
|
|
cli_content=device_section,
|
|
synthese_content=None,
|
|
device_info=device_info
|
|
)
|
|
|
|
# Refine subtypes if needed
|
|
if type_principal == "Bluetooth" and sous_type == "Autre":
|
|
sous_type = DeviceClassifier.refine_bluetooth_subtype(device_section)
|
|
elif type_principal == "Stockage":
|
|
# Always refine storage subtype to distinguish between flash/HDD/card reader
|
|
sous_type = DeviceClassifier.refine_storage_subtype(device_section)
|
|
|
|
# Build suggested peripheral data
|
|
# Field mappings per technical specs:
|
|
# - marque = Vendor string (idVendor line 3rd column)
|
|
# - modele = Product string (idProduct line 3rd column)
|
|
# - fabricant = iManufacturer (manufacturer string)
|
|
# - produit = iProduct (product string)
|
|
suggested = {
|
|
"nom": device_info.get("product") or detailed_info.get("product_string") or f"USB Device {device_info['vendor_id']}:{device_info['product_id']}",
|
|
"type_principal": type_principal, # Intelligently detected
|
|
"sous_type": sous_type, # Intelligently detected
|
|
"marque": detailed_info.get("marque") or device_info.get("manufacturer"),
|
|
"modele": detailed_info.get("modele") or device_info.get("product"),
|
|
"fabricant": device_info.get("manufacturer"), # iManufacturer
|
|
"produit": device_info.get("product"), # iProduct
|
|
"numero_serie": device_info.get("serial"),
|
|
"cli_yaml": cli_yaml, # Structured data in YAML format
|
|
"cli_raw": cli_raw, # Raw CLI output formatted as markdown
|
|
"iManufacturer": device_info.get("manufacturer"),
|
|
"iProduct": device_info.get("product"),
|
|
"usb_device_id": _build_usb_device_id(device_info.get("vendor_id"), device_info.get("product_id")),
|
|
"caracteristiques_specifiques": {
|
|
"vendor_id": device_info.get("vendor_id"), # idVendor (hex ID)
|
|
"product_id": device_info.get("product_id"), # idProduct (hex ID)
|
|
"fabricant": device_info.get("manufacturer"), # iManufacturer / Vendor name
|
|
"produit": device_info.get("product"), # iProduct string
|
|
"usb_version_declared": device_info.get("usb_version"), # bcdUSB (declared, not definitive)
|
|
"usb_type": device_info.get("usb_type"), # Actual USB type from negotiated speed
|
|
"negotiated_speed": device_info.get("speed"), # e.g., "High Speed", "SuperSpeed"
|
|
"device_class": device_info.get("device_class"), # bDeviceClass
|
|
"interface_classes": device_info.get("interface_classes"), # CRITICAL: bInterfaceClass (normative)
|
|
"requires_firmware": device_info.get("requires_firmware"), # True if class 255 (Vendor Specific)
|
|
"max_power_ma": device_info.get("max_power"), # MaxPower in mA
|
|
"is_bus_powered": device_info.get("is_bus_powered"),
|
|
"is_self_powered": device_info.get("is_self_powered"),
|
|
"power_sufficient": device_info.get("power_sufficient"), # Based on port capacity
|
|
}
|
|
}
|
|
|
|
# Clean up None values
|
|
suggested = {k: v for k, v in suggested.items() if v is not None}
|
|
if "caracteristiques_specifiques" in suggested:
|
|
suggested["caracteristiques_specifiques"] = {
|
|
k: v for k, v in suggested["caracteristiques_specifiques"].items() if v is not None
|
|
}
|
|
|
|
# Check for existing peripheral with same vendor_id and product_id
|
|
existing_peripheral = None
|
|
vendor_id = suggested.get("caracteristiques_specifiques", {}).get("vendor_id")
|
|
product_id = suggested.get("caracteristiques_specifiques", {}).get("product_id")
|
|
|
|
if vendor_id and product_id:
|
|
from app.models.peripheral import Peripheral
|
|
|
|
all_peripherals = db.query(Peripheral).all()
|
|
for periph in all_peripherals:
|
|
if periph.caracteristiques_specifiques:
|
|
p_vendor = periph.caracteristiques_specifiques.get("vendor_id")
|
|
p_product = periph.caracteristiques_specifiques.get("product_id")
|
|
if p_vendor == vendor_id and p_product == product_id:
|
|
existing_peripheral = periph
|
|
break
|
|
|
|
if existing_peripheral:
|
|
# Peripheral already exists
|
|
return {
|
|
"success": True,
|
|
"already_exists": True,
|
|
"existing_peripheral_id": existing_peripheral.id,
|
|
"existing_peripheral": {
|
|
"id": existing_peripheral.id,
|
|
"nom": existing_peripheral.nom,
|
|
"type_principal": existing_peripheral.type_principal,
|
|
"marque": existing_peripheral.marque,
|
|
"modele": existing_peripheral.modele,
|
|
"quantite_totale": existing_peripheral.quantite_totale,
|
|
"quantite_disponible": existing_peripheral.quantite_disponible
|
|
},
|
|
"message": f"Un périphérique avec vendor_id={vendor_id} et product_id={product_id} existe déjà"
|
|
}
|
|
else:
|
|
# New peripheral - return suggested data for form pre-fill
|
|
return {
|
|
"success": True,
|
|
"already_exists": False,
|
|
"device_section": device_section,
|
|
"parsed_data": device_info,
|
|
"detailed_data": detailed_info,
|
|
"suggested_peripheral": suggested
|
|
}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Failed to extract device information: {str(e)}"
|
|
)
|
|
|
|
|
|
@router.post("/import/usb-structured", response_model=dict)
|
|
def import_structured_usb_info(
|
|
usb_info: str = Form(...),
|
|
db: Session = Depends(get_peripherals_db)
|
|
):
|
|
"""
|
|
Import structured USB information (from GUI tools or formatted text).
|
|
Parses the information and returns suggested peripheral data with YAML-formatted CLI.
|
|
|
|
Args:
|
|
usb_info: Structured USB information text (French format supported)
|
|
|
|
Returns:
|
|
Suggested peripheral data with general fields and YAML CLI section
|
|
"""
|
|
try:
|
|
# Parse structured USB information
|
|
parsed = parse_structured_usb_info(usb_info)
|
|
|
|
# Generate YAML structured data
|
|
import yaml
|
|
cli_yaml = yaml.dump(parsed["cli_yaml"], default_flow_style=False, allow_unicode=True, sort_keys=False)
|
|
|
|
# Format raw output as markdown
|
|
cli_raw = f"""# Sortie USB structurée
|
|
|
|
```
|
|
{usb_info.strip()}
|
|
```
|
|
"""
|
|
|
|
# Build device_info for classification (include interface_classes!)
|
|
device_info = {
|
|
"vendor_id": parsed["caracteristiques_specifiques"].get("vendor_id"),
|
|
"product_id": parsed["caracteristiques_specifiques"].get("product_id"),
|
|
"manufacturer": parsed["caracteristiques_specifiques"].get("fabricant"),
|
|
"product": parsed["caracteristiques_specifiques"].get("produit"),
|
|
"device_class": parsed["caracteristiques_specifiques"].get("device_class"),
|
|
"interface_classes": parsed["caracteristiques_specifiques"].get("interface_classes"), # CRITICAL
|
|
}
|
|
|
|
# Intelligent classification
|
|
type_principal, sous_type = DeviceClassifier.classify_device(
|
|
cli_content=usb_info,
|
|
synthese_content=None,
|
|
device_info=device_info
|
|
)
|
|
|
|
# Refine subtypes if needed
|
|
if type_principal == "Bluetooth" and sous_type == "Autre":
|
|
sous_type = DeviceClassifier.refine_bluetooth_subtype(usb_info)
|
|
elif type_principal == "Stockage":
|
|
sous_type = DeviceClassifier.refine_storage_subtype(usb_info)
|
|
|
|
# Build suggested peripheral data
|
|
# Field mappings per technical specs:
|
|
# - marque = Vendor string (idVendor line 3rd column)
|
|
# - modele = Product string (idProduct line 3rd column)
|
|
# - fabricant = iManufacturer (manufacturer string)
|
|
# - produit = iProduct (product string)
|
|
suggested = {
|
|
"nom": parsed["general"].get("nom", "Périphérique USB"),
|
|
"type_principal": type_principal,
|
|
"sous_type": sous_type,
|
|
"marque": parsed["general"].get("marque"),
|
|
"modele": parsed["general"].get("modele"),
|
|
"fabricant": parsed["general"].get("fabricant"), # iManufacturer
|
|
"produit": parsed["general"].get("produit"), # iProduct
|
|
"numero_serie": parsed["general"].get("numero_serie"),
|
|
"cli_yaml": cli_yaml, # Structured data in YAML format
|
|
"cli_raw": cli_raw, # Raw output formatted as markdown
|
|
"iManufacturer": parsed["general"].get("fabricant"),
|
|
"iProduct": parsed["general"].get("produit"),
|
|
"usb_device_id": _build_usb_device_id(
|
|
parsed["caracteristiques_specifiques"].get("vendor_id"),
|
|
parsed["caracteristiques_specifiques"].get("product_id")
|
|
),
|
|
"caracteristiques_specifiques": parsed["caracteristiques_specifiques"]
|
|
}
|
|
|
|
# Clean up None values
|
|
suggested = {k: v for k, v in suggested.items() if v is not None}
|
|
if "caracteristiques_specifiques" in suggested:
|
|
suggested["caracteristiques_specifiques"] = {
|
|
k: v for k, v in suggested["caracteristiques_specifiques"].items() if v is not None
|
|
}
|
|
|
|
# Check for existing peripheral
|
|
existing_peripheral = None
|
|
vendor_id = suggested.get("caracteristiques_specifiques", {}).get("vendor_id")
|
|
product_id = suggested.get("caracteristiques_specifiques", {}).get("product_id")
|
|
|
|
if vendor_id and product_id:
|
|
from app.models.peripheral import Peripheral
|
|
|
|
all_peripherals = db.query(Peripheral).all()
|
|
for periph in all_peripherals:
|
|
if periph.caracteristiques_specifiques:
|
|
p_vendor = periph.caracteristiques_specifiques.get("vendor_id")
|
|
p_product = periph.caracteristiques_specifiques.get("product_id")
|
|
if p_vendor == vendor_id and p_product == product_id:
|
|
existing_peripheral = periph
|
|
break
|
|
|
|
if existing_peripheral:
|
|
return {
|
|
"success": True,
|
|
"already_exists": True,
|
|
"existing_peripheral_id": existing_peripheral.id,
|
|
"existing_peripheral": {
|
|
"id": existing_peripheral.id,
|
|
"nom": existing_peripheral.nom,
|
|
"type_principal": existing_peripheral.type_principal,
|
|
"marque": existing_peripheral.marque,
|
|
"modele": existing_peripheral.modele,
|
|
"quantite_disponible": existing_peripheral.quantite_disponible
|
|
}
|
|
}
|
|
|
|
# New peripheral
|
|
return {
|
|
"success": True,
|
|
"already_exists": False,
|
|
"suggested_peripheral": suggested,
|
|
"parsed_data": parsed["cli_yaml"]
|
|
}
|
|
|
|
except Exception as e:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Failed to parse structured USB information: {str(e)}"
|
|
)
|