Files
LANMap/backend/app/routers/ips.py
2025-12-07 01:16:52 +01:00

408 lines
12 KiB
Python

"""
Endpoints API pour la gestion des IPs
"""
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File
from sqlalchemy.orm import Session
from sqlalchemy import desc
from typing import List, Optional
from datetime import datetime, timedelta
import xml.etree.ElementTree as ET
import yaml
from backend.app.core.database import get_db
from backend.app.models.ip import IP, IPHistory
from backend.app.core.config import config_manager
from pydantic import BaseModel
router = APIRouter(prefix="/api/ips", tags=["IPs"])
# Schémas Pydantic pour validation
class IPUpdate(BaseModel):
"""Schéma pour mise à jour d'IP"""
name: Optional[str] = None
known: Optional[bool] = None
location: Optional[str] = None
host: Optional[str] = None
link: Optional[str] = None
class IPResponse(BaseModel):
"""Schéma de réponse IP"""
ip: str
name: Optional[str]
known: bool
location: Optional[str]
host: Optional[str]
first_seen: Optional[datetime]
last_seen: Optional[datetime]
last_status: Optional[str]
mac: Optional[str]
vendor: Optional[str]
hostname: Optional[str]
link: Optional[str]
open_ports: List[int]
class Config:
from_attributes = True
class IPHistoryResponse(BaseModel):
"""Schéma de réponse historique"""
id: int
ip: str
timestamp: datetime
status: str
open_ports: List[int]
class Config:
from_attributes = True
@router.get("/", response_model=List[IPResponse])
async def get_all_ips(
status: Optional[str] = None,
known: Optional[bool] = None,
db: Session = Depends(get_db)
):
"""
Récupère toutes les IPs avec filtres optionnels
Args:
status: Filtrer par statut (online/offline)
known: Filtrer par IPs connues/inconnues
db: Session de base de données
Returns:
Liste des IPs
"""
query = db.query(IP)
if status:
query = query.filter(IP.last_status == status)
if known is not None:
query = query.filter(IP.known == known)
ips = query.all()
return ips
@router.get("/{ip_address}", response_model=IPResponse)
async def get_ip(ip_address: str, db: Session = Depends(get_db)):
"""
Récupère les détails d'une IP spécifique
Args:
ip_address: Adresse IP
db: Session de base de données
Returns:
Détails de l'IP
"""
ip = db.query(IP).filter(IP.ip == ip_address).first()
if not ip:
raise HTTPException(status_code=404, detail="IP non trouvée")
return ip
@router.put("/{ip_address}", response_model=IPResponse)
async def update_ip(
ip_address: str,
ip_update: IPUpdate,
db: Session = Depends(get_db)
):
"""
Met à jour les informations d'une IP
Args:
ip_address: Adresse IP
ip_update: Données à mettre à jour
db: Session de base de données
Returns:
IP mise à jour
"""
ip = db.query(IP).filter(IP.ip == ip_address).first()
if not ip:
raise HTTPException(status_code=404, detail="IP non trouvée")
# Mettre à jour les champs fournis
update_data = ip_update.dict(exclude_unset=True)
for field, value in update_data.items():
setattr(ip, field, value)
db.commit()
db.refresh(ip)
return ip
@router.delete("/{ip_address}")
async def delete_ip(ip_address: str, db: Session = Depends(get_db)):
"""
Supprime une IP (et son historique)
Args:
ip_address: Adresse IP
db: Session de base de données
Returns:
Message de confirmation
"""
ip = db.query(IP).filter(IP.ip == ip_address).first()
if not ip:
raise HTTPException(status_code=404, detail="IP non trouvée")
db.delete(ip)
db.commit()
return {"message": f"IP {ip_address} supprimée"}
@router.get("/{ip_address}/history", response_model=List[IPHistoryResponse])
async def get_ip_history(
ip_address: str,
hours: int = 24,
db: Session = Depends(get_db)
):
"""
Récupère l'historique d'une IP
Args:
ip_address: Adresse IP
hours: Nombre d'heures d'historique (défaut: 24h)
db: Session de base de données
Returns:
Liste des événements historiques
"""
# Vérifier que l'IP existe
ip = db.query(IP).filter(IP.ip == ip_address).first()
if not ip:
raise HTTPException(status_code=404, detail="IP non trouvée")
# Calculer la date limite
since = datetime.utcnow() - timedelta(hours=hours)
# Récupérer l'historique
history = db.query(IPHistory).filter(
IPHistory.ip == ip_address,
IPHistory.timestamp >= since
).order_by(desc(IPHistory.timestamp)).all()
return history
@router.get("/stats/summary")
async def get_stats(db: Session = Depends(get_db)):
"""
Récupère les statistiques globales du réseau
Returns:
Statistiques (total, online, offline, known, unknown)
"""
total = db.query(IP).count()
online = db.query(IP).filter(IP.last_status == "online").count()
offline = db.query(IP).filter(IP.last_status == "offline").count()
known = db.query(IP).filter(IP.known == True).count()
unknown = db.query(IP).filter(IP.known == False).count()
return {
"total": total,
"online": online,
"offline": offline,
"known": known,
"unknown": unknown
}
@router.get("/config/options")
async def get_config_options():
"""
Récupère les options de configuration (locations, hosts, port_protocols, version)
Returns:
Dictionnaire avec locations, hosts, port_protocols et version
"""
config = config_manager.config
# Récupérer les protocoles de ports depuis la config
port_protocols = {}
if hasattr(config.ports, 'protocols') and config.ports.protocols:
port_protocols = config.ports.protocols
return {
"locations": config.locations,
"hosts": [{"name": h.name, "location": h.location} for h in config.hosts],
"port_protocols": port_protocols,
"version": config.app.version
}
@router.get("/config/content")
async def get_config_content():
"""
Récupère le contenu brut du fichier config.yaml
Returns:
Contenu du fichier YAML
"""
try:
with open("./config.yaml", "r", encoding="utf-8") as f:
content = f.read()
return {"content": content}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Erreur lecture config: {str(e)}")
@router.post("/config/reload")
async def reload_config():
"""
Recharge la configuration depuis config.yaml
Returns:
Message de confirmation
"""
try:
config_manager.reload_config()
return {"message": "Configuration rechargée avec succès"}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Erreur rechargement config: {str(e)}")
@router.post("/import/ipscan")
async def import_ipscan(file: UploadFile = File(...), db: Session = Depends(get_db)):
"""
Importe les données depuis un fichier XML Angry IP Scanner
Args:
file: Fichier XML uploadé
db: Session de base de données
Returns:
Statistiques d'import
"""
if not file.filename.endswith('.xml'):
raise HTTPException(status_code=400, detail="Le fichier doit être un XML")
try:
# Lire le contenu du fichier
content = await file.read()
# Essayer de parser le XML avec récupération d'erreurs
try:
root = ET.fromstring(content)
except ET.ParseError as e:
# Si le parsing échoue, essayer de nettoyer le contenu
import re
content_str = content.decode('utf-8', errors='ignore')
# Supprimer les caractères de contrôle invalides (sauf tab, CR, LF)
content_str = ''.join(char for char in content_str
if ord(char) >= 32 or char in '\t\r\n')
try:
root = ET.fromstring(content_str.encode('utf-8'))
except ET.ParseError:
raise HTTPException(status_code=400, detail=f"Fichier XML invalide même après nettoyage: {str(e)}")
imported = 0
updated = 0
errors = []
# Parser chaque host
for host in root.findall('.//host'):
try:
# Extraire l'adresse IP
ip_address = host.get('address')
if not ip_address:
continue
# Extraire les informations
hostname = None
mac = None
vendor = None
ports = []
for result in host.findall('result'):
name = result.get('name')
value = result.text.strip() if result.text else ""
# Nettoyer les valeurs [n/a]
if value == "[n/a]":
value = None
if name == "Nom d'hôte" and value:
hostname = value
elif name == "Adresse MAC" and value:
mac = value
elif name == "Constructeur MAC" and value:
vendor = value
elif name == "Ports" and value:
# Parser les ports (format: "22,80,443")
try:
ports = [int(p.strip()) for p in value.split(',') if p.strip().isdigit()]
except Exception as e:
ports = []
# Vérifier si l'IP existe déjà
existing_ip = db.query(IP).filter(IP.ip == ip_address).first()
if existing_ip:
# Mettre à jour avec de nouvelles informations
if hostname:
if not existing_ip.hostname:
existing_ip.hostname = hostname
if not existing_ip.name:
existing_ip.name = hostname
if mac and not existing_ip.mac:
existing_ip.mac = mac
# Toujours mettre à jour vendor et ports depuis IPScan (plus complet et à jour)
if vendor:
existing_ip.vendor = vendor
if ports:
existing_ip.open_ports = ports
existing_ip.last_status = "online"
existing_ip.last_seen = datetime.utcnow()
updated += 1
else:
# Créer une nouvelle entrée
new_ip = IP(
ip=ip_address,
name=hostname,
hostname=hostname,
mac=mac,
vendor=vendor,
open_ports=ports or [],
last_status="online",
known=False,
first_seen=datetime.utcnow(),
last_seen=datetime.utcnow()
)
db.add(new_ip)
imported += 1
except Exception as e:
errors.append(f"Erreur pour {ip_address}: {str(e)}")
continue
# Commit des changements
db.commit()
return {
"message": "Import terminé",
"imported": imported,
"updated": updated,
"errors": errors[:10] # Limiter à 10 erreurs
}
except ET.ParseError as e:
raise HTTPException(status_code=400, detail=f"Fichier XML invalide: {str(e)}")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Erreur import: {str(e)}")