diff --git a/internal/exec/exec.go b/internal/exec/exec.go index b77eb4c4..d2003a55 100644 --- a/internal/exec/exec.go +++ b/internal/exec/exec.go @@ -9,22 +9,17 @@ import ( "github.com/AlexxIT/go2rtc/internal/rtsp" "github.com/AlexxIT/go2rtc/internal/streams" "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/pipe" pkg "github.com/AlexxIT/go2rtc/pkg/rtsp" "github.com/AlexxIT/go2rtc/pkg/shell" "github.com/rs/zerolog" "os" "os/exec" - "strings" "sync" "time" ) func Init() { - // depends on RTSP server - if rtsp.Port == "" { - return - } - rtsp.HandleFunc(func(conn *pkg.Conn) bool { waitersMu.Lock() waiter := waiters[conn.URL.Path] @@ -49,23 +44,34 @@ func Init() { } func Handle(url string) (core.Producer, error) { - sum := md5.Sum([]byte(url)) - path := "/" + hex.EncodeToString(sum[:]) + var path string - url = strings.Replace( - url, "{output}", "rtsp://127.0.0.1:"+rtsp.Port+path, 1, - ) + args := shell.QuoteSplit(url[5:]) // remove `exec:` + for i, arg := range args { + if arg == "{output}" { + if rtsp.Port == "" { + return nil, errors.New("rtsp module disabled") + } + + sum := md5.Sum([]byte(url)) + path = "/" + hex.EncodeToString(sum[:]) + args[i] = "rtsp://127.0.0.1:" + rtsp.Port + path + break + } + } - // remove `exec:` - args := shell.QuoteSplit(url[5:]) cmd := exec.Command(args[0], args[1:]...) + if log.Debug().Enabled() { + cmd.Stderr = os.Stderr + } + + if path == "" { + return pipe.NewClient(cmd) + } if log.Trace().Enabled() { cmd.Stdout = os.Stdout } - if log.Debug().Enabled() { - cmd.Stderr = os.Stderr - } ch := make(chan core.Producer) diff --git a/pkg/core/helpers.go b/pkg/core/helpers.go index 44d24953..0cb78aba 100644 --- a/pkg/core/helpers.go +++ b/pkg/core/helpers.go @@ -6,8 +6,15 @@ import ( "runtime" "strconv" "strings" + "time" ) +// Now90000 - timestamp for Video (clock rate = 90000 samples per second) +// same as: uint32(time.Duration(time.Now().UnixNano()) * 90000 / time.Second) +func Now90000() uint32 { + return uint32(time.Duration(time.Now().UnixMilli()) * 90) +} + const symbols = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ-_" // RandString base10 - numbers, base16 - hex, base36 - digits+letters, base64 - URL safe symbols diff --git a/pkg/mpegts/helpers.go b/pkg/mpegts/helpers.go index 037078a7..673deeeb 100644 --- a/pkg/mpegts/helpers.go +++ b/pkg/mpegts/helpers.go @@ -137,7 +137,7 @@ func (p *PES) GetPacket() (pkt *rtp.Packet) { pkt = &rtp.Packet{ Header: rtp.Header{ PayloadType: p.StreamType, - Timestamp: uint32(time.Duration(time.Now().UnixNano()) * 90000 / time.Second), + Timestamp: core.Now90000(), }, Payload: payload, } diff --git a/pkg/mpegts/reader.go b/pkg/mpegts/reader.go index 66607992..c38b35dd 100644 --- a/pkg/mpegts/reader.go +++ b/pkg/mpegts/reader.go @@ -129,6 +129,14 @@ func (r *Reader) GetPacket() *rtp.Packet { return nil } +func (r *Reader) GetStreamTypes() []byte { + types := make([]byte, 0, len(r.pes)) + for _, pes := range r.pes { + types = append(types, pes.StreamType) + } + return types +} + // Sync - search sync byte func (r *Reader) Sync() bool { // drop previous readed packet diff --git a/pkg/pipe/client.go b/pkg/pipe/client.go new file mode 100644 index 00000000..91265010 --- /dev/null +++ b/pkg/pipe/client.go @@ -0,0 +1,188 @@ +package pipe + +import ( + "bytes" + "encoding/hex" + "errors" + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/h264" + "github.com/AlexxIT/go2rtc/pkg/mpegts" + "github.com/pion/rtp" + "io" + "os/exec" +) + +type Client struct { + cmd *exec.Cmd + stdout io.ReadCloser + sniff []byte + handle func() error + + medias []*core.Media + receiver *core.Receiver + + recv int +} + +func NewClient(cmd *exec.Cmd) (prod *Client, err error) { + prod = &Client{cmd: cmd} + + prod.stdout, err = cmd.StdoutPipe() + if err != nil { + return nil, err + } + + if err = cmd.Start(); err != nil { + return nil, err + } + + prod.sniff = make([]byte, mpegts.PacketSize*3) // MPEG-TS: SDT+PAT+PMT + prod.recv, err = io.ReadFull(prod.stdout, prod.sniff) + if err != nil { + _ = prod.Stop() + return nil, err + } + + var codec *core.Codec + + if bytes.HasPrefix(prod.sniff, []byte{0, 0, 0, 1}) { + switch { + case h264.NALUType(prod.sniff) == h264.NALUTypeSPS: + codec = &core.Codec{ + Name: core.CodecH264, + ClockRate: 90000, + PayloadType: core.PayloadTypeRAW, + } + prod.handle = prod.ReadBitstreams + } + } else if bytes.HasPrefix(prod.sniff, []byte{0xFF, 0xD8}) { + codec = &core.Codec{ + Name: core.CodecJPEG, + ClockRate: 90000, + PayloadType: core.PayloadTypeRAW, + } + prod.handle = prod.ReadMJPEG + } else if prod.sniff[0] == mpegts.SyncByte { + ts := mpegts.NewReader() + ts.AppendBuffer(prod.sniff) + _ = ts.GetPacket() + for _, streamType := range ts.GetStreamTypes() { + switch streamType { + case mpegts.StreamTypeH264: + codec = &core.Codec{ + Name: core.CodecH264, + ClockRate: 90000, + PayloadType: core.PayloadTypeRAW, + } + prod.handle = prod.ReadMPEGTS + } + } + } + + if codec == nil { + _ = prod.Stop() + return nil, errors.New("unknown format: " + hex.EncodeToString(prod.sniff)) + } + + prod.medias = append(prod.medias, &core.Media{ + Kind: core.KindVideo, + Direction: core.DirectionRecvonly, + Codecs: []*core.Codec{codec}, + }) + + return +} + +func (c *Client) ReadBitstreams() error { + buf := c.sniff // total bufer + b := make([]byte, 1024*1024) // reading buffer + + for { + payload, n := h264.DecodeStream(buf) + if payload == nil { + n, err := c.stdout.Read(b) + if err != nil { + return err + } + + buf = append(buf, b[:n]...) + c.recv += n + continue + } + + buf = buf[n:] + + //log.Printf("[AVC] %v, len: %d", h264.Types(payload), len(payload)) + + pkt := &rtp.Packet{ + Header: rtp.Header{Timestamp: core.Now90000()}, + Payload: payload, + } + c.receiver.WriteRTP(pkt) + } +} + +func (c *Client) ReadMJPEG() error { + buf := c.sniff // total bufer + b := make([]byte, 1024*1024) // reading buffer + + for { + // one JPEG end and next start + i := bytes.Index(buf, []byte{0xFF, 0xD9, 0xFF, 0xD8}) + if i < 0 { + n, err := c.stdout.Read(b) + if err != nil { + return err + } + + buf = append(buf, b[:n]...) + c.recv += n + + // if we receive frame + if n >= 2 && b[n-2] == 0xFF && b[n-1] == 0xD9 { + i = len(buf) + } else { + continue + } + } else { + i += 2 + } + + pkt := &rtp.Packet{ + Header: rtp.Header{Timestamp: core.Now90000()}, + Payload: buf[:i], + } + c.receiver.WriteRTP(pkt) + + buf = buf[i:] + } +} + +func (c *Client) ReadMPEGTS() error { + b := make([]byte, 1024*1024) // reading buffer + + ts := mpegts.NewReader() + ts.AppendBuffer(c.sniff) + + for { + packet := ts.GetPacket() + if packet == nil { + n, err := c.stdout.Read(b) + if err != nil { + return err + } + + ts.AppendBuffer(b[:n]) + c.recv += n + continue + } + + //log.Printf("[AVC] %v, len: %d, ts: %10d", h264.Types(packet.Payload), len(packet.Payload), packet.Timestamp) + + if packet.PayloadType != mpegts.StreamTypeH264 { + continue + } + + c.receiver.WriteRTP(packet) + } +} diff --git a/pkg/pipe/producer.go b/pkg/pipe/producer.go new file mode 100644 index 00000000..c2d5afdd --- /dev/null +++ b/pkg/pipe/producer.go @@ -0,0 +1,51 @@ +package pipe + +import ( + "encoding/json" + "github.com/AlexxIT/go2rtc/pkg/core" + "strings" +) + +func (c *Client) GetMedias() []*core.Media { + return c.medias +} + +func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { + if c.receiver == nil { + c.receiver = core.NewReceiver(media, codec) + } + return c.receiver, nil +} + +func (c *Client) Start() error { + return c.handle() +} + +func (c *Client) Stop() (err error) { + if c.receiver != nil { + c.receiver.Close() + } + if err1 := c.stdout.Close(); err != nil { + err = err1 + } + if err1 := c.cmd.Process.Kill(); err != nil { + err = err1 + } + if err1 := c.cmd.Wait(); err != nil { + err = err1 + } + return +} + +func (c *Client) MarshalJSON() ([]byte, error) { + info := &core.Info{ + Type: "PIPE active producer", + URL: c.cmd.Path + " " + strings.Join(c.cmd.Args, " "), + Medias: c.medias, + Recv: c.recv, + } + if c.receiver != nil { + info.Receivers = append(info.Receivers, c.receiver) + } + return json.Marshal(info) +}