diff --git a/internal/execbc/init.go b/internal/execbc/init.go new file mode 100644 index 00000000..7d346331 --- /dev/null +++ b/internal/execbc/init.go @@ -0,0 +1,21 @@ +package execbc + +import ( + "github.com/AlexxIT/go2rtc/internal/streams" + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/execbc" + "github.com/AlexxIT/go2rtc/pkg/shell" +) + +func Init() { + streams.HandleFunc("execbc", handle) +} + +func handle(url string) (core.Producer, error) { + args := shell.QuoteSplit(url[7:]) + con, err := execbc.NewClient(args) + if err != nil { + return nil, err + } + return con, nil +} diff --git a/main.go b/main.go index 91bc9938..aa218fd3 100644 --- a/main.go +++ b/main.go @@ -9,6 +9,7 @@ import ( "github.com/AlexxIT/go2rtc/internal/dvrip" "github.com/AlexxIT/go2rtc/internal/echo" "github.com/AlexxIT/go2rtc/internal/exec" + "github.com/AlexxIT/go2rtc/internal/execbc" "github.com/AlexxIT/go2rtc/internal/expr" "github.com/AlexxIT/go2rtc/internal/ffmpeg" "github.com/AlexxIT/go2rtc/internal/gopro" @@ -80,6 +81,7 @@ func main() { bubble.Init() // bubble source expr.Init() // expr source gopro.Init() // gopro source + execbc.Init() // Local Backchannel // 6. Helper modules diff --git a/pkg/execbc/client.go b/pkg/execbc/client.go new file mode 100644 index 00000000..c5b3490d --- /dev/null +++ b/pkg/execbc/client.go @@ -0,0 +1,53 @@ +package execbc + +import ( + "io" + "net" + "os/exec" + + "github.com/AlexxIT/go2rtc/pkg/core" +) + +type Client struct { + medias []*core.Media + sender *core.Sender + conn net.Conn + send int + pipeCloser io.WriteCloser + commandArgs []string + cmd *exec.Cmd +} + +func NewClient(commandArgs []string) (*Client, error) { + c := &Client{commandArgs: commandArgs} + media := &core.Media{ + Kind: core.KindAudio, + Direction: core.DirectionSendonly, + Codecs: []*core.Codec{ + {Name: core.CodecPCMA, ClockRate: 8000}, + }, + } + + c.medias = append(c.medias, media) + + cmdName := c.commandArgs[0] + args := c.commandArgs[1:] + cmd := *exec.Command(cmdName, args...) + + pipeCloser, error := PipeCloser(&cmd) + if error != nil { + return nil, error + } + c.pipeCloser = pipeCloser + c.cmd = &cmd + return c, nil +} + +func (c Client) Open() (err error) { + c.cmd.Run() + return +} + +func (c Client) Close() (err error) { + return c.pipeCloser.Close() +} diff --git a/pkg/execbc/consumer.go b/pkg/execbc/consumer.go new file mode 100644 index 00000000..9595a47c --- /dev/null +++ b/pkg/execbc/consumer.go @@ -0,0 +1,62 @@ +package execbc + +import ( + "encoding/json" + + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/pion/rtp" +) + +func (c *Client) GetMedias() []*core.Media { + return c.medias +} + +func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { + return nil, core.ErrCantGetTrack +} + +func (c *Client) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiver) error { + if c.sender == nil { + c.sender = core.NewSender(media, track.Codec) + c.sender.Handler = func(packet *rtp.Packet) { + c.pipeCloser.Write(packet.Payload) + + c.send += len(packet.Payload) + } + } + + c.sender.HandleRTP(track) + return nil +} + +func (c *Client) Start() (err error) { + if err = c.Open(); err != nil { + return + } + return +} + +func (c *Client) Stop() (err error) { + if c.sender != nil { + c.sender.Close() + } + + if c.conn != nil { + _ = c.Close() + return c.conn.Close() + } + + return nil +} + +func (c *Client) MarshalJSON() ([]byte, error) { + info := &core.Info{ + Type: "Command Backchannel PCMA", + Medias: c.medias, + Send: c.send, + } + if c.sender != nil { + info.Senders = []*core.Sender{c.sender} + } + return json.Marshal(info) +} diff --git a/pkg/execbc/pipe.go b/pkg/execbc/pipe.go new file mode 100644 index 00000000..5ad73398 --- /dev/null +++ b/pkg/execbc/pipe.go @@ -0,0 +1,27 @@ +package execbc + +import ( + "io" + "os/exec" + + "github.com/AlexxIT/go2rtc/pkg/core" +) + +type pipeCloser struct { + io.Writer + io.Closer + cmd *exec.Cmd +} + +func PipeCloser(cmd *exec.Cmd) (io.WriteCloser, error) { + stdin, err := cmd.StdinPipe() + + if err != nil { + return nil, err + } + return pipeCloser{stdin, stdin, cmd}, nil +} + +func (p pipeCloser) Close() (err error) { + return core.Any(p.Closer.Close(), p.cmd.Process.Kill(), p.cmd.Wait()) +}