Files
pilot/backup_v1/20251229_0205/main_prog.py
Gilles Soulier c5381b7112 Pilot v2: Core implementation + battery telemetry
Major updates:
- Complete Rust rewrite (pilot-v2/) with working MQTT client
- Fixed MQTT event loop deadlock (background task pattern)
- Battery telemetry for Linux (auto-detected via /sys/class/power_supply)
- Home Assistant auto-discovery for all sensors and switches
- Comprehensive documentation (AVANCEMENT.md, CLAUDE.md, roadmap)
- Docker test environment with Mosquitto broker
- Helper scripts for development and testing

Features working:
 MQTT connectivity with LWT
 YAML configuration with validation
 Telemetry: CPU, memory, IP, battery (Linux)
 Commands: shutdown, reboot, sleep, screen (dry-run tested)
 HA discovery and integration
 Allowlist and cooldown protection

Ready for testing on real hardware.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2025-12-30 06:23:00 +01:00

536 lines
23 KiB
Python

# Add the following lines at the bottom of the sudoers file (edit it with: sudo visudo)
# gilles ALL=(ALL) NOPASSWD: /sbin/shutdown
# gilles ALL=(ALL) NOPASSWD: /sbin/reboot
# gilles ALL=(ALL) NOPASSWD: /usr/bin/tee /sys/devices/system/cpu/cpu0/cpufreq/scaling_setspeed
# Discovery topic structure: <discovery_prefix>/<component>/[<node_id>/]<object_id>/config
import os
import time
import json
import paho.mqtt.client as mqtt
import subprocess
import threading
import psutil
import threading
# Event object for signalling background threads; it is neither set nor
# checked anywhere in the code visible in this file.
stop_threads = threading.Event()
# MQTT parameters
mqtt_broker_ip_address = "10.0.0.3"
mqtt_port = 1883
mqtt_username = ""
mqtt_password = ""
default_update_frequency = 60 # Fallback publish interval in seconds, used when an entity has no "update_interval"
# First path segment of every Home Assistant discovery topic
discovery_prefix = "homeassistant"
# device_name is embedded in every entity name, unique_id and pilot/... topic
device_name = "yoga14"
# mac_address is only used as a suffix to make unique_id values unique
mac_address = "60:57:18:99:ed:05"
# node_id is the <node_id> segment of the discovery topics
node_id = device_name
# NOTE(review): the three intervals below (French names: frequent / medium /
# slow update) are not referenced in the code visible here - confirm whether
# they are still needed.
mise_a_jours_frequente = 5
mise_a_jour_moyenne = 30
mise_a_jour_lente = 60
# Device descriptor embedded under the "device" key of every entity below.
device_info = {
    "identifiers": [device_name],
    "name": "Yoga 14",
    "manufacturer": "Lenovo",
    "model": "laptop",
    "sw_version": "1.0.0",
    "suggested_area": "salon",
}
# Entity configuration
# Each dict is serialised as-is with json.dumps and published as the entity's
# discovery config. "update_interval" (where present) is read by
# get_update_interval() as the publish period in seconds.
# NOTE(review): the "type" key is not read by any code visible in this file.

