Fix timestamps for RTMP client
This commit is contained in:
@@ -17,3 +17,4 @@ response []interface {}{"onStatus", 0, interface {}(nil), map[string]interface {
|
|||||||
- https://en.wikipedia.org/wiki/Flash_Video
|
- https://en.wikipedia.org/wiki/Flash_Video
|
||||||
- https://en.wikipedia.org/wiki/Real-Time_Messaging_Protocol
|
- https://en.wikipedia.org/wiki/Real-Time_Messaging_Protocol
|
||||||
- https://rtmp.veriskope.com/pdf/rtmp_specification_1.0.pdf
|
- https://rtmp.veriskope.com/pdf/rtmp_specification_1.0.pdf
|
||||||
|
- https://rtmp.veriskope.com/docs/spec/
|
||||||
|
|||||||
+1
-1
@@ -65,7 +65,7 @@ func NewClient(conn net.Conn, u *url.URL) (*Conn, error) {
|
|||||||
rd: bufio.NewReaderSize(conn, core.BufferSize),
|
rd: bufio.NewReaderSize(conn, core.BufferSize),
|
||||||
wr: conn,
|
wr: conn,
|
||||||
|
|
||||||
chunks: map[uint8]*header{},
|
chunks: map[uint8]*chunk{},
|
||||||
|
|
||||||
rdPacketSize: 128,
|
rdPacketSize: 128,
|
||||||
wrPacketSize: 4096, // OBS - 4096, Reolink - 4096
|
wrPacketSize: 4096, // OBS - 4096, Reolink - 4096
|
||||||
|
|||||||
+70
-58
@@ -29,7 +29,7 @@ type Conn struct {
|
|||||||
rdPacketSize uint32
|
rdPacketSize uint32
|
||||||
wrPacketSize uint32
|
wrPacketSize uint32
|
||||||
|
|
||||||
chunks map[byte]*header
|
chunks map[byte]*chunk
|
||||||
streamID byte
|
streamID byte
|
||||||
url string
|
url string
|
||||||
|
|
||||||
@@ -66,11 +66,59 @@ func (c *Conn) readResponse(transID float64) ([]any, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type header struct {
|
type chunk struct {
|
||||||
timeMS uint32
|
conn *Conn
|
||||||
|
rawTime uint32
|
||||||
dataSize uint32
|
dataSize uint32
|
||||||
tagType byte
|
tagType byte
|
||||||
streamID uint32
|
streamID uint32
|
||||||
|
timeMS uint32
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *chunk) readHeader(typ byte) error {
|
||||||
|
switch typ {
|
||||||
|
case 0: // 12 byte header (full header)
|
||||||
|
b, err := c.conn.readSize(11)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
c.rawTime = Uint24(b)
|
||||||
|
c.dataSize = Uint24(b[3:])
|
||||||
|
c.tagType = b[6]
|
||||||
|
c.streamID = binary.LittleEndian.Uint32(b[7:])
|
||||||
|
c.timeMS = c.readExtendedTime()
|
||||||
|
|
||||||
|
case 1: // 8 bytes - like type b00, not including message ID (4 last bytes)
|
||||||
|
b, err := c.conn.readSize(7)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
c.rawTime = Uint24(b)
|
||||||
|
c.dataSize = Uint24(b[3:]) // msgdatalen
|
||||||
|
c.tagType = b[6] // msgtypeid
|
||||||
|
c.timeMS += c.readExtendedTime()
|
||||||
|
|
||||||
|
case 2: // 4 bytes - Basic Header and timestamp (3 bytes) are included
|
||||||
|
b, err := c.conn.readSize(3)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
c.rawTime = Uint24(b) // timestamp
|
||||||
|
c.timeMS += c.readExtendedTime()
|
||||||
|
|
||||||
|
case 3: // 1 byte - only the Basic Header is included
|
||||||
|
// use here hdr from previous msg with same session ID (sid)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *chunk) readExtendedTime() uint32 {
|
||||||
|
if c.rawTime == 0xFFFFFF {
|
||||||
|
if b, err := c.conn.readSize(4); err == nil {
|
||||||
|
return binary.BigEndian.Uint32(b)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return c.rawTime
|
||||||
}
|
}
|
||||||
|
|
||||||
//var ErrNotImplemented = errors.New("rtmp: not implemented")
|
//var ErrNotImplemented = errors.New("rtmp: not implemented")
|
||||||
@@ -85,93 +133,57 @@ func (c *Conn) readMessage() (byte, uint32, []byte, error) {
|
|||||||
chunkID := b[0] & 0b111111
|
chunkID := b[0] & 0b111111
|
||||||
|
|
||||||
// storing header information for support header type 3
|
// storing header information for support header type 3
|
||||||
hdr, ok := c.chunks[chunkID]
|
ch, ok := c.chunks[chunkID]
|
||||||
if !ok {
|
if !ok {
|
||||||
hdr = &header{}
|
ch = &chunk{conn: c}
|
||||||
c.chunks[chunkID] = hdr
|
c.chunks[chunkID] = ch
|
||||||
}
|
}
|
||||||
|
|
||||||
switch hdrType {
|
if err = ch.readHeader(hdrType); err != nil {
|
||||||
case 0: // 12 byte header (full header)
|
|
||||||
if b, err = c.readSize(11); err != nil {
|
|
||||||
return 0, 0, nil, err
|
return 0, 0, nil, err
|
||||||
}
|
}
|
||||||
_ = b[7]
|
|
||||||
hdr.timeMS = Uint24(b)
|
|
||||||
hdr.dataSize = Uint24(b[3:])
|
|
||||||
hdr.tagType = b[6]
|
|
||||||
hdr.streamID = binary.LittleEndian.Uint32(b[7:])
|
|
||||||
|
|
||||||
case 1: // 8 bytes - like type b00, not including message ID (4 last bytes)
|
//log.Printf("[rtmp] hdr=%d chunkID=%d timeMS=%d size=%d tagType=%d streamID=%d", hdrType, chunkID, ch.timeMS, ch.dataSize, ch.tagType, ch.streamID)
|
||||||
if b, err = c.readSize(7); err != nil {
|
|
||||||
return 0, 0, nil, err
|
|
||||||
}
|
|
||||||
_ = b[6]
|
|
||||||
hdr.timeMS = Uint24(b) // timestamp
|
|
||||||
hdr.dataSize = Uint24(b[3:]) // msgdatalen
|
|
||||||
hdr.tagType = b[6] // msgtypeid
|
|
||||||
|
|
||||||
case 2: // 4 bytes - Basic Header and timestamp (3 bytes) are included
|
|
||||||
if b, err = c.readSize(3); err != nil {
|
|
||||||
return 0, 0, nil, err
|
|
||||||
}
|
|
||||||
hdr.timeMS = Uint24(b) // timestamp
|
|
||||||
|
|
||||||
case 3: // 1 byte - only the Basic Header is included
|
|
||||||
// use here hdr from previous msg with same session ID (sid)
|
|
||||||
}
|
|
||||||
|
|
||||||
timeMS := hdr.timeMS
|
|
||||||
if timeMS == 0xFFFFFF {
|
|
||||||
if b, err = c.readSize(4); err != nil {
|
|
||||||
return 0, 0, nil, err
|
|
||||||
}
|
|
||||||
timeMS = binary.BigEndian.Uint32(b)
|
|
||||||
}
|
|
||||||
|
|
||||||
//log.Printf("[rtmp] hdr=%d chunkID=%d timeMS=%d size=%d tagType=%d streamID=%d", hdrType, chunkID, hdr.timeMS, hdr.dataSize, hdr.tagType, hdr.streamID)
|
|
||||||
|
|
||||||
// 1. Response zero size
|
// 1. Response zero size
|
||||||
if hdr.dataSize == 0 {
|
if ch.dataSize == 0 {
|
||||||
return hdr.tagType, timeMS, nil, nil
|
return ch.tagType, ch.timeMS, nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
b = make([]byte, hdr.dataSize)
|
data := make([]byte, ch.dataSize)
|
||||||
|
|
||||||
// 2. Response small packet
|
// 2. Response small packet
|
||||||
if hdr.dataSize <= c.rdPacketSize {
|
if ch.dataSize <= c.rdPacketSize {
|
||||||
if _, err = io.ReadFull(c.rd, b); err != nil {
|
if _, err = io.ReadFull(c.rd, data); err != nil {
|
||||||
return 0, 0, nil, err
|
return 0, 0, nil, err
|
||||||
}
|
}
|
||||||
return hdr.tagType, timeMS, b, nil
|
return ch.tagType, ch.timeMS, data, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// 3. Response big packet
|
// 3. Response big packet
|
||||||
var i0 uint32
|
var i0 uint32
|
||||||
for i1 := c.rdPacketSize; i1 < hdr.dataSize; i1 += c.rdPacketSize {
|
for i1 := c.rdPacketSize; i1 < ch.dataSize; i1 += c.rdPacketSize {
|
||||||
if _, err = io.ReadFull(c.rd, b[i0:i1]); err != nil {
|
if _, err = io.ReadFull(c.rd, data[i0:i1]); err != nil {
|
||||||
return 0, 0, nil, err
|
return 0, 0, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// hopefully this will be hdrType=3 with same chunkID
|
||||||
if _, err = c.readSize(1); err != nil {
|
if _, err = c.readSize(1); err != nil {
|
||||||
return 0, 0, nil, err
|
return 0, 0, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if hdr.timeMS == 0xFFFFFF {
|
_ = ch.readExtendedTime()
|
||||||
if _, err = c.readSize(4); err != nil {
|
|
||||||
return 0, 0, nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
i0 = i1
|
i0 = i1
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err = io.ReadFull(c.rd, b[i0:]); err != nil {
|
if _, err = io.ReadFull(c.rd, data[i0:]); err != nil {
|
||||||
return 0, 0, nil, err
|
return 0, 0, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return hdr.tagType, timeMS, b, nil
|
return ch.tagType, ch.timeMS, data, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Conn) writeMessage(chunkID, tagType byte, timeMS uint32, payload []byte) error {
|
func (c *Conn) writeMessage(chunkID, tagType byte, timeMS uint32, payload []byte) error {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
c.resetBuffer()
|
c.resetBuffer()
|
||||||
@@ -324,7 +336,7 @@ func (c *Conn) writePlay() error {
|
|||||||
|
|
||||||
func (c *Conn) readSize(n uint32) ([]byte, error) {
|
func (c *Conn) readSize(n uint32) ([]byte, error) {
|
||||||
b := make([]byte, n)
|
b := make([]byte, n)
|
||||||
if _, err := io.ReadAtLeast(c.rd, b, int(n)); err != nil {
|
if _, err := io.ReadFull(c.rd, b); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return b, nil
|
return b, nil
|
||||||
|
|||||||
+1
-1
@@ -17,7 +17,7 @@ func NewServer(conn net.Conn) (*Conn, error) {
|
|||||||
rd: bufio.NewReaderSize(conn, core.BufferSize),
|
rd: bufio.NewReaderSize(conn, core.BufferSize),
|
||||||
wr: conn,
|
wr: conn,
|
||||||
|
|
||||||
chunks: map[uint8]*header{},
|
chunks: map[uint8]*chunk{},
|
||||||
|
|
||||||
rdPacketSize: 128,
|
rdPacketSize: 128,
|
||||||
wrPacketSize: 4096,
|
wrPacketSize: 4096,
|
||||||
|
|||||||
Reference in New Issue
Block a user