Files
pilot/main-lenovo-bureau.py

290 lines
8.6 KiB
Python

# ajouter cette ligne en bas du fichier : sudo visudo
# gilles ALL=(ALL) NOPASSWD: /sbin/shutdown
# ajouter le hostname du computer
import os
import time
import json
import paho.mqtt.client as mqtt
import subprocess
import threading
import psutil
import socket
hostname = socket.gethostname()
# Fonctions pour obtenir les températures CPU
def get_cpu_temperature():
temps = psutil.sensors_temperatures()
for name, entries in temps.items():
#if name == 'k10temp':
if name == 'coretemp':
for entry in entries:
#if entry.label == 'Tctl':
if entry.label == 'Package id 0':
return entry.current
def get_cpu_usage():
cpu_usage = psutil.cpu_percent()
return round(cpu_usage, 1) # Arrondir à 1 chiffre après la virgule
# Paramètres MQTT
mqtt_broker_ip_address = "10.0.0.3"
mqtt_port = 1883
mqtt_username = ""
mqtt_password = ""
#update_frequency = 60 # Mise à jour toutes les 60 secondes
discovery_prefix = "homeassistant"
update_frequency = 5 # en secondes
# Fonction pour obtenir la quantité de mémoire utilisée
def get_memory_used():
memory_info = psutil.virtual_memory()
memory_used_mb = memory_info.used / 1024 / 1024 # Convertir en MB
return round(memory_used_mb) # Arrondir à 0 chiffre après la virgule
device_info = {
"identifiers": ["Mqtt_pilot"],
"name": f"{hostname}",
"manufacturer": "Black",
"model": "desktop",
"sw_version": "1.0.0",
"suggested_area": "salon",
}
# Configuration des entités
shutdown_entity = {
"name": f"shutdown_{hostname}",
"type": "switch",
"unique_id": f"shutdown_{hostname}_44:37:e6:6b:53:86",
"command_topic": f"pilot/{hostname}/shutdown/available",
"state_topic": f"pilot/{hostname}/shutdown",
"availability_topic": f"pilot/{hostname}/shutdown/available",
"device_class": "switch",
"payload_on": "ON",
"payload_off": "OFF",
"payload_available": "online",
"payload_not_available": "offline",
"icon": "mdi:power",
"device": device_info,
}
cpu_temp_entity = {
"name": f"cpu_temp_{hostname}",
"type": "sensor",
"unique_id": f"cpu_temp_{hostname}_44:37:e6:6b:53:86",
"state_topic": f"pilot/{hostname}/cpu_temp",
"availability_topic": f"pilot/{hostname}/cpu_temp/available",
"device_class": "temperature",
"unit_of_measurement": "°C",
"payload_available": "online",
"payload_not_available": "offline",
"icon": "mdi:thermometer",
"device": device_info,
}
# Définition de l'entité memory_used
memory_used_entity = {
"name": f"memory_used_{hostname}",
"type": "sensor",
"unique_id": f"memory_used_{hostname}_44:37:e6:6b:53:86",
"state_topic": f"pilot/{hostname}/memory_used",
"availability_topic": f"pilot/{hostname}/memory_used/available",
# "device_class": "memory",
"unit_of_measurement": "MB",
"payload_available": "online",
"payload_not_available": "offline",
"icon": "mdi:memory",
"device": device_info,
}
cpu_usage_entity = {
"name": f"cpu_usage_{hostname}",
"type": "sensor",
"unique_id": f"cpu_usage_{hostname}_44:37:e6:6b:53:86",
"state_topic": f"pilot/{hostname}/cpu_usage",
"availability_topic": f"pilot/{hostname}/cpu_usage/available",
"unit_of_measurement": "%",
"payload_available": "online",
"payload_not_available": "offline",
"icon": "mdi:memory",
"device": device_info,
}
def publish_discovery_messages(client):
# Publie les messages de découverte pour les entités
# ...
print("publish_discovery_messages")
client.publish(
f"{discovery_prefix}/switch/{hostname}/{shutdown_entity['name']}/config",
json.dumps(shutdown_entity),
retain=True,
)
# Publication de la configuration du capteur cpu_temp
client.publish(
f"{discovery_prefix}/sensor/{hostname}/{cpu_temp_entity['name']}/config",
json.dumps(cpu_temp_entity),
retain=True,
)
# Publication de la configuration du capteur memory_used
client.publish(
f"{discovery_prefix}/sensor/{hostname}/{memory_used_entity['name']}/config",
json.dumps(memory_used_entity),
retain=True,
)
# Publication de la disponibilité pour l'entité cpu_usage
client.publish(
f"{discovery_prefix}/sensor/{hostname}/{cpu_usage_entity['name']}/config",
json.dumps(cpu_usage_entity),
retain=True,
)
# client.publish(f"{discovery_prefix}/sensor/{battery_entity['name']}/config", json.dumps(battery_entity), retain=True)
def publish_availability(client):
client.publish(
shutdown_entity["availability_topic"],
shutdown_entity["payload_available"],
retain=True,
)
# Publication de la disponibilité pour l'entité cpu_temp
client.publish(
cpu_temp_entity["availability_topic"],
cpu_temp_entity["payload_available"],
retain=True,
)
# Publication de la disponibilité pour l'entité memory_used
client.publish(
memory_used_entity["availability_topic"],
memory_used_entity["payload_available"],
retain=True,
)
client.publish(
cpu_usage_entity["availability_topic"],
cpu_usage_entity["payload_available"],
retain=True,
)
print("Published availability for all entities")
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected with result code {rc}")
client.subscribe(shutdown_entity["command_topic"])
publish_discovery_messages(client)
publish_availability(client)
# publish_sensor_data(
# client
# ) # Démarre la première publication des données du capteur
# Publier l'état "ON" pour le switch au démarrage
client.publish(
shutdown_entity["state_topic"], shutdown_entity["payload_on"], retain=True
)
print(f"Set {shutdown_entity['name']} to ON")
def on_message(client, userdata, message):
# Gestion des messages MQTT
print("on_message")
# Vérifier si le message est pour le switch "shutdown"
if message.topic == shutdown_entity["command_topic"]:
if message.payload.decode() == shutdown_entity["payload_off"]:
print("Received 'OFF' command - shutting down the system")
client.publish(
shutdown_entity["state_topic"],
shutdown_entity["payload_off"],
retain=True,
)
# Exécuter la commande de shutdown
time.sleep(1)
client.publish(
shutdown_entity["availability_topic"],
shutdown_entity["payload_not_available"],
retain=True,
)
time.sleep(1)
subprocess.run(["sudo", "shutdown", "-h", "now"])
elif message.payload.decode() == shutdown_entity["payload_on"]:
print("Received 'ON' command - no action for 'ON'")
def publish_sensor_values(client):
cpu_temp = get_cpu_temperature()
if cpu_temp is not None:
client.publish(cpu_temp_entity["state_topic"], round(cpu_temp, 1), retain=True)
memory_used = get_memory_used()
if memory_used is not None:
client.publish(memory_used_entity["state_topic"], memory_used, retain=True)
cpu_usage = get_cpu_usage()
if cpu_usage is not None:
client.publish(cpu_usage_entity["state_topic"], cpu_usage, retain=True)
# Configuration et démarrage du client MQTT
client = mqtt.Client()
client.username_pw_set(mqtt_username, mqtt_password)
client.on_connect = on_connect
client.on_message = on_message
client.connect(mqtt_broker_ip_address, mqtt_port, 60)
client.loop_start()
# Maintenir le script en exécution
try:
while True:
publish_availability(client) # Maintenir l'état disponible
publish_sensor_values(client) # Publier les valeurs des capteurs
time.sleep(update_frequency) # Attendre avant la prochaine mise à jour
except KeyboardInterrupt:
print("Script interrupted, closing MQTT connection")
# Publier l'état "unavailable" pour les entités
client.publish(
shutdown_entity["availability_topic"],
shutdown_entity["payload_not_available"],
retain=True,
)
client.publish(
cpu_temp_entity["availability_topic"],
cpu_temp_entity["payload_not_available"],
retain=True,
)
client.publish(
memory_used_entity["availability_topic"],
memory_used_entity["payload_not_available"],
retain=True,
)
client.publish(
cpu_usage_entity["availability_topic"],
cpu_usage_entity["payload_not_available"],
retain=True,
)
client.disconnect()