Files
serv_benchmark/backend/app/utils/usb_info_parser.py
Gilles Soulier c67befc549 addon
2026-01-05 16:08:01 +01:00

373 lines
15 KiB
Python
Executable File

"""
Enhanced USB information parser
Parses structured USB device information (from lsusb -v or GUI tools)
Outputs YAML-formatted CLI section
"""
import re
import yaml
from typing import Dict, Any, Optional, List
def _usb_type_from_speed(speed: str) -> Optional[str]:
    """Map a negotiated-speed label to a USB generation label.

    Returns None when the label contains none of the recognised markers.
    """
    speed_lower = speed.lower()
    if 'low speed' in speed_lower or '1.5' in speed_lower:
        return "USB 1.1"
    if 'full speed' in speed_lower or '12 mb' in speed_lower:
        return "USB 1.1"
    if 'high speed' in speed_lower or '480 mb' in speed_lower:
        return "USB 2.0"
    # "superspeed+" must be tested first: the plain "superspeed" marker is a
    # substring of it.
    if 'superspeed+' in speed_lower or '10 gb' in speed_lower:
        return "USB 3.1"
    if 'superspeed' in speed_lower or '5 gb' in speed_lower:
        return "USB 3.0"
    return None


def parse_structured_usb_info(text: str) -> Dict[str, Any]:
    """
    Parse structured USB information text.

    The text is scanned line by line; every recognised ``label : value`` line
    fills one or more entries of the result. Lines may match several patterns.

    Args:
        text: Raw USB information (French or English labels).

    Returns:
        Dict with three sub-dicts:
        - "general": marque, modele, nom, fabricant, produit, numero_serie
          (each only when found in the text).
        - "caracteristiques_specifiques": IDs, declared USB version,
          negotiated speed and derived USB type, device class/subclass/
          protocol, power data, interface classes, requires_firmware.
        - "cli_yaml": bus/device numbers plus the "identification", "usb",
          "classe" and "alimentation" summaries, and "interfaces" /
          "endpoints" when any were extracted.
    """
    general: Dict[str, Any] = {}
    cli_yaml: Dict[str, Any] = {}
    specifics: Dict[str, Any] = {}
    result = {
        "general": general,
        "cli_yaml": cli_yaml,
        "caracteristiques_specifiques": specifics,
    }

    # ===========================================
    # Fields common to all devices (-> caracteristiques_specifiques)
    # Per technical specs:
    # - marque    = Vendor string (3rd column of idVendor)
    # - modele    = Product string (3rd column of idProduct)
    # - fabricant = iManufacturer (manufacturer string)
    # - produit   = iProduct (product string)
    # ===========================================
    for raw_line in text.strip().split('\n'):
        line = raw_line.strip()

        # Vendor ID; the trailing vendor name is optional so that a bare
        # "Vendor ID : 0x1234" line still yields the ID.
        if match := re.search(r'Vendor\s+ID\s*:\s*(0x[0-9a-fA-F]+)(?:\s+(.+))?', line):
            specifics["vendor_id"] = match.group(1).lower()
            vendor_str = (match.group(2) or "").strip()
            if vendor_str and vendor_str != "0":
                general["marque"] = vendor_str

        # Product ID; same optional trailing name as for the vendor.
        if match := re.search(r'Product\s+ID\s*:\s*(0x[0-9a-fA-F]+)(?:\s+(.+))?', line):
            specifics["product_id"] = match.group(1).lower()
            product_str = (match.group(2) or "").strip()
            if product_str and product_str != "0":
                general["modele"] = product_str

        # Vendor string -> marque
        if match := re.search(r'Vendor\s+string\s*:\s*(.+)', line):
            vendor = match.group(1).strip()
            if vendor and vendor != "0":
                general["marque"] = vendor

        # iManufacturer -> fabricant
        if match := re.search(r'iManufacturer\s*:\s*(.+)', line):
            manufacturer = match.group(1).strip()
            if manufacturer and manufacturer != "0":
                specifics["fabricant"] = manufacturer
                general["fabricant"] = manufacturer

        # Product string -> modele
        if match := re.search(r'Product\s+string\s*:\s*(.+)', line):
            product = match.group(1).strip()
            if product and product != "0":
                general["modele"] = product
                # Also use as nom if not already set
                if "nom" not in general:
                    general["nom"] = product

        # iProduct -> produit
        if match := re.search(r'iProduct\s*:\s*(.+)', line):
            product = match.group(1).strip()
            if product and product != "0":
                specifics["produit"] = product
                general["produit"] = product

        # Serial number: sometimes absent, stored only when actually present
        if match := re.search(r'Numéro\s+de\s+série\s*:\s*(.+)', line):
            serial = match.group(1).strip()
            if serial and "non présent" not in serial.lower() and serial != "0":
                general["numero_serie"] = serial

        # USB version (bcdUSB): DECLARED by the device, not definitive
        if match := re.search(r'USB\s+([\d.]+).*bcdUSB\s+([\d.]+)', line):
            specifics["usb_version_declared"] = f"USB {match.group(2)}"

        # Negotiated speed: this is what determines the actual USB type
        if match := re.search(r'Vitesse\s+négociée\s*:\s*(.+)', line):
            speed = match.group(1).strip()
            specifics["negotiated_speed"] = speed
            usb_type = _usb_type_from_speed(speed)
            if usb_type is not None:
                specifics["usb_type"] = usb_type

        # Device class (bDeviceClass): less reliable than bInterfaceClass
        if match := re.search(r'Classe\s+périphérique\s*:\s*(\d+)\s*(?:→\s*(.+))?', line):
            specifics["device_class"] = match.group(1)
            specifics["device_class_nom"] = (match.group(2) or "").strip()

        # Device subclass
        if match := re.search(r'Sous-classe\s+périphérique\s*:\s*(\d+)\s*(?:→\s*(.+))?', line):
            specifics["device_subclass"] = match.group(1)
            specifics["device_subclass_nom"] = (match.group(2) or "").strip()

        # Device protocol
        if match := re.search(r'Protocole\s+périphérique\s*:\s*(\d+)\s*(?:→\s*(.+))?', line):
            specifics["device_protocol"] = match.group(1)
            specifics["device_protocol_nom"] = (match.group(2) or "").strip()

        # Maximum power (MaxPower), in mA
        if match := re.search(r'Puissance\s+maximale.*:\s*(\d+)\s*mA', line):
            specifics["max_power_ma"] = int(match.group(1))

        # Power mode (Bus Powered vs Self Powered)
        if match := re.search(r'Mode\s+d.alimentation\s*:\s*(.+)', line):
            power_mode = match.group(1).strip()
            specifics["power_mode"] = power_mode
            specifics["is_bus_powered"] = "bus" in power_mode.lower()
            specifics["is_self_powered"] = "self" in power_mode.lower()

        # Bus & Device numbers go straight to cli_yaml
        if match := re.search(r'Bus\s*:\s*(\d+)', line):
            cli_yaml["bus"] = match.group(1)
        if match := re.search(r'Device\s*:\s*(\d+)', line):
            cli_yaml["device"] = match.group(1)

    # Power sufficiency is evaluated after the scan so the result does not
    # depend on whether the speed line comes before or after the power line.
    if "max_power_ma" in specifics:
        usb_type = specifics.get("usb_type", "USB 2.0")
        # USB 3.x: 900 mA @ 5V = 4.5W; otherwise 500 mA @ 5V = 2.5W
        port_capacity = 900 if "USB 3" in usb_type else 500
        specifics["power_sufficient"] = specifics["max_power_ma"] <= port_capacity

    # ===========================================
    # Specific details (-> cli_yaml)
    # The common fields are mirrored here to give a complete view.
    # ===========================================
    cli_yaml["identification"] = {
        "vendor_id": specifics.get("vendor_id"),
        "product_id": specifics.get("product_id"),
        "vendor_string": general.get("marque"),
        "product_string": general.get("modele") or general.get("nom"),
        "numero_serie": general.get("numero_serie"),
    }
    # Read from the keys the scan actually writes (usb_version_declared,
    # negotiated_speed, max_power_ma); the output key names are unchanged.
    cli_yaml["usb"] = {
        "version": specifics.get("usb_version_declared"),
        "vitesse_negociee": specifics.get("negotiated_speed"),
    }
    cli_yaml["classe"] = {
        "device_class": specifics.get("device_class"),
        "device_class_nom": specifics.get("device_class_nom"),
        "device_subclass": specifics.get("device_subclass"),
        "device_subclass_nom": specifics.get("device_subclass_nom"),
        "device_protocol": specifics.get("device_protocol"),
        "device_protocol_nom": specifics.get("device_protocol_nom"),
    }
    cli_yaml["alimentation"] = {
        "max_power": specifics.get("max_power_ma"),
        "power_mode": specifics.get("power_mode"),
    }

    # Interface information (critical for Mass Storage detection)
    interfaces = extract_interfaces(text)
    if interfaces:
        cli_yaml["interfaces"] = interfaces
        # Collect interface classes for classification
        interface_classes = []
        requires_firmware = False
        for iface in interfaces:
            if "classe" in iface:
                class_code = iface["classe"].get("code")
                interface_classes.append({
                    "code": class_code,
                    "name": iface["classe"].get("nom", ""),
                })
                # Vendor Specific (255) is flagged as requiring firmware
                if class_code == 255:
                    requires_firmware = True
        specifics["interface_classes"] = interface_classes
        specifics["requires_firmware"] = requires_firmware

    # Endpoints
    endpoints = extract_endpoints(text)
    if endpoints:
        cli_yaml["endpoints"] = endpoints

    return result
