From b016b7dc2a881023f791e1f3ab97d3f54dbfbc23 Mon Sep 17 00:00:00 2001 From: Alexey Khit Date: Thu, 17 Aug 2023 07:59:21 +0300 Subject: [PATCH] Refactoring for RTMP source --- pkg/rtmp/client.go | 67 --------------------------------- pkg/rtmp/producer.go | 34 +++++++++++++++++ pkg/rtmp/{rtmp.go => reader.go} | 35 +++++++++-------- 3 files changed, 51 insertions(+), 85 deletions(-) delete mode 100644 pkg/rtmp/client.go create mode 100644 pkg/rtmp/producer.go rename pkg/rtmp/{rtmp.go => reader.go} (89%) diff --git a/pkg/rtmp/client.go b/pkg/rtmp/client.go deleted file mode 100644 index ba9454b0..00000000 --- a/pkg/rtmp/client.go +++ /dev/null @@ -1,67 +0,0 @@ -package rtmp - -import ( - "bufio" - "net" - "net/url" - "strings" - - "github.com/AlexxIT/go2rtc/pkg/core" - "github.com/AlexxIT/go2rtc/pkg/flv" -) - -func Dial(rawURL string) (*flv.Client, error) { - u, err := url.Parse(rawURL) - if err != nil { - return nil, err - } - - host := u.Host - if strings.IndexByte(host, ':') < 0 { - host += ":1935" - } - - conn, err := net.DialTimeout("tcp", host, core.ConnDialTimeout) - if err != nil { - return nil, err - } - - rd := &rtmp{ - url: rawURL, - headers: map[uint32]*header{}, - conn: conn, - rd: bufio.NewReaderSize(conn, core.BufferSize), - } - - if args := strings.Split(u.Path, "/"); len(args) >= 2 { - rd.app = args[1] - if len(args) >= 3 { - rd.stream = args[2] - if u.RawQuery != "" { - rd.stream += "?" + u.RawQuery - } - } - } - - if err = rd.handshake(); err != nil { - return nil, err - } - if err = rd.sendConfig(); err != nil { - return nil, err - } - if err = rd.sendConnect(); err != nil { - return nil, err - } - if err = rd.sendPlay(); err != nil { - return nil, err - } - - rd.buf = []byte{ - 'F', 'L', 'V', // signature - 1, // version - 0, // flags (has video/audio) - 0, 0, 0, 9, // header size - } - - return flv.Open(rd) -} diff --git a/pkg/rtmp/producer.go b/pkg/rtmp/producer.go new file mode 100644 index 00000000..83637480 --- /dev/null +++ b/pkg/rtmp/producer.go @@ -0,0 +1,34 @@ +package rtmp + +import ( + "net" + "net/url" + "strings" + + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/flv" +) + +func Dial(rawURL string) (core.Producer, error) { + u, err := url.Parse(rawURL) + if err != nil { + return nil, err + } + + host := u.Host + if strings.IndexByte(host, ':') < 0 { + host += ":1935" + } + + conn, err := net.DialTimeout("tcp", host, core.ConnDialTimeout) + if err != nil { + return nil, err + } + + rd, err := NewReader(u, conn) + if err != nil { + return nil, err + } + + return flv.Open(rd) +} diff --git a/pkg/rtmp/rtmp.go b/pkg/rtmp/reader.go similarity index 89% rename from pkg/rtmp/rtmp.go rename to pkg/rtmp/reader.go index 5baf00af..465fb27a 100644 --- a/pkg/rtmp/rtmp.go +++ b/pkg/rtmp/reader.go @@ -26,8 +26,7 @@ const ( var ErrResponse = errors.New("rtmp: wrong response") -// rtmp - implements flv.Transport -type rtmp struct { +type Reader struct { url string app string stream string @@ -41,7 +40,7 @@ type rtmp struct { buf []byte } -func (c *rtmp) Read(p []byte) (n int, err error) { +func (c *Reader) Read(p []byte) (n int, err error) { // 1. Check temporary tempbuffer if len(c.buf) == 0 { msgType, timeMS, payload, err2 := c.readMessage() @@ -71,7 +70,7 @@ func (c *rtmp) Read(p []byte) (n int, err error) { return } -func (c *rtmp) Close() error { +func (c *Reader) Close() error { return c.conn.Close() } @@ -94,7 +93,7 @@ type header struct { msgType byte } -func (c *rtmp) readMessage() (byte, uint32, []byte, error) { +func (c *Reader) readMessage() (byte, uint32, []byte, error) { hdrType, sid, err := c.readHeader() if err != nil { return 0, 0, nil, err @@ -148,7 +147,7 @@ func (c *rtmp) readMessage() (byte, uint32, []byte, error) { timeMS = binary.BigEndian.Uint32(b) } - //log.Printf("[rtmp] hdrType=%d sid=%d msdTime=%d msgSize=%d msgType=%d", hdrType, sid, hdr.msgTime, hdr.msgSize, hdr.msgType) + //log.Printf("[Reader] hdrType=%d sid=%d msdTime=%d msgSize=%d msgType=%d", hdrType, sid, hdr.msgTime, hdr.msgSize, hdr.msgType) // 1. Response zero size if hdr.msgSize == 0 { @@ -192,7 +191,7 @@ func (c *rtmp) readMessage() (byte, uint32, []byte, error) { return hdr.msgType, timeMS, b, nil } -func (c *rtmp) handshake() error { +func (c *Reader) handshake() error { // simple handshake without real random and check response const randomSize = 4 + 4 + 1528 @@ -207,7 +206,7 @@ func (c *rtmp) handshake() error { } if b[0] != 3 { - return errors.New("rtmp: wrong handshake") + return errors.New("Reader: wrong handshake") } if _, err := c.conn.Write(b[1:]); err != nil { @@ -221,7 +220,7 @@ func (c *rtmp) handshake() error { return nil } -func (c *rtmp) sendConfig() error { +func (c *Reader) sendConfig() error { b := make([]byte, 5) binary.BigEndian.PutUint32(b, 65536) if err := c.sendRequest(MsgSetPacketSize, 0, b[:4]); err != nil { @@ -242,7 +241,7 @@ func (c *rtmp) sendConfig() error { return nil } -func (c *rtmp) sendConnect() error { +func (c *Reader) sendConnect() error { msg := amf.AMF{} msg.WriteString("connect") msg.WriteNumber(1) @@ -267,13 +266,13 @@ func (c *rtmp) sendConnect() error { } if s != "NetConnection.Connect.Success" { - return errors.New("rtmp: wrong code: " + s) + return errors.New("Reader: wrong code: " + s) } return nil } -func (c *rtmp) sendPlay() error { +func (c *Reader) sendPlay() error { msg := amf.NewWriter() msg.WriteString("createStream") msg.WriteNumber(2) @@ -314,10 +313,10 @@ func (c *rtmp) sendPlay() error { return nil } - return errors.New("rtmp: wrong code: " + s) + return errors.New("Reader: wrong code: " + s) } -func (c *rtmp) sendRequest(msgType byte, streamID uint32, payload []byte) error { +func (c *Reader) sendRequest(msgType byte, streamID uint32, payload []byte) error { n := len(payload) b := make([]byte, 12+n) _ = b[12] @@ -346,7 +345,7 @@ func (c *rtmp) sendRequest(msgType byte, streamID uint32, payload []byte) error return nil } -func (c *rtmp) readHeader() (byte, uint32, error) { +func (c *Reader) readHeader() (byte, uint32, error) { b, err := c.readSize(1) if err != nil { return 0, 0, err @@ -371,7 +370,7 @@ func (c *rtmp) readHeader() (byte, uint32, error) { return hdrType, sid, nil } -func (c *rtmp) readSize(n uint32) ([]byte, error) { +func (c *Reader) readSize(n uint32) ([]byte, error) { b := make([]byte, n) if _, err := io.ReadAtLeast(c.rd, b, int(n)); err != nil { return nil, err @@ -379,7 +378,7 @@ func (c *rtmp) readSize(n uint32) ([]byte, error) { return b, nil } -func (c *rtmp) waitResponse(cmd any, tid any) ([]any, error) { +func (c *Reader) waitResponse(cmd any, tid any) ([]any, error) { for { msgType, _, b, err := c.readMessage() if err != nil { @@ -407,7 +406,7 @@ func (c *rtmp) waitResponse(cmd any, tid any) ([]any, error) { } } -func (c *rtmp) waitCode(cmd any, tid any) (string, error) { +func (c *Reader) waitCode(cmd any, tid any) (string, error) { args, err := c.waitResponse(cmd, tid) if err != nil { return "", err