Merge pull request #871 from dadav/signal

Feature: Make kill signal configurable
This commit is contained in:
Alex X
2024-04-29 18:40:07 +03:00
committed by GitHub
4 changed files with 56 additions and 15 deletions
+6
View File
@@ -416,11 +416,17 @@ The source can be used with:
- [Raspberry Pi Cameras](https://www.raspberrypi.com/documentation/computers/camera_software.html) - [Raspberry Pi Cameras](https://www.raspberrypi.com/documentation/computers/camera_software.html)
- any your own software - any your own software
Pipe commands support two parameters (format: `exec:{command}#{param1}#{param2}`):
- `killsignal` - signal which will be send to stop the process (numeric form)
- `killtimeout` - time in seconds for forced termination with sigkill
```yaml ```yaml
streams: streams:
stream: exec:ffmpeg -re -i /media/BigBuckBunny.mp4 -c copy -rtsp_transport tcp -f rtsp {output} stream: exec:ffmpeg -re -i /media/BigBuckBunny.mp4 -c copy -rtsp_transport tcp -f rtsp {output}
picam_h264: exec:libcamera-vid -t 0 --inline -o - picam_h264: exec:libcamera-vid -t 0 --inline -o -
picam_mjpeg: exec:libcamera-vid -t 0 --codec mjpeg -o - picam_mjpeg: exec:libcamera-vid -t 0 --codec mjpeg -o -
canon: exec:gphoto2 --capture-movie --stdout#killsignal=2#killtimeout=5
``` ```
#### Source: Echo #### Source: Echo
+16 -10
View File
@@ -5,6 +5,7 @@ import (
"encoding/hex" "encoding/hex"
"errors" "errors"
"fmt" "fmt"
"net/url"
"os" "os"
"os/exec" "os/exec"
"strings" "strings"
@@ -45,17 +46,19 @@ func Init() {
log = app.GetLogger("exec") log = app.GetLogger("exec")
} }
func execHandle(url string) (core.Producer, error) { func execHandle(rawURL string) (core.Producer, error) {
var path string var path string
args := shell.QuoteSplit(url[5:]) // remove `exec:` rawURL, rawQuery, _ := strings.Cut(rawURL, "#")
args := shell.QuoteSplit(rawURL[5:]) // remove `exec:`
for i, arg := range args { for i, arg := range args {
if arg == "{output}" { if arg == "{output}" {
if rtsp.Port == "" { if rtsp.Port == "" {
return nil, errors.New("rtsp module disabled") return nil, errors.New("rtsp module disabled")
} }
sum := md5.Sum([]byte(url)) sum := md5.Sum([]byte(rawURL))
path = "/" + hex.EncodeToString(sum[:]) path = "/" + hex.EncodeToString(sum[:])
args[i] = "rtsp://127.0.0.1:" + rtsp.Port + path args[i] = "rtsp://127.0.0.1:" + rtsp.Port + path
break break
@@ -68,14 +71,15 @@ func execHandle(url string) (core.Producer, error) {
} }
if path == "" { if path == "" {
return handlePipe(url, cmd) query := streams.ParseQuery(rawQuery)
return handlePipe(rawURL, cmd, query)
} }
return handleRTSP(url, path, cmd) return handleRTSP(rawURL, path, cmd)
} }
func handlePipe(url string, cmd *exec.Cmd) (core.Producer, error) { func handlePipe(_ string, cmd *exec.Cmd, query url.Values) (core.Producer, error) {
r, err := PipeCloser(cmd) r, err := PipeCloser(cmd, query)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -145,6 +149,8 @@ func handleRTSP(url, path string, cmd *exec.Cmd) (core.Producer, error) {
// internal // internal
var log zerolog.Logger var (
var waiters = map[string]chan core.Producer{} log zerolog.Logger
var waitersMu sync.Mutex waiters = map[string]chan core.Producer{}
waitersMu sync.Mutex
)
+31 -5
View File
@@ -2,29 +2,55 @@ package exec
import ( import (
"bufio" "bufio"
"errors"
"io" "io"
"net/url"
"os/exec" "os/exec"
"syscall"
"time"
"github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/core"
) )
// PipeCloser - return StdoutPipe that Kill cmd on Close call // PipeCloser - return StdoutPipe that Kill cmd on Close call
func PipeCloser(cmd *exec.Cmd) (io.ReadCloser, error) { func PipeCloser(cmd *exec.Cmd, query url.Values) (io.ReadCloser, error) {
stdout, err := cmd.StdoutPipe() stdout, err := cmd.StdoutPipe()
if err != nil { if err != nil {
return nil, err return nil, err
} }
// add buffer for pipe reader to reduce syscall // add buffer for pipe reader to reduce syscall
return pipeCloser{bufio.NewReaderSize(stdout, core.BufferSize), stdout, cmd}, nil return &pipeCloser{bufio.NewReaderSize(stdout, core.BufferSize), stdout, cmd, query}, nil
} }
type pipeCloser struct { type pipeCloser struct {
io.Reader io.Reader
io.Closer io.Closer
cmd *exec.Cmd cmd *exec.Cmd
query url.Values
} }
func (p pipeCloser) Close() error { func (p *pipeCloser) Close() error {
return core.Any(p.Closer.Close(), p.cmd.Process.Kill(), p.cmd.Wait()) return errors.Join(p.Closer.Close(), p.Kill(), p.Wait())
}
func (p *pipeCloser) Kill() error {
if s := p.query.Get("killsignal"); s != "" {
log.Trace().Msgf("[exec] kill with custom sig=%s", s)
sig := syscall.Signal(core.Atoi(s))
return p.cmd.Process.Signal(sig)
}
return p.cmd.Process.Kill()
}
func (p *pipeCloser) Wait() error {
if s := p.query.Get("killtimeout"); s != "" {
timeout := time.Duration(core.Atoi(s)) * time.Second
timer := time.AfterFunc(timeout, func() {
log.Trace().Msgf("[exec] kill after timeout=%s", s)
_ = p.cmd.Process.Kill()
})
defer timer.Stop() // stop timer if Wait ends before timeout
}
return p.cmd.Wait()
} }
+3
View File
@@ -6,6 +6,9 @@ import (
) )
func ParseQuery(s string) url.Values { func ParseQuery(s string) url.Values {
if len(s) == 0 {
return nil
}
params := url.Values{} params := url.Values{}
for _, key := range strings.Split(s, "#") { for _, key := range strings.Split(s, "#") {
var value string var value string