Files
ipwatch/mqtt/client/ipwatch_mqtt_agent.py
2026-02-07 16:57:37 +01:00

338 lines
11 KiB
Python

#!/usr/bin/env python3
"""
IPWatch MQTT Agent
Agent à installer sur chaque machine pour recevoir les commandes shutdown/reboot via MQTT
Installation:
pip install paho-mqtt psutil netifaces
Configuration:
Créer /etc/ipwatch/mqtt-agent.conf avec:
[mqtt]
broker = localhost
port = 1883
username =
password =
[agent]
hostname = auto
check_interval = 30
"""
import paho.mqtt.client as mqtt
import platform
import os
import sys
import subprocess
import json
import time
import socket
import configparser
import logging
from datetime import datetime
from pathlib import Path
# Configuration du logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('/var/log/ipwatch-mqtt-agent.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger('ipwatch-mqtt-agent')
class IPWatchMQTTAgent:
"""Agent MQTT pour recevoir les commandes de contrôle système"""
def __init__(self, config_file='/etc/ipwatch/mqtt-agent.conf'):
self.config = self.load_config(config_file)
self.hostname = self.get_hostname()
self.ip_address = self.get_ip_address()
# Topics MQTT
self.base_topic = f"ipwatch/device/{self.ip_address}"
self.command_topic = f"{self.base_topic}/command"
self.status_topic = f"{self.base_topic}/status"
self.availability_topic = f"{self.base_topic}/availability"
# Client MQTT
self.client = mqtt.Client(client_id=f"ipwatch-agent-{self.hostname}")
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.on_disconnect = self.on_disconnect
# Will message (si l'agent se déconnecte brutalement)
self.client.will_set(
self.availability_topic,
payload="offline",
qos=1,
retain=True
)
def load_config(self, config_file):
"""Charge la configuration depuis le fichier"""
config = configparser.ConfigParser()
if not Path(config_file).exists():
logger.warning(f"Fichier de configuration {config_file} introuvable, utilisation des valeurs par défaut")
return {
'broker': 'localhost',
'port': 1883,
'username': None,
'password': None,
'check_interval': 30
}
config.read(config_file)
return {
'broker': config.get('mqtt', 'broker', fallback='localhost'),
'port': config.getint('mqtt', 'port', fallback=1883),
'username': config.get('mqtt', 'username', fallback=None),
'password': config.get('mqtt', 'password', fallback=None),
'check_interval': config.getint('agent', 'check_interval', fallback=30)
}
def get_hostname(self):
"""Récupère le hostname de la machine"""
return platform.node()
def get_ip_address(self):
"""Récupère l'adresse IP principale de la machine"""
try:
# Créer une socket pour déterminer l'IP locale
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
s.close()
return ip
except Exception as e:
logger.error(f"Erreur récupération IP: {e}")
return "127.0.0.1"
def on_connect(self, client, userdata, flags, rc):
"""Callback lors de la connexion au broker MQTT"""
if rc == 0:
logger.info(f"✓ Connecté au broker MQTT {self.config['broker']}:{self.config['port']}")
# S'abonner au topic des commandes
client.subscribe(self.command_topic)
client.subscribe(f"ipwatch/device/+/command") # Pour broadcast
logger.info(f"✓ Abonné à {self.command_topic}")
# Publier la disponibilité
client.publish(self.availability_topic, "online", qos=1, retain=True)
# Publier le statut initial
self.publish_status()
else:
logger.error(f"✗ Échec connexion MQTT, code: {rc}")
def on_disconnect(self, client, userdata, rc):
"""Callback lors de la déconnexion"""
if rc != 0:
logger.warning(f"⚠ Déconnexion inattendue du broker MQTT (code {rc})")
def on_message(self, client, userdata, msg):
"""Callback lors de la réception d'un message"""
try:
payload = msg.payload.decode('utf-8')
logger.info(f"→ Message reçu sur {msg.topic}: {payload}")
# Parser le message JSON
try:
command_data = json.loads(payload)
command = command_data.get('command', payload) # Support format simple ou JSON
except json.JSONDecodeError:
command = payload # Format texte simple
# Exécuter la commande
self.execute_command(command)
except Exception as e:
logger.error(f"✗ Erreur traitement message: {e}")
def execute_command(self, command):
"""Exécute une commande système"""
logger.info(f"⚙ Exécution commande: {command}")
try:
if command == "shutdown":
self.shutdown()
elif command == "reboot":
self.reboot()
elif command == "status":
self.publish_status()
else:
logger.warning(f"⚠ Commande inconnue: {command}")
self.publish_response(f"Commande inconnue: {command}", success=False)
except Exception as e:
logger.error(f"✗ Erreur exécution commande: {e}")
self.publish_response(str(e), success=False)
def shutdown(self):
"""Éteint la machine"""
logger.warning("🔴 Shutdown demandé, arrêt dans 5 secondes...")
self.publish_response("Shutdown en cours...", success=True)
time.sleep(1)
# Publier offline avant l'arrêt
self.client.publish(self.availability_topic, "offline", qos=1, retain=True)
time.sleep(1)
# Commande d'arrêt selon l'OS
if platform.system() == "Windows":
subprocess.run(["shutdown", "/s", "/t", "5"])
else:
subprocess.run(["sudo", "shutdown", "-h", "+0"])
def reboot(self):
"""Redémarre la machine"""
logger.warning("🔄 Reboot demandé, redémarrage dans 5 secondes...")
self.publish_response("Reboot en cours...", success=True)
time.sleep(1)
# Publier offline avant le redémarrage
self.client.publish(self.availability_topic, "offline", qos=1, retain=True)
time.sleep(1)
# Commande de redémarrage selon l'OS
if platform.system() == "Windows":
subprocess.run(["shutdown", "/r", "/t", "5"])
else:
subprocess.run(["sudo", "reboot"])
def publish_status(self):
"""Publie le statut de la machine"""
try:
import psutil
status = {
"hostname": self.hostname,
"ip": self.ip_address,
"platform": platform.system(),
"platform_version": platform.version(),
"uptime": time.time() - psutil.boot_time(),
"cpu_percent": psutil.cpu_percent(interval=1),
"memory_percent": psutil.virtual_memory().percent,
"disk_percent": psutil.disk_usage('/').percent,
"timestamp": datetime.now().isoformat()
}
self.client.publish(
self.status_topic,
json.dumps(status),
qos=1,
retain=False
)
logger.info(f"✓ Statut publié")
except ImportError:
logger.warning("psutil non installé, statut limité")
status = {
"hostname": self.hostname,
"ip": self.ip_address,
"timestamp": datetime.now().isoformat()
}
self.client.publish(self.status_topic, json.dumps(status), qos=1)
def publish_response(self, message, success=True):
"""Publie une réponse sur le topic de statut"""
response = {
"success": success,
"message": message,
"timestamp": datetime.now().isoformat()
}
self.client.publish(
f"{self.base_topic}/response",
json.dumps(response),
qos=1
)
def run(self):
"""Démarre l'agent"""
try:
# Authentification si configurée
if self.config['username'] and self.config['password']:
self.client.username_pw_set(
self.config['username'],
self.config['password']
)
# Connexion au broker
logger.info(f"→ Connexion au broker MQTT {self.config['broker']}:{self.config['port']}...")
self.client.connect(
self.config['broker'],
self.config['port'],
keepalive=60
)
# Démarrer la boucle MQTT
self.client.loop_start()
# Publier le statut périodiquement
logger.info(f"✓ Agent IPWatch MQTT démarré sur {self.ip_address}")
logger.info(f" Topics: {self.command_topic} | {self.status_topic}")
while True:
time.sleep(self.config['check_interval'])
self.publish_status()
except KeyboardInterrupt:
logger.info("\n→ Arrêt demandé par l'utilisateur")
self.stop()
except Exception as e:
logger.error(f"✗ Erreur fatale: {e}")
self.stop()
sys.exit(1)
def stop(self):
"""Arrête l'agent proprement"""
logger.info("→ Arrêt de l'agent...")
self.client.publish(self.availability_topic, "offline", qos=1, retain=True)
self.client.loop_stop()
self.client.disconnect()
logger.info("✓ Agent arrêté")
def main():
"""Point d'entrée principal"""
import argparse
parser = argparse.ArgumentParser(description='IPWatch MQTT Agent')
parser.add_argument(
'-c', '--config',
default='/etc/ipwatch/mqtt-agent.conf',
help='Fichier de configuration (défaut: /etc/ipwatch/mqtt-agent.conf)'
)
parser.add_argument(
'--test',
action='store_true',
help='Mode test (affiche la config et quitte)'
)
args = parser.parse_args()
agent = IPWatchMQTTAgent(config_file=args.config)
if args.test:
print(f"Configuration:")
print(f" Broker: {agent.config['broker']}:{agent.config['port']}")
print(f" Hostname: {agent.hostname}")
print(f" IP: {agent.ip_address}")
print(f" Command topic: {agent.command_topic}")
print(f" Status topic: {agent.status_topic}")
return
agent.run()
if __name__ == "__main__":
main()