Rewrite magic client

This commit is contained in:
Alexey Khit
2023-08-16 17:15:27 +03:00
parent 1dd3dbbcd8
commit 4e1a0e1ab9
8 changed files with 428 additions and 239 deletions
+6 -13
View File
@@ -5,6 +5,11 @@ import (
"encoding/hex"
"errors"
"fmt"
"os"
"os/exec"
"sync"
"time"
"github.com/AlexxIT/go2rtc/internal/app"
"github.com/AlexxIT/go2rtc/internal/rtsp"
"github.com/AlexxIT/go2rtc/internal/streams"
@@ -13,10 +18,6 @@ import (
pkg "github.com/AlexxIT/go2rtc/pkg/rtsp"
"github.com/AlexxIT/go2rtc/pkg/shell"
"github.com/rs/zerolog"
"os"
"os/exec"
"sync"
"time"
)
func Init() {
@@ -82,15 +83,7 @@ func handlePipe(url string, cmd *exec.Cmd) (core.Producer, error) {
return nil, err
}
client := magic.NewClient(r)
if err = client.Probe(); err != nil {
return nil, err
}
client.Desc = "exec active producer"
client.URL = url
return client, nil
return magic.Open(r)
}
func handleRTSP(url, path string, cmd *exec.Cmd) (core.Producer, error) {
+4 -16
View File
@@ -6,7 +6,6 @@ import (
"net/http"
"net/url"
"strings"
"time"
"github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/core"
@@ -67,14 +66,11 @@ func handleHTTP(url string) (core.Producer, error) {
default: // "video/mpeg":
}
client := magic.NewClient(res.Body)
if err = client.Probe(); err != nil {
client, err := magic.Open(res.Body)
if err != nil {
return nil, err
}
client.Desc = "HTTP active producer"
client.URL = url
return client, nil
}
@@ -84,18 +80,10 @@ func handleTCP(rawURL string) (core.Producer, error) {
return nil, err
}
conn, err := net.DialTimeout("tcp", u.Host, time.Second*3)
conn, err := net.DialTimeout("tcp", u.Host, core.ConnDialTimeout)
if err != nil {
return nil, err
}
client := magic.NewClient(conn)
if err = client.Probe(); err != nil {
return nil, err
}
client.Desc = "TCP active producer"
client.URL = rawURL
return client, nil
return magic.Open(conn)
}
+101
View File
@@ -0,0 +1,101 @@
package core
import (
"errors"
"io"
)
const ProbeSize = 5 * 1024 * 1024 // 5MB
const (
BufferDisable = 0
BufferDrainAndClear = -1
)
// ReadSeeker support buffering and Seek over buffer
// positive BufferSize will enable buffering mode
// Seek to negative offset will clear buffer
// Seek with a positive BufferSize will continue buffering after the last read from the buffer
// Seek with a negative BufferSize will clear buffer after the last read from the buffer
// Read more than BufferSize will raise error
type ReadSeeker struct {
io.Reader
BufferSize int
buf []byte
pos int
}
func NewReadSeeker(rd io.Reader) *ReadSeeker {
if rs, ok := rd.(*ReadSeeker); ok {
return rs
}
return &ReadSeeker{Reader: rd}
}
func (r *ReadSeeker) Read(p []byte) (n int, err error) {
// with zero buffer - read as usual
if r.BufferSize == BufferDisable {
return r.Reader.Read(p)
}
// if buffer not empty - read from it
if r.pos < len(r.buf) {
n = copy(p, r.buf[r.pos:])
r.pos += n
return
}
// with negative buffer - empty it and read as usual
if r.BufferSize < 0 {
r.BufferSize = BufferDisable
r.buf = nil
r.pos = 0
return r.Reader.Read(p)
}
n, err = r.Reader.Read(p)
if len(r.buf)+n > r.BufferSize {
return 0, errors.New("probe reader overflow")
}
r.buf = append(r.buf, p[:n]...)
r.pos += n
return
}
func (r *ReadSeeker) Seek(offset int64, whence int) (int64, error) {
var pos int
switch whence {
case io.SeekStart:
pos = int(offset)
case io.SeekCurrent:
pos = r.pos + int(offset)
case io.SeekEnd:
pos = len(r.buf) + int(offset)
}
// negative offset - empty buffer
if pos < 0 {
r.buf = nil
r.pos = 0
} else if pos >= len(r.buf) {
r.pos = len(r.buf)
} else {
r.pos = pos
}
return int64(r.pos), nil
}
func (r *ReadSeeker) Peek(n int) ([]byte, error) {
r.BufferSize = n
b := make([]byte, n)
if _, err := io.ReadAtLeast(r, b, n); err != nil {
return nil, err
}
r.BufferSize = BufferDrainAndClear
r.pos = 0
return b, nil
}
+64
View File
@@ -0,0 +1,64 @@
package core
import (
"bytes"
"io"
"testing"
"github.com/stretchr/testify/require"
)
func TestReadSeeker(t *testing.T) {
b := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
buf := bytes.NewReader(b)
rd := NewReadSeeker(buf)
rd.BufferSize = ProbeSize
// 1. Read to buffer
b = make([]byte, 3)
n, err := rd.Read(b)
require.Nil(t, err)
require.Equal(t, []byte{0, 1, 2}, b[:n])
// 2. Seek to start
_, err = rd.Seek(0, io.SeekStart)
require.Nil(t, err)
// 3. Read from buffer
b = make([]byte, 2)
n, err = rd.Read(b)
require.Nil(t, err)
require.Equal(t, []byte{0, 1}, b[:n])
// 4. Read from buffer
n, err = rd.Read(b)
require.Nil(t, err)
require.Equal(t, []byte{2}, b[:n])
// 5. Read to buffer
n, err = rd.Read(b)
require.Nil(t, err)
require.Equal(t, []byte{3, 4}, b[:n])
// 6. Seek to start
_, err = rd.Seek(0, io.SeekStart)
require.Nil(t, err)
// 7. Disable buffer
rd.BufferSize = -1
// 8. Read from buffer
b = make([]byte, 10)
n, err = rd.Read(b)
require.Nil(t, err)
require.Equal(t, []byte{0, 1, 2, 3, 4}, b[:n])
// 9. Direct read
n, err = rd.Read(b)
require.Nil(t, err)
require.Equal(t, []byte{5, 6, 7, 8, 9}, b[:n])
// 10. Check buffer empty
require.Nil(t, rd.buf)
}
+118
View File
@@ -0,0 +1,118 @@
package bitstream
import (
"encoding/hex"
"encoding/json"
"errors"
"io"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/h264"
"github.com/AlexxIT/go2rtc/pkg/h264/annexb"
"github.com/AlexxIT/go2rtc/pkg/h265"
"github.com/pion/rtp"
)
type Client struct {
rd *core.ReadSeeker
media *core.Media
receiver *core.Receiver
recv int
}
func Open(r io.Reader) (*Client, error) {
rd := core.NewReadSeeker(r)
buf, err := rd.Peek(256)
if err != nil {
return nil, err
}
buf = annexb.EncodeToAVCC(buf, false) // won't break original buffer
var codec *core.Codec
switch {
case h264.NALUType(buf) == h264.NALUTypeSPS:
codec = h264.AVCCToCodec(buf)
case h265.NALUType(buf) == h265.NALUTypeVPS:
codec = h265.AVCCToCodec(buf)
default:
return nil, errors.New("bitstream: unsupported header: " + hex.EncodeToString(buf[:8]))
}
client := &Client{
rd: rd,
media: &core.Media{
Kind: core.KindVideo,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{codec},
},
}
return client, nil
}
func (c *Client) GetMedias() []*core.Media {
return []*core.Media{c.media}
}
func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
if c.receiver == nil {
c.receiver = core.NewReceiver(media, codec)
}
return c.receiver, nil
}
func (c *Client) Start() error {
var buf []byte
b := make([]byte, core.BufferSize)
for {
n, err := c.rd.Read(b)
if err != nil {
return err
}
c.recv += n
buf = append(buf, b[:n]...)
i := annexb.IndexFrame(buf)
if i < 0 {
continue
}
pkt := &rtp.Packet{
Header: rtp.Header{Timestamp: core.Now90000()},
Payload: annexb.EncodeToAVCC(buf[:i], true),
}
c.receiver.WriteRTP(pkt)
//log.Printf("[AVC] %v, len: %d", h264.Types(pkt.Payload), len(pkt.Payload))
buf = buf[i:]
}
}
func (c *Client) Stop() error {
if c.receiver != nil {
c.receiver.Close()
}
if closer, ok := c.rd.Reader.(io.Closer); ok {
return closer.Close()
}
return nil
}
func (c *Client) MarshalJSON() ([]byte, error) {
info := &core.Info{
Type: "Bitstream active producer",
Medias: []*core.Media{c.media},
Receivers: []*core.Receiver{c.receiver},
Recv: c.recv,
}
return json.Marshal(info)
}
+25 -190
View File
@@ -4,211 +4,46 @@ import (
"bytes"
"encoding/hex"
"errors"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/h264"
"github.com/AlexxIT/go2rtc/pkg/h265"
"github.com/AlexxIT/go2rtc/pkg/mpegts"
"github.com/pion/rtp"
"io"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/h264/annexb"
"github.com/AlexxIT/go2rtc/pkg/magic/bitstream"
"github.com/AlexxIT/go2rtc/pkg/magic/mjpeg"
"github.com/AlexxIT/go2rtc/pkg/mpegts"
)
// Client - can read unknown bytestream and autodetect format
type Client struct {
Desc string
URL string
Handle func() error
r io.ReadCloser
sniff []byte
medias []*core.Media
receiver *core.Receiver
recv int
rd *core.ReadSeeker
prod core.Producer
}
func NewClient(r io.ReadCloser) *Client {
return &Client{r: r}
}
func Open(r io.Reader) (*Client, error) {
rd := core.NewReadSeeker(r)
func (c *Client) Probe() (err error) {
c.sniff = make([]byte, mpegts.PacketSize*3) // MPEG-TS: SDT+PAT+PMT
c.recv, err = io.ReadFull(c.r, c.sniff)
b, err := rd.Peek(4)
if err != nil {
_ = c.Close()
return
return nil, err
}
var codec *core.Codec
if bytes.HasPrefix(c.sniff, []byte{0, 0, 0, 1}) {
switch {
case h264.NALUType(c.sniff) == h264.NALUTypeSPS:
codec = &core.Codec{
Name: core.CodecH264,
ClockRate: 90000,
PayloadType: core.PayloadTypeRAW,
}
c.Handle = c.ReadBitstreams
case h265.NALUType(c.sniff) == h265.NALUTypeVPS:
codec = &core.Codec{
Name: core.CodecH265,
ClockRate: 90000,
PayloadType: core.PayloadTypeRAW,
}
c.Handle = c.ReadBitstreams
switch {
case bytes.HasPrefix(b, []byte(annexb.StartCode)) || bytes.HasPrefix(b, []byte{0, 0, 1}):
var prod core.Producer
if prod, err = bitstream.Open(rd); err != nil {
return nil, err
}
return &Client{rd: rd, prod: prod}, nil
} else if bytes.HasPrefix(c.sniff, []byte{0xFF, 0xD8}) {
codec = &core.Codec{
Name: core.CodecJPEG,
ClockRate: 90000,
PayloadType: core.PayloadTypeRAW,
}
c.Handle = c.ReadMJPEG
case bytes.HasPrefix(b, []byte{0xFF, 0xD8}):
return &Client{rd: rd, prod: mjpeg.NewClient(rd)}, nil
} else if c.sniff[0] == mpegts.SyncByte {
ts := mpegts.NewReader()
ts.AppendBuffer(c.sniff)
_ = ts.GetPacket()
for _, streamType := range ts.GetStreamTypes() {
switch streamType {
case mpegts.StreamTypeH264:
codec = &core.Codec{
Name: core.CodecH264,
ClockRate: 90000,
PayloadType: core.PayloadTypeRAW,
}
c.Handle = c.ReadMPEGTS
case bytes.HasPrefix(b, []byte{'F', 'L', 'V'}):
break // TODO
case mpegts.StreamTypeH265:
codec = &core.Codec{
Name: core.CodecH265,
ClockRate: 90000,
PayloadType: core.PayloadTypeRAW,
}
c.Handle = c.ReadMPEGTS
}
}
case b[0] == mpegts.SyncByte:
break // TODO
}
if codec == nil {
_ = c.Close()
return errors.New("unknown format: " + hex.EncodeToString(c.sniff[:8]))
}
c.medias = append(c.medias, &core.Media{
Kind: core.KindVideo,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{codec},
})
return
}
func (c *Client) ReadBitstreams() error {
buf := c.sniff // total bufer
b := make([]byte, 1024*1024) // reading buffer
var decodeStream func([]byte) ([]byte, int)
switch c.receiver.Codec.Name {
case core.CodecH264:
decodeStream = h264.DecodeStream
case core.CodecH265:
decodeStream = h265.DecodeStream
}
for {
payload, n := decodeStream(buf)
if payload == nil {
n, err := c.r.Read(b)
if err != nil {
return err
}
buf = append(buf, b[:n]...)
c.recv += n
continue
}
buf = buf[n:]
//log.Printf("[AVC] %v, len: %d", h264.Types(payload), len(payload))
pkt := &rtp.Packet{
Header: rtp.Header{Timestamp: core.Now90000()},
Payload: payload,
}
c.receiver.WriteRTP(pkt)
}
}
func (c *Client) ReadMJPEG() error {
buf := c.sniff // total bufer
b := make([]byte, 1024*1024) // reading buffer
for {
// one JPEG end and next start
i := bytes.Index(buf, []byte{0xFF, 0xD9, 0xFF, 0xD8})
if i < 0 {
n, err := c.r.Read(b)
if err != nil {
return err
}
buf = append(buf, b[:n]...)
c.recv += n
// if we receive frame
if n >= 2 && b[n-2] == 0xFF && b[n-1] == 0xD9 {
i = len(buf)
} else {
continue
}
} else {
i += 2
}
pkt := &rtp.Packet{
Header: rtp.Header{Timestamp: core.Now90000()},
Payload: buf[:i],
}
c.receiver.WriteRTP(pkt)
buf = buf[i:]
}
}
func (c *Client) ReadMPEGTS() error {
b := make([]byte, 1024*1024) // reading buffer
ts := mpegts.NewReader()
ts.AppendBuffer(c.sniff)
for {
packet := ts.GetPacket()
if packet == nil {
n, err := c.r.Read(b)
if err != nil {
return err
}
ts.AppendBuffer(b[:n])
c.recv += n
continue
}
//log.Printf("[AVC] %v, len: %d, ts: %10d", h264.Types(packet.Payload), len(packet.Payload), packet.Timestamp)
switch packet.PayloadType {
case mpegts.StreamTypeH264, mpegts.StreamTypeH265:
c.receiver.WriteRTP(packet)
}
}
}
func (c *Client) Close() error {
return c.r.Close()
return nil, errors.New("magic: unsupported header: " + hex.EncodeToString(b))
}
+104
View File
@@ -0,0 +1,104 @@
package mjpeg
import (
"bytes"
"encoding/json"
"io"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/pion/rtp"
)
type Client struct {
rd *core.ReadSeeker
media *core.Media
receiver *core.Receiver
recv int
}
func NewClient(rd io.Reader) *Client {
return &Client{
rd: core.NewReadSeeker(rd),
media: &core.Media{
Kind: core.KindVideo,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{
{
Name: core.CodecJPEG,
ClockRate: 90000,
PayloadType: core.PayloadTypeRAW,
},
},
},
}
}
func (c *Client) GetMedias() []*core.Media {
return []*core.Media{c.media}
}
func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
if c.receiver == nil {
c.receiver = core.NewReceiver(media, codec)
}
return c.receiver, nil
}
func (c *Client) Start() error {
var buf []byte // total bufer
b := make([]byte, core.BufferSize) // reading buffer
for {
// one JPEG end and next start
i := bytes.Index(buf, []byte{0xFF, 0xD9, 0xFF, 0xD8})
if i < 0 {
n, err := c.rd.Read(b)
if err != nil {
return err
}
c.recv += n
buf = append(buf, b[:n]...)
// if we receive frame
if n >= 2 && b[n-2] == 0xFF && b[n-1] == 0xD9 {
i = len(buf)
} else {
continue
}
} else {
i += 2
}
pkt := &rtp.Packet{
Header: rtp.Header{Timestamp: core.Now90000()},
Payload: buf[:i],
}
c.receiver.WriteRTP(pkt)
buf = buf[i:]
}
}
func (c *Client) Stop() error {
if c.receiver != nil {
c.receiver.Close()
}
if closer, ok := c.rd.Reader.(io.Closer); ok {
return closer.Close()
}
return nil
}
func (c *Client) MarshalJSON() ([]byte, error) {
info := &core.Info{
Type: "MJPEG active producer",
Medias: []*core.Media{c.media},
Receivers: []*core.Receiver{c.receiver},
Recv: c.recv,
}
return json.Marshal(info)
}
+6 -20
View File
@@ -2,40 +2,26 @@ package magic
import (
"encoding/json"
"github.com/AlexxIT/go2rtc/pkg/core"
)
func (c *Client) GetMedias() []*core.Media {
return c.medias
return c.prod.GetMedias()
}
func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
if c.receiver == nil {
c.receiver = core.NewReceiver(media, codec)
}
return c.receiver, nil
return c.prod.GetTrack(media, codec)
}
func (c *Client) Start() error {
return c.Handle()
return c.prod.Start()
}
func (c *Client) Stop() (err error) {
if c.receiver != nil {
c.receiver.Close()
}
return c.Close()
return c.prod.Stop()
}
func (c *Client) MarshalJSON() ([]byte, error) {
info := &core.Info{
Type: c.Desc,
URL: c.URL,
Medias: c.medias,
Recv: c.recv,
}
if c.receiver != nil {
info.Receivers = append(info.Receivers, c.receiver)
}
return json.Marshal(info)
return json.Marshal(c.prod)
}