Files
pilot/backup_v1/20251229_0205/main.py
Gilles Soulier c5381b7112 Pilot v2: Core implementation + battery telemetry
Major updates:
- Complete Rust rewrite (pilot-v2/) with working MQTT client
- Fixed MQTT event loop deadlock (background task pattern)
- Battery telemetry for Linux (auto-detected via /sys/class/power_supply)
- Home Assistant auto-discovery for all sensors and switches
- Comprehensive documentation (AVANCEMENT.md, CLAUDE.md, roadmap)
- Docker test environment with Mosquitto broker
- Helper scripts for development and testing

Features working:
 MQTT connectivity with LWT
 YAML configuration with validation
 Telemetry: CPU, memory, IP, battery (Linux)
 Commands: shutdown, reboot, sleep, screen (dry-run tested)
 HA discovery and integration
 Allowlist and cooldown protection

Ready for testing on real hardware.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2025-12-30 06:23:00 +01:00

405 lines
13 KiB
Python

# Add this line at the bottom of the sudoers file (edit it with: sudo visudo)
# gilles ALL=(ALL) NOPASSWD: /sbin/shutdown
# Add the computer's hostname
import os
import time
import json
import paho.mqtt.client as mqtt
import subprocess
import threading
import psutil
import pynvml
import socket
# Host name of this machine; it is embedded in the entity names and MQTT
# topics defined further down.
hostname = socket.gethostname()
# Initialize pynvml (runs once, at import time)
pynvml.nvmlInit()
# Functions returning the CPU and GPU temperatures
def get_cpu_temperature(chip="k10temp", label="Tctl"):
    """Return the current CPU temperature reported by psutil, or None.

    Args:
        chip: Name of the sensor chip to look up in the mapping returned by
            ``psutil.sensors_temperatures()``. Defaults to ``"k10temp"``,
            the value that used to be hard-coded here.
        label: Label of the reading to return within that chip. Defaults to
            ``"Tctl"``.

    Returns:
        The ``current`` value of the first entry of ``chip`` whose label
        equals ``label``, or ``None`` when psutil offers no temperature
        sensors on this platform or no matching entry exists.
    """
    # sensors_temperatures() is not provided by psutil on every platform;
    # treat its absence like "sensor not found" instead of raising.
    read_sensors = getattr(psutil, "sensors_temperatures", None)
    if read_sensors is None:
        return None

    for entry in read_sensors().get(chip, ()):
        if entry.label == label:
            return entry.current

    # Explicit "not found" result; the caller tests for None before publishing.
    return None
def get_gpu_temperature():
    """Return the temperature of GPU index 0 read through pynvml.

    Returns:
        The value returned by ``nvmlDeviceGetTemperature`` for the
        ``NVML_TEMPERATURE_GPU`` sensor, or ``None`` if pynvml raised an
        ``NVMLError`` (the error is printed).
    """
    try:
        gpu = pynvml.nvmlDeviceGetHandleByIndex(0)
        reading = pynvml.nvmlDeviceGetTemperature(gpu, pynvml.NVML_TEMPERATURE_GPU)
    except pynvml.NVMLError as error:
        print(f"Failed to get GPU temperature: {error}")
        reading = None
    return reading
def get_gpu_memory_usage():
    """Return the memory in use on GPU index 0, in MB, or None on NVML error.

    The value is ``info.used / 1024 / 1024`` rounded to two decimals, i.e.
    megabytes (not gigabytes), which matches the "MB" unit declared by
    ``gpu_memory_usage_entity``.

    Returns:
        The used GPU memory in MB as a float, or ``None`` if pynvml raised an
        ``NVMLError`` (the error is printed). The caller already skips
        publishing when the result is ``None``.
    """
    # The per-call nvmlInit()/nvmlShutdown() pair is kept from the original
    # implementation.
    try:
        pynvml.nvmlInit()
    except pynvml.NVMLError as error:
        print(f"Failed to get GPU memory usage: {error}")
        return None

    try:
        handle = pynvml.nvmlDeviceGetHandleByIndex(0)  # 0 = first graphics card
        info = pynvml.nvmlDeviceGetMemoryInfo(handle)
    except pynvml.NVMLError as error:
        # Same best-effort behaviour as get_gpu_temperature(): report and
        # return None rather than crashing the publishing loop.
        print(f"Failed to get GPU memory usage: {error}")
        return None
    finally:
        # Always balance the nvmlInit() above, even when a query failed.
        pynvml.nvmlShutdown()

    memory_used_mb = info.used / 1024 / 1024  # bytes -> MB
    return round(memory_used_mb, 2)
