"""
Linux BenchTools - Peripherals API Endpoints
"""

from fastapi import APIRouter, Depends, HTTPException, Query, UploadFile, File, Form
from sqlalchemy.orm import Session
from typing import List, Optional
from datetime import date

from app.db.session import get_peripherals_db, get_db
from app.services.peripheral_service import PeripheralService, LocationService
from app.models.device import Device
from app.schemas.peripheral import (
    PeripheralCreate, PeripheralUpdate, PeripheralDetail, PeripheralListResponse,
    PeripheralSummary, PeripheralPhotoSchema, PeripheralDocumentSchema,
    PeripheralLinkSchema, LoanCreate, LoanReturn, LoanSchema, LocationSchema,
    LocationCreate, LocationUpdate, LocationTreeNode, PeripheralHistorySchema,
    PeripheralPhotoCreate, PeripheralDocumentCreate, PeripheralLinkCreate
)
from app.models.peripheral import PeripheralPhoto, PeripheralDocument, PeripheralLink, PeripheralLoan
from app.models.peripheral_history import PeripheralLocationHistory
from app.utils.image_processor import ImageProcessor
from app.utils.qr_generator import QRCodeGenerator
from app.utils.usb_parser import parse_lsusb_verbose, create_device_name
from app.utils.md_parser import parse_md_specification, extract_usb_ids_from_filename
from app.utils.lsusb_parser import detect_usb_devices, extract_device_section, parse_device_info
from app.utils.device_classifier import DeviceClassifier
from app.utils.usb_info_parser import parse_structured_usb_info, create_full_cli_section
from app.utils.yaml_loader import yaml_loader
from app.core.config import settings
import os
import shutil
from pathlib import Path

router = APIRouter()


def _build_usb_device_id(vendor_id: Optional[str], product_id: Optional[str]) -> Optional[str]:
    """
    Combine a USB vendor and product ID into a ``vendor:product`` string.

    Both parts are lower-cased and have every ``0x`` occurrence removed.
    Returns ``None`` when either ID is missing or empty.
    """
    if vendor_id and product_id:
        normalized = [raw.lower().replace("0x", "") for raw in (vendor_id, product_id)]
        return ":".join(normalized)
    return None


