package main import ( "fmt" "log" "os" "os/signal" "path/filepath" "strings" "syscall" "time" "github.com/google/uuid" "mqtt_explorer/internal/api" "mqtt_explorer/internal/config" "mqtt_explorer/internal/filters" "mqtt_explorer/internal/mqtt" "mqtt_explorer/internal/settings" "mqtt_explorer/internal/storage" "mqtt_explorer/internal/sysinfo" "mqtt_explorer/internal/topics" "mqtt_explorer/internal/ws" ) func main() { cfg := config.Load() store, err := storage.Open(cfg.SQLitePath) if err != nil { panic(err) } tree := topics.NewTree() hub := ws.NewHub() filterStore := filters.NewStore() sysStore := sysinfo.NewStore() settingsStore := settings.NewStore(cfg.SettingsFile) defaultSettings := settings.Settings{ Theme: "dark-monokai", RepoURL: "https://gitea.maison43.duckdns.org/gilles/mqtt_explorer", TTLDays: cfg.TTLDays, MaxPayloadBytes: 1024 * 100, AutoPurgePayloads: false, AutoPurgePayloadBytes: 1024 * 250, AutoExpandDepth: 2, ImageDetection: true, HighlightMs: 300, MQTTProfiles: []settings.MQTTProfile{ { ID: "default", Name: "Broker local", Host: "10.0.0.3", Port: 1883, Username: "", Password: "", IsDefault: true, }, }, ActiveProfileID: "default", ApplyViewFilter: true, ExpandTreeOnStart: false, TopicFilters: []settings.TopicFilter{}, UIFontSize: 13, TopicFontSize: 12, PayloadFontSize: 12, StatsRefreshMs: 5000, ResizeHandlePx: 8, } loadedSettings, err := settingsStore.Load(defaultSettings) if err != nil { log.Printf("settings load error: %v", err) } else { filterStore.Update(toFilterRules(loadedSettings.TopicFilters)) } settingsRuntime := settings.NewRuntime(loadedSettings) mqttMgr, err := mqtt.NewManager( cfg.MQTTBroker, cfg.MQTTUsername, cfg.MQTTPassword, cfg.MQTTClientID, func(topic string, payload []byte, qos byte, retained bool) { if cfg.MQTTDebug { preview := string(payload) if len(preview) > 256 { preview = preview[:256] + "..." } log.Printf("mqtt message topic=%s qos=%d retained=%t size=%d payload=%q", topic, qos, retained, len(payload), preview) } if strings.HasPrefix(topic, "$SYS/") { sysStore.Update(topic, string(payload)) return } msg := topics.Message{ ID: uuid.NewString(), Topic: topic, Payload: string(payload), QOS: qos, Retained: retained, Timestamp: time.Now().UTC(), Size: len(payload), } tree.AddMessage(msg) if rule, ok := filterStore.Match(topic); !ok || rule.Save { currentSettings := settingsRuntime.Get() skipStore := currentSettings.AutoPurgePayloads && currentSettings.AutoPurgePayloadBytes > 0 && len(payload) >= currentSettings.AutoPurgePayloadBytes if !skipStore { _ = store.InsertMessage(storage.Message{ ID: msg.ID, Topic: msg.Topic, Payload: msg.Payload, QOS: msg.QOS, Retained: msg.Retained, Timestamp: msg.Timestamp, Size: msg.Size, }) } } hub.Broadcast(ws.Event{Type: "message", Data: msg}) }, cfg.MQTTSubscribe, cfg.MQTTQOS, cfg.SysSubscribe, ) if err != nil { panic(err) } go startTTLJob(store, settingsRuntime) go startOversizeJob(store, settingsRuntime) go startDBSizeJob(store, 400*1024*1024) go startStatsJob(store, hub) staticDir := resolveStaticDir() server := api.NewServer(store, tree, hub, mqttMgr, filterStore, settingsStore, settingsRuntime, defaultSettings, sysStore, staticDir) go func() { addr := fmt.Sprintf(":%s", cfg.HTTPPort) if err := server.Run(addr); err != nil { panic(err) } }() quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) <-quit mqttMgr.Disconnect() } func toFilterRules(filtersIn []settings.TopicFilter) []filters.Rule { out := make([]filters.Rule, 0, len(filtersIn)) for _, entry := range filtersIn { out = append(out, filters.Rule{ Topic: entry.Topic, Save: entry.Save, View: entry.View, }) } return out } func resolveStaticDir() string { candidates := []string{ "./frontend/dist", "./dist", } for _, path := range candidates { if info, err := os.Stat(path); err == nil && info.IsDir() { abs, err := filepath.Abs(path) if err == nil { return abs } return path } } return "" } func startTTLJob(store *storage.Store, settingsRuntime *settings.Runtime) { ticker := time.NewTicker(10 * time.Minute) defer ticker.Stop() for range ticker.C { ttlDays := settingsRuntime.Get().TTLDays if ttlDays <= 0 { continue } cutoff := time.Now().AddDate(0, 0, -ttlDays) _, _ = store.PurgeBefore(cutoff) } } func startOversizeJob(store *storage.Store, settingsRuntime *settings.Runtime) { ticker := time.NewTicker(10 * time.Minute) defer ticker.Stop() for range ticker.C { settingsSnapshot := settingsRuntime.Get() if !settingsSnapshot.AutoPurgePayloads || settingsSnapshot.AutoPurgePayloadBytes <= 0 { continue } _, _ = store.PurgeOversize(settingsSnapshot.AutoPurgePayloadBytes) } } func startDBSizeJob(store *storage.Store, maxBytes int64) { if maxBytes <= 0 { return } ticker := time.NewTicker(5 * time.Minute) defer ticker.Stop() for range ticker.C { stats, err := store.Stats() if err != nil || stats.Bytes <= maxBytes { continue } if deleted, err := store.DeleteOldestFraction(0.25); err == nil && deleted > 0 { _ = store.Compact() } } } func startStatsJob(store *storage.Store, hub *ws.Hub) { ticker := time.NewTicker(15 * time.Second) defer ticker.Stop() for range ticker.C { stats, err := store.Stats() if err != nil { continue } hub.Broadcast(ws.Event{Type: "stats", Data: stats}) } }