# Switch: an "OFF" command on command_topic shuts the machine down (see on_message).
shutdown_entity = {
    "name": f"shutdown_{device_name}",
    "type": "switch",
    "unique_id": f"shutdown_{device_name}_{mac_address}",
    "command_topic": f"pilot/{device_name}/shutdown/set",
    "state_topic": f"pilot/{device_name}/shutdown/state",
    "availability_topic": f"pilot/{device_name}/shutdown/available",
    "device_class": "switch",
    "payload_on": "ON",
    "payload_off": "OFF",
    "payload_available": "online",
    "payload_not_available": "offline",
    "icon": "mdi:power",
    "device": device_info,
}
# Switch: an "OFF" command on command_topic reboots the machine (see on_message).
reboot_entity = {
    "name": f"reboot_{device_name}",
    "type": "switch",
    "unique_id": f"reboot_{device_name}_{mac_address}",
    "command_topic": f"pilot/{device_name}/reboot/set",
    "state_topic": f"pilot/{device_name}/reboot/state",
    "availability_topic": f"pilot/{device_name}/reboot/available",
    "device_class": "switch",
    "payload_on": "ON",
    "payload_off": "OFF",
    "payload_available": "online",
    "payload_not_available": "offline",
    "icon": "mdi:restart",
    "device": device_info,
}
# Sensor: battery charge in percent, published every 60 s.
battery_entity = {
    "name": f"battery_{device_name}",
    "type": "sensor",
    "unique_id": f"battery_{device_name}_{mac_address}",
    "state_topic": f"pilot/{device_name}/battery/state",
    "unit_of_measurement": "%",
    "device_class": "battery",
    "availability_topic": f"pilot/{device_name}/battery/available",
    "payload_available": "online",
    "payload_not_available": "offline",
    "icon": "mdi:battery",
    "device": device_info,
    "update_interval": 60
}
# Binary sensor: "ON"/"OFF" as returned by get_charging_status(), published every 5 s.
charging_status_entity = {
    "name": f"charging_status_{device_name}",
    "type": "binary_sensor",
    "unique_id": f"charging_status_{device_name}_{mac_address}",
    "state_topic": f"pilot/{device_name}/charging_status/state",
    "availability_topic": f"pilot/{device_name}/charging_status/available",
    "device_class": "battery_charging",
    "payload_on": "ON",
    "payload_off": "OFF",
    "payload_available": "online",
    "payload_not_available": "offline",
    "icon": "mdi:battery-charging",
    "device": device_info,
    "update_interval": 5
}
# Switch: "ON"/"OFF" commands turn the screen on/off (see on_message).
screen_entity = {
    "name": f"screen_{device_name}",
    "type": "switch",
    "unique_id": f"screen_{device_name}_{mac_address}",
    "command_topic": f"pilot/{device_name}/screen/set",
    "state_topic": f"pilot/{device_name}/screen/state",
    "availability_topic": f"pilot/{device_name}/screen/available",
    "device_class": "switch",
    "payload_on": "ON",
    "payload_off": "OFF",
    "payload_available": "online",
    "payload_not_available": "offline",
    "icon": "mdi:monitor",
    "device": device_info,
}
# Additional entities for CPU and memory
# Sensor: CPU temperature in degrees Celsius, published every 20 s
# ("update_interval" is read by get_update_interval()).
cpu_temperature_entity = {
    "name": f"cpu_temperature_{device_name}",
    "type": "sensor",
    "unique_id": f"cpu_temperature_{device_name}_{mac_address}",
    "state_topic": f"pilot/{device_name}/cpu_temperature/state",
    "unit_of_measurement": "°C",
    "device_class": "temperature",
    "availability_topic": f"pilot/{device_name}/cpu_temperature/available",
    "payload_available": "online",
    "payload_not_available": "offline",
    "icon": "mdi:thermometer",
    "device": device_info,
    "update_interval": 20
}
# Sensor: CPU utilisation in percent, published every 10 s
# ("update_interval" is read by get_update_interval()).
# No "device_class" is declared: the previous value "power" denotes
# electrical power and does not accept "%" as its unit, so the sensor is
# left class-less like memory_usage_entity.
cpu_usage_entity = {
    "name": f"cpu_usage_{device_name}",
    "type": "sensor",
    "unique_id": f"cpu_usage_{device_name}_{mac_address}",
    "state_topic": f"pilot/{device_name}/cpu_usage/state",
    "unit_of_measurement": "%",
    "availability_topic": f"pilot/{device_name}/cpu_usage/available",
    "payload_available": "online",
    "payload_not_available": "offline",
    "icon": "mdi:chip",
    "device": device_info,
    "update_interval": 10
}
# Sensor: memory utilisation in percent, published every 10 s.
memory_usage_entity = {
    "name": f"memory_usage_{device_name}",
    "type": "sensor",
    "unique_id": f"memory_usage_{device_name}_{mac_address}",
    "state_topic": f"pilot/{device_name}/memory_usage/state",
    "unit_of_measurement": "%",
    # "device_class": "memory",
    "availability_topic": f"pilot/{device_name}/memory_usage/available",
    "payload_available": "online",
    "payload_not_available": "offline",
    "icon": "mdi:memory",
    "device": device_info,
    "update_interval": 10
}
# Sensor: current CPU frequency in GHz, published every 20 s.
cpu_frequency_entity = {
    "name": f"cpu_frequency_{device_name}",
    "type": "sensor",
    "unique_id": f"cpu_frequency_{device_name}_{mac_address}",
    "state_topic": f"pilot/{device_name}/cpu_frequency/state",
    "unit_of_measurement": "GHz",
    "device_class": "frequency",
    "availability_topic": f"pilot/{device_name}/cpu_frequency/available",
    "payload_available": "online",
    "payload_not_available": "offline",
    "icon": "mdi:speedometer",
    "device": device_info,
    "update_interval": 20
}
# Sensor: local IP address as text, published every 60 s.
ip_address_entity = {
    "name": f"ip_address_{device_name}",
    "type": "sensor",
    "unique_id": f"ip_address_{device_name}_{mac_address}",
    "state_topic": f"pilot/{device_name}/ip_address/state",
    "availability_topic": f"pilot/{device_name}/ip_address/available",
    #"device_class": "connectivity",
    "payload_available": "online",
    "payload_not_available": "offline",
    "icon": "mdi:ip",
    "device": device_info,
    "update_interval": 60
}
# Number (slider): a value received on command_topic is passed to
# set_cpu_frequency() by on_message. Range 0.5-3.0 GHz in 0.1 steps.
cpu_frequency_slider_entity = {
    "name": f"cpu_frequency_slider_{device_name}",
    "type": "number",
    "unique_id": f"cpu_frequency_slider_{device_name}_{mac_address}",
    "command_topic": f"pilot/{device_name}/cpu_frequency_slider/set",
    "state_topic": f"pilot/{device_name}/cpu_frequency_slider/state",
    "availability_topic": f"pilot/{device_name}/cpu_frequency_slider/available",
    "payload_available": "online",
    "payload_not_available": "offline",
    "min": 0.5,
    "max": 3.0,
    "step": 0.1,
    "unit_of_measurement": "GHz",
    "icon": "mdi:speedometer",
    "device": device_info,
}
def publish_discovery_messages(client):
    """Publish the retained discovery config of every entity.

    Each entity dict is serialised to JSON and sent to
    ``<discovery_prefix>/<component>/<node_id>/<entity name>/config``.

    Args:
        client: MQTT client exposing ``publish``.
    """
    # (component, entity) pairs, in the order in which they are announced.
    announcements = (
        ("switch", shutdown_entity),
        ("switch", reboot_entity),
        ("sensor", battery_entity),
        ("switch", screen_entity),
        ("sensor", cpu_temperature_entity),
        ("sensor", cpu_usage_entity),
        ("sensor", memory_usage_entity),
        ("sensor", cpu_frequency_entity),
        ("binary_sensor", charging_status_entity),
        ("sensor", ip_address_entity),
        ("number", cpu_frequency_slider_entity),
    )
    for component, entity in announcements:
        config_topic = f"{discovery_prefix}/{component}/{node_id}/{entity['name']}/config"
        client.publish(config_topic, json.dumps(entity), retain=True)
