293 lines
7.6 KiB
Go
293 lines
7.6 KiB
Go
package api
|
|
|
|
import (
|
|
"log"
|
|
"net"
|
|
"net/http"
|
|
"net/url"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/gin-gonic/gin"
|
|
"github.com/gorilla/websocket"
|
|
|
|
"mqtt_explorer/internal/filters"
|
|
"mqtt_explorer/internal/metrics"
|
|
"mqtt_explorer/internal/mqtt"
|
|
"mqtt_explorer/internal/settings"
|
|
"mqtt_explorer/internal/storage"
|
|
"mqtt_explorer/internal/sysinfo"
|
|
"mqtt_explorer/internal/topics"
|
|
"mqtt_explorer/internal/ws"
|
|
)
|
|
|
|
type Server struct {
|
|
router *gin.Engine
|
|
store *storage.Store
|
|
tree *topics.Tree
|
|
hub *ws.Hub
|
|
mqttMgr *mqtt.Manager
|
|
metrics *metrics.Collector
|
|
filters *filters.Store
|
|
settings *settings.Store
|
|
settingsRuntime *settings.Runtime
|
|
defaults settings.Settings
|
|
sysinfo *sysinfo.Store
|
|
}
|
|
|
|
// PublishRequest is the JSON body accepted by POST /api/publish.
type PublishRequest struct {
	// Topic is the topic to publish on; the handler rejects an empty value.
	Topic string `json:"topic"`

	// Payload is the message body; it is sent as the raw bytes of the string.
	Payload string `json:"payload"`

	// QOS is the quality-of-service level forwarded to the MQTT manager.
	QOS byte `json:"qos"`

	// Retained is the retain flag forwarded to the MQTT manager.
	Retained bool `json:"retained"`
}
|
|
|
|
// TestConnectionRequest is the JSON body accepted by POST /api/test-connection.
type TestConnectionRequest struct {
	// Broker is the broker URL to probe; the handler rejects an empty value.
	Broker string `json:"broker"`
}
|
|
|
|
func NewServer(store *storage.Store, tree *topics.Tree, hub *ws.Hub, mqttMgr *mqtt.Manager, filterStore *filters.Store, settingsStore *settings.Store, settingsRuntime *settings.Runtime, defaults settings.Settings, sysStore *sysinfo.Store, staticDir string) *Server {
|
|
router := gin.Default()
|
|
|
|
s := &Server{
|
|
router: router,
|
|
store: store,
|
|
tree: tree,
|
|
hub: hub,
|
|
mqttMgr: mqttMgr,
|
|
metrics: &metrics.Collector{},
|
|
filters: filterStore,
|
|
settings: settingsStore,
|
|
settingsRuntime: settingsRuntime,
|
|
defaults: defaults,
|
|
sysinfo: sysStore,
|
|
}
|
|
s.registerRoutes()
|
|
if staticDir != "" {
|
|
router.Static("/assets", staticDir+"/assets")
|
|
router.Static("/themes", staticDir+"/themes")
|
|
router.Static("/favicon", staticDir+"/favicon")
|
|
router.StaticFile("/site.webmanifest", staticDir+"/site.webmanifest")
|
|
router.StaticFile("/favicon.ico", staticDir+"/favicon.ico")
|
|
router.NoRoute(func(c *gin.Context) {
|
|
c.File(staticDir + "/index.html")
|
|
})
|
|
}
|
|
return s
|
|
}
|
|
|
|
func (s *Server) registerRoutes() {
|
|
s.router.GET("/api/health", s.health)
|
|
s.router.GET("/api/stats", s.stats)
|
|
s.router.GET("/api/metrics", s.metricsHandler)
|
|
s.router.GET("/api/filters", s.getFilters)
|
|
s.router.POST("/api/filters", s.updateFilters)
|
|
s.router.GET("/api/settings", s.getSettings)
|
|
s.router.POST("/api/settings", s.updateSettings)
|
|
s.router.GET("/api/sysinfo", s.getSysinfo)
|
|
s.router.POST("/api/test-connection", s.testConnection)
|
|
s.router.GET("/api/topics", s.getTopics)
|
|
s.router.GET("/api/topic/:topic/history", s.getHistory)
|
|
s.router.POST("/api/topic/:topic/clear-history", s.clearHistory)
|
|
s.router.POST("/api/history/clear", s.clearAllHistory)
|
|
s.router.POST("/api/publish", s.publish)
|
|
s.router.GET("/ws/events", s.wsEvents)
|
|
}
|
|
|
|
func (s *Server) Run(addr string) error {
|
|
return s.router.Run(addr)
|
|
}
|
|
|
|
func (s *Server) health(c *gin.Context) {
|
|
c.JSON(http.StatusOK, gin.H{"status": "ok"})
|
|
}
|
|
|
|
func (s *Server) stats(c *gin.Context) {
|
|
stats, err := s.store.Stats()
|
|
if err != nil {
|
|
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
|
return
|
|
}
|
|
c.JSON(http.StatusOK, stats)
|
|
}
|
|
|
|
func (s *Server) metricsHandler(c *gin.Context) {
|
|
stats, err := s.store.Stats()
|
|
if err != nil {
|
|
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
|
return
|
|
}
|
|
usage := s.metrics.Snapshot()
|
|
c.JSON(http.StatusOK, gin.H{
|
|
"cpuPercent": usage.CPUPercent,
|
|
"memBytes": usage.MemBytes,
|
|
"memLimit": usage.MemLimit,
|
|
"dbBytes": stats.Bytes,
|
|
"dbSize": stats.Size,
|
|
})
|
|
}
|
|
|
|
func (s *Server) getFilters(c *gin.Context) {
|
|
c.JSON(http.StatusOK, s.filters.Snapshot())
|
|
}
|
|
|
|
func (s *Server) updateFilters(c *gin.Context) {
|
|
var rules []filters.Rule
|
|
if err := c.ShouldBindJSON(&rules); err != nil {
|
|
c.JSON(http.StatusBadRequest, gin.H{"error": "payload invalide"})
|
|
return
|
|
}
|
|
s.filters.Update(rules)
|
|
c.JSON(http.StatusOK, gin.H{"status": "ok"})
|
|
}
|
|
|
|
func (s *Server) getSettings(c *gin.Context) {
|
|
current, err := s.settings.Load(s.defaults)
|
|
if err != nil {
|
|
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
|
return
|
|
}
|
|
c.JSON(http.StatusOK, current)
|
|
}
|
|
|
|
func (s *Server) updateSettings(c *gin.Context) {
|
|
var next settings.Settings
|
|
if err := c.ShouldBindJSON(&next); err != nil {
|
|
c.JSON(http.StatusBadRequest, gin.H{"error": "payload invalide"})
|
|
return
|
|
}
|
|
if err := s.settings.Save(next); err != nil {
|
|
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
|
return
|
|
}
|
|
s.filters.Update(toFilterRules(next.TopicFilters))
|
|
if s.settingsRuntime != nil {
|
|
s.settingsRuntime.Update(next)
|
|
}
|
|
c.JSON(http.StatusOK, gin.H{"status": "ok"})
|
|
}
|
|
|
|
func (s *Server) getSysinfo(c *gin.Context) {
|
|
c.JSON(http.StatusOK, s.sysinfo.Snapshot())
|
|
}
|
|
|
|
func toFilterRules(filtersIn []settings.TopicFilter) []filters.Rule {
|
|
out := make([]filters.Rule, 0, len(filtersIn))
|
|
for _, entry := range filtersIn {
|
|
out = append(out, filters.Rule{
|
|
Topic: entry.Topic,
|
|
Save: entry.Save,
|
|
View: entry.View,
|
|
})
|
|
}
|
|
return out
|
|
}
|
|
|
|
func (s *Server) testConnection(c *gin.Context) {
|
|
var req TestConnectionRequest
|
|
if err := c.ShouldBindJSON(&req); err != nil || req.Broker == "" {
|
|
c.JSON(http.StatusBadRequest, gin.H{"error": "broker manquant"})
|
|
return
|
|
}
|
|
|
|
parsed, err := url.Parse(req.Broker)
|
|
if err != nil {
|
|
c.JSON(http.StatusBadRequest, gin.H{"error": "broker invalide"})
|
|
return
|
|
}
|
|
|
|
host := parsed.Hostname()
|
|
port := parsed.Port()
|
|
if port == "" {
|
|
port = "1883"
|
|
}
|
|
|
|
addr := net.JoinHostPort(host, port)
|
|
start := time.Now()
|
|
conn, err := net.DialTimeout("tcp", addr, 3*time.Second)
|
|
if err != nil {
|
|
c.JSON(http.StatusOK, gin.H{"ok": false, "error": err.Error()})
|
|
return
|
|
}
|
|
_ = conn.Close()
|
|
|
|
c.JSON(http.StatusOK, gin.H{
|
|
"ok": true,
|
|
"latency": time.Since(start).Milliseconds(),
|
|
"endpoint": addr,
|
|
})
|
|
}
|
|
|
|
func (s *Server) getTopics(c *gin.Context) {
|
|
c.JSON(http.StatusOK, s.tree.Snapshot())
|
|
}
|
|
|
|
func (s *Server) getHistory(c *gin.Context) {
|
|
topic := c.Param("topic")
|
|
limit, _ := strconv.Atoi(c.DefaultQuery("limit", "50"))
|
|
from := c.DefaultQuery("from", "")
|
|
to := c.DefaultQuery("to", "")
|
|
|
|
messages, err := s.store.GetHistory(topic, limit, from, to)
|
|
if err != nil {
|
|
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
|
return
|
|
}
|
|
c.JSON(http.StatusOK, messages)
|
|
}
|
|
|
|
func (s *Server) clearHistory(c *gin.Context) {
|
|
topic := c.Param("topic")
|
|
deleted, err := s.store.ClearTopicHistory(topic)
|
|
if err != nil {
|
|
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
|
return
|
|
}
|
|
c.JSON(http.StatusOK, gin.H{"deleted": deleted})
|
|
}
|
|
|
|
func (s *Server) clearAllHistory(c *gin.Context) {
|
|
deleted, err := s.store.ClearAllHistory()
|
|
if err != nil {
|
|
log.Printf("clear db error: %v", err)
|
|
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
|
return
|
|
}
|
|
log.Printf("clear db ok: deleted=%d", deleted)
|
|
c.JSON(http.StatusOK, gin.H{"deleted": deleted})
|
|
}
|
|
|
|
func (s *Server) publish(c *gin.Context) {
|
|
var req PublishRequest
|
|
if err := c.ShouldBindJSON(&req); err != nil || req.Topic == "" {
|
|
c.JSON(http.StatusBadRequest, gin.H{"error": "requete invalide"})
|
|
return
|
|
}
|
|
|
|
if err := s.mqttMgr.Publish(req.Topic, []byte(req.Payload), req.QOS, req.Retained); err != nil {
|
|
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
|
return
|
|
}
|
|
c.JSON(http.StatusOK, gin.H{"status": "ok"})
|
|
}
|
|
|
|
func (s *Server) wsEvents(c *gin.Context) {
|
|
upgrader := websocket.Upgrader{
|
|
CheckOrigin: func(r *http.Request) bool { return true },
|
|
}
|
|
|
|
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
|
|
if err != nil {
|
|
return
|
|
}
|
|
s.hub.Add(conn)
|
|
|
|
for {
|
|
if _, _, err := conn.ReadMessage(); err != nil {
|
|
s.hub.Remove(conn)
|
|
_ = conn.Close()
|
|
break
|
|
}
|
|
}
|
|
}
|