From b3c5ef8c865d4c87522f19d77cd7ee0f63da2f60 Mon Sep 17 00:00:00 2001 From: Alex X Date: Fri, 3 May 2024 14:28:16 +0300 Subject: [PATCH] Add "human" error from exec source --- internal/exec/exec.go | 50 +++++++++++++++++++++++++++++++------------ 1 file changed, 36 insertions(+), 14 deletions(-) diff --git a/internal/exec/exec.go b/internal/exec/exec.go index 36a76473..a160136c 100644 --- a/internal/exec/exec.go +++ b/internal/exec/exec.go @@ -5,6 +5,7 @@ import ( "encoding/hex" "errors" "fmt" + "io" "net/url" "os" "os/exec" @@ -102,14 +103,22 @@ func handlePipe(_ string, cmd *exec.Cmd, query url.Values) (core.Producer, error } func handleRTSP(url string, cmd *exec.Cmd, path string) (core.Producer, error) { + stderr := limitBuffer{buf: make([]byte, 512)} + + if cmd.Stderr != nil { + cmd.Stderr = io.MultiWriter(cmd.Stderr, &stderr) + } else { + cmd.Stderr = &stderr + } + if log.Trace().Enabled() { cmd.Stdout = os.Stdout } - ch := make(chan core.Producer) + waiter := make(chan core.Producer) waitersMu.Lock() - waiters[path] = ch + waiters[path] = waiter waitersMu.Unlock() defer func() { @@ -127,16 +136,9 @@ func handleRTSP(url string, cmd *exec.Cmd, path string) (core.Producer, error) { return nil, err } - chErr := make(chan error) - + done := make(chan error, 1) go func() { - err := cmd.Wait() - // unblocking write to channel - select { - case chErr <- err: - default: - log.Trace().Str("url", url).Msg("[exec] close") - } + done <- cmd.Wait() }() select { @@ -144,9 +146,10 @@ func handleRTSP(url string, cmd *exec.Cmd, path string) (core.Producer, error) { _ = cmd.Process.Kill() log.Error().Str("url", url).Msg("[exec] timeout") return nil, errors.New("timeout") - case err := <-chErr: - return nil, fmt.Errorf("exec: %s", err) - case prod := <-ch: + case <-done: + // limit message size + return nil, errors.New("exec: " + stderr.String()) + case prod := <-waiter: log.Debug().Stringer("launch", time.Since(ts)).Msg("[exec] run") return prod, nil } @@ -159,3 +162,22 @@ var ( waiters = map[string]chan core.Producer{} waitersMu sync.Mutex ) + +type limitBuffer struct { + buf []byte + n int +} + +func (l *limitBuffer) String() string { + if l.n == len(l.buf) { + return string(l.buf) + "..." + } + return string(l.buf[:l.n]) +} + +func (l *limitBuffer) Write(p []byte) (int, error) { + if l.n < cap(l.buf) { + l.n += copy(l.buf[l.n:], p) + } + return len(p), nil +}