def publish_availability(client):
    """Publish the retained "available" payload of every entity.

    Args:
        client: MQTT client exposing ``publish``.
    """
    all_entities = (
        shutdown_entity,
        reboot_entity,
        battery_entity,
        screen_entity,
        cpu_temperature_entity,
        cpu_usage_entity,
        memory_usage_entity,
        cpu_frequency_entity,
        charging_status_entity,
        ip_address_entity,
        cpu_frequency_slider_entity,
    )
    for entity in all_entities:
        client.publish(entity["availability_topic"], entity["payload_available"], retain=True)
def get_local_ip_address():
    """Return the local IPv4 address used for outbound traffic.

    A UDP socket is "connected" to an arbitrary address and the socket's own
    address is then read back with ``getsockname``.

    Returns:
        The local IP address as a string, or ``None`` if it could not be
        determined (the error is printed).
    """
    import socket
    try:
        # The context manager closes the socket even when connect() or
        # getsockname() raises, which previously leaked the descriptor.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.settimeout(0)
            s.connect(('10.254.254.254', 1))  # Arbitrary address, only used to select a route
            local_ip = s.getsockname()[0]
        print(f"Publishing IP address: {local_ip}")
        return local_ip
    except Exception as e:
        print(f"Error retrieving local IP address: {e}")
        return None