# MQTT parameters
mqtt_broker_ip_address = "10.0.0.3"
mqtt_port = 1883
mqtt_username = ""
mqtt_password = ""
#update_frequency = 60 # Update every 60 seconds
# First segment of the topics on which the discovery configs are published
discovery_prefix = "homeassistant"
update_frequency = 5 # in seconds
# Function returning the amount of memory in use
def get_memory_used():
    """Return the amount of system memory in use, in whole MB.

    Reads ``used`` from ``psutil.virtual_memory()``, divides it by 1024 twice
    and rounds to an integer.
    """
    used_bytes = psutil.virtual_memory().used
    return round(used_bytes / 1024 / 1024)
def get_cpu_usage():
    """Return ``psutil.cpu_percent()`` rounded to one decimal place."""
    return round(psutil.cpu_percent(), 1)
# Device description shared by all entities: each entity dict below embeds
# this same object under its "device" key.
device_info = {
"identifiers": ["Mqtt_pilot"],
"name": f"{hostname}",
"manufacturer": "Black",
"model": "desktop",
"sw_version": "1.0.0",
"suggested_area": "salon",
}
# Entity configuration
# Switch entity used to shut the machine down: on_message() reacts to
# "payload_off" received on "command_topic" by running the shutdown command.
# NOTE(review): "command_topic" and "availability_topic" are the same string
# (".../shutdown/available"), so availability messages also arrive on the
# subscribed command topic — looks unintended, confirm before changing.
shutdown_entity = {
"name": f"shutdown_{hostname}",
"type": "switch",
"unique_id": f"shutdown_{hostname}_18:c0:4d:b5:65:74",
"command_topic": f"pilot/{hostname}/shutdown/available",
"state_topic": f"pilot/{hostname}/shutdown",
"availability_topic": f"pilot/{hostname}/shutdown/available",
"device_class": "switch",
"payload_on": "ON",
"payload_off": "OFF",
"payload_available": "online",
"payload_not_available": "offline",
"icon": "mdi:power",
"device": device_info,
}
# battery_entity = {
# "name": f"battery_{hostname}",
# "type": "sensor",
# "unique_id": f"battery_{hostname}_18:c0:4d:b5:65:74",
# "state_topic": f"pilot/{hostname}/battery",
# "unit_of_measurement": "%",
# "device_class": "battery",
# "availability_topic": f"pilot/{hostname}/battery/available",
# "payload_available": "online",
# "payload_not_available": "offline",
# "icon": "mdi:battery",
# "device": device_info,
# }
# CPU temperature sensor (°C); its state is published by
# publish_sensor_values() from get_cpu_temperature().
cpu_temp_entity = {
"name": f"cpu_temp_{hostname}",
"type": "sensor",
"unique_id": f"cpu_temp_{hostname}_18:c0:4d:b5:65:74",
"state_topic": f"pilot/{hostname}/cpu_temp",
"availability_topic": f"pilot/{hostname}/cpu_temp/available",
"device_class": "temperature",
"unit_of_measurement": "°C",
"payload_available": "online",
"payload_not_available": "offline",
"icon": "mdi:thermometer",
"device": device_info,
}
# GPU temperature sensor (°C); its state is published by
# publish_sensor_values() from get_gpu_temperature().
gpu_temp_entity = {
"name": f"gpu_temp_{hostname}",
"type": "sensor",
"unique_id": f"gpu_temp_{hostname}_18:c0:4d:b5:65:74",
"state_topic": f"pilot/{hostname}/gpu_temp",
"availability_topic": f"pilot/{hostname}/gpu_temp/available",
"device_class": "temperature",
"unit_of_measurement": "°C",
"payload_available": "online",
"payload_not_available": "offline",
"icon": "mdi:thermometer",
"device": device_info,
}
# Definition of the memory_used entity (system memory in use, in MB); its
# state is published by publish_sensor_values() from get_memory_used().
memory_used_entity = {
"name": f"memory_used_{hostname}",
"type": "sensor",
"unique_id": f"memory_used_{hostname}_18:c0:4d:b5:65:74",
"state_topic": f"pilot/{hostname}/memory_used",
"availability_topic": f"pilot/{hostname}/memory_used/available",
# "device_class": "memory",
"unit_of_measurement": "MB",
"payload_available": "online",
"payload_not_available": "offline",
"icon": "mdi:memory",
"device": device_info,
}
# GPU memory sensor (MB); its state is published by publish_sensor_values()
# from get_gpu_memory_usage().
# The "unit_of_measurement" key used to appear twice in this literal with the
# same value; the duplicate is removed, the resulting dict is unchanged.
# Note that the unique_id says "gpu_memory_used" while the name and topics say
# "gpu_memory_usage"; both spellings are kept as they were.
gpu_memory_usage_entity = {
    "name": f"gpu_memory_usage_{hostname}",
    "type": "sensor",
    "unique_id": f"gpu_memory_used_{hostname}_18:c0:4d:b5:65:74",
    "state_topic": f"pilot/{hostname}/gpu_memory_usage/state",
    "availability_topic": f"pilot/{hostname}/gpu_memory_usage/available",
    "unit_of_measurement": "MB",
    "payload_available": "online",
    "payload_not_available": "offline",
    "icon": "mdi:memory",
    "device": device_info,
}
# CPU usage sensor (%); its state is published by publish_sensor_values()
# from get_cpu_usage().
# NOTE(review): the icon is "mdi:memory", the same as the memory sensors —
# possibly a copy-paste leftover; confirm the intended icon.
cpu_usage_entity = {
"name": f"cpu_usage_{hostname}",
"type": "sensor",
"unique_id": f"cpu_usage_{hostname}_18:c0:4d:b5:65:74",
"state_topic": f"pilot/{hostname}/cpu_usage",
"availability_topic": f"pilot/{hostname}/cpu_usage/available",
"unit_of_measurement": "%",
"payload_available": "online",
"payload_not_available": "offline",
"icon": "mdi:memory",
"device": device_info,
}
def publish_discovery_messages(client):
    """Publish the retained discovery config of every entity.

    Each entity dict is serialized with ``json.dumps`` and published, with
    ``retain=True``, on
    ``<discovery_prefix>/<component>/<hostname>/<entity name>/config``.

    Args:
        client: Object exposing ``publish(topic, payload, retain=...)``.
    """
    print("publish_discovery_messages")

    # (component, entity) pairs, in the order they are announced.
    # The battery sensor is currently disabled and therefore not listed.
    announcements = (
        ("switch", shutdown_entity),
        ("sensor", cpu_temp_entity),
        ("sensor", gpu_temp_entity),
        ("sensor", memory_used_entity),
        ("sensor", cpu_usage_entity),
        ("sensor", gpu_memory_usage_entity),
    )
    for component, entity in announcements:
        config_topic = (
            f"{discovery_prefix}/{component}/{hostname}/{entity['name']}/config"
        )
        client.publish(config_topic, json.dumps(entity), retain=True)
