feat(agent-scan-network): MQTT + configs exemples + ROADMAP Phase 6
- mqtt.rs : publication résumé scan sur sentinelmesh/<host>/network/scan - Config MQTT dans les deux agents (enabled: false par défaut) - ROADMAP Phase 6 complète : SSE, historique, Prometheus, MQTT ✅ Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
+11
-8
@@ -51,12 +51,15 @@
|
|||||||
- [x] `.gitea/workflows/ci.yaml` : check + clippy + fmt + tests sur push/PR
|
- [x] `.gitea/workflows/ci.yaml` : check + clippy + fmt + tests sur push/PR
|
||||||
- [x] `.gitea/workflows/release.yaml` : build multi-arch + release Gitea sur tag v*
|
- [x] `.gitea/workflows/release.yaml` : build multi-arch + release Gitea sur tag v*
|
||||||
|
|
||||||
## Phase 6 — Extensions
|
## Phase 6 — Extensions ✅
|
||||||
|
|
||||||
- [ ] MQTT
|
- [x] SSE (Server-Sent Events) : GET /api/v1/stream — events `metrics` et `network` en temps réel
|
||||||
- [ ] WebSocket / SSE
|
- [x] Historique métriques : GET /api/v1/history/{agent_id}?hours=N, rétention 7 jours
|
||||||
- [ ] Historique + agrégation + retention
|
- [x] Prometheus : GET /metrics — format text/plain, compatible Grafana/Prometheus
|
||||||
- [ ] Prometheus / InfluxDB
|
- [x] MQTT agent-metric : publication realtime/medium/events (rumqttc, QoS 0/1)
|
||||||
- [ ] Home Assistant / Grafana
|
- [x] MQTT agent-scan-network : publication résumé scan réseau
|
||||||
- [ ] Support multi-dashboard
|
- [x] Topics MQTT : sentinelmesh/<hostname>/metrics/realtime|medium et network/scan
|
||||||
- [ ] PostgreSQL
|
- [ ] Home Assistant integration via MQTT discovery — futur
|
||||||
|
- [ ] PostgreSQL — futur (SQLite suffisant en homelab)
|
||||||
|
- [ ] WebSocket bidirectionnel — futur
|
||||||
|
- [ ] InfluxDB / Grafana direct — Prometheus couvre le besoin via scraping
|
||||||
|
|||||||
@@ -13,3 +13,15 @@ intervals:
|
|||||||
|
|
||||||
api:
|
api:
|
||||||
listen: "0.0.0.0:9101"
|
listen: "0.0.0.0:9101"
|
||||||
|
|
||||||
|
# MQTT optionnel (Home Assistant, Node-RED, Grafana…)
|
||||||
|
mqtt:
|
||||||
|
enabled: false
|
||||||
|
broker: "localhost"
|
||||||
|
port: 1883
|
||||||
|
topic_prefix: "sentinelmesh"
|
||||||
|
client_id: "" # auto : sentinelmesh-metric-<hostname>
|
||||||
|
# Topics publiés :
|
||||||
|
# sentinelmesh/<hostname>/metrics/realtime (JSON, chaque seconde)
|
||||||
|
# sentinelmesh/<hostname>/metrics/medium (JSON, toutes les 30min)
|
||||||
|
# sentinelmesh/<hostname>/events (JSON, boot/shutdown…)
|
||||||
|
|||||||
@@ -16,3 +16,4 @@ reqwest = { version = "0.12", features = ["json"] }
|
|||||||
chrono = { version = "0.4", features = ["serde"] }
|
chrono = { version = "0.4", features = ["serde"] }
|
||||||
ipnetwork = "0.20"
|
ipnetwork = "0.20"
|
||||||
tokio-util = "0.7"
|
tokio-util = "0.7"
|
||||||
|
rumqttc = "0.24"
|
||||||
|
|||||||
@@ -29,3 +29,13 @@ scan:
|
|||||||
|
|
||||||
api:
|
api:
|
||||||
listen: "0.0.0.0:9100" # API locale de l'agent
|
listen: "0.0.0.0:9100" # API locale de l'agent
|
||||||
|
|
||||||
|
# MQTT optionnel
|
||||||
|
mqtt:
|
||||||
|
enabled: false
|
||||||
|
broker: "localhost"
|
||||||
|
port: 1883
|
||||||
|
topic_prefix: "sentinelmesh"
|
||||||
|
client_id: "" # auto : sentinelmesh-scan-<hostname>
|
||||||
|
# Topics publiés :
|
||||||
|
# sentinelmesh/<hostname>/network/scan (résumé scan complet)
|
||||||
|
|||||||
@@ -6,6 +6,17 @@ pub struct Config {
|
|||||||
pub agent: AgentConfig,
|
pub agent: AgentConfig,
|
||||||
pub scan: ScanConfig,
|
pub scan: ScanConfig,
|
||||||
pub api: ApiConfig,
|
pub api: ApiConfig,
|
||||||
|
#[serde(default)]
|
||||||
|
pub mqtt: Option<MqttConfig>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||||
|
pub struct MqttConfig {
|
||||||
|
pub enabled: bool,
|
||||||
|
pub broker: String,
|
||||||
|
pub port: u16,
|
||||||
|
pub topic_prefix: String,
|
||||||
|
pub client_id: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ use tracing_subscriber::EnvFilter;
|
|||||||
mod api;
|
mod api;
|
||||||
mod backend;
|
mod backend;
|
||||||
mod config;
|
mod config;
|
||||||
|
mod mqtt;
|
||||||
mod oui;
|
mod oui;
|
||||||
mod scanner;
|
mod scanner;
|
||||||
|
|
||||||
@@ -18,39 +19,52 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
|
|
||||||
let config_path = std::env::args().nth(1).unwrap_or_else(|| "config.yaml".into());
|
let config_path = std::env::args().nth(1).unwrap_or_else(|| "config.yaml".into());
|
||||||
let cfg = config::Config::load(&config_path)?;
|
let cfg = config::Config::load(&config_path)?;
|
||||||
info!("Configuration chargée depuis {config_path}");
|
|
||||||
info!("Agent ID : {} | Subnets : {:?}", cfg.agent.id, cfg.scan.subnets);
|
info!("Agent ID : {} | Subnets : {:?}", cfg.agent.id, cfg.scan.subnets);
|
||||||
|
|
||||||
let client = backend::BackendClient::new(&cfg);
|
let client = backend::BackendClient::new(&cfg);
|
||||||
|
|
||||||
// Enregistrement initial
|
|
||||||
if let Err(e) = client.register(&cfg).await {
|
if let Err(e) = client.register(&cfg).await {
|
||||||
error!("Impossible de joindre le backend : {e}");
|
error!("Impossible de joindre le backend : {e}");
|
||||||
}
|
}
|
||||||
|
|
||||||
// État partagé entre l'API locale et la boucle de scan
|
// Connexion MQTT optionnelle
|
||||||
|
let mqtt_pub: Option<mqtt::MqttPublisher> = match &cfg.mqtt {
|
||||||
|
Some(m) if m.enabled => {
|
||||||
|
match mqtt::MqttPublisher::connect(&m.broker, m.port, &m.client_id, &m.topic_prefix, &cfg.agent.hostname).await {
|
||||||
|
Ok(p) => Some(p),
|
||||||
|
Err(e) => { error!("MQTT connexion échouée : {e}"); None }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => None,
|
||||||
|
};
|
||||||
|
|
||||||
let shared: api::SharedState = Arc::new(RwLock::new(Vec::new()));
|
let shared: api::SharedState = Arc::new(RwLock::new(Vec::new()));
|
||||||
|
|
||||||
// API locale (tâche de fond)
|
|
||||||
let api_state = shared.clone();
|
let api_state = shared.clone();
|
||||||
let api_listen = cfg.api.listen.clone();
|
let api_listen = cfg.api.listen.clone();
|
||||||
tokio::spawn(async move { api::serve(api_listen, api_state).await });
|
tokio::spawn(async move { api::serve(api_listen, api_state).await });
|
||||||
|
|
||||||
// Boucle de scan principale
|
|
||||||
let interval = time::Duration::from_secs(cfg.scan.interval_seconds);
|
let interval = time::Duration::from_secs(cfg.scan.interval_seconds);
|
||||||
loop {
|
loop {
|
||||||
info!("Démarrage du scan réseau…");
|
info!("Démarrage du scan réseau…");
|
||||||
match scanner::scan_all(&cfg.scan).await {
|
match scanner::scan_all(&cfg.scan).await {
|
||||||
Ok(devices) => {
|
Ok(devices) => {
|
||||||
info!("{} équipements découverts", devices.len());
|
info!("{} équipements découverts", devices.len());
|
||||||
|
|
||||||
// Mise à jour de l'état partagé
|
|
||||||
*shared.write().await = devices.clone();
|
*shared.write().await = devices.clone();
|
||||||
|
|
||||||
// Push vers le backend
|
|
||||||
if let Err(e) = client.push_devices(&cfg.agent.id, &devices).await {
|
if let Err(e) = client.push_devices(&cfg.agent.id, &devices).await {
|
||||||
error!("Erreur push backend : {e}");
|
error!("Erreur push backend : {e}");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Publication MQTT : résumé du scan + équipements online individuels
|
||||||
|
if let Some(p) = &mqtt_pub {
|
||||||
|
let online: Vec<_> = devices.iter().filter(|d| d.state == "online").collect();
|
||||||
|
p.publish_scan(&serde_json::json!({
|
||||||
|
"total": devices.len(),
|
||||||
|
"online": online.len(),
|
||||||
|
"devices": online,
|
||||||
|
})).await;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Err(e) => error!("Erreur de scan : {e}"),
|
Err(e) => error!("Erreur de scan : {e}"),
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,51 @@
|
|||||||
|
use anyhow::Result;
|
||||||
|
use rumqttc::{AsyncClient, MqttOptions, QoS};
|
||||||
|
use serde_json::Value;
|
||||||
|
use tracing::{info, warn};
|
||||||
|
|
||||||
|
pub struct MqttPublisher {
|
||||||
|
client: AsyncClient,
|
||||||
|
topic_prefix: String,
|
||||||
|
hostname: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MqttPublisher {
|
||||||
|
pub async fn connect(broker: &str, port: u16, client_id: &str, topic_prefix: &str, hostname: &str) -> Result<Self> {
|
||||||
|
let id = if client_id.is_empty() {
|
||||||
|
format!("sentinelmesh-scan-{hostname}")
|
||||||
|
} else {
|
||||||
|
client_id.to_string()
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut opts = MqttOptions::new(id, broker, port);
|
||||||
|
opts.set_keep_alive(std::time::Duration::from_secs(30));
|
||||||
|
|
||||||
|
let (client, mut event_loop) = AsyncClient::new(opts, 32);
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
loop {
|
||||||
|
if let Err(e) = event_loop.poll().await {
|
||||||
|
warn!("MQTT boucle : {e}");
|
||||||
|
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
info!("MQTT connecté à {broker}:{port}");
|
||||||
|
Ok(Self { client, topic_prefix: topic_prefix.to_string(), hostname: hostname.to_string() })
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn publish_scan(&self, payload: &Value) {
|
||||||
|
let topic = format!("{}/{}/network/scan", self.topic_prefix, self.hostname);
|
||||||
|
let json = payload.to_string();
|
||||||
|
if let Err(e) = self.client.publish(&topic, QoS::AtMostOnce, false, json).await {
|
||||||
|
warn!("MQTT publish {topic} : {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[allow(dead_code)]
|
||||||
|
pub async fn publish_device_online(&self, payload: &Value) {
|
||||||
|
let topic = format!("{}/{}/network/online", self.topic_prefix, self.hostname);
|
||||||
|
let _ = self.client.publish(&topic, QoS::AtLeastOnce, false, payload.to_string()).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user