296 lines
8.9 KiB
Python
296 lines
8.9 KiB
Python
"""
|
|
Modules réseau pour scan d'IP, ping, ARP et port scan
|
|
Implémente le workflow de scan selon workflow-scan.md
|
|
"""
|
|
import asyncio
|
|
import ipaddress
|
|
import platform
|
|
import subprocess
|
|
import socket
|
|
from typing import List, Dict, Optional, Tuple
|
|
from datetime import datetime
|
|
import re
|
|
|
|
# Scapy pour ARP
|
|
try:
|
|
from scapy.all import ARP, Ether, srp
|
|
SCAPY_AVAILABLE = True
|
|
except ImportError:
|
|
SCAPY_AVAILABLE = False
|
|
|
|
|
|
class NetworkScanner:
|
|
"""Scanner réseau principal"""
|
|
|
|
def __init__(self, cidr: str, timeout: float = 1.0):
|
|
"""
|
|
Initialise le scanner réseau
|
|
|
|
Args:
|
|
cidr: Réseau CIDR (ex: "192.168.1.0/24")
|
|
timeout: Timeout pour ping et connexions (secondes)
|
|
"""
|
|
self.cidr = cidr
|
|
self.timeout = timeout
|
|
self.network = ipaddress.ip_network(cidr, strict=False)
|
|
|
|
def generate_ip_list(self) -> List[str]:
|
|
"""
|
|
Génère la liste complète d'IP depuis le CIDR
|
|
|
|
Returns:
|
|
Liste des adresses IP en string
|
|
"""
|
|
return [str(ip) for ip in self.network.hosts()]
|
|
|
|
async def ping(self, ip: str) -> bool:
|
|
"""
|
|
Ping une adresse IP (async)
|
|
|
|
Args:
|
|
ip: Adresse IP à pinger
|
|
|
|
Returns:
|
|
True si l'IP répond, False sinon
|
|
"""
|
|
# Détection de l'OS pour la commande ping
|
|
param = '-n' if platform.system().lower() == 'windows' else '-c'
|
|
timeout_param = '-w' if platform.system().lower() == 'windows' else '-W'
|
|
|
|
command = ['ping', param, '1', timeout_param, str(int(self.timeout * 1000) if platform.system().lower() == 'windows' else str(int(self.timeout))), ip]
|
|
|
|
try:
|
|
# Exécuter le ping de manière asynchrone
|
|
process = await asyncio.create_subprocess_exec(
|
|
*command,
|
|
stdout=asyncio.subprocess.DEVNULL,
|
|
stderr=asyncio.subprocess.DEVNULL
|
|
)
|
|
await asyncio.wait_for(process.wait(), timeout=self.timeout + 1)
|
|
return process.returncode == 0
|
|
except (asyncio.TimeoutError, Exception):
|
|
return False
|
|
|
|
async def ping_parallel(self, ip_list: List[str], max_concurrent: int = 50) -> Dict[str, bool]:
|
|
"""
|
|
Ping multiple IPs en parallèle
|
|
|
|
Args:
|
|
ip_list: Liste des IPs à pinger
|
|
max_concurrent: Nombre maximum de pings simultanés
|
|
|
|
Returns:
|
|
Dictionnaire {ip: online_status}
|
|
"""
|
|
results = {}
|
|
semaphore = asyncio.Semaphore(max_concurrent)
|
|
|
|
async def ping_with_semaphore(ip: str):
|
|
async with semaphore:
|
|
results[ip] = await self.ping(ip)
|
|
|
|
# Lancer tous les pings en parallèle avec limite
|
|
await asyncio.gather(*[ping_with_semaphore(ip) for ip in ip_list])
|
|
|
|
return results
|
|
|
|
def get_arp_table(self) -> Dict[str, Tuple[str, str]]:
|
|
"""
|
|
Récupère la table ARP du système
|
|
|
|
Returns:
|
|
Dictionnaire {ip: (mac, vendor)}
|
|
"""
|
|
arp_data = {}
|
|
|
|
if SCAPY_AVAILABLE:
|
|
try:
|
|
# Utiliser Scapy pour ARP scan
|
|
answered, _ = srp(
|
|
Ether(dst="ff:ff:ff:ff:ff:ff") / ARP(pdst=self.cidr),
|
|
timeout=2,
|
|
verbose=False
|
|
)
|
|
|
|
for sent, received in answered:
|
|
ip = received.psrc
|
|
mac = received.hwsrc
|
|
vendor = self._get_mac_vendor(mac)
|
|
arp_data[ip] = (mac, vendor)
|
|
except Exception as e:
|
|
print(f"Erreur ARP scan avec Scapy: {e}")
|
|
else:
|
|
# Fallback: parser la table ARP système
|
|
try:
|
|
if platform.system().lower() == 'windows':
|
|
output = subprocess.check_output(['arp', '-a'], text=True)
|
|
pattern = r'(\d+\.\d+\.\d+\.\d+)\s+([0-9a-fA-F-:]+)'
|
|
else:
|
|
output = subprocess.check_output(['arp', '-n'], text=True)
|
|
pattern = r'(\d+\.\d+\.\d+\.\d+)\s+\w+\s+([0-9a-fA-F:]+)'
|
|
|
|
matches = re.findall(pattern, output)
|
|
for ip, mac in matches:
|
|
if ip in [str(h) for h in self.network.hosts()]:
|
|
vendor = self._get_mac_vendor(mac)
|
|
arp_data[ip] = (mac, vendor)
|
|
except Exception as e:
|
|
print(f"Erreur lecture table ARP: {e}")
|
|
|
|
return arp_data
|
|
|
|
def _get_mac_vendor(self, mac: str) -> str:
|
|
"""
|
|
Lookup du fabricant depuis l'adresse MAC
|
|
Simplifié pour l'instant - peut être étendu avec une vraie DB OUI
|
|
|
|
Args:
|
|
mac: Adresse MAC
|
|
|
|
Returns:
|
|
Nom du fabricant ou "Unknown"
|
|
"""
|
|
# TODO: Implémenter lookup OUI complet
|
|
# Pour l'instant, retourne un placeholder
|
|
mac_prefix = mac[:8].upper().replace(':', '').replace('-', '')
|
|
|
|
# Mini DB des fabricants courants
|
|
vendors = {
|
|
"00:0C:29": "VMware",
|
|
"00:50:56": "VMware",
|
|
"08:00:27": "VirtualBox",
|
|
"DC:A6:32": "Raspberry Pi",
|
|
"B8:27:EB": "Raspberry Pi",
|
|
}
|
|
|
|
for prefix, vendor in vendors.items():
|
|
if mac.upper().startswith(prefix.replace(':', '')):
|
|
return vendor
|
|
|
|
return "Unknown"
|
|
|
|
async def scan_ports(self, ip: str, ports: List[int]) -> List[int]:
|
|
"""
|
|
Scan des ports TCP sur une IP
|
|
|
|
Args:
|
|
ip: Adresse IP cible
|
|
ports: Liste des ports à scanner
|
|
|
|
Returns:
|
|
Liste des ports ouverts
|
|
"""
|
|
open_ports = []
|
|
|
|
async def check_port(port: int) -> Optional[int]:
|
|
try:
|
|
# Tentative de connexion TCP
|
|
reader, writer = await asyncio.wait_for(
|
|
asyncio.open_connection(ip, port),
|
|
timeout=self.timeout
|
|
)
|
|
writer.close()
|
|
await writer.wait_closed()
|
|
return port
|
|
except:
|
|
return None
|
|
|
|
# Scanner tous les ports en parallèle
|
|
results = await asyncio.gather(*[check_port(p) for p in ports])
|
|
open_ports = [p for p in results if p is not None]
|
|
|
|
return open_ports
|
|
|
|
def get_hostname(self, ip: str) -> Optional[str]:
|
|
"""
|
|
Résolution DNS inversée pour obtenir le hostname
|
|
|
|
Args:
|
|
ip: Adresse IP
|
|
|
|
Returns:
|
|
Hostname ou None
|
|
"""
|
|
try:
|
|
hostname, _, _ = socket.gethostbyaddr(ip)
|
|
return hostname
|
|
except:
|
|
return None
|
|
|
|
def classify_ip_status(self, is_online: bool, is_known: bool) -> str:
|
|
"""
|
|
Classification de l'état d'une IP
|
|
|
|
Args:
|
|
is_online: IP en ligne
|
|
is_known: IP connue dans la config
|
|
|
|
Returns:
|
|
État: "online", "offline"
|
|
"""
|
|
return "online" if is_online else "offline"
|
|
|
|
async def full_scan(self, known_ips: Dict[str, Dict], port_list: List[int], max_concurrent: int = 50) -> Dict[str, Dict]:
|
|
"""
|
|
Scan complet du réseau selon workflow-scan.md
|
|
|
|
Args:
|
|
known_ips: Dictionnaire des IPs connues depuis config
|
|
port_list: Liste des ports à scanner
|
|
max_concurrent: Pings simultanés max
|
|
|
|
Returns:
|
|
Dictionnaire des résultats de scan pour chaque IP
|
|
"""
|
|
results = {}
|
|
|
|
# 1. Générer liste IP du CIDR
|
|
ip_list = self.generate_ip_list()
|
|
|
|
# 2. Ping parallélisé
|
|
ping_results = await self.ping_parallel(ip_list, max_concurrent)
|
|
|
|
# 3. ARP + MAC vendor
|
|
arp_table = self.get_arp_table()
|
|
|
|
# 4. Pour chaque IP
|
|
for ip in ip_list:
|
|
is_online = ping_results.get(ip, False)
|
|
is_known = ip in known_ips
|
|
|
|
ip_data = {
|
|
"ip": ip,
|
|
"known": is_known,
|
|
"last_status": self.classify_ip_status(is_online, is_known),
|
|
"last_seen": datetime.utcnow() if is_online else None,
|
|
"mac": None,
|
|
"vendor": None,
|
|
"hostname": None,
|
|
"open_ports": [],
|
|
}
|
|
|
|
# Ajouter infos connues
|
|
if is_known:
|
|
ip_data.update(known_ips[ip])
|
|
|
|
# Infos ARP
|
|
if ip in arp_table:
|
|
mac, vendor = arp_table[ip]
|
|
ip_data["mac"] = mac
|
|
ip_data["vendor"] = vendor
|
|
|
|
# Hostname
|
|
if is_online:
|
|
hostname = self.get_hostname(ip)
|
|
if hostname:
|
|
ip_data["hostname"] = hostname
|
|
|
|
# 5. Port scan (uniquement si online)
|
|
if is_online and port_list:
|
|
open_ports = await self.scan_ports(ip, port_list)
|
|
ip_data["open_ports"] = open_ports
|
|
|
|
results[ip] = ip_data
|
|
|
|
return results
|