def get_cpu_temperature():
    """Return the first 'coretemp' temperature reported by psutil.

    Returns:
        The ``current`` value of the first 'coretemp' entry, or ``None`` if
        the reading failed (the error is printed).
    """
    try:
        readings = psutil.sensors_temperatures()
        # 'coretemp' is the sensor group looked up here (e.g. Intel CPUs).
        first_core = readings['coretemp'][0]
        celsius = first_core.current
        print(f"Publishing CPU temperature: {celsius}°C")
        return celsius
    except Exception as err:
        print(f"Error reading CPU temperature: {err}")
        return None
def get_cpu_usage():
    """Return the CPU utilisation in percent, sampled over one second.

    Returns:
        The value of ``psutil.cpu_percent(interval=1)``, or ``None`` if the
        reading failed (the error is printed).
    """
    try:
        usage = psutil.cpu_percent(interval=1)
        print(f"CPU usage: {usage}%")  # Debug log
        return usage
    except Exception as err:
        print(f"Error reading CPU usage: {err}")
        return None
def get_memory_usage():
    """Return the virtual-memory utilisation in percent.

    Returns:
        ``psutil.virtual_memory().percent``, or ``None`` if the reading
        failed (the error is printed).
    """
    try:
        percent_used = psutil.virtual_memory().percent
        print(f"Memory usage: {percent_used}%")  # Debug log
        return percent_used
    except Exception as err:
        print(f"Error reading memory usage: {err}")
        return None
def get_cpu_frequency():
    """Return the current frequency of cpu0 in GHz as text.

    The value is read from cpu0's ``scaling_cur_freq`` file, which holds the
    frequency in kHz.

    Returns:
        The frequency formatted with two decimals (e.g. ``"2.40"``), or
        ``None`` if the file could not be read or parsed (the error is
        printed).
    """
    sysfs_path = "/sys/devices/system/cpu/cpu0/cpufreq/scaling_cur_freq"
    try:
        with open(sysfs_path, "r") as handle:
            raw_khz = handle.read()
        # kHz -> GHz, rendered with two digits after the decimal point.
        ghz_text = f"{int(raw_khz.strip()) / 1_000_000:.2f}"
        print(f"CPU frequency: {ghz_text} GHz")  # Debug log
        return ghz_text
    except Exception as err:
        print(f"Error reading CPU frequency: {err}")
        return None
def get_battery_level():
    """Return the charge level of battery BAT0.

    Returns:
        The stripped content of ``/sys/class/power_supply/BAT0/capacity`` as
        a string, or ``None`` if it could not be read (the error is printed).
    """
    try:
        with open("/sys/class/power_supply/BAT0/capacity", "r") as handle:
            raw_level = handle.read()
        level = raw_level.strip()
        print(f"Publishing battery level: {level}%")
        return level
    except Exception as err:
        print(f"Error reading battery level: {err}")
        return None