def extract_interfaces(text: str) -> List[Dict[str, Any]]:
    """
    Extract interface information.

    CRITICAL: bInterfaceClass is normative for Mass Storage detection (class 08).

    A line containing the standalone word ``Interface`` followed by a number
    opens a new interface; the following lines fill that interface until the
    next header. Lines before the first header are ignored.

    Args:
        text: Raw USB information text.

    Returns:
        One dict per interface header, in order of appearance. Each has
        "numero" (int) and, when the matching lines were found,
        "alternate_setting", "nombre_endpoints" (ints) and "classe",
        "sous_classe", "protocole" (dicts with int "code" and str "nom").
    """
    interfaces: List[Dict[str, Any]] = []
    current_interface: Optional[Dict[str, Any]] = None

    for raw_line in text.split('\n'):
        line = raw_line.strip()

        # New interface header. The \b anchor keeps string-index lines such
        # as "iInterface 0" from being mistaken for a header.
        if match := re.search(r'\bInterface\s+(\d+)', line):
            if current_interface is not None:
                interfaces.append(current_interface)
            current_interface = {
                "numero": int(match.group(1)),
            }

        if current_interface is None:
            continue

        # Alternate setting
        if match := re.search(r'Alternate\s+setting\s*:\s*(\d+)', line):
            current_interface["alternate_setting"] = int(match.group(1))

        # Number of endpoints
        if match := re.search(r'Nombre\s+d.endpoints\s*:\s*(\d+)', line):
            current_interface["nombre_endpoints"] = int(match.group(1))

        # Interface class (critical for Mass Storage)
        if match := re.search(r'Classe\s+interface\s*:\s*(\d+)\s*(?:→\s*(.+))?', line):
            current_interface["classe"] = {
                "code": int(match.group(1)),  # Stored as int for the classifier
                "nom": match.group(2).strip() if match.group(2) else "",
            }

        # Interface subclass
        if match := re.search(r'Sous-classe\s+interface\s*:\s*(\d+)\s*(?:→\s*(.+))?', line):
            current_interface["sous_classe"] = {
                "code": int(match.group(1)),
                "nom": match.group(2).strip() if match.group(2) else "",
            }

        # Interface protocol
        if match := re.search(r'Protocole\s+interface\s*:\s*(\d+)\s*(?:→\s*(.+))?', line):
            current_interface["protocole"] = {
                "code": int(match.group(1)),
                "nom": match.group(2).strip() if match.group(2) else "",
            }

    # Flush the last interface still being filled
    if current_interface is not None:
        interfaces.append(current_interface)

    return interfaces
