diff --git a/internal/ivideon/ivideon.go b/internal/ivideon/ivideon.go index 03feb742..51ddb890 100644 --- a/internal/ivideon/ivideon.go +++ b/internal/ivideon/ivideon.go @@ -2,12 +2,9 @@ package ivideon import ( "github.com/AlexxIT/go2rtc/internal/streams" - "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/ivideon" ) func Init() { - streams.HandleFunc("ivideon", func(source string) (core.Producer, error) { - return ivideon.Dial(source) - }) + streams.HandleFunc("ivideon", ivideon.Dial) } diff --git a/pkg/bits/reader.go b/pkg/bits/reader.go index 435cf5f7..2a957409 100644 --- a/pkg/bits/reader.go +++ b/pkg/bits/reader.go @@ -89,6 +89,12 @@ func (r *Reader) ReadBits64(n byte) (res uint64) { return } +func (r *Reader) ReadFloat32() float64 { + i := r.ReadUint16() + f := r.ReadUint16() + return float64(i) + float64(f)/65536 +} + func (r *Reader) ReadBytes(n int) (b []byte) { if r.bits == 0 { if r.pos+n > len(r.buf) { diff --git a/pkg/iso/reader.go b/pkg/iso/reader.go index ec436af7..175e2563 100644 --- a/pkg/iso/reader.go +++ b/pkg/iso/reader.go @@ -1,6 +1,7 @@ package iso import ( + "bytes" "encoding/binary" "io" @@ -10,89 +11,192 @@ import ( type Atom struct { Name string Data []byte - - DecodeTime uint64 - - SamplesDuration []uint32 - SamplesSize []uint32 } -func DecodeAtoms(b []byte) ([]*Atom, error) { - var atoms []*Atom - for len(b) > 8 { - size := binary.BigEndian.Uint32(b) - if uint32(len(b)) < size { - return nil, io.EOF +type AtomTkhd struct { + TrackID uint32 +} + +type AtomMdhd struct { + TimeScale uint32 +} + +type AtomVideo struct { + Name string + Config []byte +} + +type AtomAudio struct { + Name string + Channels uint16 + SampleRate uint32 + Config []byte +} + +type AtomMfhd struct { + Sequence uint32 +} + +type AtomMdat struct { + Data []byte +} + +type AtomTfhd struct { + TrackID uint32 + SampleDuration uint32 + SampleSize uint32 + SampleFlags uint32 +} +type AtomTfdt struct { + DecodeTime uint64 +} + +type AtomTrun struct { + DataOffset uint32 + FirstSampleFlags uint32 + SamplesDuration []uint32 + SamplesSize []uint32 + SamplesFlags []uint32 + SamplesCTS []uint32 +} + +func DecodeAtom(b []byte) (any, error) { + size := binary.BigEndian.Uint32(b) + if len(b) < int(size) { + return nil, io.EOF + } + + name := string(b[4:8]) + data := b[8:size] + + switch name { + // useful containers + case Moov, MoovTrak, MoovTrakMdia, MoovTrakMdiaMinf, MoovTrakMdiaMinfStbl, Moof, MoofTraf: + return DecodeAtoms(data) + + case MoovTrakTkhd: + return &AtomTkhd{TrackID: binary.BigEndian.Uint32(data[1+3+4+4:])}, nil + + case MoovTrakMdiaMdhd: + return &AtomMdhd{TimeScale: binary.BigEndian.Uint32(data[1+3+4+4:])}, nil + + case MoovTrakMdiaMinfStblStsd: + // support only 1 codec entry + if n := binary.BigEndian.Uint32(data[1+3:]); n == 1 { + return DecodeAtom(data[1+3+4:]) } - name := string(b[4:8]) - data := b[8:size] + case "avc1", "hev1": + b = data[6+2+2+2+4+4+4+2+2+4+4+4+2+32+2+2:] + atom, err := DecodeAtom(b) + if err != nil { + return nil, err + } + if conf, ok := atom.(*Atom); ok { + return &AtomVideo{Name: name, Config: conf.Data}, nil + } - b = b[size:] + case "mp4a": + atom := &AtomAudio{Name: name} - switch name { - case Moof, MoofTraf: - childs, err := DecodeAtoms(data) - if err != nil { - return nil, err + rd := bits.NewReader(data) + rd.ReadBytes(6 + 2 + 2 + 2 + 4) // skip + atom.Channels = rd.ReadUint16() + rd.ReadBytes(2 + 2 + 2) // skip + atom.SampleRate = uint32(rd.ReadFloat32()) + + atom2, _ := DecodeAtom(rd.Left()) + if conf, ok := atom2.(*Atom); ok { + _, b, _ = bytes.Cut(conf.Data, []byte{5, 0x80, 0x80, 0x80}) + if n := len(b); n > 0 && n > 1+int(b[0]) { + atom.Config = b[1 : 1+b[0]] } + } + return atom, nil + + case MoofMfhd: + return &AtomMfhd{Sequence: binary.BigEndian.Uint32(data[4:])}, nil + + case MoofTrafTfhd: + rd := bits.NewReader(data) + _ = rd.ReadByte() // version + flags := rd.ReadUint24() + + atom := &AtomTfhd{ + TrackID: rd.ReadUint32(), + } + + if flags&TfhdDefaultSampleDuration != 0 { + atom.SampleDuration = rd.ReadUint32() + + } + if flags&TfhdDefaultSampleSize != 0 { + atom.SampleSize = rd.ReadUint32() + } + if flags&TfhdDefaultSampleFlags != 0 { + atom.SampleFlags = rd.ReadUint32() // skip + } + + return atom, nil + + case MoofTrafTfdt: + return &AtomTfdt{DecodeTime: binary.BigEndian.Uint64(data[4:])}, nil + + case MoofTrafTrun: + rd := bits.NewReader(data) + _ = rd.ReadByte() // version + flags := rd.ReadUint24() + samples := rd.ReadUint32() + + atom := &AtomTrun{} + + if flags&TrunDataOffset != 0 { + atom.DataOffset = rd.ReadUint32() + } + if flags&TrunFirstSampleFlags != 0 { + atom.FirstSampleFlags = rd.ReadUint32() + } + + for i := uint32(0); i < samples; i++ { + if flags&TrunSampleDuration != 0 { + atom.SamplesDuration = append(atom.SamplesDuration, rd.ReadUint32()) + } + if flags&TrunSampleSize != 0 { + atom.SamplesSize = append(atom.SamplesSize, rd.ReadUint32()) + } + if flags&TrunSampleFlags != 0 { + atom.SamplesFlags = append(atom.SamplesFlags, rd.ReadUint32()) + } + if flags&TrunSampleCTS != 0 { + atom.SamplesCTS = append(atom.SamplesCTS, rd.ReadUint32()) + } + } + + return atom, nil + + case Mdat: + return &AtomMdat{Data: data}, nil + } + + return &Atom{Name: name, Data: data}, nil +} + +func DecodeAtoms(b []byte) (atoms []any, err error) { + for len(b) > 0 { + atom, err := DecodeAtom(b) + if err != nil { + return nil, err + } + + if childs, ok := atom.([]any); ok { atoms = append(atoms, childs...) - - case MoofMfhd, MoofTrafTfhd: - continue - - case MoofTrafTfdt: - if len(data) < 8 { - return nil, io.EOF - } - - dt := binary.BigEndian.Uint64(data[4:]) - atoms = append(atoms, &Atom{Name: name, DecodeTime: dt}) - - case MoofTrafTrun: - rd := bits.NewReader(data) - - _ = rd.ReadByte() // version - flags := rd.ReadUint24() - samples := rd.ReadUint32() - - if flags&TrunDataOffset != 0 { - _ = rd.ReadUint32() // skip - } - if flags&TrunFirstSampleFlags != 0 { - _ = rd.ReadUint32() // skip - } - - atom := &Atom{Name: name} - - for i := uint32(0); i < samples; i++ { - if flags&TrunSampleDuration != 0 { - atom.SamplesDuration = append(atom.SamplesDuration, rd.ReadUint32()) - } - if flags&TrunSampleSize != 0 { - atom.SamplesSize = append(atom.SamplesSize, rd.ReadUint32()) - } - if flags&TrunSampleFlags != 0 { - _ = rd.ReadUint32() // skip - } - if flags&TrunSampleCTS != 0 { - _ = rd.ReadUint32() // skip - } - } - - if rd.EOF { - return nil, io.EOF - } - + } else { atoms = append(atoms, atom) - - case Mdat: - atoms = append(atoms, &Atom{Name: name, Data: data}) - - default: - println("iso: unsupported atom: " + name) } + + size := binary.BigEndian.Uint32(b) + b = b[size:] } return atoms, nil diff --git a/pkg/ivideon/client.go b/pkg/ivideon/client.go deleted file mode 100644 index ef79010e..00000000 --- a/pkg/ivideon/client.go +++ /dev/null @@ -1,314 +0,0 @@ -package ivideon - -import ( - "bytes" - "encoding/binary" - "encoding/json" - "fmt" - "io" - "net/http" - "strings" - "sync" - "time" - - "github.com/AlexxIT/go2rtc/pkg/core" - "github.com/AlexxIT/go2rtc/pkg/h264" - "github.com/AlexxIT/go2rtc/pkg/iso" - "github.com/gorilla/websocket" - "github.com/pion/rtp" -) - -type State byte - -const ( - StateNone State = iota - StateConn - StateHandle -) - -// Deprecated: should be rewritten to core.Connection -type Client struct { - core.Listener - - ID string - - conn *websocket.Conn - - medias []*core.Media - receiver *core.Receiver - - msg *message - t0 time.Time - - buffer chan []byte - state State - mu sync.Mutex - - recv int -} - -func Dial(source string) (*Client, error) { - id := strings.Replace(source[8:], "/", ":", 1) - client := &Client{ID: id} - if err := client.Dial(); err != nil { - return nil, err - } - return client, nil -} - -func (c *Client) Dial() (err error) { - resp, err := http.Get( - "https://openapi-alpha.ivideon.com/cameras/" + c.ID + - "/live_stream?op=GET&access_token=public&q=2&" + - "video_codecs=h264&format=ws-fmp4", - ) - - data, err := io.ReadAll(resp.Body) - if err != nil { - return err - } - - var v liveResponse - if err = json.Unmarshal(data, &v); err != nil { - return err - } - - if !v.Success { - return fmt.Errorf("wrong response: %s", data) - } - - c.conn, _, err = websocket.DefaultDialer.Dial(v.Result.URL, nil) - if err != nil { - return err - } - - if err = c.getTracks(); err != nil { - _ = c.conn.Close() - return err - } - - c.state = StateConn - - return nil -} - -func (c *Client) Handle() error { - // add delay to the stream for smooth playing (not a best solution) - c.t0 = time.Now().Add(time.Second) - - c.mu.Lock() - - if c.state == StateConn { - c.buffer = make(chan []byte, 5) - c.state = StateHandle - - // processing stream in separate thread for lower delay between packets - go c.worker(c.buffer) - } - - c.mu.Unlock() - - _, data, err := c.conn.ReadMessage() - if err != nil { - return err - } - - if c.receiver != nil && c.receiver.ID == c.msg.Track { - c.mu.Lock() - if c.state == StateHandle { - c.buffer <- data - c.recv += len(data) - } - c.mu.Unlock() - } - - // we have one unprocessed msg after getTracks - for { - _, data, err = c.conn.ReadMessage() - if err != nil { - return err - } - - var msg message - if err = json.Unmarshal(data, &msg); err != nil { - return err - } - - switch msg.Type { - case "stream-init": - continue - - case "metadata": - continue - - case "fragment": - _, data, err = c.conn.ReadMessage() - if err != nil { - return err - } - - if c.receiver != nil && c.receiver.ID == msg.Track { - c.mu.Lock() - if c.state == StateHandle { - c.buffer <- data - c.recv += len(data) - } - c.mu.Unlock() - } - - default: - return fmt.Errorf("wrong message type: %s", data) - } - } -} - -func (c *Client) Close() error { - c.mu.Lock() - defer c.mu.Unlock() - - switch c.state { - case StateNone: - return nil - case StateConn: - case StateHandle: - close(c.buffer) - } - - c.state = StateNone - - return c.conn.Close() -} - -func (c *Client) getTracks() error { - for { - _, data, err := c.conn.ReadMessage() - if err != nil { - return err - } - - var msg message - if err = json.Unmarshal(data, &msg); err != nil { - return err - } - - switch msg.Type { - case "metadata": - continue - - case "stream-init": - s := msg.CodecString - i := strings.IndexByte(s, '.') - if i > 0 { - s = s[:i] - } - - switch s { - case "avc1": // avc1.4d0029 - // skip multiple identical init - if c.receiver != nil { - continue - } - - i = bytes.Index(msg.Data, []byte("avcC")) - 4 - if i < 0 { - return fmt.Errorf("ivideon: wrong AVC: %s", msg.Data) - } - - avccLen := binary.BigEndian.Uint32(msg.Data[i:]) - data = msg.Data[i+8 : i+int(avccLen)] - - codec := h264.ConfigToCodec(data) - - media := &core.Media{ - Kind: core.KindVideo, - Direction: core.DirectionRecvonly, - Codecs: []*core.Codec{codec}, - } - c.medias = append(c.medias, media) - - c.receiver = core.NewReceiver(media, codec) - c.receiver.ID = msg.TrackID - - case "mp4a": // mp4a.40.2 - } - - case "fragment": - c.msg = &msg - return nil - - default: - return fmt.Errorf("wrong message type: %s", data) - } - } -} - -func (c *Client) worker(buffer chan []byte) { - for data := range buffer { - atoms, err := iso.DecodeAtoms(data) - if err != nil { - continue - } - - var trun *iso.Atom - var ts uint32 - - for _, atom := range atoms { - switch atom.Name { - case iso.MoofTrafTrun: - trun = atom - case iso.MoofTrafTfdt: - ts = uint32(atom.DecodeTime) - case iso.Mdat: - data = atom.Data - } - } - - if trun == nil || trun.SamplesDuration == nil || trun.SamplesSize == nil { - continue - } - - for i := 0; i < len(trun.SamplesDuration); i++ { - duration := trun.SamplesDuration[i] - size := trun.SamplesSize[i] - - // synchronize framerate for WebRTC and MSE - d := time.Duration(ts)*time.Millisecond - time.Since(c.t0) - if d < 0 { - d = time.Duration(duration) * time.Millisecond / 2 - } - time.Sleep(d) - - // can be SPS, PPS and IFrame in one packet - packet := &rtp.Packet{ - // ivideon clockrate=1000, RTP clockrate=90000 - Header: rtp.Header{Timestamp: ts * 90}, - Payload: data[:size], - } - c.receiver.WriteRTP(packet) - - data = data[size:] - ts += duration - } - } -} - -type liveResponse struct { - Result struct { - URL string `json:"url"` - } `json:"result"` - Success bool `json:"success"` -} - -type message struct { - Type string `json:"type"` - - CodecString string `json:"codec_string"` - Data []byte `json:"data"` - TrackID byte `json:"track_id"` - - Track byte `json:"track"` - StartTime float32 `json:"start_time"` - Duration float32 `json:"duration"` - IsKey bool `json:"is_key"` - DataOffset uint32 `json:"data_offset"` -} diff --git a/pkg/ivideon/ivideon.go b/pkg/ivideon/ivideon.go new file mode 100644 index 00000000..973b9ba0 --- /dev/null +++ b/pkg/ivideon/ivideon.go @@ -0,0 +1,187 @@ +package ivideon + +import ( + "encoding/json" + "errors" + "fmt" + "net/http" + "strings" + "time" + + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/mp4" + "github.com/gorilla/websocket" +) + +type Producer struct { + core.Connection + conn *websocket.Conn + + buf []byte + + dem *mp4.Demuxer +} + +func Dial(source string) (core.Producer, error) { + id := strings.Replace(source[8:], "/", ":", 1) + + url, err := GetLiveStream(id) + if err != nil { + return nil, err + } + + conn, _, err := websocket.DefaultDialer.Dial(url, nil) + if err != nil { + return nil, err + } + + prod := &Producer{ + Connection: core.Connection{ + ID: core.NewID(), + FormatName: "ivideon", + Protocol: core.Before(url, ":"), // wss + RemoteAddr: conn.RemoteAddr().String(), + Source: source, + URL: url, + Transport: conn, + }, + conn: conn, + } + + if err = prod.probe(); err != nil { + _ = conn.Close() + return nil, err + } + + return prod, nil +} + +func GetLiveStream(id string) (string, error) { + // &video_codecs=h264,h265&audio_codecs=aac,mp3,pcma,pcmu,none + resp, err := http.Get( + "https://openapi-alpha.ivideon.com/cameras/" + id + + "/live_stream?op=GET&access_token=public&q=2&video_codecs=h264&format=ws-fmp4", + ) + if err != nil { + return "", err + } + + var v struct { + Message string `json:"message"` + Result struct { + URL string `json:"url"` + } `json:"result"` + Success bool `json:"success"` + } + if err = json.NewDecoder(resp.Body).Decode(&v); err != nil { + return "", err + } + + if !v.Success { + return "", fmt.Errorf("ivideon: can't get live_stream: " + v.Message) + } + + return v.Result.URL, nil +} + +func (p *Producer) Start() error { + receivers := make(map[uint32]*core.Receiver) + for _, receiver := range p.Receivers { + trackID := p.dem.GetTrackID(receiver.Codec) + receivers[trackID] = receiver + } + + ch := make(chan []byte, 10) + defer close(ch) + + ch <- p.buf + + go func() { + // add delay to the stream for smooth playing (not a best solution) + t0 := time.Now() + + for data := range ch { + trackID, packets := p.dem.Demux(data) + if receiver := receivers[trackID]; receiver != nil { + clockRate := time.Duration(receiver.Codec.ClockRate) + for _, packet := range packets { + // synchronize framerate for WebRTC and MSE + ts := time.Second * time.Duration(packet.Timestamp) / clockRate + d := ts - time.Since(t0) + if d < 0 { + d = 10 * time.Millisecond + } + time.Sleep(d) + + receiver.WriteRTP(packet) + } + } + } + }() + + for { + var msg message + if err := p.conn.ReadJSON(&msg); err != nil { + return err + } + + switch msg.Type { + case "stream-init", "metadata": + continue + + case "fragment": + _, b, err := p.conn.ReadMessage() + if err != nil { + return err + } + + p.Recv += len(b) + ch <- b + + default: + return errors.New("ivideon: wrong message type: " + msg.Type) + } + } +} + +func (p *Producer) probe() (err error) { + p.dem = &mp4.Demuxer{} + + for { + var msg message + if err = p.conn.ReadJSON(&msg); err != nil { + return err + } + + switch msg.Type { + case "metadata": + continue + + case "stream-init": + // it's difficult to maintain audio + if strings.HasPrefix(msg.CodecString, "avc1") { + medias := p.dem.Probe(msg.Data) + p.Medias = append(p.Medias, medias...) + } + + case "fragment": + _, p.buf, err = p.conn.ReadMessage() + return + + default: + return errors.New("ivideon: wrong message type: " + msg.Type) + } + } +} + +type message struct { + Type string `json:"type"` + CodecString string `json:"codec_string"` + Data []byte `json:"data"` + //TrackID byte `json:"track_id"` + //Track byte `json:"track"` + //StartTime float32 `json:"start_time"` + //Duration float32 `json:"duration"` + //IsKey bool `json:"is_key"` + //DataOffset uint32 `json:"data_offset"` +} diff --git a/pkg/ivideon/producer.go b/pkg/ivideon/producer.go deleted file mode 100644 index 78084123..00000000 --- a/pkg/ivideon/producer.go +++ /dev/null @@ -1,51 +0,0 @@ -package ivideon - -import ( - "encoding/json" - - "github.com/AlexxIT/go2rtc/pkg/core" -) - -func (c *Client) GetMedias() []*core.Media { - return c.medias -} - -func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { - if c.receiver != nil { - return c.receiver, nil - } - return nil, core.ErrCantGetTrack -} - -func (c *Client) Start() error { - err := c.Handle() - if c.buffer == nil { - return nil - } - return err -} - -func (c *Client) Stop() error { - if c.receiver != nil { - c.receiver.Close() - } - return c.Close() -} - -func (c *Client) MarshalJSON() ([]byte, error) { - info := &core.Connection{ - ID: core.ID(c), - FormatName: "ivideon", - Protocol: "ws", - URL: c.ID, - Medias: c.medias, - Recv: c.recv, - } - if c.conn != nil { - info.RemoteAddr = c.conn.RemoteAddr().String() - } - if c.receiver != nil { - info.Receivers = []*core.Receiver{c.receiver} - } - return json.Marshal(info) -} diff --git a/pkg/mp4/demuxer.go b/pkg/mp4/demuxer.go new file mode 100644 index 00000000..25c8c70e --- /dev/null +++ b/pkg/mp4/demuxer.go @@ -0,0 +1,116 @@ +package mp4 + +import ( + "github.com/AlexxIT/go2rtc/pkg/aac" + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/h264" + "github.com/AlexxIT/go2rtc/pkg/iso" + "github.com/pion/rtp" +) + +type Demuxer struct { + codecs map[uint32]*core.Codec + timeScales map[uint32]float32 +} + +func (d *Demuxer) Probe(init []byte) (medias []*core.Media) { + var trackID, timeScale uint32 + + if d.codecs == nil { + d.codecs = make(map[uint32]*core.Codec) + d.timeScales = make(map[uint32]float32) + } + + atoms, _ := iso.DecodeAtoms(init) + for _, atom := range atoms { + var codec *core.Codec + + switch atom := atom.(type) { + case *iso.AtomTkhd: + trackID = atom.TrackID + case *iso.AtomMdhd: + timeScale = atom.TimeScale + case *iso.AtomVideo: + switch atom.Name { + case "avc1": + codec = h264.ConfigToCodec(atom.Config) + } + case *iso.AtomAudio: + switch atom.Name { + case "mp4a": + codec = aac.ConfigToCodec(atom.Config) + } + } + + if codec != nil { + d.codecs[trackID] = codec + d.timeScales[trackID] = float32(codec.ClockRate) / float32(timeScale) + + medias = append(medias, &core.Media{ + Kind: codec.Kind(), + Direction: core.DirectionRecvonly, + Codecs: []*core.Codec{codec}, + }) + } + } + + return +} + +func (d *Demuxer) GetTrackID(codec *core.Codec) uint32 { + for trackID, c := range d.codecs { + if c == codec { + return trackID + } + } + return 0 +} + +func (d *Demuxer) Demux(data2 []byte) (trackID uint32, packets []*core.Packet) { + atoms, err := iso.DecodeAtoms(data2) + if err != nil { + return 0, nil + } + + var ts uint32 + var trun *iso.AtomTrun + var data []byte + + for _, atom := range atoms { + switch atom := atom.(type) { + case *iso.AtomTfhd: + trackID = atom.TrackID + case *iso.AtomTfdt: + ts = uint32(atom.DecodeTime) + case *iso.AtomTrun: + trun = atom + case *iso.AtomMdat: + data = atom.Data + } + } + + timeScale := d.timeScales[trackID] + if timeScale == 0 { + return 0, nil + } + + n := len(trun.SamplesDuration) + packets = make([]*core.Packet, n) + + for i := 0; i < n; i++ { + duration := trun.SamplesDuration[i] + size := trun.SamplesSize[i] + + // can be SPS, PPS and IFrame in one packet + timestamp := uint32(float32(ts) * timeScale) + packets[i] = &rtp.Packet{ + Header: rtp.Header{Timestamp: timestamp}, + Payload: data[:size], + } + + data = data[size:] + ts += duration + } + + return +}