From 288ec178ac4fe88699715419c1f107581179a4d3 Mon Sep 17 00:00:00 2001 From: Gilles Soulier Date: Tue, 19 May 2026 06:31:39 +0200 Subject: [PATCH] feat(agent-metric): publication MQTT optionnelle MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Nouveau module mqtt.rs : connexion broker, publish QoS 0, reconnexion auto - Topics : sentinelmesh//metrics/realtime|medium et events - Config : section mqtt (enabled, broker, port, topic_prefix, client_id) - Publié sur : collecte temps réel, medium, boot, événements système - Désactivé par défaut (mqtt.enabled: false) Co-Authored-By: Claude Sonnet 4.6 --- agents/agent-metric/Cargo.toml | 1 + agents/agent-metric/src/config.rs | 11 ++++++ agents/agent-metric/src/main.rs | 43 +++++++++++++++----- agents/agent-metric/src/mqtt.rs | 65 +++++++++++++++++++++++++++++++ 4 files changed, 111 insertions(+), 9 deletions(-) create mode 100644 agents/agent-metric/src/mqtt.rs diff --git a/agents/agent-metric/Cargo.toml b/agents/agent-metric/Cargo.toml index 063eacf..d6b39ed 100644 --- a/agents/agent-metric/Cargo.toml +++ b/agents/agent-metric/Cargo.toml @@ -15,3 +15,4 @@ serde_yaml = "0.9" reqwest = { version = "0.12", features = ["json"] } chrono = { version = "0.4", features = ["serde"] } sysinfo = "0.32" +rumqttc = "0.24" diff --git a/agents/agent-metric/src/config.rs b/agents/agent-metric/src/config.rs index 708b14d..6eabb81 100644 --- a/agents/agent-metric/src/config.rs +++ b/agents/agent-metric/src/config.rs @@ -6,6 +6,8 @@ pub struct Config { pub agent: AgentConfig, pub intervals: IntervalsConfig, pub api: ApiConfig, + #[serde(default)] + pub mqtt: Option, } #[derive(Debug, Clone, Deserialize, Serialize)] @@ -31,6 +33,15 @@ pub struct ApiConfig { pub listen: String, } +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct MqttConfig { + pub enabled: bool, + pub broker: String, + pub port: u16, + pub topic_prefix: String, + pub client_id: String, +} + impl Config { pub fn load(path: &str) -> anyhow::Result { let content = std::fs::read_to_string(path)?; diff --git a/agents/agent-metric/src/main.rs b/agents/agent-metric/src/main.rs index a0f8d1e..03a18a7 100644 --- a/agents/agent-metric/src/main.rs +++ b/agents/agent-metric/src/main.rs @@ -9,6 +9,7 @@ mod api; mod backend; mod collectors; mod config; +mod mqtt; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -27,6 +28,17 @@ async fn main() -> anyhow::Result<()> { error!("Enregistrement backend : {e}"); } + // Connexion MQTT optionnelle + let mqtt_pub: Option = match &cfg.mqtt { + Some(m) if m.enabled => { + match mqtt::MqttPublisher::connect(m, &cfg.agent.hostname).await { + Ok(p) => Some(p), + Err(e) => { error!("MQTT connexion échouée : {e}"); None } + } + } + _ => None, + }; + // Collecte hardware au démarrage let hardware = collectors::static_info::collect(); info!("Hardware : {} — {} cœurs — {} Mo RAM", hardware.cpu_model, hardware.cpu_cores, hardware.ram_total_mb); @@ -35,9 +47,13 @@ async fn main() -> anyhow::Result<()> { } // Événement boot - if let Err(e) = backend::push_event(&client, &cfg.agent.id, "boot", json!({ "hostname": cfg.agent.hostname })).await { + let boot_payload = json!({ "hostname": cfg.agent.hostname }); + if let Err(e) = backend::push_event(&client, &cfg.agent.id, "boot", boot_payload.clone()).await { error!("Push événement boot : {e}"); } + if let Some(p) = &mqtt_pub { + p.publish_event(&json!({ "event": "boot", "hostname": cfg.agent.hostname })).await; + } // État partagé avec l'API locale let shared = Arc::new(RwLock::new(api::AgentState { @@ -61,28 +77,37 @@ async fn main() -> anyhow::Result<()> { if let Err(e) = client.push_medium(&cfg.agent.id, &medium).await { error!("Push medium initial : {e}"); } + if let Some(p) = &mqtt_pub { + p.publish_medium(&serde_json::to_value(&medium).unwrap_or_default()).await; + } // Minuteries - let mut rt_ticker = time::interval(realtime_interval); + let mut rt_ticker = time::interval(realtime_interval); let mut med_ticker = time::interval(medium_interval); med_ticker.tick().await; // consomme le premier tick immédiat loop { tokio::select! { _ = rt_ticker.tick() => { - let metrics = rt_collector.collect(cfg.intervals.realtime_ms); - shared.write().await.realtime = metrics.clone(); - if let Err(e) = client.push_realtime(&cfg.agent.id, &metrics).await { + let m = rt_collector.collect(cfg.intervals.realtime_ms); + shared.write().await.realtime = m.clone(); + if let Err(e) = client.push_realtime(&cfg.agent.id, &m).await { error!("Push temps réel : {e}"); } + if let Some(p) = &mqtt_pub { + p.publish_realtime(&serde_json::to_value(&m).unwrap_or_default()).await; + } } _ = med_ticker.tick() => { - info!("Collecte medium (disques, températures)…"); - let metrics = collectors::medium::collect(); - shared.write().await.medium = metrics.clone(); - if let Err(e) = client.push_medium(&cfg.agent.id, &metrics).await { + info!("Collecte medium…"); + let m = collectors::medium::collect(); + shared.write().await.medium = m.clone(); + if let Err(e) = client.push_medium(&cfg.agent.id, &m).await { error!("Push medium : {e}"); } + if let Some(p) = &mqtt_pub { + p.publish_medium(&serde_json::to_value(&m).unwrap_or_default()).await; + } } } } diff --git a/agents/agent-metric/src/mqtt.rs b/agents/agent-metric/src/mqtt.rs new file mode 100644 index 0000000..402ee9b --- /dev/null +++ b/agents/agent-metric/src/mqtt.rs @@ -0,0 +1,65 @@ +use anyhow::Result; +use rumqttc::{AsyncClient, MqttOptions, QoS}; +use serde_json::Value; +use tracing::{info, warn}; + +use crate::config::MqttConfig; + +pub struct MqttPublisher { + client: AsyncClient, + topic_prefix: String, + hostname: String, +} + +impl MqttPublisher { + pub async fn connect(cfg: &MqttConfig, hostname: &str) -> Result { + let client_id = if cfg.client_id.is_empty() { + format!("sentinelmesh-metric-{hostname}") + } else { + cfg.client_id.clone() + }; + + let mut opts = MqttOptions::new(client_id, &cfg.broker, cfg.port); + opts.set_keep_alive(std::time::Duration::from_secs(30)); + + let (client, mut event_loop) = AsyncClient::new(opts, 64); + + // Boucle événements en tâche de fond + tokio::spawn(async move { + loop { + if let Err(e) = event_loop.poll().await { + warn!("MQTT boucle événements : {e}"); + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + } + } + }); + + info!("MQTT connecté à {}:{}", cfg.broker, cfg.port); + + Ok(Self { + client, + topic_prefix: cfg.topic_prefix.clone(), + hostname: hostname.to_string(), + }) + } + + pub async fn publish(&self, subtopic: &str, payload: &Value) { + let topic = format!("{}/{}/{subtopic}", self.topic_prefix, self.hostname); + let json = payload.to_string(); + if let Err(e) = self.client.publish(&topic, QoS::AtMostOnce, false, json).await { + warn!("MQTT publish {topic} : {e}"); + } + } + + pub async fn publish_realtime(&self, payload: &Value) { + self.publish("metrics/realtime", payload).await; + } + + pub async fn publish_medium(&self, payload: &Value) { + self.publish("metrics/medium", payload).await; + } + + pub async fn publish_event(&self, payload: &Value) { + self.publish("events", payload).await; + } +}