diff --git a/internal/exec/closer.go b/internal/exec/closer.go new file mode 100644 index 00000000..66d0e3ac --- /dev/null +++ b/internal/exec/closer.go @@ -0,0 +1,39 @@ +package exec + +import ( + "errors" + "net/url" + "os" + "os/exec" + "syscall" + "time" + + "github.com/AlexxIT/go2rtc/pkg/core" +) + +// closer support custom killsignal with custom killtimeout +type closer struct { + cmd *exec.Cmd + query url.Values +} + +func (c *closer) Close() (err error) { + sig := os.Kill + if s := c.query.Get("killsignal"); s != "" { + sig = syscall.Signal(core.Atoi(s)) + } + + log.Trace().Msgf("[exec] kill with signal=%d", sig) + err = c.cmd.Process.Signal(sig) + + if s := c.query.Get("killtimeout"); s != "" { + timeout := time.Duration(core.Atoi(s)) * time.Second + timer := time.AfterFunc(timeout, func() { + log.Trace().Msgf("[exec] kill after timeout=%s", s) + _ = c.cmd.Process.Kill() + }) + defer timer.Stop() // stop timer if Wait ends before timeout + } + + return errors.Join(err, c.cmd.Wait()) +} diff --git a/internal/exec/exec.go b/internal/exec/exec.go index ac1691d3..035317d9 100644 --- a/internal/exec/exec.go +++ b/internal/exec/exec.go @@ -1,10 +1,12 @@ package exec import ( + "bufio" "crypto/md5" "encoding/hex" "errors" "fmt" + "io" "net/url" "os" "os/exec" @@ -49,8 +51,10 @@ func Init() { } func execHandle(rawURL string) (core.Producer, error) { + rawURL, rawQuery, _ := strings.Cut(rawURL, "#") + query := streams.ParseQuery(rawQuery) + var path string - var query url.Values // RTSP flow should have `{output}` inside URL // pipe flow may have `#{params}` inside URL @@ -62,9 +66,6 @@ func execHandle(rawURL string) (core.Producer, error) { sum := md5.Sum([]byte(rawURL)) path = "/" + hex.EncodeToString(sum[:]) rawURL = rawURL[:i] + "rtsp://127.0.0.1:" + rtsp.Port + path + rawURL[i+8:] - } else if i = strings.IndexByte(rawURL, '#'); i > 0 { - query = streams.ParseQuery(rawURL[i+1:]) - rawURL = rawURL[:i] } args := shell.QuoteSplit(rawURL[5:]) // remove `exec:` @@ -74,23 +75,34 @@ func execHandle(rawURL string) (core.Producer, error) { debug: log.Debug().Enabled(), } - if path == "" { - return handlePipe(rawURL, cmd, query) - } - - return handleRTSP(rawURL, cmd, path) -} - -func handlePipe(source string, cmd *exec.Cmd, query url.Values) (core.Producer, error) { if query.Get("backchannel") == "1" { return stdin.NewClient(cmd) } - r, err := PipeCloser(cmd, query) + cl := &closer{cmd: cmd, query: query} + + if path == "" { + return handlePipe(rawURL, cmd, cl) + } + + return handleRTSP(rawURL, cmd, cl, path) +} + +func handlePipe(source string, cmd *exec.Cmd, cl io.Closer) (core.Producer, error) { + stdout, err := cmd.StdoutPipe() if err != nil { return nil, err } + rc := struct { + io.Reader + io.Closer + }{ + // add buffer for pipe reader to reduce syscall + bufio.NewReaderSize(stdout, core.BufferSize), + cl, + } + log.Debug().Strs("args", cmd.Args).Msg("[exec] run pipe") ts := time.Now() @@ -99,9 +111,9 @@ func handlePipe(source string, cmd *exec.Cmd, query url.Values) (core.Producer, return nil, err } - prod, err := magic.Open(r) + prod, err := magic.Open(rc) if err != nil { - _ = r.Close() + _ = rc.Close() return nil, fmt.Errorf("exec/pipe: %w\n%s", err, cmd.Stderr) } @@ -115,7 +127,7 @@ func handlePipe(source string, cmd *exec.Cmd, query url.Values) (core.Producer, return prod, nil } -func handleRTSP(source string, cmd *exec.Cmd, path string) (core.Producer, error) { +func handleRTSP(source string, cmd *exec.Cmd, cl io.Closer, path string) (core.Producer, error) { if log.Trace().Enabled() { cmd.Stdout = os.Stdout } @@ -147,9 +159,9 @@ func handleRTSP(source string, cmd *exec.Cmd, path string) (core.Producer, error }() select { - case <-time.After(time.Second * 60): - _ = cmd.Process.Kill() + case <-time.After(time.Minute): log.Error().Str("source", source).Msg("[exec] timeout") + _ = cl.Close() return nil, errors.New("exec: timeout") case <-done: // limit message size @@ -157,10 +169,7 @@ func handleRTSP(source string, cmd *exec.Cmd, path string) (core.Producer, error case prod := <-waiter: log.Debug().Stringer("launch", time.Since(ts)).Msg("[exec] run rtsp") setRemoteInfo(prod, source, cmd.Args) - prod.OnClose = func() error { - log.Debug().Msgf("[exec] kill rtsp") - return errors.Join(cmd.Process.Kill(), cmd.Wait()) - } + prod.OnClose = cl.Close return prod, nil } } diff --git a/internal/exec/pipe.go b/internal/exec/pipe.go deleted file mode 100644 index 12ea136b..00000000 --- a/internal/exec/pipe.go +++ /dev/null @@ -1,56 +0,0 @@ -package exec - -import ( - "bufio" - "errors" - "io" - "net/url" - "os/exec" - "syscall" - "time" - - "github.com/AlexxIT/go2rtc/pkg/core" -) - -// PipeCloser - return StdoutPipe that Kill cmd on Close call -func PipeCloser(cmd *exec.Cmd, query url.Values) (io.ReadCloser, error) { - stdout, err := cmd.StdoutPipe() - if err != nil { - return nil, err - } - - // add buffer for pipe reader to reduce syscall - return &pipeCloser{bufio.NewReaderSize(stdout, core.BufferSize), stdout, cmd, query}, nil -} - -type pipeCloser struct { - io.Reader - io.Closer - cmd *exec.Cmd - query url.Values -} - -func (p *pipeCloser) Close() error { - return errors.Join(p.Closer.Close(), p.Kill(), p.Wait()) -} - -func (p *pipeCloser) Kill() error { - if s := p.query.Get("killsignal"); s != "" { - log.Trace().Msgf("[exec] kill with custom sig=%s", s) - sig := syscall.Signal(core.Atoi(s)) - return p.cmd.Process.Signal(sig) - } - return p.cmd.Process.Kill() -} - -func (p *pipeCloser) Wait() error { - if s := p.query.Get("killtimeout"); s != "" { - timeout := time.Duration(core.Atoi(s)) * time.Second - timer := time.AfterFunc(timeout, func() { - log.Trace().Msgf("[exec] kill after timeout=%s", s) - _ = p.cmd.Process.Kill() - }) - defer timer.Stop() // stop timer if Wait ends before timeout - } - return p.cmd.Wait() -}