def extract_endpoints(text: str) -> List[Dict[str, Any]]:
    """
    Extract endpoint information.

    A line of the form ``EP 0x81 (IN)`` or ``EP 0x02 (OUT)`` opens a new
    endpoint; the detail lines that follow are attached to the most recently
    opened endpoint. Detail lines seen before any endpoint are ignored.

    Args:
        text: Raw USB information text.

    Returns:
        One dict per endpoint, in order of appearance, with "adresse"
        (lower-cased hex string) and "direction" ("IN" or "OUT"), plus
        "type_transfert" (str), "taille_max_paquet", "intervalle" and
        "max_burst" (ints) when the matching lines were found.
    """
    endpoints: List[Dict[str, Any]] = []

    for raw_line in text.split('\n'):
        line = raw_line.strip()

        # Endpoint line: EP 0x81 (IN). The direction is an alternation, not a
        # character class, so only the literal words IN and OUT are accepted.
        if match := re.search(r'EP\s+(0x[0-9a-fA-F]+)\s*\((IN|OUT)\)', line):
            endpoints.append({
                "adresse": match.group(1).lower(),
                "direction": match.group(2),
            })
            continue

        # Everything below decorates the last endpoint; skip until one exists.
        if not endpoints:
            continue
        current = endpoints[-1]

        # Transfer type
        if match := re.search(r'Type(?:\s+de\s+transfert)?\s*:\s*(\w+)', line):
            current["type_transfert"] = match.group(1)

        # Max packet size
        if match := re.search(r'Taille\s+max\s+paquet\s*:\s*(\d+)\s*octets?', line):
            current["taille_max_paquet"] = int(match.group(1))

        # Interval
        if match := re.search(r'Intervalle\s*:\s*(\d+)', line):
            current["intervalle"] = int(match.group(1))

        # bMaxBurst
        if match := re.search(r'bMaxBurst\s*:\s*(\d+)', line):
            current["max_burst"] = int(match.group(1))

    return endpoints
