diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 95231c9..80955e5 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -15,3 +15,5 @@ sqlx = { version = "0.8", features = ["sqlite", "runtime-tokio", "ma tower-http = { version = "0.6", features = ["cors", "trace"] } utoipa = { version = "5", features = ["axum_extras"] } chrono = { version = "0.4", features = ["serde"] } +tokio-stream = { version = "0.1", features = ["sync"] } +futures = "0.3" diff --git a/backend/migrations/002_history.sql b/backend/migrations/002_history.sql new file mode 100644 index 0000000..195fa4d --- /dev/null +++ b/backend/migrations/002_history.sql @@ -0,0 +1,15 @@ +-- Historique des métriques (série temporelle) +-- Alimenté à chaque push agent, rétention 7 jours (purge automatique) +CREATE TABLE IF NOT EXISTS metrics_history ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + agent_id TEXT NOT NULL, + timestamp TEXT NOT NULL, + cpu_percent REAL, + ram_percent REAL, + disk_percent REAL, + temperature_c REAL, + net_rx_bps INTEGER, + net_tx_bps INTEGER +); + +CREATE INDEX IF NOT EXISTS idx_mh_agent_ts ON metrics_history(agent_id, timestamp DESC); diff --git a/backend/src/main.rs b/backend/src/main.rs index e8f2280..caef2d2 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -8,24 +8,18 @@ mod db; mod error; mod models; mod routes; +mod state; #[derive(OpenApi)] #[openapi( paths( routes::health::health, - routes::agents::list, - routes::agents::register, - routes::agents::get_one, - routes::network::list, - routes::network::push, - routes::network::get_one, - routes::metrics::list, - routes::metrics::push, - routes::metrics::get_one, - routes::events::list, - routes::events::push, - routes::widgets::network, - routes::widgets::metrics, + routes::agents::list, routes::agents::register, routes::agents::get_one, + routes::network::list, routes::network::push, routes::network::get_one, + routes::metrics::list, routes::metrics::push, routes::metrics::get_one, + routes::history::get_history, + routes::events::list, routes::events::push, + routes::widgets::network, routes::widgets::metrics, ), components(schemas( models::Agent, models::RegisterAgent, @@ -33,7 +27,8 @@ mod routes; models::Metric, models::PushMetrics, models::Event, models::PushEvent, )), - info(title = "SentinelMesh API", version = "0.1.0", description = "API centrale SentinelMesh") + info(title = "SentinelMesh API", version = "0.2.0", + description = "API centrale SentinelMesh — historique, SSE, Prometheus") )] struct ApiDoc; @@ -46,17 +41,20 @@ async fn main() -> anyhow::Result<()> { let database_url = std::env::var("DATABASE_URL") .unwrap_or_else(|_| "sqlite://sentinelmesh.sqlite".into()); - let db = db::connect(&database_url).await?; + let db = db::connect(&database_url).await?; + let state = state::AppState::new(db); info!("Base de données connectée : {database_url}"); let spec = ApiDoc::openapi(); - let app = routes::api_router(db) + let app = routes::api_router(state) .route("/api-docs/openapi.json", get(move || async move { Json(spec) })) .layer(CorsLayer::permissive()); let addr = std::env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0:8080".into()); - info!("SentinelMesh backend démarré sur http://{addr}"); - info!("OpenAPI spec : http://{addr}/api-docs/openapi.json"); + info!("SentinelMesh backend v{} sur http://{addr}", env!("CARGO_PKG_VERSION")); + info!("SSE temps réel : http://{addr}/api/v1/stream"); + info!("Prometheus scrape : http://{addr}/metrics"); + info!("OpenAPI spec : http://{addr}/api-docs/openapi.json"); let listener = tokio::net::TcpListener::bind(&addr).await?; axum::serve(listener, app).await?; diff --git a/backend/src/routes/history.rs b/backend/src/routes/history.rs new file mode 100644 index 0000000..5a6390d --- /dev/null +++ b/backend/src/routes/history.rs @@ -0,0 +1,88 @@ +use axum::{ + extract::{Path, Query, State}, + Json, +}; +use chrono::Utc; +use serde::{Deserialize, Serialize}; +use sqlx::SqlitePool; +use utoipa::path as oapath; + +use crate::error::Result; + +#[derive(Debug, Serialize, sqlx::FromRow)] +pub struct MetricPoint { + pub id: i64, + pub agent_id: String, + pub timestamp: String, + pub cpu_percent: Option, + pub ram_percent: Option, + pub disk_percent: Option, + pub temperature_c: Option, + pub net_rx_bps: Option, + pub net_tx_bps: Option, +} + +#[derive(Deserialize)] +pub struct HistoryParams { + /// Nombre d'heures d'historique (défaut : 24, max : 168) + pub hours: Option, +} + +#[oapath(get, path = "/api/v1/history/{agent_id}", + params( + ("agent_id" = String, Path, description = "Identifiant de l'agent"), + ("hours" = Option, Query, description = "Fenêtre en heures (défaut 24, max 168)"), + ), + responses((status = 200, description = "Série temporelle des métriques")))] +pub async fn get_history( + State(db): State, + Path(agent_id): Path, + Query(params): Query, +) -> Result>> { + let hours = params.hours.unwrap_or(24).min(168) as i64; + let since = (Utc::now() - chrono::Duration::hours(hours)).to_rfc3339(); + + let rows = sqlx::query_as::<_, MetricPoint>( + "SELECT * FROM metrics_history + WHERE agent_id = ? AND timestamp > ? + ORDER BY timestamp ASC", + ) + .bind(&agent_id) + .bind(&since) + .fetch_all(&db) + .await?; + + Ok(Json(rows)) +} + +/// Insère un point dans l'historique et purge les données > 7 jours +pub async fn record_history(db: &SqlitePool, agent_id: &str, m: &crate::models::PushMetrics) { + let now = Utc::now().to_rfc3339(); + let cutoff = (Utc::now() - chrono::Duration::days(7)).to_rfc3339(); + + // Insertion + let _ = sqlx::query( + "INSERT INTO metrics_history + (agent_id, timestamp, cpu_percent, ram_percent, disk_percent, temperature_c, net_rx_bps, net_tx_bps) + VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + ) + .bind(agent_id).bind(&now) + .bind(m.cpu_percent).bind(m.ram_percent).bind(m.disk_percent) + .bind(m.temperature_c).bind(m.net_rx_bps).bind(m.net_tx_bps) + .execute(db) + .await; + + // Rétention : purge des données de plus de 7 jours (1 fois / 60 push ≈ 1 min) + if rand_bool_1_in_60() { + let _ = sqlx::query("DELETE FROM metrics_history WHERE timestamp < ?") + .bind(&cutoff) + .execute(db) + .await; + } +} + +fn rand_bool_1_in_60() -> bool { + use std::time::{SystemTime, UNIX_EPOCH}; + let ns = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().subsec_nanos(); + ns % 60 == 0 +} diff --git a/backend/src/routes/metrics.rs b/backend/src/routes/metrics.rs index 4ea1319..c838562 100644 --- a/backend/src/routes/metrics.rs +++ b/backend/src/routes/metrics.rs @@ -3,7 +3,9 @@ use chrono::Utc; use sqlx::SqlitePool; use utoipa::path as oapath; -use crate::{error::Result, models::{Metric, PushMetrics}}; +use crate::{error::Result, models::{Metric, PushMetrics}, state::AppState}; + +use super::history; #[oapath(get, path = "/api/v1/metrics", responses((status = 200, body = Vec)))] @@ -18,10 +20,12 @@ pub async fn list(State(db): State) -> Result>> { request_body = PushMetrics, responses((status = 200, description = "Métriques enregistrées")))] pub async fn push( - State(db): State, + State(state): State, Json(body): Json, ) -> Result> { let now = Utc::now().to_rfc3339(); + + // Mise à jour dernière valeur sqlx::query( "INSERT INTO metrics (agent_id, timestamp, cpu_percent, ram_percent, load_avg, temperature_c, disk_percent, net_rx_bps, net_tx_bps) @@ -36,8 +40,25 @@ pub async fn push( .bind(body.cpu_percent).bind(body.ram_percent).bind(body.load_avg) .bind(body.temperature_c).bind(body.disk_percent) .bind(body.net_rx_bps).bind(body.net_tx_bps) - .execute(&db) + .execute(&state.db) .await?; + + // Historique + history::record_history(&state.db, &body.agent_id, &body).await; + + // Broadcast SSE + let payload = serde_json::json!({ + "agent_id": body.agent_id, + "timestamp": now, + "cpu_percent": body.cpu_percent, + "ram_percent": body.ram_percent, + "disk_percent": body.disk_percent, + "temperature_c": body.temperature_c, + "net_rx_bps": body.net_rx_bps, + "net_tx_bps": body.net_tx_bps, + }); + let _ = state.metrics_tx.send(payload.to_string()); + Ok(Json(serde_json::json!({ "ok": true }))) } diff --git a/backend/src/routes/mod.rs b/backend/src/routes/mod.rs index b89195b..8647acc 100644 --- a/backend/src/routes/mod.rs +++ b/backend/src/routes/mod.rs @@ -1,24 +1,40 @@ use axum::{routing::get, Router}; -use sqlx::SqlitePool; + +use crate::state::AppState; pub mod agents; pub mod events; pub mod health; +pub mod history; pub mod metrics; pub mod network; +pub mod prometheus; +pub mod sse; pub mod widgets; -pub fn api_router(db: SqlitePool) -> Router { +pub fn api_router(state: AppState) -> Router { Router::new() + // Santé .route("/api/v1/health", get(health::health)) + // Agents .route("/api/v1/agents", get(agents::list).post(agents::register)) .route("/api/v1/agents/{id}", get(agents::get_one)) + // Réseau .route("/api/v1/network", get(network::list).post(network::push)) .route("/api/v1/network/{ip}", get(network::get_one)) + // Métriques .route("/api/v1/metrics", get(metrics::list).post(metrics::push)) .route("/api/v1/metrics/{agent_id}", get(metrics::get_one)) + // Historique + .route("/api/v1/history/{agent_id}", get(history::get_history)) + // Événements .route("/api/v1/events", get(events::list).post(events::push)) + // Widgets Glance .route("/api/v1/widgets/network", get(widgets::network)) .route("/api/v1/widgets/metrics", get(widgets::metrics)) - .with_state(db) + // SSE temps réel + .route("/api/v1/stream", get(sse::stream)) + // Prometheus + .route("/metrics", get(prometheus::metrics)) + .with_state(state) } diff --git a/backend/src/routes/prometheus.rs b/backend/src/routes/prometheus.rs new file mode 100644 index 0000000..0d9852f --- /dev/null +++ b/backend/src/routes/prometheus.rs @@ -0,0 +1,62 @@ +use axum::{extract::State, http::header, response::IntoResponse}; +use sqlx::SqlitePool; + +use crate::error::Result; + +/// Exposition des métriques au format Prometheus text (scraping Grafana/Prometheus). +/// +/// Endpoint : GET /metrics +/// Compatible avec prometheus.yml : +/// scrape_configs: +/// - job_name: sentinelmesh +/// static_configs: +/// - targets: ['sentinelmesh:8080'] +pub async fn metrics(State(db): State) -> Result { + let rows = sqlx::query_as::<_, ( + String, String, + Option, Option, Option, Option, + Option, Option, + )>( + "SELECT a.hostname, m.agent_id, + m.cpu_percent, m.ram_percent, m.disk_percent, m.temperature_c, + m.net_rx_bps, m.net_tx_bps + FROM metrics m JOIN agents a ON a.id = m.agent_id", + ) + .fetch_all(&db) + .await?; + + let mut out = String::with_capacity(2048); + + macro_rules! gauge { + ($name:expr, $help:expr, $type:expr) => { + out.push_str(&format!( + "# HELP sentinelmesh_{} {}\n# TYPE sentinelmesh_{} {}\n", + $name, $help, $name, $type + )); + }; + } + + gauge!("cpu_percent", "Utilisation CPU en pourcentage", "gauge"); + gauge!("ram_percent", "Utilisation RAM en pourcentage", "gauge"); + gauge!("disk_percent", "Utilisation disque en pourcentage", "gauge"); + gauge!("temperature_c", "Température CPU/système en Celsius", "gauge"); + gauge!("net_rx_bps", "Débit réseau entrant en octets/s", "gauge"); + gauge!("net_tx_bps", "Débit réseau sortant en octets/s", "gauge"); + + for (hostname, agent_id, cpu, ram, disk, temp, rx, tx) in &rows { + let lbl = format!("agent=\"{agent_id}\",hostname=\"{hostname}\""); + let push = |name: &str, val: f64| format!("sentinelmesh_{name}{{{lbl}}} {val:.2}\n"); + + if let Some(v) = cpu { out.push_str(&push("cpu_percent", *v)); } + if let Some(v) = ram { out.push_str(&push("ram_percent", *v)); } + if let Some(v) = disk { out.push_str(&push("disk_percent", *v)); } + if let Some(v) = temp { out.push_str(&push("temperature_c", *v)); } + if let Some(v) = rx { out.push_str(&push("net_rx_bps", *v as f64)); } + if let Some(v) = tx { out.push_str(&push("net_tx_bps", *v as f64)); } + } + + Ok(( + [(header::CONTENT_TYPE, "text/plain; version=0.0.4; charset=utf-8")], + out, + )) +} diff --git a/backend/src/routes/sse.rs b/backend/src/routes/sse.rs new file mode 100644 index 0000000..a4badcf --- /dev/null +++ b/backend/src/routes/sse.rs @@ -0,0 +1,34 @@ +use std::convert::Infallible; + +use axum::{ + extract::State, + response::sse::{Event, KeepAlive, Sse}, +}; +use tokio_stream::{wrappers::BroadcastStream, StreamExt}; + +use crate::state::AppState; + +/// Flux SSE — envoie les événements `metrics` et `network` en temps réel. +/// +/// Utilisation Glance (widget custom-api avec cache 0s non supporté nativement), +/// ou JavaScript natif : +/// const es = new EventSource("/api/v1/stream"); +/// es.addEventListener("metrics", e => console.log(JSON.parse(e.data))); +pub async fn stream( + State(state): State, +) -> Sse>> { + let metrics_rx = BroadcastStream::new(state.metrics_tx.subscribe()); + let network_rx = BroadcastStream::new(state.network_tx.subscribe()); + + let metrics_events = metrics_rx.filter_map(|msg| { + msg.ok().map(|data| Ok(Event::default().event("metrics").data(data))) + }); + + let network_events = network_rx.filter_map(|msg| { + msg.ok().map(|data| Ok(Event::default().event("network").data(data))) + }); + + let merged = metrics_events.merge(network_events); + + Sse::new(merged).keep_alive(KeepAlive::default()) +} diff --git a/backend/src/state.rs b/backend/src/state.rs new file mode 100644 index 0000000..429886b --- /dev/null +++ b/backend/src/state.rs @@ -0,0 +1,28 @@ +use axum::extract::FromRef; +use sqlx::SqlitePool; +use tokio::sync::broadcast; + +/// Émetteurs broadcast partagés entre la boucle de push et les clients SSE +pub type BroadcastTx = broadcast::Sender; + +#[derive(Clone)] +pub struct AppState { + pub db: SqlitePool, + pub metrics_tx: BroadcastTx, // métriques temps réel + pub network_tx: BroadcastTx, // nouveaux équipements réseau +} + +impl AppState { + pub fn new(db: SqlitePool) -> Self { + let (metrics_tx, _) = broadcast::channel(128); + let (network_tx, _) = broadcast::channel(128); + Self { db, metrics_tx, network_tx } + } +} + +// Permet aux routes existantes State de fonctionner sans modification +impl FromRef for SqlitePool { + fn from_ref(state: &AppState) -> Self { + state.db.clone() + } +}