Refactoring for RTMP source

This commit is contained in:
Alexey Khit
2023-08-17 07:59:21 +03:00
parent 42f6441512
commit b016b7dc2a
3 changed files with 51 additions and 85 deletions
-67
View File
@@ -1,67 +0,0 @@
package rtmp
import (
"bufio"
"net"
"net/url"
"strings"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/flv"
)
func Dial(rawURL string) (*flv.Client, error) {
u, err := url.Parse(rawURL)
if err != nil {
return nil, err
}
host := u.Host
if strings.IndexByte(host, ':') < 0 {
host += ":1935"
}
conn, err := net.DialTimeout("tcp", host, core.ConnDialTimeout)
if err != nil {
return nil, err
}
rd := &rtmp{
url: rawURL,
headers: map[uint32]*header{},
conn: conn,
rd: bufio.NewReaderSize(conn, core.BufferSize),
}
if args := strings.Split(u.Path, "/"); len(args) >= 2 {
rd.app = args[1]
if len(args) >= 3 {
rd.stream = args[2]
if u.RawQuery != "" {
rd.stream += "?" + u.RawQuery
}
}
}
if err = rd.handshake(); err != nil {
return nil, err
}
if err = rd.sendConfig(); err != nil {
return nil, err
}
if err = rd.sendConnect(); err != nil {
return nil, err
}
if err = rd.sendPlay(); err != nil {
return nil, err
}
rd.buf = []byte{
'F', 'L', 'V', // signature
1, // version
0, // flags (has video/audio)
0, 0, 0, 9, // header size
}
return flv.Open(rd)
}
+34
View File
@@ -0,0 +1,34 @@
package rtmp
import (
"net"
"net/url"
"strings"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/flv"
)
func Dial(rawURL string) (core.Producer, error) {
u, err := url.Parse(rawURL)
if err != nil {
return nil, err
}
host := u.Host
if strings.IndexByte(host, ':') < 0 {
host += ":1935"
}
conn, err := net.DialTimeout("tcp", host, core.ConnDialTimeout)
if err != nil {
return nil, err
}
rd, err := NewReader(u, conn)
if err != nil {
return nil, err
}
return flv.Open(rd)
}
+17 -18
View File
@@ -26,8 +26,7 @@ const (
var ErrResponse = errors.New("rtmp: wrong response")
// rtmp - implements flv.Transport
type rtmp struct {
type Reader struct {
url string
app string
stream string
@@ -41,7 +40,7 @@ type rtmp struct {
buf []byte
}
func (c *rtmp) Read(p []byte) (n int, err error) {
func (c *Reader) Read(p []byte) (n int, err error) {
// 1. Check temporary tempbuffer
if len(c.buf) == 0 {
msgType, timeMS, payload, err2 := c.readMessage()
@@ -71,7 +70,7 @@ func (c *rtmp) Read(p []byte) (n int, err error) {
return
}
func (c *rtmp) Close() error {
func (c *Reader) Close() error {
return c.conn.Close()
}
@@ -94,7 +93,7 @@ type header struct {
msgType byte
}
func (c *rtmp) readMessage() (byte, uint32, []byte, error) {
func (c *Reader) readMessage() (byte, uint32, []byte, error) {
hdrType, sid, err := c.readHeader()
if err != nil {
return 0, 0, nil, err
@@ -148,7 +147,7 @@ func (c *rtmp) readMessage() (byte, uint32, []byte, error) {
timeMS = binary.BigEndian.Uint32(b)
}
//log.Printf("[rtmp] hdrType=%d sid=%d msdTime=%d msgSize=%d msgType=%d", hdrType, sid, hdr.msgTime, hdr.msgSize, hdr.msgType)
//log.Printf("[Reader] hdrType=%d sid=%d msdTime=%d msgSize=%d msgType=%d", hdrType, sid, hdr.msgTime, hdr.msgSize, hdr.msgType)
// 1. Response zero size
if hdr.msgSize == 0 {
@@ -192,7 +191,7 @@ func (c *rtmp) readMessage() (byte, uint32, []byte, error) {
return hdr.msgType, timeMS, b, nil
}
func (c *rtmp) handshake() error {
func (c *Reader) handshake() error {
// simple handshake without real random and check response
const randomSize = 4 + 4 + 1528
@@ -207,7 +206,7 @@ func (c *rtmp) handshake() error {
}
if b[0] != 3 {
return errors.New("rtmp: wrong handshake")
return errors.New("Reader: wrong handshake")
}
if _, err := c.conn.Write(b[1:]); err != nil {
@@ -221,7 +220,7 @@ func (c *rtmp) handshake() error {
return nil
}
func (c *rtmp) sendConfig() error {
func (c *Reader) sendConfig() error {
b := make([]byte, 5)
binary.BigEndian.PutUint32(b, 65536)
if err := c.sendRequest(MsgSetPacketSize, 0, b[:4]); err != nil {
@@ -242,7 +241,7 @@ func (c *rtmp) sendConfig() error {
return nil
}
func (c *rtmp) sendConnect() error {
func (c *Reader) sendConnect() error {
msg := amf.AMF{}
msg.WriteString("connect")
msg.WriteNumber(1)
@@ -267,13 +266,13 @@ func (c *rtmp) sendConnect() error {
}
if s != "NetConnection.Connect.Success" {
return errors.New("rtmp: wrong code: " + s)
return errors.New("Reader: wrong code: " + s)
}
return nil
}
func (c *rtmp) sendPlay() error {
func (c *Reader) sendPlay() error {
msg := amf.NewWriter()
msg.WriteString("createStream")
msg.WriteNumber(2)
@@ -314,10 +313,10 @@ func (c *rtmp) sendPlay() error {
return nil
}
return errors.New("rtmp: wrong code: " + s)
return errors.New("Reader: wrong code: " + s)
}
func (c *rtmp) sendRequest(msgType byte, streamID uint32, payload []byte) error {
func (c *Reader) sendRequest(msgType byte, streamID uint32, payload []byte) error {
n := len(payload)
b := make([]byte, 12+n)
_ = b[12]
@@ -346,7 +345,7 @@ func (c *rtmp) sendRequest(msgType byte, streamID uint32, payload []byte) error
return nil
}
func (c *rtmp) readHeader() (byte, uint32, error) {
func (c *Reader) readHeader() (byte, uint32, error) {
b, err := c.readSize(1)
if err != nil {
return 0, 0, err
@@ -371,7 +370,7 @@ func (c *rtmp) readHeader() (byte, uint32, error) {
return hdrType, sid, nil
}
func (c *rtmp) readSize(n uint32) ([]byte, error) {
func (c *Reader) readSize(n uint32) ([]byte, error) {
b := make([]byte, n)
if _, err := io.ReadAtLeast(c.rd, b, int(n)); err != nil {
return nil, err
@@ -379,7 +378,7 @@ func (c *rtmp) readSize(n uint32) ([]byte, error) {
return b, nil
}
func (c *rtmp) waitResponse(cmd any, tid any) ([]any, error) {
func (c *Reader) waitResponse(cmd any, tid any) ([]any, error) {
for {
msgType, _, b, err := c.readMessage()
if err != nil {
@@ -407,7 +406,7 @@ func (c *rtmp) waitResponse(cmd any, tid any) ([]any, error) {
}
}
func (c *rtmp) waitCode(cmd any, tid any) (string, error) {
func (c *Reader) waitCode(cmd any, tid any) (string, error) {
args, err := c.waitResponse(cmd, tid)
if err != nil {
return "", err