diff --git a/internal/http/http.go b/internal/http/http.go index e83597f7..10744cee 100644 --- a/internal/http/http.go +++ b/internal/http/http.go @@ -10,10 +10,10 @@ import ( "github.com/AlexxIT/go2rtc/internal/streams" "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/flv" "github.com/AlexxIT/go2rtc/pkg/magic" "github.com/AlexxIT/go2rtc/pkg/mjpeg" "github.com/AlexxIT/go2rtc/pkg/multipart" - "github.com/AlexxIT/go2rtc/pkg/rtmp" "github.com/AlexxIT/go2rtc/pkg/tcp" ) @@ -54,14 +54,12 @@ func handleHTTP(url string) (core.Producer, error) { return multipart.NewClient(res) case "video/x-flv": - var conn *rtmp.Client - if conn, err = rtmp.Accept(res); err != nil { + client := flv.NewClient(res.Body) + if err = client.Describe(); err != nil { return nil, err } - if err = conn.Describe(); err != nil { - return nil, err - } - return conn, nil + client.URL = url + return client, nil default: // "video/mpeg": } diff --git a/pkg/aac/aac.go b/pkg/aac/aac.go new file mode 100644 index 00000000..b4aebeac --- /dev/null +++ b/pkg/aac/aac.go @@ -0,0 +1,52 @@ +package aac + +import ( + "encoding/hex" + "fmt" + + "github.com/AlexxIT/go2rtc/pkg/bits" + "github.com/AlexxIT/go2rtc/pkg/core" +) + +const ( + TypeAACMain = 1 + TypeAACLC = 2 + TypeESCAPE = 31 +) + +// streamtype=5 - audio stream +const fmtp = "streamtype=5;profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config=" + +var sampleRates = []uint32{96000, 88200, 64000, 48000, 44100, 32000, 24000, 22050, 16000, 12000, 11025, 8000, 7350} + +func ConfigToCodec(conf []byte) *core.Codec { + // https://en.wikipedia.org/wiki/MPEG-4_Part_3#MPEG-4_Audio_Object_Types + rd := bits.NewReader(conf) + + codec := &core.Codec{ + FmtpLine: fmtp + hex.EncodeToString(conf), + PayloadType: core.PayloadTypeRAW, + } + + objType := rd.ReadBits(5) + if objType == TypeESCAPE { + objType = 32 + rd.ReadBits(6) + } + + switch objType { + case TypeAACLC: + codec.Name = core.CodecAAC + default: + codec.Name = fmt.Sprintf("AAC-%X", objType) + } + + if sampleRateIdx := rd.ReadBits8(4); sampleRateIdx < 12 { + codec.ClockRate = sampleRates[sampleRateIdx] + } else if sampleRateIdx == 0x0F { + codec.ClockRate = rd.ReadBits(24) + } + + codec.Channels = rd.ReadBits16(4) + + return codec +} diff --git a/pkg/bits/reader.go b/pkg/bits/reader.go new file mode 100644 index 00000000..cea80353 --- /dev/null +++ b/pkg/bits/reader.go @@ -0,0 +1,67 @@ +package bits + +type Reader struct { + buf []byte // packets buffer + byte byte + bits byte + pos int +} + +func NewReader(b []byte) *Reader { + return &Reader{buf: b} +} + +//goland:noinspection GoStandardMethods +func (r *Reader) ReadByte() byte { + if r.bits == 0 { + b := r.buf[r.pos] + r.pos++ + return b + } + + return r.ReadBits8(8) +} + +func (r *Reader) ReadBit() byte { + if r.bits == 0 { + r.byte = r.ReadByte() + r.bits = 7 + } else { + r.bits-- + } + + return (r.byte >> r.bits) & 0b1 +} + +func (r *Reader) ReadBits(n byte) (res uint32) { + for i := n - 1; i != 255; i-- { + res |= uint32(r.ReadBit()) << i + } + return +} + +func (r *Reader) ReadBits8(n byte) (res uint8) { + for i := n - 1; i != 255; i-- { + res |= r.ReadBit() << i + } + return +} + +func (r *Reader) ReadBits16(n byte) (res uint16) { + for i := n - 1; i != 255; i-- { + res |= uint16(r.ReadBit()) << i + } + return +} + +func (r *Reader) SkipBits(n int) { + for i := 0; i < n; i++ { + if r.bits == 0 { + r.byte = r.buf[r.pos] + r.pos++ + r.bits = 7 + } else { + r.bits-- + } + } +} diff --git a/pkg/core/helpers.go b/pkg/core/helpers.go index 9a9bf11c..4f0a6e99 100644 --- a/pkg/core/helpers.go +++ b/pkg/core/helpers.go @@ -13,6 +13,7 @@ import ( const ( ConnDialTimeout = time.Second * 3 ConnDeadline = time.Second * 3 + ProbeTimeout = time.Second * 3 ) // Now90000 - timestamp for Video (clock rate = 90000 samples per second) diff --git a/pkg/core/track.go b/pkg/core/track.go index ade3ca8d..a2e2575f 100644 --- a/pkg/core/track.go +++ b/pkg/core/track.go @@ -4,9 +4,10 @@ import ( "encoding/json" "errors" "fmt" - "github.com/pion/rtp" "strconv" "sync" + + "github.com/pion/rtp" ) var ErrCantGetTrack = errors.New("can't get track") @@ -181,3 +182,16 @@ func (s *Sender) String() string { func (s *Sender) MarshalJSON() ([]byte, error) { return json.Marshal(s.String()) } + +// VA - helper, for extract video and audio receivers from list +func VA(receivers []*Receiver) (video, audio *Receiver) { + for _, receiver := range receivers { + switch GetKind(receiver.Codec.Name) { + case KindVideo: + video = receiver + case KindAudio: + audio = receiver + } + } + return +} diff --git a/pkg/flv/amf/amf.go b/pkg/flv/amf/amf.go new file mode 100644 index 00000000..8cbdff2e --- /dev/null +++ b/pkg/flv/amf/amf.go @@ -0,0 +1,200 @@ +package amf + +import ( + "encoding/binary" + "errors" + "math" +) + +const ( + TypeNumber byte = iota + TypeBoolean + TypeString + TypeObject + TypeNull = 5 + TypeEcmaArray = 8 + TypeObjectEnd = 9 +) + +// AMF spec: http://download.macromedia.com/pub/labs/amf/amf0_spec_121207.pdf +type AMF struct { + buf []byte + pos int +} + +var ErrRead = errors.New("amf: read error") + +func NewReader(b []byte) *AMF { + return &AMF{buf: b} +} + +func (a *AMF) ReadItems() ([]any, error) { + var items []any + for a.pos < len(a.buf) { + v, err := a.ReadItem() + if err != nil { + return nil, err + } + items = append(items, v) + } + return items, nil +} + +func (a *AMF) ReadItem() (any, error) { + dataType, err := a.ReadByte() + if err != nil { + return nil, err + } + + switch dataType { + case TypeNumber: + return a.ReadNumber() + + case TypeBoolean: + b, err := a.ReadByte() + return b != 0, err + + case TypeString: + return a.ReadString() + + case TypeObject: + return a.ReadObject() + + case TypeNull: + return nil, nil + + case TypeObjectEnd: + return nil, nil + } + + return nil, ErrRead +} + +func (a *AMF) ReadByte() (byte, error) { + if a.pos >= len(a.buf) { + return 0, ErrRead + } + + v := a.buf[a.pos] + a.pos++ + return v, nil +} + +func (a *AMF) ReadNumber() (float64, error) { + if a.pos+8 > len(a.buf) { + return 0, ErrRead + } + + v := binary.BigEndian.Uint64(a.buf[a.pos : a.pos+8]) + a.pos += 8 + return math.Float64frombits(v), nil +} + +func (a *AMF) ReadString() (string, error) { + if a.pos+2 > len(a.buf) { + return "", ErrRead + } + + size := int(binary.BigEndian.Uint16(a.buf[a.pos:])) + a.pos += 2 + + if a.pos+size > len(a.buf) { + return "", ErrRead + } + + s := string(a.buf[a.pos : a.pos+size]) + a.pos += size + + return s, nil +} + +func (a *AMF) ReadObject() (map[string]any, error) { + obj := make(map[string]any) + + for { + k, err := a.ReadString() + if err != nil { + return nil, err + } + + v, err := a.ReadItem() + if err != nil { + return nil, err + } + + if k == "" { + break + } + + obj[k] = v + } + + return obj, nil +} + +func (a *AMF) ReadEcmaArray() (map[string]any, error) { + if a.pos+4 > len(a.buf) { + return nil, ErrRead + } + a.pos += 4 // skip size + + return a.ReadObject() +} + +func NewWriter() *AMF { + return &AMF{} +} + +func (a *AMF) Bytes() []byte { + return a.buf +} + +func (a *AMF) WriteNumber(n float64) { + b := math.Float64bits(n) + a.buf = append( + a.buf, TypeNumber, + byte(b>>56), byte(b>>48), byte(b>>40), byte(b>>32), + byte(b>>24), byte(b>>16), byte(b>>8), byte(b), + ) +} + +func (a *AMF) WriteBool(b bool) { + if b { + a.buf = append(a.buf, TypeBoolean, 1) + } else { + a.buf = append(a.buf, TypeBoolean, 0) + } +} + +func (a *AMF) WriteString(s string) { + n := len(s) + a.buf = append(a.buf, TypeString, byte(n>>8), byte(n)) + a.buf = append(a.buf, s...) +} + +func (a *AMF) WriteObject(obj map[string]any) { + a.buf = append(a.buf, TypeObject) + + for k, v := range obj { + n := len(k) + a.buf = append(a.buf, byte(n>>8), byte(n)) + a.buf = append(a.buf, k...) + + switch v := v.(type) { + case string: + a.WriteString(v) + case int: + a.WriteNumber(float64(v)) + case bool: + a.WriteBool(v) + default: + panic(v) + } + } + + a.buf = append(a.buf, 0, 0, TypeObjectEnd) +} + +func (a *AMF) WriteNull() { + a.buf = append(a.buf, TypeNull) +} diff --git a/pkg/flv/client.go b/pkg/flv/client.go new file mode 100644 index 00000000..6a397378 --- /dev/null +++ b/pkg/flv/client.go @@ -0,0 +1,162 @@ +package flv + +import ( + "bytes" + "io" + "time" + + "github.com/AlexxIT/go2rtc/pkg/aac" + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/h264/avc" + "github.com/pion/rtp" +) + +type Client struct { + URL string + + rd io.Reader + + medias []*core.Media + receivers []*core.Receiver + + recv int +} + +func NewClient(rd io.Reader) *Client { + return &Client{rd: rd} +} + +func (c *Client) Describe() error { + if err := c.ReadHeader(); err != nil { + return err + } + + // Normal software sends: + // 1. Video/audio flag in header + // 2. MetaData as first tag (with video/audio codec info) + // 3. Video/audio headers in 2nd and 3rd tag + + // Reolink camera sends: + // 1. Empty video/audio flag + // 2. MedaData without stereo key for AAC + // 3. Audio header after Video keyframe tag + waitVideo := true + waitAudio := true + timeout := time.Now().Add(core.ProbeTimeout) + + for (waitVideo || waitAudio) && time.Now().Before(timeout) { + tagType, _, b, err := c.ReadTag() + if err != nil { + return err + } + + c.recv += len(b) + + switch tagType { + case TagAudio: + if !waitAudio { + continue + } + + waitAudio = false + + codecID := b[0] >> 4 // SoundFormat + _ = b[0] & 0b1100 // SoundRate + _ = b[0] & 0b0010 // SoundSize + _ = b[0] & 0b0001 // SoundType + + if codecID != CodecAAC { + continue + } + + if b[1] != 0 { // check if header + continue + } + + codec := aac.ConfigToCodec(b[2:]) + media := &core.Media{ + Kind: core.KindAudio, + Direction: core.DirectionRecvonly, + Codecs: []*core.Codec{codec}, + } + c.medias = append(c.medias, media) + + case TagVideo: + if !waitVideo { + continue + } + + waitVideo = false + + _ = b[0] >> 4 // FrameType + codecID := b[0] & 0b1111 // CodecID + + if codecID != CodecAVC { + continue + } + + if b[1] != 0 { // check if header + continue + } + + codec := avc.ConfigToCodec(b[5:]) + media := &core.Media{ + Kind: core.KindVideo, + Direction: core.DirectionRecvonly, + Codecs: []*core.Codec{codec}, + } + c.medias = append(c.medias, media) + + case TagData: + if !bytes.Contains(b, []byte("onMetaData")) { + continue + } + waitVideo = bytes.Contains(b, []byte("videocodecid")) + waitAudio = bytes.Contains(b, []byte("audiocodecid")) + } + } + + return nil +} + +func (c *Client) Play() error { + video, audio := core.VA(c.receivers) + + for { + tagType, timeMS, b, err := c.ReadTag() + if err != nil { + return err + } + + c.recv += len(b) + + switch tagType { + case TagAudio: + if audio == nil || b[1] == 0 { + continue + } + + pkt := &rtp.Packet{ + Header: rtp.Header{ + Timestamp: TimeToRTP(timeMS, audio.Codec.ClockRate), + }, + Payload: b[2:], + } + audio.WriteRTP(pkt) + + case TagVideo: + // frame type 4b, codecID 4b, avc packet type 8b, composition time 24b + if video == nil || b[1] == 0 { + continue + } + + pkt := &rtp.Packet{ + Header: rtp.Header{ + Timestamp: TimeToRTP(timeMS, video.Codec.ClockRate), + }, + Payload: b[5:], + } + video.WriteRTP(pkt) + } + } +} diff --git a/pkg/flv/flv.go b/pkg/flv/flv.go new file mode 100644 index 00000000..641bf946 --- /dev/null +++ b/pkg/flv/flv.go @@ -0,0 +1,62 @@ +package flv + +import ( + "encoding/binary" + "errors" + "io" +) + +const ( + TagAudio = 8 + TagVideo = 9 + TagData = 18 + + CodecAAC = 10 + CodecAVC = 7 +) + +func (c *Client) ReadHeader() error { + b := make([]byte, 9) + if _, err := io.ReadFull(c.rd, b); err != nil { + return err + } + + if string(b[:3]) != "FLV" { + return errors.New("flv: wrong header") + } + + _ = b[4] // flags (skip because unsupported by Reolink cameras) + + if skip := binary.BigEndian.Uint32(b[5:]) - 9; skip > 0 { + if _, err := io.ReadFull(c.rd, make([]byte, skip)); err != nil { + return err + } + } + + return nil +} + +func (c *Client) ReadTag() (byte, uint32, []byte, error) { + // https://rtmp.veriskope.com/pdf/video_file_format_spec_v10.pdf + b := make([]byte, 4+11) + if _, err := io.ReadFull(c.rd, b); err != nil { + return 0, 0, nil, err + } + + b = b[4 : 4+11] // skip previous tag size + + tagType := b[0] + size := uint32(b[1])<<16 | uint32(b[2])<<8 | uint32(b[3]) + timeMS := uint32(b[4])<<16 | uint32(b[5])<<8 | uint32(b[6]) | uint32(b[7])<<24 + + b = make([]byte, size) + if _, err := io.ReadFull(c.rd, b); err != nil { + return 0, 0, nil, err + } + + return tagType, timeMS, b, nil +} + +func TimeToRTP(timeMS uint32, clockRate uint32) uint32 { + return timeMS * clockRate / 1000 +} diff --git a/pkg/flv/producer.go b/pkg/flv/producer.go new file mode 100644 index 00000000..6d04d2db --- /dev/null +++ b/pkg/flv/producer.go @@ -0,0 +1,45 @@ +package flv + +import ( + "encoding/json" + "io" + + "github.com/AlexxIT/go2rtc/pkg/core" +) + +func (c *Client) GetMedias() []*core.Media { + return c.medias +} + +func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { + for _, track := range c.receivers { + if track.Codec == codec { + return track, nil + } + } + track := core.NewReceiver(media, codec) + c.receivers = append(c.receivers, track) + return track, nil +} + +func (c *Client) Start() error { + return c.Play() +} + +func (c *Client) Stop() error { + if closer, ok := c.rd.(io.Closer); ok { + return closer.Close() + } + return nil +} + +func (c *Client) MarshalJSON() ([]byte, error) { + info := &core.Info{ + Type: "FLV active producer", + URL: c.URL, + Medias: c.medias, + Receivers: c.receivers, + Recv: c.recv, + } + return json.Marshal(info) +} diff --git a/pkg/h264/avc/avc.go b/pkg/h264/avc/avc.go new file mode 100644 index 00000000..8b43537d --- /dev/null +++ b/pkg/h264/avc/avc.go @@ -0,0 +1,75 @@ +package avc + +import ( + "bytes" + "encoding/base64" + "encoding/binary" + "encoding/hex" + + "github.com/AlexxIT/go2rtc/pkg/core" +) + +func DecodeConfig(conf []byte) (profile []byte, sps []byte, pps []byte) { + if len(conf) < 6 || conf[0] != 1 { + return + } + + profile = conf[1:4] + + count := conf[5] & 0x1F + conf = conf[6:] + for i := byte(0); i < count; i++ { + if len(conf) < 2 { + return + } + size := 2 + int(binary.BigEndian.Uint16(conf)) + if len(conf) < size { + return + } + if sps == nil { + sps = conf[2:size] + } + conf = conf[size:] + } + + count = conf[0] + conf = conf[1:] + for i := byte(0); i < count; i++ { + if len(conf) < 2 { + return + } + size := 2 + int(binary.BigEndian.Uint16(conf)) + if len(conf) < size { + return + } + if pps == nil { + pps = conf[2:size] + } + conf = conf[size:] + } + + return +} + +func ConfigToCodec(conf []byte) *core.Codec { + buf := bytes.NewBufferString("packetization-mode=1") + + profile, sps, pps := DecodeConfig(conf) + if profile != nil { + buf.WriteString(";profile-level-id=") + buf.WriteString(hex.EncodeToString(profile)) + } + if sps != nil && pps != nil { + buf.WriteString(";sprop-parameter-sets=") + buf.WriteString(base64.StdEncoding.EncodeToString(sps)) + buf.WriteString(",") + buf.WriteString(base64.StdEncoding.EncodeToString(pps)) + } + + return &core.Codec{ + Name: core.CodecH264, + ClockRate: 90000, + FmtpLine: buf.String(), + PayloadType: core.PayloadTypeRAW, + } +} diff --git a/pkg/h264/avc/avc_test.go b/pkg/h264/avc/avc_test.go new file mode 100644 index 00000000..eb1019ac --- /dev/null +++ b/pkg/h264/avc/avc_test.go @@ -0,0 +1,19 @@ +package avc + +import ( + "encoding/hex" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestDecodeConfig(t *testing.T) { + s := "01640033ffe1000c67640033ac1514a02800f19001000468ee3cb0" + b, err := hex.DecodeString(s) + require.Nil(t, err) + + profile, sps, pps := DecodeConfig(b) + require.NotNil(t, profile) + require.NotNil(t, sps) + require.NotNil(t, pps) +}