Files
LANMap/backend/app/routers/ips.py
2025-12-06 12:28:55 +01:00

217 lines
5.1 KiB
Python

"""
Endpoints API pour la gestion des IPs
"""
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from sqlalchemy import desc
from typing import List, Optional
from datetime import datetime, timedelta
from backend.app.core.database import get_db
from backend.app.models.ip import IP, IPHistory
from pydantic import BaseModel
router = APIRouter(prefix="/api/ips", tags=["IPs"])
# Schémas Pydantic pour validation
class IPUpdate(BaseModel):
"""Schéma pour mise à jour d'IP"""
name: Optional[str] = None
known: Optional[bool] = None
location: Optional[str] = None
host: Optional[str] = None
class IPResponse(BaseModel):
"""Schéma de réponse IP"""
ip: str
name: Optional[str]
known: bool
location: Optional[str]
host: Optional[str]
first_seen: Optional[datetime]
last_seen: Optional[datetime]
last_status: Optional[str]
mac: Optional[str]
vendor: Optional[str]
hostname: Optional[str]
open_ports: List[int]
class Config:
from_attributes = True
class IPHistoryResponse(BaseModel):
"""Schéma de réponse historique"""
id: int
ip: str
timestamp: datetime
status: str
open_ports: List[int]
class Config:
from_attributes = True
@router.get("/", response_model=List[IPResponse])
async def get_all_ips(
status: Optional[str] = None,
known: Optional[bool] = None,
db: Session = Depends(get_db)
):
"""
Récupère toutes les IPs avec filtres optionnels
Args:
status: Filtrer par statut (online/offline)
known: Filtrer par IPs connues/inconnues
db: Session de base de données
Returns:
Liste des IPs
"""
query = db.query(IP)
if status:
query = query.filter(IP.last_status == status)
if known is not None:
query = query.filter(IP.known == known)
ips = query.all()
return ips
@router.get("/{ip_address}", response_model=IPResponse)
async def get_ip(ip_address: str, db: Session = Depends(get_db)):
"""
Récupère les détails d'une IP spécifique
Args:
ip_address: Adresse IP
db: Session de base de données
Returns:
Détails de l'IP
"""
ip = db.query(IP).filter(IP.ip == ip_address).first()
if not ip:
raise HTTPException(status_code=404, detail="IP non trouvée")
return ip
@router.put("/{ip_address}", response_model=IPResponse)
async def update_ip(
ip_address: str,
ip_update: IPUpdate,
db: Session = Depends(get_db)
):
"""
Met à jour les informations d'une IP
Args:
ip_address: Adresse IP
ip_update: Données à mettre à jour
db: Session de base de données
Returns:
IP mise à jour
"""
ip = db.query(IP).filter(IP.ip == ip_address).first()
if not ip:
raise HTTPException(status_code=404, detail="IP non trouvée")
# Mettre à jour les champs fournis
update_data = ip_update.dict(exclude_unset=True)
for field, value in update_data.items():
setattr(ip, field, value)
db.commit()
db.refresh(ip)
return ip
@router.delete("/{ip_address}")
async def delete_ip(ip_address: str, db: Session = Depends(get_db)):
"""
Supprime une IP (et son historique)
Args:
ip_address: Adresse IP
db: Session de base de données
Returns:
Message de confirmation
"""
ip = db.query(IP).filter(IP.ip == ip_address).first()
if not ip:
raise HTTPException(status_code=404, detail="IP non trouvée")
db.delete(ip)
db.commit()
return {"message": f"IP {ip_address} supprimée"}
@router.get("/{ip_address}/history", response_model=List[IPHistoryResponse])
async def get_ip_history(
ip_address: str,
hours: int = 24,
db: Session = Depends(get_db)
):
"""
Récupère l'historique d'une IP
Args:
ip_address: Adresse IP
hours: Nombre d'heures d'historique (défaut: 24h)
db: Session de base de données
Returns:
Liste des événements historiques
"""
# Vérifier que l'IP existe
ip = db.query(IP).filter(IP.ip == ip_address).first()
if not ip:
raise HTTPException(status_code=404, detail="IP non trouvée")
# Calculer la date limite
since = datetime.utcnow() - timedelta(hours=hours)
# Récupérer l'historique
history = db.query(IPHistory).filter(
IPHistory.ip == ip_address,
IPHistory.timestamp >= since
).order_by(desc(IPHistory.timestamp)).all()
return history
@router.get("/stats/summary")
async def get_stats(db: Session = Depends(get_db)):
"""
Récupère les statistiques globales du réseau
Returns:
Statistiques (total, online, offline, known, unknown)
"""
total = db.query(IP).count()
online = db.query(IP).filter(IP.last_status == "online").count()
offline = db.query(IP).filter(IP.last_status == "offline").count()
known = db.query(IP).filter(IP.known == True).count()
unknown = db.query(IP).filter(IP.known == False).count()
return {
"total": total,
"online": online,
"offline": offline,
"known": known,
"unknown": unknown
}