From 6cd910dbc0d9fa717fdfd841c3af41860bc05743 Mon Sep 17 00:00:00 2001 From: Gilles Soulier Date: Tue, 19 May 2026 06:32:57 +0200 Subject: [PATCH] feat(agent-scan-network): MQTT + configs exemples + ROADMAP Phase 6 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - mqtt.rs : publication résumé scan sur sentinelmesh//network/scan - Config MQTT dans les deux agents (enabled: false par défaut) - ROADMAP Phase 6 complète : SSE, historique, Prometheus, MQTT ✅ Co-Authored-By: Claude Sonnet 4.6 --- ROADMAP.md | 19 ++++--- agents/agent-metric/config.example.yaml | 12 +++++ agents/agent-scan-network/Cargo.toml | 1 + agents/agent-scan-network/config.example.yaml | 10 ++++ agents/agent-scan-network/src/config.rs | 11 ++++ agents/agent-scan-network/src/main.rs | 30 ++++++++--- agents/agent-scan-network/src/mqtt.rs | 51 +++++++++++++++++++ 7 files changed, 118 insertions(+), 16 deletions(-) create mode 100644 agents/agent-scan-network/src/mqtt.rs diff --git a/ROADMAP.md b/ROADMAP.md index 255ec40..2962d6d 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -51,12 +51,15 @@ - [x] `.gitea/workflows/ci.yaml` : check + clippy + fmt + tests sur push/PR - [x] `.gitea/workflows/release.yaml` : build multi-arch + release Gitea sur tag v* -## Phase 6 — Extensions +## Phase 6 — Extensions ✅ -- [ ] MQTT -- [ ] WebSocket / SSE -- [ ] Historique + agrégation + retention -- [ ] Prometheus / InfluxDB -- [ ] Home Assistant / Grafana -- [ ] Support multi-dashboard -- [ ] PostgreSQL +- [x] SSE (Server-Sent Events) : GET /api/v1/stream — events `metrics` et `network` en temps réel +- [x] Historique métriques : GET /api/v1/history/{agent_id}?hours=N, rétention 7 jours +- [x] Prometheus : GET /metrics — format text/plain, compatible Grafana/Prometheus +- [x] MQTT agent-metric : publication realtime/medium/events (rumqttc, QoS 0/1) +- [x] MQTT agent-scan-network : publication résumé scan réseau +- [x] Topics MQTT : sentinelmesh//metrics/realtime|medium et network/scan +- [ ] Home Assistant integration via MQTT discovery — futur +- [ ] PostgreSQL — futur (SQLite suffisant en homelab) +- [ ] WebSocket bidirectionnel — futur +- [ ] InfluxDB / Grafana direct — Prometheus couvre le besoin via scraping diff --git a/agents/agent-metric/config.example.yaml b/agents/agent-metric/config.example.yaml index 3105b04..7070720 100644 --- a/agents/agent-metric/config.example.yaml +++ b/agents/agent-metric/config.example.yaml @@ -13,3 +13,15 @@ intervals: api: listen: "0.0.0.0:9101" + +# MQTT optionnel (Home Assistant, Node-RED, Grafana…) +mqtt: + enabled: false + broker: "localhost" + port: 1883 + topic_prefix: "sentinelmesh" + client_id: "" # auto : sentinelmesh-metric- + # Topics publiés : + # sentinelmesh//metrics/realtime (JSON, chaque seconde) + # sentinelmesh//metrics/medium (JSON, toutes les 30min) + # sentinelmesh//events (JSON, boot/shutdown…) diff --git a/agents/agent-scan-network/Cargo.toml b/agents/agent-scan-network/Cargo.toml index 187723e..ad2b70d 100644 --- a/agents/agent-scan-network/Cargo.toml +++ b/agents/agent-scan-network/Cargo.toml @@ -16,3 +16,4 @@ reqwest = { version = "0.12", features = ["json"] } chrono = { version = "0.4", features = ["serde"] } ipnetwork = "0.20" tokio-util = "0.7" +rumqttc = "0.24" diff --git a/agents/agent-scan-network/config.example.yaml b/agents/agent-scan-network/config.example.yaml index e171c4e..3a268c3 100644 --- a/agents/agent-scan-network/config.example.yaml +++ b/agents/agent-scan-network/config.example.yaml @@ -29,3 +29,13 @@ scan: api: listen: "0.0.0.0:9100" # API locale de l'agent + +# MQTT optionnel +mqtt: + enabled: false + broker: "localhost" + port: 1883 + topic_prefix: "sentinelmesh" + client_id: "" # auto : sentinelmesh-scan- + # Topics publiés : + # sentinelmesh//network/scan (résumé scan complet) diff --git a/agents/agent-scan-network/src/config.rs b/agents/agent-scan-network/src/config.rs index 1fdab4b..54c55a6 100644 --- a/agents/agent-scan-network/src/config.rs +++ b/agents/agent-scan-network/src/config.rs @@ -6,6 +6,17 @@ pub struct Config { pub agent: AgentConfig, pub scan: ScanConfig, pub api: ApiConfig, + #[serde(default)] + pub mqtt: Option, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct MqttConfig { + pub enabled: bool, + pub broker: String, + pub port: u16, + pub topic_prefix: String, + pub client_id: String, } #[derive(Debug, Clone, Deserialize, Serialize)] diff --git a/agents/agent-scan-network/src/main.rs b/agents/agent-scan-network/src/main.rs index 08554ba..a99f5ad 100644 --- a/agents/agent-scan-network/src/main.rs +++ b/agents/agent-scan-network/src/main.rs @@ -7,6 +7,7 @@ use tracing_subscriber::EnvFilter; mod api; mod backend; mod config; +mod mqtt; mod oui; mod scanner; @@ -18,39 +19,52 @@ async fn main() -> anyhow::Result<()> { let config_path = std::env::args().nth(1).unwrap_or_else(|| "config.yaml".into()); let cfg = config::Config::load(&config_path)?; - info!("Configuration chargée depuis {config_path}"); info!("Agent ID : {} | Subnets : {:?}", cfg.agent.id, cfg.scan.subnets); let client = backend::BackendClient::new(&cfg); - // Enregistrement initial if let Err(e) = client.register(&cfg).await { error!("Impossible de joindre le backend : {e}"); } - // État partagé entre l'API locale et la boucle de scan + // Connexion MQTT optionnelle + let mqtt_pub: Option = match &cfg.mqtt { + Some(m) if m.enabled => { + match mqtt::MqttPublisher::connect(&m.broker, m.port, &m.client_id, &m.topic_prefix, &cfg.agent.hostname).await { + Ok(p) => Some(p), + Err(e) => { error!("MQTT connexion échouée : {e}"); None } + } + } + _ => None, + }; + let shared: api::SharedState = Arc::new(RwLock::new(Vec::new())); - // API locale (tâche de fond) let api_state = shared.clone(); let api_listen = cfg.api.listen.clone(); tokio::spawn(async move { api::serve(api_listen, api_state).await }); - // Boucle de scan principale let interval = time::Duration::from_secs(cfg.scan.interval_seconds); loop { info!("Démarrage du scan réseau…"); match scanner::scan_all(&cfg.scan).await { Ok(devices) => { info!("{} équipements découverts", devices.len()); - - // Mise à jour de l'état partagé *shared.write().await = devices.clone(); - // Push vers le backend if let Err(e) = client.push_devices(&cfg.agent.id, &devices).await { error!("Erreur push backend : {e}"); } + + // Publication MQTT : résumé du scan + équipements online individuels + if let Some(p) = &mqtt_pub { + let online: Vec<_> = devices.iter().filter(|d| d.state == "online").collect(); + p.publish_scan(&serde_json::json!({ + "total": devices.len(), + "online": online.len(), + "devices": online, + })).await; + } } Err(e) => error!("Erreur de scan : {e}"), } diff --git a/agents/agent-scan-network/src/mqtt.rs b/agents/agent-scan-network/src/mqtt.rs new file mode 100644 index 0000000..0902090 --- /dev/null +++ b/agents/agent-scan-network/src/mqtt.rs @@ -0,0 +1,51 @@ +use anyhow::Result; +use rumqttc::{AsyncClient, MqttOptions, QoS}; +use serde_json::Value; +use tracing::{info, warn}; + +pub struct MqttPublisher { + client: AsyncClient, + topic_prefix: String, + hostname: String, +} + +impl MqttPublisher { + pub async fn connect(broker: &str, port: u16, client_id: &str, topic_prefix: &str, hostname: &str) -> Result { + let id = if client_id.is_empty() { + format!("sentinelmesh-scan-{hostname}") + } else { + client_id.to_string() + }; + + let mut opts = MqttOptions::new(id, broker, port); + opts.set_keep_alive(std::time::Duration::from_secs(30)); + + let (client, mut event_loop) = AsyncClient::new(opts, 32); + + tokio::spawn(async move { + loop { + if let Err(e) = event_loop.poll().await { + warn!("MQTT boucle : {e}"); + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + } + } + }); + + info!("MQTT connecté à {broker}:{port}"); + Ok(Self { client, topic_prefix: topic_prefix.to_string(), hostname: hostname.to_string() }) + } + + pub async fn publish_scan(&self, payload: &Value) { + let topic = format!("{}/{}/network/scan", self.topic_prefix, self.hostname); + let json = payload.to_string(); + if let Err(e) = self.client.publish(&topic, QoS::AtMostOnce, false, json).await { + warn!("MQTT publish {topic} : {e}"); + } + } + + #[allow(dead_code)] + pub async fn publish_device_online(&self, payload: &Value) { + let topic = format!("{}/{}/network/online", self.topic_prefix, self.hostname); + let _ = self.client.publish(&topic, QoS::AtLeastOnce, false, payload.to_string()).await; + } +}