From 775b1818d1bd0feb15b6cea2443a651e6eb856ee Mon Sep 17 00:00:00 2001 From: Alexey Khit Date: Mon, 6 Mar 2023 17:17:30 +0300 Subject: [PATCH] Add WebTorrent module --- cmd/webtorrent/init.go | 164 ++++++++++++++++++++++++++++ cmd/webtorrent/tracker.go | 107 ++++++++++++++++++ main.go | 2 + pkg/core/helpers.go | 72 ++----------- pkg/core/waiter.go | 71 ++++++++++++ pkg/core/worker.go | 52 +++++++++ pkg/webtorrent/client.go | 91 ++++++++++++++++ pkg/webtorrent/crypto.go | 72 +++++++++++++ pkg/webtorrent/server.go | 222 ++++++++++++++++++++++++++++++++++++++ www/links.html | 104 +++++++++++++----- 10 files changed, 866 insertions(+), 91 deletions(-) create mode 100644 cmd/webtorrent/init.go create mode 100644 cmd/webtorrent/tracker.go create mode 100644 pkg/core/waiter.go create mode 100644 pkg/core/worker.go create mode 100644 pkg/webtorrent/client.go create mode 100644 pkg/webtorrent/crypto.go create mode 100644 pkg/webtorrent/server.go diff --git a/cmd/webtorrent/init.go b/cmd/webtorrent/init.go new file mode 100644 index 00000000..3872e89c --- /dev/null +++ b/cmd/webtorrent/init.go @@ -0,0 +1,164 @@ +package webtorrent + +import ( + "errors" + "fmt" + "github.com/AlexxIT/go2rtc/cmd/api" + "github.com/AlexxIT/go2rtc/cmd/app" + "github.com/AlexxIT/go2rtc/cmd/streams" + "github.com/AlexxIT/go2rtc/cmd/webrtc" + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/streamer" + "github.com/AlexxIT/go2rtc/pkg/webtorrent" + "github.com/rs/zerolog" + "net/http" + "net/url" +) + +func Init() { + var cfg struct { + Mod struct { + Trackers []string `yaml:"trackers"` + Shares map[string]struct { + Pwd string `yaml:"pwd"` + Src string `yaml:"src"` + } `yaml:"shares"` + } `yaml:"webtorrent"` + } + + cfg.Mod.Trackers = []string{"wss://tracker.openwebtorrent.com"} + + app.LoadConfig(&cfg) + + if len(cfg.Mod.Trackers) == 0 { + return + } + + log = app.GetLogger("webtorrent") + + streams.HandleFunc("webtorrent", streamHandle) + + api.HandleFunc("api/webtorrent", apiHandle) + + srv = &webtorrent.Server{ + URL: cfg.Mod.Trackers[0], + Exchange: func(src, offer string) (answer string, err error) { + stream := streams.Get(src) + if stream == nil { + return "", errors.New(api.StreamNotFound) + } + return webrtc.ExchangeSDP(stream, offer, "webtorrent") + }, + } + + if log.Debug().Enabled() { + srv.Listen(func(msg interface{}) { + switch msg.(type) { + case string: + log.Debug().Msgf("[webtorrent] %s", msg) + case *webtorrent.Message: + log.Trace().Any("msg", msg).Msgf("[webtorrent]") + } + }) + } + + for name, share := range cfg.Mod.Shares { + if len(name) < 8 { + log.Warn().Str("name", name).Msgf("min share name len - 8 symbols") + continue + } + if len(share.Pwd) < 4 { + log.Warn().Str("name", name).Str("pwd", share.Pwd).Msgf("min share pwd len - 4 symbols") + continue + } + if streams.Get(share.Src) == nil { + log.Warn().Str("stream", share.Src).Msgf("stream not exists") + continue + } + + srv.AddShare(name, share.Pwd, share.Src) + } +} + +var log zerolog.Logger + +var shares map[string]string +var srv *webtorrent.Server + +func apiHandle(w http.ResponseWriter, r *http.Request) { + src := r.URL.Query().Get("src") + share, ok := shares[src] + + switch r.Method { + case "GET": + // support act as WebTorrent tracker (for testing purposes) + if r.Header.Get("Connection") == "Upgrade" { + tracker(w, r) + return + } + + if ok { + pwd := srv.GetSharePwd(share) + data := fmt.Sprintf(`{"share":%q,"pwd":%q}`, share, pwd) + _, _ = w.Write([]byte(data)) + } else { + http.Error(w, "", http.StatusNotFound) + } + + case "POST": + // check if share already exist + if ok { + http.Error(w, "", http.StatusBadRequest) + return + } + + // check if stream exists + if stream := streams.Get(src); stream == nil { + http.Error(w, "", http.StatusNotFound) + return + } + + // create new random share + share = core.RandString(16) + pwd := core.RandString(16) + srv.AddShare(share, pwd, src) + + if shares == nil { + shares = map[string]string{} + } + shares[src] = share + + w.WriteHeader(http.StatusCreated) + data := fmt.Sprintf(`{"share":%q,"pwd":%q}`, share, pwd) + _, _ = w.Write([]byte(data)) + + case "DELETE": + if ok { + srv.RemoveShare(share) + delete(shares, src) + } else { + http.Error(w, "", http.StatusNotFound) + } + } +} + +func streamHandle(rawURL string) (streamer.Producer, error) { + u, err := url.Parse(rawURL) + if err != nil { + return nil, err + } + + query := u.Query() + share := query.Get("share") + pwd := query.Get("pwd") + if len(share) < 8 || len(pwd) < 4 { + return nil, errors.New("wrong URL: " + rawURL) + } + + pc, err := webrtc.PeerConnection(true) + if err != nil { + return nil, err + } + + return webtorrent.NewClient(srv.URL, share, pwd, pc) +} diff --git a/cmd/webtorrent/tracker.go b/cmd/webtorrent/tracker.go new file mode 100644 index 00000000..4e5bd9b3 --- /dev/null +++ b/cmd/webtorrent/tracker.go @@ -0,0 +1,107 @@ +package webtorrent + +import ( + "fmt" + "github.com/AlexxIT/go2rtc/pkg/webtorrent" + "github.com/gorilla/websocket" + "net/http" +) + +var upgrader *websocket.Upgrader +var hashes map[string]map[string]*websocket.Conn + +func tracker(w http.ResponseWriter, r *http.Request) { + if upgrader == nil { + upgrader = &websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 2028, + } + upgrader.CheckOrigin = func(r *http.Request) bool { + return true + } + } + + ws, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Warn().Err(err).Send() + return + } + + defer ws.Close() + + for { + var msg webtorrent.Message + if err = ws.ReadJSON(&msg); err != nil { + return + } + + //log.Trace().Msgf("[webtorrent] message=%v", msg) + + if msg.InfoHash == "" || msg.PeerId == "" { + continue + } + + if hashes == nil { + hashes = map[string]map[string]*websocket.Conn{} + } + + // new or old client with offers + clients := hashes[msg.InfoHash] + if clients == nil { + clients = map[string]*websocket.Conn{ + msg.PeerId: ws, + } + hashes[msg.InfoHash] = clients + } else { + clients[msg.PeerId] = ws + } + + switch { + case msg.Offers != nil: + // ask for ping + raw := fmt.Sprintf( + `{"action":"announce","interval":120,"info_hash":"%s","complete":0,"incomplete":1}`, + msg.InfoHash, + ) + if err = ws.WriteMessage(websocket.TextMessage, []byte(raw)); err != nil { + log.Warn().Err(err).Send() + return + } + + // skip if no offers (server) + if len(msg.Offers) == 0 { + continue + } + + // get and check only first offer + offer := msg.Offers[0] + if offer.OfferId == "" || offer.Offer.Type != "offer" || offer.Offer.SDP == "" { + continue + } + + // send offer to all clients (one of them - server) + raw = fmt.Sprintf( + `{"action":"announce","info_hash":"%s","peer_id":"%s","offer_id":"%s","offer":{"type":"offer","sdp":"%s"}}`, + msg.InfoHash, msg.PeerId, offer.OfferId, offer.Offer.SDP, + ) + + for _, server := range clients { + if server != ws { + _ = server.WriteMessage(websocket.TextMessage, []byte(raw)) + } + } + + case msg.OfferId != "" && msg.ToPeerId != "" && msg.Answer != nil: + ws1, ok := clients[msg.ToPeerId] + if !ok { + continue + } + + raw := fmt.Sprintf( + `{"action":"announce","info_hash":"%s","peer_id":"%s","offer_id":"%s","answer":{"type":"answer","sdp":"%s"}}`, + msg.InfoHash, msg.PeerId, msg.OfferId, msg.Answer.SDP, + ) + _ = ws1.WriteMessage(websocket.TextMessage, []byte(raw)) + } + } +} diff --git a/main.go b/main.go index fb62c3b6..2dbea5db 100644 --- a/main.go +++ b/main.go @@ -24,6 +24,7 @@ import ( "github.com/AlexxIT/go2rtc/cmd/streams" "github.com/AlexxIT/go2rtc/cmd/tapo" "github.com/AlexxIT/go2rtc/cmd/webrtc" + "github.com/AlexxIT/go2rtc/cmd/webtorrent" "os" "os/signal" "syscall" @@ -55,6 +56,7 @@ func main() { hls.Init() mjpeg.Init() + webtorrent.Init() ngrok.Init() debug.Init() diff --git a/pkg/core/helpers.go b/pkg/core/helpers.go index eed265da..2813e399 100644 --- a/pkg/core/helpers.go +++ b/pkg/core/helpers.go @@ -1,71 +1,19 @@ package core import ( - "sync" + cryptorand "crypto/rand" ) -// Waiter support: -// - autotart on first Wait -// - block new waiters after last Done -// - safe Done after finish -type Waiter struct { - sync.WaitGroup - mu sync.Mutex - state int // state < 0 means finish -} +const digits = "0123456789abcdefghijklmnopqrstuvwxyz" +const maxSize = byte(len(digits)) -func (w *Waiter) Add(delta int) { - w.mu.Lock() - if w.state >= 0 { - w.state += delta - w.WaitGroup.Add(delta) +func RandString(size byte) string { + b := make([]byte, size) + if _, err := cryptorand.Read(b); err != nil { + panic(err) } - w.mu.Unlock() -} - -func (w *Waiter) Wait() { - w.mu.Lock() - // first wait auto start waiter - if w.state == 0 { - w.state++ - w.WaitGroup.Add(1) + for i := byte(0); i < size; i++ { + b[i] = digits[b[i]%maxSize] } - w.mu.Unlock() - - w.WaitGroup.Wait() -} - -func (w *Waiter) Done() { - w.mu.Lock() - - // safe run Done only when have tasks - if w.state > 0 { - w.state-- - w.WaitGroup.Done() - } - - // block waiter for any operations after last done - if w.state == 0 { - w.state = -1 - } - - w.mu.Unlock() -} - -func (w *Waiter) WaitChan() <-chan struct{} { - var ch chan struct{} - - w.mu.Lock() - - if w.state >= 0 { - ch = make(chan struct{}) - go func() { - w.Wait() - ch <- struct{}{} - }() - } - - w.mu.Unlock() - - return ch + return string(b) } diff --git a/pkg/core/waiter.go b/pkg/core/waiter.go new file mode 100644 index 00000000..eed265da --- /dev/null +++ b/pkg/core/waiter.go @@ -0,0 +1,71 @@ +package core + +import ( + "sync" +) + +// Waiter support: +// - autotart on first Wait +// - block new waiters after last Done +// - safe Done after finish +type Waiter struct { + sync.WaitGroup + mu sync.Mutex + state int // state < 0 means finish +} + +func (w *Waiter) Add(delta int) { + w.mu.Lock() + if w.state >= 0 { + w.state += delta + w.WaitGroup.Add(delta) + } + w.mu.Unlock() +} + +func (w *Waiter) Wait() { + w.mu.Lock() + // first wait auto start waiter + if w.state == 0 { + w.state++ + w.WaitGroup.Add(1) + } + w.mu.Unlock() + + w.WaitGroup.Wait() +} + +func (w *Waiter) Done() { + w.mu.Lock() + + // safe run Done only when have tasks + if w.state > 0 { + w.state-- + w.WaitGroup.Done() + } + + // block waiter for any operations after last done + if w.state == 0 { + w.state = -1 + } + + w.mu.Unlock() +} + +func (w *Waiter) WaitChan() <-chan struct{} { + var ch chan struct{} + + w.mu.Lock() + + if w.state >= 0 { + ch = make(chan struct{}) + go func() { + w.Wait() + ch <- struct{}{} + }() + } + + w.mu.Unlock() + + return ch +} diff --git a/pkg/core/worker.go b/pkg/core/worker.go new file mode 100644 index 00000000..f0138751 --- /dev/null +++ b/pkg/core/worker.go @@ -0,0 +1,52 @@ +package core + +import ( + "time" +) + +type Worker struct { + timer *time.Timer + done chan struct{} +} + +// NewWorker run f after d +func NewWorker(d time.Duration, f func() time.Duration) *Worker { + timer := time.NewTimer(d) + done := make(chan struct{}) + + go func() { + for { + select { + case <-timer.C: + if d = f(); d > 0 { + timer.Reset(d) + continue + } + case <-done: + timer.Stop() + } + break + } + }() + + return &Worker{timer: timer, done: done} +} + +// Do - instant timer run +func (w *Worker) Do() { + if w == nil { + return + } + w.timer.Reset(0) +} + +func (w *Worker) Stop() { + if w == nil { + return + } + + select { + case w.done <- struct{}{}: + default: + } +} diff --git a/pkg/webtorrent/client.go b/pkg/webtorrent/client.go new file mode 100644 index 00000000..0142d50e --- /dev/null +++ b/pkg/webtorrent/client.go @@ -0,0 +1,91 @@ +package webtorrent + +import ( + "encoding/base64" + "fmt" + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/streamer" + "github.com/AlexxIT/go2rtc/pkg/webrtc" + "github.com/gorilla/websocket" + pion "github.com/pion/webrtc/v3" + "strconv" + "time" +) + +func NewClient(tracker, share, pwd string, pc *pion.PeerConnection) (*webrtc.Conn, error) { + // 1. Create WebRTC producer + prod := webrtc.NewConn(pc) + + medias := []*streamer.Media{ + {Kind: streamer.KindVideo, Direction: streamer.DirectionRecvonly}, + {Kind: streamer.KindAudio, Direction: streamer.DirectionRecvonly}, + } + + // 2. Create offer + offer, err := prod.CreateCompleteOffer(medias) + if err != nil { + return nil, err + } + + // 3. Encrypt offer + nonce := strconv.FormatInt(time.Now().UnixNano(), 36) + + cipher, err := NewCipher(share, pwd, nonce) + if err != nil { + return nil, err + } + + enc := cipher.Encrypt([]byte(offer)) + + // 4. Connect to Tracker + ws, _, err := websocket.DefaultDialer.Dial(tracker, nil) + if err != nil { + return nil, err + } + + defer ws.Close() + + // 5. Send offer + msg := fmt.Sprintf( + `{"action":"announce","info_hash":"%s","peer_id":"%s","offers":[{"offer_id":"%s","offer":{"type":"offer","sdp":"%s"}}],"numwant":1}`, + InfoHash(share), core.RandString(16), nonce, base64.StdEncoding.EncodeToString(enc), + ) + if err = ws.WriteMessage(websocket.TextMessage, []byte(msg)); err != nil { + return nil, err + } + + // wait 30 seconds until full answer + if err = ws.SetReadDeadline(time.Now().Add(time.Second * 30)); err != nil { + return nil, err + } + + for { + // 6. Read answer + var v Message + if err = ws.ReadJSON(&v); err != nil { + return nil, err + } + + if v.Answer == nil { + continue + } + + // 7. Decrypt answer + enc, err = base64.StdEncoding.DecodeString(v.Answer.SDP) + if err != nil { + return nil, err + } + + answer, err := cipher.Decrypt(enc) + if err != nil { + return nil, err + } + + // 8. Set answer + if err = prod.SetAnswer(string(answer)); err != nil { + return nil, err + } + + return prod, nil + } +} diff --git a/pkg/webtorrent/crypto.go b/pkg/webtorrent/crypto.go new file mode 100644 index 00000000..27f21187 --- /dev/null +++ b/pkg/webtorrent/crypto.go @@ -0,0 +1,72 @@ +package webtorrent + +import ( + "crypto/aes" + "crypto/cipher" + "crypto/sha256" + "encoding/base64" + "fmt" + "strconv" + "time" +) + +type Cipher struct { + gcm cipher.AEAD + iv []byte + nonce []byte +} + +func NewCipher(share, pwd, nonce string) (*Cipher, error) { + timestamp, err := strconv.ParseInt(nonce, 36, 0) + if err != nil { + return nil, err + } + + delta := time.Duration(time.Now().UnixNano() - timestamp) + if delta < 0 { + delta = -delta + } + + // protect from replay attack, but respect wrong timezone on server + if delta > 12*time.Hour { + return nil, fmt.Errorf("wrong timedelta %s", delta) + } + + c := &Cipher{} + + hash := sha256.New() + hash.Write([]byte(nonce + ":" + pwd)) + key := hash.Sum(nil) + + hash.Reset() + hash.Write([]byte(share + ":" + nonce)) + c.iv = hash.Sum(nil)[:12] + + block, err := aes.NewCipher(key) + if err != nil { + return nil, err + } + c.gcm, err = cipher.NewGCM(block) + if err != nil { + return nil, err + } + + c.nonce = []byte(nonce) + + return c, nil +} + +func (c *Cipher) Decrypt(ciphertext []byte) ([]byte, error) { + return c.gcm.Open(nil, c.iv, ciphertext, c.nonce) +} + +func (c *Cipher) Encrypt(plaintext []byte) []byte { + return c.gcm.Seal(nil, c.iv, plaintext, c.nonce) +} + +func InfoHash(share string) string { + hash := sha256.New() + hash.Write([]byte(share)) + sum := hash.Sum(nil) + return base64.StdEncoding.EncodeToString(sum) +} diff --git a/pkg/webtorrent/server.go b/pkg/webtorrent/server.go new file mode 100644 index 00000000..a2850b80 --- /dev/null +++ b/pkg/webtorrent/server.go @@ -0,0 +1,222 @@ +package webtorrent + +import ( + "encoding/base64" + "fmt" + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/streamer" + "github.com/gorilla/websocket" + "sync" + "time" +) + +type Server struct { + streamer.Element + + URL string + Exchange func(src, offer string) (answer string, err error) + + shares map[string]*Share + mu sync.Mutex + announce *core.Worker +} + +type Share struct { + name string + pwd string + src string +} + +func (s *Server) AddShare(name, pwd, src string) { + s.mu.Lock() + + if s.shares == nil { + s.shares = map[string]*Share{} + } + + if len(s.shares) == 0 { + go s.Serve() + } + + hash := InfoHash(name) + s.shares[hash] = &Share{ + name: name, + pwd: pwd, + src: src, + } + + s.announce.Do() + + s.mu.Unlock() +} + +func (s *Server) GetSharePwd(name string) (pwd string) { + hash := InfoHash(name) + s.mu.Lock() + if share, ok := s.shares[hash]; ok { + pwd = share.pwd + } + s.mu.Unlock() + return +} + +func (s *Server) RemoveShare(name string) { + hash := InfoHash(name) + s.mu.Lock() + if _, ok := s.shares[hash]; ok { + delete(s.shares, hash) + } + s.mu.Unlock() +} + +// Serve - run reconnection loop, will exit on?? +func (s *Server) Serve() error { + for s.HasShares() { + s.Fire("connect to tracker: " + s.URL) + + ws, _, err := websocket.DefaultDialer.Dial(s.URL, nil) + if err != nil { + s.Fire(err) + time.Sleep(time.Minute) + continue + } + + peerID := core.RandString(16) + + // instant run announce worker + s.announce = core.NewWorker(0, func() time.Duration { + if err = s.writer(ws, peerID); err != nil { + return 0 + } + return time.Minute + }) + + // run reader forewer + for { + if err = s.reader(ws, peerID); err != nil { + break + } + } + + // stop announcing worker + s.announce.Stop() + + // ensure ws is stopped + _ = ws.Close() + + time.Sleep(time.Minute) + } + + s.Fire("disconnect") + + return nil +} + +// reader - receive offers in the loop, will exit on ws.Close +func (s *Server) reader(ws *websocket.Conn, peerID string) error { + var v Message + if err := ws.ReadJSON(&v); err != nil { + return err + } + + s.Fire(&v) + + s.mu.Lock() + share, ok := s.shares[v.InfoHash] + s.mu.Unlock() + + // skip any unknown shares + if !ok || v.OfferId == "" || v.Offer == nil { + return nil + } + + cipher, err := NewCipher(share.name, share.pwd, v.OfferId) + if err != nil { + return nil + } + + enc, err := base64.StdEncoding.DecodeString(v.Offer.SDP) + if err != nil { + return nil + } + + offer, err := cipher.Decrypt(enc) + if err != nil { + return nil + } + + answer, err := s.Exchange(share.src, string(offer)) + if err != nil { + return nil + } + + enc = cipher.Encrypt([]byte(answer)) + + raw := fmt.Sprintf( + `{"action":"announce","info_hash":"%s","peer_id":"%s","offer_id":"%s","answer":{"type":"answer","sdp":"%s"},"to_peer_id":"%s"}`, + v.InfoHash, peerID, v.OfferId, base64.StdEncoding.EncodeToString(enc), v.PeerId, + ) + return ws.WriteMessage(websocket.TextMessage, []byte(raw)) +} + +func (s *Server) writer(ws *websocket.Conn, peerID string) error { + s.mu.Lock() + defer s.mu.Unlock() + + if len(s.shares) == 0 { + return ws.Close() + } + + for hash := range s.shares { + msg := fmt.Sprintf( + `{"action":"announce","info_hash":"%s","peer_id":"%s","offers":[],"numwant":10}`, + hash, peerID, + ) + if err := ws.WriteMessage(websocket.TextMessage, []byte(msg)); err != nil { + return err + } + } + + return nil +} + +func (s *Server) HasShares() bool { + s.mu.Lock() + defer s.mu.Unlock() + return len(s.shares) > 0 +} + +type Message struct { + Action string `json:"action"` + InfoHash string `json:"info_hash"` + + // Announce msg + Numwant int `json:"numwant,omitempty"` + PeerId string `json:"peer_id,omitempty"` + Offers []struct { + OfferId string `json:"offer_id"` + Offer struct { + Type string `json:"type"` + SDP string `json:"sdp"` + } `json:"offer"` + } `json:"offers,omitempty"` + + // Interval msg + Interval int `json:"interval,omitempty"` + Complete int `json:"complete,omitempty"` + Incomplete int `json:"incomplete,omitempty"` + + // Offer msg + OfferId string `json:"offer_id,omitempty"` + Offer *struct { + Type string `json:"type"` + SDP string `json:"sdp"` + } `json:"offer,omitempty"` + + // Answer msg + ToPeerId string `json:"to_peer_id,omitempty"` + Answer *struct { + Type string `json:"type"` + SDP string `json:"sdp"` + } `json:"answer,omitempty"` +} diff --git a/www/links.html b/www/links.html index 78b37bc3..f0623e08 100644 --- a/www/links.html +++ b/www/links.html @@ -16,7 +16,7 @@ flex-direction: column; } - html, body, #config { + html, body { width: 100%; height: 100%; } @@ -41,43 +41,26 @@ -
-

Play audio

- - send / cameras with two way audio support -
+ +
+

Play audio

+ + send / cameras with two way audio support +
+ + +
+

Share stream

+ share + copy link + delete +
+ +