def publish_availability(client):
    """Publish a retained "available" message for every entity.

    Args:
        client: Object exposing ``publish(topic, payload, retain=...)``.
    """
    # Entities whose own "payload_available" value is sent.
    # The battery sensor is currently disabled and therefore not listed.
    announced = (
        shutdown_entity,
        cpu_temp_entity,
        gpu_temp_entity,
        memory_used_entity,
        cpu_usage_entity,
    )
    for entity in announced:
        client.publish(
            entity["availability_topic"],
            entity["payload_available"],
            retain=True,
        )

    # The GPU memory sensor is announced with the literal payload.
    client.publish(
        gpu_memory_usage_entity["availability_topic"],
        "online",
        retain=True,
    )
    print("Published availability for all entities")
# def publish_sensor_data(client):
# battery_level = get_battery_level()
# if battery_level is not None:
# client.publish(battery_entity["state_topic"], battery_level, retain=True)
# threading.Timer(update_frequency, publish_sensor_data, [client]).start()
def on_connect(client, userdata, flags, rc):
    """Connection callback: subscribe and announce the entities.

    On a successful connection (``rc == 0``) this subscribes to the shutdown
    switch's command topic, publishes the discovery configs and availability
    of all entities, and publishes the switch state "ON" (retained).

    Args:
        client: The MQTT client that connected.
        userdata: Unused.
        flags: Unused.
        rc: Connection result code; 0 means success.
    """
    if rc != 0:
        # Nothing can be subscribed or published on a refused connection.
        print(f"Connection failed with result code {rc}")
        return

    # The original message lacked the f-prefix and printed "{rc}" literally.
    print(f"Connected with result code {rc}")
    client.subscribe(shutdown_entity["command_topic"])
    publish_discovery_messages(client)
    publish_availability(client)

    # Publish the "ON" state for the switch at start-up.
    client.publish(
        shutdown_entity["state_topic"], shutdown_entity["payload_on"], retain=True
    )
    print(f"Set {shutdown_entity['name']} to ON")
