diff --git a/cmd/tapo/tapo.go b/cmd/tapo/tapo.go new file mode 100644 index 00000000..a48cde45 --- /dev/null +++ b/cmd/tapo/tapo.go @@ -0,0 +1,25 @@ +package tapo + +import ( + "github.com/AlexxIT/go2rtc/cmd/streams" + "github.com/AlexxIT/go2rtc/pkg/streamer" + "github.com/AlexxIT/go2rtc/pkg/tapo" +) + +func Init() { + streams.HandleFunc("tapo", handle) +} + +func handle(url string) (streamer.Producer, error) { + conn := tapo.NewClient(url) + if err := conn.Dial(); err != nil { + return nil, err + } + if err := conn.Play(); err != nil { + return nil, err + } + if err := conn.Handle(); err != nil { + return nil, err + } + return conn, nil +} diff --git a/main.go b/main.go index c6c164f7..33fed57a 100644 --- a/main.go +++ b/main.go @@ -20,6 +20,7 @@ import ( "github.com/AlexxIT/go2rtc/cmd/rtsp" "github.com/AlexxIT/go2rtc/cmd/srtp" "github.com/AlexxIT/go2rtc/cmd/streams" + "github.com/AlexxIT/go2rtc/cmd/tapo" "github.com/AlexxIT/go2rtc/cmd/webrtc" "os" "os/signal" @@ -40,6 +41,7 @@ func main() { ivideon.Init() http.Init() dvrip.Init() + tapo.Init() srtp.Init() homekit.Init() diff --git a/pkg/mpegts/helpers.go b/pkg/mpegts/helpers.go index fbe232f7..bdac1dee 100644 --- a/pkg/mpegts/helpers.go +++ b/pkg/mpegts/helpers.go @@ -1,6 +1,8 @@ package mpegts import ( + "github.com/AlexxIT/go2rtc/pkg/h264" + "github.com/AlexxIT/go2rtc/pkg/streamer" "time" ) @@ -57,3 +59,35 @@ func ParseTime(b []byte) time.Duration { ts := (uint64(b[0]) >> 1 & 0x7 << 30) | (uint64(b[1]) << 22) | (uint64(b[2]) >> 1 & 0x7F << 15) | (uint64(b[3]) << 7) | (uint64(b[4]) >> 1 & 0x7F) return time.Duration(ts) } + +func GetMedia(pkt *Packet) *streamer.Media { + var codec *streamer.Codec + var kind string + + switch pkt.StreamType { + case StreamTypeH264: + codec = &streamer.Codec{ + Name: streamer.CodecH264, + ClockRate: 90000, + PayloadType: streamer.PayloadTypeRAW, + FmtpLine: h264.GetFmtpLine(pkt.Payload), + } + kind = streamer.KindVideo + + case StreamTypePCMA: + codec = &streamer.Codec{ + Name: streamer.CodecPCMA, + ClockRate: 8000, + } + kind = streamer.KindAudio + + default: + return nil + } + + return &streamer.Media{ + Kind: kind, + Direction: streamer.DirectionSendonly, + Codecs: []*streamer.Codec{codec}, + } +} diff --git a/pkg/streamer/helpers.go b/pkg/streamer/helpers.go index d376f605..10688a70 100644 --- a/pkg/streamer/helpers.go +++ b/pkg/streamer/helpers.go @@ -2,6 +2,7 @@ package streamer import ( "strings" + "time" ) type Info struct { @@ -50,3 +51,31 @@ func Contains(medias []*Media, media *Media, codec *Codec) bool { } return ok1 && ok2 } + +type Probe struct { + deadline time.Time + items map[interface{}]struct{} +} + +func NewProbe(enable bool) *Probe { + if enable { + return &Probe{ + deadline: time.Now().Add(time.Second * 3), + items: map[interface{}]struct{}{}, + } + } else { + return nil + } +} + +// Active return true if probe enabled and not finish +func (p *Probe) Active() bool { + return len(p.items) < 2 && time.Now().Before(p.deadline) +} + +// Append safe to run if Probe is nil +func (p *Probe) Append(v interface{}) { + if p != nil { + p.items[v] = struct{}{} + } +} diff --git a/pkg/streamer/track.go b/pkg/streamer/track.go index 1e71dc84..58b9d014 100644 --- a/pkg/streamer/track.go +++ b/pkg/streamer/track.go @@ -21,6 +21,13 @@ func NewTrack(codec *Codec, direction string) *Track { return &Track{Codec: codec, Direction: direction, sinkMu: new(sync.RWMutex)} } +func NewTrack2(media *Media, codec *Codec) *Track { + if codec == nil { + codec = media.Codecs[0] + } + return &Track{Codec: codec, Direction: media.Direction, sinkMu: new(sync.RWMutex)} +} + func (t *Track) String() string { s := t.Codec.String() if t.sinkMu.TryRLock() { diff --git a/pkg/tapo/client.go b/pkg/tapo/client.go new file mode 100644 index 00000000..d0e67d3d --- /dev/null +++ b/pkg/tapo/client.go @@ -0,0 +1,235 @@ +package tapo + +import ( + "crypto/aes" + "crypto/cipher" + "crypto/md5" + "errors" + "fmt" + "github.com/AlexxIT/go2rtc/pkg/h264" + "github.com/AlexxIT/go2rtc/pkg/mpegts" + "github.com/AlexxIT/go2rtc/pkg/streamer" + "github.com/AlexxIT/go2rtc/pkg/tcp" + "github.com/pion/rtp" + "mime/multipart" + "net" + "net/http" + "net/url" + "strconv" + "strings" +) + +type Client struct { + streamer.Element + + url string + + medias []*streamer.Media + tracks map[byte]*streamer.Track + + conn net.Conn + reader *multipart.Reader + + decrypt func(b []byte) []byte +} + +// block ciphers using cipher block chaining. +type cbcMode interface { + cipher.BlockMode + SetIV([]byte) +} + +func NewClient(url string) *Client { + return &Client{url: url} +} + +func (c *Client) Dial() (err error) { + u, err := url.Parse(c.url) + if err != nil { + return + } + + // support raw username/password + username := u.User.Username() + password, _ := u.User.Password() + + // or cloud password in place of username + if password == "" { + password = fmt.Sprintf("%16X", md5.Sum([]byte(username))) + username = "admin" + u.User = url.UserPassword(username, password) + } + + u.Scheme = "http" + u.Path = "/stream" + if u.Port() == "" { + u.Host += ":8800" + } + + req, err := http.NewRequest("POST", u.String(), nil) + if err != nil { + return + } + + req.Header.Set("Content-Type", "multipart/mixed; boundary=--client-stream-boundary--") + + res, err := tcp.Do(req) + if err != nil { + return + } + + if res.StatusCode != http.StatusOK { + return errors.New(res.Status) + } + + // extract nonce from response + // cipher="AES_128_CBC" username="admin" padding="PKCS7_16" algorithm="MD5" nonce="***" + nonce := res.Header.Get("Key-Exchange") + nonce = streamer.Between(nonce, `nonce="`, `"`) + + key := md5.Sum([]byte(nonce + ":" + password)) + iv := md5.Sum([]byte(username + ":" + nonce)) + + block, err := aes.NewCipher(key[:]) + if err != nil { + return + } + + cbc := cipher.NewCBCDecrypter(block, iv[:]).(cbcMode) + + c.decrypt = func(b []byte) []byte { + // restore IV + cbc.SetIV(iv[:]) + + // decrypt + cbc.CryptBlocks(b, b) + + // unpad + padSize := int(b[len(b)-1]) + return b[:len(b)-padSize] + } + + c.conn = res.Body.(net.Conn) + + boundary := res.Header.Get("Content-Type") + _, boundary, _ = strings.Cut(boundary, "boundary=") + + c.reader = multipart.NewReader(c.conn, boundary) + + return nil +} + +func (c *Client) Play() (err error) { + // audio: default, disable, enable + body := []byte( + "----client-stream-boundary--\r\n" + + "Content-Type: application/json\r\nContent-Length: 120\r\n\r\n" + + `{"params":{"preview":{"audio":["default"],"channels":[0],"resolutions":["HD"]},"method":"get"},"seq":1,"type":"request"}` + + "\r\n", + ) + + _, err = c.conn.Write(body) + return nil +} + +// Handle - first run will be in probe state +func (c *Client) Handle() error { + if c.tracks == nil { + c.tracks = map[byte]*streamer.Track{} + } + + var audioSeq uint16 + var audioTS uint32 + + reader := mpegts.NewReader() + + probe := streamer.NewProbe(c.medias == nil) + for probe == nil || probe.Active() { + p, err := c.reader.NextRawPart() + if err != nil { + return err + } + + ct := p.Header.Get("Content-Type") + if ct != "video/mp2t" { + continue + } + + cl := p.Header.Get("Content-Length") + + size, err := strconv.Atoi(cl) + if err != nil { + return err + } + + body := make([]byte, size) + + b := body + for { + if n, err2 := p.Read(b); err2 == nil { + b = b[n:] + } else { + break + } + } + + body = c.decrypt(body) + reader.SetBuffer(body) + + for { + pkt := reader.GetPacket() + if pkt == nil { + break + } + + track := c.tracks[pkt.StreamType] + if track == nil { + // count track on probe state even if not support it + probe.Append(pkt.StreamType) + + media := mpegts.GetMedia(pkt) + if media == nil { + continue // unsupported codec + } + + track = streamer.NewTrack2(media, nil) + + c.medias = append(c.medias, media) + c.tracks[pkt.StreamType] = track + } + + switch track.Codec.Name { + case streamer.CodecH264: + packet := &rtp.Packet{ + Header: rtp.Header{Timestamp: uint32(pkt.PTS)}, + Payload: h264.AnnexB2AVC(pkt.Payload), + } + _ = track.WriteRTP(packet) + + //log.Printf("[AVC] %v, len: %d, pts: %d ts: %10d", h264.Types(packet.Payload), len(packet.Payload), pkt.PTS, packet.Timestamp) + + case streamer.CodecPCMA: + audioSeq++ + audioTS += uint32(len(pkt.Payload)) + + packet := &rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Timestamp: audioTS, + SequenceNumber: audioSeq, + }, + Payload: pkt.Payload, + } + _ = track.WriteRTP(packet) + //log.Printf("[PCM]len: %d, pts: %d ts: %10d, buf: %x", len(packet.Payload), pkt.PTS, packet.Timestamp, packet.Payload[:32]) + } + + } + } + + return nil +} + +func (c *Client) Close() error { + return c.conn.Close() +} diff --git a/pkg/tapo/producer.go b/pkg/tapo/producer.go new file mode 100644 index 00000000..cc9e5c44 --- /dev/null +++ b/pkg/tapo/producer.go @@ -0,0 +1,24 @@ +package tapo + +import "github.com/AlexxIT/go2rtc/pkg/streamer" + +func (c *Client) GetMedias() []*streamer.Media { + return c.medias +} + +func (c *Client) GetTrack(media *streamer.Media, codec *streamer.Codec) *streamer.Track { + for _, track := range c.tracks { + if track.Codec == codec { + return track + } + } + return nil +} + +func (c *Client) Start() error { + return c.Handle() +} + +func (c *Client) Stop() error { + return c.Close() +}