# ========================================
# CONFIGURATION
# ========================================
@router.get("/config/types", response_model=dict) def get_peripheral_types(): """ Get all peripheral types from YAML configuration. Returns types organized by type_principal with their subtypes. """ try: peripheral_types = yaml_loader.get_peripheral_types() # Organize by type_principal for easier frontend usage types_by_category = {} for ptype in peripheral_types: type_principal = ptype.get("type_principal") sous_type = ptype.get("sous_type") if type_principal: if type_principal not in types_by_category: types_by_category[type_principal] = [] if sous_type and sous_type not in types_by_category[type_principal]: types_by_category[type_principal].append(sous_type) return { "success": True, "types": types_by_category, "full_types": peripheral_types # Complete data if needed } except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to load peripheral types: {str(e)}") @router.get("/config/location-types", response_model=dict) def get_location_types(): """ Get all location types from YAML configuration. Returns location types with their hierarchy rules and icons. """ try: location_types = yaml_loader.get_location_types() return { "success": True, "location_types": location_types } except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to load location types: {str(e)}") @router.get("/config/stockage-locations", response_model=dict) def get_stockage_locations(): """ Get storage locations list from YAML configuration. """ try: locations = yaml_loader.get_stockage_locations() return { "success": True, "locations": locations } except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to load storage locations: {str(e)}") @router.get("/config/boutiques", response_model=dict) def get_boutiques(): """ Get boutiques list from YAML configuration. 
""" try: boutiques = yaml_loader.get_boutiques() return { "success": True, "boutiques": boutiques } except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to load boutiques: {str(e)}") @router.get("/config/hosts", response_model=dict) def get_hosts(): """ Get hosts list from YAML configuration. """ try: hosts = yaml_loader.get_hosts() return { "success": True, "hosts": hosts } except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to load hosts: {str(e)}") @router.get("/config/devices", response_model=dict) def get_devices_for_dropdown(db: Session = Depends(get_db)): """ Get all devices (hosts) for dropdown selection in peripherals form. Returns a simple list of devices with id, hostname, and location. """ try: devices = db.query(Device).order_by(Device.hostname).all() devices_list = [] for device in devices: devices_list.append({ "id": device.id, "hostname": device.hostname, "location": device.location or "", "description": device.description or "" }) return { "success": True, "devices": devices_list } except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to load devices: {str(e)}") # ======================================== # PERIPHERAL CRUD # ======================================== @router.post("/", response_model=PeripheralDetail, status_code=201) def create_peripheral( peripheral: PeripheralCreate, db: Session = Depends(get_peripherals_db) ): """Create a new peripheral""" return PeripheralService.create_peripheral(db, peripheral) @router.get("/", response_model=PeripheralListResponse) def list_peripherals( page: int = Query(1, ge=1), page_size: int = Query(50, ge=1, le=100), type_filter: Optional[str] = None, search: Optional[str] = None, location_id: Optional[int] = None, device_id: Optional[int] = None, en_pret: Optional[bool] = None, is_complete_device: Optional[bool] = None, sort_by: str = "date_creation", sort_order: str = "desc", db: Session = Depends(get_peripherals_db) ): """List 
peripherals with pagination and filters""" return PeripheralService.list_peripherals( db=db, page=page, page_size=page_size, type_filter=type_filter, search=search, location_id=location_id, device_id=device_id, en_pret=en_pret, is_complete_device=is_complete_device, sort_by=sort_by, sort_order=sort_order ) @router.get("/{peripheral_id}", response_model=PeripheralDetail) def get_peripheral( peripheral_id: int, db: Session = Depends(get_peripherals_db) ): """Get a peripheral by ID""" peripheral = PeripheralService.get_peripheral(db, peripheral_id) if not peripheral: raise HTTPException(status_code=404, detail="Peripheral not found") return peripheral @router.put("/{peripheral_id}", response_model=PeripheralDetail) def update_peripheral( peripheral_id: int, peripheral_data: PeripheralUpdate, db: Session = Depends(get_peripherals_db) ): """Update a peripheral""" peripheral = PeripheralService.update_peripheral(db, peripheral_id, peripheral_data) if not peripheral: raise HTTPException(status_code=404, detail="Peripheral not found") return peripheral @router.delete("/{peripheral_id}", status_code=204) def delete_peripheral( peripheral_id: int, db: Session = Depends(get_peripherals_db) ): """Delete a peripheral""" if not PeripheralService.delete_peripheral(db, peripheral_id): raise HTTPException(status_code=404, detail="Peripheral not found") @router.get("/statistics/summary") def get_statistics(db: Session = Depends(get_peripherals_db)): """Get peripheral statistics""" return PeripheralService.get_statistics(db) # ======================================== # DEVICE ASSIGNMENT # ======================================== @router.post("/{peripheral_id}/assign/{device_id}", response_model=PeripheralDetail) def assign_to_device( peripheral_id: int, device_id: int, db: Session = Depends(get_peripherals_db) ): """Assign peripheral to a device""" peripheral = PeripheralService.assign_to_device(db, peripheral_id, device_id) if not peripheral: raise HTTPException(status_code=404, 
detail="Peripheral not found") return peripheral @router.post("/{peripheral_id}/unassign", response_model=PeripheralDetail) def unassign_from_device( peripheral_id: int, db: Session = Depends(get_peripherals_db) ): """Unassign peripheral from device""" peripheral = PeripheralService.unassign_from_device(db, peripheral_id) if not peripheral: raise HTTPException(status_code=404, detail="Peripheral not found") return peripheral @router.get("/by-device/{device_id}", response_model=List[PeripheralSummary]) def get_peripherals_by_device( device_id: int, db: Session = Depends(get_peripherals_db) ): """Get all peripherals assigned to a device""" peripherals = PeripheralService.get_peripherals_by_device(db, device_id) return [ PeripheralSummary( id=p.id, nom=p.nom, type_principal=p.type_principal, sous_type=p.sous_type, marque=p.marque, modele=p.modele, etat=p.etat or "Inconnu", rating=p.rating or 0.0, prix=p.prix, en_pret=p.en_pret or False, is_complete_device=p.is_complete_device or False, quantite_disponible=p.quantite_disponible or 0 ) for p in peripherals ] # ======================================== # PHOTOS # ======================================== @router.post("/{peripheral_id}/photos", response_model=PeripheralPhotoSchema) async def upload_photo( peripheral_id: int, file: UploadFile = File(...), description: Optional[str] = Form(None), is_primary: bool = Form(False), db: Session = Depends(get_peripherals_db) ): """Upload a photo for a peripheral""" # Check peripheral exists peripheral = PeripheralService.get_peripheral(db, peripheral_id) if not peripheral: raise HTTPException(status_code=404, detail="Peripheral not found") # Validate image temp_path = f"/tmp/{file.filename}" with open(temp_path, "wb") as buffer: shutil.copyfileobj(file.file, buffer) if not ImageProcessor.is_valid_image(temp_path): os.remove(temp_path) raise HTTPException(status_code=400, detail="Invalid image file") # Create upload directory upload_dir = 
os.path.join(settings.PERIPHERALS_UPLOAD_DIR, "photos", str(peripheral_id)) os.makedirs(upload_dir, exist_ok=True) # Process image (main + thumbnail) try: # Process main image with level configuration processed_path, file_size, original_path = ImageProcessor.process_image_with_level( image_path=temp_path, output_dir=upload_dir, compression_level="medium", # Use medium by default save_original=True ) mime_type = ImageProcessor.get_mime_type(processed_path) # Generate thumbnail thumbnail_path, thumbnail_size = ImageProcessor.create_thumbnail_with_level( image_path=temp_path, output_dir=upload_dir, compression_level="medium" ) # Create database entry photo = PeripheralPhoto( peripheral_id=peripheral_id, filename=os.path.basename(processed_path), stored_path=processed_path, thumbnail_path=thumbnail_path, mime_type=mime_type, size_bytes=file_size, description=description, is_primary=is_primary ) db.add(photo) # If primary, unset other primary photos if is_primary: db.query(PeripheralPhoto).filter( PeripheralPhoto.peripheral_id == peripheral_id, PeripheralPhoto.id != photo.id ).update({"is_primary": False}) db.commit() db.refresh(photo) return photo finally: if os.path.exists(temp_path): os.remove(temp_path) @router.post("/{peripheral_id}/photos/from-url", response_model=PeripheralPhotoSchema) async def upload_photo_from_url( peripheral_id: int, image_url: str = Form(...), description: Optional[str] = Form(None), is_primary: bool = Form(False), db: Session = Depends(get_peripherals_db) ): """Download and upload a photo from URL for a peripheral""" # Check peripheral exists peripheral = PeripheralService.get_peripheral(db, peripheral_id) if not peripheral: raise HTTPException(status_code=404, detail="Peripheral not found") if not image_url.lower().startswith(("http://", "https://")): raise HTTPException(status_code=400, detail="URL must start with http:// or https://") # Download to temp file from urllib.request import Request, urlopen from tempfile import 
NamedTemporaryFile max_bytes = 10 * 1024 * 1024 # 10 MB temp_file = None try: req = Request(image_url, headers={"User-Agent": "LinuxBenchTools/1.0"}) with urlopen(req, timeout=10) as response: content_type = response.headers.get("Content-Type", "") if content_type and not content_type.startswith("image/"): raise HTTPException(status_code=400, detail="URL does not point to an image") temp_file = NamedTemporaryFile(delete=False, dir="/tmp", suffix=".img") total = 0 while True: chunk = response.read(1024 * 1024) if not chunk: break total += len(chunk) if total > max_bytes: raise HTTPException(status_code=400, detail="Image is too large (max 10 MB)") temp_file.write(chunk) temp_path = temp_file.name temp_file.close() if not ImageProcessor.is_valid_image(temp_path): os.remove(temp_path) raise HTTPException(status_code=400, detail="Invalid image file") # Create upload directory upload_dir = os.path.join(settings.PERIPHERALS_UPLOAD_DIR, "photos", str(peripheral_id)) os.makedirs(upload_dir, exist_ok=True) # Process image (main + thumbnail) processed_path, file_size, original_path = ImageProcessor.process_image_with_level( image_path=temp_path, output_dir=upload_dir, compression_level="medium", save_original=True ) mime_type = ImageProcessor.get_mime_type(processed_path) # Generate thumbnail thumbnail_path, thumbnail_size = ImageProcessor.create_thumbnail_with_level( image_path=temp_path, output_dir=upload_dir, compression_level="medium" ) # Create database entry photo = PeripheralPhoto( peripheral_id=peripheral_id, filename=os.path.basename(processed_path), stored_path=processed_path, thumbnail_path=thumbnail_path, mime_type=mime_type, size_bytes=file_size, description=description, is_primary=is_primary ) db.add(photo) # If primary, unset other primary photos if is_primary: db.query(PeripheralPhoto).filter( PeripheralPhoto.peripheral_id == peripheral_id, PeripheralPhoto.id != photo.id ).update({"is_primary": False}) db.commit() db.refresh(photo) return photo finally: if 
temp_file and os.path.exists(temp_file.name): os.remove(temp_file.name) @router.get("/{peripheral_id}/photos") def get_photos( peripheral_id: int, db: Session = Depends(get_peripherals_db) ): """Get all photos for a peripheral""" photos = db.query(PeripheralPhoto).filter( PeripheralPhoto.peripheral_id == peripheral_id ).all() # Convert stored paths to web-accessible URLs result = [] for photo in photos: photo_dict = { "id": photo.id, "peripheral_id": photo.peripheral_id, "filename": photo.filename, "stored_path": photo.stored_path.replace('/app/uploads/', '/uploads/') if photo.stored_path.startswith('/app/uploads/') else photo.stored_path, "thumbnail_path": photo.thumbnail_path.replace('/app/uploads/', '/uploads/') if photo.thumbnail_path and photo.thumbnail_path.startswith('/app/uploads/') else photo.thumbnail_path, "mime_type": photo.mime_type, "size_bytes": photo.size_bytes, "description": photo.description, "is_primary": photo.is_primary, "uploaded_at": photo.uploaded_at } result.append(photo_dict) return result @router.post("/{peripheral_id}/photos/{photo_id}/set-primary", status_code=200) def set_primary_photo( peripheral_id: int, photo_id: int, db: Session = Depends(get_peripherals_db) ): """Set a photo as primary (thumbnail)""" # Get the photo photo = db.query(PeripheralPhoto).filter( PeripheralPhoto.id == photo_id, PeripheralPhoto.peripheral_id == peripheral_id ).first() if not photo: raise HTTPException(status_code=404, detail="Photo not found") # Unset all other primary photos for this peripheral db.query(PeripheralPhoto).filter( PeripheralPhoto.peripheral_id == peripheral_id, PeripheralPhoto.id != photo_id ).update({"is_primary": False}) # Set this photo as primary photo.is_primary = True db.commit() return {"message": "Photo set as primary", "photo_id": photo_id} @router.delete("/photos/{photo_id}", status_code=204) def delete_photo( photo_id: int, db: Session = Depends(get_peripherals_db) ): """Delete a photo""" photo = 
db.query(PeripheralPhoto).filter(PeripheralPhoto.id == photo_id).first() if not photo: raise HTTPException(status_code=404, detail="Photo not found") # Delete file if os.path.exists(photo.stored_path): os.remove(photo.stored_path) db.delete(photo) db.commit() # ======================================== # DOCUMENTS # ======================================== @router.post("/{peripheral_id}/documents", response_model=PeripheralDocumentSchema) async def upload_document( peripheral_id: int, file: UploadFile = File(...), doc_type: str = Form(...), description: Optional[str] = Form(None), db: Session = Depends(get_peripherals_db) ): """Upload a document for a peripheral""" # Check peripheral exists peripheral = PeripheralService.get_peripheral(db, peripheral_id) if not peripheral: raise HTTPException(status_code=404, detail="Peripheral not found") # Create upload directory upload_dir = os.path.join(settings.PERIPHERALS_UPLOAD_DIR, "documents", str(peripheral_id)) os.makedirs(upload_dir, exist_ok=True) # Save file timestamp = Path(file.filename).stem safe_filename = f"{doc_type}_{timestamp}{Path(file.filename).suffix}" file_path = os.path.join(upload_dir, safe_filename) with open(file_path, "wb") as buffer: shutil.copyfileobj(file.file, buffer) file_size = os.path.getsize(file_path) # Create database entry document = PeripheralDocument( peripheral_id=peripheral_id, doc_type=doc_type, filename=safe_filename, stored_path=file_path, mime_type=file.content_type, size_bytes=file_size, description=description ) db.add(document) db.commit() db.refresh(document) return document @router.get("/{peripheral_id}/documents") def get_documents( peripheral_id: int, db: Session = Depends(get_peripherals_db) ): """Get all documents for a peripheral""" documents = db.query(PeripheralDocument).filter( PeripheralDocument.peripheral_id == peripheral_id ).all() # Convert stored paths to web-accessible URLs result = [] for doc in documents: doc_dict = { "id": doc.id, "peripheral_id": 
doc.peripheral_id, "filename": doc.filename, "stored_path": doc.stored_path.replace('/app/uploads/', '/uploads/') if doc.stored_path.startswith('/app/uploads/') else doc.stored_path, "doc_type": doc.doc_type, "size_bytes": doc.size_bytes, "description": doc.description, "uploaded_at": doc.uploaded_at } result.append(doc_dict) return result @router.delete("/documents/{document_id}", status_code=204) def delete_document( document_id: int, db: Session = Depends(get_peripherals_db) ): """Delete a document""" document = db.query(PeripheralDocument).filter(PeripheralDocument.id == document_id).first() if not document: raise HTTPException(status_code=404, detail="Document not found") # Delete file if os.path.exists(document.stored_path): os.remove(document.stored_path) db.delete(document) db.commit() # ======================================== # LINKS # ======================================== @router.post("/{peripheral_id}/links", response_model=PeripheralLinkSchema) def create_link( peripheral_id: int, link: PeripheralLinkCreate, db: Session = Depends(get_peripherals_db) ): """Create a link for a peripheral""" # Check peripheral exists peripheral = PeripheralService.get_peripheral(db, peripheral_id) if not peripheral: raise HTTPException(status_code=404, detail="Peripheral not found") db_link = PeripheralLink(**link.model_dump(), peripheral_id=peripheral_id) db.add(db_link) db.commit() db.refresh(db_link) return db_link @router.get("/{peripheral_id}/links", response_model=List[PeripheralLinkSchema]) def get_links( peripheral_id: int, db: Session = Depends(get_peripherals_db) ): """Get all links for a peripheral""" return db.query(PeripheralLink).filter( PeripheralLink.peripheral_id == peripheral_id ).all() @router.delete("/links/{link_id}", status_code=204) def delete_link( link_id: int, db: Session = Depends(get_peripherals_db) ): """Delete a link""" link = db.query(PeripheralLink).filter(PeripheralLink.id == link_id).first() if not link: raise 
HTTPException(status_code=404, detail="Link not found") db.delete(link) db.commit() # ======================================== # LOANS # ======================================== @router.post("/loans", response_model=LoanSchema) def create_loan( loan: LoanCreate, db: Session = Depends(get_peripherals_db) ): """Create a loan""" db_loan = PeripheralService.create_loan(db, loan) if not db_loan: raise HTTPException(status_code=400, detail="Cannot create loan (peripheral not found or already on loan)") return db_loan @router.post("/loans/{loan_id}/return", response_model=LoanSchema) def return_loan( loan_id: int, return_data: LoanReturn, db: Session = Depends(get_peripherals_db) ): """Return a loan""" loan = PeripheralService.return_loan(db, loan_id, return_data) if not loan: raise HTTPException(status_code=404, detail="Loan not found or already returned") return loan @router.get("/loans/overdue", response_model=List[LoanSchema]) def get_overdue_loans(db: Session = Depends(get_peripherals_db)): """Get all overdue loans""" return PeripheralService.get_overdue_loans(db) @router.get("/loans/upcoming", response_model=List[LoanSchema]) def get_upcoming_returns( days: int = Query(7, ge=1, le=30), db: Session = Depends(get_peripherals_db) ): """Get loans due within specified days""" return PeripheralService.get_upcoming_returns(db, days) @router.get("/{peripheral_id}/loans", response_model=List[LoanSchema]) def get_peripheral_loans( peripheral_id: int, db: Session = Depends(get_peripherals_db) ): """Get all loans for a peripheral""" return db.query(PeripheralLoan).filter( PeripheralLoan.peripheral_id == peripheral_id ).all() # ======================================== # HISTORY # ======================================== @router.get("/{peripheral_id}/history", response_model=List[PeripheralHistorySchema]) def get_peripheral_history( peripheral_id: int, db: Session = Depends(get_peripherals_db) ): """Get history for a peripheral""" return 
db.query(PeripheralLocationHistory).filter( PeripheralLocationHistory.peripheral_id == peripheral_id ).order_by(PeripheralLocationHistory.timestamp.desc()).all() # ======================================== # USB IMPORT # ======================================== @router.post("/import/usb", response_model=dict) def import_usb_info( lsusb_output: str = Form(...), db: Session = Depends(get_peripherals_db) ): """Import USB device information from lsusb -v output""" try: usb_info = parse_lsusb_verbose(lsusb_output) # Create suggested peripheral data suggested = { "nom": create_device_name(usb_info), "type_principal": usb_info.get("type_principal", "USB"), "sous_type": usb_info.get("sous_type"), "marque": usb_info.get("marque"), "modele": usb_info.get("modele"), "fabricant": usb_info.get("fabricant") or usb_info.get("manufacturer"), "produit": usb_info.get("produit") or usb_info.get("product"), "numero_serie": usb_info.get("numero_serie"), "iManufacturer": usb_info.get("fabricant") or usb_info.get("manufacturer"), "iProduct": usb_info.get("produit") or usb_info.get("product"), "vendor_id": usb_info.get("vendor_id"), "product_id": usb_info.get("product_id"), "usb_device_id": usb_info.get("usb_device_id") or _build_usb_device_id(usb_info.get("vendor_id"), usb_info.get("product_id")), "class_id": usb_info.get("raw_info", {}).get("device_class_code"), "caracteristiques_specifiques": { "usb_version": usb_info.get("usb_version"), "device_class": usb_info.get("device_class"), "max_power_ma": usb_info.get("max_power_ma"), "speed": usb_info.get("speed"), "interfaces": usb_info.get("interfaces", []) } } return { "success": True, "parsed_data": usb_info, "suggested_peripheral": suggested } except Exception as e: raise HTTPException(status_code=400, detail=f"Failed to parse USB info: {str(e)}") @router.post("/import/markdown", response_model=dict) async def import_markdown_specification( file: UploadFile = File(...), db: Session = Depends(get_peripherals_db) ): """ Import peripheral 
specification from a markdown (.md) file. Supports two formats: - Simple: Title + Description - Detailed: Full USB specification with vendor/product IDs, characteristics Checks if peripheral already exists (by vendor_id + product_id). Returns suggested peripheral data for the frontend to pre-fill the form. """ try: # Validate file type if not file.filename.endswith('.md'): raise HTTPException( status_code=400, detail="Only markdown (.md) files are supported" ) # Read file content content = await file.read() md_content = content.decode('utf-8') # Parse markdown parsed_data = parse_md_specification(md_content) # Try to extract IDs from filename as fallback filename_ids = extract_usb_ids_from_filename(file.filename) if filename_ids and "caracteristiques_specifiques" in parsed_data: # Add IDs from filename if not already present if "vendor_id" not in parsed_data["caracteristiques_specifiques"]: parsed_data["caracteristiques_specifiques"].update(filename_ids) elif filename_ids: parsed_data["caracteristiques_specifiques"] = filename_ids # Intelligent classification of device type from markdown content # Use classifier if type not already detected by parser type_principal = parsed_data.get("type_principal") sous_type = parsed_data.get("sous_type") if not type_principal or not sous_type: # Build device_info from parsed data device_info = { "vendor_id": parsed_data.get("caracteristiques_specifiques", {}).get("vendor_id"), "product_id": parsed_data.get("caracteristiques_specifiques", {}).get("product_id"), "manufacturer": parsed_data.get("marque"), "product": parsed_data.get("modele"), "device_class": parsed_data.get("caracteristiques_specifiques", {}).get("device_class"), } detected_type_principal, detected_sous_type = DeviceClassifier.classify_device( cli_content=None, synthese_content=md_content, device_info=device_info ) # Use detected values if not already present if not type_principal: type_principal = detected_type_principal if not sous_type: sous_type = 
detected_sous_type # Refine subtypes if needed if type_principal == "Stockage" and md_content: sous_type = DeviceClassifier.refine_storage_subtype(md_content) elif type_principal == "Bluetooth" and sous_type == "Autre" and md_content: sous_type = DeviceClassifier.refine_bluetooth_subtype(md_content) # Build suggested peripheral data suggested = { "nom": parsed_data.get("nom", "Périphérique importé"), "type_principal": type_principal, # Intelligently detected or from parser "sous_type": sous_type, # Intelligently detected or from parser "marque": parsed_data.get("marque"), "modele": parsed_data.get("modele"), "numero_serie": parsed_data.get("numero_serie"), "description": parsed_data.get("description"), "synthese": md_content, # Store the full markdown content in synthese field "notes": parsed_data.get("notes"), "caracteristiques_specifiques": parsed_data.get("caracteristiques_specifiques", {}), "etat": "Neuf", # Default state "quantite_totale": 1, "quantite_disponible": 1 } # Clean up None values suggested = {k: v for k, v in suggested.items() if v is not None} # Check for existing peripheral with same vendor_id and product_id existing_peripheral = None vendor_id = suggested.get("caracteristiques_specifiques", {}).get("vendor_id") product_id = suggested.get("caracteristiques_specifiques", {}).get("product_id") if vendor_id and product_id: # Search for peripheral with matching vendor_id and product_id in JSON field from app.models.peripheral import Peripheral all_peripherals = db.query(Peripheral).all() for periph in all_peripherals: if periph.caracteristiques_specifiques: p_vendor = periph.caracteristiques_specifiques.get("vendor_id") p_product = periph.caracteristiques_specifiques.get("product_id") if p_vendor == vendor_id and p_product == product_id: existing_peripheral = periph break if existing_peripheral: # Peripheral already exists return { "success": True, "already_exists": True, "existing_peripheral_id": existing_peripheral.id, "existing_peripheral": { 
"id": existing_peripheral.id, "nom": existing_peripheral.nom, "type_principal": existing_peripheral.type_principal, "marque": existing_peripheral.marque, "modele": existing_peripheral.modele, "quantite_totale": existing_peripheral.quantite_totale, "quantite_disponible": existing_peripheral.quantite_disponible }, "filename": file.filename, "message": f"Un périphérique avec vendor_id={vendor_id} et product_id={product_id} existe déjà" } else: # New peripheral - return suggested data for form pre-fill return { "success": True, "already_exists": False, "filename": file.filename, "parsed_data": parsed_data, "suggested_peripheral": suggested } except UnicodeDecodeError: raise HTTPException( status_code=400, detail="File encoding error. Please ensure the file is UTF-8 encoded." ) except Exception as e: raise HTTPException( status_code=400, detail=f"Failed to parse markdown file: {str(e)}" ) @router.post("/import/usb-cli/detect", response_model=dict) def detect_usb_devices_from_cli( lsusb_output: str = Form(...), db: Session = Depends(get_peripherals_db) ): """ Detect all USB devices from 'lsusb -v' output. Returns a list of devices for the user to select from. """ try: devices = detect_usb_devices(lsusb_output) if not devices: raise HTTPException( status_code=400, detail="No USB devices found in the provided output. Please ensure you're pasting the output from 'lsusb -v' command." ) return { "success": True, "devices": devices, "total_devices": len(devices) } except Exception as e: raise HTTPException( status_code=400, detail=f"Failed to detect USB devices: {str(e)}" ) @router.post("/import/usb-cli/extract", response_model=dict) def extract_usb_device_from_cli( lsusb_output: str = Form(...), bus: str = Form(...), device: str = Form(...), db: Session = Depends(get_peripherals_db) ): """ Extract a specific USB device from 'lsusb -v' output. Filters the CLI output to only the selected device and returns parsed data. 
""" try: # Extract the device section device_section = extract_device_section(lsusb_output, bus, device) if not device_section: raise HTTPException( status_code=404, detail=f"Device Bus {bus} Device {device} not found in the provided output" ) # Parse device info device_info = parse_device_info(device_section) # Also use existing parser for more complete data try: detailed_info = parse_lsusb_verbose(device_section) except: detailed_info = {} # Format CLI output as markdown (raw) cli_raw = f"""# Sortie sudo lsusb -v Bus {bus} Device {device} ``` {device_section} ``` """ # Generate YAML structured data cli_yaml_data = { "identification": { "bus": bus, "device": device, "vendor_id": device_info.get("vendor_id"), "product_id": device_info.get("product_id"), "manufacturer": device_info.get("manufacturer"), "product": device_info.get("product"), "serial": device_info.get("serial"), }, "usb": { "version_declared": device_info.get("usb_version"), "type": device_info.get("usb_type"), "negotiated_speed": device_info.get("speed"), }, "classes": { "device_class": device_info.get("device_class"), "interface_classes": device_info.get("interface_classes"), }, "power": { "max_power_ma": device_info.get("max_power"), "is_bus_powered": device_info.get("is_bus_powered"), "is_self_powered": device_info.get("is_self_powered"), "power_sufficient": device_info.get("power_sufficient"), }, "firmware": { "requires_firmware": device_info.get("requires_firmware"), } } # Remove None values from YAML data def clean_dict(d): if isinstance(d, dict): return {k: clean_dict(v) for k, v in d.items() if v is not None} return d cli_yaml_data = clean_dict(cli_yaml_data) # Convert to YAML string import yaml cli_yaml = yaml.dump(cli_yaml_data, default_flow_style=False, allow_unicode=True, sort_keys=False) # Intelligent classification of device type type_principal, sous_type = DeviceClassifier.classify_device( cli_content=device_section, synthese_content=None, device_info=device_info ) # Refine subtypes if 
needed if type_principal == "Bluetooth" and sous_type == "Autre": sous_type = DeviceClassifier.refine_bluetooth_subtype(device_section) elif type_principal == "Stockage": # Always refine storage subtype to distinguish between flash/HDD/card reader sous_type = DeviceClassifier.refine_storage_subtype(device_section) # Build suggested peripheral data # Field mappings per technical specs: # - marque = Vendor string (idVendor line 3rd column) # - modele = Product string (idProduct line 3rd column) # - fabricant = iManufacturer (manufacturer string) # - produit = iProduct (product string) suggested = { "nom": device_info.get("product") or detailed_info.get("product_string") or f"USB Device {device_info['vendor_id']}:{device_info['product_id']}", "type_principal": type_principal, # Intelligently detected "sous_type": sous_type, # Intelligently detected "marque": detailed_info.get("marque") or device_info.get("manufacturer"), "modele": detailed_info.get("modele") or device_info.get("product"), "fabricant": device_info.get("manufacturer"), # iManufacturer "produit": device_info.get("product"), # iProduct "numero_serie": device_info.get("serial"), "cli_yaml": cli_yaml, # Structured data in YAML format "cli_raw": cli_raw, # Raw CLI output formatted as markdown "iManufacturer": device_info.get("manufacturer"), "iProduct": device_info.get("product"), "usb_device_id": _build_usb_device_id(device_info.get("vendor_id"), device_info.get("product_id")), "caracteristiques_specifiques": { "vendor_id": device_info.get("vendor_id"), # idVendor (hex ID) "product_id": device_info.get("product_id"), # idProduct (hex ID) "fabricant": device_info.get("manufacturer"), # iManufacturer / Vendor name "produit": device_info.get("product"), # iProduct string "usb_version_declared": device_info.get("usb_version"), # bcdUSB (declared, not definitive) "usb_type": device_info.get("usb_type"), # Actual USB type from negotiated speed "negotiated_speed": device_info.get("speed"), # e.g., "High Speed", 
"SuperSpeed" "device_class": device_info.get("device_class"), # bDeviceClass "interface_classes": device_info.get("interface_classes"), # CRITICAL: bInterfaceClass (normative) "requires_firmware": device_info.get("requires_firmware"), # True if class 255 (Vendor Specific) "max_power_ma": device_info.get("max_power"), # MaxPower in mA "is_bus_powered": device_info.get("is_bus_powered"), "is_self_powered": device_info.get("is_self_powered"), "power_sufficient": device_info.get("power_sufficient"), # Based on port capacity } } # Clean up None values suggested = {k: v for k, v in suggested.items() if v is not None} if "caracteristiques_specifiques" in suggested: suggested["caracteristiques_specifiques"] = { k: v for k, v in suggested["caracteristiques_specifiques"].items() if v is not None } # Check for existing peripheral with same vendor_id and product_id existing_peripheral = None vendor_id = suggested.get("caracteristiques_specifiques", {}).get("vendor_id") product_id = suggested.get("caracteristiques_specifiques", {}).get("product_id") if vendor_id and product_id: from app.models.peripheral import Peripheral all_peripherals = db.query(Peripheral).all() for periph in all_peripherals: if periph.caracteristiques_specifiques: p_vendor = periph.caracteristiques_specifiques.get("vendor_id") p_product = periph.caracteristiques_specifiques.get("product_id") if p_vendor == vendor_id and p_product == product_id: existing_peripheral = periph break if existing_peripheral: # Peripheral already exists return { "success": True, "already_exists": True, "existing_peripheral_id": existing_peripheral.id, "existing_peripheral": { "id": existing_peripheral.id, "nom": existing_peripheral.nom, "type_principal": existing_peripheral.type_principal, "marque": existing_peripheral.marque, "modele": existing_peripheral.modele, "quantite_totale": existing_peripheral.quantite_totale, "quantite_disponible": existing_peripheral.quantite_disponible }, "message": f"Un périphérique avec 
def _normalize_usb_hex_id(value) -> Optional[str]:
    """
    Normalize a USB vendor/product ID so two spellings of the same ID compare equal.

    The value is stringified, stripped, lowercased and a leading ``0x`` is
    removed, mirroring the normalization done by ``_build_usb_device_id``.

    Args:
        value: Raw ID as produced by a parser or stored on a peripheral.

    Returns:
        The normalized ID, or None when the value is missing or empty.
    """
    if value is None:
        return None
    text = str(value).strip().lower()
    if text.startswith("0x"):
        text = text[2:]
    return text or None


