diff --git a/internal/exec/exec.go b/internal/exec/exec.go index 1b7842dd..4ebe600b 100644 --- a/internal/exec/exec.go +++ b/internal/exec/exec.go @@ -5,6 +5,11 @@ import ( "encoding/hex" "errors" "fmt" + "os" + "os/exec" + "sync" + "time" + "github.com/AlexxIT/go2rtc/internal/app" "github.com/AlexxIT/go2rtc/internal/rtsp" "github.com/AlexxIT/go2rtc/internal/streams" @@ -13,10 +18,6 @@ import ( pkg "github.com/AlexxIT/go2rtc/pkg/rtsp" "github.com/AlexxIT/go2rtc/pkg/shell" "github.com/rs/zerolog" - "os" - "os/exec" - "sync" - "time" ) func Init() { @@ -82,15 +83,7 @@ func handlePipe(url string, cmd *exec.Cmd) (core.Producer, error) { return nil, err } - client := magic.NewClient(r) - if err = client.Probe(); err != nil { - return nil, err - } - - client.Desc = "exec active producer" - client.URL = url - - return client, nil + return magic.Open(r) } func handleRTSP(url, path string, cmd *exec.Cmd) (core.Producer, error) { diff --git a/internal/http/http.go b/internal/http/http.go index 31560e6b..f1b3c227 100644 --- a/internal/http/http.go +++ b/internal/http/http.go @@ -6,7 +6,6 @@ import ( "net/http" "net/url" "strings" - "time" "github.com/AlexxIT/go2rtc/internal/streams" "github.com/AlexxIT/go2rtc/pkg/core" @@ -67,14 +66,11 @@ func handleHTTP(url string) (core.Producer, error) { default: // "video/mpeg": } - client := magic.NewClient(res.Body) - if err = client.Probe(); err != nil { + client, err := magic.Open(res.Body) + if err != nil { return nil, err } - client.Desc = "HTTP active producer" - client.URL = url - return client, nil } @@ -84,18 +80,10 @@ func handleTCP(rawURL string) (core.Producer, error) { return nil, err } - conn, err := net.DialTimeout("tcp", u.Host, time.Second*3) + conn, err := net.DialTimeout("tcp", u.Host, core.ConnDialTimeout) if err != nil { return nil, err } - client := magic.NewClient(conn) - if err = client.Probe(); err != nil { - return nil, err - } - - client.Desc = "TCP active producer" - client.URL = rawURL - - return client, nil + return magic.Open(conn) } diff --git a/pkg/core/readseeker.go b/pkg/core/readseeker.go new file mode 100644 index 00000000..49894622 --- /dev/null +++ b/pkg/core/readseeker.go @@ -0,0 +1,101 @@ +package core + +import ( + "errors" + "io" +) + +const ProbeSize = 5 * 1024 * 1024 // 5MB + +const ( + BufferDisable = 0 + BufferDrainAndClear = -1 +) + +// ReadSeeker support buffering and Seek over buffer +// positive BufferSize will enable buffering mode +// Seek to negative offset will clear buffer +// Seek with a positive BufferSize will continue buffering after the last read from the buffer +// Seek with a negative BufferSize will clear buffer after the last read from the buffer +// Read more than BufferSize will raise error +type ReadSeeker struct { + io.Reader + + BufferSize int + + buf []byte + pos int +} + +func NewReadSeeker(rd io.Reader) *ReadSeeker { + if rs, ok := rd.(*ReadSeeker); ok { + return rs + } + return &ReadSeeker{Reader: rd} +} + +func (r *ReadSeeker) Read(p []byte) (n int, err error) { + // with zero buffer - read as usual + if r.BufferSize == BufferDisable { + return r.Reader.Read(p) + } + + // if buffer not empty - read from it + if r.pos < len(r.buf) { + n = copy(p, r.buf[r.pos:]) + r.pos += n + return + } + + // with negative buffer - empty it and read as usual + if r.BufferSize < 0 { + r.BufferSize = BufferDisable + r.buf = nil + r.pos = 0 + + return r.Reader.Read(p) + } + + n, err = r.Reader.Read(p) + if len(r.buf)+n > r.BufferSize { + return 0, errors.New("probe reader overflow") + } + r.buf = append(r.buf, p[:n]...) + r.pos += n + return +} + +func (r *ReadSeeker) Seek(offset int64, whence int) (int64, error) { + var pos int + switch whence { + case io.SeekStart: + pos = int(offset) + case io.SeekCurrent: + pos = r.pos + int(offset) + case io.SeekEnd: + pos = len(r.buf) + int(offset) + } + + // negative offset - empty buffer + if pos < 0 { + r.buf = nil + r.pos = 0 + } else if pos >= len(r.buf) { + r.pos = len(r.buf) + } else { + r.pos = pos + } + + return int64(r.pos), nil +} + +func (r *ReadSeeker) Peek(n int) ([]byte, error) { + r.BufferSize = n + b := make([]byte, n) + if _, err := io.ReadAtLeast(r, b, n); err != nil { + return nil, err + } + r.BufferSize = BufferDrainAndClear + r.pos = 0 + return b, nil +} diff --git a/pkg/core/readseeker_test.go b/pkg/core/readseeker_test.go new file mode 100644 index 00000000..69080c52 --- /dev/null +++ b/pkg/core/readseeker_test.go @@ -0,0 +1,64 @@ +package core + +import ( + "bytes" + "io" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestReadSeeker(t *testing.T) { + b := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9} + buf := bytes.NewReader(b) + + rd := NewReadSeeker(buf) + rd.BufferSize = ProbeSize + + // 1. Read to buffer + b = make([]byte, 3) + n, err := rd.Read(b) + require.Nil(t, err) + require.Equal(t, []byte{0, 1, 2}, b[:n]) + + // 2. Seek to start + _, err = rd.Seek(0, io.SeekStart) + require.Nil(t, err) + + // 3. Read from buffer + b = make([]byte, 2) + n, err = rd.Read(b) + require.Nil(t, err) + require.Equal(t, []byte{0, 1}, b[:n]) + + // 4. Read from buffer + n, err = rd.Read(b) + require.Nil(t, err) + require.Equal(t, []byte{2}, b[:n]) + + // 5. Read to buffer + n, err = rd.Read(b) + require.Nil(t, err) + require.Equal(t, []byte{3, 4}, b[:n]) + + // 6. Seek to start + _, err = rd.Seek(0, io.SeekStart) + require.Nil(t, err) + + // 7. Disable buffer + rd.BufferSize = -1 + + // 8. Read from buffer + b = make([]byte, 10) + n, err = rd.Read(b) + require.Nil(t, err) + require.Equal(t, []byte{0, 1, 2, 3, 4}, b[:n]) + + // 9. Direct read + n, err = rd.Read(b) + require.Nil(t, err) + require.Equal(t, []byte{5, 6, 7, 8, 9}, b[:n]) + + // 10. Check buffer empty + require.Nil(t, rd.buf) +} diff --git a/pkg/magic/bitstream/client.go b/pkg/magic/bitstream/client.go new file mode 100644 index 00000000..97bf7088 --- /dev/null +++ b/pkg/magic/bitstream/client.go @@ -0,0 +1,118 @@ +package bitstream + +import ( + "encoding/hex" + "encoding/json" + "errors" + "io" + + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/h264" + "github.com/AlexxIT/go2rtc/pkg/h264/annexb" + "github.com/AlexxIT/go2rtc/pkg/h265" + "github.com/pion/rtp" +) + +type Client struct { + rd *core.ReadSeeker + + media *core.Media + receiver *core.Receiver + + recv int +} + +func Open(r io.Reader) (*Client, error) { + rd := core.NewReadSeeker(r) + + buf, err := rd.Peek(256) + if err != nil { + return nil, err + } + + buf = annexb.EncodeToAVCC(buf, false) // won't break original buffer + + var codec *core.Codec + + switch { + case h264.NALUType(buf) == h264.NALUTypeSPS: + codec = h264.AVCCToCodec(buf) + case h265.NALUType(buf) == h265.NALUTypeVPS: + codec = h265.AVCCToCodec(buf) + default: + return nil, errors.New("bitstream: unsupported header: " + hex.EncodeToString(buf[:8])) + } + + client := &Client{ + rd: rd, + media: &core.Media{ + Kind: core.KindVideo, + Direction: core.DirectionRecvonly, + Codecs: []*core.Codec{codec}, + }, + } + + return client, nil +} + +func (c *Client) GetMedias() []*core.Media { + return []*core.Media{c.media} +} + +func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { + if c.receiver == nil { + c.receiver = core.NewReceiver(media, codec) + } + return c.receiver, nil +} + +func (c *Client) Start() error { + var buf []byte + + b := make([]byte, core.BufferSize) + for { + n, err := c.rd.Read(b) + if err != nil { + return err + } + + c.recv += n + + buf = append(buf, b[:n]...) + + i := annexb.IndexFrame(buf) + if i < 0 { + continue + } + + pkt := &rtp.Packet{ + Header: rtp.Header{Timestamp: core.Now90000()}, + Payload: annexb.EncodeToAVCC(buf[:i], true), + } + c.receiver.WriteRTP(pkt) + + //log.Printf("[AVC] %v, len: %d", h264.Types(pkt.Payload), len(pkt.Payload)) + + buf = buf[i:] + } +} + +func (c *Client) Stop() error { + if c.receiver != nil { + c.receiver.Close() + } + if closer, ok := c.rd.Reader.(io.Closer); ok { + return closer.Close() + } + return nil +} + +func (c *Client) MarshalJSON() ([]byte, error) { + info := &core.Info{ + Type: "Bitstream active producer", + Medias: []*core.Media{c.media}, + Receivers: []*core.Receiver{c.receiver}, + Recv: c.recv, + } + return json.Marshal(info) +} diff --git a/pkg/magic/client.go b/pkg/magic/client.go index 640f794c..85635267 100644 --- a/pkg/magic/client.go +++ b/pkg/magic/client.go @@ -4,211 +4,46 @@ import ( "bytes" "encoding/hex" "errors" - "github.com/AlexxIT/go2rtc/pkg/core" - "github.com/AlexxIT/go2rtc/pkg/h264" - "github.com/AlexxIT/go2rtc/pkg/h265" - "github.com/AlexxIT/go2rtc/pkg/mpegts" - "github.com/pion/rtp" "io" + + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/h264/annexb" + "github.com/AlexxIT/go2rtc/pkg/magic/bitstream" + "github.com/AlexxIT/go2rtc/pkg/magic/mjpeg" + "github.com/AlexxIT/go2rtc/pkg/mpegts" ) // Client - can read unknown bytestream and autodetect format type Client struct { - Desc string - URL string - - Handle func() error - - r io.ReadCloser - sniff []byte - - medias []*core.Media - receiver *core.Receiver - - recv int + rd *core.ReadSeeker + prod core.Producer } -func NewClient(r io.ReadCloser) *Client { - return &Client{r: r} -} +func Open(r io.Reader) (*Client, error) { + rd := core.NewReadSeeker(r) -func (c *Client) Probe() (err error) { - c.sniff = make([]byte, mpegts.PacketSize*3) // MPEG-TS: SDT+PAT+PMT - c.recv, err = io.ReadFull(c.r, c.sniff) + b, err := rd.Peek(4) if err != nil { - _ = c.Close() - return + return nil, err } - var codec *core.Codec - - if bytes.HasPrefix(c.sniff, []byte{0, 0, 0, 1}) { - switch { - case h264.NALUType(c.sniff) == h264.NALUTypeSPS: - codec = &core.Codec{ - Name: core.CodecH264, - ClockRate: 90000, - PayloadType: core.PayloadTypeRAW, - } - c.Handle = c.ReadBitstreams - - case h265.NALUType(c.sniff) == h265.NALUTypeVPS: - codec = &core.Codec{ - Name: core.CodecH265, - ClockRate: 90000, - PayloadType: core.PayloadTypeRAW, - } - c.Handle = c.ReadBitstreams + switch { + case bytes.HasPrefix(b, []byte(annexb.StartCode)) || bytes.HasPrefix(b, []byte{0, 0, 1}): + var prod core.Producer + if prod, err = bitstream.Open(rd); err != nil { + return nil, err } + return &Client{rd: rd, prod: prod}, nil - } else if bytes.HasPrefix(c.sniff, []byte{0xFF, 0xD8}) { - codec = &core.Codec{ - Name: core.CodecJPEG, - ClockRate: 90000, - PayloadType: core.PayloadTypeRAW, - } - c.Handle = c.ReadMJPEG + case bytes.HasPrefix(b, []byte{0xFF, 0xD8}): + return &Client{rd: rd, prod: mjpeg.NewClient(rd)}, nil - } else if c.sniff[0] == mpegts.SyncByte { - ts := mpegts.NewReader() - ts.AppendBuffer(c.sniff) - _ = ts.GetPacket() - for _, streamType := range ts.GetStreamTypes() { - switch streamType { - case mpegts.StreamTypeH264: - codec = &core.Codec{ - Name: core.CodecH264, - ClockRate: 90000, - PayloadType: core.PayloadTypeRAW, - } - c.Handle = c.ReadMPEGTS + case bytes.HasPrefix(b, []byte{'F', 'L', 'V'}): + break // TODO - case mpegts.StreamTypeH265: - codec = &core.Codec{ - Name: core.CodecH265, - ClockRate: 90000, - PayloadType: core.PayloadTypeRAW, - } - c.Handle = c.ReadMPEGTS - } - } + case b[0] == mpegts.SyncByte: + break // TODO } - if codec == nil { - _ = c.Close() - return errors.New("unknown format: " + hex.EncodeToString(c.sniff[:8])) - } - - c.medias = append(c.medias, &core.Media{ - Kind: core.KindVideo, - Direction: core.DirectionRecvonly, - Codecs: []*core.Codec{codec}, - }) - - return -} - -func (c *Client) ReadBitstreams() error { - buf := c.sniff // total bufer - b := make([]byte, 1024*1024) // reading buffer - - var decodeStream func([]byte) ([]byte, int) - switch c.receiver.Codec.Name { - case core.CodecH264: - decodeStream = h264.DecodeStream - case core.CodecH265: - decodeStream = h265.DecodeStream - } - - for { - payload, n := decodeStream(buf) - if payload == nil { - n, err := c.r.Read(b) - if err != nil { - return err - } - - buf = append(buf, b[:n]...) - c.recv += n - continue - } - - buf = buf[n:] - - //log.Printf("[AVC] %v, len: %d", h264.Types(payload), len(payload)) - - pkt := &rtp.Packet{ - Header: rtp.Header{Timestamp: core.Now90000()}, - Payload: payload, - } - c.receiver.WriteRTP(pkt) - } -} - -func (c *Client) ReadMJPEG() error { - buf := c.sniff // total bufer - b := make([]byte, 1024*1024) // reading buffer - - for { - // one JPEG end and next start - i := bytes.Index(buf, []byte{0xFF, 0xD9, 0xFF, 0xD8}) - if i < 0 { - n, err := c.r.Read(b) - if err != nil { - return err - } - - buf = append(buf, b[:n]...) - c.recv += n - - // if we receive frame - if n >= 2 && b[n-2] == 0xFF && b[n-1] == 0xD9 { - i = len(buf) - } else { - continue - } - } else { - i += 2 - } - - pkt := &rtp.Packet{ - Header: rtp.Header{Timestamp: core.Now90000()}, - Payload: buf[:i], - } - c.receiver.WriteRTP(pkt) - - buf = buf[i:] - } -} - -func (c *Client) ReadMPEGTS() error { - b := make([]byte, 1024*1024) // reading buffer - - ts := mpegts.NewReader() - ts.AppendBuffer(c.sniff) - - for { - packet := ts.GetPacket() - if packet == nil { - n, err := c.r.Read(b) - if err != nil { - return err - } - - ts.AppendBuffer(b[:n]) - c.recv += n - continue - } - - //log.Printf("[AVC] %v, len: %d, ts: %10d", h264.Types(packet.Payload), len(packet.Payload), packet.Timestamp) - - switch packet.PayloadType { - case mpegts.StreamTypeH264, mpegts.StreamTypeH265: - c.receiver.WriteRTP(packet) - } - } -} - -func (c *Client) Close() error { - return c.r.Close() + return nil, errors.New("magic: unsupported header: " + hex.EncodeToString(b)) } diff --git a/pkg/magic/mjpeg/client.go b/pkg/magic/mjpeg/client.go new file mode 100644 index 00000000..ff8a3278 --- /dev/null +++ b/pkg/magic/mjpeg/client.go @@ -0,0 +1,104 @@ +package mjpeg + +import ( + "bytes" + "encoding/json" + "io" + + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/pion/rtp" +) + +type Client struct { + rd *core.ReadSeeker + + media *core.Media + receiver *core.Receiver + + recv int +} + +func NewClient(rd io.Reader) *Client { + return &Client{ + rd: core.NewReadSeeker(rd), + media: &core.Media{ + Kind: core.KindVideo, + Direction: core.DirectionRecvonly, + Codecs: []*core.Codec{ + { + Name: core.CodecJPEG, + ClockRate: 90000, + PayloadType: core.PayloadTypeRAW, + }, + }, + }, + } +} + +func (c *Client) GetMedias() []*core.Media { + return []*core.Media{c.media} +} + +func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { + if c.receiver == nil { + c.receiver = core.NewReceiver(media, codec) + } + return c.receiver, nil +} + +func (c *Client) Start() error { + var buf []byte // total bufer + b := make([]byte, core.BufferSize) // reading buffer + + for { + // one JPEG end and next start + i := bytes.Index(buf, []byte{0xFF, 0xD9, 0xFF, 0xD8}) + if i < 0 { + n, err := c.rd.Read(b) + if err != nil { + return err + } + + c.recv += n + + buf = append(buf, b[:n]...) + + // if we receive frame + if n >= 2 && b[n-2] == 0xFF && b[n-1] == 0xD9 { + i = len(buf) + } else { + continue + } + } else { + i += 2 + } + + pkt := &rtp.Packet{ + Header: rtp.Header{Timestamp: core.Now90000()}, + Payload: buf[:i], + } + c.receiver.WriteRTP(pkt) + + buf = buf[i:] + } +} + +func (c *Client) Stop() error { + if c.receiver != nil { + c.receiver.Close() + } + if closer, ok := c.rd.Reader.(io.Closer); ok { + return closer.Close() + } + return nil +} + +func (c *Client) MarshalJSON() ([]byte, error) { + info := &core.Info{ + Type: "MJPEG active producer", + Medias: []*core.Media{c.media}, + Receivers: []*core.Receiver{c.receiver}, + Recv: c.recv, + } + return json.Marshal(info) +} diff --git a/pkg/magic/producer.go b/pkg/magic/producer.go index 716a1eec..5a28a3b3 100644 --- a/pkg/magic/producer.go +++ b/pkg/magic/producer.go @@ -2,40 +2,26 @@ package magic import ( "encoding/json" + "github.com/AlexxIT/go2rtc/pkg/core" ) func (c *Client) GetMedias() []*core.Media { - return c.medias + return c.prod.GetMedias() } func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { - if c.receiver == nil { - c.receiver = core.NewReceiver(media, codec) - } - return c.receiver, nil + return c.prod.GetTrack(media, codec) } func (c *Client) Start() error { - return c.Handle() + return c.prod.Start() } func (c *Client) Stop() (err error) { - if c.receiver != nil { - c.receiver.Close() - } - return c.Close() + return c.prod.Stop() } func (c *Client) MarshalJSON() ([]byte, error) { - info := &core.Info{ - Type: c.Desc, - URL: c.URL, - Medias: c.medias, - Recv: c.recv, - } - if c.receiver != nil { - info.Receivers = append(info.Receivers, c.receiver) - } - return json.Marshal(info) + return json.Marshal(c.prod) }