""" Endpoints API pour la gestion des IPs """ from fastapi import APIRouter, Depends, HTTPException, UploadFile, File from sqlalchemy.orm import Session from sqlalchemy import desc from typing import List, Optional from datetime import datetime, timedelta import xml.etree.ElementTree as ET import yaml from backend.app.core.database import get_db from backend.app.models.ip import IP, IPHistory from backend.app.core.config import config_manager from pydantic import BaseModel router = APIRouter(prefix="/api/ips", tags=["IPs"]) # Schémas Pydantic pour validation class IPUpdate(BaseModel): """Schéma pour mise à jour d'IP""" name: Optional[str] = None known: Optional[bool] = None location: Optional[str] = None host: Optional[str] = None link: Optional[str] = None class IPResponse(BaseModel): """Schéma de réponse IP""" ip: str name: Optional[str] known: bool location: Optional[str] host: Optional[str] first_seen: Optional[datetime] last_seen: Optional[datetime] last_status: Optional[str] mac: Optional[str] vendor: Optional[str] hostname: Optional[str] link: Optional[str] open_ports: List[int] class Config: from_attributes = True class IPHistoryResponse(BaseModel): """Schéma de réponse historique""" id: int ip: str timestamp: datetime status: str open_ports: List[int] class Config: from_attributes = True @router.get("/", response_model=List[IPResponse]) async def get_all_ips( status: Optional[str] = None, known: Optional[bool] = None, db: Session = Depends(get_db) ): """ Récupère toutes les IPs avec filtres optionnels Args: status: Filtrer par statut (online/offline) known: Filtrer par IPs connues/inconnues db: Session de base de données Returns: Liste des IPs """ query = db.query(IP) if status: query = query.filter(IP.last_status == status) if known is not None: query = query.filter(IP.known == known) ips = query.all() return ips @router.get("/{ip_address}", response_model=IPResponse) async def get_ip(ip_address: str, db: Session = Depends(get_db)): """ Récupère les détails d'une IP spécifique Args: ip_address: Adresse IP db: Session de base de données Returns: Détails de l'IP """ ip = db.query(IP).filter(IP.ip == ip_address).first() if not ip: raise HTTPException(status_code=404, detail="IP non trouvée") return ip @router.put("/{ip_address}", response_model=IPResponse) async def update_ip( ip_address: str, ip_update: IPUpdate, db: Session = Depends(get_db) ): """ Met à jour les informations d'une IP Args: ip_address: Adresse IP ip_update: Données à mettre à jour db: Session de base de données Returns: IP mise à jour """ ip = db.query(IP).filter(IP.ip == ip_address).first() if not ip: raise HTTPException(status_code=404, detail="IP non trouvée") # Mettre à jour les champs fournis update_data = ip_update.dict(exclude_unset=True) for field, value in update_data.items(): setattr(ip, field, value) db.commit() db.refresh(ip) return ip @router.delete("/{ip_address}") async def delete_ip(ip_address: str, db: Session = Depends(get_db)): """ Supprime une IP (et son historique) Args: ip_address: Adresse IP db: Session de base de données Returns: Message de confirmation """ ip = db.query(IP).filter(IP.ip == ip_address).first() if not ip: raise HTTPException(status_code=404, detail="IP non trouvée") db.delete(ip) db.commit() return {"message": f"IP {ip_address} supprimée"} @router.get("/{ip_address}/history", response_model=List[IPHistoryResponse]) async def get_ip_history( ip_address: str, hours: int = 24, db: Session = Depends(get_db) ): """ Récupère l'historique d'une IP Args: ip_address: Adresse IP hours: Nombre d'heures d'historique (défaut: 24h) db: Session de base de données Returns: Liste des événements historiques """ # Vérifier que l'IP existe ip = db.query(IP).filter(IP.ip == ip_address).first() if not ip: raise HTTPException(status_code=404, detail="IP non trouvée") # Calculer la date limite since = datetime.utcnow() - timedelta(hours=hours) # Récupérer l'historique history = db.query(IPHistory).filter( IPHistory.ip == ip_address, IPHistory.timestamp >= since ).order_by(desc(IPHistory.timestamp)).all() return history @router.get("/stats/summary") async def get_stats(db: Session = Depends(get_db)): """ Récupère les statistiques globales du réseau Returns: Statistiques (total, online, offline, known, unknown) """ total = db.query(IP).count() online = db.query(IP).filter(IP.last_status == "online").count() offline = db.query(IP).filter(IP.last_status == "offline").count() known = db.query(IP).filter(IP.known == True).count() unknown = db.query(IP).filter(IP.known == False).count() return { "total": total, "online": online, "offline": offline, "known": known, "unknown": unknown } @router.get("/config/options") async def get_config_options(): """ Récupère les options de configuration (locations, hosts, port_protocols, version) Returns: Dictionnaire avec locations, hosts, port_protocols et version """ config = config_manager.config # Récupérer les protocoles de ports depuis la config port_protocols = {} if hasattr(config.ports, 'protocols') and config.ports.protocols: port_protocols = config.ports.protocols return { "locations": config.locations, "hosts": [{"name": h.name, "location": h.location} for h in config.hosts], "port_protocols": port_protocols, "version": config.app.version } @router.get("/config/content") async def get_config_content(): """ Récupère le contenu brut du fichier config.yaml Returns: Contenu du fichier YAML """ try: with open("./config.yaml", "r", encoding="utf-8") as f: content = f.read() return {"content": content} except Exception as e: raise HTTPException(status_code=500, detail=f"Erreur lecture config: {str(e)}") @router.post("/config/reload") async def reload_config(): """ Recharge la configuration depuis config.yaml Returns: Message de confirmation """ try: config_manager.reload_config() return {"message": "Configuration rechargée avec succès"} except Exception as e: raise HTTPException(status_code=500, detail=f"Erreur rechargement config: {str(e)}") @router.post("/import/ipscan") async def import_ipscan(file: UploadFile = File(...), db: Session = Depends(get_db)): """ Importe les données depuis un fichier XML Angry IP Scanner Args: file: Fichier XML uploadé db: Session de base de données Returns: Statistiques d'import """ if not file.filename.endswith('.xml'): raise HTTPException(status_code=400, detail="Le fichier doit être un XML") try: # Lire le contenu du fichier content = await file.read() # Essayer de parser le XML avec récupération d'erreurs try: root = ET.fromstring(content) except ET.ParseError as e: # Si le parsing échoue, essayer de nettoyer le contenu import re content_str = content.decode('utf-8', errors='ignore') # Supprimer les caractères de contrôle invalides (sauf tab, CR, LF) content_str = ''.join(char for char in content_str if ord(char) >= 32 or char in '\t\r\n') try: root = ET.fromstring(content_str.encode('utf-8')) except ET.ParseError: raise HTTPException(status_code=400, detail=f"Fichier XML invalide même après nettoyage: {str(e)}") imported = 0 updated = 0 errors = [] # Parser chaque host for host in root.findall('.//host'): try: # Extraire l'adresse IP ip_address = host.get('address') if not ip_address: continue # Extraire les informations hostname = None mac = None vendor = None ports = [] for result in host.findall('result'): name = result.get('name') value = result.text.strip() if result.text else "" # Nettoyer les valeurs [n/a] if value == "[n/a]": value = None if name == "Nom d'hôte" and value: hostname = value elif name == "Adresse MAC" and value: mac = value elif name == "Constructeur MAC" and value: vendor = value elif name == "Ports" and value: # Parser les ports (format: "22,80,443") try: ports = [int(p.strip()) for p in value.split(',') if p.strip().isdigit()] except Exception as e: ports = [] # Vérifier si l'IP existe déjà existing_ip = db.query(IP).filter(IP.ip == ip_address).first() if existing_ip: # Mettre à jour avec de nouvelles informations if hostname: if not existing_ip.hostname: existing_ip.hostname = hostname if not existing_ip.name: existing_ip.name = hostname if mac and not existing_ip.mac: existing_ip.mac = mac # Toujours mettre à jour vendor et ports depuis IPScan (plus complet et à jour) if vendor: existing_ip.vendor = vendor if ports: existing_ip.open_ports = ports existing_ip.last_status = "online" existing_ip.last_seen = datetime.utcnow() updated += 1 else: # Créer une nouvelle entrée new_ip = IP( ip=ip_address, name=hostname, hostname=hostname, mac=mac, vendor=vendor, open_ports=ports or [], last_status="online", known=False, first_seen=datetime.utcnow(), last_seen=datetime.utcnow() ) db.add(new_ip) imported += 1 except Exception as e: errors.append(f"Erreur pour {ip_address}: {str(e)}") continue # Commit des changements db.commit() return { "message": "Import terminé", "imported": imported, "updated": updated, "errors": errors[:10] # Limiter à 10 erreurs } except ET.ParseError as e: raise HTTPException(status_code=400, detail=f"Fichier XML invalide: {str(e)}") except Exception as e: raise HTTPException(status_code=500, detail=f"Erreur import: {str(e)}")