Add "human" error from exec source
This commit is contained in:
+36
-14
@@ -5,6 +5,7 @@ import (
|
|||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"net/url"
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
@@ -102,14 +103,22 @@ func handlePipe(_ string, cmd *exec.Cmd, query url.Values) (core.Producer, error
|
|||||||
}
|
}
|
||||||
|
|
||||||
func handleRTSP(url string, cmd *exec.Cmd, path string) (core.Producer, error) {
|
func handleRTSP(url string, cmd *exec.Cmd, path string) (core.Producer, error) {
|
||||||
|
stderr := limitBuffer{buf: make([]byte, 512)}
|
||||||
|
|
||||||
|
if cmd.Stderr != nil {
|
||||||
|
cmd.Stderr = io.MultiWriter(cmd.Stderr, &stderr)
|
||||||
|
} else {
|
||||||
|
cmd.Stderr = &stderr
|
||||||
|
}
|
||||||
|
|
||||||
if log.Trace().Enabled() {
|
if log.Trace().Enabled() {
|
||||||
cmd.Stdout = os.Stdout
|
cmd.Stdout = os.Stdout
|
||||||
}
|
}
|
||||||
|
|
||||||
ch := make(chan core.Producer)
|
waiter := make(chan core.Producer)
|
||||||
|
|
||||||
waitersMu.Lock()
|
waitersMu.Lock()
|
||||||
waiters[path] = ch
|
waiters[path] = waiter
|
||||||
waitersMu.Unlock()
|
waitersMu.Unlock()
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
@@ -127,16 +136,9 @@ func handleRTSP(url string, cmd *exec.Cmd, path string) (core.Producer, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
chErr := make(chan error)
|
done := make(chan error, 1)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
err := cmd.Wait()
|
done <- cmd.Wait()
|
||||||
// unblocking write to channel
|
|
||||||
select {
|
|
||||||
case chErr <- err:
|
|
||||||
default:
|
|
||||||
log.Trace().Str("url", url).Msg("[exec] close")
|
|
||||||
}
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
@@ -144,9 +146,10 @@ func handleRTSP(url string, cmd *exec.Cmd, path string) (core.Producer, error) {
|
|||||||
_ = cmd.Process.Kill()
|
_ = cmd.Process.Kill()
|
||||||
log.Error().Str("url", url).Msg("[exec] timeout")
|
log.Error().Str("url", url).Msg("[exec] timeout")
|
||||||
return nil, errors.New("timeout")
|
return nil, errors.New("timeout")
|
||||||
case err := <-chErr:
|
case <-done:
|
||||||
return nil, fmt.Errorf("exec: %s", err)
|
// limit message size
|
||||||
case prod := <-ch:
|
return nil, errors.New("exec: " + stderr.String())
|
||||||
|
case prod := <-waiter:
|
||||||
log.Debug().Stringer("launch", time.Since(ts)).Msg("[exec] run")
|
log.Debug().Stringer("launch", time.Since(ts)).Msg("[exec] run")
|
||||||
return prod, nil
|
return prod, nil
|
||||||
}
|
}
|
||||||
@@ -159,3 +162,22 @@ var (
|
|||||||
waiters = map[string]chan core.Producer{}
|
waiters = map[string]chan core.Producer{}
|
||||||
waitersMu sync.Mutex
|
waitersMu sync.Mutex
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type limitBuffer struct {
|
||||||
|
buf []byte
|
||||||
|
n int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *limitBuffer) String() string {
|
||||||
|
if l.n == len(l.buf) {
|
||||||
|
return string(l.buf) + "..."
|
||||||
|
}
|
||||||
|
return string(l.buf[:l.n])
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *limitBuffer) Write(p []byte) (int, error) {
|
||||||
|
if l.n < cap(l.buf) {
|
||||||
|
l.n += copy(l.buf[l.n:], p)
|
||||||
|
}
|
||||||
|
return len(p), nil
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user