Add WebTorrent module

This commit is contained in:
Alexey Khit
2023-03-06 17:17:30 +03:00
parent 1e83dc85f7
commit 775b1818d1
10 changed files with 866 additions and 91 deletions
+164
View File
@@ -0,0 +1,164 @@
package webtorrent
import (
"errors"
"fmt"
"github.com/AlexxIT/go2rtc/cmd/api"
"github.com/AlexxIT/go2rtc/cmd/app"
"github.com/AlexxIT/go2rtc/cmd/streams"
"github.com/AlexxIT/go2rtc/cmd/webrtc"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/streamer"
"github.com/AlexxIT/go2rtc/pkg/webtorrent"
"github.com/rs/zerolog"
"net/http"
"net/url"
)
func Init() {
var cfg struct {
Mod struct {
Trackers []string `yaml:"trackers"`
Shares map[string]struct {
Pwd string `yaml:"pwd"`
Src string `yaml:"src"`
} `yaml:"shares"`
} `yaml:"webtorrent"`
}
cfg.Mod.Trackers = []string{"wss://tracker.openwebtorrent.com"}
app.LoadConfig(&cfg)
if len(cfg.Mod.Trackers) == 0 {
return
}
log = app.GetLogger("webtorrent")
streams.HandleFunc("webtorrent", streamHandle)
api.HandleFunc("api/webtorrent", apiHandle)
srv = &webtorrent.Server{
URL: cfg.Mod.Trackers[0],
Exchange: func(src, offer string) (answer string, err error) {
stream := streams.Get(src)
if stream == nil {
return "", errors.New(api.StreamNotFound)
}
return webrtc.ExchangeSDP(stream, offer, "webtorrent")
},
}
if log.Debug().Enabled() {
srv.Listen(func(msg interface{}) {
switch msg.(type) {
case string:
log.Debug().Msgf("[webtorrent] %s", msg)
case *webtorrent.Message:
log.Trace().Any("msg", msg).Msgf("[webtorrent]")
}
})
}
for name, share := range cfg.Mod.Shares {
if len(name) < 8 {
log.Warn().Str("name", name).Msgf("min share name len - 8 symbols")
continue
}
if len(share.Pwd) < 4 {
log.Warn().Str("name", name).Str("pwd", share.Pwd).Msgf("min share pwd len - 4 symbols")
continue
}
if streams.Get(share.Src) == nil {
log.Warn().Str("stream", share.Src).Msgf("stream not exists")
continue
}
srv.AddShare(name, share.Pwd, share.Src)
}
}
var log zerolog.Logger
var shares map[string]string
var srv *webtorrent.Server
func apiHandle(w http.ResponseWriter, r *http.Request) {
src := r.URL.Query().Get("src")
share, ok := shares[src]
switch r.Method {
case "GET":
// support act as WebTorrent tracker (for testing purposes)
if r.Header.Get("Connection") == "Upgrade" {
tracker(w, r)
return
}
if ok {
pwd := srv.GetSharePwd(share)
data := fmt.Sprintf(`{"share":%q,"pwd":%q}`, share, pwd)
_, _ = w.Write([]byte(data))
} else {
http.Error(w, "", http.StatusNotFound)
}
case "POST":
// check if share already exist
if ok {
http.Error(w, "", http.StatusBadRequest)
return
}
// check if stream exists
if stream := streams.Get(src); stream == nil {
http.Error(w, "", http.StatusNotFound)
return
}
// create new random share
share = core.RandString(16)
pwd := core.RandString(16)
srv.AddShare(share, pwd, src)
if shares == nil {
shares = map[string]string{}
}
shares[src] = share
w.WriteHeader(http.StatusCreated)
data := fmt.Sprintf(`{"share":%q,"pwd":%q}`, share, pwd)
_, _ = w.Write([]byte(data))
case "DELETE":
if ok {
srv.RemoveShare(share)
delete(shares, src)
} else {
http.Error(w, "", http.StatusNotFound)
}
}
}
func streamHandle(rawURL string) (streamer.Producer, error) {
u, err := url.Parse(rawURL)
if err != nil {
return nil, err
}
query := u.Query()
share := query.Get("share")
pwd := query.Get("pwd")
if len(share) < 8 || len(pwd) < 4 {
return nil, errors.New("wrong URL: " + rawURL)
}
pc, err := webrtc.PeerConnection(true)
if err != nil {
return nil, err
}
return webtorrent.NewClient(srv.URL, share, pwd, pc)
}
+107
View File
@@ -0,0 +1,107 @@
package webtorrent
import (
"fmt"
"github.com/AlexxIT/go2rtc/pkg/webtorrent"
"github.com/gorilla/websocket"
"net/http"
)
var upgrader *websocket.Upgrader
var hashes map[string]map[string]*websocket.Conn
func tracker(w http.ResponseWriter, r *http.Request) {
if upgrader == nil {
upgrader = &websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 2028,
}
upgrader.CheckOrigin = func(r *http.Request) bool {
return true
}
}
ws, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Warn().Err(err).Send()
return
}
defer ws.Close()
for {
var msg webtorrent.Message
if err = ws.ReadJSON(&msg); err != nil {
return
}
//log.Trace().Msgf("[webtorrent] message=%v", msg)
if msg.InfoHash == "" || msg.PeerId == "" {
continue
}
if hashes == nil {
hashes = map[string]map[string]*websocket.Conn{}
}
// new or old client with offers
clients := hashes[msg.InfoHash]
if clients == nil {
clients = map[string]*websocket.Conn{
msg.PeerId: ws,
}
hashes[msg.InfoHash] = clients
} else {
clients[msg.PeerId] = ws
}
switch {
case msg.Offers != nil:
// ask for ping
raw := fmt.Sprintf(
`{"action":"announce","interval":120,"info_hash":"%s","complete":0,"incomplete":1}`,
msg.InfoHash,
)
if err = ws.WriteMessage(websocket.TextMessage, []byte(raw)); err != nil {
log.Warn().Err(err).Send()
return
}
// skip if no offers (server)
if len(msg.Offers) == 0 {
continue
}
// get and check only first offer
offer := msg.Offers[0]
if offer.OfferId == "" || offer.Offer.Type != "offer" || offer.Offer.SDP == "" {
continue
}
// send offer to all clients (one of them - server)
raw = fmt.Sprintf(
`{"action":"announce","info_hash":"%s","peer_id":"%s","offer_id":"%s","offer":{"type":"offer","sdp":"%s"}}`,
msg.InfoHash, msg.PeerId, offer.OfferId, offer.Offer.SDP,
)
for _, server := range clients {
if server != ws {
_ = server.WriteMessage(websocket.TextMessage, []byte(raw))
}
}
case msg.OfferId != "" && msg.ToPeerId != "" && msg.Answer != nil:
ws1, ok := clients[msg.ToPeerId]
if !ok {
continue
}
raw := fmt.Sprintf(
`{"action":"announce","info_hash":"%s","peer_id":"%s","offer_id":"%s","answer":{"type":"answer","sdp":"%s"}}`,
msg.InfoHash, msg.PeerId, msg.OfferId, msg.Answer.SDP,
)
_ = ws1.WriteMessage(websocket.TextMessage, []byte(raw))
}
}
}