def get_charging_status():
    """Report whether the ADP1 power adapter is online.

    Returns:
        ``"ON"`` when ``/sys/class/power_supply/ADP1/online`` contains "1",
        ``"OFF"`` for any other content, or ``None`` if the file could not be
        read (the error is printed).
    """
    try:
        with open("/sys/class/power_supply/ADP1/online", "r") as handle:
            online_flag = handle.read().strip()
        print(f"Publishing charging status: {online_flag}")
        if online_flag == "1":
            return "ON"
        return "OFF"
    except Exception as err:
        print(f"Error reading charging status: {err}")
        return None
def set_cpu_frequency(frequency):
    """Write the requested frequency to cpu0's ``scaling_setspeed`` file.

    Args:
        frequency: Target frequency in GHz; anything ``float()`` accepts.

    Returns:
        None. Conversion and write errors are caught and printed.
    """
    setspeed_path = "/sys/devices/system/cpu/cpu0/cpufreq/scaling_setspeed"
    try:
        target_khz = int(float(frequency) * 1_000_000)  # GHz -> kHz
        with open(setspeed_path, "w") as handle:
            handle.write(str(target_khz))
        print(f"Set CPU frequency to {frequency} GHz")
    except Exception as err:
        print(f"Error setting CPU frequency: {err}")
def publish_sensor_data(client):
    """Publish all sensor readings in one pass, then re-arm a timer.

    Readings that come back as ``None`` are skipped. The next pass is
    scheduled after ``default_update_frequency`` seconds.

    NOTE(review): this function is not called from the code visible in this
    file; the per-entity ``publish_*`` functions are used instead.

    Args:
        client: MQTT client exposing ``publish``.
    """
    battery_level = get_battery_level()
    if battery_level is not None:
        client.publish(battery_entity["state_topic"], battery_level, retain=True)
    cpu_temp = get_cpu_temperature()
    if cpu_temp is not None:
        client.publish(cpu_temperature_entity["state_topic"], cpu_temp, retain=True)
    cpu_usage = get_cpu_usage()
    if cpu_usage is not None:
        client.publish(cpu_usage_entity["state_topic"], cpu_usage, retain=True)
    memory_usage = get_memory_usage()
    if memory_usage is not None:
        print(f"Publishing memory usage: {memory_usage}%")
        client.publish(memory_usage_entity["state_topic"], memory_usage, retain=True)
    cpu_freq = get_cpu_frequency()
    if cpu_freq is not None:
        client.publish(cpu_frequency_entity["state_topic"], cpu_freq, retain=True)
        print(f"Published CPU frequency: {cpu_freq} GHz")
    charging_status = get_charging_status()
    if charging_status is not None:
        client.publish(charging_status_entity["state_topic"], charging_status, retain=True)
    # The module defines `default_update_frequency`; the previously used name
    # `update_frequency` does not exist and raised NameError here.
    timer = threading.Timer(default_update_frequency, publish_sensor_data, [client])
    # Daemon timer: a pending re-arm must not keep the process alive once the
    # main thread has finished.
    timer.daemon = True
    timer.start()
def publish_battery_level(client):
    """Publish the battery level, then re-arm a timer for the next reading.

    Args:
        client: MQTT client exposing ``publish``.
    """
    battery_level = get_battery_level()
    if battery_level is not None:
        client.publish(battery_entity["state_topic"], battery_level, retain=True)
    timer = threading.Timer(get_update_interval(battery_entity), publish_battery_level, [client])
    # Daemon timer: a pending re-arm must not keep the process alive (and
    # publishing) once the main thread has finished, e.g. after Ctrl-C.
    timer.daemon = True
    timer.start()
def publish_cpu_temperature(client):
    """Publish the CPU temperature, then re-arm a timer for the next reading.

    Args:
        client: MQTT client exposing ``publish``.
    """
    cpu_temp = get_cpu_temperature()
    if cpu_temp is not None:
        client.publish(cpu_temperature_entity["state_topic"], cpu_temp, retain=True)
    timer = threading.Timer(get_update_interval(cpu_temperature_entity), publish_cpu_temperature, [client])
    # Daemon timer: a pending re-arm must not keep the process alive (and
    # publishing) once the main thread has finished, e.g. after Ctrl-C.
    timer.daemon = True
    timer.start()
