feat(agent-metric): publication MQTT optionnelle
- Nouveau module mqtt.rs : connexion broker, publish QoS 0, reconnexion auto - Topics : sentinelmesh/<hostname>/metrics/realtime|medium et events - Config : section mqtt (enabled, broker, port, topic_prefix, client_id) - Publié sur : collecte temps réel, medium, boot, événements système - Désactivé par défaut (mqtt.enabled: false) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -15,3 +15,4 @@ serde_yaml = "0.9"
|
|||||||
reqwest = { version = "0.12", features = ["json"] }
|
reqwest = { version = "0.12", features = ["json"] }
|
||||||
chrono = { version = "0.4", features = ["serde"] }
|
chrono = { version = "0.4", features = ["serde"] }
|
||||||
sysinfo = "0.32"
|
sysinfo = "0.32"
|
||||||
|
rumqttc = "0.24"
|
||||||
|
|||||||
@@ -6,6 +6,8 @@ pub struct Config {
|
|||||||
pub agent: AgentConfig,
|
pub agent: AgentConfig,
|
||||||
pub intervals: IntervalsConfig,
|
pub intervals: IntervalsConfig,
|
||||||
pub api: ApiConfig,
|
pub api: ApiConfig,
|
||||||
|
#[serde(default)]
|
||||||
|
pub mqtt: Option<MqttConfig>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||||
@@ -31,6 +33,15 @@ pub struct ApiConfig {
|
|||||||
pub listen: String,
|
pub listen: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||||
|
pub struct MqttConfig {
|
||||||
|
pub enabled: bool,
|
||||||
|
pub broker: String,
|
||||||
|
pub port: u16,
|
||||||
|
pub topic_prefix: String,
|
||||||
|
pub client_id: String,
|
||||||
|
}
|
||||||
|
|
||||||
impl Config {
|
impl Config {
|
||||||
pub fn load(path: &str) -> anyhow::Result<Self> {
|
pub fn load(path: &str) -> anyhow::Result<Self> {
|
||||||
let content = std::fs::read_to_string(path)?;
|
let content = std::fs::read_to_string(path)?;
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ mod api;
|
|||||||
mod backend;
|
mod backend;
|
||||||
mod collectors;
|
mod collectors;
|
||||||
mod config;
|
mod config;
|
||||||
|
mod mqtt;
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> anyhow::Result<()> {
|
async fn main() -> anyhow::Result<()> {
|
||||||
@@ -27,6 +28,17 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
error!("Enregistrement backend : {e}");
|
error!("Enregistrement backend : {e}");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Connexion MQTT optionnelle
|
||||||
|
let mqtt_pub: Option<mqtt::MqttPublisher> = match &cfg.mqtt {
|
||||||
|
Some(m) if m.enabled => {
|
||||||
|
match mqtt::MqttPublisher::connect(m, &cfg.agent.hostname).await {
|
||||||
|
Ok(p) => Some(p),
|
||||||
|
Err(e) => { error!("MQTT connexion échouée : {e}"); None }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => None,
|
||||||
|
};
|
||||||
|
|
||||||
// Collecte hardware au démarrage
|
// Collecte hardware au démarrage
|
||||||
let hardware = collectors::static_info::collect();
|
let hardware = collectors::static_info::collect();
|
||||||
info!("Hardware : {} — {} cœurs — {} Mo RAM", hardware.cpu_model, hardware.cpu_cores, hardware.ram_total_mb);
|
info!("Hardware : {} — {} cœurs — {} Mo RAM", hardware.cpu_model, hardware.cpu_cores, hardware.ram_total_mb);
|
||||||
@@ -35,9 +47,13 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Événement boot
|
// Événement boot
|
||||||
if let Err(e) = backend::push_event(&client, &cfg.agent.id, "boot", json!({ "hostname": cfg.agent.hostname })).await {
|
let boot_payload = json!({ "hostname": cfg.agent.hostname });
|
||||||
|
if let Err(e) = backend::push_event(&client, &cfg.agent.id, "boot", boot_payload.clone()).await {
|
||||||
error!("Push événement boot : {e}");
|
error!("Push événement boot : {e}");
|
||||||
}
|
}
|
||||||
|
if let Some(p) = &mqtt_pub {
|
||||||
|
p.publish_event(&json!({ "event": "boot", "hostname": cfg.agent.hostname })).await;
|
||||||
|
}
|
||||||
|
|
||||||
// État partagé avec l'API locale
|
// État partagé avec l'API locale
|
||||||
let shared = Arc::new(RwLock::new(api::AgentState {
|
let shared = Arc::new(RwLock::new(api::AgentState {
|
||||||
@@ -61,6 +77,9 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
if let Err(e) = client.push_medium(&cfg.agent.id, &medium).await {
|
if let Err(e) = client.push_medium(&cfg.agent.id, &medium).await {
|
||||||
error!("Push medium initial : {e}");
|
error!("Push medium initial : {e}");
|
||||||
}
|
}
|
||||||
|
if let Some(p) = &mqtt_pub {
|
||||||
|
p.publish_medium(&serde_json::to_value(&medium).unwrap_or_default()).await;
|
||||||
|
}
|
||||||
|
|
||||||
// Minuteries
|
// Minuteries
|
||||||
let mut rt_ticker = time::interval(realtime_interval);
|
let mut rt_ticker = time::interval(realtime_interval);
|
||||||
@@ -70,19 +89,25 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
_ = rt_ticker.tick() => {
|
_ = rt_ticker.tick() => {
|
||||||
let metrics = rt_collector.collect(cfg.intervals.realtime_ms);
|
let m = rt_collector.collect(cfg.intervals.realtime_ms);
|
||||||
shared.write().await.realtime = metrics.clone();
|
shared.write().await.realtime = m.clone();
|
||||||
if let Err(e) = client.push_realtime(&cfg.agent.id, &metrics).await {
|
if let Err(e) = client.push_realtime(&cfg.agent.id, &m).await {
|
||||||
error!("Push temps réel : {e}");
|
error!("Push temps réel : {e}");
|
||||||
}
|
}
|
||||||
|
if let Some(p) = &mqtt_pub {
|
||||||
|
p.publish_realtime(&serde_json::to_value(&m).unwrap_or_default()).await;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
_ = med_ticker.tick() => {
|
_ = med_ticker.tick() => {
|
||||||
info!("Collecte medium (disques, températures)…");
|
info!("Collecte medium…");
|
||||||
let metrics = collectors::medium::collect();
|
let m = collectors::medium::collect();
|
||||||
shared.write().await.medium = metrics.clone();
|
shared.write().await.medium = m.clone();
|
||||||
if let Err(e) = client.push_medium(&cfg.agent.id, &metrics).await {
|
if let Err(e) = client.push_medium(&cfg.agent.id, &m).await {
|
||||||
error!("Push medium : {e}");
|
error!("Push medium : {e}");
|
||||||
}
|
}
|
||||||
|
if let Some(p) = &mqtt_pub {
|
||||||
|
p.publish_medium(&serde_json::to_value(&m).unwrap_or_default()).await;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,65 @@
|
|||||||
|
use anyhow::Result;
|
||||||
|
use rumqttc::{AsyncClient, MqttOptions, QoS};
|
||||||
|
use serde_json::Value;
|
||||||
|
use tracing::{info, warn};
|
||||||
|
|
||||||
|
use crate::config::MqttConfig;
|
||||||
|
|
||||||
|
pub struct MqttPublisher {
|
||||||
|
client: AsyncClient,
|
||||||
|
topic_prefix: String,
|
||||||
|
hostname: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MqttPublisher {
|
||||||
|
pub async fn connect(cfg: &MqttConfig, hostname: &str) -> Result<Self> {
|
||||||
|
let client_id = if cfg.client_id.is_empty() {
|
||||||
|
format!("sentinelmesh-metric-{hostname}")
|
||||||
|
} else {
|
||||||
|
cfg.client_id.clone()
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut opts = MqttOptions::new(client_id, &cfg.broker, cfg.port);
|
||||||
|
opts.set_keep_alive(std::time::Duration::from_secs(30));
|
||||||
|
|
||||||
|
let (client, mut event_loop) = AsyncClient::new(opts, 64);
|
||||||
|
|
||||||
|
// Boucle événements en tâche de fond
|
||||||
|
tokio::spawn(async move {
|
||||||
|
loop {
|
||||||
|
if let Err(e) = event_loop.poll().await {
|
||||||
|
warn!("MQTT boucle événements : {e}");
|
||||||
|
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
info!("MQTT connecté à {}:{}", cfg.broker, cfg.port);
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
client,
|
||||||
|
topic_prefix: cfg.topic_prefix.clone(),
|
||||||
|
hostname: hostname.to_string(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn publish(&self, subtopic: &str, payload: &Value) {
|
||||||
|
let topic = format!("{}/{}/{subtopic}", self.topic_prefix, self.hostname);
|
||||||
|
let json = payload.to_string();
|
||||||
|
if let Err(e) = self.client.publish(&topic, QoS::AtMostOnce, false, json).await {
|
||||||
|
warn!("MQTT publish {topic} : {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn publish_realtime(&self, payload: &Value) {
|
||||||
|
self.publish("metrics/realtime", payload).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn publish_medium(&self, payload: &Value) {
|
||||||
|
self.publish("metrics/medium", payload).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn publish_event(&self, payload: &Value) {
|
||||||
|
self.publish("events", payload).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user