Rewrite FLV/RTMP clients

This commit is contained in:
Alexey Khit
2023-08-16 19:35:13 +03:00
parent 4e1a0e1ab9
commit c3ba8db660
9 changed files with 227 additions and 204 deletions
+26 -9
View File
@@ -26,25 +26,42 @@ func Dial(rawURL string) (*flv.Client, error) {
return nil, err
}
tr := &rtmp{
url: rawURL,
conn: conn,
rd: bufio.NewReaderSize(conn, core.BufferSize),
rd := &rtmp{
url: rawURL,
headers: map[uint32]*header{},
conn: conn,
rd: bufio.NewReaderSize(conn, core.BufferSize),
}
if args := strings.Split(u.Path, "/"); len(args) >= 2 {
tr.app = args[1]
rd.app = args[1]
if len(args) >= 3 {
tr.stream = args[2]
rd.stream = args[2]
if u.RawQuery != "" {
tr.stream += "?" + u.RawQuery
rd.stream += "?" + u.RawQuery
}
}
}
if err = tr.init(); err != nil {
if err = rd.handshake(); err != nil {
return nil, err
}
if err = rd.sendConfig(); err != nil {
return nil, err
}
if err = rd.sendConnect(); err != nil {
return nil, err
}
if err = rd.sendPlay(); err != nil {
return nil, err
}
return &flv.Client{Transport: tr, URL: rawURL}, nil
rd.buf = []byte{
'F', 'L', 'V', // signature
1, // version
0, // flags (has video/audio)
0, 0, 0, 9, // header size
}
return flv.Open(rd)
}
+70 -34
View File
@@ -37,6 +37,55 @@ type rtmp struct {
conn net.Conn
rd io.Reader
buf []byte
}
func (c *rtmp) Read(p []byte) (n int, err error) {
// 1. Check temporary tempbuffer
if len(c.buf) == 0 {
msgType, timeMS, payload, err2 := c.readMessage()
if err2 != nil {
return 0, err2
}
payloadSize := len(payload)
// previous tag size (4 byte) + header (11 byte) + payload
n = 4 + 11 + payloadSize
// 2. Check if the message fits in the buffer
if n <= len(p) {
encodeFLV(p, msgType, timeMS, uint32(payloadSize), payload)
return
}
// 3. Put the message into a temporary buffer
c.buf = make([]byte, n)
encodeFLV(c.buf, msgType, timeMS, uint32(payloadSize), payload)
}
// 4. Send temporary buffer
n = copy(p, c.buf)
c.buf = c.buf[n:]
return
}
func (c *rtmp) Close() error {
return c.conn.Close()
}
func encodeFLV(b []byte, msgType byte, time, size uint32, payload []byte) {
b[0] = 0
b[1] = 0
b[2] = 0
b[3] = 0
b[4+0] = msgType
PutUint24(b[4+1:], size)
PutUint24(b[4+4:], time)
b[4+7] = byte(time >> 24)
copy(b[4+11:], payload)
}
type header struct {
@@ -45,7 +94,7 @@ type header struct {
msgType byte
}
func (c *rtmp) ReadTag() (byte, uint32, []byte, error) {
func (c *rtmp) readMessage() (byte, uint32, []byte, error) {
hdrType, sid, err := c.readHeader()
if err != nil {
return 0, 0, nil, err
@@ -143,30 +192,6 @@ func (c *rtmp) ReadTag() (byte, uint32, []byte, error) {
return hdr.msgType, timeMS, b, nil
}
func (c *rtmp) Close() error {
return c.conn.Close()
}
func (c *rtmp) init() error {
if err := c.handshake(); err != nil {
return err
}
if err := c.sendConfig(); err != nil {
return err
}
c.headers = map[uint32]*header{}
if err := c.sendConnect(); err != nil {
return err
}
if err := c.sendPlay(); err != nil {
return err
}
return nil
}
func (c *rtmp) handshake() error {
// simple handshake without real random and check response
const randomSize = 4 + 4 + 1528
@@ -236,7 +261,7 @@ func (c *rtmp) sendConnect() error {
return err
}
s, err := c.waitCode()
s, err := c.waitCode("_result", float64(1)) // result with same ID
if err != nil {
return err
}
@@ -258,7 +283,7 @@ func (c *rtmp) sendPlay() error {
return err
}
args, err := c.waitResponse()
args, err := c.waitResponse("_result", float64(2)) // result with same ID
if err != nil {
return err
}
@@ -271,7 +296,7 @@ func (c *rtmp) sendPlay() error {
msg = amf.NewWriter()
msg.WriteString("play")
msg.WriteNumber(3)
msg.WriteNumber(0)
msg.WriteNull()
msg.WriteString(c.stream)
@@ -279,7 +304,7 @@ func (c *rtmp) sendPlay() error {
return err
}
s, err := c.waitCode()
s, err := c.waitCode("onStatus", float64(0)) // events has zero transaction ID
if err != nil {
return err
}
@@ -354,9 +379,9 @@ func (c *rtmp) readSize(n uint32) ([]byte, error) {
return b, nil
}
func (c *rtmp) waitResponse() ([]any, error) {
func (c *rtmp) waitResponse(cmd any, tid any) ([]any, error) {
for {
msgType, _, b, err := c.ReadTag()
msgType, _, b, err := c.readMessage()
if err != nil {
return nil, err
}
@@ -366,13 +391,24 @@ func (c *rtmp) waitResponse() ([]any, error) {
c.pktSize = binary.BigEndian.Uint32(b)
case MsgCommand:
return amf.NewReader(b).ReadItems()
var v []any
if v, err = amf.NewReader(b).ReadItems(); err != nil {
return nil, err
}
if len(v) < 4 {
return nil, ErrResponse
}
if v[0] == cmd && v[1] == tid {
return v, nil
}
}
}
}
func (c *rtmp) waitCode() (string, error) {
args, err := c.waitResponse()
func (c *rtmp) waitCode(cmd any, tid any) (string, error) {
args, err := c.waitResponse(cmd, tid)
if err != nil {
return "", err
}