def publish_cpu_usage(client):
    """Publish the CPU utilisation, then re-arm a timer for the next reading.

    Args:
        client: MQTT client exposing ``publish``.
    """
    cpu_usage = get_cpu_usage()
    if cpu_usage is not None:
        client.publish(cpu_usage_entity["state_topic"], cpu_usage, retain=True)
    timer = threading.Timer(get_update_interval(cpu_usage_entity), publish_cpu_usage, [client])
    # Daemon timer: a pending re-arm must not keep the process alive (and
    # publishing) once the main thread has finished, e.g. after Ctrl-C.
    timer.daemon = True
    timer.start()
def publish_memory_usage(client):
    """Publish the memory utilisation, then re-arm a timer for the next reading.

    Args:
        client: MQTT client exposing ``publish``.
    """
    memory_usage = get_memory_usage()
    if memory_usage is not None:
        client.publish(memory_usage_entity["state_topic"], memory_usage, retain=True)
    timer = threading.Timer(get_update_interval(memory_usage_entity), publish_memory_usage, [client])
    # Daemon timer: a pending re-arm must not keep the process alive (and
    # publishing) once the main thread has finished, e.g. after Ctrl-C.
    timer.daemon = True
    timer.start()
def publish_cpu_frequency(client):
    """Publish the CPU frequency, then re-arm a timer for the next reading.

    Args:
        client: MQTT client exposing ``publish``.
    """
    cpu_freq = get_cpu_frequency()
    if cpu_freq is not None:
        client.publish(cpu_frequency_entity["state_topic"], cpu_freq, retain=True)
    timer = threading.Timer(get_update_interval(cpu_frequency_entity), publish_cpu_frequency, [client])
    # Daemon timer: a pending re-arm must not keep the process alive (and
    # publishing) once the main thread has finished, e.g. after Ctrl-C.
    timer.daemon = True
    timer.start()
def publish_charging_status(client):
    """Publish the charging status, then re-arm a timer for the next reading.

    Args:
        client: MQTT client exposing ``publish``.
    """
    charging_status = get_charging_status()
    if charging_status is not None:
        client.publish(charging_status_entity["state_topic"], charging_status, retain=True)
    timer = threading.Timer(get_update_interval(charging_status_entity), publish_charging_status, [client])
    # Daemon timer: a pending re-arm must not keep the process alive (and
    # publishing) once the main thread has finished, e.g. after Ctrl-C.
    timer.daemon = True
    timer.start()
def publish_ip_address(client):
    """Publish the local IP address, then re-arm a timer for the next reading.

    Args:
        client: MQTT client exposing ``publish``.
    """
    local_ip = get_local_ip_address()
    if local_ip is not None:
        client.publish(ip_address_entity["state_topic"], local_ip, retain=True)
    timer = threading.Timer(get_update_interval(ip_address_entity), publish_ip_address, [client])
    # Daemon timer: a pending re-arm must not keep the process alive (and
    # publishing) once the main thread has finished, e.g. after Ctrl-C.
    timer.daemon = True
    timer.start()
def publish_current_cpu_frequency(client):
    """Publish the current CPU frequency as the slider's state (one-shot).

    Nothing is published when the frequency cannot be read.

    Args:
        client: MQTT client exposing ``publish``.
    """
    current_ghz = get_cpu_frequency()
    if current_ghz is None:
        return
    client.publish(cpu_frequency_slider_entity["state_topic"], current_ghz, retain=True)
def get_update_interval(entity):
    """Return the entity's publish interval in seconds.

    Args:
        entity: Entity configuration dict.

    Returns:
        ``entity["update_interval"]`` when the key is present, otherwise
        ``default_update_frequency``.
    """
    if "update_interval" in entity:
        return entity["update_interval"]
    return default_update_frequency