def _find_peripheral_by_usb_ids(db, vendor_id, product_id):
    """
    Return the first stored peripheral with the same USB vendor/product IDs.

    Args:
        db: Database session used to query peripherals.
        vendor_id: Vendor ID of the device being imported.
        product_id: Product ID of the device being imported.

    Returns:
        The matching peripheral, or None when either ID is missing or no
        stored peripheral matches.
    """
    wanted = (_normalize_usb_hex_id(vendor_id), _normalize_usb_hex_id(product_id))
    if None in wanted:
        return None

    # Imported locally, as in the original code path.
    from app.models.peripheral import Peripheral

    # The IDs live inside the caracteristiques_specifiques mapping, so the
    # comparison is done in Python over every stored peripheral.
    for periph in db.query(Peripheral).all():
        specs = periph.caracteristiques_specifiques
        if not isinstance(specs, dict):
            # Empty or unexpected payload: nothing to compare against.
            continue
        stored = (
            _normalize_usb_hex_id(specs.get("vendor_id")),
            _normalize_usb_hex_id(specs.get("product_id")),
        )
        if stored == wanted:
            return periph
    return None


@router.post("/import/usb-structured", response_model=dict)
def import_structured_usb_info(
    usb_info: str = Form(...),
    db: Session = Depends(get_peripherals_db)
):
    """
    Import structured USB information (from GUI tools or formatted text).

    Parses the text, classifies the device, and either reports an already
    registered peripheral with the same vendor/product IDs or returns
    suggested data for pre-filling a new peripheral.

    Args:
        usb_info: Structured USB information text (French format supported)
        db: Peripherals database session

    Returns:
        A dict with ``success`` and ``already_exists``. When the device is
        already known it also carries ``existing_peripheral_id``,
        ``existing_peripheral`` and ``message``; otherwise it carries
        ``suggested_peripheral`` and ``parsed_data``.

    Raises:
        HTTPException: 400 when the information cannot be parsed or processed.
    """
    try:
        # Parse structured USB information
        parsed = parse_structured_usb_info(usb_info)

        # Generate YAML structured data
        import yaml
        cli_yaml = yaml.dump(
            parsed["cli_yaml"],
            default_flow_style=False,
            allow_unicode=True,
            sort_keys=False
        )

        # Format raw output as markdown (heading followed by a fenced block)
        cli_raw = f"# Sortie USB structurée\n\n```\n{usb_info.strip()}\n```\n"

        specifics = parsed["caracteristiques_specifiques"]

        # Build device_info for classification (include interface_classes!)
        device_info = {
            "vendor_id": specifics.get("vendor_id"),
            "product_id": specifics.get("product_id"),
            "manufacturer": specifics.get("fabricant"),
            "product": specifics.get("produit"),
            "device_class": specifics.get("device_class"),
            "interface_classes": specifics.get("interface_classes"),  # CRITICAL
        }

        # Intelligent classification
        type_principal, sous_type = DeviceClassifier.classify_device(
            cli_content=usb_info,
            synthese_content=None,
            device_info=device_info
        )

        # Refine subtypes if needed
        if type_principal == "Bluetooth" and sous_type == "Autre":
            sous_type = DeviceClassifier.refine_bluetooth_subtype(usb_info)
        elif type_principal == "Stockage":
            sous_type = DeviceClassifier.refine_storage_subtype(usb_info)

        general = parsed["general"]

        # Build suggested peripheral data
        # Field mappings per technical specs:
        # - marque = Vendor string (idVendor line 3rd column)
        # - modele = Product string (idProduct line 3rd column)
        # - fabricant = iManufacturer (manufacturer string)
        # - produit = iProduct (product string)
        suggested = {
            # "or" (not a .get default) so a None/empty parsed name still
            # falls back instead of being dropped by the cleanup below.
            "nom": general.get("nom") or "Périphérique USB",
            "type_principal": type_principal,
            "sous_type": sous_type,
            "marque": general.get("marque"),
            "modele": general.get("modele"),
            "fabricant": general.get("fabricant"),  # iManufacturer
            "produit": general.get("produit"),  # iProduct
            "numero_serie": general.get("numero_serie"),
            "cli_yaml": cli_yaml,  # Structured data in YAML format
            "cli_raw": cli_raw,  # Raw output formatted as markdown
            "iManufacturer": general.get("fabricant"),
            "iProduct": general.get("produit"),
            "usb_device_id": _build_usb_device_id(
                specifics.get("vendor_id"),
                specifics.get("product_id")
            ),
            "caracteristiques_specifiques": specifics
        }

        # Clean up None values
        suggested = {k: v for k, v in suggested.items() if v is not None}
        if "caracteristiques_specifiques" in suggested:
            suggested["caracteristiques_specifiques"] = {
                k: v
                for k, v in suggested["caracteristiques_specifiques"].items()
                if v is not None
            }

        # Check for existing peripheral
        cleaned_specifics = suggested.get("caracteristiques_specifiques", {})
        vendor_id = cleaned_specifics.get("vendor_id")
        product_id = cleaned_specifics.get("product_id")
        existing_peripheral = _find_peripheral_by_usb_ids(db, vendor_id, product_id)

        if existing_peripheral:
            # Same payload shape as the lsusb extraction endpoint so clients
            # can handle both "already exists" answers identically.
            return {
                "success": True,
                "already_exists": True,
                "existing_peripheral_id": existing_peripheral.id,
                "existing_peripheral": {
                    "id": existing_peripheral.id,
                    "nom": existing_peripheral.nom,
                    "type_principal": existing_peripheral.type_principal,
                    "marque": existing_peripheral.marque,
                    "modele": existing_peripheral.modele,
                    "quantite_totale": existing_peripheral.quantite_totale,
                    "quantite_disponible": existing_peripheral.quantite_disponible
                },
                "message": f"Un périphérique avec vendor_id={vendor_id} et product_id={product_id} existe déjà"
            }

        # New peripheral
        return {
            "success": True,
            "already_exists": False,
            "suggested_peripheral": suggested,
            "parsed_data": parsed["cli_yaml"]
        }

    except HTTPException:
        # Never re-wrap an HTTP error raised on purpose further down.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=400,
            detail=f"Failed to parse structured USB information: {str(e)}"
        ) from e