Update dvrip source processing

This commit is contained in:
Alex X
2023-10-11 22:23:22 +03:00
parent 843a3ae9c9
commit cb710ea2be
3 changed files with 104 additions and 77 deletions
+57 -13
View File
@@ -2,6 +2,7 @@ package dvrip
import (
"bufio"
"bytes"
"crypto/md5"
"encoding/binary"
"encoding/json"
@@ -28,7 +29,8 @@ type Client struct {
seq uint32
stream string
rd io.Reader
rd io.Reader
buf []byte
}
func (c *Client) Dial(rawURL string) (err error) {
@@ -87,11 +89,11 @@ func (c *Client) Login(user, pass string) (err error) {
SofiaHash(pass), user,
)
if _, err = c.Request(Login, []byte(data)); err != nil {
if _, err = c.WriteCmd(Login, []byte(data)); err != nil {
return
}
_, err = c.ResponseJSON()
_, err = c.ReadJSON()
return
}
@@ -99,15 +101,15 @@ func (c *Client) Play() error {
format := `{"Name":"OPMonitor","SessionID":"0x%08X","OPMonitor":{"Action":"%s","Parameter":%s}}` + "\x0A\x00"
data := fmt.Sprintf(format, c.session, "Claim", c.stream)
if _, err := c.Request(OPMonitorClaim, []byte(data)); err != nil {
if _, err := c.WriteCmd(OPMonitorClaim, []byte(data)); err != nil {
return err
}
if _, err := c.ResponseJSON(); err != nil {
if _, err := c.ReadJSON(); err != nil {
return err
}
data = fmt.Sprintf(format, c.session, "Start", c.stream)
_, err := c.Request(OPMonitorStart, []byte(data))
_, err := c.WriteCmd(OPMonitorStart, []byte(data))
return err
}
@@ -115,19 +117,19 @@ func (c *Client) Talk() error {
format := `{"Name":"OPTalk","SessionID":"0x%08X","OPTalk":{"Action":"%s"}}` + "\x0A\x00"
data := fmt.Sprintf(format, c.session, "Claim")
if _, err := c.Request(OPTalkClaim, []byte(data)); err != nil {
if _, err := c.WriteCmd(OPTalkClaim, []byte(data)); err != nil {
return err
}
if _, err := c.ResponseJSON(); err != nil {
if _, err := c.ReadJSON(); err != nil {
return err
}
data = fmt.Sprintf(format, c.session, "Start")
_, err := c.Request(OPTalkStart, []byte(data))
_, err := c.WriteCmd(OPTalkStart, []byte(data))
return err
}
func (c *Client) Request(cmd uint16, payload []byte) (n int, err error) {
func (c *Client) WriteCmd(cmd uint16, payload []byte) (n int, err error) {
b := make([]byte, 20, 128)
b[0] = 255
binary.LittleEndian.PutUint32(b[4:], c.session)
@@ -145,7 +147,7 @@ func (c *Client) Request(cmd uint16, payload []byte) (n int, err error) {
return c.conn.Write(b)
}
func (c *Client) Response() (b []byte, err error) {
func (c *Client) ReadChunk() (b []byte, err error) {
if err = c.conn.SetReadDeadline(time.Now().Add(time.Second * 5)); err != nil {
return
}
@@ -170,10 +172,52 @@ func (c *Client) Response() (b []byte, err error) {
return
}
func (c *Client) ReadPacket() (pType byte, payload []byte, err error) {
var b []byte
// many cameras may split packet to multiple chunks
// some rare cameras may put multiple packets to single chunk
for len(c.buf) < 16 {
if b, err = c.ReadChunk(); err != nil {
return 0, nil, err
}
c.buf = append(c.buf, b...)
}
if !bytes.HasPrefix(c.buf, []byte{0, 0, 1}) {
return 0, nil, fmt.Errorf("dvrip: wrong packet: %0.16x", c.buf)
}
var size int
switch pType = c.buf[3]; pType {
case 0xFC, 0xFE:
size = int(binary.LittleEndian.Uint32(c.buf[12:])) + 16
case 0xFD: // PFrame
size = int(binary.LittleEndian.Uint32(c.buf[4:])) + 8
case 0xFA, 0xF9:
size = int(binary.LittleEndian.Uint16(c.buf[6:])) + 8
default:
return 0, nil, fmt.Errorf("dvrip: unknown packet type: %X", pType)
}
for len(c.buf) < size {
if b, err = c.ReadChunk(); err != nil {
return 0, nil, err
}
c.buf = append(c.buf, b...)
}
payload = c.buf[:size]
c.buf = c.buf[size:]
return
}
type Response map[string]any
func (c *Client) ResponseJSON() (res Response, err error) {
b, err := c.Response()
func (c *Client) ReadJSON() (res Response, err error) {
b, err := c.ReadChunk()
if err != nil {
return
}
+1 -1
View File
@@ -70,7 +70,7 @@ func (c *Consumer) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiv
for len(payload) >= PacketSize {
buf = append(buf[:8], payload[:PacketSize]...)
if n, err := c.client.Request(OPTalkData, buf); err != nil {
if n, err := c.client.WriteCmd(OPTalkData, buf); err != nil {
c.Send += n
}
+46 -63
View File
@@ -29,21 +29,21 @@ type Producer struct {
func (c *Producer) Start() error {
for {
tag, size, b, err := c.readPacket()
pType, b, err := c.client.ReadPacket()
if err != nil {
return err
}
//log.Printf("[DVR] type: %d, len: %d", dataType, len(b))
switch tag {
switch pType {
case 0xFC, 0xFE, 0xFD:
if c.video == nil {
continue
}
var payload []byte
if tag != 0xFD {
if pType != 0xFD {
payload = b[16:] // iframe
} else {
payload = b[8:] // pframe
@@ -65,31 +65,29 @@ func (c *Producer) Start() error {
continue
}
for b != nil {
payload := b[8:size]
if len(b) > size {
b = b[size:]
} else {
b = nil
}
payload := b[8:]
c.audioTS += uint32(len(payload))
c.audioSeq++
c.audioTS += uint32(len(payload))
c.audioSeq++
packet := &rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
SequenceNumber: c.audioSeq,
Timestamp: c.audioTS,
},
Payload: payload,
}
//log.Printf("[DVR] len: %d, ts: %10d", len(packet.Payload), packet.Timestamp)
c.audio.WriteRTP(packet)
packet := &rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
SequenceNumber: c.audioSeq,
Timestamp: c.audioTS,
},
Payload: payload,
}
//log.Printf("[DVR] len: %d, ts: %10d", len(packet.Payload), packet.Timestamp)
c.audio.WriteRTP(packet)
case 0xF9: // unknown
default:
println(fmt.Sprintf("dvrip: unknown packet type: %d", pType))
}
}
}
@@ -105,14 +103,33 @@ func (c *Producer) probe() error {
rd := core.NewReadBuffer(c.client.rd)
rd.BufferSize = core.ProbeSize
defer rd.Reset()
defer func() {
c.client.buf = nil
rd.Reset()
}()
c.client.rd = rd
timeout := time.Now().Add(core.ProbeTimeout)
// some awful cameras has VERY rare keyframes
// so we wait video+audio for default probe time
// and wait anything for 15 seconds
timeoutBoth := time.Now().Add(core.ProbeTimeout)
timeoutAny := time.Now().Add(time.Second * 15)
for (c.video == nil || c.audio == nil) && time.Now().Before(timeout) {
tag, _, b, err := c.readPacket()
for {
if now := time.Now(); now.Before(timeoutBoth) {
if c.video != nil && c.audio != nil {
return nil
}
} else if now.Before(timeoutAny) {
if c.video != nil || c.audio != nil {
return nil
}
} else {
return errors.New("dvrip: can't probe medias")
}
tag, b, err := c.client.ReadPacket()
if err != nil {
return err
}
@@ -147,40 +164,6 @@ func (c *Producer) probe() error {
c.addAudioTrack(b[4], b[5])
}
}
return nil
}
func (c *Producer) readPacket() (tag byte, size int, data []byte, err error) {
if data, err = c.client.Response(); err != nil {
return 0, 0, nil, err
}
switch tag = data[3]; tag {
case 0xFC, 0xFE:
size = int(binary.LittleEndian.Uint32(data[12:])) + 16
case 0xFD: // PFrame
size = int(binary.LittleEndian.Uint32(data[4:])) + 8
case 0xFA, 0xF9:
size = int(binary.LittleEndian.Uint16(data[6:])) + 8
default:
return 0, 0, nil, fmt.Errorf("unknown type: %X", tag)
}
// collect data from multiple packets
for len(data) < size {
b, err := c.client.Response()
if err != nil {
return 0, 0, nil, err
}
data = append(data, b...)
}
if len(data) > size {
return 0, 0, nil, errors.New("wrong size")
}
return
}
func (c *Producer) addVideoTrack(mediaCode byte, payload []byte) {