def on_connect(client, userdata, flags, rc):
    """Handle the result of an MQTT connection attempt.

    On success (``rc == 0``) this subscribes to the command topics, publishes
    the discovery configs and availability, starts the periodic publishers
    and publishes the initial "ON" state of the three switches.

    Args:
        client: The MQTT client that connected.
        userdata: Unused.
        flags: Unused.
        rc: Connection result code; 0 is treated as success.
    """
    if rc != 0:
        # Previously a refused connection was ignored without any trace.
        print(f"Connection failed with result code {rc}")
        return
    # The f prefix was missing, so the literal text "{rc}" was printed.
    print(f"Connected with result code {rc}")
    client.subscribe(shutdown_entity["command_topic"])
    client.subscribe(reboot_entity["command_topic"])
    client.subscribe(screen_entity["command_topic"])
    client.subscribe(cpu_frequency_slider_entity["command_topic"])  # Subscribe to the slider
    publish_discovery_messages(client)
    publish_availability(client)
    # Each publisher re-arms its own timer forever, so start the chains only
    # once: this callback runs again on every reconnect and would otherwise
    # add another full set of timers each time.
    if not getattr(on_connect, "_publishers_started", False):
        on_connect._publishers_started = True
        publish_battery_level(client)
        publish_cpu_temperature(client)
        publish_cpu_usage(client)
        publish_memory_usage(client)
        publish_cpu_frequency(client)
        publish_charging_status(client)
        publish_ip_address(client)
    publish_current_cpu_frequency(client)  # Publish the slider's initial state
    # Publish the "ON" state of the switches at start-up
    client.publish(shutdown_entity["state_topic"], shutdown_entity["payload_on"], retain=True)
    client.publish(reboot_entity["state_topic"], reboot_entity["payload_on"], retain=True)
    client.publish(screen_entity["state_topic"], screen_entity["payload_on"], retain=True)
    print(f"Set {shutdown_entity['name']} to ON")
def deactivate_entities(client):
    """Mark every entity as unavailable, then stop the MQTT network loop.

    Args:
        client: MQTT client exposing ``publish`` and ``loop_stop``.
    """
    all_entities = (
        shutdown_entity,
        reboot_entity,
        battery_entity,
        screen_entity,
        cpu_temperature_entity,
        cpu_usage_entity,
        memory_usage_entity,
        cpu_frequency_entity,
        charging_status_entity,
        ip_address_entity,
        cpu_frequency_slider_entity,
    )
    for entity in all_entities:
        client.publish(entity["availability_topic"], entity["payload_not_available"], retain=True)
    # Stop the MQTT loop; the original author's intent is that pending
    # messages get published before it stops.
    client.loop_stop()
    print("All entities deactivated.")
