package api import ( "log" "net" "net/http" "net/url" "strconv" "time" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" "mqtt_explorer/internal/filters" "mqtt_explorer/internal/metrics" "mqtt_explorer/internal/mqtt" "mqtt_explorer/internal/settings" "mqtt_explorer/internal/storage" "mqtt_explorer/internal/sysinfo" "mqtt_explorer/internal/topics" "mqtt_explorer/internal/ws" ) type Server struct { router *gin.Engine store *storage.Store tree *topics.Tree hub *ws.Hub mqttMgr *mqtt.Manager metrics *metrics.Collector filters *filters.Store settings *settings.Store settingsRuntime *settings.Runtime defaults settings.Settings sysinfo *sysinfo.Store } type PublishRequest struct { Topic string `json:"topic"` Payload string `json:"payload"` QOS byte `json:"qos"` Retained bool `json:"retained"` } type TestConnectionRequest struct { Broker string `json:"broker"` } func NewServer(store *storage.Store, tree *topics.Tree, hub *ws.Hub, mqttMgr *mqtt.Manager, filterStore *filters.Store, settingsStore *settings.Store, settingsRuntime *settings.Runtime, defaults settings.Settings, sysStore *sysinfo.Store, staticDir string) *Server { router := gin.Default() s := &Server{ router: router, store: store, tree: tree, hub: hub, mqttMgr: mqttMgr, metrics: &metrics.Collector{}, filters: filterStore, settings: settingsStore, settingsRuntime: settingsRuntime, defaults: defaults, sysinfo: sysStore, } s.registerRoutes() if staticDir != "" { router.Static("/assets", staticDir+"/assets") router.Static("/themes", staticDir+"/themes") router.Static("/favicon", staticDir+"/favicon") router.StaticFile("/site.webmanifest", staticDir+"/site.webmanifest") router.StaticFile("/favicon.ico", staticDir+"/favicon.ico") router.NoRoute(func(c *gin.Context) { c.File(staticDir + "/index.html") }) } return s } func (s *Server) registerRoutes() { s.router.GET("/api/health", s.health) s.router.GET("/api/stats", s.stats) s.router.GET("/api/metrics", s.metricsHandler) s.router.GET("/api/filters", s.getFilters) s.router.POST("/api/filters", s.updateFilters) s.router.GET("/api/settings", s.getSettings) s.router.POST("/api/settings", s.updateSettings) s.router.GET("/api/sysinfo", s.getSysinfo) s.router.POST("/api/test-connection", s.testConnection) s.router.GET("/api/topics", s.getTopics) s.router.GET("/api/topic/:topic/history", s.getHistory) s.router.POST("/api/topic/:topic/clear-history", s.clearHistory) s.router.POST("/api/history/clear", s.clearAllHistory) s.router.POST("/api/publish", s.publish) s.router.GET("/ws/events", s.wsEvents) } func (s *Server) Run(addr string) error { return s.router.Run(addr) } func (s *Server) health(c *gin.Context) { c.JSON(http.StatusOK, gin.H{"status": "ok"}) } func (s *Server) stats(c *gin.Context) { stats, err := s.store.Stats() if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) return } c.JSON(http.StatusOK, stats) } func (s *Server) metricsHandler(c *gin.Context) { stats, err := s.store.Stats() if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) return } usage := s.metrics.Snapshot() c.JSON(http.StatusOK, gin.H{ "cpuPercent": usage.CPUPercent, "memBytes": usage.MemBytes, "memLimit": usage.MemLimit, "dbBytes": stats.Bytes, "dbSize": stats.Size, }) } func (s *Server) getFilters(c *gin.Context) { c.JSON(http.StatusOK, s.filters.Snapshot()) } func (s *Server) updateFilters(c *gin.Context) { var rules []filters.Rule if err := c.ShouldBindJSON(&rules); err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": "payload invalide"}) return } s.filters.Update(rules) c.JSON(http.StatusOK, gin.H{"status": "ok"}) } func (s *Server) getSettings(c *gin.Context) { current, err := s.settings.Load(s.defaults) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) return } c.JSON(http.StatusOK, current) } func (s *Server) updateSettings(c *gin.Context) { var next settings.Settings if err := c.ShouldBindJSON(&next); err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": "payload invalide"}) return } if err := s.settings.Save(next); err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) return } s.filters.Update(toFilterRules(next.TopicFilters)) if s.settingsRuntime != nil { s.settingsRuntime.Update(next) } c.JSON(http.StatusOK, gin.H{"status": "ok"}) } func (s *Server) getSysinfo(c *gin.Context) { c.JSON(http.StatusOK, s.sysinfo.Snapshot()) } func toFilterRules(filtersIn []settings.TopicFilter) []filters.Rule { out := make([]filters.Rule, 0, len(filtersIn)) for _, entry := range filtersIn { out = append(out, filters.Rule{ Topic: entry.Topic, Save: entry.Save, View: entry.View, }) } return out } func (s *Server) testConnection(c *gin.Context) { var req TestConnectionRequest if err := c.ShouldBindJSON(&req); err != nil || req.Broker == "" { c.JSON(http.StatusBadRequest, gin.H{"error": "broker manquant"}) return } parsed, err := url.Parse(req.Broker) if err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": "broker invalide"}) return } host := parsed.Hostname() port := parsed.Port() if port == "" { port = "1883" } addr := net.JoinHostPort(host, port) start := time.Now() conn, err := net.DialTimeout("tcp", addr, 3*time.Second) if err != nil { c.JSON(http.StatusOK, gin.H{"ok": false, "error": err.Error()}) return } _ = conn.Close() c.JSON(http.StatusOK, gin.H{ "ok": true, "latency": time.Since(start).Milliseconds(), "endpoint": addr, }) } func (s *Server) getTopics(c *gin.Context) { c.JSON(http.StatusOK, s.tree.Snapshot()) } func (s *Server) getHistory(c *gin.Context) { topic := c.Param("topic") limit, _ := strconv.Atoi(c.DefaultQuery("limit", "50")) from := c.DefaultQuery("from", "") to := c.DefaultQuery("to", "") messages, err := s.store.GetHistory(topic, limit, from, to) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) return } c.JSON(http.StatusOK, messages) } func (s *Server) clearHistory(c *gin.Context) { topic := c.Param("topic") deleted, err := s.store.ClearTopicHistory(topic) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) return } c.JSON(http.StatusOK, gin.H{"deleted": deleted}) } func (s *Server) clearAllHistory(c *gin.Context) { deleted, err := s.store.ClearAllHistory() if err != nil { log.Printf("clear db error: %v", err) c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) return } log.Printf("clear db ok: deleted=%d", deleted) c.JSON(http.StatusOK, gin.H{"deleted": deleted}) } func (s *Server) publish(c *gin.Context) { var req PublishRequest if err := c.ShouldBindJSON(&req); err != nil || req.Topic == "" { c.JSON(http.StatusBadRequest, gin.H{"error": "requete invalide"}) return } if err := s.mqttMgr.Publish(req.Topic, []byte(req.Payload), req.QOS, req.Retained); err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) return } c.JSON(http.StatusOK, gin.H{"status": "ok"}) } func (s *Server) wsEvents(c *gin.Context) { upgrader := websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true }, } conn, err := upgrader.Upgrade(c.Writer, c.Request, nil) if err != nil { return } s.hub.Add(conn) for { if _, _, err := conn.ReadMessage(); err != nil { s.hub.Remove(conn) _ = conn.Close() break } } }