diff --git a/.gitignore b/.gitignore new file mode 100755 index 0000000..a547bf3 --- /dev/null +++ b/.gitignore @@ -0,0 +1,24 @@ +# Logs +logs +*.log +npm-debug.log* +yarn-debug.log* +yarn-error.log* +pnpm-debug.log* +lerna-debug.log* + +node_modules +dist +dist-ssr +*.local + +# Editor directories and files +.vscode/* +!.vscode/extensions.json +.idea +.DS_Store +*.suo +*.ntvs* +*.njsproj +*.sln +*.sw? diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..a6b5009 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,29 @@ +# Build frontend +FROM node:20-alpine AS frontend +WORKDIR /app/frontend +COPY frontend/package*.json ./ +RUN if [ -f package-lock.json ]; then npm ci; else npm install; fi +COPY frontend ./ +RUN npm run build + +# Build backend +FROM golang:1.23-alpine AS backend +WORKDIR /app/backend +RUN apk add --no-cache git +COPY backend/go.mod ./ +COPY backend/go.sum ./ +RUN go mod download +COPY backend ./ +RUN CGO_ENABLED=0 go build -o /app/bin/server ./cmd/server + +# Final image +FROM alpine:3.20 +RUN adduser -D -H -u 10001 app && apk add --no-cache ca-certificates \ + && mkdir -p /data /app \ + && chown -R app:app /data /app +WORKDIR /app +COPY --from=backend /app/bin/server /app/server +COPY --from=frontend /app/frontend/dist /app/frontend/dist +EXPOSE 8088 +ENV PORT=8088 +CMD ["/app/server"] diff --git a/README.md b/README.md old mode 100644 new mode 100755 index ab9736e..68911c1 --- a/README.md +++ b/README.md @@ -1,2 +1,47 @@ -# mqtt_explorer +# MQTT Web Explorer - Monokai Pro + +Interface web moderne pour l'exploration de brokers MQTT avec backend persistant en Go. + +## Caractéristiques +- **Backend Go**: Connexion continue même quand l'onglet est fermé. +- **SQLite**: Historique complet des messages stocké localement. +- **UI Monokai**: Thème sombre haute fidélité pour les développeurs. +- **Explorateur CLI**: Arbre hiérarchique intelligent avec filtres regex. +- **Payload Intelligent**: Détection automatique de JSON et Images Base64. +- **Responsive**: Compatible Desktop, Tablette et Mobile. + +## Lancement rapide + +### Via Docker (Recommandé) +```bash +docker compose up -d --build +``` +L'application sera disponible sur `http://localhost:8088`. + +### Développement local (Backend) +```bash +cd backend +go run ./cmd/server +``` + +### Développement local (Frontend) +```bash +cd frontend +npm install +npm run dev +``` + +## Structure du projet +- `backend/`: Serveur Go (Gin + Paho MQTT + SQLite). +- `frontend/`: Client React (Tailwind + Lucide + Vite). +- `doc/`: Analyses détaillées et documentation technique. +- `docker-compose.yml`: Orchestration des services (Mosquitto + App). + +## Configuration +Les variables d'environnement suivantes peuvent être configurées : +- `MQTT_BROKER`: URL du broker (default: tcp://broker.hivemq.com:1883) +- `SQLITE_DB`: Chemin de la DB (default: ./data/mqtt.db) +- `TTL_DAYS`: Durée de conservation des messages (default: 7) +- `MQTT_SUBSCRIBE`: Topic de souscription par défaut (default: #) +- `MQTT_QOS`: QoS par défaut (default: 0) diff --git a/backend/cmd/server/main.go b/backend/cmd/server/main.go new file mode 100644 index 0000000..9ddcb5b --- /dev/null +++ b/backend/cmd/server/main.go @@ -0,0 +1,243 @@ +package main + +import ( + "fmt" + "log" + "os" + "os/signal" + "path/filepath" + "strings" + "syscall" + "time" + + "github.com/google/uuid" + + "mqtt_explorer/internal/api" + "mqtt_explorer/internal/config" + "mqtt_explorer/internal/filters" + "mqtt_explorer/internal/mqtt" + "mqtt_explorer/internal/settings" + "mqtt_explorer/internal/storage" + "mqtt_explorer/internal/sysinfo" + "mqtt_explorer/internal/topics" + "mqtt_explorer/internal/ws" +) + +func main() { + cfg := config.Load() + + store, err := storage.Open(cfg.SQLitePath) + if err != nil { + panic(err) + } + + tree := topics.NewTree() + hub := ws.NewHub() + filterStore := filters.NewStore() + sysStore := sysinfo.NewStore() + settingsStore := settings.NewStore(cfg.SettingsFile) + defaultSettings := settings.Settings{ + Theme: "dark-monokai", + RepoURL: "https://gitea.maison43.duckdns.org/gilles/mqtt_explorer", + TTLDays: cfg.TTLDays, + MaxPayloadBytes: 1024 * 100, + AutoPurgePayloads: false, + AutoPurgePayloadBytes: 1024 * 250, + AutoExpandDepth: 2, + ImageDetection: true, + HighlightMs: 300, + MQTTProfiles: []settings.MQTTProfile{ + { + ID: "default", + Name: "Broker local", + Host: "10.0.0.3", + Port: 1883, + Username: "", + Password: "", + IsDefault: true, + }, + }, + ActiveProfileID: "default", + ApplyViewFilter: true, + ExpandTreeOnStart: false, + TopicFilters: []settings.TopicFilter{}, + UIFontSize: 13, + TopicFontSize: 12, + PayloadFontSize: 12, + StatsRefreshMs: 5000, + ResizeHandlePx: 8, + } + loadedSettings, err := settingsStore.Load(defaultSettings) + if err != nil { + log.Printf("settings load error: %v", err) + } else { + filterStore.Update(toFilterRules(loadedSettings.TopicFilters)) + } + settingsRuntime := settings.NewRuntime(loadedSettings) + + mqttMgr, err := mqtt.NewManager( + cfg.MQTTBroker, + cfg.MQTTUsername, + cfg.MQTTPassword, + cfg.MQTTClientID, + func(topic string, payload []byte, qos byte, retained bool) { + if cfg.MQTTDebug { + preview := string(payload) + if len(preview) > 256 { + preview = preview[:256] + "..." + } + log.Printf("mqtt message topic=%s qos=%d retained=%t size=%d payload=%q", topic, qos, retained, len(payload), preview) + } + + if strings.HasPrefix(topic, "$SYS/") { + sysStore.Update(topic, string(payload)) + return + } + + msg := topics.Message{ + ID: uuid.NewString(), + Topic: topic, + Payload: string(payload), + QOS: qos, + Retained: retained, + Timestamp: time.Now().UTC(), + Size: len(payload), + } + + tree.AddMessage(msg) + if rule, ok := filterStore.Match(topic); !ok || rule.Save { + currentSettings := settingsRuntime.Get() + skipStore := currentSettings.AutoPurgePayloads && currentSettings.AutoPurgePayloadBytes > 0 && len(payload) >= currentSettings.AutoPurgePayloadBytes + if !skipStore { + _ = store.InsertMessage(storage.Message{ + ID: msg.ID, + Topic: msg.Topic, + Payload: msg.Payload, + QOS: msg.QOS, + Retained: msg.Retained, + Timestamp: msg.Timestamp, + Size: msg.Size, + }) + } + } + + hub.Broadcast(ws.Event{Type: "message", Data: msg}) + }, + cfg.MQTTSubscribe, + cfg.MQTTQOS, + cfg.SysSubscribe, + ) + if err != nil { + panic(err) + } + + go startTTLJob(store, settingsRuntime) + go startOversizeJob(store, settingsRuntime) + go startDBSizeJob(store, 400*1024*1024) + go startStatsJob(store, hub) + + staticDir := resolveStaticDir() + server := api.NewServer(store, tree, hub, mqttMgr, filterStore, settingsStore, settingsRuntime, defaultSettings, sysStore, staticDir) + + go func() { + addr := fmt.Sprintf(":%s", cfg.HTTPPort) + if err := server.Run(addr); err != nil { + panic(err) + } + }() + + quit := make(chan os.Signal, 1) + signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) + <-quit + + mqttMgr.Disconnect() +} + +func toFilterRules(filtersIn []settings.TopicFilter) []filters.Rule { + out := make([]filters.Rule, 0, len(filtersIn)) + for _, entry := range filtersIn { + out = append(out, filters.Rule{ + Topic: entry.Topic, + Save: entry.Save, + View: entry.View, + }) + } + return out +} + +func resolveStaticDir() string { + candidates := []string{ + "./frontend/dist", + "./dist", + } + + for _, path := range candidates { + if info, err := os.Stat(path); err == nil && info.IsDir() { + abs, err := filepath.Abs(path) + if err == nil { + return abs + } + return path + } + } + + return "" +} + +func startTTLJob(store *storage.Store, settingsRuntime *settings.Runtime) { + ticker := time.NewTicker(10 * time.Minute) + defer ticker.Stop() + + for range ticker.C { + ttlDays := settingsRuntime.Get().TTLDays + if ttlDays <= 0 { + continue + } + cutoff := time.Now().AddDate(0, 0, -ttlDays) + _, _ = store.PurgeBefore(cutoff) + } +} + +func startOversizeJob(store *storage.Store, settingsRuntime *settings.Runtime) { + ticker := time.NewTicker(10 * time.Minute) + defer ticker.Stop() + + for range ticker.C { + settingsSnapshot := settingsRuntime.Get() + if !settingsSnapshot.AutoPurgePayloads || settingsSnapshot.AutoPurgePayloadBytes <= 0 { + continue + } + _, _ = store.PurgeOversize(settingsSnapshot.AutoPurgePayloadBytes) + } +} + +func startDBSizeJob(store *storage.Store, maxBytes int64) { + if maxBytes <= 0 { + return + } + ticker := time.NewTicker(5 * time.Minute) + defer ticker.Stop() + + for range ticker.C { + stats, err := store.Stats() + if err != nil || stats.Bytes <= maxBytes { + continue + } + if deleted, err := store.DeleteOldestFraction(0.25); err == nil && deleted > 0 { + _ = store.Compact() + } + } +} + +func startStatsJob(store *storage.Store, hub *ws.Hub) { + ticker := time.NewTicker(15 * time.Second) + defer ticker.Stop() + + for range ticker.C { + stats, err := store.Stats() + if err != nil { + continue + } + hub.Broadcast(ws.Event{Type: "stats", Data: stats}) + } +} diff --git a/backend/go.mod b/backend/go.mod new file mode 100755 index 0000000..305c678 --- /dev/null +++ b/backend/go.mod @@ -0,0 +1,52 @@ +module mqtt_explorer + +go 1.23.0 + +toolchain go1.24.4 + +require ( + github.com/eclipse/paho.mqtt.golang v1.4.3 + github.com/gin-gonic/gin v1.10.0 + github.com/google/uuid v1.6.0 + github.com/gorilla/websocket v1.5.3 + modernc.org/sqlite v1.32.0 +) + +require ( + github.com/bytedance/sonic v1.12.8 // indirect + github.com/bytedance/sonic/loader v0.2.2 // indirect + github.com/cloudwego/base64x v0.1.5 // indirect + github.com/dustin/go-humanize v1.0.1 // indirect + github.com/gabriel-vasile/mimetype v1.4.8 // indirect + github.com/gin-contrib/sse v1.0.0 // indirect + github.com/go-playground/locales v0.14.1 // indirect + github.com/go-playground/universal-translator v0.18.1 // indirect + github.com/go-playground/validator/v10 v10.25.0 // indirect + github.com/goccy/go-json v0.10.5 // indirect + github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/cpuid/v2 v2.2.10 // indirect + github.com/leodido/go-urn v1.4.0 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/ncruces/go-strftime v0.1.9 // indirect + github.com/pelletier/go-toml/v2 v2.2.3 // indirect + github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect + github.com/twitchyliquid64/golang-asm v0.15.1 // indirect + github.com/ugorji/go/codec v1.2.12 // indirect + golang.org/x/arch v0.14.0 // indirect + golang.org/x/crypto v0.36.0 // indirect + golang.org/x/net v0.37.0 // indirect + golang.org/x/sync v0.12.0 // indirect + golang.org/x/sys v0.31.0 // indirect + golang.org/x/text v0.23.0 // indirect + google.golang.org/protobuf v1.36.5 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect + modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6 // indirect + modernc.org/libc v1.55.3 // indirect + modernc.org/mathutil v1.6.0 // indirect + modernc.org/memory v1.8.0 // indirect + modernc.org/strutil v1.2.0 // indirect + modernc.org/token v1.1.0 // indirect +) diff --git a/backend/go.sum b/backend/go.sum new file mode 100644 index 0000000..e95b179 --- /dev/null +++ b/backend/go.sum @@ -0,0 +1,134 @@ +github.com/bytedance/sonic v1.12.8 h1:4xYRVRlXIgvSZ4e8iVTlMF5szgpXd4AfvuWgA8I8lgs= +github.com/bytedance/sonic v1.12.8/go.mod h1:uVvFidNmlt9+wa31S1urfwwthTWteBgG0hWuoKAXTx8= +github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU= +github.com/bytedance/sonic/loader v0.2.2 h1:jxAJuN9fOot/cyz5Q6dUuMJF5OqQ6+5GfA8FjjQ0R4o= +github.com/bytedance/sonic/loader v0.2.2/go.mod h1:N8A3vUdtUebEY2/VQC0MyhYeKUFosQU6FxH2JmUe6VI= +github.com/cloudwego/base64x v0.1.5 h1:XPciSp1xaq2VCSt6lF0phncD4koWyULpl5bUxbfCyP4= +github.com/cloudwego/base64x v0.1.5/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w= +github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik= +github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE= +github.com/gabriel-vasile/mimetype v1.4.8 h1:FfZ3gj38NjllZIeJAmMhr+qKL8Wu+nOoI3GqacKw1NM= +github.com/gabriel-vasile/mimetype v1.4.8/go.mod h1:ByKUIKGjh1ODkGM1asKUbQZOLGrPjydw3hYPU2YU9t8= +github.com/gin-contrib/sse v1.0.0 h1:y3bT1mUWUxDpW4JLQg/HnTqV4rozuW4tC9eFKTxYI9E= +github.com/gin-contrib/sse v1.0.0/go.mod h1:zNuFdwarAygJBht0NTKiSi3jRf6RbqeILZ9Sp6Slhe0= +github.com/gin-gonic/gin v1.10.0 h1:nTuyha1TYqgedzytsKYqna+DfLos46nTv2ygFy86HFU= +github.com/gin-gonic/gin v1.10.0/go.mod h1:4PMNQiOhvDRa013RKVbsiNwoyezlm2rm0uX/T7kzp5Y= +github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s= +github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= +github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA= +github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY= +github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY= +github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY= +github.com/go-playground/validator/v10 v10.25.0 h1:5Dh7cjvzR7BRZadnsVOzPhWsrwUr0nmsZJxEAnFLNO8= +github.com/go-playground/validator/v10 v10.25.0/go.mod h1:GGzBIJMuE98Ic/kJsBXbz1x/7cByt++cQ+YOuDM5wus= +github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4= +github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/pprof v0.0.0-20240409012703-83162a5b38cd h1:gbpYu9NMq8jhDVbvlGkMFWCjLFlqqEZjEmObmhUy6Vo= +github.com/google/pprof v0.0.0-20240409012703-83162a5b38cd/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= +github.com/klauspost/cpuid/v2 v2.2.10 h1:tBs3QSyvjDyFTq3uoc/9xFpCuOsJQFNPiAhYdw2skhE= +github.com/klauspost/cpuid/v2 v2.2.10/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= +github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M= +github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= +github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4= +github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= +github.com/pelletier/go-toml/v2 v2.2.3 h1:YmeHyLY8mFWbdkNWwpr+qIL2bEqT0o95WSdkNHvL12M= +github.com/pelletier/go-toml/v2 v2.2.3/go.mod h1:MfCQTFTvCcUyyvvwm1+G6H/jORL20Xlb6rzQu9GuUkc= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= +github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= +github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE= +github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= +golang.org/x/arch v0.14.0 h1:z9JUEZWr8x4rR0OU6c4/4t6E6jOZ8/QBS2bBYBm4tx4= +golang.org/x/arch v0.14.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys= +golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34= +golang.org/x/crypto v0.36.0/go.mod h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc= +golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA= +golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/net v0.37.0 h1:1zLorHbz+LYj7MQlSf1+2tPIIgibq2eL5xkrGk6f+2c= +golang.org/x/net v0.37.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= +golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw= +golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= +golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= +golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= +golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg= +golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= +google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +modernc.org/cc/v4 v4.21.4 h1:3Be/Rdo1fpr8GrQ7IVw9OHtplU4gWbb+wNgeoBMmGLQ= +modernc.org/cc/v4 v4.21.4/go.mod h1:HM7VJTZbUCR3rV8EYBi9wxnJ0ZBRiGE5OeGXNA0IsLQ= +modernc.org/ccgo/v4 v4.19.2 h1:lwQZgvboKD0jBwdaeVCTouxhxAyN6iawF3STraAal8Y= +modernc.org/ccgo/v4 v4.19.2/go.mod h1:ysS3mxiMV38XGRTTcgo0DQTeTmAO4oCmJl1nX9VFI3s= +modernc.org/fileutil v1.3.0 h1:gQ5SIzK3H9kdfai/5x41oQiKValumqNTDXMvKo62HvE= +modernc.org/fileutil v1.3.0/go.mod h1:XatxS8fZi3pS8/hKG2GH/ArUogfxjpEKs3Ku3aK4JyQ= +modernc.org/gc/v2 v2.4.1 h1:9cNzOqPyMJBvrUipmynX0ZohMhcxPtMccYgGOJdOiBw= +modernc.org/gc/v2 v2.4.1/go.mod h1:wzN5dK1AzVGoH6XOzc3YZ+ey/jPgYHLuVckd62P0GYU= +modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6 h1:5D53IMaUuA5InSeMu9eJtlQXS2NxAhyWQvkKEgXZhHI= +modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6/go.mod h1:Qz0X07sNOR1jWYCrJMEnbW/X55x206Q7Vt4mz6/wHp4= +modernc.org/libc v1.55.3 h1:AzcW1mhlPNrRtjS5sS+eW2ISCgSOLLNyFzRh/V3Qj/U= +modernc.org/libc v1.55.3/go.mod h1:qFXepLhz+JjFThQ4kzwzOjA/y/artDeg+pcYnY+Q83w= +modernc.org/mathutil v1.6.0 h1:fRe9+AmYlaej+64JsEEhoWuAYBkOtQiMEU7n/XgfYi4= +modernc.org/mathutil v1.6.0/go.mod h1:Ui5Q9q1TR2gFm0AQRqQUaBWFLAhQpCwNcuhBOSedWPo= +modernc.org/memory v1.8.0 h1:IqGTL6eFMaDZZhEWwcREgeMXYwmW83LYW8cROZYkg+E= +modernc.org/memory v1.8.0/go.mod h1:XPZ936zp5OMKGWPqbD3JShgd/ZoQ7899TUuQqxY+peU= +modernc.org/opt v0.1.3 h1:3XOZf2yznlhC+ibLltsDGzABUGVx8J6pnFMS3E4dcq4= +modernc.org/opt v0.1.3/go.mod h1:WdSiB5evDcignE70guQKxYUl14mgWtbClRi5wmkkTX0= +modernc.org/sortutil v1.2.0 h1:jQiD3PfS2REGJNzNCMMaLSp/wdMNieTbKX920Cqdgqc= +modernc.org/sortutil v1.2.0/go.mod h1:TKU2s7kJMf1AE84OoiGppNHJwvB753OYfNl2WRb++Ss= +modernc.org/sqlite v1.32.0 h1:6BM4uGza7bWypsw4fdLRsLxut6bHe4c58VeqjRgST8s= +modernc.org/sqlite v1.32.0/go.mod h1:UqoylwmTb9F+IqXERT8bW9zzOWN8qwAIcLdzeBZs4hA= +modernc.org/strutil v1.2.0 h1:agBi9dp1I+eOnxXeiZawM8F4LawKv4NzGWSaLfyeNZA= +modernc.org/strutil v1.2.0/go.mod h1:/mdcBmfOibveCTBxUl5B5l6W+TTH1FXPLHZE6bTosX0= +modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y= +modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= +nullprogram.com/x/optparse v1.0.0/go.mod h1:KdyPE+Igbe0jQUrVfMqDMeJQIJZEuyV7pjYmp6pbG50= diff --git a/backend/internal/api/server.go b/backend/internal/api/server.go new file mode 100644 index 0000000..9f9e7fe --- /dev/null +++ b/backend/internal/api/server.go @@ -0,0 +1,292 @@ +package api + +import ( + "log" + "net" + "net/http" + "net/url" + "strconv" + "time" + + "github.com/gin-gonic/gin" + "github.com/gorilla/websocket" + + "mqtt_explorer/internal/filters" + "mqtt_explorer/internal/metrics" + "mqtt_explorer/internal/mqtt" + "mqtt_explorer/internal/settings" + "mqtt_explorer/internal/storage" + "mqtt_explorer/internal/sysinfo" + "mqtt_explorer/internal/topics" + "mqtt_explorer/internal/ws" +) + +type Server struct { + router *gin.Engine + store *storage.Store + tree *topics.Tree + hub *ws.Hub + mqttMgr *mqtt.Manager + metrics *metrics.Collector + filters *filters.Store + settings *settings.Store + settingsRuntime *settings.Runtime + defaults settings.Settings + sysinfo *sysinfo.Store +} + +type PublishRequest struct { + Topic string `json:"topic"` + Payload string `json:"payload"` + QOS byte `json:"qos"` + Retained bool `json:"retained"` +} + +type TestConnectionRequest struct { + Broker string `json:"broker"` +} + +func NewServer(store *storage.Store, tree *topics.Tree, hub *ws.Hub, mqttMgr *mqtt.Manager, filterStore *filters.Store, settingsStore *settings.Store, settingsRuntime *settings.Runtime, defaults settings.Settings, sysStore *sysinfo.Store, staticDir string) *Server { + router := gin.Default() + + s := &Server{ + router: router, + store: store, + tree: tree, + hub: hub, + mqttMgr: mqttMgr, + metrics: &metrics.Collector{}, + filters: filterStore, + settings: settingsStore, + settingsRuntime: settingsRuntime, + defaults: defaults, + sysinfo: sysStore, + } + s.registerRoutes() + if staticDir != "" { + router.Static("/assets", staticDir+"/assets") + router.Static("/themes", staticDir+"/themes") + router.Static("/favicon", staticDir+"/favicon") + router.StaticFile("/site.webmanifest", staticDir+"/site.webmanifest") + router.StaticFile("/favicon.ico", staticDir+"/favicon.ico") + router.NoRoute(func(c *gin.Context) { + c.File(staticDir + "/index.html") + }) + } + return s +} + +func (s *Server) registerRoutes() { + s.router.GET("/api/health", s.health) + s.router.GET("/api/stats", s.stats) + s.router.GET("/api/metrics", s.metricsHandler) + s.router.GET("/api/filters", s.getFilters) + s.router.POST("/api/filters", s.updateFilters) + s.router.GET("/api/settings", s.getSettings) + s.router.POST("/api/settings", s.updateSettings) + s.router.GET("/api/sysinfo", s.getSysinfo) + s.router.POST("/api/test-connection", s.testConnection) + s.router.GET("/api/topics", s.getTopics) + s.router.GET("/api/topic/:topic/history", s.getHistory) + s.router.POST("/api/topic/:topic/clear-history", s.clearHistory) + s.router.POST("/api/history/clear", s.clearAllHistory) + s.router.POST("/api/publish", s.publish) + s.router.GET("/ws/events", s.wsEvents) +} + +func (s *Server) Run(addr string) error { + return s.router.Run(addr) +} + +func (s *Server) health(c *gin.Context) { + c.JSON(http.StatusOK, gin.H{"status": "ok"}) +} + +func (s *Server) stats(c *gin.Context) { + stats, err := s.store.Stats() + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + c.JSON(http.StatusOK, stats) +} + +func (s *Server) metricsHandler(c *gin.Context) { + stats, err := s.store.Stats() + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + usage := s.metrics.Snapshot() + c.JSON(http.StatusOK, gin.H{ + "cpuPercent": usage.CPUPercent, + "memBytes": usage.MemBytes, + "memLimit": usage.MemLimit, + "dbBytes": stats.Bytes, + "dbSize": stats.Size, + }) +} + +func (s *Server) getFilters(c *gin.Context) { + c.JSON(http.StatusOK, s.filters.Snapshot()) +} + +func (s *Server) updateFilters(c *gin.Context) { + var rules []filters.Rule + if err := c.ShouldBindJSON(&rules); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "payload invalide"}) + return + } + s.filters.Update(rules) + c.JSON(http.StatusOK, gin.H{"status": "ok"}) +} + +func (s *Server) getSettings(c *gin.Context) { + current, err := s.settings.Load(s.defaults) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + c.JSON(http.StatusOK, current) +} + +func (s *Server) updateSettings(c *gin.Context) { + var next settings.Settings + if err := c.ShouldBindJSON(&next); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "payload invalide"}) + return + } + if err := s.settings.Save(next); err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + s.filters.Update(toFilterRules(next.TopicFilters)) + if s.settingsRuntime != nil { + s.settingsRuntime.Update(next) + } + c.JSON(http.StatusOK, gin.H{"status": "ok"}) +} + +func (s *Server) getSysinfo(c *gin.Context) { + c.JSON(http.StatusOK, s.sysinfo.Snapshot()) +} + +func toFilterRules(filtersIn []settings.TopicFilter) []filters.Rule { + out := make([]filters.Rule, 0, len(filtersIn)) + for _, entry := range filtersIn { + out = append(out, filters.Rule{ + Topic: entry.Topic, + Save: entry.Save, + View: entry.View, + }) + } + return out +} + +func (s *Server) testConnection(c *gin.Context) { + var req TestConnectionRequest + if err := c.ShouldBindJSON(&req); err != nil || req.Broker == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "broker manquant"}) + return + } + + parsed, err := url.Parse(req.Broker) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "broker invalide"}) + return + } + + host := parsed.Hostname() + port := parsed.Port() + if port == "" { + port = "1883" + } + + addr := net.JoinHostPort(host, port) + start := time.Now() + conn, err := net.DialTimeout("tcp", addr, 3*time.Second) + if err != nil { + c.JSON(http.StatusOK, gin.H{"ok": false, "error": err.Error()}) + return + } + _ = conn.Close() + + c.JSON(http.StatusOK, gin.H{ + "ok": true, + "latency": time.Since(start).Milliseconds(), + "endpoint": addr, + }) +} + +func (s *Server) getTopics(c *gin.Context) { + c.JSON(http.StatusOK, s.tree.Snapshot()) +} + +func (s *Server) getHistory(c *gin.Context) { + topic := c.Param("topic") + limit, _ := strconv.Atoi(c.DefaultQuery("limit", "50")) + from := c.DefaultQuery("from", "") + to := c.DefaultQuery("to", "") + + messages, err := s.store.GetHistory(topic, limit, from, to) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + c.JSON(http.StatusOK, messages) +} + +func (s *Server) clearHistory(c *gin.Context) { + topic := c.Param("topic") + deleted, err := s.store.ClearTopicHistory(topic) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + c.JSON(http.StatusOK, gin.H{"deleted": deleted}) +} + +func (s *Server) clearAllHistory(c *gin.Context) { + deleted, err := s.store.ClearAllHistory() + if err != nil { + log.Printf("clear db error: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + log.Printf("clear db ok: deleted=%d", deleted) + c.JSON(http.StatusOK, gin.H{"deleted": deleted}) +} + +func (s *Server) publish(c *gin.Context) { + var req PublishRequest + if err := c.ShouldBindJSON(&req); err != nil || req.Topic == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "requete invalide"}) + return + } + + if err := s.mqttMgr.Publish(req.Topic, []byte(req.Payload), req.QOS, req.Retained); err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + c.JSON(http.StatusOK, gin.H{"status": "ok"}) +} + +func (s *Server) wsEvents(c *gin.Context) { + upgrader := websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { return true }, + } + + conn, err := upgrader.Upgrade(c.Writer, c.Request, nil) + if err != nil { + return + } + s.hub.Add(conn) + + for { + if _, _, err := conn.ReadMessage(); err != nil { + s.hub.Remove(conn) + _ = conn.Close() + break + } + } +} diff --git a/backend/internal/config/config.go b/backend/internal/config/config.go new file mode 100644 index 0000000..998c7ea --- /dev/null +++ b/backend/internal/config/config.go @@ -0,0 +1,76 @@ +package config + +import ( + "os" + "strconv" + "strings" +) + +type Config struct { + HTTPPort string + MQTTBroker string + MQTTUsername string + MQTTPassword string + MQTTClientID string + MQTTSubscribe string + MQTTQOS byte + SQLitePath string + TTLDays int + MQTTDebug bool + SettingsFile string + SysSubscribe string +} + +func Load() Config { + cfg := Config{ + HTTPPort: getEnv("PORT", "8088"), + MQTTBroker: getEnv("MQTT_BROKER", "tcp://broker.hivemq.com:1883"), + MQTTUsername: getEnv("MQTT_USERNAME", ""), + MQTTPassword: getEnv("MQTT_PASSWORD", ""), + MQTTClientID: getEnv("MQTT_CLIENT_ID", "mqtt-web-explorer"), + MQTTSubscribe: getEnv("MQTT_SUBSCRIBE", "#"), + SQLitePath: getEnv("SQLITE_DB", "./data/mqtt.db"), + TTLDays: getEnvInt("TTL_DAYS", 7), + MQTTQOS: byte(getEnvInt("MQTT_QOS", 0)), + MQTTDebug: getEnvBool("MQTT_DEBUG", false), + SettingsFile: getEnv("SETTINGS_FILE", "/data/settings.yml"), + SysSubscribe: getEnv("MQTT_SYS_SUBSCRIBE", "$SYS/#"), + } + + return cfg +} + +func getEnv(key, fallback string) string { + val := os.Getenv(key) + if val == "" { + return fallback + } + return val +} + +func getEnvInt(key string, fallback int) int { + val := os.Getenv(key) + if val == "" { + return fallback + } + parsed, err := strconv.Atoi(val) + if err != nil { + return fallback + } + return parsed +} + +func getEnvBool(key string, fallback bool) bool { + val := os.Getenv(key) + if val == "" { + return fallback + } + switch strings.ToLower(val) { + case "1", "true", "yes", "on": + return true + case "0", "false", "no", "off": + return false + default: + return fallback + } +} diff --git a/backend/internal/filters/filters.go b/backend/internal/filters/filters.go new file mode 100644 index 0000000..6565637 --- /dev/null +++ b/backend/internal/filters/filters.go @@ -0,0 +1,68 @@ +package filters + +import ( + "strings" + "sync" +) + +type Rule struct { + Topic string `json:"topic"` + Save bool `json:"save"` + View bool `json:"view"` +} + +type Store struct { + mu sync.RWMutex + rules []Rule +} + +func NewStore() *Store { + return &Store{rules: []Rule{}} +} + +func (s *Store) Update(rules []Rule) { + clean := make([]Rule, 0, len(rules)) + for _, rule := range rules { + if strings.TrimSpace(rule.Topic) == "" { + continue + } + clean = append(clean, rule) + } + s.mu.Lock() + defer s.mu.Unlock() + s.rules = clean +} + +func (s *Store) Snapshot() []Rule { + s.mu.RLock() + defer s.mu.RUnlock() + out := make([]Rule, len(s.rules)) + copy(out, s.rules) + return out +} + +func (s *Store) Match(topic string) (Rule, bool) { + s.mu.RLock() + defer s.mu.RUnlock() + for _, rule := range s.rules { + if matchTopic(rule.Topic, topic) { + return rule, true + } + } + return Rule{}, false +} + +func matchTopic(rule, topic string) bool { + rule = strings.TrimSpace(rule) + if rule == "" { + return false + } + if rule == "#" { + return true + } + if strings.HasSuffix(rule, "/#") { + prefix := strings.TrimSuffix(rule, "/#") + return strings.HasPrefix(topic, prefix) + } + return rule == topic +} diff --git a/backend/internal/metrics/metrics.go b/backend/internal/metrics/metrics.go new file mode 100644 index 0000000..faa047b --- /dev/null +++ b/backend/internal/metrics/metrics.go @@ -0,0 +1,207 @@ +package metrics + +import ( + "bufio" + "fmt" + "os" + "runtime" + "strconv" + "strings" + "sync" + "time" +) + +type Snapshot struct { + CPUPercent float64 `json:"cpuPercent"` + MemBytes int64 `json:"memBytes"` + MemLimit int64 `json:"memLimit"` +} + +type Collector struct { + mu sync.Mutex + lastCPUUseUse uint64 + lastTime time.Time +} + +func (c *Collector) Snapshot() Snapshot { + c.mu.Lock() + defer c.mu.Unlock() + + usage, okCPU := readCPUUsage() + mem, limit, okMem := readMemoryUsage() + + cpuPercent := 0.0 + if okCPU { + now := time.Now() + if !c.lastTime.IsZero() { + deltaUsage := float64(usage - c.lastCPUUseUse) + deltaTime := now.Sub(c.lastTime).Seconds() + cpuQuota := cpuQuotaCount() + if cpuQuota <= 0 { + cpuQuota = float64(runtime.NumCPU()) + } + if deltaTime > 0 && cpuQuota > 0 { + cpuPercent = (deltaUsage / (deltaTime * 1_000_000)) * (100 / cpuQuota) + } + } + c.lastCPUUseUse = usage + c.lastTime = now + } + + if !okMem || mem <= 0 { + var memStats runtime.MemStats + runtime.ReadMemStats(&memStats) + mem = int64(memStats.Alloc) + limit = int64(memStats.Sys) + } + + return Snapshot{ + CPUPercent: cpuPercent, + MemBytes: mem, + MemLimit: limit, + } +} + +func readCPUUsage() (uint64, bool) { + file, err := os.Open("/sys/fs/cgroup/cpu.stat") + if err != nil { + return readCPUUsageV1() + } + defer file.Close() + + scanner := bufio.NewScanner(file) + for scanner.Scan() { + line := scanner.Text() + parts := strings.Fields(line) + if len(parts) == 2 && parts[0] == "usage_usec" { + val, err := strconv.ParseUint(parts[1], 10, 64) + if err == nil { + return val, true + } + } + } + return readCPUUsageV1() +} + +func readMemoryUsage() (int64, int64, bool) { + current, err := os.ReadFile("/sys/fs/cgroup/memory.current") + if err != nil { + return readMemoryUsageV1() + } + limit, err := os.ReadFile("/sys/fs/cgroup/memory.max") + if err != nil { + return readMemoryUsageV1() + } + memBytes, err := strconv.ParseInt(strings.TrimSpace(string(current)), 10, 64) + if err != nil { + return 0, 0, false + } + limitRaw := strings.TrimSpace(string(limit)) + if limitRaw == "max" { + return memBytes, 0, true + } + limitBytes, err := strconv.ParseInt(limitRaw, 10, 64) + if err != nil { + return memBytes, 0, true + } + return memBytes, limitBytes, true +} + +func readCPUUsageV1() (uint64, bool) { + data, err := os.ReadFile("/sys/fs/cgroup/cpuacct/cpuacct.usage") + if err != nil { + return readProcCPUUsage() + } + val, err := strconv.ParseUint(strings.TrimSpace(string(data)), 10, 64) + if err != nil { + return readProcCPUUsage() + } + return val / 1000, true +} + +func readMemoryUsageV1() (int64, int64, bool) { + current, err := os.ReadFile("/sys/fs/cgroup/memory/memory.usage_in_bytes") + if err != nil { + return 0, 0, false + } + limit, err := os.ReadFile("/sys/fs/cgroup/memory/memory.limit_in_bytes") + if err != nil { + return 0, 0, false + } + memBytes, err := strconv.ParseInt(strings.TrimSpace(string(current)), 10, 64) + if err != nil { + return 0, 0, false + } + limitBytes, err := strconv.ParseInt(strings.TrimSpace(string(limit)), 10, 64) + if err != nil { + return memBytes, 0, true + } + return memBytes, limitBytes, true +} + +func readProcCPUUsage() (uint64, bool) { + data, err := os.ReadFile("/proc/self/stat") + if err != nil { + return 0, false + } + contents := string(data) + closeIdx := strings.LastIndex(contents, ")") + if closeIdx == -1 || closeIdx+2 >= len(contents) { + return 0, false + } + fields := strings.Fields(contents[closeIdx+2:]) + if len(fields) < 15 { + return 0, false + } + utime, err := strconv.ParseUint(fields[11], 10, 64) + if err != nil { + return 0, false + } + stime, err := strconv.ParseUint(fields[12], 10, 64) + if err != nil { + return 0, false + } + clkTck := uint64(100) + totalTicks := utime + stime + usec := (totalTicks * 1_000_000) / clkTck + return usec, true +} + +func cpuQuotaCount() float64 { + data, err := os.ReadFile("/sys/fs/cgroup/cpu.max") + if err != nil { + return float64(runtime.NumCPU()) + } + parts := strings.Fields(string(data)) + if len(parts) != 2 { + return 0 + } + if parts[0] == "max" { + return float64(runtime.NumCPU()) + } + quota, err := strconv.ParseFloat(parts[0], 64) + if err != nil { + return 0 + } + period, err := strconv.ParseFloat(parts[1], 64) + if err != nil || period == 0 { + return 0 + } + return quota / period +} + +func FormatBytes(size int64) string { + if size < 1024 { + return fmt.Sprintf("%d B", size) + } + kb := float64(size) / 1024 + if kb < 1024 { + return fmt.Sprintf("%.1f KB", kb) + } + mb := kb / 1024 + if mb < 1024 { + return fmt.Sprintf("%.1f MB", mb) + } + gb := mb / 1024 + return fmt.Sprintf("%.2f GB", gb) +} diff --git a/backend/internal/mqtt/manager.go b/backend/internal/mqtt/manager.go new file mode 100644 index 0000000..df9b0cd --- /dev/null +++ b/backend/internal/mqtt/manager.go @@ -0,0 +1,82 @@ +package mqtt + +import ( + "fmt" + "time" + + paho "github.com/eclipse/paho.mqtt.golang" +) + +type MessageHandler func(topic string, payload []byte, qos byte, retained bool) + +type Manager struct { + client paho.Client + handler MessageHandler + subTopic string + subQOS byte + sysTopic string +} + +func NewManager(broker, username, password, clientID string, handler MessageHandler, subTopic string, subQOS byte, sysTopic string) (*Manager, error) { + opts := paho.NewClientOptions() + opts.AddBroker(broker) + opts.SetClientID(clientID) + opts.SetAutoReconnect(true) + opts.SetConnectRetry(true) + opts.SetConnectRetryInterval(5 * time.Second) + if username != "" { + opts.SetUsername(username) + opts.SetPassword(password) + } + + mgr := &Manager{ + handler: handler, + subTopic: subTopic, + subQOS: subQOS, + sysTopic: sysTopic, + } + + opts.SetOnConnectHandler(func(c paho.Client) { + if token := c.Subscribe(mgr.subTopic, mgr.subQOS, mgr.onMessage); token.Wait() && token.Error() != nil { + fmt.Printf("erreur subscribe MQTT: %v\n", token.Error()) + } + if mgr.sysTopic != "" { + if token := c.Subscribe(mgr.sysTopic, mgr.subQOS, mgr.onMessage); token.Wait() && token.Error() != nil { + fmt.Printf("erreur subscribe SYS MQTT: %v\n", token.Error()) + } + } + }) + + opts.SetConnectionLostHandler(func(_ paho.Client, err error) { + fmt.Printf("connexion MQTT perdue: %v\n", err) + }) + + mgr.client = paho.NewClient(opts) + if token := mgr.client.Connect(); token.Wait() && token.Error() != nil { + return nil, fmt.Errorf("connexion MQTT: %w", token.Error()) + } + + return mgr, nil +} + +func (m *Manager) onMessage(_ paho.Client, msg paho.Message) { + if m.handler == nil { + return + } + m.handler(msg.Topic(), msg.Payload(), msg.Qos(), msg.Retained()) +} + +func (m *Manager) Publish(topic string, payload []byte, qos byte, retained bool) error { + if m.client == nil { + return fmt.Errorf("client MQTT indisponible") + } + token := m.client.Publish(topic, qos, retained, payload) + token.Wait() + return token.Error() +} + +func (m *Manager) Disconnect() { + if m.client != nil && m.client.IsConnected() { + m.client.Disconnect(250) + } +} diff --git a/backend/internal/settings/runtime.go b/backend/internal/settings/runtime.go new file mode 100644 index 0000000..7b196fc --- /dev/null +++ b/backend/internal/settings/runtime.go @@ -0,0 +1,24 @@ +package settings + +import "sync" + +type Runtime struct { + mu sync.RWMutex + current Settings +} + +func NewRuntime(initial Settings) *Runtime { + return &Runtime{current: initial} +} + +func (r *Runtime) Get() Settings { + r.mu.RLock() + defer r.mu.RUnlock() + return r.current +} + +func (r *Runtime) Update(next Settings) { + r.mu.Lock() + r.current = next + r.mu.Unlock() +} diff --git a/backend/internal/settings/store.go b/backend/internal/settings/store.go new file mode 100644 index 0000000..ebe5cd9 --- /dev/null +++ b/backend/internal/settings/store.go @@ -0,0 +1,148 @@ +package settings + +import ( + "fmt" + "os" + "path/filepath" + "sync" + + "gopkg.in/yaml.v3" +) + +type Store struct { + mu sync.RWMutex + path string +} + +type Settings struct { + Theme string `yaml:"theme" json:"theme"` + RepoURL string `yaml:"repoUrl" json:"repoUrl"` + TTLDays int `yaml:"ttlDays" json:"ttlDays"` + MaxPayloadBytes int `yaml:"maxPayloadBytes" json:"maxPayloadBytes"` + AutoPurgePayloads bool `yaml:"autoPurgePayloads" json:"autoPurgePayloads"` + AutoPurgePayloadBytes int `yaml:"autoPurgePayloadBytes" json:"autoPurgePayloadBytes"` + AutoExpandDepth int `yaml:"autoExpandDepth" json:"autoExpandDepth"` + ImageDetection bool `yaml:"imageDetectionEnabled" json:"imageDetectionEnabled"` + HighlightMs int `yaml:"highlightMs" json:"highlightMs"` + MQTTProfiles []MQTTProfile `yaml:"mqttProfiles" json:"mqttProfiles"` + ActiveProfileID string `yaml:"activeProfileId" json:"activeProfileId"` + ApplyViewFilter bool `yaml:"applyViewFilter" json:"applyViewFilter"` + ExpandTreeOnStart bool `yaml:"expandTreeOnStart" json:"expandTreeOnStart"` + TopicFilters []TopicFilter `yaml:"topicFilters" json:"topicFilters"` + UIFontSize int `yaml:"uiFontSize" json:"uiFontSize"` + TopicFontSize int `yaml:"topicFontSize" json:"topicFontSize"` + PayloadFontSize int `yaml:"payloadFontSize" json:"payloadFontSize"` + StatsRefreshMs int `yaml:"statsRefreshMs" json:"statsRefreshMs"` + ResizeHandlePx int `yaml:"resizeHandlePx" json:"resizeHandlePx"` +} + +type MQTTProfile struct { + ID string `yaml:"id" json:"id"` + Name string `yaml:"name" json:"name"` + Host string `yaml:"host" json:"host"` + Port int `yaml:"port" json:"port"` + Username string `yaml:"username" json:"username"` + Password string `yaml:"password" json:"password"` + IsDefault bool `yaml:"isDefault" json:"isDefault"` +} + +type TopicFilter struct { + Topic string `yaml:"topic" json:"topic"` + Save bool `yaml:"save" json:"save"` + View bool `yaml:"view" json:"view"` +} + +func NewStore(path string) *Store { + return &Store{path: path} +} + +func (s *Store) Load(defaults Settings) (Settings, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + content, err := os.ReadFile(s.path) + if err != nil { + if os.IsNotExist(err) { + return defaults, nil + } + return defaults, fmt.Errorf("lecture settings: %w", err) + } + + var loaded Settings + if err := yaml.Unmarshal(content, &loaded); err != nil { + return defaults, fmt.Errorf("yaml settings: %w", err) + } + + return merge(defaults, loaded), nil +} + +func (s *Store) Save(next Settings) error { + s.mu.Lock() + defer s.mu.Unlock() + + dir := filepath.Dir(s.path) + if err := os.MkdirAll(dir, 0o755); err != nil { + return fmt.Errorf("mkdir settings: %w", err) + } + + content, err := yaml.Marshal(next) + if err != nil { + return fmt.Errorf("marshal settings: %w", err) + } + + return os.WriteFile(s.path, content, 0o644) +} + +func merge(base, override Settings) Settings { + merged := base + if override.Theme != "" { + merged.Theme = override.Theme + } + if override.RepoURL != "" { + merged.RepoURL = override.RepoURL + } + if override.TTLDays != 0 { + merged.TTLDays = override.TTLDays + } + if override.MaxPayloadBytes != 0 { + merged.MaxPayloadBytes = override.MaxPayloadBytes + } + merged.AutoPurgePayloads = override.AutoPurgePayloads + if override.AutoPurgePayloadBytes != 0 { + merged.AutoPurgePayloadBytes = override.AutoPurgePayloadBytes + } + if override.AutoExpandDepth != 0 { + merged.AutoExpandDepth = override.AutoExpandDepth + } + merged.ImageDetection = override.ImageDetection + if override.HighlightMs != 0 { + merged.HighlightMs = override.HighlightMs + } + if len(override.MQTTProfiles) > 0 { + merged.MQTTProfiles = override.MQTTProfiles + } + if override.ActiveProfileID != "" { + merged.ActiveProfileID = override.ActiveProfileID + } + merged.ApplyViewFilter = override.ApplyViewFilter + merged.ExpandTreeOnStart = override.ExpandTreeOnStart + if len(override.TopicFilters) > 0 { + merged.TopicFilters = override.TopicFilters + } + if override.UIFontSize != 0 { + merged.UIFontSize = override.UIFontSize + } + if override.TopicFontSize != 0 { + merged.TopicFontSize = override.TopicFontSize + } + if override.PayloadFontSize != 0 { + merged.PayloadFontSize = override.PayloadFontSize + } + if override.StatsRefreshMs != 0 { + merged.StatsRefreshMs = override.StatsRefreshMs + } + if override.ResizeHandlePx != 0 { + merged.ResizeHandlePx = override.ResizeHandlePx + } + return merged +} diff --git a/backend/internal/storage/storage.go b/backend/internal/storage/storage.go new file mode 100644 index 0000000..2c28525 --- /dev/null +++ b/backend/internal/storage/storage.go @@ -0,0 +1,326 @@ +package storage + +import ( + "database/sql" + "fmt" + "math" + "os" + "path/filepath" + "strings" + "time" + + _ "modernc.org/sqlite" +) + +type Store struct { + db *sql.DB + path string +} + +type Message struct { + ID string + Topic string + Payload string + QOS byte + Retained bool + Timestamp time.Time + Size int +} + +type Stats struct { + Count int64 `json:"count"` + Size string `json:"size"` + Bytes int64 `json:"bytes"` +} + +func Open(path string) (*Store, error) { + dir := filepath.Dir(path) + if err := os.MkdirAll(dir, 0o755); err != nil { + return nil, fmt.Errorf("creation dossier sqlite: %w", err) + } + + db, err := sql.Open("sqlite", path) + if err != nil { + return nil, fmt.Errorf("ouverture sqlite: %w", err) + } + + if _, err := db.Exec("PRAGMA journal_mode=WAL;"); err != nil { + return nil, fmt.Errorf("pragma WAL: %w", err) + } + if _, err := db.Exec("PRAGMA synchronous=NORMAL;"); err != nil { + return nil, fmt.Errorf("pragma synchronous: %w", err) + } + + store := &Store{db: db, path: path} + if err := store.initSchema(); err != nil { + if isCorruptErr(err) { + _ = db.Close() + return recoverCorruptDB(path) + } + return nil, err + } + if err := store.integrityCheck(); err != nil { + if isCorruptErr(err) { + _ = db.Close() + return recoverCorruptDB(path) + } + return nil, err + } + return store, nil +} + +func (s *Store) initSchema() error { + schema := ` +CREATE TABLE IF NOT EXISTS messages ( + id TEXT PRIMARY KEY, + topic TEXT NOT NULL, + payload TEXT NOT NULL, + qos INTEGER NOT NULL, + retained INTEGER NOT NULL, + ts TEXT NOT NULL, + size INTEGER NOT NULL +); +CREATE INDEX IF NOT EXISTS idx_messages_topic_ts ON messages(topic, ts); +CREATE INDEX IF NOT EXISTS idx_messages_ts ON messages(ts); +` + _, err := s.db.Exec(schema) + if err != nil { + return fmt.Errorf("schema sqlite: %w", err) + } + return nil +} + +func (s *Store) integrityCheck() error { + var result string + if err := s.db.QueryRow("PRAGMA integrity_check;").Scan(&result); err != nil { + return fmt.Errorf("integrity check: %w", err) + } + if strings.ToLower(result) != "ok" { + return fmt.Errorf("integrity check: %s", result) + } + return nil +} + +func recoverCorruptDB(path string) (*Store, error) { + suffix := time.Now().UTC().Format("20060102-150405") + backupWithSuffix(path, suffix) + backupWithSuffix(path+"-wal", suffix) + backupWithSuffix(path+"-shm", suffix) + + db, err := sql.Open("sqlite", path) + if err != nil { + return nil, fmt.Errorf("ouverture sqlite apres recovery: %w", err) + } + if _, err := db.Exec("PRAGMA journal_mode=WAL;"); err != nil { + _ = db.Close() + return nil, fmt.Errorf("pragma WAL: %w", err) + } + if _, err := db.Exec("PRAGMA synchronous=NORMAL;"); err != nil { + _ = db.Close() + return nil, fmt.Errorf("pragma synchronous: %w", err) + } + store := &Store{db: db, path: path} + if err := store.initSchema(); err != nil { + _ = db.Close() + return nil, err + } + return store, nil +} + +func backupWithSuffix(path, suffix string) { + if _, err := os.Stat(path); err != nil { + return + } + _ = os.Rename(path, fmt.Sprintf("%s.corrupt-%s", path, suffix)) +} + +func isCorruptErr(err error) bool { + if err == nil { + return false + } + msg := strings.ToLower(err.Error()) + return strings.Contains(msg, "database disk image is malformed") || + strings.Contains(msg, "malformed") || + strings.Contains(msg, "btreeinitpage") || + strings.Contains(msg, "error code 11") +} + +func (s *Store) InsertMessage(msg Message) error { + _, err := s.db.Exec( + `INSERT INTO messages(id, topic, payload, qos, retained, ts, size) VALUES(?,?,?,?,?,?,?)`, + msg.ID, + msg.Topic, + msg.Payload, + int(msg.QOS), + boolToInt(msg.Retained), + msg.Timestamp.UTC().Format(time.RFC3339Nano), + msg.Size, + ) + if err != nil { + return fmt.Errorf("insert message: %w", err) + } + return nil +} + +func (s *Store) GetHistory(topic string, limit int, from, to string) ([]Message, error) { + args := []any{topic} + query := "SELECT id, topic, payload, qos, retained, ts, size FROM messages WHERE topic = ?" + if from != "" { + query += " AND ts >= ?" + args = append(args, from) + } + if to != "" { + query += " AND ts <= ?" + args = append(args, to) + } + query += " ORDER BY ts DESC" + if limit > 0 { + query += " LIMIT ?" + args = append(args, limit) + } + + rows, err := s.db.Query(query, args...) + if err != nil { + return nil, fmt.Errorf("lecture historique: %w", err) + } + defer rows.Close() + + var out []Message + for rows.Next() { + var msg Message + var retained int + var ts string + if err := rows.Scan(&msg.ID, &msg.Topic, &msg.Payload, &msg.QOS, &retained, &ts, &msg.Size); err != nil { + return nil, fmt.Errorf("scan historique: %w", err) + } + msg.Retained = retained == 1 + parsed, _ := time.Parse(time.RFC3339Nano, ts) + msg.Timestamp = parsed + out = append(out, msg) + } + + return out, nil +} + +func (s *Store) ClearTopicHistory(topic string) (int64, error) { + res, err := s.db.Exec("DELETE FROM messages WHERE topic = ?", topic) + if err != nil { + return 0, fmt.Errorf("suppression topic: %w", err) + } + affected, _ := res.RowsAffected() + return affected, nil +} + +func (s *Store) ClearAllHistory() (int64, error) { + res, err := s.db.Exec("DELETE FROM messages") + if err != nil { + return 0, fmt.Errorf("suppression db: %w", err) + } + affected, _ := res.RowsAffected() + if err := s.Compact(); err != nil { + return affected, err + } + return affected, nil +} + +func (s *Store) PurgeOversize(maxBytes int) (int64, error) { + res, err := s.db.Exec("DELETE FROM messages WHERE size >= ?", maxBytes) + if err != nil { + return 0, fmt.Errorf("purge oversize: %w", err) + } + affected, _ := res.RowsAffected() + return affected, nil +} + +func (s *Store) DeleteOldestFraction(fraction float64) (int64, error) { + if fraction <= 0 { + return 0, nil + } + var count int64 + if err := s.db.QueryRow("SELECT COUNT(*) FROM messages").Scan(&count); err != nil { + return 0, fmt.Errorf("count messages: %w", err) + } + limit := int64(math.Round(float64(count) * fraction)) + if limit <= 0 { + return 0, nil + } + res, err := s.db.Exec("DELETE FROM messages WHERE id IN (SELECT id FROM messages ORDER BY ts ASC LIMIT ?)", limit) + if err != nil { + return 0, fmt.Errorf("purge oldest fraction: %w", err) + } + affected, _ := res.RowsAffected() + return affected, nil +} + +func (s *Store) PurgeBefore(cutoff time.Time) (int64, error) { + res, err := s.db.Exec("DELETE FROM messages WHERE ts < ?", cutoff.UTC().Format(time.RFC3339Nano)) + if err != nil { + return 0, fmt.Errorf("purge ttl: %w", err) + } + affected, _ := res.RowsAffected() + return affected, nil +} + +func (s *Store) Stats() (Stats, error) { + var count int64 + if err := s.db.QueryRow("SELECT COUNT(*) FROM messages").Scan(&count); err != nil { + return Stats{}, fmt.Errorf("stats count: %w", err) + } + sizeBytes := s.totalSize() + return Stats{ + Count: count, + Size: formatBytes(sizeBytes), + Bytes: sizeBytes, + }, nil +} + +func (s *Store) totalSize() int64 { + if s.path == "" { + return 0 + } + total := fileSize(s.path) + total += fileSize(s.path + "-wal") + total += fileSize(s.path + "-shm") + return total +} + +func (s *Store) Compact() error { + if _, err := s.db.Exec("PRAGMA wal_checkpoint(TRUNCATE);"); err != nil { + return fmt.Errorf("checkpoint wal: %w", err) + } + if _, err := s.db.Exec("VACUUM;"); err != nil { + return fmt.Errorf("vacuum: %w", err) + } + return nil +} + +func fileSize(path string) int64 { + info, err := os.Stat(path) + if err != nil { + return 0 + } + return info.Size() +} + +func formatBytes(size int64) string { + if size < 1024 { + return fmt.Sprintf("%d B", size) + } + kb := float64(size) / 1024 + if kb < 1024 { + return fmt.Sprintf("%.1f KB", kb) + } + mb := kb / 1024 + if mb < 1024 { + return fmt.Sprintf("%.1f MB", mb) + } + gb := mb / 1024 + return fmt.Sprintf("%.2f GB", gb) +} + +func boolToInt(val bool) int { + if val { + return 1 + } + return 0 +} diff --git a/backend/internal/sysinfo/sysinfo.go b/backend/internal/sysinfo/sysinfo.go new file mode 100644 index 0000000..7d950fb --- /dev/null +++ b/backend/internal/sysinfo/sysinfo.go @@ -0,0 +1,55 @@ +package sysinfo + +import ( + "strings" + "sync" +) + +type Snapshot struct { + Version string `json:"version"` + Clients string `json:"clients"` + MsgReceived string `json:"msgReceived"` + MsgSent string `json:"msgSent"` + MsgStored string `json:"msgStored"` + Subscriptions string `json:"subscriptions"` +} + +type Store struct { + mu sync.RWMutex + data Snapshot +} + +func NewStore() *Store { + return &Store{} +} + +func (s *Store) Update(topic string, payload string) { + trimmed := strings.TrimSpace(payload) + if trimmed == "" { + return + } + key := strings.TrimPrefix(topic, "$SYS/broker/") + + s.mu.Lock() + defer s.mu.Unlock() + switch key { + case "version": + s.data.Version = trimmed + case "clients/connected": + s.data.Clients = trimmed + case "messages/received": + s.data.MsgReceived = trimmed + case "messages/sent": + s.data.MsgSent = trimmed + case "messages/stored": + s.data.MsgStored = trimmed + case "subscriptions/count": + s.data.Subscriptions = trimmed + } +} + +func (s *Store) Snapshot() Snapshot { + s.mu.RLock() + defer s.mu.RUnlock() + return s.data +} diff --git a/backend/internal/topics/tree.go b/backend/internal/topics/tree.go new file mode 100644 index 0000000..fe0d5fb --- /dev/null +++ b/backend/internal/topics/tree.go @@ -0,0 +1,109 @@ +package topics + +import ( + "sort" + "strings" + "sync" + "time" +) + +type Message struct { + ID string `json:"id"` + Topic string `json:"topic"` + Payload string `json:"payload"` + QOS byte `json:"qos"` + Retained bool `json:"retained"` + Timestamp time.Time `json:"timestamp"` + Size int `json:"size"` +} + +type Tree struct { + mu sync.RWMutex + root *node +} + +type node struct { + name string + fullName string + children map[string]*node + messageCount int + lastMessage *Message +} + +type NodeSnapshot struct { + Name string `json:"name"` + FullName string `json:"fullName"` + MessageCount int `json:"messageCount"` + LastMessage *Message `json:"lastMessage,omitempty"` + Children []NodeSnapshot `json:"children"` +} + +func NewTree() *Tree { + return &Tree{ + root: &node{ + name: "root", + fullName: "", + children: make(map[string]*node), + }, + } +} + +func (t *Tree) AddMessage(msg Message) { + t.mu.Lock() + defer t.mu.Unlock() + + parts := strings.Split(msg.Topic, "/") + current := t.root + for i, part := range parts { + child, ok := current.children[part] + if !ok { + full := strings.Join(parts[:i+1], "/") + child = &node{ + name: part, + fullName: full, + children: make(map[string]*node), + } + current.children[part] = child + } + current = child + current.messageCount++ + if i == len(parts)-1 { + copyMsg := msg + current.lastMessage = ©Msg + } + } +} + +func (t *Tree) Snapshot() NodeSnapshot { + t.mu.RLock() + defer t.mu.RUnlock() + + return snapshotNode(t.root) +} + +func snapshotNode(n *node) NodeSnapshot { + children := make([]NodeSnapshot, 0, len(n.children)) + keys := make([]string, 0, len(n.children)) + for key := range n.children { + keys = append(keys, key) + } + sort.Strings(keys) + for _, key := range keys { + child := n.children[key] + children = append(children, snapshotNode(child)) + } + + var last *Message + if n.lastMessage != nil { + copyMsg := *n.lastMessage + last = ©Msg + } + + return NodeSnapshot{ + Name: n.name, + FullName: n.fullName, + MessageCount: n.messageCount, + LastMessage: last, + Children: children, + } +} diff --git a/backend/internal/ws/hub.go b/backend/internal/ws/hub.go new file mode 100644 index 0000000..f16b9a1 --- /dev/null +++ b/backend/internal/ws/hub.go @@ -0,0 +1,47 @@ +package ws + +import ( + "encoding/json" + "sync" + + "github.com/gorilla/websocket" +) + +type Hub struct { + mu sync.RWMutex + clients map[*websocket.Conn]struct{} +} + +type Event struct { + Type string `json:"type"` + Data interface{} `json:"data"` +} + +func NewHub() *Hub { + return &Hub{clients: make(map[*websocket.Conn]struct{})} +} + +func (h *Hub) Add(conn *websocket.Conn) { + h.mu.Lock() + defer h.mu.Unlock() + h.clients[conn] = struct{}{} +} + +func (h *Hub) Remove(conn *websocket.Conn) { + h.mu.Lock() + defer h.mu.Unlock() + delete(h.clients, conn) +} + +func (h *Hub) Broadcast(event Event) { + payload, err := json.Marshal(event) + if err != nil { + return + } + + h.mu.RLock() + defer h.mu.RUnlock() + for conn := range h.clients { + _ = conn.WriteMessage(websocket.TextMessage, payload) + } +} diff --git a/data/mqtt.db b/data/mqtt.db new file mode 100644 index 0000000..014ee08 Binary files /dev/null and b/data/mqtt.db differ diff --git a/data/mqtt.db-shm b/data/mqtt.db-shm new file mode 100644 index 0000000..52dfeae Binary files /dev/null and b/data/mqtt.db-shm differ diff --git a/data/mqtt.db-wal b/data/mqtt.db-wal new file mode 100644 index 0000000..43b2f38 Binary files /dev/null and b/data/mqtt.db-wal differ diff --git a/data/settings.yml b/data/settings.yml new file mode 100644 index 0000000..9c2ffd7 --- /dev/null +++ b/data/settings.yml @@ -0,0 +1,36 @@ +theme: dark-monokai +repoUrl: https://gitea.maison43.duckdns.org/gilles/mqtt_explorer +ttlDays: 7 +maxPayloadBytes: 102400 +autoPurgePayloads: false +autoPurgePayloadBytes: 256000 +autoExpandDepth: 2 +imageDetectionEnabled: false +highlightMs: 1000 +mqttProfiles: + - id: default + name: 10.0.0.3 + host: 10.0.0.3 + port: 1883 + username: "" + password: "" + isDefault: true + - id: profile-1766565436272 + name: test + host: 10.0.0.50 + port: 1883 + username: "" + password: "" + isDefault: false +activeProfileId: default +applyViewFilter: false +expandTreeOnStart: true +topicFilters: + - topic: $SYS + save: true + view: true +uiFontSize: 12 +topicFontSize: 12 +payloadFontSize: 12 +statsRefreshMs: 1000 +resizeHandlePx: 4 diff --git a/doc/analyse/architecture.md b/doc/analyse/architecture.md new file mode 100755 index 0000000..08791a6 --- /dev/null +++ b/doc/analyse/architecture.md @@ -0,0 +1,56 @@ + +# Analyse Technique - MQTT Web Explorer + +## 1. Vision Globale +L'objectif est de créer un outil de monitoring MQTT robuste capable de maintenir la continuité des données (via un backend Go persistant) tout en offrant une expérience utilisateur fluide et élégante (Inspiré de MQTT Explorer, thème Monokai). + +## 2. Dessin ASCII de l'Interface (Desktop) + +```text ++----------------------------------------------------------------------------------+ +| [M] MQTT EXPLORER | Profil: Production v1 | Status: [ CONNECTED ] [Settings]| ++----------------------------------------------------------------------------------+ +| RECHERCHE: [ sensors/.* ] [X] Retained [X] JSON [X] Active | TOPIC SELECTIONNÉ| ++----------------------------------------------------------------------------------+ +| EXPLORATEUR (Arbre CLI) | DÉTAILS DU MESSAGE (sensors/kitchen/temp) | +| ▾ sensors | ------------------------------------------ | +| ▾ kitchen | [ RAW ] [ PRETTY ] [ DIFF ] [ GRAPH ] | +| ▸ light | ------------------------------------------ | +| ▸ temp (3 msgs/s) | { | +| ▾ livingroom | "value": 21.4, | +| ▸ door | "unit": "°C", | +| ▾ devices | "timestamp": "2023-10-27T10:00:00Z" | +| ▸ bridge-01 | } | +| | ------------------------------------------ | +| | IMAGES DÉTECTÉES: [ Image Preview ] | ++-----------------------------------+- ------------------------------------------ | +| [ GRAPH DOCK: | \/\/\/\/\ | Value: 21.4 | Max: 22.0 | Min: 20.1 ] | ++----------------------------------------------------------------------------------+ +``` + +## 3. Architecture Logicielle + +### Backend (Go + Gin) +- **MQTT Manager**: Service singleton gérant les connexions Paho MQTT. +- **Persistence Layer**: SQLite pour le stockage des messages (Index sur Topic+Timestamp). +- **TTL Manager**: Routine de purge automatique (Nettoyage des messages > N jours). +- **WebSocket Hub**: Streaming en temps réel vers le frontend React. +- **API REST**: Configuration des profils, tests réseau, historique paginé. + +### Frontend (React + TypeScript) +- **State Management**: Context API / Hooks pour l'état de l'arbre. +- **Rendering**: Composant d'arbre récursif optimisé "CLI style". +- **Theming**: Système de variables CSS (Monokai Pro). +- **Charts**: Recharts pour la visualisation des séries temporelles. + +## 4. Plan de Développement +1. **Phase 1: Backend Core** (Go, SQLite, MQTT loop). +2. **Phase 2: WebSocket & API** (Hub de messages, endpoints profils). +3. **Phase 3: UI Layout** (Shell responsive, Thème Monokai). +4. **Phase 4: Explorateur de Topics** (Arbre récursif, filtres). +5. **Phase 5: Payload & Historique** (Pretty JSON, Diff, SQLite Fetch). +6. **Phase 6: Fonctionnalités Avancées** (Images, Graphes, Dockerisation). + +## 5. Icônes & Ressources +- **Icônes**: Utilisation de Lucide-React (Antenna, Share2, Search, Settings, Trash, Image, LineChart). +- **Polices**: JetBrains Mono pour les données techniques, Inter pour l'UI. diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100755 index 0000000..7e633a2 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,31 @@ + +services: + mqtt-explorer: + build: . + container_name: mqtt-web-explorer + ports: + - "8088:8088" + environment: + - MQTT_BROKER=tcp://10.0.0.3:1883 + - PORT=8088 + - SQLITE_DB=/data/mqtt.db + - MQTT_DEBUG=false + - SETTINGS_FILE=/data/settings.yml + - MQTT_SYS_SUBSCRIBE=$$SYS/# + depends_on: + - mosquitto + volumes: + - ./data:/data + restart: unless-stopped + + mosquitto: + image: eclipse-mosquitto:latest + container_name: mosquitto-test + ports: + - "1883:1883" + - "9001:9001" + volumes: + - ./docker/mosquitto/mosquitto.conf:/mosquitto/config/mosquitto.conf + restart: unless-stopped + +volumes: {} diff --git a/docker/mosquitto/mosquitto.conf b/docker/mosquitto/mosquitto.conf new file mode 100644 index 0000000..82d68fd --- /dev/null +++ b/docker/mosquitto/mosquitto.conf @@ -0,0 +1,8 @@ +listener 1883 +allow_anonymous true +persistence true +persistence_location /mosquitto/data/ +sys_interval 10 + +listener 9001 +protocol websockets diff --git a/frontend/index.html b/frontend/index.html new file mode 100755 index 0000000..da39e29 --- /dev/null +++ b/frontend/index.html @@ -0,0 +1,21 @@ + + + +
+ + +
+ {JSON.stringify(sanitized?.sanitized ?? parsedJSON, null, 2)}
+
+ ) : (
+
+ {payloadPreview}
+
+ )}
+
+ {payloadPreview}
+
+
+ {previousMessage ? previousMessage.payload : 'Aucun message précédent.'}
+
+
+ {payloadPreview}
+
+ {JSON.stringify(parsedPreviousJSON, null, 2)}
+ {JSON.stringify(sanitized?.sanitized ?? parsedJSON, null, 2)}
+