def format_cli_as_yaml(cli_data: Dict[str, Any]) -> str:
    """
    Render parsed CLI data as a YAML document preceded by a comment header.

    Args:
        cli_data: Parsed CLI data.

    Returns:
        The header comment followed by the YAML dump (keys kept in insertion
        order, block style, 2-space indent), or an empty string when
        cli_data is empty.
    """
    if cli_data:
        header = "# Informations USB extraites\n\n"
        dumped = yaml.dump(
            cli_data,
            allow_unicode=True,
            sort_keys=False,
            indent=2,
            default_flow_style=False,
        )
        return header + dumped
    return ""
def create_full_cli_section(text: str) -> str:
    """
    Build the complete CLI section: structured YAML followed by the raw text.

    Args:
        text: Raw USB information text.

    Returns:
        Markdown string with a title, a fenced ``yaml`` block holding the
        parsed "cli_yaml" data, and a plain fenced block holding the
        stripped raw input.
    """
    structured_yaml = format_cli_as_yaml(parse_structured_usb_info(text)["cli_yaml"])
    pieces = [
        "# Informations USB\n\n",
        # Structured YAML section
        "## Données structurées (YAML)\n\n",
        "```yaml\n",
        structured_yaml,
        "```\n\n",
        # Raw output section
        "## Sortie brute\n\n",
        "```\n",
        text.strip(),
        "\n```\n",
    ]
    return "".join(pieces)