Add kasa client and simplify multipart client

This commit is contained in:
Alexey Khit
2023-08-28 22:31:52 +03:00
parent 7f232c5cf2
commit c6d5bb4eeb
6 changed files with 289 additions and 240 deletions
+3 -24
View File
@@ -2,10 +2,8 @@ package http
import ( import (
"errors" "errors"
"io"
"net" "net"
"net/http" "net/http"
"net/http/httputil"
"net/url" "net/url"
"strings" "strings"
@@ -35,8 +33,6 @@ func handleHTTP(rawURL string) (core.Producer, error) {
return nil, err return nil, err
} }
var chunked bool
if rawQuery != "" { if rawQuery != "" {
query := streams.ParseQuery(rawQuery) query := streams.ParseQuery(rawQuery)
@@ -44,8 +40,6 @@ func handleHTTP(rawURL string) (core.Producer, error) {
key, value, _ := strings.Cut(header, ":") key, value, _ := strings.Cut(header, ":")
req.Header.Add(key, strings.TrimSpace(value)) req.Header.Add(key, strings.TrimSpace(value))
} }
chunked = query.Get("chunked") == "1"
} }
res, err := tcp.Do(req) res, err := tcp.Do(req)
@@ -68,33 +62,18 @@ func handleHTTP(rawURL string) (core.Producer, error) {
ext = req.URL.Path[i+1:] ext = req.URL.Path[i+1:]
} }
var rd io.ReadCloser
// support buggy clients, like TP-Link cameras with HTTP/1.0 chunked encoding
if chunked {
rd = struct {
io.Reader
io.Closer
}{
httputil.NewChunkedReader(res.Body),
res.Body,
}
} else {
rd = res.Body
}
switch { switch {
case ct == "image/jpeg": case ct == "image/jpeg":
return mjpeg.NewClient(res), nil return mjpeg.NewClient(res), nil
case ct == "multipart/x-mixed-replace": case ct == "multipart/x-mixed-replace":
return multipart.Open(rd) return multipart.Open(res.Body)
case ct == "application/vnd.apple.mpegurl" || ext == "m3u8": case ct == "application/vnd.apple.mpegurl" || ext == "m3u8":
return hls.OpenURL(req.URL, rd) return hls.OpenURL(req.URL, res.Body)
} }
return magic.Open(rd) return magic.Open(res.Body)
} }
func handleTCP(rawURL string) (core.Producer, error) { func handleTCP(rawURL string) (core.Producer, error) {
+7 -8
View File
@@ -3,17 +3,16 @@ package tapo
import ( import (
"github.com/AlexxIT/go2rtc/internal/streams" "github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/kasa"
"github.com/AlexxIT/go2rtc/pkg/tapo" "github.com/AlexxIT/go2rtc/pkg/tapo"
) )
func Init() { func Init() {
streams.HandleFunc("tapo", handle) streams.HandleFunc("kasa", func(url string) (core.Producer, error) {
} return kasa.Dial(url)
})
func handle(url string) (core.Producer, error) { streams.HandleFunc("tapo", func(url string) (core.Producer, error) {
conn := tapo.NewClient(url) return tapo.Dial(url)
if err := conn.Dial(); err != nil { })
return nil, err
}
return conn, nil
} }
+192
View File
@@ -0,0 +1,192 @@
package kasa
import (
"bufio"
"errors"
"io"
"net/http"
"net/http/httputil"
"strconv"
"time"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/h264"
"github.com/AlexxIT/go2rtc/pkg/h264/annexb"
"github.com/AlexxIT/go2rtc/pkg/tcp"
"github.com/pion/rtp"
)
type Producer struct {
core.SuperProducer
rd *core.ReadBuffer
reader *bufio.Reader
}
func Dial(url string) (*Producer, error) {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
req.URL.Scheme = "httpx"
res, err := tcp.Do(req)
if err != nil {
return nil, err
}
rd := struct {
io.Reader
io.Closer
}{
httputil.NewChunkedReader(res.Body),
res.Body,
}
prod := &Producer{rd: core.NewReadBuffer(rd)}
if err = prod.probe(); err != nil {
return nil, err
}
prod.Type = "Kasa producer"
return prod, nil
}
func (c *Producer) Start() error {
if len(c.Receivers) == 0 {
return errors.New("multipart: no receivers")
}
var video, audio *core.Receiver
for _, receiver := range c.Receivers {
switch receiver.Codec.Name {
case core.CodecH264:
video = receiver
case core.CodecPCMU:
audio = receiver
}
}
for {
header, body, err := tcp.NextMultipart(c.reader)
if err != nil {
return err
}
c.Recv += len(body)
ct := header.Get("Content-Type")
switch ct {
case MimeVideo:
if video != nil {
ts := GetTimestamp(header)
pkt := &rtp.Packet{
Header: rtp.Header{
Timestamp: uint32(ts * 90000),
},
Payload: annexb.EncodeToAVCC(body, false),
}
video.WriteRTP(pkt)
}
case MimeG711U:
if audio != nil {
ts := GetTimestamp(header)
pkt := &rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
Timestamp: uint32(ts * 8000),
},
Payload: body,
}
audio.WriteRTP(pkt)
}
}
}
}
func (c *Producer) Stop() error {
_ = c.SuperProducer.Close()
return c.rd.Close()
}
const (
MimeVideo = "video/x-h264"
MimeG711U = "audio/g711u"
)
func (c *Producer) probe() error {
c.rd.BufferSize = core.ProbeSize
c.reader = bufio.NewReader(c.rd)
defer func() {
c.rd.Reset()
c.reader = bufio.NewReader(c.rd)
}()
waitVideo, waitAudio := true, true
timeout := time.Now().Add(core.ProbeTimeout)
for (waitVideo || waitAudio) && time.Now().Before(timeout) {
header, body, err := tcp.NextMultipart(c.reader)
if err != nil {
return err
}
var media *core.Media
ct := header.Get("Content-Type")
switch ct {
case MimeVideo:
if !waitVideo {
continue
}
waitVideo = false
body = annexb.EncodeToAVCC(body, false)
codec := h264.AVCCToCodec(body)
media = &core.Media{
Kind: core.KindVideo,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{codec},
}
case MimeG711U:
if !waitAudio {
continue
}
waitAudio = false
media = &core.Media{
Kind: core.KindAudio,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{
{
Name: core.CodecPCMU,
ClockRate: 8000,
},
},
}
default:
return errors.New("kasa: unsupported type: " + ct)
}
c.Medias = append(c.Medias, media)
}
return nil
}
// GetTimestamp - return timestamp in seconds
func GetTimestamp(header http.Header) float64 {
if s := header.Get("X-Timestamp"); s != "" {
if f, _ := strconv.ParseFloat(s, 32); f != 0 {
return f
}
}
return float64(time.Duration(time.Now().UnixNano()) / time.Second)
}
+24 -201
View File
@@ -4,55 +4,49 @@ import (
"bufio" "bufio"
"errors" "errors"
"io" "io"
"net/http"
"net/textproto"
"strconv"
"strings"
"time"
"github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/h264" "github.com/AlexxIT/go2rtc/pkg/tcp"
"github.com/AlexxIT/go2rtc/pkg/h264/annexb"
"github.com/pion/rtp" "github.com/pion/rtp"
) )
type Producer struct { type Producer struct {
core.SuperProducer core.SuperProducer
rd *core.ReadBuffer closer io.Closer
reader *bufio.Reader
boundary string
reader *bufio.Reader
} }
func Open(rd io.Reader) (*Producer, error) { func Open(rd io.Reader) (*Producer, error) {
prod := &Producer{rd: core.NewReadBuffer(rd)} prod := &Producer{
if err := prod.probe(); err != nil { closer: rd.(io.Closer),
return nil, err reader: bufio.NewReader(rd),
}
prod.Medias = []*core.Media{
{
Kind: core.KindVideo,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{
{
Name: core.CodecJPEG,
ClockRate: 90000,
PayloadType: core.PayloadTypeRAW,
},
},
},
} }
prod.Type = "Multipart producer" prod.Type = "Multipart producer"
return prod, nil return prod, nil
} }
func (c *Producer) Start() error { func (c *Producer) Start() error {
if len(c.Receivers) == 0 { if len(c.Receivers) != 1 {
return errors.New("multipart: no receivers") return errors.New("mjpeg: no receivers")
} }
var mjpeg, video, audio *core.Receiver mjpeg := c.Receivers[0]
for _, receiver := range c.Receivers {
switch receiver.Codec.Name {
case core.CodecH264:
video = receiver
case core.CodecPCMU:
audio = receiver
default:
mjpeg = receiver
}
}
for { for {
header, body, err := c.next() _, body, err := tcp.NextMultipart(c.reader)
if err != nil { if err != nil {
return err return err
} }
@@ -65,182 +59,11 @@ func (c *Producer) Start() error {
Payload: body, Payload: body,
} }
mjpeg.WriteRTP(packet) mjpeg.WriteRTP(packet)
continue
}
ct := header.Get("Content-Type")
switch ct {
case MimeVideo:
if video != nil {
ts := GetTimestamp(header)
pkt := &rtp.Packet{
Header: rtp.Header{
Timestamp: uint32(ts * 90000),
},
Payload: annexb.EncodeToAVCC(body, false),
}
video.WriteRTP(pkt)
}
case MimeG711U:
if audio != nil {
ts := GetTimestamp(header)
pkt := &rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
Timestamp: uint32(ts * 8000),
},
Payload: body,
}
audio.WriteRTP(pkt)
}
} }
} }
} }
func (c *Producer) Stop() error { func (c *Producer) Stop() error {
_ = c.SuperProducer.Close() _ = c.SuperProducer.Close()
return c.rd.Close() return c.closer.Close()
}
func (c *Producer) next() (http.Header, []byte, error) {
for {
// search next boundary and skip empty lines
s, err := c.reader.ReadString('\n')
if err != nil {
return nil, nil, err
}
// auto guess boundary
if c.boundary == "" && strings.HasPrefix(s, "--") {
c.boundary = s
break
} else if strings.HasPrefix(s, c.boundary) {
break
}
if s == "\r\n" {
continue
}
return nil, nil, errors.New("multipart: wrong boundary: " + s)
}
tp := textproto.NewReader(c.reader)
header, err := tp.ReadMIMEHeader()
if err != nil {
return nil, nil, err
}
s := header.Get("Content-Length")
if s == "" {
return nil, nil, errors.New("multipart: no content length")
}
size, err := strconv.Atoi(s)
if err != nil {
return nil, nil, err
}
buf := make([]byte, size)
if _, err = io.ReadFull(c.reader, buf); err != nil {
return nil, nil, err
}
_, _ = c.reader.Discard(2) // skip "\r\n"
return http.Header(header), buf, nil
}
const (
MimeVideo = "video/x-h264"
MimeG711U = "audio/g711u"
)
func (c *Producer) probe() error {
c.rd.BufferSize = core.ProbeSize
c.reader = bufio.NewReader(c.rd)
defer func() {
c.rd.Reset()
c.reader = bufio.NewReader(c.rd)
}()
waitVideo, waitAudio := true, true
timeout := time.Now().Add(core.ProbeTimeout)
for (waitVideo || waitAudio) && time.Now().Before(timeout) {
header, body, err := c.next()
if err != nil {
return err
}
var media *core.Media
ct := header.Get("Content-Type")
switch ct {
case MimeVideo:
if !waitVideo {
continue
}
waitVideo = false
body = annexb.EncodeToAVCC(body, false)
codec := h264.AVCCToCodec(body)
media = &core.Media{
Kind: core.KindVideo,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{codec},
}
case MimeG711U:
if !waitAudio {
continue
}
waitAudio = false
media = &core.Media{
Kind: core.KindAudio,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{
{
Name: core.CodecPCMU,
ClockRate: 8000,
},
},
}
default:
waitVideo = false
waitAudio = false
media = &core.Media{
Kind: core.KindVideo,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{
{
Name: core.CodecJPEG,
ClockRate: 90000,
PayloadType: core.PayloadTypeRAW,
},
},
}
}
c.Medias = append(c.Medias, media)
}
return nil
}
// GetTimestamp - return timestamp in seconds
func GetTimestamp(header http.Header) float64 {
if s := header.Get("X-Timestamp"); s != "" {
if f, _ := strconv.ParseFloat(s, 32); f != 0 {
return f
}
}
return float64(time.Duration(time.Now().UnixNano()) / time.Second)
} }
+7 -7
View File
@@ -47,13 +47,13 @@ type cbcMode interface {
SetIV([]byte) SetIV([]byte)
} }
func NewClient(url string) *Client { func Dial(url string) (*Client, error) {
return &Client{url: url} var err error
} c := &Client{url: url}
if c.conn1, err = c.newConn(); err != nil {
func (c *Client) Dial() (err error) { return nil, err
c.conn1, err = c.newConn() }
return return c, nil
} }
func (c *Client) newConn() (net.Conn, error) { func (c *Client) newConn() (net.Conn, error) {
+56
View File
@@ -0,0 +1,56 @@
package tcp
import (
"bufio"
"errors"
"io"
"net/http"
"net/textproto"
"strconv"
"strings"
)
func NextMultipart(rd *bufio.Reader) (http.Header, []byte, error) {
for {
// search next boundary and skip empty lines
s, err := rd.ReadString('\n')
if err != nil {
return nil, nil, err
}
if strings.HasPrefix(s, "--") {
break
}
if s == "\r\n" {
continue
}
return nil, nil, errors.New("multipart: wrong boundary: " + s)
}
tp := textproto.NewReader(rd)
header, err := tp.ReadMIMEHeader()
if err != nil {
return nil, nil, err
}
s := header.Get("Content-Length")
if s == "" {
return nil, nil, errors.New("multipart: no content length")
}
size, err := strconv.Atoi(s)
if err != nil {
return nil, nil, err
}
buf := make([]byte, size)
if _, err = io.ReadFull(rd, buf); err != nil {
return nil, nil, err
}
_, _ = rd.Discard(2) // skip "\r\n"
return http.Header(header), buf, nil
}