14 KiB
14 KiB
📝 Consignes de Codage MQTT pour IPWatch
Ce document définit les standards de développement pour l'écosystème MQTT d'IPWatch.
1. Conventions de Nommage
1.1 Topics MQTT
Format : ipwatch/device/{IP_ADDRESS}/{category}
Règles :
- ✅ Utiliser des minuscules uniquement
- ✅ Utiliser
/comme séparateur hiérarchique - ✅ Pas d'espaces ni de caractères spéciaux
- ✅ Maximum 128 caractères
- ❌ Éviter les wildcards (
+,#) dans les noms de topics
Exemples :
✅ ipwatch/device/192.168.1.100/command
✅ ipwatch/device/10.0.0.50/status
❌ IPWatch/Device/192.168.1.100/Command (majuscules)
❌ ipwatch-device-192.168.1.100-command (mauvais séparateur)
1.2 Noms de Variables (Python)
# ✅ Bon
mqtt_broker = "localhost"
command_topic = f"ipwatch/device/{ip}/command"
message_payload = json.dumps(data)
# ❌ Mauvais
MQTTBroker = "localhost" # Pas de PascalCase pour variables
commandTopic = f"ipwatch/device/{ip}/command" # Pas de camelCase
msg = json.dumps(data) # Abréviation non claire
1.3 Constantes
# ✅ Constantes en MAJUSCULES
MQTT_QOS_COMMAND = 1
MQTT_QOS_STATUS = 1
DEFAULT_KEEPALIVE = 60
MAX_RETRY_ATTEMPTS = 3
# Topics en constantes
TOPIC_PREFIX = "ipwatch/device"
TOPIC_COMMAND = "{}/command"
TOPIC_STATUS = "{}/status"
2. Structure du Code
2.1 Organisation des Fichiers
mqtt/
├── client/ # Agent MQTT côté client
│ ├── ipwatch_mqtt_agent.py # Agent principal
│ ├── mqtt-agent.conf.example # Configuration exemple
│ └── requirements.txt # Dépendances Python
├── docs/ # Documentation
│ ├── MQTT_ARCHITECTURE.md
│ ├── MQTT_CODING_GUIDELINES.md
│ └── HOMEASSISTANT_SPEC.md
└── systemd/ # Services systemd
└── ipwatch-mqtt-agent.service
2.2 Structure d'un Module Python
#!/usr/bin/env python3
"""
Titre du module
Description détaillée
Installation:
pip install dependencies
Usage:
python module.py --help
"""
# Imports standard library
import os
import sys
import json
import logging
from pathlib import Path
# Imports third-party
import paho.mqtt.client as mqtt
# Imports locaux
from backend.app.core.config import config_manager
# Configuration du logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Constantes globales
MQTT_BROKER = "localhost"
MQTT_PORT = 1883
# Classes
class MyClass:
"""Docstring de la classe"""
pass
# Fonctions
def my_function():
"""Docstring de la fonction"""
pass
# Point d'entrée
if __name__ == "__main__":
main()
3. Gestion MQTT
3.1 Connexion au Broker
# ✅ Bon - Gestion robuste des erreurs
def connect_mqtt(broker, port, username=None, password=None):
"""
Connecte au broker MQTT avec gestion d'erreurs
Args:
broker (str): Adresse du broker
port (int): Port MQTT
username (str, optional): Nom d'utilisateur
password (str, optional): Mot de passe
Returns:
mqtt.Client: Client MQTT connecté
Raises:
ConnectionError: Si la connexion échoue
"""
try:
client = mqtt.Client(client_id=f"ipwatch-{os.getpid()}")
if username and password:
client.username_pw_set(username, password)
client.connect(broker, port, keepalive=60)
logger.info(f"✓ Connecté au broker {broker}:{port}")
return client
except Exception as e:
logger.error(f"✗ Erreur connexion MQTT: {e}")
raise ConnectionError(f"Impossible de se connecter à {broker}:{port}")
# ❌ Mauvais - Pas de gestion d'erreur
def connect_mqtt(broker, port):
client = mqtt.Client()
client.connect(broker, port)
return client
3.2 Publication de Messages
# ✅ Bon - QoS et vérification
def publish_command(client, ip_address, command):
"""
Publie une commande MQTT
Args:
client: Client MQTT
ip_address (str): IP de destination
command (str): Commande à envoyer
Returns:
bool: True si publié avec succès
"""
topic = f"ipwatch/device/{ip_address}/command"
payload = json.dumps({
"command": command,
"timestamp": datetime.now().isoformat()
})
result = client.publish(topic, payload, qos=1)
result.wait_for_publish(timeout=5)
if result.is_published():
logger.info(f"✓ Commande '{command}' envoyée à {ip_address}")
return True
else:
logger.error(f"✗ Échec publication commande vers {ip_address}")
return False
# ❌ Mauvais - Pas de QoS, pas de vérification
def publish_command(client, ip, cmd):
client.publish(f"ipwatch/device/{ip}/command", cmd)
3.3 Souscription et Callbacks
# ✅ Bon - Callbacks avec gestion d'erreur
class MQTTAgent:
def __init__(self):
self.client = mqtt.Client()
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.on_disconnect = self.on_disconnect
def on_connect(self, client, userdata, flags, rc):
"""Callback lors de la connexion"""
if rc == 0:
logger.info("✓ Connecté au broker")
client.subscribe(self.command_topic, qos=1)
else:
logger.error(f"✗ Échec connexion, code: {rc}")
def on_message(self, client, userdata, msg):
"""Callback réception message"""
try:
payload = json.loads(msg.payload.decode('utf-8'))
command = payload.get('command')
self.execute_command(command)
except json.JSONDecodeError as e:
logger.error(f"✗ JSON invalide: {e}")
except Exception as e:
logger.error(f"✗ Erreur traitement message: {e}")
def on_disconnect(self, client, userdata, rc):
"""Callback déconnexion"""
if rc != 0:
logger.warning(f"⚠ Déconnexion inattendue (code {rc})")
4. Gestion des Erreurs
4.1 Try/Except
# ✅ Bon - Gestion spécifique des erreurs
def execute_shutdown():
"""Exécute la commande shutdown"""
try:
subprocess.run(
["sudo", "shutdown", "-h", "now"],
check=True,
timeout=10
)
logger.info("✓ Shutdown initié")
return True
except subprocess.TimeoutExpired:
logger.error("✗ Timeout exécution shutdown")
return False
except subprocess.CalledProcessError as e:
logger.error(f"✗ Erreur shutdown: {e}")
return False
except PermissionError:
logger.error("✗ Permissions sudo insuffisantes")
return False
except Exception as e:
logger.error(f"✗ Erreur inattendue: {e}")
return False
# ❌ Mauvais - Catch all sans logging
def execute_shutdown():
try:
subprocess.run(["sudo", "shutdown", "-h", "now"])
except:
pass
4.2 Logging
# ✅ Bon - Niveaux de log appropriés
logger.debug("Payload: %s", payload) # Détails de debug
logger.info("✓ Connexion établie") # Opérations normales
logger.warning("⚠ Retry connexion...") # Avertissements
logger.error("✗ Échec publication") # Erreurs
logger.critical("🔴 Broker inaccessible") # Erreurs critiques
# ❌ Mauvais - Mauvais niveaux
logger.info("Erreur critique!") # Devrait être error/critical
logger.error("Opération réussie") # Devrait être info
5. Format des Payloads JSON
5.1 Commandes
# ✅ Bon - Structure claire
{
"command": "shutdown", # string, required
"timestamp": "2025-12-23T10:30:00Z" # ISO 8601, required
}
# Avec options
{
"command": "shutdown",
"timestamp": "2025-12-23T10:30:00Z",
"delay": 60, # secondes avant exécution
"force": false # shutdown forcé ou non
}
5.2 Statut
# ✅ Bon - Métriques utiles
{
"hostname": "server-01",
"ip": "10.0.0.100",
"platform": "Linux",
"platform_version": "5.15.0-91-generic",
"uptime": 86400, # secondes
"cpu_percent": 45.2,
"memory_percent": 62.5,
"disk_percent": 78.3,
"timestamp": "2025-12-23T10:30:00Z"
}
# ❌ Mauvais - Données incomplètes
{
"host": "server-01",
"status": "ok"
}
5.3 Réponses
# ✅ Bon - Succès/échec clair
{
"success": true,
"message": "Shutdown en cours...",
"timestamp": "2025-12-23T10:30:05Z"
}
# En cas d'erreur
{
"success": false,
"message": "Permissions insuffisantes",
"error_code": "PERMISSION_DENIED",
"timestamp": "2025-12-23T10:30:05Z"
}
6. Tests
6.1 Tests Unitaires
# tests/test_mqtt_agent.py
import pytest
from unittest.mock import MagicMock, patch
from ipwatch_mqtt_agent import IPWatchMQTTAgent
def test_get_ip_address():
"""Test récupération adresse IP"""
agent = IPWatchMQTTAgent()
ip = agent.get_ip_address()
assert isinstance(ip, str)
assert len(ip.split('.')) == 4
def test_publish_status():
"""Test publication du statut"""
agent = IPWatchMQTTAgent()
agent.client = MagicMock()
agent.publish_status()
agent.client.publish.assert_called_once()
args = agent.client.publish.call_args
assert "ipwatch/device" in args[0][0]
@patch('subprocess.run')
def test_shutdown(mock_subprocess):
"""Test commande shutdown"""
agent = IPWatchMQTTAgent()
agent.shutdown()
mock_subprocess.assert_called_once()
assert "shutdown" in mock_subprocess.call_args[0][0]
6.2 Tests d'Intégration
# tests/test_mqtt_integration.py
import time
import paho.mqtt.client as mqtt
def test_command_flow():
"""Test flux complet command → response"""
received_messages = []
def on_message(client, userdata, msg):
received_messages.append(msg.payload.decode())
# Setup client test
client = mqtt.Client("test-client")
client.on_message = on_message
client.connect("localhost", 1883)
client.subscribe("ipwatch/device/+/response")
client.loop_start()
# Envoyer commande
client.publish(
"ipwatch/device/10.0.0.100/command",
'{"command": "status"}',
qos=1
)
# Attendre réponse
time.sleep(2)
assert len(received_messages) > 0
assert "success" in received_messages[0]
client.loop_stop()
client.disconnect()
7. Sécurité
7.1 Credentials
# ✅ Bon - Variables d'environnement
MQTT_USERNAME = os.getenv('MQTT_USERNAME')
MQTT_PASSWORD = os.getenv('MQTT_PASSWORD')
if not MQTT_USERNAME or not MQTT_PASSWORD:
logger.warning("⚠ MQTT sans authentification")
# ❌ Mauvais - Hardcodé
MQTT_USERNAME = "ipwatch"
MQTT_PASSWORD = "password123"
7.2 Validation des Entrées
# ✅ Bon - Validation stricte
def execute_command(command):
"""Exécute une commande MQTT"""
ALLOWED_COMMANDS = ['shutdown', 'reboot', 'status']
if command not in ALLOWED_COMMANDS:
logger.error(f"✗ Commande refusée: {command}")
return False
# Exécution sécurisée
if command == "shutdown":
return execute_shutdown()
# ...
# ❌ Mauvais - Exécution directe
def execute_command(command):
subprocess.run(command, shell=True) # DANGER: injection!
8. Performance
8.1 Optimisation des Payloads
# ✅ Bon - Payload compact
payload = json.dumps({
"cmd": "status", # Abréviation pour réduire taille
"ts": int(time.time()) # Timestamp Unix
}, separators=(',', ':')) # Pas d'espaces
# ❌ Mauvais - Payload volumineux
payload = json.dumps({
"command": "status",
"timestamp": "2025-12-23T10:30:00.123456Z",
"extra_field_1": None,
"extra_field_2": None
}, indent=2) # Indentation inutile
8.2 Gestion de la Boucle
# ✅ Bon - Boucle non-bloquante
client.loop_start() # Thread séparé
while True:
time.sleep(30)
publish_status()
# ❌ Mauvais - Boucle bloquante
while True:
client.loop() # Bloque le thread principal
time.sleep(30)
9. Documentation
9.1 Docstrings
# ✅ Bon - Docstring complet
def send_mqtt_command(ip_address, command):
"""
Envoie une commande MQTT à un équipement
Cette fonction publie une commande sur le topic MQTT correspondant
à l'adresse IP fournie. La commande est envoyée avec QoS 1 pour
garantir la livraison.
Args:
ip_address (str): Adresse IP de l'équipement cible
command (str): Commande à envoyer (shutdown, reboot, status)
Returns:
bool: True si la commande a été publiée avec succès
Raises:
ValueError: Si la commande n'est pas valide
ConnectionError: Si le broker MQTT est inaccessible
Example:
>>> send_mqtt_command("192.168.1.100", "status")
True
"""
pass
# ❌ Mauvais - Pas de documentation
def send_mqtt_command(ip, cmd):
pass
10. Checklist Pré-commit
Avant de commit du code MQTT, vérifier :
- Code formaté avec
black(PEP 8) - Pas de secrets/credentials hardcodés
- Logging approprié (info/error/debug)
- Gestion des erreurs avec try/except
- Docstrings sur fonctions/classes publiques
- Tests unitaires passent
- QoS 1 pour messages critiques
- Validation des payloads JSON
- Topics respectent la convention
- Variables d'environnement documentées
11. Outils Recommandés
# Formatage code
pip install black
black mqtt/client/
# Linting
pip install pylint
pylint mqtt/client/ipwatch_mqtt_agent.py
# Tests
pip install pytest pytest-cov
pytest mqtt/tests/ --cov
# Type checking
pip install mypy
mypy mqtt/client/
12. Exemples de Code Complet
Voir :
mqtt/client/ipwatch_mqtt_agent.py- Agent MQTT clientbackend/app/services/mqtt_client.py- Client MQTT backendbackend/app/routers/tracking.py- Endpoints API MQTT