Files
proxmox_list/prox-visualizer/backend/app/services/proxmox_api.py
T
2026-06-07 11:33:20 +02:00

165 lines
5.7 KiB
Python

from typing import Any, Dict, List, Optional
import httpx
from app.core.config import get_settings
from app.models.schemas import ProxmoxServerConfig, ServerScanStatus, VmRecord
from app.services.metadata import extract_duplicate_id
class ProxmoxClient:
def __init__(self, server: ProxmoxServerConfig):
self.server = server
self.base_url = str(server.url).rstrip("/")
self.errors: List[str] = []
async def __aenter__(self) -> "ProxmoxClient":
settings = get_settings()
self.client = httpx.AsyncClient(
base_url=f"{self.base_url}/api2/json",
headers={
"Authorization": (
f"PVEAPIToken={self.server.token_name}={self.server.token_value}"
)
},
timeout=settings.PROXMOX_TIMEOUT,
verify=settings.PROXMOX_VERIFY_SSL,
)
return self
async def __aexit__(self, exc_type, exc, tb) -> None:
await self.client.aclose()
async def scan(self) -> tuple[ServerScanStatus, List[VmRecord]]:
items = await self._scan_nodes()
if not items:
fallback_items = await self._scan_cluster_resources()
items.extend(fallback_items)
if not items and await self._has_no_effective_permissions():
self.errors.append(
"permissions effectives vides pour ce token; Proxmox filtre probablement les VM/LXC"
)
status = ServerScanStatus(
name=self.server.name,
url=self.base_url,
ok=not self.errors or bool(items),
vm_count=len(items),
errors=self.errors,
)
return status, items
async def _has_no_effective_permissions(self) -> bool:
permissions = await self._get_data("/access/permissions?path=/", optional=True)
if not isinstance(permissions, dict):
return False
if not permissions:
return True
return all(not value for value in permissions.values())
async def _get_data(self, path: str, *, optional: bool = False) -> Optional[Any]:
try:
response = await self.client.get(path)
response.raise_for_status()
payload = response.json()
except httpx.HTTPStatusError as exc:
message = self._format_http_error(path, exc)
if optional and exc.response.status_code in {401, 403, 404}:
return None
self.errors.append(message)
return None
except httpx.HTTPError as exc:
self.errors.append(f"{path}: {exc}")
return None
except ValueError:
self.errors.append(f"{path}: réponse JSON invalide")
return None
return payload.get("data")
def _format_http_error(self, path: str, exc: httpx.HTTPStatusError) -> str:
detail = exc.response.text.strip()
if len(detail) > 160:
detail = f"{detail[:157]}..."
return f"{path}: HTTP {exc.response.status_code} {detail}".strip()
async def _scan_nodes(self) -> List[VmRecord]:
nodes = await self._get_data("/nodes")
if not isinstance(nodes, list):
return []
items: List[VmRecord] = []
for node in nodes:
node_name = node.get("node")
if not node_name:
continue
items.extend(await self._scan_node_vms(node_name, "qemu"))
items.extend(await self._scan_node_vms(node_name, "lxc"))
return items
async def _scan_node_vms(self, node: str, vm_type: str) -> List[VmRecord]:
path = f"/nodes/{node}/{vm_type}"
records = await self._get_data(path)
if not isinstance(records, list):
return []
items = []
for record in records:
vmid = record.get("vmid")
if vmid is None:
continue
config = await self._get_data(
f"{path}/{vmid}/config",
optional=True,
)
items.append(self._to_vm_record(node, vm_type, record, config))
return items
async def _scan_cluster_resources(self) -> List[VmRecord]:
resources = await self._get_data("/cluster/resources?type=vm")
if not isinstance(resources, list):
return []
items = []
for resource in resources:
vm_type = resource.get("type")
if vm_type not in {"qemu", "lxc"}:
continue
node = resource.get("node") or "unknown"
items.append(self._to_vm_record(node, vm_type, resource, None))
return items
def _to_vm_record(
self,
node: str,
vm_type: str,
record: Dict[str, Any],
config: Optional[Dict[str, Any]],
) -> VmRecord:
vmid = int(record["vmid"])
description = None
tags = record.get("tags")
if isinstance(config, dict):
description = config.get("description")
tags = config.get("tags", tags)
return VmRecord(
id=f"{self.server.name}:{node}:{vm_type}:{vmid}",
server=self.server.name,
node=node,
vmid=vmid,
type=vm_type,
name=record.get("name") or record.get("hostname") or f"{vm_type}-{vmid}",
status=record.get("status"),
cpu=record.get("cpu"),
mem=record.get("mem"),
maxmem=record.get("maxmem"),
disk=record.get("disk"),
maxdisk=record.get("maxdisk"),
uptime=record.get("uptime"),
tags=tags,
duplicate_id=extract_duplicate_id(description),
description_available=description is not None,
raw=record,
)