feat(backend): historique, SSE et Prometheus — Phase 6
- Migration 002 : table metrics_history (index agent_id/timestamp)
- AppState : broadcast channels (metrics_tx, network_tx) + FromRef<SqlitePool>
- GET /api/v1/history/{agent_id}?hours=N : série temporelle, rétention 7 jours
- POST /api/v1/metrics : écrit dans history + broadcast SSE
- GET /api/v1/stream : Server-Sent Events (events metrics + network)
- GET /metrics : endpoint Prometheus text/plain (CPU, RAM, disque, temp, réseau)
- Bump version API 0.2.0
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -15,3 +15,5 @@ sqlx = { version = "0.8", features = ["sqlite", "runtime-tokio", "ma
|
||||
tower-http = { version = "0.6", features = ["cors", "trace"] }
|
||||
utoipa = { version = "5", features = ["axum_extras"] }
|
||||
chrono = { version = "0.4", features = ["serde"] }
|
||||
tokio-stream = { version = "0.1", features = ["sync"] }
|
||||
futures = "0.3"
|
||||
|
||||
@@ -0,0 +1,15 @@
|
||||
-- Historique des métriques (série temporelle)
|
||||
-- Alimenté à chaque push agent, rétention 7 jours (purge automatique)
|
||||
CREATE TABLE IF NOT EXISTS metrics_history (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
agent_id TEXT NOT NULL,
|
||||
timestamp TEXT NOT NULL,
|
||||
cpu_percent REAL,
|
||||
ram_percent REAL,
|
||||
disk_percent REAL,
|
||||
temperature_c REAL,
|
||||
net_rx_bps INTEGER,
|
||||
net_tx_bps INTEGER
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_mh_agent_ts ON metrics_history(agent_id, timestamp DESC);
|
||||
+14
-16
@@ -8,24 +8,18 @@ mod db;
|
||||
mod error;
|
||||
mod models;
|
||||
mod routes;
|
||||
mod state;
|
||||
|
||||
#[derive(OpenApi)]
|
||||
#[openapi(
|
||||
paths(
|
||||
routes::health::health,
|
||||
routes::agents::list,
|
||||
routes::agents::register,
|
||||
routes::agents::get_one,
|
||||
routes::network::list,
|
||||
routes::network::push,
|
||||
routes::network::get_one,
|
||||
routes::metrics::list,
|
||||
routes::metrics::push,
|
||||
routes::metrics::get_one,
|
||||
routes::events::list,
|
||||
routes::events::push,
|
||||
routes::widgets::network,
|
||||
routes::widgets::metrics,
|
||||
routes::agents::list, routes::agents::register, routes::agents::get_one,
|
||||
routes::network::list, routes::network::push, routes::network::get_one,
|
||||
routes::metrics::list, routes::metrics::push, routes::metrics::get_one,
|
||||
routes::history::get_history,
|
||||
routes::events::list, routes::events::push,
|
||||
routes::widgets::network, routes::widgets::metrics,
|
||||
),
|
||||
components(schemas(
|
||||
models::Agent, models::RegisterAgent,
|
||||
@@ -33,7 +27,8 @@ mod routes;
|
||||
models::Metric, models::PushMetrics,
|
||||
models::Event, models::PushEvent,
|
||||
)),
|
||||
info(title = "SentinelMesh API", version = "0.1.0", description = "API centrale SentinelMesh")
|
||||
info(title = "SentinelMesh API", version = "0.2.0",
|
||||
description = "API centrale SentinelMesh — historique, SSE, Prometheus")
|
||||
)]
|
||||
struct ApiDoc;
|
||||
|
||||
@@ -47,15 +42,18 @@ async fn main() -> anyhow::Result<()> {
|
||||
.unwrap_or_else(|_| "sqlite://sentinelmesh.sqlite".into());
|
||||
|
||||
let db = db::connect(&database_url).await?;
|
||||
let state = state::AppState::new(db);
|
||||
info!("Base de données connectée : {database_url}");
|
||||
|
||||
let spec = ApiDoc::openapi();
|
||||
let app = routes::api_router(db)
|
||||
let app = routes::api_router(state)
|
||||
.route("/api-docs/openapi.json", get(move || async move { Json(spec) }))
|
||||
.layer(CorsLayer::permissive());
|
||||
|
||||
let addr = std::env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0:8080".into());
|
||||
info!("SentinelMesh backend démarré sur http://{addr}");
|
||||
info!("SentinelMesh backend v{} sur http://{addr}", env!("CARGO_PKG_VERSION"));
|
||||
info!("SSE temps réel : http://{addr}/api/v1/stream");
|
||||
info!("Prometheus scrape : http://{addr}/metrics");
|
||||
info!("OpenAPI spec : http://{addr}/api-docs/openapi.json");
|
||||
|
||||
let listener = tokio::net::TcpListener::bind(&addr).await?;
|
||||
|
||||
@@ -0,0 +1,88 @@
|
||||
use axum::{
|
||||
extract::{Path, Query, State},
|
||||
Json,
|
||||
};
|
||||
use chrono::Utc;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sqlx::SqlitePool;
|
||||
use utoipa::path as oapath;
|
||||
|
||||
use crate::error::Result;
|
||||
|
||||
#[derive(Debug, Serialize, sqlx::FromRow)]
|
||||
pub struct MetricPoint {
|
||||
pub id: i64,
|
||||
pub agent_id: String,
|
||||
pub timestamp: String,
|
||||
pub cpu_percent: Option<f64>,
|
||||
pub ram_percent: Option<f64>,
|
||||
pub disk_percent: Option<f64>,
|
||||
pub temperature_c: Option<f64>,
|
||||
pub net_rx_bps: Option<i64>,
|
||||
pub net_tx_bps: Option<i64>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub struct HistoryParams {
|
||||
/// Nombre d'heures d'historique (défaut : 24, max : 168)
|
||||
pub hours: Option<u32>,
|
||||
}
|
||||
|
||||
#[oapath(get, path = "/api/v1/history/{agent_id}",
|
||||
params(
|
||||
("agent_id" = String, Path, description = "Identifiant de l'agent"),
|
||||
("hours" = Option<u32>, Query, description = "Fenêtre en heures (défaut 24, max 168)"),
|
||||
),
|
||||
responses((status = 200, description = "Série temporelle des métriques")))]
|
||||
pub async fn get_history(
|
||||
State(db): State<SqlitePool>,
|
||||
Path(agent_id): Path<String>,
|
||||
Query(params): Query<HistoryParams>,
|
||||
) -> Result<Json<Vec<MetricPoint>>> {
|
||||
let hours = params.hours.unwrap_or(24).min(168) as i64;
|
||||
let since = (Utc::now() - chrono::Duration::hours(hours)).to_rfc3339();
|
||||
|
||||
let rows = sqlx::query_as::<_, MetricPoint>(
|
||||
"SELECT * FROM metrics_history
|
||||
WHERE agent_id = ? AND timestamp > ?
|
||||
ORDER BY timestamp ASC",
|
||||
)
|
||||
.bind(&agent_id)
|
||||
.bind(&since)
|
||||
.fetch_all(&db)
|
||||
.await?;
|
||||
|
||||
Ok(Json(rows))
|
||||
}
|
||||
|
||||
/// Insère un point dans l'historique et purge les données > 7 jours
|
||||
pub async fn record_history(db: &SqlitePool, agent_id: &str, m: &crate::models::PushMetrics) {
|
||||
let now = Utc::now().to_rfc3339();
|
||||
let cutoff = (Utc::now() - chrono::Duration::days(7)).to_rfc3339();
|
||||
|
||||
// Insertion
|
||||
let _ = sqlx::query(
|
||||
"INSERT INTO metrics_history
|
||||
(agent_id, timestamp, cpu_percent, ram_percent, disk_percent, temperature_c, net_rx_bps, net_tx_bps)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
|
||||
)
|
||||
.bind(agent_id).bind(&now)
|
||||
.bind(m.cpu_percent).bind(m.ram_percent).bind(m.disk_percent)
|
||||
.bind(m.temperature_c).bind(m.net_rx_bps).bind(m.net_tx_bps)
|
||||
.execute(db)
|
||||
.await;
|
||||
|
||||
// Rétention : purge des données de plus de 7 jours (1 fois / 60 push ≈ 1 min)
|
||||
if rand_bool_1_in_60() {
|
||||
let _ = sqlx::query("DELETE FROM metrics_history WHERE timestamp < ?")
|
||||
.bind(&cutoff)
|
||||
.execute(db)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
fn rand_bool_1_in_60() -> bool {
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
let ns = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().subsec_nanos();
|
||||
ns % 60 == 0
|
||||
}
|
||||
@@ -3,7 +3,9 @@ use chrono::Utc;
|
||||
use sqlx::SqlitePool;
|
||||
use utoipa::path as oapath;
|
||||
|
||||
use crate::{error::Result, models::{Metric, PushMetrics}};
|
||||
use crate::{error::Result, models::{Metric, PushMetrics}, state::AppState};
|
||||
|
||||
use super::history;
|
||||
|
||||
#[oapath(get, path = "/api/v1/metrics",
|
||||
responses((status = 200, body = Vec<Metric>)))]
|
||||
@@ -18,10 +20,12 @@ pub async fn list(State(db): State<SqlitePool>) -> Result<Json<Vec<Metric>>> {
|
||||
request_body = PushMetrics,
|
||||
responses((status = 200, description = "Métriques enregistrées")))]
|
||||
pub async fn push(
|
||||
State(db): State<SqlitePool>,
|
||||
State(state): State<AppState>,
|
||||
Json(body): Json<PushMetrics>,
|
||||
) -> Result<Json<serde_json::Value>> {
|
||||
let now = Utc::now().to_rfc3339();
|
||||
|
||||
// Mise à jour dernière valeur
|
||||
sqlx::query(
|
||||
"INSERT INTO metrics (agent_id, timestamp, cpu_percent, ram_percent, load_avg,
|
||||
temperature_c, disk_percent, net_rx_bps, net_tx_bps)
|
||||
@@ -36,8 +40,25 @@ pub async fn push(
|
||||
.bind(body.cpu_percent).bind(body.ram_percent).bind(body.load_avg)
|
||||
.bind(body.temperature_c).bind(body.disk_percent)
|
||||
.bind(body.net_rx_bps).bind(body.net_tx_bps)
|
||||
.execute(&db)
|
||||
.execute(&state.db)
|
||||
.await?;
|
||||
|
||||
// Historique
|
||||
history::record_history(&state.db, &body.agent_id, &body).await;
|
||||
|
||||
// Broadcast SSE
|
||||
let payload = serde_json::json!({
|
||||
"agent_id": body.agent_id,
|
||||
"timestamp": now,
|
||||
"cpu_percent": body.cpu_percent,
|
||||
"ram_percent": body.ram_percent,
|
||||
"disk_percent": body.disk_percent,
|
||||
"temperature_c": body.temperature_c,
|
||||
"net_rx_bps": body.net_rx_bps,
|
||||
"net_tx_bps": body.net_tx_bps,
|
||||
});
|
||||
let _ = state.metrics_tx.send(payload.to_string());
|
||||
|
||||
Ok(Json(serde_json::json!({ "ok": true })))
|
||||
}
|
||||
|
||||
|
||||
@@ -1,24 +1,40 @@
|
||||
use axum::{routing::get, Router};
|
||||
use sqlx::SqlitePool;
|
||||
|
||||
use crate::state::AppState;
|
||||
|
||||
pub mod agents;
|
||||
pub mod events;
|
||||
pub mod health;
|
||||
pub mod history;
|
||||
pub mod metrics;
|
||||
pub mod network;
|
||||
pub mod prometheus;
|
||||
pub mod sse;
|
||||
pub mod widgets;
|
||||
|
||||
pub fn api_router(db: SqlitePool) -> Router {
|
||||
pub fn api_router(state: AppState) -> Router {
|
||||
Router::new()
|
||||
// Santé
|
||||
.route("/api/v1/health", get(health::health))
|
||||
// Agents
|
||||
.route("/api/v1/agents", get(agents::list).post(agents::register))
|
||||
.route("/api/v1/agents/{id}", get(agents::get_one))
|
||||
// Réseau
|
||||
.route("/api/v1/network", get(network::list).post(network::push))
|
||||
.route("/api/v1/network/{ip}", get(network::get_one))
|
||||
// Métriques
|
||||
.route("/api/v1/metrics", get(metrics::list).post(metrics::push))
|
||||
.route("/api/v1/metrics/{agent_id}", get(metrics::get_one))
|
||||
// Historique
|
||||
.route("/api/v1/history/{agent_id}", get(history::get_history))
|
||||
// Événements
|
||||
.route("/api/v1/events", get(events::list).post(events::push))
|
||||
// Widgets Glance
|
||||
.route("/api/v1/widgets/network", get(widgets::network))
|
||||
.route("/api/v1/widgets/metrics", get(widgets::metrics))
|
||||
.with_state(db)
|
||||
// SSE temps réel
|
||||
.route("/api/v1/stream", get(sse::stream))
|
||||
// Prometheus
|
||||
.route("/metrics", get(prometheus::metrics))
|
||||
.with_state(state)
|
||||
}
|
||||
|
||||
@@ -0,0 +1,62 @@
|
||||
use axum::{extract::State, http::header, response::IntoResponse};
|
||||
use sqlx::SqlitePool;
|
||||
|
||||
use crate::error::Result;
|
||||
|
||||
/// Exposition des métriques au format Prometheus text (scraping Grafana/Prometheus).
|
||||
///
|
||||
/// Endpoint : GET /metrics
|
||||
/// Compatible avec prometheus.yml :
|
||||
/// scrape_configs:
|
||||
/// - job_name: sentinelmesh
|
||||
/// static_configs:
|
||||
/// - targets: ['sentinelmesh:8080']
|
||||
pub async fn metrics(State(db): State<SqlitePool>) -> Result<impl IntoResponse> {
|
||||
let rows = sqlx::query_as::<_, (
|
||||
String, String,
|
||||
Option<f64>, Option<f64>, Option<f64>, Option<f64>,
|
||||
Option<i64>, Option<i64>,
|
||||
)>(
|
||||
"SELECT a.hostname, m.agent_id,
|
||||
m.cpu_percent, m.ram_percent, m.disk_percent, m.temperature_c,
|
||||
m.net_rx_bps, m.net_tx_bps
|
||||
FROM metrics m JOIN agents a ON a.id = m.agent_id",
|
||||
)
|
||||
.fetch_all(&db)
|
||||
.await?;
|
||||
|
||||
let mut out = String::with_capacity(2048);
|
||||
|
||||
macro_rules! gauge {
|
||||
($name:expr, $help:expr, $type:expr) => {
|
||||
out.push_str(&format!(
|
||||
"# HELP sentinelmesh_{} {}\n# TYPE sentinelmesh_{} {}\n",
|
||||
$name, $help, $name, $type
|
||||
));
|
||||
};
|
||||
}
|
||||
|
||||
gauge!("cpu_percent", "Utilisation CPU en pourcentage", "gauge");
|
||||
gauge!("ram_percent", "Utilisation RAM en pourcentage", "gauge");
|
||||
gauge!("disk_percent", "Utilisation disque en pourcentage", "gauge");
|
||||
gauge!("temperature_c", "Température CPU/système en Celsius", "gauge");
|
||||
gauge!("net_rx_bps", "Débit réseau entrant en octets/s", "gauge");
|
||||
gauge!("net_tx_bps", "Débit réseau sortant en octets/s", "gauge");
|
||||
|
||||
for (hostname, agent_id, cpu, ram, disk, temp, rx, tx) in &rows {
|
||||
let lbl = format!("agent=\"{agent_id}\",hostname=\"{hostname}\"");
|
||||
let push = |name: &str, val: f64| format!("sentinelmesh_{name}{{{lbl}}} {val:.2}\n");
|
||||
|
||||
if let Some(v) = cpu { out.push_str(&push("cpu_percent", *v)); }
|
||||
if let Some(v) = ram { out.push_str(&push("ram_percent", *v)); }
|
||||
if let Some(v) = disk { out.push_str(&push("disk_percent", *v)); }
|
||||
if let Some(v) = temp { out.push_str(&push("temperature_c", *v)); }
|
||||
if let Some(v) = rx { out.push_str(&push("net_rx_bps", *v as f64)); }
|
||||
if let Some(v) = tx { out.push_str(&push("net_tx_bps", *v as f64)); }
|
||||
}
|
||||
|
||||
Ok((
|
||||
[(header::CONTENT_TYPE, "text/plain; version=0.0.4; charset=utf-8")],
|
||||
out,
|
||||
))
|
||||
}
|
||||
@@ -0,0 +1,34 @@
|
||||
use std::convert::Infallible;
|
||||
|
||||
use axum::{
|
||||
extract::State,
|
||||
response::sse::{Event, KeepAlive, Sse},
|
||||
};
|
||||
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
|
||||
|
||||
use crate::state::AppState;
|
||||
|
||||
/// Flux SSE — envoie les événements `metrics` et `network` en temps réel.
|
||||
///
|
||||
/// Utilisation Glance (widget custom-api avec cache 0s non supporté nativement),
|
||||
/// ou JavaScript natif :
|
||||
/// const es = new EventSource("/api/v1/stream");
|
||||
/// es.addEventListener("metrics", e => console.log(JSON.parse(e.data)));
|
||||
pub async fn stream(
|
||||
State(state): State<AppState>,
|
||||
) -> Sse<impl tokio_stream::Stream<Item = Result<Event, Infallible>>> {
|
||||
let metrics_rx = BroadcastStream::new(state.metrics_tx.subscribe());
|
||||
let network_rx = BroadcastStream::new(state.network_tx.subscribe());
|
||||
|
||||
let metrics_events = metrics_rx.filter_map(|msg| {
|
||||
msg.ok().map(|data| Ok(Event::default().event("metrics").data(data)))
|
||||
});
|
||||
|
||||
let network_events = network_rx.filter_map(|msg| {
|
||||
msg.ok().map(|data| Ok(Event::default().event("network").data(data)))
|
||||
});
|
||||
|
||||
let merged = metrics_events.merge(network_events);
|
||||
|
||||
Sse::new(merged).keep_alive(KeepAlive::default())
|
||||
}
|
||||
@@ -0,0 +1,28 @@
|
||||
use axum::extract::FromRef;
|
||||
use sqlx::SqlitePool;
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
/// Émetteurs broadcast partagés entre la boucle de push et les clients SSE
|
||||
pub type BroadcastTx = broadcast::Sender<String>;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct AppState {
|
||||
pub db: SqlitePool,
|
||||
pub metrics_tx: BroadcastTx, // métriques temps réel
|
||||
pub network_tx: BroadcastTx, // nouveaux équipements réseau
|
||||
}
|
||||
|
||||
impl AppState {
|
||||
pub fn new(db: SqlitePool) -> Self {
|
||||
let (metrics_tx, _) = broadcast::channel(128);
|
||||
let (network_tx, _) = broadcast::channel(128);
|
||||
Self { db, metrics_tx, network_tx }
|
||||
}
|
||||
}
|
||||
|
||||
// Permet aux routes existantes State<SqlitePool> de fonctionner sans modification
|
||||
impl FromRef<AppState> for SqlitePool {
|
||||
fn from_ref(state: &AppState) -> Self {
|
||||
state.db.clone()
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user