diff --git a/internal/bubble/bubble.go b/internal/bubble/bubble.go new file mode 100644 index 00000000..65d0237e --- /dev/null +++ b/internal/bubble/bubble.go @@ -0,0 +1,19 @@ +package bubble + +import ( + "github.com/AlexxIT/go2rtc/internal/streams" + "github.com/AlexxIT/go2rtc/pkg/bubble" + "github.com/AlexxIT/go2rtc/pkg/core" +) + +func Init() { + streams.HandleFunc("bubble", handle) +} + +func handle(url string) (core.Producer, error) { + conn := bubble.NewClient(url) + if err := conn.Dial(); err != nil { + return nil, err + } + return conn, nil +} diff --git a/main.go b/main.go index cd7b2939..fa90e809 100644 --- a/main.go +++ b/main.go @@ -4,6 +4,7 @@ import ( "github.com/AlexxIT/go2rtc/internal/api" "github.com/AlexxIT/go2rtc/internal/api/ws" "github.com/AlexxIT/go2rtc/internal/app" + "github.com/AlexxIT/go2rtc/internal/bubble" "github.com/AlexxIT/go2rtc/internal/debug" "github.com/AlexxIT/go2rtc/internal/dvrip" "github.com/AlexxIT/go2rtc/internal/echo" @@ -74,6 +75,7 @@ func main() { roborock.Init() // roborock source homekit.Init() // homekit source nest.Init() // nest source + bubble.Init() // bubble source // 6. Helper modules diff --git a/pkg/bubble/client.go b/pkg/bubble/client.go new file mode 100644 index 00000000..f97c8450 --- /dev/null +++ b/pkg/bubble/client.go @@ -0,0 +1,245 @@ +// Package bubble, because: +// Request URL: /bubble/live?ch=0&stream=0 +// Response Conten-Type: video/bubble +// https://github.com/Lynch234ok/lynch-git/blob/master/app_rebulid/src/bubble.c +package bubble + +import ( + "bufio" + "encoding/binary" + "errors" + "io" + "net" + "net/http" + "net/url" + "regexp" + "strings" + "time" + + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/h264" + "github.com/AlexxIT/go2rtc/pkg/tcp" + "github.com/pion/rtp" +) + +type Client struct { + core.Listener + + url string + conn net.Conn + + videoCodec string + stream int + + r *bufio.Reader + + medias []*core.Media + receivers []*core.Receiver + + videoTrack *core.Receiver + audioTrack *core.Receiver + + recv int +} + +func NewClient(url string) *Client { + return &Client{url: url} +} + +const ( + SyncByte = 0xAA + PacketAuth = 0x00 + PacketMedia = 0x01 + PacketStart = 0x0A +) + +const Timeout = time.Second * 5 + +func (c *Client) Dial() (err error) { + u, err := url.Parse(c.url) + if err != nil { + return + } + + c.conn, err = net.DialTimeout("tcp4", u.Host, Timeout) + + if err = c.conn.SetDeadline(time.Now().Add(Timeout)); err != nil { + return err + } + + req := &tcp.Request{Method: "GET", URL: &url.URL{Path: u.Path, RawQuery: u.RawQuery}, Proto: "HTTP/1.1"} + err = req.Write(c.conn) + + c.r = bufio.NewReader(c.conn) + res, err := tcp.ReadResponse(c.r) + + if res.StatusCode != http.StatusOK { + return errors.New("wrong response: " + res.Status) + } + + // 1. Read 1024 bytes with XML + xml := make([]byte, 1024) + if _, err = io.ReadFull(c.r, xml); err != nil { + return + } + + // + // + // + // + // + stream := u.Query().Get("stream") + if stream != "" { + c.stream = core.Atoi(stream) + } else { + stream = "0" + } + + re := regexp.MustCompile("]+`) + stream = re.FindString(string(xml)) + if strings.Contains(stream, ".265") { + c.videoCodec = core.CodecH265 + } else { + c.videoCodec = core.CodecH264 + } + + // 2. Write size uint32 + unknown 4b + user 20b + pass 20b + b := make([]byte, 48) + binary.BigEndian.PutUint32(b, 44) + + if u.User != nil { + copy(b[8:], u.User.Username()) + pass, _ := u.User.Password() + copy(b[28:], pass) + } else { + copy(b[8:], "admin") + } + + if err = c.Write(PacketAuth, 0x0E16C271, b); err != nil { + return + } + + // 3. Read response + cmd, b, err := c.Read() // don't know how to parse + if err != nil { + return + } + + if cmd != PacketAuth || len(b) != 44 || b[4] != 3 || b[8] != 1 { + return errors.New("wrong auth response") + } + + return +} + +func (c *Client) Write(command byte, timestamp uint32, payload []byte) error { + if err := c.conn.SetWriteDeadline(time.Now().Add(Timeout)); err != nil { + return err + } + + // 0xAA + size uint32 + cmd byte + ts uint32 + size2 uint32 + payload + b := make([]byte, 14+len(payload)) + b[0] = SyncByte + binary.BigEndian.PutUint32(b[1:], uint32(5+len(payload))) + b[5] = command + binary.BigEndian.PutUint32(b[6:], timestamp) + copy(b[10:], payload) + + _, err := c.conn.Write(b) + return err +} + +func (c *Client) Read() (byte, []byte, error) { + if err := c.conn.SetReadDeadline(time.Now().Add(Timeout)); err != nil { + return 0, nil, err + } + + // 0xAA + size uint32 + cmd byte + ts uint32 + size2 uint32 + b := make([]byte, 10) + if _, err := io.ReadFull(c.r, b); err != nil { + return 0, nil, err + } + + if b[0] != SyncByte { + return 0, nil, errors.New("wrong start byte") + } + + size := binary.BigEndian.Uint32(b[1:]) + payload := make([]byte, size-1-4) + if _, err := io.ReadFull(c.r, payload); err != nil { + return 0, nil, err + } + + //timestamp := binary.BigEndian.Uint32(b[6:]) // in ms + + return b[5], payload, nil +} + +func (c *Client) Play() error { + // yeah, there's no mistake about the little endian + b := make([]byte, 16) + //binary.LittleEndian.PutUint32(b, 0) // channel + binary.LittleEndian.PutUint32(b[4:], uint32(c.stream)) + binary.LittleEndian.PutUint32(b[8:], 1) // opened + return c.Write(PacketStart, 0x0E16C2DF, b) +} + +func (c *Client) Handle() error { + var audioTS uint32 + + for { + cmd, b, err := c.Read() + if err != nil { + return err + } + + c.recv += len(b) + + if cmd != PacketMedia { + continue + } + + // size uint32 + type 1b + channel 1b + // type = 1 for keyframe, 2 for other frame, 0 for audio + + if b[4] > 0 { + if c.videoTrack == nil { + continue + } + + pkt := &rtp.Packet{ + Header: rtp.Header{ + Timestamp: core.Now90000(), + }, + Payload: h264.AnnexB2AVC(b[6:]), + } + c.videoTrack.WriteRTP(pkt) + } else { + if c.audioTrack == nil { + continue + } + + //binary.LittleEndian.Uint32(b[6:]) // entries (always 1) + //size := binary.LittleEndian.Uint32(b[10:]) // size + //mk := binary.LittleEndian.Uint64(b[14:]) // pts (uint64_t) + //binary.LittleEndian.Uint32(b[22:]) // gtime (time_t) + //name := b[26:34] // g711 + //rate := binary.LittleEndian.Uint32(b[34:]) // sample rate + //width := binary.LittleEndian.Uint32(b[38:]) // samplewidth + + pkt := &rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Timestamp: audioTS, + }, + Payload: b[6+36:], // don't know what is in first 36 bytes + } + audioTS += uint32(len(pkt.Payload)) + c.audioTrack.WriteRTP(pkt) + } + } +} + +func (c *Client) Close() error { + return c.conn.Close() +} diff --git a/pkg/bubble/producer.go b/pkg/bubble/producer.go new file mode 100644 index 00000000..a7aaa56e --- /dev/null +++ b/pkg/bubble/producer.go @@ -0,0 +1,75 @@ +package bubble + +import ( + "encoding/json" + + "github.com/AlexxIT/go2rtc/pkg/core" +) + +func (c *Client) GetMedias() []*core.Media { + if c.medias == nil { + c.medias = []*core.Media{ + { + Kind: core.KindVideo, + Direction: core.DirectionRecvonly, + Codecs: []*core.Codec{ + {Name: c.videoCodec, ClockRate: 90000, PayloadType: core.PayloadTypeRAW}, + }, + }, + { + Kind: core.KindAudio, + Direction: core.DirectionRecvonly, + Codecs: []*core.Codec{ + {Name: core.CodecPCMA, ClockRate: 8000, PayloadType: 8}, + }, + }, + } + } + + return c.medias +} + +func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { + for _, track := range c.receivers { + if track.Codec == codec { + return track, nil + } + } + + track := core.NewReceiver(media, codec) + + switch media.Kind { + case core.KindVideo: + c.videoTrack = track + case core.KindAudio: + c.audioTrack = track + } + + c.receivers = append(c.receivers, track) + + return track, nil +} + +func (c *Client) Start() error { + if err := c.Play(); err != nil { + return err + } + return c.Handle() +} + +func (c *Client) Stop() error { + for _, receiver := range c.receivers { + receiver.Close() + } + return c.Close() +} + +func (c *Client) MarshalJSON() ([]byte, error) { + info := &core.Info{ + Type: "Bubble active producer", + Medias: c.medias, + Recv: c.recv, + Receivers: c.receivers, + } + return json.Marshal(info) +}