diff --git a/internal/streams/api.go b/internal/streams/api.go index 69d2276a..d64c4846 100644 --- a/internal/streams/api.go +++ b/internal/streams/api.go @@ -101,3 +101,24 @@ func apiStreams(w http.ResponseWriter, r *http.Request) { } } } + +func apiStreamsDOT(w http.ResponseWriter, r *http.Request) { + query := r.URL.Query() + + dot := make([]byte, 0, 1024) + dot = append(dot, "digraph {\n"...) + if query.Has("src") { + for _, name := range query["src"] { + if stream := streams[name]; stream != nil { + dot = AppendDOT(dot, stream) + } + } + } else { + for _, stream := range streams { + dot = AppendDOT(dot, stream) + } + } + dot = append(dot, '}') + + api.Response(w, dot, "text/vnd.graphviz") +} diff --git a/internal/streams/dot.go b/internal/streams/dot.go new file mode 100644 index 00000000..aa008c40 --- /dev/null +++ b/internal/streams/dot.go @@ -0,0 +1,164 @@ +package streams + +import ( + "encoding/json" + "fmt" + "strings" +) + +func AppendDOT(dot []byte, stream *Stream) []byte { + for _, prod := range stream.producers { + if prod.conn == nil { + continue + } + c, err := marshalConn(prod.conn) + if err != nil { + continue + } + dot = c.appendDOT(dot, "producer") + } + for _, cons := range stream.consumers { + c, err := marshalConn(cons) + if err != nil { + continue + } + dot = c.appendDOT(dot, "consumer") + } + return dot +} + +func marshalConn(v any) (*conn, error) { + b, err := json.Marshal(v) + if err != nil { + return nil, err + } + var c conn + if err = json.Unmarshal(b, &c); err != nil { + return nil, err + } + return &c, nil +} + +const bytesK = "KMGTP" + +func humanBytes(i int) string { + if i < 1000 { + return fmt.Sprintf("%d B", i) + } + + f := float64(i) / 1000 + var n uint8 + for f >= 1000 && n < 5 { + f /= 1000 + n++ + } + return fmt.Sprintf("%.2f %cB", f, bytesK[n]) +} + +type node struct { + ID uint32 `json:"id"` + Codec map[string]any `json:"codec"` + Parent uint32 `json:"parent"` + Childs []uint32 `json:"childs"` + Bytes int `json:"bytes"` + //Packets uint32 `json:"packets"` + //Drops uint32 `json:"drops"` +} + +var codecKeys = []string{"codec_name", "sample_rate", "channels", "profile", "level"} + +func (n *node) codec() []byte { + b := make([]byte, 0, 128) + for _, k := range codecKeys { + if v := n.Codec[k]; v != nil { + b = fmt.Appendf(b, "%s=%v\n", k, v) + } + } + return b[:len(b)-1] +} + +func (n *node) appendDOT(dot []byte, group string) []byte { + dot = fmt.Appendf(dot, "%d [group=%s, label=%q, title=%q];\n", n.ID, group, n.Codec["codec_name"], n.codec()) + //for _, sink := range n.Childs { + // dot = fmt.Appendf(dot, "%d -> %d;\n", n.ID, sink) + //} + return dot +} + +type conn struct { + ID uint32 `json:"id"` + FormatName string `json:"format_name"` + Protocol string `json:"protocol"` + RemoteAddr string `json:"remote_addr"` + Source string `json:"source"` + URL string `json:"url"` + UserAgent string `json:"user_agent"` + Receivers []node `json:"receivers"` + Senders []node `json:"senders"` + BytesRecv int `json:"bytes_recv"` + BytesSend int `json:"bytes_send"` +} + +func (c *conn) appendDOT(dot []byte, group string) []byte { + host := c.host() + dot = fmt.Appendf(dot, "%s [group=host];\n", host) + dot = fmt.Appendf(dot, "%d [group=%s, label=%q, title=%q];\n", c.ID, group, c.FormatName, c.label()) + if group == "producer" { + dot = fmt.Appendf(dot, "%s -> %d [label=%q];\n", host, c.ID, humanBytes(c.BytesRecv)) + } else { + dot = fmt.Appendf(dot, "%d -> %s [label=%q];\n", c.ID, host, humanBytes(c.BytesSend)) + } + + for _, recv := range c.Receivers { + dot = fmt.Appendf(dot, "%d -> %d [label=%q];\n", c.ID, recv.ID, humanBytes(recv.Bytes)) + dot = recv.appendDOT(dot, "node") + } + for _, send := range c.Senders { + dot = fmt.Appendf(dot, "%d -> %d [label=%q];\n", send.Parent, c.ID, humanBytes(send.Bytes)) + //dot = fmt.Appendf(dot, "%d -> %d [label=%q];\n", send.ID, c.ID, humanBytes(send.Bytes)) + //dot = send.appendDOT(dot, "node") + } + return dot +} + +func (c *conn) host() (s string) { + if c.Protocol == "pipe" { + return "127.0.0.1" + } + + if s = c.RemoteAddr; s == "" { + return "unknown" + } + + if i := strings.Index(s, "forwarded"); i > 0 { + s = s[i+10:] + } + + if s[0] == '[' { + if i := strings.Index(s, "]"); i > 0 { + return s[1:i] + } + } + + if i := strings.IndexAny(s, " ,:"); i > 0 { + return s[:i] + } + return +} + +func (c *conn) label() (s string) { + s = "format_name=" + c.FormatName + if c.Protocol != "" { + s += "\nprotocol=" + c.Protocol + } + if c.Source != "" { + s += "\nsource=" + c.Source + } + if c.URL != "" { + s += "\nurl=" + c.URL + } + if c.UserAgent != "" { + s += "\nuser_agent=" + c.UserAgent + } + return +} diff --git a/internal/streams/streams.go b/internal/streams/streams.go index 6d6fa773..ff0f5654 100644 --- a/internal/streams/streams.go +++ b/internal/streams/streams.go @@ -27,6 +27,7 @@ func Init() { } api.HandleFunc("api/streams", apiStreams) + api.HandleFunc("api/streams.dot", apiStreamsDOT) if cfg.Publish == nil { return diff --git a/www/index.html b/www/index.html index 6adf9f0a..63fedcec 100644 --- a/www/index.html +++ b/www/index.html @@ -139,7 +139,7 @@ const isChecked = checkboxStates[name] ? 'checked' : ''; tr.innerHTML = `` + - `${online} / info / probe` + + `${online} / info / probe / net` + `${links}`; } diff --git a/www/main.js b/www/main.js index 2c15e071..714c9127 100644 --- a/www/main.js +++ b/www/main.js @@ -138,6 +138,7 @@ body.dark-mode hr {
  • Add
  • Config
  • Log
  • +
  • Net
  • 🌙 diff --git a/www/network.html b/www/network.html new file mode 100644 index 00000000..5193b6a9 --- /dev/null +++ b/www/network.html @@ -0,0 +1,44 @@ + + + + + go2rtc - Network + + + + + +
    + + + \ No newline at end of file