Files
Gilles Soulier 383ad292d3 first
2025-12-24 14:47:39 +01:00

293 lines
7.6 KiB
Go

package api
import (
"log"
"net"
"net/http"
"net/url"
"strconv"
"time"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"mqtt_explorer/internal/filters"
"mqtt_explorer/internal/metrics"
"mqtt_explorer/internal/mqtt"
"mqtt_explorer/internal/settings"
"mqtt_explorer/internal/storage"
"mqtt_explorer/internal/sysinfo"
"mqtt_explorer/internal/topics"
"mqtt_explorer/internal/ws"
)
// Server bundles the gin router with the stores and managers that the HTTP
// and WebSocket handlers of this package read from and write to.
type Server struct {
// router is the gin engine on which every route is registered.
router *gin.Engine
// store provides database statistics and per-topic message history.
store *storage.Store
// tree provides the topic snapshot returned by the topics endpoint.
tree *topics.Tree
// hub tracks the WebSocket connections opened through the events endpoint.
hub *ws.Hub
// mqttMgr is used to publish messages on behalf of API clients.
mqttMgr *mqtt.Manager
// metrics supplies the CPU and memory figures of the metrics endpoint.
metrics *metrics.Collector
// filters holds the topic filter rules exposed and updated through the API.
filters *filters.Store
// settings loads and saves the persisted settings.
settings *settings.Store
// settingsRuntime, when non-nil, is notified after settings are saved.
settingsRuntime *settings.Runtime
// defaults is passed to the settings store when loading settings.
defaults settings.Settings
// sysinfo provides the snapshot returned by the sysinfo endpoint.
sysinfo *sysinfo.Store
}
// PublishRequest is the JSON body accepted by the publish endpoint.
type PublishRequest struct {
// Topic is the topic to publish to; the handler rejects an empty value.
Topic string `json:"topic"`
// Payload is the message body; it is sent as the raw bytes of this string.
Payload string `json:"payload"`
// QOS is forwarded to the MQTT manager as the quality-of-service level.
QOS byte `json:"qos"`
// Retained is forwarded to the MQTT manager as the retained flag.
Retained bool `json:"retained"`
}
// TestConnectionRequest is the JSON body accepted by the test-connection
// endpoint.
type TestConnectionRequest struct {
// Broker is the broker URL to probe; the handler parses it with url.Parse
// and rejects an empty value.
Broker string `json:"broker"`
}
// NewServer builds a Server around the supplied dependencies, registers the
// API and WebSocket routes, and optionally serves a static front-end.
//
// The router is created with gin.Default() and the metrics collector is a
// fresh zero-value metrics.Collector. When staticDir is non-empty, its
// assets, themes and favicon sub-directories plus site.webmanifest and
// favicon.ico are served, and every unmatched route answers with
// staticDir/index.html. When staticDir is empty no static route is added.
func NewServer(store *storage.Store, tree *topics.Tree, hub *ws.Hub, mqttMgr *mqtt.Manager, filterStore *filters.Store, settingsStore *settings.Store, settingsRuntime *settings.Runtime, defaults settings.Settings, sysStore *sysinfo.Store, staticDir string) *Server {
	engine := gin.Default()

	srv := &Server{router: engine, metrics: &metrics.Collector{}}
	srv.store = store
	srv.tree = tree
	srv.hub = hub
	srv.mqttMgr = mqttMgr
	srv.filters = filterStore
	srv.settings = settingsStore
	srv.settingsRuntime = settingsRuntime
	srv.defaults = defaults
	srv.sysinfo = sysStore

	srv.registerRoutes()

	// Without a static directory the server exposes only the API routes.
	if staticDir == "" {
		return srv
	}

	engine.Static("/assets", staticDir+"/assets")
	engine.Static("/themes", staticDir+"/themes")
	engine.Static("/favicon", staticDir+"/favicon")
	engine.StaticFile("/site.webmanifest", staticDir+"/site.webmanifest")
	engine.StaticFile("/favicon.ico", staticDir+"/favicon.ico")

	// Any path that matched no route is answered with the front-end entry page.
	indexPage := staticDir + "/index.html"
	engine.NoRoute(func(ctx *gin.Context) {
		ctx.File(indexPage)
	})

	return srv
}
// registerRoutes attaches every REST endpoint and the WebSocket endpoint to
// the server's router.
//
// NOTE(review): the per-topic routes use a single ":topic" path parameter;
// confirm how callers encode topics that contain "/" separators.
func (s *Server) registerRoutes() {
	r := s.router

	// Health and monitoring.
	r.GET("/api/health", s.health)
	r.GET("/api/stats", s.stats)
	r.GET("/api/metrics", s.metricsHandler)

	// Filters and settings.
	r.GET("/api/filters", s.getFilters)
	r.POST("/api/filters", s.updateFilters)
	r.GET("/api/settings", s.getSettings)
	r.POST("/api/settings", s.updateSettings)

	// System information and broker connectivity check.
	r.GET("/api/sysinfo", s.getSysinfo)
	r.POST("/api/test-connection", s.testConnection)

	// Topics and message history.
	r.GET("/api/topics", s.getTopics)
	r.GET("/api/topic/:topic/history", s.getHistory)
	r.POST("/api/topic/:topic/clear-history", s.clearHistory)
	r.POST("/api/history/clear", s.clearAllHistory)

	// Publishing and live events.
	r.POST("/api/publish", s.publish)
	r.GET("/ws/events", s.wsEvents)
}
// Run starts the underlying gin router on addr and returns whatever error
// the router's Run method reports.
func (s *Server) Run(addr string) error {
	engine := s.router
	return engine.Run(addr)
}
// health answers 200 with the fixed JSON body {"status": "ok"}.
func (s *Server) health(c *gin.Context) {
	body := gin.H{"status": "ok"}
	c.JSON(http.StatusOK, body)
}
// stats returns the storage statistics as JSON, or 500 with the error text
// when the store cannot produce them.
func (s *Server) stats(c *gin.Context) {
	dbStats, err := s.store.Stats()
	if err == nil {
		c.JSON(http.StatusOK, dbStats)
		return
	}
	c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
}
// metricsHandler combines the metrics collector's snapshot (CPU percent,
// memory bytes, memory limit) with the store's byte count and size into one
// JSON object. A failure to read the storage statistics yields 500 with the
// error text.
func (s *Server) metricsHandler(c *gin.Context) {
	dbStats, err := s.store.Stats()
	if err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		return
	}

	snapshot := s.metrics.Snapshot()

	body := gin.H{}
	body["cpuPercent"] = snapshot.CPUPercent
	body["memBytes"] = snapshot.MemBytes
	body["memLimit"] = snapshot.MemLimit
	body["dbBytes"] = dbStats.Bytes
	body["dbSize"] = dbStats.Size

	c.JSON(http.StatusOK, body)
}
// getFilters returns the filter store's current snapshot as JSON.
func (s *Server) getFilters(c *gin.Context) {
	snapshot := s.filters.Snapshot()
	c.JSON(http.StatusOK, snapshot)
}
// updateFilters passes the JSON array of rules in the request body to the
// filter store's Update method. A body that cannot be bound yields 400.
func (s *Server) updateFilters(c *gin.Context) {
	var incoming []filters.Rule

	bindErr := c.ShouldBindJSON(&incoming)
	if bindErr != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": "payload invalide"})
		return
	}

	s.filters.Update(incoming)
	c.JSON(http.StatusOK, gin.H{"status": "ok"})
}
// getSettings loads the settings from the settings store, passing the
// server's defaults to Load, and returns them as JSON. A load failure yields
// 500 with the error text.
func (s *Server) getSettings(c *gin.Context) {
	loaded, err := s.settings.Load(s.defaults)
	if err == nil {
		c.JSON(http.StatusOK, loaded)
		return
	}
	c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
}
// updateSettings saves the settings object from the request body, then
// pushes its topic filters to the filter store and, when a runtime is
// configured, hands the new settings to it.
//
// A body that cannot be bound yields 400; a save failure yields 500 and
// leaves the filter store and runtime untouched.
func (s *Server) updateSettings(c *gin.Context) {
	var incoming settings.Settings

	bindErr := c.ShouldBindJSON(&incoming)
	if bindErr != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": "payload invalide"})
		return
	}

	saveErr := s.settings.Save(incoming)
	if saveErr != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": saveErr.Error()})
		return
	}

	// Propagate only after the save succeeded.
	rules := toFilterRules(incoming.TopicFilters)
	s.filters.Update(rules)

	if runtime := s.settingsRuntime; runtime != nil {
		runtime.Update(incoming)
	}

	c.JSON(http.StatusOK, gin.H{"status": "ok"})
}
// getSysinfo returns the sysinfo store's current snapshot as JSON.
func (s *Server) getSysinfo(c *gin.Context) {
	snapshot := s.sysinfo.Snapshot()
	c.JSON(http.StatusOK, snapshot)
}
// toFilterRules converts settings topic filters into filter rules, copying
// the Topic, Save and View fields of each entry in order. The result is
// always a non-nil slice with one rule per input entry.
func toFilterRules(filtersIn []settings.TopicFilter) []filters.Rule {
	rules := make([]filters.Rule, len(filtersIn))
	for i := range filtersIn {
		src := filtersIn[i]
		rules[i] = filters.Rule{
			Topic: src.Topic,
			Save:  src.Save,
			View:  src.View,
		}
	}
	return rules
}
// testConnection checks whether a TCP connection can be opened to the broker
// URL given in the request body.
//
// Responses:
//   - 400 "broker manquant" when the body cannot be bound or Broker is empty.
//   - 400 "broker invalide" when the URL cannot be parsed or has no host.
//   - 200 {"ok": false, "error": ...} when the dial fails.
//   - 200 {"ok": true, "latency": <ms>, "endpoint": <host:port>} on success.
//
// Only TCP reachability is tested; no MQTT handshake is performed. A URL
// without an explicit port is probed on port 1883.
//
// NOTE(review): this handler makes the server dial any host and port supplied
// by the client; confirm the endpoint is only reachable by trusted users.
func (s *Server) testConnection(c *gin.Context) {
	const (
		defaultPort = "1883"
		dialTimeout = 3 * time.Second
	)

	var req TestConnectionRequest
	if err := c.ShouldBindJSON(&req); err != nil || req.Broker == "" {
		c.JSON(http.StatusBadRequest, gin.H{"error": "broker manquant"})
		return
	}

	parsed, err := url.Parse(req.Broker)
	// A scheme-less value such as "localhost:1883" parses without error but
	// yields an empty hostname. Dialing ":1883" would then probe the machine
	// running this server rather than the requested broker, so reject it.
	if err != nil || parsed.Hostname() == "" {
		c.JSON(http.StatusBadRequest, gin.H{"error": "broker invalide"})
		return
	}

	port := parsed.Port()
	if port == "" {
		port = defaultPort
	}
	addr := net.JoinHostPort(parsed.Hostname(), port)

	start := time.Now()
	conn, err := net.DialTimeout("tcp", addr, dialTimeout)
	// Capture the latency right after the dial so it excludes the close below.
	elapsed := time.Since(start)
	if err != nil {
		c.JSON(http.StatusOK, gin.H{"ok": false, "error": err.Error()})
		return
	}
	// Best-effort close: the probe already succeeded and nothing was written.
	_ = conn.Close()

	c.JSON(http.StatusOK, gin.H{
		"ok":       true,
		"latency":  elapsed.Milliseconds(),
		"endpoint": addr,
	})
}
// getTopics returns the topic tree's current snapshot as JSON.
func (s *Server) getTopics(c *gin.Context) {
	snapshot := s.tree.Snapshot()
	c.JSON(http.StatusOK, snapshot)
}
// getHistory returns the stored messages of the topic named by the ":topic"
// path parameter.
//
// Query parameters:
//   - limit: number of messages requested from the store; defaults to 50
//     when absent or not a valid integer.
//   - from, to: passed to the store unchanged; empty when absent.
//
// A store failure yields 500 with the error text.
func (s *Server) getHistory(c *gin.Context) {
	const defaultLimit = 50

	topic := c.Param("topic")

	// A malformed limit falls back to the default instead of silently
	// becoming 0, which is what the discarded Atoi error used to produce.
	limit, err := strconv.Atoi(c.DefaultQuery("limit", strconv.Itoa(defaultLimit)))
	if err != nil {
		limit = defaultLimit
	}

	from := c.DefaultQuery("from", "")
	to := c.DefaultQuery("to", "")

	messages, err := s.store.GetHistory(topic, limit, from, to)
	if err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		return
	}
	c.JSON(http.StatusOK, messages)
}
// clearHistory asks the store to clear the history of the topic named by the
// ":topic" path parameter and reports the store's returned count under the
// "deleted" key. A store failure yields 500 with the error text.
func (s *Server) clearHistory(c *gin.Context) {
	removed, err := s.store.ClearTopicHistory(c.Param("topic"))
	if err == nil {
		c.JSON(http.StatusOK, gin.H{"deleted": removed})
		return
	}
	c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
}
// clearAllHistory asks the store to clear all history, logs the outcome, and
// reports the store's returned count under the "deleted" key. A store failure
// is logged and yields 500 with the error text.
func (s *Server) clearAllHistory(c *gin.Context) {
	removed, err := s.store.ClearAllHistory()
	if err == nil {
		log.Printf("clear db ok: deleted=%d", removed)
		c.JSON(http.StatusOK, gin.H{"deleted": removed})
		return
	}

	log.Printf("clear db error: %v", err)
	c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
}
// publish sends the message described by the JSON request body through the
// MQTT manager.
//
// Responses:
//   - 400 "requete invalide" when the body cannot be bound or Topic is empty.
//   - 400 "qos invalide" when QOS is greater than 2.
//   - 500 with the error text when the MQTT manager fails to publish.
//   - 200 {"status": "ok"} on success.
func (s *Server) publish(c *gin.Context) {
	// MQTT defines quality-of-service levels 0, 1 and 2 only.
	const maxQOS = 2

	var req PublishRequest
	if err := c.ShouldBindJSON(&req); err != nil || req.Topic == "" {
		c.JSON(http.StatusBadRequest, gin.H{"error": "requete invalide"})
		return
	}

	// Reject an out-of-range level here rather than handing it to the
	// MQTT manager.
	if req.QOS > maxQOS {
		c.JSON(http.StatusBadRequest, gin.H{"error": "qos invalide"})
		return
	}

	if err := s.mqttMgr.Publish(req.Topic, []byte(req.Payload), req.QOS, req.Retained); err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		return
	}
	c.JSON(http.StatusOK, gin.H{"status": "ok"})
}
// wsEvents upgrades the request to a WebSocket connection and adds it to the
// hub. The handler then blocks reading from the connection, discarding every
// message; when a read fails, the connection is removed from the hub and
// closed.
//
// If the upgrade fails, the handler returns without touching the hub. The
// upgrader's CheckOrigin accepts requests from every origin.
func (s *Server) wsEvents(c *gin.Context) {
	up := websocket.Upgrader{
		CheckOrigin: func(*http.Request) bool { return true },
	}

	conn, err := up.Upgrade(c.Writer, c.Request, nil)
	if err != nil {
		return
	}

	s.hub.Add(conn)

	// Drain incoming frames until the first read error ends the loop.
	for {
		_, _, readErr := conn.ReadMessage()
		if readErr != nil {
			break
		}
	}

	s.hub.Remove(conn)
	_ = conn.Close()
}