From c6d5bb4eeb75306819c3be13721167a03b399a86 Mon Sep 17 00:00:00 2001 From: Alexey Khit Date: Mon, 28 Aug 2023 22:31:52 +0300 Subject: [PATCH] Add kasa client and simplify multipart client --- internal/http/http.go | 27 +---- internal/tapo/tapo.go | 15 ++- pkg/kasa/producer.go | 192 ++++++++++++++++++++++++++++++++ pkg/multipart/producer.go | 225 ++++---------------------------------- pkg/tapo/client.go | 14 +-- pkg/tcp/multipart.go | 56 ++++++++++ 6 files changed, 289 insertions(+), 240 deletions(-) create mode 100644 pkg/kasa/producer.go create mode 100644 pkg/tcp/multipart.go diff --git a/internal/http/http.go b/internal/http/http.go index bd6fc265..4b55d2cf 100644 --- a/internal/http/http.go +++ b/internal/http/http.go @@ -2,10 +2,8 @@ package http import ( "errors" - "io" "net" "net/http" - "net/http/httputil" "net/url" "strings" @@ -35,8 +33,6 @@ func handleHTTP(rawURL string) (core.Producer, error) { return nil, err } - var chunked bool - if rawQuery != "" { query := streams.ParseQuery(rawQuery) @@ -44,8 +40,6 @@ func handleHTTP(rawURL string) (core.Producer, error) { key, value, _ := strings.Cut(header, ":") req.Header.Add(key, strings.TrimSpace(value)) } - - chunked = query.Get("chunked") == "1" } res, err := tcp.Do(req) @@ -68,33 +62,18 @@ func handleHTTP(rawURL string) (core.Producer, error) { ext = req.URL.Path[i+1:] } - var rd io.ReadCloser - - // support buggy clients, like TP-Link cameras with HTTP/1.0 chunked encoding - if chunked { - rd = struct { - io.Reader - io.Closer - }{ - httputil.NewChunkedReader(res.Body), - res.Body, - } - } else { - rd = res.Body - } - switch { case ct == "image/jpeg": return mjpeg.NewClient(res), nil case ct == "multipart/x-mixed-replace": - return multipart.Open(rd) + return multipart.Open(res.Body) case ct == "application/vnd.apple.mpegurl" || ext == "m3u8": - return hls.OpenURL(req.URL, rd) + return hls.OpenURL(req.URL, res.Body) } - return magic.Open(rd) + return magic.Open(res.Body) } func handleTCP(rawURL string) (core.Producer, error) { diff --git a/internal/tapo/tapo.go b/internal/tapo/tapo.go index 971928c7..a54c8c5e 100644 --- a/internal/tapo/tapo.go +++ b/internal/tapo/tapo.go @@ -3,17 +3,16 @@ package tapo import ( "github.com/AlexxIT/go2rtc/internal/streams" "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/kasa" "github.com/AlexxIT/go2rtc/pkg/tapo" ) func Init() { - streams.HandleFunc("tapo", handle) -} + streams.HandleFunc("kasa", func(url string) (core.Producer, error) { + return kasa.Dial(url) + }) -func handle(url string) (core.Producer, error) { - conn := tapo.NewClient(url) - if err := conn.Dial(); err != nil { - return nil, err - } - return conn, nil + streams.HandleFunc("tapo", func(url string) (core.Producer, error) { + return tapo.Dial(url) + }) } diff --git a/pkg/kasa/producer.go b/pkg/kasa/producer.go new file mode 100644 index 00000000..cbad1028 --- /dev/null +++ b/pkg/kasa/producer.go @@ -0,0 +1,192 @@ +package kasa + +import ( + "bufio" + "errors" + "io" + "net/http" + "net/http/httputil" + "strconv" + "time" + + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/h264" + "github.com/AlexxIT/go2rtc/pkg/h264/annexb" + "github.com/AlexxIT/go2rtc/pkg/tcp" + "github.com/pion/rtp" +) + +type Producer struct { + core.SuperProducer + rd *core.ReadBuffer + + reader *bufio.Reader +} + +func Dial(url string) (*Producer, error) { + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil, err + } + + req.URL.Scheme = "httpx" + + res, err := tcp.Do(req) + if err != nil { + return nil, err + } + + rd := struct { + io.Reader + io.Closer + }{ + httputil.NewChunkedReader(res.Body), + res.Body, + } + + prod := &Producer{rd: core.NewReadBuffer(rd)} + if err = prod.probe(); err != nil { + return nil, err + } + prod.Type = "Kasa producer" + return prod, nil +} + +func (c *Producer) Start() error { + if len(c.Receivers) == 0 { + return errors.New("multipart: no receivers") + } + + var video, audio *core.Receiver + + for _, receiver := range c.Receivers { + switch receiver.Codec.Name { + case core.CodecH264: + video = receiver + case core.CodecPCMU: + audio = receiver + } + } + + for { + header, body, err := tcp.NextMultipart(c.reader) + if err != nil { + return err + } + + c.Recv += len(body) + + ct := header.Get("Content-Type") + switch ct { + case MimeVideo: + if video != nil { + ts := GetTimestamp(header) + pkt := &rtp.Packet{ + Header: rtp.Header{ + Timestamp: uint32(ts * 90000), + }, + Payload: annexb.EncodeToAVCC(body, false), + } + video.WriteRTP(pkt) + } + + case MimeG711U: + if audio != nil { + ts := GetTimestamp(header) + pkt := &rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + Timestamp: uint32(ts * 8000), + }, + Payload: body, + } + audio.WriteRTP(pkt) + } + } + } +} + +func (c *Producer) Stop() error { + _ = c.SuperProducer.Close() + return c.rd.Close() +} + +const ( + MimeVideo = "video/x-h264" + MimeG711U = "audio/g711u" +) + +func (c *Producer) probe() error { + c.rd.BufferSize = core.ProbeSize + c.reader = bufio.NewReader(c.rd) + + defer func() { + c.rd.Reset() + c.reader = bufio.NewReader(c.rd) + }() + + waitVideo, waitAudio := true, true + timeout := time.Now().Add(core.ProbeTimeout) + + for (waitVideo || waitAudio) && time.Now().Before(timeout) { + header, body, err := tcp.NextMultipart(c.reader) + if err != nil { + return err + } + + var media *core.Media + + ct := header.Get("Content-Type") + switch ct { + case MimeVideo: + if !waitVideo { + continue + } + waitVideo = false + + body = annexb.EncodeToAVCC(body, false) + codec := h264.AVCCToCodec(body) + media = &core.Media{ + Kind: core.KindVideo, + Direction: core.DirectionRecvonly, + Codecs: []*core.Codec{codec}, + } + + case MimeG711U: + if !waitAudio { + continue + } + waitAudio = false + + media = &core.Media{ + Kind: core.KindAudio, + Direction: core.DirectionRecvonly, + Codecs: []*core.Codec{ + { + Name: core.CodecPCMU, + ClockRate: 8000, + }, + }, + } + + default: + return errors.New("kasa: unsupported type: " + ct) + } + + c.Medias = append(c.Medias, media) + } + + return nil +} + +// GetTimestamp - return timestamp in seconds +func GetTimestamp(header http.Header) float64 { + if s := header.Get("X-Timestamp"); s != "" { + if f, _ := strconv.ParseFloat(s, 32); f != 0 { + return f + } + } + + return float64(time.Duration(time.Now().UnixNano()) / time.Second) +} diff --git a/pkg/multipart/producer.go b/pkg/multipart/producer.go index f77061ee..9ab268fe 100644 --- a/pkg/multipart/producer.go +++ b/pkg/multipart/producer.go @@ -4,55 +4,49 @@ import ( "bufio" "errors" "io" - "net/http" - "net/textproto" - "strconv" - "strings" - "time" "github.com/AlexxIT/go2rtc/pkg/core" - "github.com/AlexxIT/go2rtc/pkg/h264" - "github.com/AlexxIT/go2rtc/pkg/h264/annexb" + "github.com/AlexxIT/go2rtc/pkg/tcp" "github.com/pion/rtp" ) type Producer struct { core.SuperProducer - rd *core.ReadBuffer - - boundary string - reader *bufio.Reader + closer io.Closer + reader *bufio.Reader } func Open(rd io.Reader) (*Producer, error) { - prod := &Producer{rd: core.NewReadBuffer(rd)} - if err := prod.probe(); err != nil { - return nil, err + prod := &Producer{ + closer: rd.(io.Closer), + reader: bufio.NewReader(rd), + } + prod.Medias = []*core.Media{ + { + Kind: core.KindVideo, + Direction: core.DirectionRecvonly, + Codecs: []*core.Codec{ + { + Name: core.CodecJPEG, + ClockRate: 90000, + PayloadType: core.PayloadTypeRAW, + }, + }, + }, } prod.Type = "Multipart producer" return prod, nil } func (c *Producer) Start() error { - if len(c.Receivers) == 0 { - return errors.New("multipart: no receivers") + if len(c.Receivers) != 1 { + return errors.New("mjpeg: no receivers") } - var mjpeg, video, audio *core.Receiver - - for _, receiver := range c.Receivers { - switch receiver.Codec.Name { - case core.CodecH264: - video = receiver - case core.CodecPCMU: - audio = receiver - default: - mjpeg = receiver - } - } + mjpeg := c.Receivers[0] for { - header, body, err := c.next() + _, body, err := tcp.NextMultipart(c.reader) if err != nil { return err } @@ -65,182 +59,11 @@ func (c *Producer) Start() error { Payload: body, } mjpeg.WriteRTP(packet) - continue - } - - ct := header.Get("Content-Type") - switch ct { - case MimeVideo: - if video != nil { - ts := GetTimestamp(header) - pkt := &rtp.Packet{ - Header: rtp.Header{ - Timestamp: uint32(ts * 90000), - }, - Payload: annexb.EncodeToAVCC(body, false), - } - video.WriteRTP(pkt) - } - - case MimeG711U: - if audio != nil { - ts := GetTimestamp(header) - pkt := &rtp.Packet{ - Header: rtp.Header{ - Version: 2, - Marker: true, - Timestamp: uint32(ts * 8000), - }, - Payload: body, - } - audio.WriteRTP(pkt) - } } } } func (c *Producer) Stop() error { _ = c.SuperProducer.Close() - return c.rd.Close() -} - -func (c *Producer) next() (http.Header, []byte, error) { - for { - // search next boundary and skip empty lines - s, err := c.reader.ReadString('\n') - if err != nil { - return nil, nil, err - } - - // auto guess boundary - if c.boundary == "" && strings.HasPrefix(s, "--") { - c.boundary = s - break - } else if strings.HasPrefix(s, c.boundary) { - break - } - - if s == "\r\n" { - continue - } - - return nil, nil, errors.New("multipart: wrong boundary: " + s) - } - - tp := textproto.NewReader(c.reader) - header, err := tp.ReadMIMEHeader() - if err != nil { - return nil, nil, err - } - - s := header.Get("Content-Length") - if s == "" { - return nil, nil, errors.New("multipart: no content length") - } - - size, err := strconv.Atoi(s) - if err != nil { - return nil, nil, err - } - - buf := make([]byte, size) - if _, err = io.ReadFull(c.reader, buf); err != nil { - return nil, nil, err - } - - _, _ = c.reader.Discard(2) // skip "\r\n" - - return http.Header(header), buf, nil -} - -const ( - MimeVideo = "video/x-h264" - MimeG711U = "audio/g711u" -) - -func (c *Producer) probe() error { - c.rd.BufferSize = core.ProbeSize - c.reader = bufio.NewReader(c.rd) - - defer func() { - c.rd.Reset() - c.reader = bufio.NewReader(c.rd) - }() - - waitVideo, waitAudio := true, true - timeout := time.Now().Add(core.ProbeTimeout) - - for (waitVideo || waitAudio) && time.Now().Before(timeout) { - header, body, err := c.next() - if err != nil { - return err - } - - var media *core.Media - - ct := header.Get("Content-Type") - switch ct { - case MimeVideo: - if !waitVideo { - continue - } - waitVideo = false - - body = annexb.EncodeToAVCC(body, false) - codec := h264.AVCCToCodec(body) - media = &core.Media{ - Kind: core.KindVideo, - Direction: core.DirectionRecvonly, - Codecs: []*core.Codec{codec}, - } - - case MimeG711U: - if !waitAudio { - continue - } - waitAudio = false - - media = &core.Media{ - Kind: core.KindAudio, - Direction: core.DirectionRecvonly, - Codecs: []*core.Codec{ - { - Name: core.CodecPCMU, - ClockRate: 8000, - }, - }, - } - - default: - waitVideo = false - waitAudio = false - - media = &core.Media{ - Kind: core.KindVideo, - Direction: core.DirectionRecvonly, - Codecs: []*core.Codec{ - { - Name: core.CodecJPEG, - ClockRate: 90000, - PayloadType: core.PayloadTypeRAW, - }, - }, - } - } - - c.Medias = append(c.Medias, media) - } - - return nil -} - -// GetTimestamp - return timestamp in seconds -func GetTimestamp(header http.Header) float64 { - if s := header.Get("X-Timestamp"); s != "" { - if f, _ := strconv.ParseFloat(s, 32); f != 0 { - return f - } - } - - return float64(time.Duration(time.Now().UnixNano()) / time.Second) + return c.closer.Close() } diff --git a/pkg/tapo/client.go b/pkg/tapo/client.go index 4f2a3f80..79f37005 100644 --- a/pkg/tapo/client.go +++ b/pkg/tapo/client.go @@ -47,13 +47,13 @@ type cbcMode interface { SetIV([]byte) } -func NewClient(url string) *Client { - return &Client{url: url} -} - -func (c *Client) Dial() (err error) { - c.conn1, err = c.newConn() - return +func Dial(url string) (*Client, error) { + var err error + c := &Client{url: url} + if c.conn1, err = c.newConn(); err != nil { + return nil, err + } + return c, nil } func (c *Client) newConn() (net.Conn, error) { diff --git a/pkg/tcp/multipart.go b/pkg/tcp/multipart.go new file mode 100644 index 00000000..bc13c45b --- /dev/null +++ b/pkg/tcp/multipart.go @@ -0,0 +1,56 @@ +package tcp + +import ( + "bufio" + "errors" + "io" + "net/http" + "net/textproto" + "strconv" + "strings" +) + +func NextMultipart(rd *bufio.Reader) (http.Header, []byte, error) { + for { + // search next boundary and skip empty lines + s, err := rd.ReadString('\n') + if err != nil { + return nil, nil, err + } + + if strings.HasPrefix(s, "--") { + break + } + + if s == "\r\n" { + continue + } + + return nil, nil, errors.New("multipart: wrong boundary: " + s) + } + + tp := textproto.NewReader(rd) + header, err := tp.ReadMIMEHeader() + if err != nil { + return nil, nil, err + } + + s := header.Get("Content-Length") + if s == "" { + return nil, nil, errors.New("multipart: no content length") + } + + size, err := strconv.Atoi(s) + if err != nil { + return nil, nil, err + } + + buf := make([]byte, size) + if _, err = io.ReadFull(rd, buf); err != nil { + return nil, nil, err + } + + _, _ = rd.Discard(2) // skip "\r\n" + + return http.Header(header), buf, nil +}