Files
ipwatch/mqtt/docs/MQTT_CODING_GUIDELINES.md
2026-02-07 16:57:37 +01:00

14 KiB

📝 Consignes de Codage MQTT pour IPWatch

Ce document définit les standards de développement pour l'écosystème MQTT d'IPWatch.

1. Conventions de Nommage

1.1 Topics MQTT

Format : ipwatch/device/{IP_ADDRESS}/{category}

Règles :

  • Utiliser des minuscules uniquement
  • Utiliser / comme séparateur hiérarchique
  • Pas d'espaces ni de caractères spéciaux
  • Maximum 128 caractères
  • Éviter les wildcards (+, #) dans les noms de topics

Exemples :

✅ ipwatch/device/192.168.1.100/command
✅ ipwatch/device/10.0.0.50/status
❌ IPWatch/Device/192.168.1.100/Command (majuscules)
❌ ipwatch-device-192.168.1.100-command (mauvais séparateur)

1.2 Noms de Variables (Python)

# ✅ Bon
mqtt_broker = "localhost"
command_topic = f"ipwatch/device/{ip}/command"
message_payload = json.dumps(data)

# ❌ Mauvais
MQTTBroker = "localhost"  # Pas de PascalCase pour variables
commandTopic = f"ipwatch/device/{ip}/command"  # Pas de camelCase
msg = json.dumps(data)  # Abréviation non claire

1.3 Constantes

# ✅ Constantes en MAJUSCULES
MQTT_QOS_COMMAND = 1
MQTT_QOS_STATUS = 1
DEFAULT_KEEPALIVE = 60
MAX_RETRY_ATTEMPTS = 3

# Topics en constantes
TOPIC_PREFIX = "ipwatch/device"
TOPIC_COMMAND = "{}/command"
TOPIC_STATUS = "{}/status"

2. Structure du Code

2.1 Organisation des Fichiers

mqtt/
├── client/                       # Agent MQTT côté client
│   ├── ipwatch_mqtt_agent.py   # Agent principal
│   ├── mqtt-agent.conf.example # Configuration exemple
│   └── requirements.txt         # Dépendances Python
├── docs/                        # Documentation
│   ├── MQTT_ARCHITECTURE.md
│   ├── MQTT_CODING_GUIDELINES.md
│   └── HOMEASSISTANT_SPEC.md
└── systemd/                     # Services systemd
    └── ipwatch-mqtt-agent.service

2.2 Structure d'un Module Python

#!/usr/bin/env python3
"""
Titre du module
Description détaillée

Installation:
    pip install dependencies

Usage:
    python module.py --help
"""

# Imports standard library
import os
import sys
import json
import logging
from pathlib import Path

# Imports third-party
import paho.mqtt.client as mqtt

# Imports locaux
from backend.app.core.config import config_manager

# Configuration du logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Constantes globales
MQTT_BROKER = "localhost"
MQTT_PORT = 1883

# Classes
class MyClass:
    """Docstring de la classe"""
    pass

# Fonctions
def my_function():
    """Docstring de la fonction"""
    pass

# Point d'entrée
if __name__ == "__main__":
    main()

3. Gestion MQTT

3.1 Connexion au Broker

# ✅ Bon - Gestion robuste des erreurs
def connect_mqtt(broker, port, username=None, password=None):
    """
    Connecte au broker MQTT avec gestion d'erreurs

    Args:
        broker (str): Adresse du broker
        port (int): Port MQTT
        username (str, optional): Nom d'utilisateur
        password (str, optional): Mot de passe

    Returns:
        mqtt.Client: Client MQTT connecté

    Raises:
        ConnectionError: Si la connexion échoue
    """
    try:
        client = mqtt.Client(client_id=f"ipwatch-{os.getpid()}")

        if username and password:
            client.username_pw_set(username, password)

        client.connect(broker, port, keepalive=60)
        logger.info(f"✓ Connecté au broker {broker}:{port}")
        return client

    except Exception as e:
        logger.error(f"✗ Erreur connexion MQTT: {e}")
        raise ConnectionError(f"Impossible de se connecter à {broker}:{port}")

# ❌ Mauvais - Pas de gestion d'erreur
def connect_mqtt(broker, port):
    client = mqtt.Client()
    client.connect(broker, port)
    return client

3.2 Publication de Messages

# ✅ Bon - QoS et vérification
def publish_command(client, ip_address, command):
    """
    Publie une commande MQTT

    Args:
        client: Client MQTT
        ip_address (str): IP de destination
        command (str): Commande à envoyer

    Returns:
        bool: True si publié avec succès
    """
    topic = f"ipwatch/device/{ip_address}/command"
    payload = json.dumps({
        "command": command,
        "timestamp": datetime.now().isoformat()
    })

    result = client.publish(topic, payload, qos=1)
    result.wait_for_publish(timeout=5)

    if result.is_published():
        logger.info(f"✓ Commande '{command}' envoyée à {ip_address}")
        return True
    else:
        logger.error(f"✗ Échec publication commande vers {ip_address}")
        return False

# ❌ Mauvais - Pas de QoS, pas de vérification
def publish_command(client, ip, cmd):
    client.publish(f"ipwatch/device/{ip}/command", cmd)

3.3 Souscription et Callbacks

# ✅ Bon - Callbacks avec gestion d'erreur
class MQTTAgent:
    def __init__(self):
        self.client = mqtt.Client()
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.on_disconnect = self.on_disconnect

    def on_connect(self, client, userdata, flags, rc):
        """Callback lors de la connexion"""
        if rc == 0:
            logger.info("✓ Connecté au broker")
            client.subscribe(self.command_topic, qos=1)
        else:
            logger.error(f"✗ Échec connexion, code: {rc}")

    def on_message(self, client, userdata, msg):
        """Callback réception message"""
        try:
            payload = json.loads(msg.payload.decode('utf-8'))
            command = payload.get('command')
            self.execute_command(command)
        except json.JSONDecodeError as e:
            logger.error(f"✗ JSON invalide: {e}")
        except Exception as e:
            logger.error(f"✗ Erreur traitement message: {e}")

    def on_disconnect(self, client, userdata, rc):
        """Callback déconnexion"""
        if rc != 0:
            logger.warning(f"⚠ Déconnexion inattendue (code {rc})")

4. Gestion des Erreurs

4.1 Try/Except

# ✅ Bon - Gestion spécifique des erreurs
def execute_shutdown():
    """Exécute la commande shutdown"""
    try:
        subprocess.run(
            ["sudo", "shutdown", "-h", "now"],
            check=True,
            timeout=10
        )
        logger.info("✓ Shutdown initié")
        return True

    except subprocess.TimeoutExpired:
        logger.error("✗ Timeout exécution shutdown")
        return False

    except subprocess.CalledProcessError as e:
        logger.error(f"✗ Erreur shutdown: {e}")
        return False

    except PermissionError:
        logger.error("✗ Permissions sudo insuffisantes")
        return False

    except Exception as e:
        logger.error(f"✗ Erreur inattendue: {e}")
        return False

# ❌ Mauvais - Catch all sans logging
def execute_shutdown():
    try:
        subprocess.run(["sudo", "shutdown", "-h", "now"])
    except:
        pass

4.2 Logging

# ✅ Bon - Niveaux de log appropriés
logger.debug("Payload: %s", payload)  # Détails de debug
logger.info("✓ Connexion établie")  # Opérations normales
logger.warning("⚠ Retry connexion...")  # Avertissements
logger.error("✗ Échec publication")  # Erreurs
logger.critical("🔴 Broker inaccessible")  # Erreurs critiques

# ❌ Mauvais - Mauvais niveaux
logger.info("Erreur critique!")  # Devrait être error/critical
logger.error("Opération réussie")  # Devrait être info

5. Format des Payloads JSON

5.1 Commandes

# ✅ Bon - Structure claire
{
    "command": "shutdown",  # string, required
    "timestamp": "2025-12-23T10:30:00Z"  # ISO 8601, required
}

# Avec options
{
    "command": "shutdown",
    "timestamp": "2025-12-23T10:30:00Z",
    "delay": 60,  # secondes avant exécution
    "force": false  # shutdown forcé ou non
}

5.2 Statut

# ✅ Bon - Métriques utiles
{
    "hostname": "server-01",
    "ip": "10.0.0.100",
    "platform": "Linux",
    "platform_version": "5.15.0-91-generic",
    "uptime": 86400,  # secondes
    "cpu_percent": 45.2,
    "memory_percent": 62.5,
    "disk_percent": 78.3,
    "timestamp": "2025-12-23T10:30:00Z"
}

# ❌ Mauvais - Données incomplètes
{
    "host": "server-01",
    "status": "ok"
}

5.3 Réponses

# ✅ Bon - Succès/échec clair
{
    "success": true,
    "message": "Shutdown en cours...",
    "timestamp": "2025-12-23T10:30:05Z"
}

# En cas d'erreur
{
    "success": false,
    "message": "Permissions insuffisantes",
    "error_code": "PERMISSION_DENIED",
    "timestamp": "2025-12-23T10:30:05Z"
}

6. Tests

6.1 Tests Unitaires

# tests/test_mqtt_agent.py
import pytest
from unittest.mock import MagicMock, patch
from ipwatch_mqtt_agent import IPWatchMQTTAgent

def test_get_ip_address():
    """Test récupération adresse IP"""
    agent = IPWatchMQTTAgent()
    ip = agent.get_ip_address()
    assert isinstance(ip, str)
    assert len(ip.split('.')) == 4

def test_publish_status():
    """Test publication du statut"""
    agent = IPWatchMQTTAgent()
    agent.client = MagicMock()

    agent.publish_status()

    agent.client.publish.assert_called_once()
    args = agent.client.publish.call_args
    assert "ipwatch/device" in args[0][0]

@patch('subprocess.run')
def test_shutdown(mock_subprocess):
    """Test commande shutdown"""
    agent = IPWatchMQTTAgent()
    agent.shutdown()

    mock_subprocess.assert_called_once()
    assert "shutdown" in mock_subprocess.call_args[0][0]

6.2 Tests d'Intégration

# tests/test_mqtt_integration.py
import time
import paho.mqtt.client as mqtt

def test_command_flow():
    """Test flux complet command → response"""
    received_messages = []

    def on_message(client, userdata, msg):
        received_messages.append(msg.payload.decode())

    # Setup client test
    client = mqtt.Client("test-client")
    client.on_message = on_message
    client.connect("localhost", 1883)
    client.subscribe("ipwatch/device/+/response")
    client.loop_start()

    # Envoyer commande
    client.publish(
        "ipwatch/device/10.0.0.100/command",
        '{"command": "status"}',
        qos=1
    )

    # Attendre réponse
    time.sleep(2)

    assert len(received_messages) > 0
    assert "success" in received_messages[0]

    client.loop_stop()
    client.disconnect()

7. Sécurité

7.1 Credentials

# ✅ Bon - Variables d'environnement
MQTT_USERNAME = os.getenv('MQTT_USERNAME')
MQTT_PASSWORD = os.getenv('MQTT_PASSWORD')

if not MQTT_USERNAME or not MQTT_PASSWORD:
    logger.warning("⚠ MQTT sans authentification")

# ❌ Mauvais - Hardcodé
MQTT_USERNAME = "ipwatch"
MQTT_PASSWORD = "password123"

7.2 Validation des Entrées

# ✅ Bon - Validation stricte
def execute_command(command):
    """Exécute une commande MQTT"""
    ALLOWED_COMMANDS = ['shutdown', 'reboot', 'status']

    if command not in ALLOWED_COMMANDS:
        logger.error(f"✗ Commande refusée: {command}")
        return False

    # Exécution sécurisée
    if command == "shutdown":
        return execute_shutdown()
    # ...

# ❌ Mauvais - Exécution directe
def execute_command(command):
    subprocess.run(command, shell=True)  # DANGER: injection!

8. Performance

8.1 Optimisation des Payloads

# ✅ Bon - Payload compact
payload = json.dumps({
    "cmd": "status",  # Abréviation pour réduire taille
    "ts": int(time.time())  # Timestamp Unix
}, separators=(',', ':'))  # Pas d'espaces

# ❌ Mauvais - Payload volumineux
payload = json.dumps({
    "command": "status",
    "timestamp": "2025-12-23T10:30:00.123456Z",
    "extra_field_1": None,
    "extra_field_2": None
}, indent=2)  # Indentation inutile

8.2 Gestion de la Boucle

# ✅ Bon - Boucle non-bloquante
client.loop_start()  # Thread séparé

while True:
    time.sleep(30)
    publish_status()

# ❌ Mauvais - Boucle bloquante
while True:
    client.loop()  # Bloque le thread principal
    time.sleep(30)

9. Documentation

9.1 Docstrings

# ✅ Bon - Docstring complet
def send_mqtt_command(ip_address, command):
    """
    Envoie une commande MQTT à un équipement

    Cette fonction publie une commande sur le topic MQTT correspondant
    à l'adresse IP fournie. La commande est envoyée avec QoS 1 pour
    garantir la livraison.

    Args:
        ip_address (str): Adresse IP de l'équipement cible
        command (str): Commande à envoyer (shutdown, reboot, status)

    Returns:
        bool: True si la commande a été publiée avec succès

    Raises:
        ValueError: Si la commande n'est pas valide
        ConnectionError: Si le broker MQTT est inaccessible

    Example:
        >>> send_mqtt_command("192.168.1.100", "status")
        True
    """
    pass

# ❌ Mauvais - Pas de documentation
def send_mqtt_command(ip, cmd):
    pass

10. Checklist Pré-commit

Avant de commit du code MQTT, vérifier :

  • Code formaté avec black (PEP 8)
  • Pas de secrets/credentials hardcodés
  • Logging approprié (info/error/debug)
  • Gestion des erreurs avec try/except
  • Docstrings sur fonctions/classes publiques
  • Tests unitaires passent
  • QoS 1 pour messages critiques
  • Validation des payloads JSON
  • Topics respectent la convention
  • Variables d'environnement documentées

11. Outils Recommandés

# Formatage code
pip install black
black mqtt/client/

# Linting
pip install pylint
pylint mqtt/client/ipwatch_mqtt_agent.py

# Tests
pip install pytest pytest-cov
pytest mqtt/tests/ --cov

# Type checking
pip install mypy
mypy mqtt/client/

12. Exemples de Code Complet

Voir :

  • mqtt/client/ipwatch_mqtt_agent.py - Agent MQTT client
  • backend/app/services/mqtt_client.py - Client MQTT backend
  • backend/app/routers/tracking.py - Endpoints API MQTT