Rewrite FLV source
This commit is contained in:
@@ -1,226 +0,0 @@
|
||||
package flv
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/AlexxIT/go2rtc/pkg/aac"
|
||||
"github.com/AlexxIT/go2rtc/pkg/core"
|
||||
"github.com/AlexxIT/go2rtc/pkg/h264"
|
||||
"github.com/pion/rtp"
|
||||
)
|
||||
|
||||
const Signature = "FLV"
|
||||
|
||||
type Client struct {
|
||||
URL string
|
||||
|
||||
rd *core.ReadBuffer
|
||||
|
||||
medias []*core.Media
|
||||
receivers []*core.Receiver
|
||||
|
||||
video, audio *core.Receiver
|
||||
|
||||
recv int
|
||||
}
|
||||
|
||||
func Open(rd io.Reader) (*Client, error) {
|
||||
client := &Client{
|
||||
rd: core.NewReadBuffer(rd),
|
||||
}
|
||||
if err := client.describe(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return client, nil
|
||||
}
|
||||
|
||||
const (
|
||||
TagAudio = 8
|
||||
TagVideo = 9
|
||||
TagData = 18
|
||||
|
||||
CodecAAC = 10
|
||||
CodecAVC = 7
|
||||
)
|
||||
|
||||
func (c *Client) describe() error {
|
||||
if err := c.readHeader(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.rd.BufferSize = core.ProbeSize
|
||||
defer c.rd.Reset()
|
||||
|
||||
// Normal software sends:
|
||||
// 1. Video/audio flag in header
|
||||
// 2. MetaData as first tag (with video/audio codec info)
|
||||
// 3. Video/audio headers in 2nd and 3rd tag
|
||||
|
||||
// Reolink camera sends:
|
||||
// 1. Empty video/audio flag
|
||||
// 2. MedaData without stereo key for AAC
|
||||
// 3. Audio header after Video keyframe tag
|
||||
waitType := []byte{TagData}
|
||||
timeout := time.Now().Add(core.ProbeTimeout)
|
||||
|
||||
for len(waitType) != 0 && time.Now().Before(timeout) {
|
||||
pkt, err := c.readPacket()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if i := bytes.IndexByte(waitType, pkt.PayloadType); i < 0 {
|
||||
continue
|
||||
} else {
|
||||
waitType = append(waitType[:i], waitType[i+1:]...)
|
||||
}
|
||||
|
||||
switch pkt.PayloadType {
|
||||
case TagAudio:
|
||||
_ = pkt.Payload[1] // bounds
|
||||
|
||||
codecID := pkt.Payload[0] >> 4 // SoundFormat
|
||||
_ = pkt.Payload[0] & 0b1100 // SoundRate
|
||||
_ = pkt.Payload[0] & 0b0010 // SoundSize
|
||||
_ = pkt.Payload[0] & 0b0001 // SoundType
|
||||
|
||||
if codecID != CodecAAC {
|
||||
continue
|
||||
}
|
||||
|
||||
if pkt.Payload[1] != 0 { // check if header
|
||||
continue
|
||||
}
|
||||
|
||||
codec := aac.ConfigToCodec(pkt.Payload[2:])
|
||||
media := &core.Media{
|
||||
Kind: core.KindAudio,
|
||||
Direction: core.DirectionRecvonly,
|
||||
Codecs: []*core.Codec{codec},
|
||||
}
|
||||
c.medias = append(c.medias, media)
|
||||
|
||||
case TagVideo:
|
||||
_ = pkt.Payload[1] // bounds
|
||||
|
||||
_ = pkt.Payload[0] >> 4 // FrameType
|
||||
codecID := pkt.Payload[0] & 0b1111 // CodecID
|
||||
|
||||
if codecID != CodecAVC {
|
||||
continue
|
||||
}
|
||||
|
||||
if pkt.Payload[1] != 0 { // check if header
|
||||
continue
|
||||
}
|
||||
|
||||
codec := h264.ConfigToCodec(pkt.Payload[5:])
|
||||
media := &core.Media{
|
||||
Kind: core.KindVideo,
|
||||
Direction: core.DirectionRecvonly,
|
||||
Codecs: []*core.Codec{codec},
|
||||
}
|
||||
c.medias = append(c.medias, media)
|
||||
|
||||
case TagData:
|
||||
if !bytes.Contains(pkt.Payload, []byte("onMetaData")) {
|
||||
waitType = append(waitType, TagData)
|
||||
}
|
||||
if bytes.Contains(pkt.Payload, []byte("videocodecid")) {
|
||||
waitType = append(waitType, TagVideo)
|
||||
}
|
||||
if bytes.Contains(pkt.Payload, []byte("audiocodecid")) {
|
||||
waitType = append(waitType, TagAudio)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) play() error {
|
||||
for {
|
||||
pkt, err := c.readPacket()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.recv += len(pkt.Payload)
|
||||
|
||||
switch pkt.PayloadType {
|
||||
case TagAudio:
|
||||
if c.audio == nil || pkt.Payload[1] == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
pkt.Timestamp = TimeToRTP(pkt.Timestamp, c.audio.Codec.ClockRate)
|
||||
pkt.Payload = pkt.Payload[2:]
|
||||
c.audio.WriteRTP(pkt)
|
||||
|
||||
case TagVideo:
|
||||
// frame type 4b, codecID 4b, avc packet type 8b, composition time 24b
|
||||
if c.video == nil || pkt.Payload[1] == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
pkt.Timestamp = TimeToRTP(pkt.Timestamp, c.video.Codec.ClockRate)
|
||||
pkt.Payload = pkt.Payload[5:]
|
||||
c.video.WriteRTP(pkt)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) readHeader() error {
|
||||
b := make([]byte, 9)
|
||||
if _, err := io.ReadFull(c.rd, b); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if string(b[:3]) != Signature {
|
||||
return errors.New("flv: wrong header")
|
||||
}
|
||||
|
||||
_ = b[4] // flags (skip because unsupported by Reolink cameras)
|
||||
|
||||
if skip := binary.BigEndian.Uint32(b[5:]) - 9; skip > 0 {
|
||||
if _, err := io.ReadFull(c.rd, make([]byte, skip)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) readPacket() (*rtp.Packet, error) {
|
||||
// https://rtmp.veriskope.com/pdf/video_file_format_spec_v10.pdf
|
||||
b := make([]byte, 4+11)
|
||||
if _, err := io.ReadFull(c.rd, b); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
b = b[4 : 4+11] // skip previous tag size
|
||||
|
||||
size := uint32(b[1])<<16 | uint32(b[2])<<8 | uint32(b[3])
|
||||
|
||||
pkt := &rtp.Packet{
|
||||
Header: rtp.Header{
|
||||
PayloadType: b[0],
|
||||
Timestamp: uint32(b[4])<<16 | uint32(b[5])<<8 | uint32(b[6]) | uint32(b[7])<<24,
|
||||
},
|
||||
Payload: make([]byte, size),
|
||||
}
|
||||
|
||||
if _, err := io.ReadFull(c.rd, pkt.Payload); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return pkt, nil
|
||||
}
|
||||
|
||||
func TimeToRTP(timeMS uint32, clockRate uint32) uint32 {
|
||||
return timeMS * clockRate / 1000
|
||||
}
|
||||
+212
-28
@@ -1,50 +1,234 @@
|
||||
package flv
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/AlexxIT/go2rtc/pkg/aac"
|
||||
"github.com/AlexxIT/go2rtc/pkg/core"
|
||||
"github.com/AlexxIT/go2rtc/pkg/h264"
|
||||
"github.com/pion/rtp"
|
||||
)
|
||||
|
||||
func (c *Client) GetMedias() []*core.Media {
|
||||
return c.medias
|
||||
type Producer struct {
|
||||
core.SuperProducer
|
||||
rd *core.ReadBuffer
|
||||
|
||||
video, audio *core.Receiver
|
||||
}
|
||||
|
||||
func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
|
||||
for _, track := range c.receivers {
|
||||
if track.Codec == codec {
|
||||
return track, nil
|
||||
func Open(rd io.Reader) (*Producer, error) {
|
||||
prod := &Producer{rd: core.NewReadBuffer(rd)}
|
||||
if err := prod.probe(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
prod.Type = "FLV producer"
|
||||
return prod, nil
|
||||
}
|
||||
|
||||
const (
|
||||
Signature = "FLV"
|
||||
|
||||
TagAudio = 8
|
||||
TagVideo = 9
|
||||
TagData = 18
|
||||
|
||||
CodecAAC = 10
|
||||
CodecAVC = 7
|
||||
)
|
||||
|
||||
func (c *Producer) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
|
||||
receiver, _ := c.SuperProducer.GetTrack(media, codec)
|
||||
if media.Kind == core.KindVideo {
|
||||
c.video = receiver
|
||||
} else {
|
||||
c.audio = receiver
|
||||
}
|
||||
return receiver, nil
|
||||
}
|
||||
|
||||
func (c *Producer) Start() error {
|
||||
for {
|
||||
pkt, err := c.readPacket()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.Recv += len(pkt.Payload)
|
||||
|
||||
switch pkt.PayloadType {
|
||||
case TagAudio:
|
||||
if c.audio == nil || pkt.Payload[1] == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
pkt.Timestamp = TimeToRTP(pkt.Timestamp, c.audio.Codec.ClockRate)
|
||||
pkt.Payload = pkt.Payload[2:]
|
||||
c.audio.WriteRTP(pkt)
|
||||
|
||||
case TagVideo:
|
||||
// frame type 4b, codecID 4b, avc packet type 8b, composition time 24b
|
||||
if c.video == nil || pkt.Payload[1] == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
pkt.Timestamp = TimeToRTP(pkt.Timestamp, c.video.Codec.ClockRate)
|
||||
pkt.Payload = pkt.Payload[5:]
|
||||
c.video.WriteRTP(pkt)
|
||||
}
|
||||
}
|
||||
track := core.NewReceiver(media, codec)
|
||||
if media.Kind == core.KindVideo {
|
||||
c.video = track
|
||||
} else {
|
||||
c.audio = track
|
||||
}
|
||||
c.receivers = append(c.receivers, track)
|
||||
return track, nil
|
||||
}
|
||||
|
||||
func (c *Client) Start() error {
|
||||
return c.play()
|
||||
func (c *Producer) Stop() error {
|
||||
_ = c.SuperProducer.Close()
|
||||
return c.rd.Close()
|
||||
}
|
||||
|
||||
func (c *Client) Stop() error {
|
||||
if closer, ok := c.rd.Reader.(io.Closer); ok {
|
||||
return closer.Close()
|
||||
func (c *Producer) probe() error {
|
||||
if err := c.readHeader(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.rd.BufferSize = core.ProbeSize
|
||||
defer c.rd.Reset()
|
||||
|
||||
// Normal software sends:
|
||||
// 1. Video/audio flag in header
|
||||
// 2. MetaData as first tag (with video/audio codec info)
|
||||
// 3. Video/audio headers in 2nd and 3rd tag
|
||||
|
||||
// Reolink camera sends:
|
||||
// 1. Empty video/audio flag
|
||||
// 2. MedaData without stereo key for AAC
|
||||
// 3. Audio header after Video keyframe tag
|
||||
waitType := []byte{TagData}
|
||||
timeout := time.Now().Add(core.ProbeTimeout)
|
||||
|
||||
for len(waitType) != 0 && time.Now().Before(timeout) {
|
||||
pkt, err := c.readPacket()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if i := bytes.IndexByte(waitType, pkt.PayloadType); i < 0 {
|
||||
continue
|
||||
} else {
|
||||
waitType = append(waitType[:i], waitType[i+1:]...)
|
||||
}
|
||||
|
||||
switch pkt.PayloadType {
|
||||
case TagAudio:
|
||||
_ = pkt.Payload[1] // bounds
|
||||
|
||||
codecID := pkt.Payload[0] >> 4 // SoundFormat
|
||||
_ = pkt.Payload[0] & 0b1100 // SoundRate
|
||||
_ = pkt.Payload[0] & 0b0010 // SoundSize
|
||||
_ = pkt.Payload[0] & 0b0001 // SoundType
|
||||
|
||||
if codecID != CodecAAC {
|
||||
continue
|
||||
}
|
||||
|
||||
if pkt.Payload[1] != 0 { // check if header
|
||||
continue
|
||||
}
|
||||
|
||||
codec := aac.ConfigToCodec(pkt.Payload[2:])
|
||||
media := &core.Media{
|
||||
Kind: core.KindAudio,
|
||||
Direction: core.DirectionRecvonly,
|
||||
Codecs: []*core.Codec{codec},
|
||||
}
|
||||
c.Medias = append(c.Medias, media)
|
||||
|
||||
case TagVideo:
|
||||
_ = pkt.Payload[1] // bounds
|
||||
|
||||
_ = pkt.Payload[0] >> 4 // FrameType
|
||||
codecID := pkt.Payload[0] & 0b1111 // CodecID
|
||||
|
||||
if codecID != CodecAVC {
|
||||
continue
|
||||
}
|
||||
|
||||
if pkt.Payload[1] != 0 { // check if header
|
||||
continue
|
||||
}
|
||||
|
||||
codec := h264.ConfigToCodec(pkt.Payload[5:])
|
||||
media := &core.Media{
|
||||
Kind: core.KindVideo,
|
||||
Direction: core.DirectionRecvonly,
|
||||
Codecs: []*core.Codec{codec},
|
||||
}
|
||||
c.Medias = append(c.Medias, media)
|
||||
|
||||
case TagData:
|
||||
if !bytes.Contains(pkt.Payload, []byte("onMetaData")) {
|
||||
waitType = append(waitType, TagData)
|
||||
}
|
||||
if bytes.Contains(pkt.Payload, []byte("videocodecid")) {
|
||||
waitType = append(waitType, TagVideo)
|
||||
}
|
||||
if bytes.Contains(pkt.Payload, []byte("audiocodecid")) {
|
||||
waitType = append(waitType, TagAudio)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) MarshalJSON() ([]byte, error) {
|
||||
info := &core.Info{
|
||||
Type: "FLV active producer",
|
||||
URL: c.URL,
|
||||
Medias: c.medias,
|
||||
Receivers: c.receivers,
|
||||
Recv: c.recv,
|
||||
func (c *Producer) readHeader() error {
|
||||
b := make([]byte, 9)
|
||||
if _, err := io.ReadFull(c.rd, b); err != nil {
|
||||
return err
|
||||
}
|
||||
return json.Marshal(info)
|
||||
|
||||
if string(b[:3]) != Signature {
|
||||
return errors.New("flv: wrong header")
|
||||
}
|
||||
|
||||
_ = b[4] // flags (skip because unsupported by Reolink cameras)
|
||||
|
||||
if skip := binary.BigEndian.Uint32(b[5:]) - 9; skip > 0 {
|
||||
if _, err := io.ReadFull(c.rd, make([]byte, skip)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Producer) readPacket() (*rtp.Packet, error) {
|
||||
// https://rtmp.veriskope.com/pdf/video_file_format_spec_v10.pdf
|
||||
b := make([]byte, 4+11)
|
||||
if _, err := io.ReadFull(c.rd, b); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
b = b[4 : 4+11] // skip previous tag size
|
||||
|
||||
size := uint32(b[1])<<16 | uint32(b[2])<<8 | uint32(b[3])
|
||||
|
||||
pkt := &rtp.Packet{
|
||||
Header: rtp.Header{
|
||||
PayloadType: b[0],
|
||||
Timestamp: uint32(b[4])<<16 | uint32(b[5])<<8 | uint32(b[6]) | uint32(b[7])<<24,
|
||||
},
|
||||
Payload: make([]byte, size),
|
||||
}
|
||||
|
||||
if _, err := io.ReadFull(c.rd, pkt.Payload); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return pkt, nil
|
||||
}
|
||||
|
||||
func TimeToRTP(timeMS uint32, clockRate uint32) uint32 {
|
||||
return timeMS * clockRate / 1000
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user