Rewrite Strix from scratch as single binary

Complete architecture rewrite following go2rtc patterns:
- pkg/ for pure logic (camdb, tester, probe, generate)
- internal/ for application glue with Init() modules
- Single HTTP server on :4567 with all endpoints
- zerolog with password masking and memory ring buffer
- Environment-based config only (no YAML files)

API endpoints: /api/search, /api/streams, /api/test,
/api/probe, /api/generate, /api/health, /api/log

Dependencies: go2rtc v1.9.14, go-sqlite3, miekg/dns, zerolog
This commit is contained in:
eduard256
2026-03-25 10:38:46 +00:00
parent 3b29188924
commit 27117900eb
3742 changed files with 2801 additions and 283718 deletions
+137
View File
@@ -0,0 +1,137 @@
package camdb
import (
"database/sql"
"strings"
)
type Result struct {
Type string `json:"type"`
ID string `json:"id"`
Name string `json:"name"`
}
// SearchAll returns all presets + all brands, no models
func SearchAll(db *sql.DB) ([]Result, error) {
var results []Result
rows, err := db.Query("SELECT preset_id, name FROM presets ORDER BY preset_id")
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
var id, name string
if err = rows.Scan(&id, &name); err != nil {
return nil, err
}
results = append(results, Result{Type: "preset", ID: "p:" + id, Name: name})
}
rows, err = db.Query("SELECT brand_id, brand FROM brands ORDER BY brand LIMIT ?", 50-len(results))
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
var id, name string
if err = rows.Scan(&id, &name); err != nil {
return nil, err
}
results = append(results, Result{Type: "brand", ID: "b:" + id, Name: name})
}
return results, nil
}
// SearchQuery searches presets, brands, models by query string (limit 50 total).
// Supports: "model", "brand model", "model brand" -- each word matches independently.
func SearchQuery(db *sql.DB, q string) ([]Result, error) {
var results []Result
like := "%" + q + "%"
// presets
rows, err := db.Query(
"SELECT preset_id, name FROM presets WHERE preset_id LIKE ? OR name LIKE ? ORDER BY preset_id",
like, like,
)
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
var id, name string
if err = rows.Scan(&id, &name); err != nil {
return nil, err
}
results = append(results, Result{Type: "preset", ID: "p:" + id, Name: name})
}
// brands
rows, err = db.Query(
"SELECT brand_id, brand FROM brands WHERE brand_id LIKE ? OR brand LIKE ? ORDER BY brand LIMIT ?",
like, like, 50-len(results),
)
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
var id, name string
if err = rows.Scan(&id, &name); err != nil {
return nil, err
}
results = append(results, Result{Type: "brand", ID: "b:" + id, Name: name})
}
if len(results) >= 50 {
return results, nil
}
// models -- each word must match brand or model
words := strings.Fields(q)
where := ""
args := make([]any, 0, len(words)+1)
for i, w := range words {
if i > 0 {
where += " AND "
}
where += "(b.brand LIKE ? OR b.brand_id LIKE ? OR sm.model LIKE ?)"
p := "%" + w + "%"
args = append(args, p, p, p)
}
args = append(args, 50-len(results))
rows, err = db.Query(
`SELECT DISTINCT b.brand_id, b.brand, sm.model
FROM stream_models sm
JOIN streams s ON s.id = sm.stream_id
JOIN brands b ON b.brand_id = s.brand_id
WHERE `+where+`
ORDER BY b.brand, sm.model
LIMIT ?`,
args...,
)
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
var brandID, brand, model string
if err = rows.Scan(&brandID, &brand, &model); err != nil {
return nil, err
}
results = append(results, Result{
Type: "model",
ID: "m:" + brandID + ":" + model,
Name: brand + ": " + model,
})
}
return results, nil
}
+202
View File
@@ -0,0 +1,202 @@
package camdb
import (
"database/sql"
"encoding/base64"
"errors"
"fmt"
"strconv"
"strings"
)
var defaultPorts = map[string]int{
"rtsp": 554, "rtsps": 322, "http": 80, "https": 443,
"rtmp": 1935, "mms": 554, "rtp": 5004,
}
type StreamParams struct {
IDs string
IP string
User string
Pass string
Channel int
Ports map[int]bool // nil = no filter
}
type raw struct {
url, protocol string
port int
}
// BuildStreams resolves IDs to full stream URLs with credentials and placeholders substituted
func BuildStreams(db *sql.DB, p *StreamParams) ([]string, error) {
var raws []raw
for _, id := range strings.Split(p.IDs, ",") {
id = strings.TrimSpace(id)
if id == "" {
continue
}
var rows *sql.Rows
var err error
switch {
case strings.HasPrefix(id, "b:"):
brandID := id[2:]
rows, err = db.Query(
"SELECT url, protocol, port FROM streams WHERE brand_id = ?", brandID,
)
case strings.HasPrefix(id, "m:"):
parts := strings.SplitN(id[2:], ":", 2)
if len(parts) != 2 {
return nil, fmt.Errorf("camdb: invalid model id: %s", id)
}
rows, err = db.Query(
`SELECT s.url, s.protocol, s.port
FROM stream_models sm
JOIN streams s ON s.id = sm.stream_id
WHERE s.brand_id = ? AND sm.model = ?`,
parts[0], parts[1],
)
case strings.HasPrefix(id, "p:"):
presetID := id[2:]
rows, err = db.Query(
"SELECT url, protocol, port FROM preset_streams WHERE preset_id = ?", presetID,
)
default:
return nil, fmt.Errorf("camdb: unknown id prefix: %s", id)
}
if err != nil {
return nil, err
}
found := false
for rows.Next() {
var r raw
if err = rows.Scan(&r.url, &r.protocol, &r.port); err != nil {
rows.Close()
return nil, err
}
raws = append(raws, r)
found = true
}
rows.Close()
if !found {
return nil, fmt.Errorf("camdb: not found: %s", id)
}
}
// build full URLs, deduplicate
seen := map[string]bool{}
var streams []string
for _, r := range raws {
if len(streams) >= 20000 {
break
}
port := r.port
if port == 0 {
if p, ok := defaultPorts[r.protocol]; ok {
port = p
} else {
port = 80
}
}
if p.Ports != nil && !p.Ports[port] {
continue
}
u := buildURL(r.protocol, r.url, p.IP, port, p.User, p.Pass, p.Channel)
if seen[u] {
continue
}
seen[u] = true
streams = append(streams, u)
}
return streams, nil
}
// ValidateID checks if id format is valid
func ValidateID(id string) error {
switch {
case strings.HasPrefix(id, "b:"):
if len(id) < 3 {
return errors.New("camdb: empty brand id")
}
case strings.HasPrefix(id, "m:"):
if strings.Count(id, ":") < 2 {
return fmt.Errorf("camdb: invalid model id: %s", id)
}
case strings.HasPrefix(id, "p:"):
if len(id) < 3 {
return errors.New("camdb: empty preset id")
}
default:
return fmt.Errorf("camdb: unknown prefix: %s", id)
}
return nil
}
// internals
func buildURL(protocol, path, ip string, port int, user, pass string, channel int) string {
path = replacePlaceholders(path, ip, port, user, pass, channel)
var auth string
if user != "" {
auth = user + ":" + pass + "@"
}
host := ip
if p, ok := defaultPorts[protocol]; !ok || p != port {
host = ip + ":" + strconv.Itoa(port)
}
if !strings.HasPrefix(path, "/") {
path = "/" + path
}
return protocol + "://" + auth + host + path
}
func replacePlaceholders(s, ip string, port int, user, pass string, channel int) string {
auth := ""
if user != "" && pass != "" {
auth = base64.StdEncoding.EncodeToString([]byte(user + ":" + pass))
}
pairs := []string{
"[CHANNEL]", strconv.Itoa(channel),
"[channel]", strconv.Itoa(channel),
"{CHANNEL}", strconv.Itoa(channel),
"{channel}", strconv.Itoa(channel),
"[CHANNEL+1]", strconv.Itoa(channel + 1),
"[channel+1]", strconv.Itoa(channel + 1),
"{CHANNEL+1}", strconv.Itoa(channel + 1),
"{channel+1}", strconv.Itoa(channel + 1),
"[USERNAME]", user, "[username]", user,
"[USER]", user, "[user]", user,
"[PASSWORD]", pass, "[password]", pass,
"[PASWORD]", pass, "[pasword]", pass,
"[PASS]", pass, "[pass]", pass,
"[PWD]", pass, "[pwd]", pass,
"[WIDTH]", "640", "[width]", "640",
"[HEIGHT]", "480", "[height]", "480",
"[IP]", ip, "[ip]", ip,
"[PORT]", strconv.Itoa(port), "[port]", strconv.Itoa(port),
"[AUTH]", auth, "[auth]", auth,
"[TOKEN]", "", "[token]", "",
}
r := strings.NewReplacer(pairs...)
return r.Replace(s)
}
+168
View File
@@ -0,0 +1,168 @@
package generate
import (
"fmt"
"net/url"
"regexp"
"strings"
)
var needMP4 = map[string]bool{"bubble": true}
var reIPv4 = regexp.MustCompile(`\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}`)
func Generate(req *Request) (*Response, error) {
if req.MainStream == "" {
return nil, fmt.Errorf("generate: mainStream required")
}
info := buildInfo(req)
if len(req.Objects) > 0 && (req.Detect == nil || !req.Detect.Enabled) {
if req.Detect == nil {
req.Detect = &DetectConfig{Enabled: true}
} else {
req.Detect.Enabled = true
}
}
if strings.TrimSpace(req.ExistingConfig) == "" {
config := newConfig(info, req)
return &Response{Config: config, Diff: fullDiff(config)}, nil
}
return addToConfig(req.ExistingConfig, info, req)
}
func buildInfo(req *Request) *cameraInfo {
mainScheme := urlScheme(req.MainStream)
ip := extractIP(req.MainStream)
sanitized := strings.NewReplacer(".", "_", ":", "_").Replace(ip)
base := "camera"
streamBase := "stream"
if ip != "" {
base = "camera_" + sanitized
streamBase = sanitized
}
info := &cameraInfo{
CameraName: base,
MainStreamName: streamBase + "_main",
MainSource: req.MainStream,
}
if req.Name != "" {
info.CameraName = req.Name
info.MainStreamName = req.Name + "_main"
}
if req.Go2RTC != nil {
if req.Go2RTC.MainStreamName != "" {
info.MainStreamName = req.Go2RTC.MainStreamName
}
if req.Go2RTC.MainStreamSource != "" {
info.MainSource = req.Go2RTC.MainStreamSource
}
}
info.MainPath = "rtsp://127.0.0.1:8554/" + info.MainStreamName
if needMP4[mainScheme] {
info.MainPath += "?mp4"
}
info.MainInputArgs = "preset-rtsp-restream"
if req.Frigate != nil {
if req.Frigate.MainStreamPath != "" {
info.MainPath = req.Frigate.MainStreamPath
}
if req.Frigate.MainStreamInputArgs != "" {
info.MainInputArgs = req.Frigate.MainStreamInputArgs
}
}
if req.SubStream != "" {
subScheme := urlScheme(req.SubStream)
subName := streamBase + "_sub"
if req.Name != "" {
subName = req.Name + "_sub"
}
subSource := req.SubStream
subPath := "rtsp://127.0.0.1:8554/" + subName
if needMP4[subScheme] {
subPath += "?mp4"
}
subInputArgs := "preset-rtsp-restream"
if req.Go2RTC != nil {
if req.Go2RTC.SubStreamName != "" {
subName = req.Go2RTC.SubStreamName
}
if req.Go2RTC.SubStreamSource != "" {
subSource = req.Go2RTC.SubStreamSource
}
}
if req.Frigate != nil {
if req.Frigate.SubStreamPath != "" {
subPath = req.Frigate.SubStreamPath
}
if req.Frigate.SubStreamInputArgs != "" {
subInputArgs = req.Frigate.SubStreamInputArgs
}
}
info.SubStreamName = subName
info.SubSource = subSource
info.SubPath = subPath
info.SubInputArgs = subInputArgs
}
return info
}
func newConfig(info *cameraInfo, req *Request) string {
var b strings.Builder
b.WriteString("mqtt:\n enabled: false\n\n")
b.WriteString("record:\n enabled: true\n retain:\n days: 7\n mode: motion\n\n")
b.WriteString("go2rtc:\n streams:\n")
writeStreamLines(&b, info)
b.WriteString("cameras:\n")
writeCameraBlock(&b, info, req)
b.WriteString("version: 0.18-0\n")
return b.String()
}
// internals
type cameraInfo struct {
CameraName string
MainStreamName string
MainSource string
MainPath string
MainInputArgs string
SubStreamName string
SubSource string
SubPath string
SubInputArgs string
}
func urlScheme(rawURL string) string {
if i := strings.IndexByte(rawURL, ':'); i > 0 {
return rawURL[:i]
}
return ""
}
func extractIP(rawURL string) string {
if u, err := url.Parse(rawURL); err == nil && u.Hostname() != "" {
return u.Hostname()
}
if m := reIPv4.FindString(rawURL); m != "" {
return m
}
return ""
}
+36
View File
@@ -0,0 +1,36 @@
package generate
import "strings"
func fullDiff(config string) []DiffLine {
lines := strings.Split(config, "\n")
diff := make([]DiffLine, len(lines))
for i, line := range lines {
diff[i] = DiffLine{Line: i + 1, Text: line, Type: "added"}
}
return diff
}
func diffWithContext(lines []string, added map[int]bool, ctx int) []DiffLine {
visible := make(map[int]bool)
for idx := range added {
for c := -ctx; c <= ctx; c++ {
if j := idx + c; j >= 0 && j < len(lines) {
visible[j] = true
}
}
}
var diff []DiffLine
for i, line := range lines {
if !visible[i] {
continue
}
t := "context"
if added[i] {
t = "added"
}
diff = append(diff, DiffLine{Line: i + 1, Text: line, Type: t})
}
return diff
}
+190
View File
@@ -0,0 +1,190 @@
package generate
import (
"fmt"
"regexp"
"strings"
)
var (
reCamerasHeader = regexp.MustCompile(`^cameras:`)
reTopLevel = regexp.MustCompile(`^[a-z]`)
reCameraName = regexp.MustCompile(`^\s{2}(\w[\w-]*):`)
reStreamsHeader = regexp.MustCompile(`^\s{2}streams:`)
reStreamName = regexp.MustCompile(`^\s{4}'?(\w[\w-]*)'?:`)
reStreamContent = regexp.MustCompile(`^\s{4,}`)
reNextSection = regexp.MustCompile(`^[a-z#]`)
reCameraBody = regexp.MustCompile(`^\s{2,}\S`)
reVersion = regexp.MustCompile(`^version:`)
)
func addToConfig(existing string, info *cameraInfo, req *Request) (*Response, error) {
lines := strings.Split(existing, "\n")
existingCams := findNames(lines, reCamerasHeader, reCameraName)
existingStreams := findNames(lines, reStreamsHeader, reStreamName)
info = dedup(info, existingCams, existingStreams)
streamIdx := findStreamInsertPoint(lines)
cameraIdx := findCameraInsertPoint(lines)
if streamIdx == -1 || cameraIdx == -1 {
return nil, fmt.Errorf("generate: can't find go2rtc streams or cameras section")
}
var sb strings.Builder
writeStreamLines(&sb, info)
streamLines := strings.Split(strings.TrimRight(sb.String(), "\n"), "\n")
sb.Reset()
writeCameraBlock(&sb, info, req)
cameraLines := strings.Split(strings.TrimRight(sb.String(), "\n"), "\n")
added := make(map[int]bool)
result := make([]string, 0, len(lines)+len(streamLines)+len(cameraLines))
result = append(result, lines[:streamIdx]...)
mark := len(result)
result = append(result, streamLines...)
for i := range streamLines {
added[mark+i] = true
}
shift := len(streamLines)
adjCameraIdx := cameraIdx + shift
rest := lines[streamIdx:]
split := adjCameraIdx - len(result)
result = append(result, rest[:split]...)
mark = len(result)
result = append(result, cameraLines...)
for i := range cameraLines {
added[mark+i] = true
}
result = append(result, rest[split:]...)
config := strings.Join(result, "\n")
diff := diffWithContext(result, added, 3)
return &Response{Config: config, Diff: diff}, nil
}
func dedup(info *cameraInfo, cams, streams map[string]bool) *cameraInfo {
out := *info
suffix := 0
base := out.CameraName
for cams[out.CameraName] {
suffix++
out.CameraName = fmt.Sprintf("%s_%d", base, suffix)
}
base = out.MainStreamName
for streams[out.MainStreamName] {
suffix++
out.MainStreamName = fmt.Sprintf("%s_%d", base, suffix)
}
if out.SubStreamName != "" {
base = out.SubStreamName
for streams[out.SubStreamName] {
suffix++
out.SubStreamName = fmt.Sprintf("%s_%d", base, suffix)
}
}
return &out
}
func findNames(lines []string, header, nameRe *regexp.Regexp) map[string]bool {
names := make(map[string]bool)
in := false
for _, line := range lines {
if header.MatchString(line) {
in = true
continue
}
if in && reTopLevel.MatchString(line) {
break
}
if in {
if m := nameRe.FindStringSubmatch(line); m != nil {
names[m[1]] = true
}
}
}
return names
}
func findStreamInsertPoint(lines []string) int {
in := false
last := -1
headerIdx := -1
for i, line := range lines {
if reStreamsHeader.MatchString(line) {
in = true
headerIdx = i
continue
}
if in {
if reStreamContent.MatchString(line) {
last = i
} else if reNextSection.MatchString(line) {
if last >= 0 && last+1 < len(lines) && strings.TrimSpace(lines[last+1]) == "" {
return last + 2
}
if last >= 0 {
return last + 1
}
return headerIdx + 1
}
}
}
if last >= 0 {
return last + 1
}
if headerIdx >= 0 {
return headerIdx + 1
}
return -1
}
func findCameraInsertPoint(lines []string) int {
in := false
last := -1
headerIdx := -1
for i, line := range lines {
if reCamerasHeader.MatchString(line) {
in = true
headerIdx = i
continue
}
if in {
if reCameraBody.MatchString(line) {
last = i
} else if reTopLevel.MatchString(line) && !reCamerasHeader.MatchString(line) {
if last < 0 {
return headerIdx + 1
}
idx := last + 1
for idx < len(lines) && strings.TrimSpace(lines[idx]) == "" {
idx++
}
return idx
} else if reVersion.MatchString(line) {
if last < 0 {
return headerIdx + 1
}
idx := i
for idx > 0 && strings.TrimSpace(lines[idx-1]) == "" {
idx--
}
return idx
}
}
}
if headerIdx >= 0 {
return headerIdx + 1
}
return len(lines)
}
+117
View File
@@ -0,0 +1,117 @@
package generate
type Request struct {
MainStream string `json:"mainStream"`
SubStream string `json:"subStream,omitempty"`
Name string `json:"name,omitempty"`
ExistingConfig string `json:"existingConfig,omitempty"`
Go2RTC *Go2RTCOverride `json:"go2rtc,omitempty"`
Frigate *FrigateOverride `json:"frigate,omitempty"`
Objects []string `json:"objects,omitempty"`
Record *RecordConfig `json:"record,omitempty"`
Detect *DetectConfig `json:"detect,omitempty"`
Snapshots *BoolConfig `json:"snapshots,omitempty"`
Motion *MotionConfig `json:"motion,omitempty"`
FFmpeg *FFmpegConfig `json:"ffmpeg,omitempty"`
Live *LiveConfig `json:"live,omitempty"`
Audio *AudioConfig `json:"audio,omitempty"`
Birdseye *BirdseyeConfig `json:"birdseye,omitempty"`
ONVIF *ONVIFConfig `json:"onvif,omitempty"`
PTZ *PTZConfig `json:"ptz,omitempty"`
Notifications *BoolConfig `json:"notifications,omitempty"`
UI *UIConfig `json:"ui,omitempty"`
}
type Go2RTCOverride struct {
MainStreamName string `json:"mainStreamName,omitempty"`
SubStreamName string `json:"subStreamName,omitempty"`
MainStreamSource string `json:"mainStreamSource,omitempty"`
SubStreamSource string `json:"subStreamSource,omitempty"`
}
type FrigateOverride struct {
MainStreamPath string `json:"mainStreamPath,omitempty"`
SubStreamPath string `json:"subStreamPath,omitempty"`
MainStreamInputArgs string `json:"mainStreamInputArgs,omitempty"`
SubStreamInputArgs string `json:"subStreamInputArgs,omitempty"`
}
type RecordConfig struct {
Enabled bool `json:"enabled"`
RetainDays float64 `json:"retain_days,omitempty"`
Mode string `json:"mode,omitempty"`
AlertsDays float64 `json:"alerts_days,omitempty"`
DetectionDays float64 `json:"detections_days,omitempty"`
PreCapture int `json:"pre_capture,omitempty"`
PostCapture int `json:"post_capture,omitempty"`
}
type DetectConfig struct {
Enabled bool `json:"enabled"`
FPS int `json:"fps,omitempty"`
Width int `json:"width,omitempty"`
Height int `json:"height,omitempty"`
}
type MotionConfig struct {
Enabled bool `json:"enabled"`
Threshold int `json:"threshold,omitempty"`
ContourArea int `json:"contour_area,omitempty"`
}
type FFmpegConfig struct {
HWAccel string `json:"hwaccel,omitempty"`
GPU int `json:"gpu,omitempty"`
}
type LiveConfig struct {
Height int `json:"height,omitempty"`
Quality int `json:"quality,omitempty"`
}
type AudioConfig struct {
Enabled bool `json:"enabled"`
Filters []string `json:"filters,omitempty"`
}
type BirdseyeConfig struct {
Enabled bool `json:"enabled"`
Mode string `json:"mode,omitempty"`
}
type ONVIFConfig struct {
Host string `json:"host,omitempty"`
Port int `json:"port,omitempty"`
User string `json:"user,omitempty"`
Password string `json:"password,omitempty"`
AutoTracking bool `json:"autotracking,omitempty"`
RequiredZones []string `json:"required_zones,omitempty"`
}
type PTZConfig struct {
Enabled bool `json:"enabled"`
Presets map[string]string `json:"presets,omitempty"`
}
type BoolConfig struct {
Enabled bool `json:"enabled"`
}
type UIConfig struct {
Order int `json:"order,omitempty"`
Dashboard bool `json:"dashboard"`
}
type Response struct {
Config string `json:"config"`
Diff []DiffLine `json:"diff"`
}
type DiffLine struct {
Line int `json:"line"`
Text string `json:"text"`
Type string `json:"type"` // context, added, removed
}
+265
View File
@@ -0,0 +1,265 @@
package generate
import (
"fmt"
"strings"
)
func writeStreamLines(b *strings.Builder, info *cameraInfo) {
fmt.Fprintf(b, " '%s':\n", info.MainStreamName)
fmt.Fprintf(b, " - %s\n", info.MainSource)
if info.SubStreamName != "" {
fmt.Fprintf(b, " '%s':\n", info.SubStreamName)
fmt.Fprintf(b, " - %s\n", info.SubSource)
}
b.WriteByte('\n')
}
func writeCameraBlock(b *strings.Builder, info *cameraInfo, req *Request) {
fmt.Fprintf(b, " %s:\n", info.CameraName)
b.WriteString(" ffmpeg:\n")
writeFFmpegGlobal(b, req)
b.WriteString(" inputs:\n")
if info.SubStreamName != "" {
writeInput(b, info.SubPath, info.SubInputArgs, "detect")
writeInput(b, info.MainPath, info.MainInputArgs, "record")
} else {
writeInput(b, info.MainPath, info.MainInputArgs, "detect", "record")
}
writeLive(b, info, req)
writeDetect(b, req)
writeObjects(b, req)
writeMotion(b, req)
writeRecord(b, req)
writeSnapshots(b, req)
writeAudio(b, req)
writeBirdseye(b, req)
writeONVIF(b, req)
writeNotifications(b, req)
writeUI(b, req)
b.WriteByte('\n')
}
func writeInput(b *strings.Builder, path, inputArgs string, roles ...string) {
fmt.Fprintf(b, " - path: %s\n", path)
fmt.Fprintf(b, " input_args: %s\n", inputArgs)
b.WriteString(" roles:\n")
for _, r := range roles {
fmt.Fprintf(b, " - %s\n", r)
}
}
func writeFFmpegGlobal(b *strings.Builder, req *Request) {
if req.FFmpeg == nil {
return
}
if req.FFmpeg.HWAccel != "" && req.FFmpeg.HWAccel != "auto" {
fmt.Fprintf(b, " hwaccel_args: %s\n", req.FFmpeg.HWAccel)
}
if req.FFmpeg.GPU > 0 {
fmt.Fprintf(b, " gpu: %d\n", req.FFmpeg.GPU)
}
}
func writeLive(b *strings.Builder, info *cameraInfo, req *Request) {
if info.SubStreamName == "" && req.Live == nil {
return
}
b.WriteString(" live:\n")
if info.SubStreamName != "" {
b.WriteString(" streams:\n")
fmt.Fprintf(b, " Main Stream: %s\n", info.MainStreamName)
fmt.Fprintf(b, " Sub Stream: %s\n", info.SubStreamName)
}
if req.Live != nil {
if req.Live.Height > 0 {
fmt.Fprintf(b, " height: %d\n", req.Live.Height)
}
if req.Live.Quality > 0 {
fmt.Fprintf(b, " quality: %d\n", req.Live.Quality)
}
}
}
func writeDetect(b *strings.Builder, req *Request) {
if req.Detect == nil {
b.WriteString(" detect:\n enabled: true\n")
return
}
b.WriteString(" detect:\n")
fmt.Fprintf(b, " enabled: %t\n", req.Detect.Enabled)
if req.Detect.FPS > 0 {
fmt.Fprintf(b, " fps: %d\n", req.Detect.FPS)
}
if req.Detect.Width > 0 {
fmt.Fprintf(b, " width: %d\n", req.Detect.Width)
}
if req.Detect.Height > 0 {
fmt.Fprintf(b, " height: %d\n", req.Detect.Height)
}
}
func writeObjects(b *strings.Builder, req *Request) {
objects := req.Objects
if len(objects) == 0 {
objects = []string{"person"}
}
b.WriteString(" objects:\n track:\n")
for _, obj := range objects {
fmt.Fprintf(b, " - %s\n", obj)
}
}
func writeMotion(b *strings.Builder, req *Request) {
if req.Motion == nil {
return
}
b.WriteString(" motion:\n")
fmt.Fprintf(b, " enabled: %t\n", req.Motion.Enabled)
if req.Motion.Threshold > 0 {
fmt.Fprintf(b, " threshold: %d\n", req.Motion.Threshold)
}
if req.Motion.ContourArea > 0 {
fmt.Fprintf(b, " contour_area: %d\n", req.Motion.ContourArea)
}
}
func writeRecord(b *strings.Builder, req *Request) {
if req.Record == nil {
b.WriteString(" record:\n enabled: true\n")
return
}
b.WriteString(" record:\n")
fmt.Fprintf(b, " enabled: %t\n", req.Record.Enabled)
if req.Record.RetainDays > 0 || req.Record.Mode != "" {
b.WriteString(" retain:\n")
if req.Record.RetainDays > 0 {
fmt.Fprintf(b, " days: %g\n", req.Record.RetainDays)
}
if req.Record.Mode != "" {
fmt.Fprintf(b, " mode: %s\n", req.Record.Mode)
}
}
if req.Record.AlertsDays > 0 || req.Record.PreCapture > 0 || req.Record.PostCapture > 0 {
b.WriteString(" alerts:\n")
if req.Record.AlertsDays > 0 {
fmt.Fprintf(b, " retain:\n days: %g\n", req.Record.AlertsDays)
}
if req.Record.PreCapture > 0 {
fmt.Fprintf(b, " pre_capture: %d\n", req.Record.PreCapture)
}
if req.Record.PostCapture > 0 {
fmt.Fprintf(b, " post_capture: %d\n", req.Record.PostCapture)
}
}
if req.Record.DetectionDays > 0 {
fmt.Fprintf(b, " detections:\n retain:\n days: %g\n", req.Record.DetectionDays)
}
}
func writeSnapshots(b *strings.Builder, req *Request) {
if req.Snapshots == nil || !req.Snapshots.Enabled {
return
}
b.WriteString(" snapshots:\n enabled: true\n")
}
func writeAudio(b *strings.Builder, req *Request) {
if req.Audio == nil || !req.Audio.Enabled {
return
}
b.WriteString(" audio:\n enabled: true\n")
if len(req.Audio.Filters) > 0 {
b.WriteString(" filters:\n")
for _, f := range req.Audio.Filters {
fmt.Fprintf(b, " - %s\n", f)
}
}
}
func writeBirdseye(b *strings.Builder, req *Request) {
if req.Birdseye == nil {
return
}
b.WriteString(" birdseye:\n")
fmt.Fprintf(b, " enabled: %t\n", req.Birdseye.Enabled)
if req.Birdseye.Mode != "" {
fmt.Fprintf(b, " mode: %s\n", req.Birdseye.Mode)
}
}
func writeONVIF(b *strings.Builder, req *Request) {
if req.ONVIF == nil || req.ONVIF.Host == "" {
return
}
b.WriteString(" onvif:\n")
fmt.Fprintf(b, " host: %s\n", req.ONVIF.Host)
port := req.ONVIF.Port
if port == 0 {
port = 80
}
fmt.Fprintf(b, " port: %d\n", port)
if req.ONVIF.User != "" {
fmt.Fprintf(b, " user: %s\n", req.ONVIF.User)
fmt.Fprintf(b, " password: %s\n", req.ONVIF.Password)
}
if req.ONVIF.AutoTracking {
b.WriteString(" autotracking:\n enabled: true\n")
if len(req.ONVIF.RequiredZones) > 0 {
b.WriteString(" required_zones:\n")
for _, z := range req.ONVIF.RequiredZones {
fmt.Fprintf(b, " - %s\n", z)
}
}
}
if req.PTZ != nil && len(req.PTZ.Presets) > 0 {
b.WriteString(" ptz:\n presets:\n")
for name, token := range req.PTZ.Presets {
fmt.Fprintf(b, " %s: %s\n", name, token)
}
}
}
func writeNotifications(b *strings.Builder, req *Request) {
if req.Notifications == nil || !req.Notifications.Enabled {
return
}
b.WriteString(" notifications:\n enabled: true\n")
}
func writeUI(b *strings.Builder, req *Request) {
if req.UI == nil {
return
}
b.WriteString(" ui:\n")
if req.UI.Order > 0 {
fmt.Fprintf(b, " order: %d\n", req.UI.Order)
}
if !req.UI.Dashboard {
b.WriteString(" dashboard: false\n")
}
}
+35
View File
@@ -0,0 +1,35 @@
package probe
import (
"bufio"
"os"
"strings"
)
// LookupARP reads /proc/net/arp to find MAC address for ip. Linux only.
func LookupARP(ip string) string {
file, err := os.Open("/proc/net/arp")
if err != nil {
return ""
}
defer file.Close()
scanner := bufio.NewScanner(file)
scanner.Scan() // skip header
for scanner.Scan() {
fields := strings.Fields(scanner.Text())
if len(fields) < 4 {
continue
}
if fields[0] == ip {
mac := fields[3]
if mac == "00:00:00:00:00:00" {
return ""
}
return strings.ToUpper(mac)
}
}
return ""
}
+21
View File
@@ -0,0 +1,21 @@
package probe
import (
"context"
"net"
"strings"
)
func ReverseDNS(ctx context.Context, ip string) (*DNSResult, error) {
names, err := net.DefaultResolver.LookupAddr(ctx, ip)
if err != nil || len(names) == 0 {
return nil, nil
}
hostname := strings.TrimSuffix(names[0], ".")
if hostname == "" {
return nil, nil
}
return &DNSResult{Hostname: hostname}, nil
}
+65
View File
@@ -0,0 +1,65 @@
package probe
import (
"context"
"crypto/tls"
"fmt"
"net/http"
)
func ProbeHTTP(ctx context.Context, ip string, ports []int) (*HTTPResult, error) {
if len(ports) == 0 {
ports = []int{80, 8080}
}
client := &http.Client{
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
},
}
type result struct {
resp *http.Response
port int
}
ch := make(chan result, len(ports))
for _, port := range ports {
go func(port int) {
url := fmt.Sprintf("http://%s:%d/", ip, port)
req, err := http.NewRequestWithContext(ctx, "HEAD", url, nil)
if err != nil {
return
}
req.Header.Set("User-Agent", "Strix/2.0")
resp, err := client.Do(req)
if err != nil {
return
}
ch <- result{resp: resp, port: port}
}(port)
}
for range ports {
select {
case <-ctx.Done():
return nil, nil
case r := <-ch:
if r.resp.Body != nil {
r.resp.Body.Close()
}
return &HTTPResult{
Port: r.port,
StatusCode: r.resp.StatusCode,
Server: r.resp.Header.Get("Server"),
}, nil
}
}
return nil, nil
}
+137
View File
@@ -0,0 +1,137 @@
package probe
import (
"context"
"net"
"strings"
"time"
"github.com/miekg/dns"
)
const (
hapService = "_hap._tcp.local."
txtCategory = "ci"
txtDeviceID = "id"
txtModel = "md"
txtStatusFlags = "sf"
statusPaired = "0"
categoryCamera = "17"
categoryDoorbell = "18"
)
// QueryHAP sends unicast mDNS query to ip:5353 for HomeKit service.
// Returns nil if device is not a HomeKit camera/doorbell.
func QueryHAP(ctx context.Context, ip string) (*MDNSResult, error) {
msg := &dns.Msg{
Question: []dns.Question{
{Name: hapService, Qtype: dns.TypePTR, Qclass: dns.ClassINET},
},
}
query, err := msg.Pack()
if err != nil {
return nil, err
}
conn, err := net.ListenPacket("udp4", ":0")
if err != nil {
return nil, err
}
defer conn.Close()
deadline, ok := ctx.Deadline()
if !ok {
deadline = time.Now().Add(100 * time.Millisecond)
}
_ = conn.SetDeadline(deadline)
addr := &net.UDPAddr{IP: net.ParseIP(ip), Port: 5353}
if _, err = conn.WriteTo(query, addr); err != nil {
return nil, err
}
buf := make([]byte, 1500)
n, _, err := conn.ReadFrom(buf)
if err != nil {
return nil, nil // timeout = not a HomeKit device
}
var resp dns.Msg
if err = resp.Unpack(buf[:n]); err != nil {
return nil, nil
}
return parseHAPResponse(&resp)
}
// internals
func parseHAPResponse(msg *dns.Msg) (*MDNSResult, error) {
records := make([]dns.RR, 0, len(msg.Answer)+len(msg.Extra))
records = append(records, msg.Answer...)
records = append(records, msg.Extra...)
var ptrName string
for _, rr := range records {
if ptr, ok := rr.(*dns.PTR); ok && ptr.Hdr.Name == hapService {
ptrName = ptr.Ptr
break
}
}
if ptrName == "" {
return nil, nil
}
// ex. "My Camera._hap._tcp.local." -> "My Camera"
var name string
if i := strings.Index(ptrName, "."+hapService); i > 0 {
name = strings.ReplaceAll(ptrName[:i], `\ `, " ")
}
info := map[string]string{}
for _, rr := range records {
txt, ok := rr.(*dns.TXT)
if !ok || txt.Hdr.Name != ptrName {
continue
}
for _, s := range txt.Txt {
k, v, _ := strings.Cut(s, "=")
info[k] = v
}
break
}
category := info[txtCategory]
if category != categoryCamera && category != categoryDoorbell {
return nil, nil
}
categoryName := "camera"
if category == categoryDoorbell {
categoryName = "doorbell"
}
var port int
for _, rr := range records {
if srv, ok := rr.(*dns.SRV); ok && srv.Hdr.Name == ptrName {
port = int(srv.Port)
break
}
}
return &MDNSResult{
Name: name,
DeviceID: info[txtDeviceID],
Model: info[txtModel],
Category: categoryName,
Paired: info[txtStatusFlags] == statusPaired,
Port: port,
}, nil
}
func init() {
dns.Id = func() uint16 { return 0 }
}
+51
View File
@@ -0,0 +1,51 @@
package probe
type Response struct {
IP string `json:"ip"`
Reachable bool `json:"reachable"`
LatencyMs float64 `json:"latency_ms,omitempty"`
Type string `json:"type"` // "unreachable", "standard", "homekit"
Error string `json:"error,omitempty"`
Probes Probes `json:"probes"`
}
type Probes struct {
Ping *PingResult `json:"ping"`
Ports *PortsResult `json:"ports"`
DNS *DNSResult `json:"dns"`
ARP *ARPResult `json:"arp"`
MDNS *MDNSResult `json:"mdns"`
HTTP *HTTPResult `json:"http"`
}
type PingResult struct {
LatencyMs float64 `json:"latency_ms"`
}
type PortsResult struct {
Open []int `json:"open"`
}
type DNSResult struct {
Hostname string `json:"hostname"`
}
type ARPResult struct {
MAC string `json:"mac"`
Vendor string `json:"vendor"`
}
type MDNSResult struct {
Name string `json:"name"`
DeviceID string `json:"device_id"`
Model string `json:"model"`
Category string `json:"category"` // "camera", "doorbell"
Paired bool `json:"paired"`
Port int `json:"port"`
}
type HTTPResult struct {
Port int `json:"port"`
StatusCode int `json:"status_code"`
Server string `json:"server"`
}
+21
View File
@@ -0,0 +1,21 @@
package probe
import (
"database/sql"
"strings"
)
// LookupOUI returns vendor name for MAC address from SQLite oui table.
// MAC format: "C0:56:E3:AA:BB:CC" -> prefix "C0:56:E3"
func LookupOUI(db *sql.DB, mac string) string {
if len(mac) < 8 {
return ""
}
prefix := strings.ToUpper(mac[:8])
prefix = strings.ReplaceAll(prefix, "-", ":")
var brand string
_ = db.QueryRow("SELECT brand FROM oui WHERE prefix = ?", prefix).Scan(&brand)
return brand
}
+39
View File
@@ -0,0 +1,39 @@
package probe
import (
"context"
"net"
"time"
)
func CanICMP() bool {
conn, err := net.DialTimeout("ip4:icmp", "127.0.0.1", 100*time.Millisecond)
if err != nil {
return false
}
conn.Close()
return true
}
func Ping(ctx context.Context, ip string) (*PingResult, error) {
deadline, ok := ctx.Deadline()
if !ok {
deadline = time.Now().Add(100 * time.Millisecond)
}
timeout := time.Until(deadline)
if timeout <= 0 {
return nil, context.DeadlineExceeded
}
start := time.Now()
conn, err := net.DialTimeout("ip4:icmp", ip, timeout)
if err != nil {
return nil, err
}
conn.Close()
return &PingResult{
LatencyMs: float64(time.Since(start).Microseconds()) / 1000.0,
}, nil
}
+64
View File
@@ -0,0 +1,64 @@
package probe
import (
"context"
"fmt"
"net"
"sync"
"time"
)
func ScanPorts(ctx context.Context, ip string, ports []int) (*PortsResult, error) {
if len(ports) == 0 {
return nil, nil
}
deadline, ok := ctx.Deadline()
if !ok {
deadline = time.Now().Add(100 * time.Millisecond)
}
timeout := time.Until(deadline)
if timeout <= 0 {
return nil, context.DeadlineExceeded
}
type hit struct {
port int
latency time.Duration
}
var mu sync.Mutex
var hits []hit
var wg sync.WaitGroup
for _, port := range ports {
wg.Add(1)
go func(port int) {
defer wg.Done()
start := time.Now()
conn, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%d", ip, port), timeout)
if err != nil {
return
}
conn.Close()
mu.Lock()
hits = append(hits, hit{port: port, latency: time.Since(start)})
mu.Unlock()
}(port)
}
wg.Wait()
if len(hits) == 0 {
return nil, nil
}
open := make([]int, len(hits))
for i, h := range hits {
open[i] = h.port
}
return &PortsResult{Open: open}, nil
}
-405
View File
@@ -1,405 +0,0 @@
package sse
import (
"context"
"encoding/json"
"fmt"
"net/http"
"strings"
"time"
)
const (
// IngressPaddingSize is the padding size for Home Assistant Ingress mode.
// HA Supervisor uses aiohttp with 64KB buffer for StreamResponse.
// We need to fill this buffer to force immediate delivery of SSE events.
IngressPaddingSize = 64 * 1024 // 64KB
// IngressHeader is the header that Home Assistant Ingress adds to requests
IngressHeader = "X-Ingress-Path"
)
// Event represents a Server-Sent Event
type Event struct {
ID string
Type string
Data interface{}
Retry int
Comment string
}
// Client represents an SSE client connection
type Client struct {
ID string
Channel chan Event
Response http.ResponseWriter
Request *http.Request
Context context.Context
Cancel context.CancelFunc
}
// Server manages SSE connections
type Server struct {
clients map[string]*Client
register chan *Client
unregister chan *Client
broadcast chan Event
logger interface{ Debug(string, ...any); Error(string, error, ...any) }
}
// NewServer creates a new SSE server
func NewServer(logger interface{ Debug(string, ...any); Error(string, error, ...any) }) *Server {
return &Server{
clients: make(map[string]*Client),
register: make(chan *Client),
unregister: make(chan *Client),
broadcast: make(chan Event),
logger: logger,
}
}
// Start starts the SSE server
func (s *Server) Start(ctx context.Context) {
go func() {
for {
select {
case <-ctx.Done():
// Close all client connections
for _, client := range s.clients {
client.Cancel()
close(client.Channel)
}
return
case client := <-s.register:
s.clients[client.ID] = client
s.logger.Debug("SSE client registered", "id", client.ID)
case client := <-s.unregister:
if _, ok := s.clients[client.ID]; ok {
delete(s.clients, client.ID)
close(client.Channel)
s.logger.Debug("SSE client unregistered", "id", client.ID)
}
case event := <-s.broadcast:
for _, client := range s.clients {
select {
case client.Channel <- event:
default:
// Client's channel is full, close it
s.unregister <- client
}
}
}
}
}()
}
// ServeHTTP handles SSE connections
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Check if SSE is supported
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "SSE not supported", http.StatusInternalServerError)
return
}
// Set headers for SSE
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("X-Accel-Buffering", "no") // Disable Nginx buffering
// Create client
ctx, cancel := context.WithCancel(r.Context())
client := &Client{
ID: generateClientID(),
Channel: make(chan Event, 100),
Response: w,
Request: r,
Context: ctx,
Cancel: cancel,
}
// Register client
s.register <- client
// Remove client on disconnect
defer func() {
s.unregister <- client
cancel()
}()
// Send initial connection event
s.SendToClient(client, Event{
Type: "connected",
Data: map[string]string{"id": client.ID},
})
// Listen for events
for {
select {
case <-ctx.Done():
return
case <-r.Context().Done():
return
case event := <-client.Channel:
if err := s.writeEvent(w, flusher, event); err != nil {
s.logger.Error("failed to write SSE event", err, "client", client.ID)
return
}
}
}
}
// SendToClient sends an event to a specific client
func (s *Server) SendToClient(client *Client, event Event) {
select {
case client.Channel <- event:
default:
// Channel is full, log warning
s.logger.Debug("client channel full, dropping event", "client", client.ID)
}
}
// Broadcast sends an event to all clients
func (s *Server) Broadcast(event Event) {
s.broadcast <- event
}
// writeEvent writes an event to the response writer
func (s *Server) writeEvent(w http.ResponseWriter, flusher http.Flusher, event Event) error {
// Write event ID if present
if event.ID != "" {
if _, err := fmt.Fprintf(w, "id: %s\n", event.ID); err != nil {
return err
}
}
// Write event type if present
if event.Type != "" {
if _, err := fmt.Fprintf(w, "event: %s\n", event.Type); err != nil {
return err
}
}
// Write retry if present
if event.Retry > 0 {
if _, err := fmt.Fprintf(w, "retry: %d\n", event.Retry); err != nil {
return err
}
}
// Write comment if present
if event.Comment != "" {
if _, err := fmt.Fprintf(w, ": %s\n", event.Comment); err != nil {
return err
}
}
// Write data
if event.Data != nil {
var dataStr string
switch v := event.Data.(type) {
case string:
dataStr = v
case []byte:
dataStr = string(v)
default:
data, err := json.Marshal(v)
if err != nil {
return err
}
dataStr = string(data)
}
// Split data by newlines for proper SSE format
for _, line := range splitLines(dataStr) {
if _, err := fmt.Fprintf(w, "data: %s\n", line); err != nil {
return err
}
}
}
// End event with double newline
if _, err := fmt.Fprintf(w, "\n"); err != nil {
return err
}
// Flush the data
flusher.Flush()
return nil
}
// splitLines splits a string into lines
func splitLines(s string) []string {
var lines []string
var current string
for _, ch := range s {
if ch == '\n' {
lines = append(lines, current)
current = ""
} else {
current += string(ch)
}
}
if current != "" {
lines = append(lines, current)
}
return lines
}
// generateClientID generates a unique client ID
func generateClientID() string {
return fmt.Sprintf("client-%d-%d", time.Now().Unix(), time.Now().Nanosecond())
}
// StreamWriter provides a simple interface for writing SSE events
type StreamWriter struct {
client *Client
server *Server
isIngress bool // True when running through Home Assistant Ingress proxy
}
// NewStreamWriter creates a new stream writer for a client
func (s *Server) NewStreamWriter(w http.ResponseWriter, r *http.Request) (*StreamWriter, error) {
// Check if SSE is supported
flusher, ok := w.(http.Flusher)
if !ok {
return nil, fmt.Errorf("SSE not supported")
}
// Set headers for SSE
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("X-Accel-Buffering", "no")
// Send initial flush to establish connection
flusher.Flush()
// Detect Home Assistant Ingress mode by checking for X-Ingress-Path header
isIngress := r.Header.Get(IngressHeader) != ""
// Create client
ctx, cancel := context.WithCancel(r.Context())
client := &Client{
ID: generateClientID(),
Channel: make(chan Event, 100),
Response: w,
Request: r,
Context: ctx,
Cancel: cancel,
}
return &StreamWriter{
client: client,
server: s,
isIngress: isIngress,
}, nil
}
// SendEvent sends an event through the stream writer
func (sw *StreamWriter) SendEvent(eventType string, data interface{}) error {
event := Event{
Type: eventType,
Data: data,
}
flusher, ok := sw.client.Response.(http.Flusher)
if !ok {
return fmt.Errorf("response does not support flushing")
}
// Use Ingress-aware write method
return sw.writeEventWithIngress(sw.client.Response, flusher, event)
}
// writeEventWithIngress writes an event and adds padding for Ingress mode
func (sw *StreamWriter) writeEventWithIngress(w http.ResponseWriter, flusher http.Flusher, event Event) error {
// Write the event using standard method
if err := sw.server.writeEvent(w, flusher, event); err != nil {
return err
}
// In Ingress mode, add padding to fill the 64KB buffer and force immediate delivery
if sw.isIngress {
if err := sw.writePadding(w, flusher); err != nil {
return err
}
}
return nil
}
// writePadding writes SSE comment padding to fill proxy buffers.
// SSE comments (lines starting with ':') are ignored by clients.
func (sw *StreamWriter) writePadding(w http.ResponseWriter, flusher http.Flusher) error {
// Create padding using SSE comments which are ignored by clients
// Each line is ": " + padding content + "\n"
// We need ~64KB to fill the aiohttp StreamResponse buffer
const lineSize = 1024 // 1KB per line
const numLines = 64 // 64 lines = 64KB
paddingLine := ": " + strings.Repeat(".", lineSize-4) + "\n" // -4 for ": " and "\n"
for i := 0; i < numLines; i++ {
if _, err := fmt.Fprint(w, paddingLine); err != nil {
return err
}
}
// Flush the padding
flusher.Flush()
return nil
}
// SendJSON sends JSON data as an event
func (sw *StreamWriter) SendJSON(eventType string, v interface{}) error {
return sw.SendEvent(eventType, v)
}
// IsIngress returns true if running through Home Assistant Ingress proxy
func (sw *StreamWriter) IsIngress() bool {
return sw.isIngress
}
// SendMessage sends a simple message
func (sw *StreamWriter) SendMessage(message string) error {
return sw.SendEvent("message", map[string]string{"message": message})
}
// SendError sends an error message
func (sw *StreamWriter) SendError(err error) error {
return sw.SendEvent("error", map[string]string{"error": err.Error()})
}
// SendProgress sends a progress update
func (sw *StreamWriter) SendProgress(current, total int, message string) error {
return sw.SendEvent("progress", map[string]interface{}{
"current": current,
"total": total,
"message": message,
"percent": float64(current) / float64(total) * 100,
})
}
// Close closes the stream writer
func (sw *StreamWriter) Close() {
// Perform final flush if possible
if flusher, ok := sw.client.Response.(http.Flusher); ok {
flusher.Flush()
}
sw.client.Cancel()
}
+97
View File
@@ -0,0 +1,97 @@
package tester
import (
"sync"
"time"
)
const SessionTTL = 30 * time.Minute
type Session struct {
ID string `json:"session_id"`
Status string `json:"status"`
CreatedAt time.Time `json:"created_at"`
ExpiresAt time.Time `json:"expires_at,omitempty"`
Total int `json:"total"`
Tested int `json:"tested"`
Alive int `json:"alive"`
WithScreen int `json:"with_screenshot"`
Results []*Result `json:"results"`
Screenshots [][]byte `json:"-"`
cancel chan struct{}
mu sync.Mutex
}
type Result struct {
Source string `json:"source"`
Screenshot string `json:"screenshot,omitempty"`
Codecs []string `json:"codecs,omitempty"`
LatencyMs int64 `json:"latency_ms,omitempty"`
Skipped bool `json:"skipped,omitempty"`
}
func NewSession(id string, total int) *Session {
return &Session{
ID: id,
Status: "running",
CreatedAt: time.Now(),
Total: total,
cancel: make(chan struct{}),
}
}
func (s *Session) AddResult(r *Result) {
s.mu.Lock()
s.Results = append(s.Results, r)
s.Alive++
if r.Screenshot != "" {
s.WithScreen++
}
s.mu.Unlock()
}
func (s *Session) AddTested() {
s.mu.Lock()
s.Tested++
s.mu.Unlock()
}
func (s *Session) AddScreenshot(data []byte) int {
s.mu.Lock()
idx := len(s.Screenshots)
s.Screenshots = append(s.Screenshots, data)
s.mu.Unlock()
return idx
}
func (s *Session) GetScreenshot(idx int) []byte {
s.mu.Lock()
defer s.mu.Unlock()
if idx < 0 || idx >= len(s.Screenshots) {
return nil
}
return s.Screenshots[idx]
}
func (s *Session) Done() {
s.mu.Lock()
s.Status = "done"
s.ExpiresAt = time.Now().Add(SessionTTL)
s.mu.Unlock()
}
func (s *Session) Cancel() {
select {
case <-s.cancel:
default:
close(s.cancel)
}
}
func (s *Session) Cancelled() <-chan struct{} {
return s.cancel
}
func (s *Session) Lock() { s.mu.Lock() }
func (s *Session) Unlock() { s.mu.Unlock() }
+50
View File
@@ -0,0 +1,50 @@
package tester
import (
"fmt"
"strings"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/rtsp"
)
// SourceHandler tests stream URL, returns Producer or error
type SourceHandler func(rawURL string) (core.Producer, error)
var handlers = map[string]SourceHandler{}
func RegisterSource(scheme string, handler SourceHandler) {
handlers[scheme] = handler
}
func GetHandler(rawURL string) SourceHandler {
if i := strings.IndexByte(rawURL, ':'); i > 0 {
return handlers[rawURL[:i]]
}
return nil
}
func init() {
RegisterSource("rtsp", rtspHandler)
RegisterSource("rtsps", rtspHandler)
RegisterSource("rtspx", rtspHandler)
}
// rtspHandler -- Dial + Describe. Proves: port open, RTSP responds, auth OK, SDP received.
func rtspHandler(rawURL string) (core.Producer, error) {
rawURL, _, _ = strings.Cut(rawURL, "#")
conn := rtsp.NewClient(rawURL)
conn.Backchannel = false
if err := conn.Dial(); err != nil {
return nil, fmt.Errorf("rtsp: dial: %w", err)
}
if err := conn.Describe(); err != nil {
_ = conn.Stop()
return nil, fmt.Errorf("rtsp: describe: %w", err)
}
return conn, nil
}
+171
View File
@@ -0,0 +1,171 @@
package tester
import (
"bytes"
"fmt"
"os/exec"
"time"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/magic"
)
const workers = 20
func RunWorkers(s *Session, urls []string) {
ch := make(chan string, len(urls))
for _, u := range urls {
ch <- u
}
close(ch)
done := make(chan struct{})
n := workers
if len(urls) < n {
n = len(urls)
}
for i := 0; i < n; i++ {
go func() {
for rawURL := range ch {
select {
case <-s.Cancelled():
return
default:
}
testURL(s, rawURL)
}
done <- struct{}{}
}()
}
for i := 0; i < n; i++ {
<-done
}
s.Done()
}
func testURL(s *Session, rawURL string) {
defer s.AddTested()
handler := GetHandler(rawURL)
if handler == nil {
return
}
start := time.Now()
prod, err := handler(rawURL)
if err != nil {
return
}
defer func() { _ = prod.Stop() }()
latency := time.Since(start).Milliseconds()
var codecs []string
for _, media := range prod.GetMedias() {
if media.Direction != core.DirectionRecvonly {
continue
}
for _, codec := range media.Codecs {
codecs = append(codecs, codec.Name)
}
}
r := &Result{
Source: rawURL,
Codecs: codecs,
LatencyMs: latency,
}
if raw, codecName := getScreenshot(prod); raw != nil {
var jpeg []byte
switch codecName {
case core.CodecH264, core.CodecH265:
jpeg = toJPEG(raw)
case core.CodecJPEG:
jpeg = raw
default:
jpeg = raw
}
if jpeg != nil {
idx := s.AddScreenshot(jpeg)
r.Screenshot = fmt.Sprintf("/api/test/screenshot?id=%s&i=%d", s.ID, idx)
}
}
s.AddResult(r)
}
// getScreenshot connects Keyframe consumer to producer, waits for first keyframe with 10s timeout
func getScreenshot(prod core.Producer) ([]byte, string) {
cons := magic.NewKeyframe()
for _, prodMedia := range prod.GetMedias() {
if prodMedia.Kind != core.KindVideo || prodMedia.Direction != core.DirectionRecvonly {
continue
}
for _, consMedia := range cons.GetMedias() {
prodCodec, consCodec := prodMedia.MatchMedia(consMedia)
if prodCodec == nil {
continue
}
track, err := prod.GetTrack(prodMedia, prodCodec)
if err != nil {
continue
}
if err = cons.AddTrack(consMedia, consCodec, track); err != nil {
continue
}
goto matched
}
}
return nil, ""
matched:
go func() {
_ = prod.Start()
}()
once := &core.OnceBuffer{}
done := make(chan struct{})
go func() {
_, _ = cons.WriteTo(once)
close(done)
}()
select {
case <-done:
case <-time.After(10 * time.Second):
_ = prod.Stop()
return nil, ""
}
return once.Buffer(), cons.CodecName()
}
func toJPEG(raw []byte) []byte {
cmd := exec.Command("ffmpeg",
"-hide_banner", "-loglevel", "error",
"-i", "-",
"-frames:v", "1",
"-f", "image2", "-c:v", "mjpeg",
"-",
)
cmd.Stdin = bytes.NewReader(raw)
out, err := cmd.Output()
if err != nil {
return nil
}
return out
}