Add support pipe to exec source

This commit is contained in:
Alexey Khit
2023-05-03 07:49:10 +03:00
parent 2e8be342ef
commit e78f9fa69d
6 changed files with 277 additions and 17 deletions
+22 -16
View File
@@ -9,22 +9,17 @@ import (
"github.com/AlexxIT/go2rtc/internal/rtsp"
"github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/pipe"
pkg "github.com/AlexxIT/go2rtc/pkg/rtsp"
"github.com/AlexxIT/go2rtc/pkg/shell"
"github.com/rs/zerolog"
"os"
"os/exec"
"strings"
"sync"
"time"
)
func Init() {
// depends on RTSP server
if rtsp.Port == "" {
return
}
rtsp.HandleFunc(func(conn *pkg.Conn) bool {
waitersMu.Lock()
waiter := waiters[conn.URL.Path]
@@ -49,23 +44,34 @@ func Init() {
}
func Handle(url string) (core.Producer, error) {
sum := md5.Sum([]byte(url))
path := "/" + hex.EncodeToString(sum[:])
var path string
url = strings.Replace(
url, "{output}", "rtsp://127.0.0.1:"+rtsp.Port+path, 1,
)
args := shell.QuoteSplit(url[5:]) // remove `exec:`
for i, arg := range args {
if arg == "{output}" {
if rtsp.Port == "" {
return nil, errors.New("rtsp module disabled")
}
sum := md5.Sum([]byte(url))
path = "/" + hex.EncodeToString(sum[:])
args[i] = "rtsp://127.0.0.1:" + rtsp.Port + path
break
}
}
// remove `exec:`
args := shell.QuoteSplit(url[5:])
cmd := exec.Command(args[0], args[1:]...)
if log.Debug().Enabled() {
cmd.Stderr = os.Stderr
}
if path == "" {
return pipe.NewClient(cmd)
}
if log.Trace().Enabled() {
cmd.Stdout = os.Stdout
}
if log.Debug().Enabled() {
cmd.Stderr = os.Stderr
}
ch := make(chan core.Producer)
+7
View File
@@ -6,8 +6,15 @@ import (
"runtime"
"strconv"
"strings"
"time"
)
// Now90000 - timestamp for Video (clock rate = 90000 samples per second)
// same as: uint32(time.Duration(time.Now().UnixNano()) * 90000 / time.Second)
func Now90000() uint32 {
return uint32(time.Duration(time.Now().UnixMilli()) * 90)
}
const symbols = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ-_"
// RandString base10 - numbers, base16 - hex, base36 - digits+letters, base64 - URL safe symbols
+1 -1
View File
@@ -137,7 +137,7 @@ func (p *PES) GetPacket() (pkt *rtp.Packet) {
pkt = &rtp.Packet{
Header: rtp.Header{
PayloadType: p.StreamType,
Timestamp: uint32(time.Duration(time.Now().UnixNano()) * 90000 / time.Second),
Timestamp: core.Now90000(),
},
Payload: payload,
}
+8
View File
@@ -129,6 +129,14 @@ func (r *Reader) GetPacket() *rtp.Packet {
return nil
}
func (r *Reader) GetStreamTypes() []byte {
types := make([]byte, 0, len(r.pes))
for _, pes := range r.pes {
types = append(types, pes.StreamType)
}
return types
}
// Sync - search sync byte
func (r *Reader) Sync() bool {
// drop previous readed packet
+188
View File
@@ -0,0 +1,188 @@
package pipe
import (
"bytes"
"encoding/hex"
"errors"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/h264"
"github.com/AlexxIT/go2rtc/pkg/mpegts"
"github.com/pion/rtp"
"io"
"os/exec"
)
type Client struct {
cmd *exec.Cmd
stdout io.ReadCloser
sniff []byte
handle func() error
medias []*core.Media
receiver *core.Receiver
recv int
}
func NewClient(cmd *exec.Cmd) (prod *Client, err error) {
prod = &Client{cmd: cmd}
prod.stdout, err = cmd.StdoutPipe()
if err != nil {
return nil, err
}
if err = cmd.Start(); err != nil {
return nil, err
}
prod.sniff = make([]byte, mpegts.PacketSize*3) // MPEG-TS: SDT+PAT+PMT
prod.recv, err = io.ReadFull(prod.stdout, prod.sniff)
if err != nil {
_ = prod.Stop()
return nil, err
}
var codec *core.Codec
if bytes.HasPrefix(prod.sniff, []byte{0, 0, 0, 1}) {
switch {
case h264.NALUType(prod.sniff) == h264.NALUTypeSPS:
codec = &core.Codec{
Name: core.CodecH264,
ClockRate: 90000,
PayloadType: core.PayloadTypeRAW,
}
prod.handle = prod.ReadBitstreams
}
} else if bytes.HasPrefix(prod.sniff, []byte{0xFF, 0xD8}) {
codec = &core.Codec{
Name: core.CodecJPEG,
ClockRate: 90000,
PayloadType: core.PayloadTypeRAW,
}
prod.handle = prod.ReadMJPEG
} else if prod.sniff[0] == mpegts.SyncByte {
ts := mpegts.NewReader()
ts.AppendBuffer(prod.sniff)
_ = ts.GetPacket()
for _, streamType := range ts.GetStreamTypes() {
switch streamType {
case mpegts.StreamTypeH264:
codec = &core.Codec{
Name: core.CodecH264,
ClockRate: 90000,
PayloadType: core.PayloadTypeRAW,
}
prod.handle = prod.ReadMPEGTS
}
}
}
if codec == nil {
_ = prod.Stop()
return nil, errors.New("unknown format: " + hex.EncodeToString(prod.sniff))
}
prod.medias = append(prod.medias, &core.Media{
Kind: core.KindVideo,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{codec},
})
return
}
func (c *Client) ReadBitstreams() error {
buf := c.sniff // total bufer
b := make([]byte, 1024*1024) // reading buffer
for {
payload, n := h264.DecodeStream(buf)
if payload == nil {
n, err := c.stdout.Read(b)
if err != nil {
return err
}
buf = append(buf, b[:n]...)
c.recv += n
continue
}
buf = buf[n:]
//log.Printf("[AVC] %v, len: %d", h264.Types(payload), len(payload))
pkt := &rtp.Packet{
Header: rtp.Header{Timestamp: core.Now90000()},
Payload: payload,
}
c.receiver.WriteRTP(pkt)
}
}
func (c *Client) ReadMJPEG() error {
buf := c.sniff // total bufer
b := make([]byte, 1024*1024) // reading buffer
for {
// one JPEG end and next start
i := bytes.Index(buf, []byte{0xFF, 0xD9, 0xFF, 0xD8})
if i < 0 {
n, err := c.stdout.Read(b)
if err != nil {
return err
}
buf = append(buf, b[:n]...)
c.recv += n
// if we receive frame
if n >= 2 && b[n-2] == 0xFF && b[n-1] == 0xD9 {
i = len(buf)
} else {
continue
}
} else {
i += 2
}
pkt := &rtp.Packet{
Header: rtp.Header{Timestamp: core.Now90000()},
Payload: buf[:i],
}
c.receiver.WriteRTP(pkt)
buf = buf[i:]
}
}
func (c *Client) ReadMPEGTS() error {
b := make([]byte, 1024*1024) // reading buffer
ts := mpegts.NewReader()
ts.AppendBuffer(c.sniff)
for {
packet := ts.GetPacket()
if packet == nil {
n, err := c.stdout.Read(b)
if err != nil {
return err
}
ts.AppendBuffer(b[:n])
c.recv += n
continue
}
//log.Printf("[AVC] %v, len: %d, ts: %10d", h264.Types(packet.Payload), len(packet.Payload), packet.Timestamp)
if packet.PayloadType != mpegts.StreamTypeH264 {
continue
}
c.receiver.WriteRTP(packet)
}
}
+51
View File
@@ -0,0 +1,51 @@
package pipe
import (
"encoding/json"
"github.com/AlexxIT/go2rtc/pkg/core"
"strings"
)
func (c *Client) GetMedias() []*core.Media {
return c.medias
}
func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
if c.receiver == nil {
c.receiver = core.NewReceiver(media, codec)
}
return c.receiver, nil
}
func (c *Client) Start() error {
return c.handle()
}
func (c *Client) Stop() (err error) {
if c.receiver != nil {
c.receiver.Close()
}
if err1 := c.stdout.Close(); err != nil {
err = err1
}
if err1 := c.cmd.Process.Kill(); err != nil {
err = err1
}
if err1 := c.cmd.Wait(); err != nil {
err = err1
}
return
}
func (c *Client) MarshalJSON() ([]byte, error) {
info := &core.Info{
Type: "PIPE active producer",
URL: c.cmd.Path + " " + strings.Join(c.cmd.Args, " "),
Medias: c.medias,
Recv: c.recv,
}
if c.receiver != nil {
info.Receivers = append(info.Receivers, c.receiver)
}
return json.Marshal(info)
}