def on_message(client, userdata, message):
    """Message callback: handle commands for the shutdown switch.

    When "payload_off" arrives on the switch's command topic, the switch
    state "OFF" and the switch availability "offline" are published
    (retained) and the machine is shut down about two seconds later with
    ``sudo shutdown -h now``. "payload_on" is acknowledged with a log line
    only. Messages on other topics and other payloads are ignored.

    Args:
        client: The MQTT client that received the message.
        userdata: Unused.
        message: Received message; ``topic`` and ``payload`` are read.
    """
    print("on_message")

    # Only the shutdown switch accepts commands.
    if message.topic != shutdown_entity["command_topic"]:
        return

    # Decode once; undecodable bytes become replacement characters so that a
    # malformed payload is simply ignored instead of raising in the callback.
    command = message.payload.decode(errors="replace")

    if command == shutdown_entity["payload_off"]:
        print("Received 'OFF' command - shutting down the system")
        client.publish(
            shutdown_entity["state_topic"],
            shutdown_entity["payload_off"],
            retain=True,
        )
        client.publish(
            shutdown_entity["availability_topic"],
            shutdown_entity["payload_not_available"],
            retain=True,
        )

        def _shutdown():
            # Run the shutdown command (requires the sudoers rule described
            # at the top of the file).
            subprocess.run(["sudo", "shutdown", "-h", "now"])

        # This callback is invoked by the client's network loop. Sleeping
        # here (as the code used to, 2 x 1 s) stalls that loop, so the two
        # messages above could not be sent while waiting. Returning right
        # away and shutting down from a timer thread keeps the same 2 s
        # delay while letting the loop deliver them.
        threading.Timer(2, _shutdown).start()
    elif command == shutdown_entity["payload_on"]:
        print("Received 'ON' command - no action for 'ON'")
def publish_sensor_values(client):
    """Read every sensor and publish its value (retained) on its state topic.

    A sensor whose reader returns ``None`` is skipped. The two temperature
    readings are rounded to one decimal before publishing; the other values
    are published as returned by their reader.

    Args:
        client: Object exposing ``publish(topic, payload, retain=...)``.
    """

    def _one_decimal(value):
        return round(value, 1)

    def _unchanged(value):
        return value

    # (entity, reader, formatter), in publishing order.
    sources = (
        (cpu_temp_entity, get_cpu_temperature, _one_decimal),
        (gpu_temp_entity, get_gpu_temperature, _one_decimal),
        (memory_used_entity, get_memory_used, _unchanged),
        (cpu_usage_entity, get_cpu_usage, _unchanged),
        (gpu_memory_usage_entity, get_gpu_memory_usage, _unchanged),
    )
    for entity, read, fmt in sources:
        reading = read()
        if reading is None:
            continue
        client.publish(entity["state_topic"], fmt(reading), retain=True)
# def get_battery_level():
# try:
# with open("/sys/class/power_supply/BAT0/capacity", "r") as file:
# return file.read().strip()
# except Exception as e:
# print(f"Error reading battery level: {e}")
# return None
# Configure and start the MQTT client
client = mqtt.Client()
client.username_pw_set(mqtt_username, mqtt_password)
# Callbacks defined above: on_connect announces the entities, on_message
# handles the shutdown command.
client.on_connect = on_connect
client.on_message = on_message
# Third argument is the keepalive, in seconds.
client.connect(mqtt_broker_ip_address, mqtt_port, 60)
# Start the client's network loop; the main loop below only publishes.
client.loop_start()
# Keep the script running: every `update_frequency` seconds, refresh the
# availability of all entities and publish fresh sensor values.
try:
    while True:
        publish_availability(client)  # keep the entities marked available
        publish_sensor_values(client)  # publish the sensor values
        time.sleep(update_frequency)  # wait until the next update
except KeyboardInterrupt:
    print("Script interrupted, closing MQTT connection")
finally:
    # Mark every entity unavailable, whether the loop ended through Ctrl-C or
    # through an unexpected error (which still propagates after this block).
    # The battery sensor is currently disabled and therefore not listed.
    for entity in (
        shutdown_entity,
        cpu_temp_entity,
        gpu_temp_entity,
        memory_used_entity,
        cpu_usage_entity,
        gpu_memory_usage_entity,
    ):
        client.publish(
            entity["availability_topic"],
            entity["payload_not_available"],
            retain=True,
        )
    # Close the MQTT connection cleanly.
    client.disconnect()
    # Wait for the network loop started by loop_start() to finish. Without
    # this the interpreter may exit before the messages queued above have
    # actually been sent.
    client.loop_stop()