def on_message(client, userdata, message):
    """Execute the command carried by an incoming MQTT message.

    Handled topics:
      * shutdown switch: "OFF" shuts the machine down, "ON" is a no-op;
      * reboot switch: "OFF" reboots the machine, "ON" is a no-op;
      * screen switch: "OFF"/"ON" set the GNOME Mutter PowerSaveMode to 1/0;
      * CPU frequency slider: the payload (GHz) is passed to
        ``set_cpu_frequency`` and echoed on both frequency state topics.

    NOTE(review): the shutdown/reboot branches sleep and run the power
    command inside this callback - confirm that the state/availability
    messages published just before are actually delivered.

    Args:
        client: MQTT client exposing ``publish``.
        userdata: Unused.
        message: Received message; ``topic`` and ``payload`` are read.
    """
    print("on_message")
    topic = message.topic
    # Decode once; undecodable bytes are replaced so a malformed payload is
    # simply not recognised instead of raising inside the MQTT callback.
    payload = message.payload.decode(errors="replace")

    if topic == shutdown_entity["command_topic"]:
        if payload == shutdown_entity["payload_off"]:
            print("Received 'OFF' command - shutting down the system")
            client.publish(shutdown_entity["state_topic"], shutdown_entity["payload_off"], retain=True)
            time.sleep(1)
            deactivate_entities(client)  # Mark every entity unavailable before powering off
            time.sleep(2)
            subprocess.run(["sudo", "shutdown", "-h", "now"])
            # os._exit ends the whole process, as the reboot branch does.
            # The former exit(0) only raised SystemExit in the thread running
            # this callback and left the rest of the script alive.
            os._exit(0)
        elif payload == shutdown_entity["payload_on"]:
            print("Received 'ON' command - no action for 'ON'")
    elif topic == reboot_entity["command_topic"]:
        if payload == reboot_entity["payload_off"]:
            print("Received 'OFF' command - rebooting the system")
            client.publish(reboot_entity["state_topic"], reboot_entity["payload_off"], retain=True)
            time.sleep(1)
            deactivate_entities(client)  # Mark every entity unavailable before rebooting
            subprocess.run(["sudo", "reboot"])
            os._exit(0)  # Leave the program immediately after the reboot command
        elif payload == reboot_entity["payload_on"]:
            print("Received 'ON' command - no action for 'ON'")
    elif topic == screen_entity["command_topic"]:
        if payload == screen_entity["payload_off"]:
            print("Received 'OFF' command - turning off the screen")
            client.publish(screen_entity["state_topic"], screen_entity["payload_off"], retain=True)
            result = subprocess.run(["busctl", "--user", "set-property", "org.gnome.Mutter.DisplayConfig", "/org/gnome/Mutter/DisplayConfig", "org.gnome.Mutter.DisplayConfig", "PowerSaveMode", "i", "1"], capture_output=True, text=True)
            print(f"Command output: {result.stdout}")
            if result.stderr:
                print(f"Command error: {result.stderr}")
        elif payload == screen_entity["payload_on"]:
            print("Received 'ON' command - turning on the screen")
            client.publish(screen_entity["state_topic"], screen_entity["payload_on"], retain=True)
            result = subprocess.run(["busctl", "--user", "set-property", "org.gnome.Mutter.DisplayConfig", "/org/gnome/Mutter/DisplayConfig", "org.gnome.Mutter.DisplayConfig", "PowerSaveMode", "i", "0"], capture_output=True, text=True)
            print(f"Command output: {result.stdout}")
            if result.stderr:
                print(f"Command error: {result.stderr}")
    elif topic == cpu_frequency_slider_entity["command_topic"]:
        frequency = payload
        print(f"Received CPU frequency slider command: {frequency} GHz")
        try:
            float(frequency)
        except ValueError:
            # Do not echo a non-numeric payload as the sensor/slider state.
            print(f"Ignoring invalid CPU frequency: {frequency!r}")
            return
        set_cpu_frequency(frequency)
        client.publish(cpu_frequency_entity["state_topic"], frequency, retain=True)
        client.publish(cpu_frequency_slider_entity["state_topic"], frequency, retain=True)
# Configure and start the MQTT client
# NOTE(review): newer paho-mqtt releases changed the Client constructor
# (callback API version argument) - confirm against the installed version.
client = mqtt.Client()
client.username_pw_set(mqtt_username, mqtt_password)
client.on_connect = on_connect
client.on_message = on_message
client.connect(mqtt_broker_ip_address, mqtt_port, 60)
# Run the network loop in a background thread; callbacks fire from there.
client.loop_start()
# Keep the script running
try:
    while True:
        # NOTE(review): this republishes every retained "online" payload once
        # per second, including after a handler has published "offline" -
        # confirm that this is intended.
        publish_availability(client) # Keep the entities marked as available
        time.sleep(1) # Wait before the next refresh
except KeyboardInterrupt:
    print("Script interrupted, closing MQTT connection")
    # Publish the "offline" availability payload for every entity
    deactivate_entities(client)
    time.sleep(1)
    # Close the MQTT connection cleanly
    client.disconnect()