Compare commits

...

27 Commits

Author SHA1 Message Date
Alex X a4885c2c3a Update version to 1.9.4 2024-06-18 21:33:36 +03:00
Alex X f5aaee006e Merge pull request #1168 from skrashevich/fix-flags-daemon
fix(app): Refactor daemon initialization and add syscall import
2024-06-18 20:53:38 +03:00
Alex X db6745e8ff Code refactoring after #1168 2024-06-18 20:35:17 +03:00
Alex X ba34855602 Merge pull request #1196 from skrashevich/feat-network-dot-enhancements
refactor(webui): enhance network visualization in network.html
2024-06-16 22:24:11 +03:00
Alex X e6fa97c738 Code refactoring after #1196 2024-06-16 22:12:52 +03:00
Sergey Krashevich 5b481a27c6 fix(network): enable autoResize in network settings 2024-06-16 21:57:48 +03:00
Alex X bdc7ff1035 Fix forwarded remote_addr in the network 2024-06-16 19:04:34 +03:00
Alex X da5f060741 Add killsignal and killtimeout to exec/rtsp 2024-06-16 19:03:57 +03:00
Alex X a56d335380 Fix homekit producer remote_addr 2024-06-16 15:26:18 +03:00
Sergey Krashevich d8aed552bc fix(network): ensure consistent node positions by storing and reusing seed 2024-06-16 15:22:33 +03:00
Alex X d7286fa06e Merge pull request #1195 from skrashevich/fix-append-dot
fix(streams): handle missing codec_name in appendDOT function
2024-06-16 15:20:51 +03:00
Alex X 906f554d74 Code refactoring after #1195 2024-06-16 15:19:50 +03:00
Sergey Krashevich cb44d5431a feat(network): preserve pan and scale on data reload 2024-06-16 15:01:40 +03:00
Sergey Krashevich a69eb8a66e style(network): add flex-grow to network div and move script tag 2024-06-16 14:54:02 +03:00
Sergey Krashevich 1b411b1fed refactor(streams): optimize label generation with strings.Builder
feat(network): add periodic data fetching and network update
2024-06-16 10:19:17 +03:00
Sergey Krashevich 5d57959608 fix(streams): handle missing codec_name in appendDOT function 2024-06-16 08:59:06 +03:00
Alex X 31e57c2ff8 Fix errors output for webrtc client and server 2024-06-16 06:37:42 +03:00
Alex X 734393d638 Add streaming network visualisation 2024-06-16 06:36:24 +03:00
Alex X 96504e2fb0 BIG rewrite stream info 2024-06-16 06:20:45 +03:00
Alex X ecfe802065 Code refactoring for streams HandleFunc 2024-06-14 12:52:55 +03:00
Alex X 1ac9d54dab Code refactoring for stream MarshalJSON 2024-06-10 16:42:34 +03:00
Sergey Krashevich 72d7e8aaaa refactor(app): remove syscall import and improve error messages 2024-06-08 15:05:26 +03:00
Alex X 0395696866 Fix exec pipe output 2024-06-07 17:59:21 +03:00
Alex X 0667683e4d Restore support old cipher suites after go1.22 #1172 2024-06-07 17:57:36 +03:00
Alex X aca0781c4b Code refactoring for api/streams 2024-06-07 12:25:58 +03:00
Sergey Krashevich b389d0eb9c fix(app): handle daemon process correctly on Unix systems 2024-06-06 18:54:40 +03:00
Alex X bf303ed471 Fix -d flag 2024-06-06 17:58:31 +03:00
110 changed files with 1595 additions and 1152 deletions
+4 -10
View File
@@ -45,22 +45,16 @@ func Init() {
os.Exit(0)
}
if daemon {
if daemon && os.Getppid() != 1 {
if runtime.GOOS == "windows" {
fmt.Println("Daemon not supported on Windows")
fmt.Println("Daemon mode is not supported on Windows")
os.Exit(1)
}
args := os.Args[1:]
for i, arg := range args {
if arg == "-daemon" {
args[i] = ""
}
}
// Re-run the program in background and exit
cmd := exec.Command(os.Args[0], args...)
cmd := exec.Command(os.Args[0], os.Args[1:]...)
if err := cmd.Start(); err != nil {
fmt.Println(err)
fmt.Println("Failed to start daemon:", err)
os.Exit(1)
}
fmt.Println("Running in daemon mode with PID:", cmd.Process.Pid)
+3 -9
View File
@@ -7,13 +7,7 @@ import (
)
func Init() {
streams.HandleFunc("bubble", handle)
}
func handle(url string) (core.Producer, error) {
conn := bubble.NewClient(url)
if err := conn.Dial(); err != nil {
return nil, err
}
return conn, nil
streams.HandleFunc("bubble", func(source string) (core.Producer, error) {
return bubble.Dial(source)
})
}
-8
View File
@@ -2,16 +2,8 @@ package debug
import (
"github.com/AlexxIT/go2rtc/internal/api"
"github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/core"
)
func Init() {
api.HandleFunc("api/stack", stackHandler)
streams.HandleFunc("null", nullHandler)
}
func nullHandler(string) (core.Producer, error) {
return nil, nil
}
+1 -10
View File
@@ -10,25 +10,16 @@ import (
"github.com/AlexxIT/go2rtc/internal/api"
"github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/dvrip"
)
func Init() {
streams.HandleFunc("dvrip", handle)
streams.HandleFunc("dvrip", dvrip.Dial)
// DVRIP client autodiscovery
api.HandleFunc("api/dvrip", apiDvrip)
}
func handle(url string) (core.Producer, error) {
client, err := dvrip.Dial(url)
if err != nil {
return nil, err
}
return client, nil
}
const Port = 34569 // UDP port number for dvrip discovery
func apiDvrip(w http.ResponseWriter, r *http.Request) {
+39
View File
@@ -0,0 +1,39 @@
package exec
import (
"errors"
"net/url"
"os"
"os/exec"
"syscall"
"time"
"github.com/AlexxIT/go2rtc/pkg/core"
)
// closer support custom killsignal with custom killtimeout
type closer struct {
cmd *exec.Cmd
query url.Values
}
func (c *closer) Close() (err error) {
sig := os.Kill
if s := c.query.Get("killsignal"); s != "" {
sig = syscall.Signal(core.Atoi(s))
}
log.Trace().Msgf("[exec] kill with signal=%d", sig)
err = c.cmd.Process.Signal(sig)
if s := c.query.Get("killtimeout"); s != "" {
timeout := time.Duration(core.Atoi(s)) * time.Second
timer := time.AfterFunc(timeout, func() {
log.Trace().Msgf("[exec] kill after timeout=%s", s)
_ = c.cmd.Process.Kill()
})
defer timer.Stop() // stop timer if Wait ends before timeout
}
return errors.Join(err, c.cmd.Wait())
}
+54 -25
View File
@@ -1,13 +1,16 @@
package exec
import (
"bufio"
"crypto/md5"
"encoding/hex"
"errors"
"fmt"
"io"
"net/url"
"os"
"os/exec"
"slices"
"strings"
"sync"
"time"
@@ -48,8 +51,10 @@ func Init() {
}
func execHandle(rawURL string) (core.Producer, error) {
rawURL, rawQuery, _ := strings.Cut(rawURL, "#")
query := streams.ParseQuery(rawQuery)
var path string
var query url.Values
// RTSP flow should have `{output}` inside URL
// pipe flow may have `#{params}` inside URL
@@ -61,9 +66,6 @@ func execHandle(rawURL string) (core.Producer, error) {
sum := md5.Sum([]byte(rawURL))
path = "/" + hex.EncodeToString(sum[:])
rawURL = rawURL[:i] + "rtsp://127.0.0.1:" + rtsp.Port + path + rawURL[i+8:]
} else if i = strings.IndexByte(rawURL, '#'); i > 0 {
query = streams.ParseQuery(rawURL[i+1:])
rawURL = rawURL[:i]
}
args := shell.QuoteSplit(rawURL[5:]) // remove `exec:`
@@ -73,23 +75,34 @@ func execHandle(rawURL string) (core.Producer, error) {
debug: log.Debug().Enabled(),
}
if path == "" {
return handlePipe(rawURL, cmd, query)
}
return handleRTSP(rawURL, cmd, path)
}
func handlePipe(_ string, cmd *exec.Cmd, query url.Values) (core.Producer, error) {
if query.Get("backchannel") == "1" {
return stdin.NewClient(cmd)
}
r, err := PipeCloser(cmd, query)
cl := &closer{cmd: cmd, query: query}
if path == "" {
return handlePipe(rawURL, cmd, cl)
}
return handleRTSP(rawURL, cmd, cl, path)
}
func handlePipe(source string, cmd *exec.Cmd, cl io.Closer) (core.Producer, error) {
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
rc := struct {
io.Reader
io.Closer
}{
// add buffer for pipe reader to reduce syscall
bufio.NewReaderSize(stdout, core.BufferSize),
cl,
}
log.Debug().Strs("args", cmd.Args).Msg("[exec] run pipe")
ts := time.Now()
@@ -98,17 +111,23 @@ func handlePipe(_ string, cmd *exec.Cmd, query url.Values) (core.Producer, error
return nil, err
}
prod, err := magic.Open(r)
prod, err := magic.Open(rc)
if err != nil {
_ = r.Close()
_ = rc.Close()
return nil, fmt.Errorf("exec/pipe: %w\n%s", err, cmd.Stderr)
}
if info, ok := prod.(core.Info); ok {
info.SetProtocol("pipe")
setRemoteInfo(info, source, cmd.Args)
}
log.Debug().Stringer("launch", time.Since(ts)).Msg("[exec] run pipe")
return prod, fmt.Errorf("exec/pipe: %w\n%s", err, cmd.Stderr)
return prod, nil
}
func handleRTSP(url string, cmd *exec.Cmd, path string) (core.Producer, error) {
func handleRTSP(source string, cmd *exec.Cmd, cl io.Closer, path string) (core.Producer, error) {
if log.Trace().Enabled() {
cmd.Stdout = os.Stdout
}
@@ -130,7 +149,7 @@ func handleRTSP(url string, cmd *exec.Cmd, path string) (core.Producer, error) {
ts := time.Now()
if err := cmd.Start(); err != nil {
log.Error().Err(err).Str("url", url).Msg("[exec]")
log.Error().Err(err).Str("source", source).Msg("[exec]")
return nil, err
}
@@ -140,19 +159,17 @@ func handleRTSP(url string, cmd *exec.Cmd, path string) (core.Producer, error) {
}()
select {
case <-time.After(time.Second * 60):
_ = cmd.Process.Kill()
log.Error().Str("url", url).Msg("[exec] timeout")
case <-time.After(time.Minute):
log.Error().Str("source", source).Msg("[exec] timeout")
_ = cl.Close()
return nil, errors.New("exec: timeout")
case <-done:
// limit message size
return nil, fmt.Errorf("exec/rtsp\n%s", cmd.Stderr)
case prod := <-waiter:
log.Debug().Stringer("launch", time.Since(ts)).Msg("[exec] run rtsp")
prod.OnClose = func() error {
log.Debug().Msgf("[exec] kill rtsp")
return errors.Join(cmd.Process.Kill(), cmd.Wait())
}
setRemoteInfo(prod, source, cmd.Args)
prod.OnClose = cl.Close
return prod, nil
}
}
@@ -209,3 +226,15 @@ func trimSpace(b []byte) []byte {
}
return b[start:stop]
}
func setRemoteInfo(info core.Info, source string, args []string) {
info.SetSource(source)
if i := slices.Index(args, "-i"); i > 0 && i < len(args)-1 {
rawURL := args[i+1]
if u, err := url.Parse(rawURL); err == nil && u.Host != "" {
info.SetRemoteAddr(u.Host)
info.SetURL(rawURL)
}
}
}
-56
View File
@@ -1,56 +0,0 @@
package exec
import (
"bufio"
"errors"
"io"
"net/url"
"os/exec"
"syscall"
"time"
"github.com/AlexxIT/go2rtc/pkg/core"
)
// PipeCloser - return StdoutPipe that Kill cmd on Close call
func PipeCloser(cmd *exec.Cmd, query url.Values) (io.ReadCloser, error) {
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
// add buffer for pipe reader to reduce syscall
return &pipeCloser{bufio.NewReaderSize(stdout, core.BufferSize), stdout, cmd, query}, nil
}
type pipeCloser struct {
io.Reader
io.Closer
cmd *exec.Cmd
query url.Values
}
func (p *pipeCloser) Close() error {
return errors.Join(p.Closer.Close(), p.Kill(), p.Wait())
}
func (p *pipeCloser) Kill() error {
if s := p.query.Get("killsignal"); s != "" {
log.Trace().Msgf("[exec] kill with custom sig=%s", s)
sig := syscall.Signal(core.Atoi(s))
return p.cmd.Process.Signal(sig)
}
return p.cmd.Process.Kill()
}
func (p *pipeCloser) Wait() error {
if s := p.query.Get("killtimeout"); s != "" {
timeout := time.Duration(core.Atoi(s)) * time.Second
timer := time.AfterFunc(timeout, func() {
log.Trace().Msgf("[exec] kill after timeout=%s", s)
_ = p.cmd.Process.Kill()
})
defer timer.Stop() // stop timer if Wait ends before timeout
}
return p.cmd.Wait()
}
+4 -3
View File
@@ -13,7 +13,7 @@ import (
)
type Producer struct {
core.SuperProducer
core.Connection
url string
query url.Values
ffmpeg core.Producer
@@ -31,7 +31,8 @@ func NewProducer(url string) (core.Producer, error) {
return nil, errors.New("ffmpeg: unsupported params: " + url[i:])
}
p.Type = "FFmpeg producer"
p.ID = core.NewID()
p.FormatName = "ffmpeg"
p.Medias = []*core.Media{
{
// we can support only audio, because don't know FmtpLine for H264 and PayloadType for MJPEG
@@ -81,7 +82,7 @@ func (p *Producer) Stop() error {
func (p *Producer) MarshalJSON() ([]byte, error) {
if p.ffmpeg == nil {
return json.Marshal(p.SuperProducer)
return json.Marshal(p.Connection)
}
return json.Marshal(p.ffmpeg)
}
+3 -5
View File
@@ -10,15 +10,13 @@ import (
)
func Init() {
streams.HandleFunc("gopro", handleGoPro)
streams.HandleFunc("gopro", func(source string) (core.Producer, error) {
return gopro.Dial(source)
})
api.HandleFunc("api/gopro", apiGoPro)
}
func handleGoPro(rawURL string) (core.Producer, error) {
return gopro.Dial(rawURL)
}
func apiGoPro(w http.ResponseWriter, r *http.Request) {
var items []*api.Source
+1 -1
View File
@@ -63,7 +63,7 @@ func apiStream(w http.ResponseWriter, r *http.Request) {
return
}
s, err = webrtc.ExchangeSDP(stream, string(offer), "WebRTC/Hass sync", r.UserAgent())
s, err = webrtc.ExchangeSDP(stream, string(offer), "hass/webrtc", r.UserAgent())
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
+2 -7
View File
@@ -45,14 +45,9 @@ func Init() {
return "", nil
})
streams.HandleFunc("hass", func(url string) (core.Producer, error) {
streams.HandleFunc("hass", func(source string) (core.Producer, error) {
// support hass://supervisor?entity_id=camera.driveway_doorbell
client, err := hass.NewClient(url)
if err != nil {
return nil, err
}
return client, nil
return hass.NewClient(source)
})
// load static entries from Hass config
+4 -7
View File
@@ -12,7 +12,6 @@ import (
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/mp4"
"github.com/AlexxIT/go2rtc/pkg/mpegts"
"github.com/AlexxIT/go2rtc/pkg/tcp"
"github.com/rs/zerolog"
)
@@ -63,15 +62,13 @@ func handlerStream(w http.ResponseWriter, r *http.Request) {
medias := mp4.ParseQuery(r.URL.Query())
if medias != nil {
c := mp4.NewConsumer(medias)
c.Type = "HLS/fMP4 consumer"
c.RemoteAddr = tcp.RemoteAddr(r)
c.UserAgent = r.UserAgent()
c.FormatName = "hls/fmp4"
c.WithRequest(r)
cons = c
} else {
c := mpegts.NewConsumer()
c.Type = "HLS/TS consumer"
c.RemoteAddr = tcp.RemoteAddr(r)
c.UserAgent = r.UserAgent()
c.FormatName = "hls/mpegts"
c.WithRequest(r)
cons = c
}
+2 -4
View File
@@ -8,7 +8,6 @@ import (
"github.com/AlexxIT/go2rtc/internal/api/ws"
"github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/mp4"
"github.com/AlexxIT/go2rtc/pkg/tcp"
)
func handlerWSHLS(tr *ws.Transport, msg *ws.Message) error {
@@ -20,9 +19,8 @@ func handlerWSHLS(tr *ws.Transport, msg *ws.Message) error {
codecs := msg.String()
medias := mp4.ParseCodecs(codecs, true)
cons := mp4.NewConsumer(medias)
cons.Type = "HLS/fMP4 consumer"
cons.RemoteAddr = tcp.RemoteAddr(tr.Request)
cons.UserAgent = tr.Request.UserAgent()
cons.FormatName = "hls/fmp4"
cons.WithRequest(tr.Request)
log.Trace().Msgf("[hls] new ws consumer codecs=%s", codecs)
+21 -8
View File
@@ -11,9 +11,9 @@ import (
"github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/hls"
"github.com/AlexxIT/go2rtc/pkg/image"
"github.com/AlexxIT/go2rtc/pkg/magic"
"github.com/AlexxIT/go2rtc/pkg/mjpeg"
"github.com/AlexxIT/go2rtc/pkg/multipart"
"github.com/AlexxIT/go2rtc/pkg/mpjpeg"
"github.com/AlexxIT/go2rtc/pkg/tcp"
)
@@ -45,6 +45,21 @@ func handleHTTP(rawURL string) (core.Producer, error) {
}
}
prod, err := do(req)
if err != nil {
return nil, err
}
if info, ok := prod.(core.Info); ok {
info.SetProtocol("http")
info.SetRemoteAddr(req.URL.Host) // TODO: rewrite to net.Conn
info.SetURL(rawURL)
}
return prod, nil
}
func do(req *http.Request) (core.Producer, error) {
res, err := tcp.Do(req)
if err != nil {
return nil, err
@@ -66,14 +81,12 @@ func handleHTTP(rawURL string) (core.Producer, error) {
}
switch {
case ct == "image/jpeg":
return mjpeg.NewClient(res), nil
case ct == "multipart/x-mixed-replace":
return multipart.Open(res.Body)
case ct == "application/vnd.apple.mpegurl" || ext == "m3u8":
return hls.OpenURL(req.URL, res.Body)
case ct == "image/jpeg":
return image.Open(res)
case ct == "multipart/x-mixed-replace":
return mpjpeg.Open(res.Body)
}
return magic.Open(res.Body)
+3 -12
View File
@@ -7,16 +7,7 @@ import (
)
func Init() {
streams.HandleFunc("isapi", handle)
}
func handle(url string) (core.Producer, error) {
conn, err := isapi.NewClient(url)
if err != nil {
return nil, err
}
if err = conn.Dial(); err != nil {
return nil, err
}
return conn, nil
streams.HandleFunc("isapi", func(source string) (core.Producer, error) {
return isapi.Dial(source)
})
}
+2 -8
View File
@@ -4,16 +4,10 @@ import (
"github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/ivideon"
"strings"
)
func Init() {
streams.HandleFunc("ivideon", func(url string) (core.Producer, error) {
id := strings.Replace(url[8:], "/", ":", 1)
prod := ivideon.NewClient(id)
if err := prod.Dial(); err != nil {
return nil, err
}
return prod, nil
streams.HandleFunc("ivideon", func(source string) (core.Producer, error) {
return ivideon.Dial(source)
})
}
+11 -16
View File
@@ -17,7 +17,7 @@ import (
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/magic"
"github.com/AlexxIT/go2rtc/pkg/mjpeg"
"github.com/AlexxIT/go2rtc/pkg/tcp"
"github.com/AlexxIT/go2rtc/pkg/mpjpeg"
"github.com/AlexxIT/go2rtc/pkg/y4m"
"github.com/rs/zerolog"
)
@@ -44,8 +44,7 @@ func handlerKeyframe(w http.ResponseWriter, r *http.Request) {
}
cons := magic.NewKeyframe()
cons.RemoteAddr = tcp.RemoteAddr(r)
cons.UserAgent = r.UserAgent()
cons.WithRequest(r)
if err := stream.AddConsumer(cons); err != nil {
log.Error().Err(err).Caller().Send()
@@ -100,8 +99,7 @@ func outputMjpeg(w http.ResponseWriter, r *http.Request) {
}
cons := mjpeg.NewConsumer()
cons.RemoteAddr = tcp.RemoteAddr(r)
cons.UserAgent = r.UserAgent()
cons.WithRequest(r)
if err := stream.AddConsumer(cons); err != nil {
log.Error().Err(err).Msg("[api.mjpeg] add consumer")
@@ -117,7 +115,7 @@ func outputMjpeg(w http.ResponseWriter, r *http.Request) {
wr := mjpeg.NewWriter(w)
_, _ = cons.WriteTo(wr)
} else {
cons.Type = "ASCII passive consumer "
cons.FormatName = "ascii"
query := r.URL.Query()
wr := ascii.NewWriter(w, query.Get("color"), query.Get("back"), query.Get("text"))
@@ -135,17 +133,16 @@ func inputMjpeg(w http.ResponseWriter, r *http.Request) {
return
}
res := &http.Response{Body: r.Body, Header: r.Header, Request: r}
res.Header.Set("Content-Type", "multipart/mixed;boundary=")
prod, _ := mpjpeg.Open(r.Body)
prod.WithRequest(r)
client := mjpeg.NewClient(res)
stream.AddProducer(client)
stream.AddProducer(prod)
if err := client.Start(); err != nil && err != io.EOF {
if err := prod.Start(); err != nil && err != io.EOF {
log.Warn().Err(err).Caller().Send()
}
stream.RemoveProducer(client)
stream.RemoveProducer(prod)
}
func handlerWS(tr *ws.Transport, _ *ws.Message) error {
@@ -155,8 +152,7 @@ func handlerWS(tr *ws.Transport, _ *ws.Message) error {
}
cons := mjpeg.NewConsumer()
cons.RemoteAddr = tcp.RemoteAddr(tr.Request)
cons.UserAgent = tr.Request.UserAgent()
cons.WithRequest(tr.Request)
if err := stream.AddConsumer(cons); err != nil {
log.Debug().Err(err).Msg("[mjpeg] add consumer")
@@ -183,8 +179,7 @@ func apiStreamY4M(w http.ResponseWriter, r *http.Request) {
}
cons := y4m.NewConsumer()
cons.RemoteAddr = tcp.RemoteAddr(r)
cons.UserAgent = r.UserAgent()
cons.WithRequest(r)
if err := stream.AddConsumer(cons); err != nil {
log.Error().Err(err).Caller().Send()
+3 -4
View File
@@ -13,7 +13,6 @@ import (
"github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/mp4"
"github.com/AlexxIT/go2rtc/pkg/tcp"
"github.com/rs/zerolog"
)
@@ -100,9 +99,9 @@ func handlerMP4(w http.ResponseWriter, r *http.Request) {
medias := mp4.ParseQuery(r.URL.Query())
cons := mp4.NewConsumer(medias)
cons.Type = "MP4/HTTP active consumer"
cons.RemoteAddr = tcp.RemoteAddr(r)
cons.UserAgent = r.UserAgent()
cons.FormatName = "mp4"
cons.Protocol = "http"
cons.WithRequest(r)
if err := stream.AddConsumer(cons); err != nil {
log.Error().Err(err).Caller().Send()
+3 -7
View File
@@ -8,7 +8,6 @@ import (
"github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/mp4"
"github.com/AlexxIT/go2rtc/pkg/tcp"
)
func handlerWSMSE(tr *ws.Transport, msg *ws.Message) error {
@@ -24,9 +23,8 @@ func handlerWSMSE(tr *ws.Transport, msg *ws.Message) error {
}
cons := mp4.NewConsumer(medias)
cons.Type = "MSE/WebSocket active consumer"
cons.RemoteAddr = tcp.RemoteAddr(tr.Request)
cons.UserAgent = tr.Request.UserAgent()
cons.FormatName = "mse/fmp4"
cons.WithRequest(tr.Request)
if err := stream.AddConsumer(cons); err != nil {
log.Debug().Err(err).Msg("[mp4] add consumer")
@@ -57,9 +55,7 @@ func handlerWSMP4(tr *ws.Transport, msg *ws.Message) error {
}
cons := mp4.NewKeyframe(medias)
cons.Type = "MP4/WebSocket active consumer"
cons.RemoteAddr = tcp.RemoteAddr(tr.Request)
cons.UserAgent = tr.Request.UserAgent()
cons.WithRequest(tr.Request)
if err := stream.AddConsumer(cons); err != nil {
log.Error().Err(err).Caller().Send()
+1 -3
View File
@@ -6,7 +6,6 @@ import (
"github.com/AlexxIT/go2rtc/internal/api"
"github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/aac"
"github.com/AlexxIT/go2rtc/pkg/tcp"
)
func apiStreamAAC(w http.ResponseWriter, r *http.Request) {
@@ -18,8 +17,7 @@ func apiStreamAAC(w http.ResponseWriter, r *http.Request) {
}
cons := aac.NewConsumer()
cons.RemoteAddr = tcp.RemoteAddr(r)
cons.UserAgent = r.UserAgent()
cons.WithRequest(r)
if err := stream.AddConsumer(cons); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
+1 -3
View File
@@ -6,7 +6,6 @@ import (
"github.com/AlexxIT/go2rtc/internal/api"
"github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/mpegts"
"github.com/AlexxIT/go2rtc/pkg/tcp"
)
func Init() {
@@ -31,8 +30,7 @@ func outputMpegTS(w http.ResponseWriter, r *http.Request) {
}
cons := mpegts.NewConsumer()
cons.RemoteAddr = tcp.RemoteAddr(r)
cons.UserAgent = r.UserAgent()
cons.WithRequest(r)
if err := stream.AddConsumer(cons); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
+3 -9
View File
@@ -10,19 +10,13 @@ import (
)
func Init() {
streams.HandleFunc("nest", streamNest)
streams.HandleFunc("nest", func(source string) (core.Producer, error) {
return nest.Dial(source)
})
api.HandleFunc("api/nest", apiNest)
}
func streamNest(url string) (core.Producer, error) {
client, err := nest.NewClient(url)
if err != nil {
return nil, err
}
return client, nil
}
func apiNest(w http.ResponseWriter, r *http.Request) {
query := r.URL.Query()
cliendID := query.Get("client_id")
+3 -12
View File
@@ -11,22 +11,13 @@ import (
)
func Init() {
streams.HandleFunc("roborock", handle)
streams.HandleFunc("roborock", func(source string) (core.Producer, error) {
return roborock.Dial(source)
})
api.HandleFunc("api/roborock", apiHandle)
}
func handle(url string) (core.Producer, error) {
conn := roborock.NewClient(url)
if err := conn.Dial(); err != nil {
return nil, err
}
if err := conn.Connect(); err != nil {
return nil, err
}
return conn, nil
}
var Auth struct {
UserData *roborock.UserInfo `json:"user_data"`
BaseURL string `json:"base_url"`
+2 -9
View File
@@ -12,7 +12,6 @@ import (
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/flv"
"github.com/AlexxIT/go2rtc/pkg/rtmp"
"github.com/AlexxIT/go2rtc/pkg/tcp"
"github.com/rs/zerolog"
)
@@ -128,11 +127,7 @@ func tcpHandle(netConn net.Conn) error {
var log zerolog.Logger
func streamsHandle(url string) (core.Producer, error) {
client, err := rtmp.DialPlay(url)
if err != nil {
return nil, err
}
return client, nil
return rtmp.DialPlay(url)
}
func streamsConsumerHandle(url string) (core.Consumer, func(), error) {
@@ -165,9 +160,7 @@ func outputFLV(w http.ResponseWriter, r *http.Request) {
}
cons := flv.NewConsumer()
cons.Type = "HTTP-FLV consumer"
cons.RemoteAddr = tcp.RemoteAddr(r)
cons.UserAgent = r.UserAgent()
cons.WithRequest(r)
if err := stream.AddConsumer(cons); err != nil {
log.Error().Err(err).Caller().Send()
+124
View File
@@ -0,0 +1,124 @@
package streams
import (
"net/http"
"github.com/AlexxIT/go2rtc/internal/api"
"github.com/AlexxIT/go2rtc/internal/app"
"github.com/AlexxIT/go2rtc/pkg/probe"
)
func apiStreams(w http.ResponseWriter, r *http.Request) {
query := r.URL.Query()
src := query.Get("src")
// without source - return all streams list
if src == "" && r.Method != "POST" {
api.ResponseJSON(w, streams)
return
}
// Not sure about all this API. Should be rewrited...
switch r.Method {
case "GET":
stream := Get(src)
if stream == nil {
http.Error(w, "", http.StatusNotFound)
return
}
cons := probe.NewProbe(query)
if len(cons.Medias) != 0 {
cons.WithRequest(r)
if err := stream.AddConsumer(cons); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
api.ResponsePrettyJSON(w, stream)
stream.RemoveConsumer(cons)
} else {
api.ResponsePrettyJSON(w, streams[src])
}
case "PUT":
name := query.Get("name")
if name == "" {
name = src
}
if New(name, src) == nil {
http.Error(w, "", http.StatusBadRequest)
return
}
if err := app.PatchConfig(name, src, "streams"); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
}
case "PATCH":
name := query.Get("name")
if name == "" {
http.Error(w, "", http.StatusBadRequest)
return
}
// support {input} templates: https://github.com/AlexxIT/go2rtc#module-hass
if Patch(name, src) == nil {
http.Error(w, "", http.StatusBadRequest)
}
case "POST":
// with dst - redirect source to dst
if dst := query.Get("dst"); dst != "" {
if stream := Get(dst); stream != nil {
if err := Validate(src); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
} else if err = stream.Play(src); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
} else {
api.ResponseJSON(w, stream)
}
} else if stream = Get(src); stream != nil {
if err := Validate(dst); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
} else if err = stream.Publish(dst); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
} else {
http.Error(w, "", http.StatusNotFound)
}
} else {
http.Error(w, "", http.StatusBadRequest)
}
case "DELETE":
delete(streams, src)
if err := app.PatchConfig(src, nil, "streams"); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
}
}
}
func apiStreamsDOT(w http.ResponseWriter, r *http.Request) {
query := r.URL.Query()
dot := make([]byte, 0, 1024)
dot = append(dot, "digraph {\n"...)
if query.Has("src") {
for _, name := range query["src"] {
if stream := streams[name]; stream != nil {
dot = AppendDOT(dot, stream)
}
}
} else {
for _, stream := range streams {
dot = AppendDOT(dot, stream)
}
}
dot = append(dot, '}')
api.Response(w, dot, "text/vnd.graphviz")
}
+175
View File
@@ -0,0 +1,175 @@
package streams
import (
"encoding/json"
"fmt"
"strings"
)
func AppendDOT(dot []byte, stream *Stream) []byte {
for _, prod := range stream.producers {
if prod.conn == nil {
continue
}
c, err := marshalConn(prod.conn)
if err != nil {
continue
}
dot = c.appendDOT(dot, "producer")
}
for _, cons := range stream.consumers {
c, err := marshalConn(cons)
if err != nil {
continue
}
dot = c.appendDOT(dot, "consumer")
}
return dot
}
func marshalConn(v any) (*conn, error) {
b, err := json.Marshal(v)
if err != nil {
return nil, err
}
var c conn
if err = json.Unmarshal(b, &c); err != nil {
return nil, err
}
return &c, nil
}
const bytesK = "KMGTP"
func humanBytes(i int) string {
if i < 1000 {
return fmt.Sprintf("%d B", i)
}
f := float64(i) / 1000
var n uint8
for f >= 1000 && n < 5 {
f /= 1000
n++
}
return fmt.Sprintf("%.2f %cB", f, bytesK[n])
}
type node struct {
ID uint32 `json:"id"`
Codec map[string]any `json:"codec"`
Parent uint32 `json:"parent"`
Childs []uint32 `json:"childs"`
Bytes int `json:"bytes"`
//Packets uint32 `json:"packets"`
//Drops uint32 `json:"drops"`
}
var codecKeys = []string{"codec_name", "sample_rate", "channels", "profile", "level"}
func (n *node) name() string {
if name, ok := n.Codec["codec_name"].(string); ok {
return name
}
return "unknown"
}
func (n *node) codec() []byte {
b := make([]byte, 0, 128)
for _, k := range codecKeys {
if v := n.Codec[k]; v != nil {
b = fmt.Appendf(b, "%s=%v\n", k, v)
}
}
if l := len(b); l > 0 {
return b[:l-1]
}
return b
}
func (n *node) appendDOT(dot []byte, group string) []byte {
dot = fmt.Appendf(dot, "%d [group=%s, label=%q, title=%q];\n", n.ID, group, n.name(), n.codec())
//for _, sink := range n.Childs {
// dot = fmt.Appendf(dot, "%d -> %d;\n", n.ID, sink)
//}
return dot
}
type conn struct {
ID uint32 `json:"id"`
FormatName string `json:"format_name"`
Protocol string `json:"protocol"`
RemoteAddr string `json:"remote_addr"`
Source string `json:"source"`
URL string `json:"url"`
UserAgent string `json:"user_agent"`
Receivers []node `json:"receivers"`
Senders []node `json:"senders"`
BytesRecv int `json:"bytes_recv"`
BytesSend int `json:"bytes_send"`
}
func (c *conn) appendDOT(dot []byte, group string) []byte {
host := c.host()
dot = fmt.Appendf(dot, "%s [group=host];\n", host)
dot = fmt.Appendf(dot, "%d [group=%s, label=%q, title=%q];\n", c.ID, group, c.FormatName, c.label())
if group == "producer" {
dot = fmt.Appendf(dot, "%s -> %d [label=%q];\n", host, c.ID, humanBytes(c.BytesRecv))
} else {
dot = fmt.Appendf(dot, "%d -> %s [label=%q];\n", c.ID, host, humanBytes(c.BytesSend))
}
for _, recv := range c.Receivers {
dot = fmt.Appendf(dot, "%d -> %d [label=%q];\n", c.ID, recv.ID, humanBytes(recv.Bytes))
dot = recv.appendDOT(dot, "node")
}
for _, send := range c.Senders {
dot = fmt.Appendf(dot, "%d -> %d [label=%q];\n", send.Parent, c.ID, humanBytes(send.Bytes))
//dot = fmt.Appendf(dot, "%d -> %d [label=%q];\n", send.ID, c.ID, humanBytes(send.Bytes))
//dot = send.appendDOT(dot, "node")
}
return dot
}
func (c *conn) host() (s string) {
if c.Protocol == "pipe" {
return "127.0.0.1"
}
if s = c.RemoteAddr; s == "" {
return "unknown"
}
if i := strings.Index(s, "forwarded"); i > 0 {
s = s[i+10:]
}
if s[0] == '[' {
if i := strings.Index(s, "]"); i > 0 {
return s[1:i]
}
}
if i := strings.IndexAny(s, " ,:"); i > 0 {
return s[:i]
}
return
}
func (c *conn) label() string {
var sb strings.Builder
sb.WriteString("format_name=" + c.FormatName)
if c.Protocol != "" {
sb.WriteString("\nprotocol=" + c.Protocol)
}
if c.Source != "" {
sb.WriteString("\nsource=" + c.Source)
}
if c.URL != "" {
sb.WriteString("\nurl=" + c.URL)
}
if c.UserAgent != "" {
sb.WriteString("\nuser_agent=" + c.UserAgent)
}
return sb.String()
}
+1 -1
View File
@@ -7,7 +7,7 @@ import (
"github.com/AlexxIT/go2rtc/pkg/core"
)
type Handler func(url string) (core.Producer, error)
type Handler func(source string) (core.Producer, error)
var handlers = map[string]Handler{}
+3 -4
View File
@@ -132,11 +132,10 @@ func (p *Producer) AddTrack(media *core.Media, codec *core.Codec, track *core.Re
}
func (p *Producer) MarshalJSON() ([]byte, error) {
if p.conn != nil {
return json.Marshal(p.conn)
if conn := p.conn; conn != nil {
return json.Marshal(conn)
}
info := core.Info{URL: p.url}
info := map[string]string{"url": p.url}
return json.Marshal(info)
}
+4 -11
View File
@@ -112,19 +112,12 @@ producers:
}
func (s *Stream) MarshalJSON() ([]byte, error) {
if !s.mu.TryLock() {
log.Warn().Msgf("[streams] json locked")
return json.Marshal(nil)
}
var info struct {
var info = struct {
Producers []*Producer `json:"producers"`
Consumers []core.Consumer `json:"consumers"`
}{
Producers: s.producers,
Consumers: s.consumers,
}
info.Producers = s.producers
info.Consumers = s.consumers
s.mu.Unlock()
return json.Marshal(info)
}
+2 -99
View File
@@ -2,7 +2,6 @@ package streams
import (
"errors"
"net/http"
"net/url"
"regexp"
"sync"
@@ -10,8 +9,6 @@ import (
"github.com/AlexxIT/go2rtc/internal/api"
"github.com/AlexxIT/go2rtc/internal/app"
"github.com/AlexxIT/go2rtc/pkg/probe"
"github.com/AlexxIT/go2rtc/pkg/tcp"
"github.com/rs/zerolog"
)
@@ -29,7 +26,8 @@ func Init() {
streams[name] = NewStream(item)
}
api.HandleFunc("api/streams", streamsHandler)
api.HandleFunc("api/streams", apiStreams)
api.HandleFunc("api/streams.dot", apiStreamsDOT)
if cfg.Publish == nil {
return
@@ -145,101 +143,6 @@ func Delete(id string) {
delete(streams, id)
}
func streamsHandler(w http.ResponseWriter, r *http.Request) {
query := r.URL.Query()
src := query.Get("src")
// without source - return all streams list
if src == "" && r.Method != "POST" {
api.ResponseJSON(w, streams)
return
}
// Not sure about all this API. Should be rewrited...
switch r.Method {
case "GET":
stream := Get(src)
if stream == nil {
http.Error(w, "", http.StatusNotFound)
return
}
cons := probe.NewProbe(query)
if len(cons.Medias) != 0 {
cons.RemoteAddr = tcp.RemoteAddr(r)
cons.UserAgent = r.UserAgent()
if err := stream.AddConsumer(cons); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
api.ResponsePrettyJSON(w, stream)
stream.RemoveConsumer(cons)
} else {
api.ResponsePrettyJSON(w, streams[src])
}
case "PUT":
name := query.Get("name")
if name == "" {
name = src
}
if New(name, src) == nil {
http.Error(w, "", http.StatusBadRequest)
return
}
if err := app.PatchConfig(name, src, "streams"); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
}
case "PATCH":
name := query.Get("name")
if name == "" {
http.Error(w, "", http.StatusBadRequest)
return
}
// support {input} templates: https://github.com/AlexxIT/go2rtc#module-hass
if Patch(name, src) == nil {
http.Error(w, "", http.StatusBadRequest)
}
case "POST":
// with dst - redirect source to dst
if dst := query.Get("dst"); dst != "" {
if stream := Get(dst); stream != nil {
if err := Validate(src); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
} else if err = stream.Play(src); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
} else {
api.ResponseJSON(w, stream)
}
} else if stream = Get(src); stream != nil {
if err := Validate(dst); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
} else if err = stream.Publish(dst); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
} else {
http.Error(w, "", http.StatusNotFound)
}
} else {
http.Error(w, "", http.StatusBadRequest)
}
case "DELETE":
delete(streams, src)
if err := app.PatchConfig(src, nil, "streams"); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
}
}
}
var log zerolog.Logger
var streams = map[string]*Stream{}
var streamsMu sync.Mutex
+4 -4
View File
@@ -8,11 +8,11 @@ import (
)
func Init() {
streams.HandleFunc("kasa", func(url string) (core.Producer, error) {
return kasa.Dial(url)
streams.HandleFunc("kasa", func(source string) (core.Producer, error) {
return kasa.Dial(source)
})
streams.HandleFunc("tapo", func(url string) (core.Producer, error) {
return tapo.Dial(url)
streams.HandleFunc("tapo", func(source string) (core.Producer, error) {
return tapo.Dial(source)
})
}
+13 -5
View File
@@ -41,7 +41,7 @@ func streamsHandler(rawURL string) (core.Producer, error) {
// https://aws.amazon.com/kinesis/video-streams/
// https://docs.aws.amazon.com/kinesisvideostreams-webrtc-dg/latest/devguide/what-is-kvswebrtc.html
// https://github.com/orgs/awslabs/repositories?q=kinesis+webrtc
return kinesisClient(rawURL, query, "WebRTC/Kinesis")
return kinesisClient(rawURL, query, "webrtc/kinesis")
} else if format == "openipc" {
return openIPCClient(rawURL, query)
} else {
@@ -77,17 +77,23 @@ func go2rtcClient(url string) (core.Producer, error) {
// 2. Create PeerConnection
pc, err := PeerConnection(true)
if err != nil {
log.Error().Err(err).Caller().Send()
return nil, err
}
defer func() {
if err != nil {
_ = pc.Close()
}
}()
// waiter will wait PC error or WS error or nil (connection OK)
var connState core.Waiter
var connMu sync.Mutex
prod := webrtc.NewConn(pc)
prod.Desc = "WebRTC/WebSocket async"
prod.Mode = core.ModeActiveProducer
prod.Protocol = "ws"
prod.URL = url
prod.Listen(func(msg any) {
switch msg := msg.(type) {
case *pion.ICECandidate:
@@ -132,7 +138,8 @@ func go2rtcClient(url string) (core.Producer, error) {
}
if msg.Type != "webrtc/answer" {
return nil, errors.New("wrong answer: " + msg.Type)
err = errors.New("wrong answer: " + msg.String())
return nil, err
}
answer := msg.String()
@@ -180,8 +187,9 @@ func whepClient(url string) (core.Producer, error) {
}
prod := webrtc.NewConn(pc)
prod.Desc = "WebRTC/WHEP sync"
prod.Mode = core.ModeActiveProducer
prod.Protocol = "http"
prod.URL = url
medias := []*core.Media{
{Kind: core.KindVideo, Direction: core.DirectionRecvonly},
+5 -3
View File
@@ -34,7 +34,7 @@ func (k kinesisResponse) String() string {
return fmt.Sprintf("type=%s, payload=%s", k.Type, k.Payload)
}
func kinesisClient(rawURL string, query url.Values, desc string) (core.Producer, error) {
func kinesisClient(rawURL string, query url.Values, format string) (core.Producer, error) {
// 1. Connect to signalign server
conn, _, err := websocket.DefaultDialer.Dial(rawURL, nil)
if err != nil {
@@ -79,8 +79,10 @@ func kinesisClient(rawURL string, query url.Values, desc string) (core.Producer,
}
prod := webrtc.NewConn(pc)
prod.Desc = desc
prod.FormatName = format
prod.Mode = core.ModeActiveProducer
prod.Protocol = "ws"
prod.URL = rawURL
prod.Listen(func(msg any) {
switch msg := msg.(type) {
case *pion.ICECandidate:
@@ -216,5 +218,5 @@ func wyzeClient(rawURL string) (core.Producer, error) {
"ice_servers": []string{string(kvs.Servers)},
}
return kinesisClient(kvs.URL, query, "WebRTC/Wyze")
return kinesisClient(kvs.URL, query, "webrtc/wyze")
}
+3 -1
View File
@@ -193,8 +193,10 @@ func milestoneClient(rawURL string, query url.Values) (core.Producer, error) {
}
prod := webrtc.NewConn(pc)
prod.Desc = "WebRTC/Milestone"
prod.FormatName = "webrtc/milestone"
prod.Mode = core.ModeActiveProducer
prod.Protocol = "http"
prod.URL = rawURL
offer, err := mc.GetOffer()
if err != nil {
+3 -1
View File
@@ -53,8 +53,10 @@ func openIPCClient(rawURL string, query url.Values) (core.Producer, error) {
var connState core.Waiter
prod := webrtc.NewConn(pc)
prod.Desc = "WebRTC/OpenIPC"
prod.FormatName = "webrtc/openipc"
prod.Mode = core.ModeActiveProducer
prod.Protocol = "ws"
prod.URL = rawURL
prod.Listen(func(msg any) {
switch msg := msg.(type) {
case *pion.ICECandidate:
+5 -4
View File
@@ -65,6 +65,7 @@ func outputWebRTC(w http.ResponseWriter, r *http.Request) {
url := r.URL.Query().Get("src")
stream := streams.Get(url)
if stream == nil {
http.Error(w, api.StreamNotFound, http.StatusNotFound)
return
}
@@ -100,11 +101,11 @@ func outputWebRTC(w http.ResponseWriter, r *http.Request) {
switch mediaType {
case "application/json":
desc = "WebRTC/JSON sync"
desc = "webrtc/json"
case MimeSDP:
desc = "WebRTC/WHEP sync"
desc = "webrtc/whep"
default:
desc = "WebRTC/HTTP sync"
desc = "webrtc/post"
}
answer, err := ExchangeSDP(stream, offer, desc, r.UserAgent())
@@ -168,8 +169,8 @@ func inputWebRTC(w http.ResponseWriter, r *http.Request) {
// create new webrtc instance
prod := webrtc.NewConn(pc)
prod.Desc = "WebRTC/WHIP sync"
prod.Mode = core.ModePassiveProducer
prod.Protocol = "http"
prod.UserAgent = r.UserAgent()
if err = prod.SetOffer(string(offer)); err != nil {
+3 -2
View File
@@ -117,8 +117,8 @@ func asyncHandler(tr *ws.Transport, msg *ws.Message) error {
defer sendAnswer.Done(nil)
conn := webrtc.NewConn(pc)
conn.Desc = "WebRTC/WebSocket async"
conn.Mode = mode
conn.Protocol = "ws"
conn.UserAgent = tr.Request.UserAgent()
conn.Listen(func(msg any) {
switch msg := msg.(type) {
@@ -207,8 +207,9 @@ func ExchangeSDP(stream *streams.Stream, offer, desc, userAgent string) (answer
// create new webrtc instance
conn := webrtc.NewConn(pc)
conn.Desc = desc
conn.FormatName = desc
conn.UserAgent = userAgent
conn.Protocol = "http"
conn.Listen(func(msg any) {
switch msg := msg.(type) {
case pion.PeerConnectionState:
+1 -1
View File
@@ -47,7 +47,7 @@ func Init() {
if stream == nil {
return "", errors.New(api.StreamNotFound)
}
return webrtc.ExchangeSDP(stream, offer, "WebRTC/WebTorrent sync", "")
return webrtc.ExchangeSDP(stream, offer, "webtorrent", "")
},
}
+1 -1
View File
@@ -36,7 +36,7 @@ import (
)
func main() {
app.Version = "1.9.3"
app.Version = "1.9.4"
// 1. Core modules: app, api/ws, streams
+82
View File
@@ -1,3 +1,85 @@
# Notes
go2rtc tries to name formats, protocols and codecs the same way they are named in FFmpeg.
Some formats and protocols go2rtc supports exclusively. They have no equivalent in FFmpeg.
## Producers (input)
- The initiator of the connection can be go2rtc - **Source protocols**
- The initiator of the connection can be an external program - **Ingress protocols**
- Codecs can be incoming - **Recevers codecs**
- Codecs can be outgoing (two way audio) - **Senders codecs**
| Format | Source protocols | Ingress protocols | Recevers codecs | Senders codecs | Example |
|--------------|------------------|-------------------|------------------------------|--------------------|---------------|
| adts | http,tcp,pipe | http | aac | | `http:` |
| bubble | http | | h264,hevc,pcm_alaw | | `bubble:` |
| dvrip | tcp | | h264,hevc,pcm_alaw,pcm_mulaw | pcm_alaw | `dvrip:` |
| flv | http,tcp,pipe | http | h264,aac | | `http:` |
| gopro | http+udp | | TODO | | `gopro:` |
| hass/webrtc | ws+udp,tcp | | TODO | | `hass:` |
| hls/mpegts | http | | h264,h265,aac,opus | | `http:` |
| homekit | homekit+udp | | h264,eld* | | `homekit:` |
| isapi | http | | | pcm_alaw,pcm_mulaw | `isapi:` |
| ivideon | ws | | h264 | | `ivideon:` |
| kasa | http | | h264,pcm_mulaw | | `kasa:` |
| h264 | http,tcp,pipe | http | h264 | | `http:` |
| hevc | http,tcp,pipe | http | hevc | | `http:` |
| mjpeg | http,tcp,pipe | http | mjpeg | | `http:` |
| mpjpeg | http,tcp,pipe | http | mjpeg | | `http:` |
| mpegts | http,tcp,pipe | http | h264,hevc,aac,opus | | `http:` |
| nest/webrtc | http+udp | | TODO | | `nest:` |
| roborock | mqtt+udp | | h264,opus | opus | `roborock:` |
| rtmp | rtmp | rtmp | h264,aac | | `rtmp:` |
| rtsp | rtsp+tcp,ws | rtsp+tcp | h264,hevc,aac,pcm*,opus | pcm*,opus | `rtsp:` |
| stdin | pipe | | | pcm_alaw,pcm_mulaw | `stdin:` |
| tapo | http | | h264,pcma | pcm_alaw | `tapo:` |
| wav | http,tcp,pipe | http | pcm_alaw,pcm_mulaw | | `http:` |
| webrtc* | TODO | TODO | h264,pcm_alaw,pcm_mulaw,opus | pcm_alaw,pcm_mulaw | `webrtc:` |
| webtorrent | TODO | TODO | TODO | TODO | `webtorrent:` |
| yuv4mpegpipe | http,tcp,pipe | http | rawvideo | | `http:` |
- **eld** - rare variant of aac codec
- **pcm** - pcm_alaw pcm_mulaw pcm_s16be pcm_s16le
- **webrtc** - webrtc/kinesis, webrtc/openipc, webrtc/milestone, webrtc/wyze, webrtc/whep
## Consumers (output)
| Format | Protocol | Send codecs | Recv codecs | Example |
|--------------|-------------|------------------------------|-------------------------|---------------------------------------|
| adts | http | aac | | `GET /api/stream.adts` |
| ascii | http | mjpeg | | `GET /api/stream.ascii` |
| flv | http | h264,aac | | `GET /api/stream.flv` |
| hls/mpegts | http | h264,hevc,aac | | `GET /api/stream.m3u8` |
| hls/fmp4 | http | h264,hevc,aac,pcm*,opus | | `GET /api/stream.m3u8?mp4` |
| homekit | homekit+udp | h264,opus | | Apple HomeKit app |
| mjpeg | ws | mjpeg | | `{"type":"mjpeg"}` -> `/api/ws` |
| mpjpeg | http | mjpeg | | `GET /api/stream.mjpeg` |
| mp4 | http | h264,hevc,aac,pcm*,opus | | `GET /api/stream.mp4` |
| mse/fmp4 | ws | h264,hevc,aac,pcm*,opus | | `{"type":"mse"}` -> `/api/ws` |
| mpegts | http | h264,hevc,aac | | `GET /api/stream.ts` |
| rtmp | rtmp | h264,aac | | `rtmp://localhost:1935/{stream_name}` |
| rtsp | rtsp+tcp | h264,hevc,aac,pcm*,opus | | `rtsp://localhost:8554/{stream_name}` |
| webrtc | TODO | h264,pcm_alaw,pcm_mulaw,opus | pcm_alaw,pcm_mulaw,opus | `{"type":"webrtc"}` -> `/api/ws` |
| yuv4mpegpipe | http | rawvideo | | `GET /api/stream.y4m` |
- **pcm** - pcm_alaw pcm_mulaw pcm_s16be pcm_s16le
## Snapshots
| Format | Protocol | Send codecs | Example |
|--------|----------|-------------|-----------------------|
| jpeg | http | mjpeg | `GET /api/frame.jpeg` |
| mp4 | http | h264,hevc | `GET /api/frame.mp4` |
## Developers
File naming:
- `pkg/{format}/producer.go` - producer for this format (also if support backchannel)
- `pkg/{format}/consumer.go` - consumer for this format
- `pkg/{format}/backchanel.go` - producer with only backchannel func
## Useful links
- https://www.wowza.com/blog/streaming-protocols
+12 -11
View File
@@ -8,15 +8,12 @@ import (
)
type Consumer struct {
core.SuperConsumer
core.Connection
wr *core.WriteBuffer
}
func NewConsumer() *Consumer {
cons := &Consumer{
wr: core.NewWriteBuffer(nil),
}
cons.Medias = []*core.Media{
medias := []*core.Media{
{
Kind: core.KindAudio,
Direction: core.DirectionSendonly,
@@ -25,7 +22,16 @@ func NewConsumer() *Consumer {
},
},
}
return cons
wr := core.NewWriteBuffer(nil)
return &Consumer{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "adts",
Medias: medias,
Transport: wr,
},
wr: wr,
}
}
func (c *Consumer) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error {
@@ -51,8 +57,3 @@ func (c *Consumer) AddTrack(media *core.Media, codec *core.Codec, track *core.Re
func (c *Consumer) WriteTo(wr io.Writer) (int64, error) {
return c.wr.WriteTo(wr)
}
func (c *Consumer) Stop() error {
_ = c.SuperConsumer.Close()
return c.wr.Close()
}
+12 -14
View File
@@ -10,9 +10,8 @@ import (
)
type Producer struct {
core.SuperProducer
core.Connection
rd *bufio.Reader
cl io.Closer
}
func Open(r io.Reader) (*Producer, error) {
@@ -23,18 +22,22 @@ func Open(r io.Reader) (*Producer, error) {
return nil, err
}
codec := ADTSToCodec(b)
prod := &Producer{rd: rd, cl: r.(io.Closer)}
prod.Type = "ADTS producer"
prod.Medias = []*core.Media{
medias := []*core.Media{
{
Kind: core.KindAudio,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{codec},
Codecs: []*core.Codec{ADTSToCodec(b)},
},
}
return prod, nil
return &Producer{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "adts",
Medias: medias,
Transport: r,
},
rd: rd,
}, nil
}
func (c *Producer) Start() error {
@@ -66,8 +69,3 @@ func (c *Producer) Start() error {
c.Receivers[0].WriteRTP(pkt)
}
}
func (c *Producer) Stop() error {
_ = c.SuperProducer.Close()
return c.cl.Close()
}
+7 -2
View File
@@ -22,6 +22,7 @@ import (
"github.com/pion/rtp"
)
// Deprecated: should be rewritten to core.Connection
type Client struct {
core.Listener
@@ -43,8 +44,12 @@ type Client struct {
recv int
}
func NewClient(url string) *Client {
return &Client{url: url}
func Dial(rawURL string) (*Client, error) {
client := &Client{url: rawURL}
if err := client.Dial(); err != nil {
return nil, err
}
return client, nil
}
const (
+10 -5
View File
@@ -65,11 +65,16 @@ func (c *Client) Stop() error {
}
func (c *Client) MarshalJSON() ([]byte, error) {
info := &core.Info{
Type: "Bubble active producer",
Medias: c.medias,
Recv: c.recv,
Receivers: c.receivers,
info := &core.Connection{
ID: core.ID(c),
FormatName: "bubble",
Protocol: "http",
Medias: c.medias,
Recv: c.recv,
Receivers: c.receivers,
}
if c.conn != nil {
info.RemoteAddr = c.conn.RemoteAddr().String()
}
return json.Marshal(info)
}
+8 -2
View File
@@ -46,7 +46,7 @@ func FFmpegCodecName(name string) string {
case CodecH264:
return "h264"
case CodecH265:
return "h265"
return "hevc"
case CodecJPEG:
return "mjpeg"
case CodecRAW:
@@ -69,8 +69,14 @@ func FFmpegCodecName(name string) string {
return "vp9"
case CodecAV1:
return "av1"
case CodecELD:
return "aac/eld"
case CodecFLAC:
return "flac"
case CodecMP3:
return "mp3"
}
return ""
return name
}
func (c *Codec) String() (s string) {
+139
View File
@@ -0,0 +1,139 @@
package core
import (
"io"
"net/http"
"reflect"
"sync/atomic"
)
func NewID() uint32 {
return id.Add(1)
}
// Deprecated: use NewID instead
func ID(v any) uint32 {
p := uintptr(reflect.ValueOf(v).UnsafePointer())
return 0x8000_0000 | uint32(p)
}
var id atomic.Uint32
type Info interface {
SetProtocol(string)
SetRemoteAddr(string)
SetSource(string)
SetURL(string)
WithRequest(*http.Request)
}
// Connection just like webrtc.PeerConnection
// - ID and RemoteAddr used for building Connection(s) graph
// - FormatName, Protocol, RemoteAddr, Source, URL, SDP, UserAgent used for info about Connection
// - FormatName and Protocol has FFmpeg compatible names
// - Transport used for auto closing on Stop
type Connection struct {
ID uint32 `json:"id,omitempty"`
FormatName string `json:"format_name,omitempty"` // rtsp, webrtc, mp4, mjpeg, mpjpeg...
Protocol string `json:"protocol,omitempty"` // tcp, udp, http, ws, pipe...
RemoteAddr string `json:"remote_addr,omitempty"` // host:port other info
Source string `json:"source,omitempty"`
URL string `json:"url,omitempty"`
SDP string `json:"sdp,omitempty"`
UserAgent string `json:"user_agent,omitempty"`
Medias []*Media `json:"medias,omitempty"`
Receivers []*Receiver `json:"receivers,omitempty"`
Senders []*Sender `json:"senders,omitempty"`
Recv int `json:"bytes_recv,omitempty"`
Send int `json:"bytes_send,omitempty"`
Transport any `json:"-"`
}
func (c *Connection) GetMedias() []*Media {
return c.Medias
}
func (c *Connection) GetTrack(media *Media, codec *Codec) (*Receiver, error) {
for _, receiver := range c.Receivers {
if receiver.Codec == codec {
return receiver, nil
}
}
receiver := NewReceiver(media, codec)
c.Receivers = append(c.Receivers, receiver)
return receiver, nil
}
func (c *Connection) Stop() error {
for _, receiver := range c.Receivers {
receiver.Close()
}
for _, sender := range c.Senders {
sender.Close()
}
if closer, ok := c.Transport.(io.Closer); ok {
return closer.Close()
}
return nil
}
// Deprecated:
func (c *Connection) Codecs() []*Codec {
codecs := make([]*Codec, len(c.Senders))
for i, sender := range c.Senders {
codecs[i] = sender.Codec
}
return codecs
}
func (c *Connection) SetProtocol(s string) {
c.Protocol = s
}
func (c *Connection) SetRemoteAddr(s string) {
if c.RemoteAddr == "" {
c.RemoteAddr = s
} else {
c.RemoteAddr += " forwarded " + s
}
}
func (c *Connection) SetSource(s string) {
c.Source = s
}
func (c *Connection) SetURL(s string) {
c.URL = s
}
func (c *Connection) WithRequest(r *http.Request) {
if r.Header.Get("Upgrade") == "websocket" {
c.Protocol = "ws"
} else {
c.Protocol = "http"
}
c.RemoteAddr = r.RemoteAddr
if remote := r.Header.Get("X-Forwarded-For"); remote != "" {
c.RemoteAddr += " forwarded " + remote
}
c.UserAgent = r.UserAgent()
}
// Create like os.Create, init Consumer with existing Transport
func Create(w io.Writer) (*Connection, error) {
return &Connection{Transport: w}, nil
}
// Open like os.Open, init Producer from existing Transport
func Open(r io.Reader) (*Connection, error) {
return &Connection{Transport: r}, nil
}
// Dial like net.Dial, init Producer via Dialing
func Dial(rawURL string) (*Connection, error) {
return &Connection{}, nil
}
+4 -85
View File
@@ -1,5 +1,7 @@
package core
import "encoding/json"
const (
DirectionRecvonly = "recvonly"
DirectionSendonly = "sendonly"
@@ -90,89 +92,6 @@ func (m Mode) String() string {
return "unknown"
}
type Info struct {
Type string `json:"type,omitempty"`
URL string `json:"url,omitempty"`
RemoteAddr string `json:"remote_addr,omitempty"`
UserAgent string `json:"user_agent,omitempty"`
SDP string `json:"sdp,omitempty"`
Medias []*Media `json:"medias,omitempty"`
Receivers []*Receiver `json:"receivers,omitempty"`
Senders []*Sender `json:"senders,omitempty"`
Recv int `json:"recv,omitempty"`
Send int `json:"send,omitempty"`
}
const (
UnsupportedCodec = "unsupported codec"
WrongMediaDirection = "wrong media direction"
)
type SuperProducer struct {
Type string `json:"type,omitempty"`
URL string `json:"url,omitempty"`
SDP string `json:"sdp,omitempty"`
Medias []*Media `json:"medias,omitempty"`
Receivers []*Receiver `json:"receivers,omitempty"`
Recv int `json:"recv,omitempty"`
}
func (s *SuperProducer) GetMedias() []*Media {
return s.Medias
}
func (s *SuperProducer) GetTrack(media *Media, codec *Codec) (*Receiver, error) {
for _, receiver := range s.Receivers {
if receiver.Codec == codec {
return receiver, nil
}
}
receiver := NewReceiver(media, codec)
s.Receivers = append(s.Receivers, receiver)
return receiver, nil
}
func (s *SuperProducer) Close() error {
for _, receiver := range s.Receivers {
receiver.Close()
}
return nil
}
type SuperConsumer struct {
Type string `json:"type,omitempty"`
URL string `json:"url,omitempty"`
RemoteAddr string `json:"remote_addr,omitempty"`
UserAgent string `json:"user_agent,omitempty"`
SDP string `json:"sdp,omitempty"`
Medias []*Media `json:"medias,omitempty"`
Senders []*Sender `json:"senders,omitempty"`
Send int `json:"send,omitempty"`
}
func (s *SuperConsumer) GetMedias() []*Media {
return s.Medias
}
func (s *SuperConsumer) AddTrack(media *Media, codec *Codec, track *Receiver) error {
return nil
}
//func (b *SuperConsumer) WriteTo(w io.Writer) (n int64, err error) {
// return 0, nil
//}
func (s *SuperConsumer) Close() error {
for _, sender := range s.Senders {
sender.Close()
}
return nil
}
func (s *SuperConsumer) Codecs() []*Codec {
codecs := make([]*Codec, len(s.Senders))
for i, sender := range s.Senders {
codecs[i] = sender.Codec
}
return codecs
func (m Mode) MarshalJSON() ([]byte, error) {
return json.Marshal(m.String())
}
+1 -1
View File
@@ -92,7 +92,7 @@ func (m *Media) Equal(media *Media) bool {
func GetKind(name string) string {
switch name {
case CodecH264, CodecH265, CodecVP8, CodecVP9, CodecAV1, CodecJPEG:
case CodecH264, CodecH265, CodecVP8, CodecVP9, CodecAV1, CodecJPEG, CodecRAW:
return KindVideo
case CodecPCMU, CodecPCMA, CodecAAC, CodecOpus, CodecG722, CodecMP3, CodecPCM, CodecPCML, CodecELD, CodecFLAC:
return KindAudio
+4 -3
View File
@@ -23,10 +23,11 @@ type Filter func(handler HandlerFunc) HandlerFunc
// Node - Receiver or Sender or Filter (transform)
type Node struct {
Codec *Codec `json:"codec"`
Input HandlerFunc `json:"-"`
Output HandlerFunc `json:"-"`
Codec *Codec
Input HandlerFunc
Output HandlerFunc
id uint32
childs []*Node
parent *Node
+43 -2
View File
@@ -1,6 +1,7 @@
package core
import (
"encoding/json"
"errors"
"github.com/pion/rtp"
@@ -22,7 +23,7 @@ type Receiver struct {
func NewReceiver(media *Media, codec *Codec) *Receiver {
r := &Receiver{
Node: Node{Codec: codec},
Node: Node{id: NewID(), Codec: codec},
Media: media,
}
r.Input = func(packet *Packet) {
@@ -91,7 +92,7 @@ func NewSender(media *Media, codec *Codec) *Sender {
buf := make(chan *Packet, bufSize)
s := &Sender{
Node: Node{Codec: codec},
Node: Node{id: NewID(), Codec: codec},
Media: media,
buf: buf,
}
@@ -171,3 +172,43 @@ func (s *Sender) Close() {
s.Node.Close()
}
func (r *Receiver) MarshalJSON() ([]byte, error) {
v := struct {
ID uint32 `json:"id"`
Codec *Codec `json:"codec"`
Childs []uint32 `json:"childs,omitempty"`
Bytes int `json:"bytes,omitempty"`
Packets int `json:"packets,omitempty"`
}{
ID: r.Node.id,
Codec: r.Node.Codec,
Bytes: r.Bytes,
Packets: r.Packets,
}
for _, child := range r.childs {
v.Childs = append(v.Childs, child.id)
}
return json.Marshal(v)
}
func (s *Sender) MarshalJSON() ([]byte, error) {
v := struct {
ID uint32 `json:"id"`
Codec *Codec `json:"codec"`
Parent uint32 `json:"parent,omitempty"`
Bytes int `json:"bytes,omitempty"`
Packets int `json:"packets,omitempty"`
Drops int `json:"drops,omitempty"`
}{
ID: s.Node.id,
Codec: s.Node.Codec,
Bytes: s.Bytes,
Packets: s.Packets,
Drops: s.Drops,
}
if s.parent != nil {
v.Parent = s.parent.id
}
return json.Marshal(v)
}
@@ -8,16 +8,16 @@ import (
"github.com/pion/rtp"
)
type Consumer struct {
core.SuperConsumer
type Backchannel struct {
core.Connection
client *Client
}
func (c *Consumer) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
func (c *Backchannel) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
return nil, core.ErrCantGetTrack
}
func (c *Consumer) Start() error {
func (c *Backchannel) Start() error {
if err := c.client.conn.SetReadDeadline(time.Time{}); err != nil {
return err
}
@@ -30,12 +30,7 @@ func (c *Consumer) Start() error {
}
}
func (c *Consumer) Stop() error {
_ = c.SuperConsumer.Close()
return c.client.Close()
}
func (c *Consumer) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiver) error {
func (c *Backchannel) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiver) error {
if err := c.client.Talk(); err != nil {
return err
}
+11 -6
View File
@@ -8,17 +8,22 @@ func Dial(url string) (core.Producer, error) {
return nil, err
}
conn := core.Connection{
ID: core.NewID(),
FormatName: "dvrip",
Protocol: "tcp",
RemoteAddr: client.conn.RemoteAddr().String(),
Transport: client.conn,
}
if client.stream != "" {
prod := &Producer{client: client}
prod.Type = "DVRIP active producer"
prod := &Producer{Connection: conn, client: client}
if err := prod.probe(); err != nil {
return nil, err
}
return prod, nil
} else {
cons := &Consumer{client: client}
cons.Type = "DVRIP active consumer"
cons.Medias = []*core.Media{
conn.Medias = []*core.Media{
{
Kind: core.KindAudio,
Direction: core.DirectionSendonly,
@@ -29,6 +34,6 @@ func Dial(url string) (core.Producer, error) {
},
},
}
return cons, nil
return &Backchannel{Connection: conn, client: client}, nil
}
}
+1 -5
View File
@@ -15,7 +15,7 @@ import (
)
type Producer struct {
core.SuperProducer
core.Connection
client *Client
@@ -92,10 +92,6 @@ func (c *Producer) Start() error {
}
}
func (c *Producer) Stop() error {
return c.client.Close()
}
func (c *Producer) probe() error {
if err := c.client.Play(); err != nil {
return err
+13 -12
View File
@@ -10,17 +10,13 @@ import (
)
type Consumer struct {
core.SuperConsumer
core.Connection
wr *core.WriteBuffer
muxer *Muxer
}
func NewConsumer() *Consumer {
c := &Consumer{
wr: core.NewWriteBuffer(nil),
muxer: &Muxer{},
}
c.Medias = []*core.Media{
medias := []*core.Media{
{
Kind: core.KindVideo,
Direction: core.DirectionSendonly,
@@ -36,7 +32,17 @@ func NewConsumer() *Consumer {
},
},
}
return c
wr := core.NewWriteBuffer(nil)
return &Consumer{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "flv",
Medias: medias,
Transport: wr,
},
wr: wr,
muxer: &Muxer{},
}
}
func (c *Consumer) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error {
@@ -86,8 +92,3 @@ func (c *Consumer) WriteTo(wr io.Writer) (int64, error) {
}
return c.wr.WriteTo(wr)
}
func (c *Consumer) Stop() error {
_ = c.SuperConsumer.Close()
return c.wr.Close()
}
+10 -9
View File
@@ -15,18 +15,24 @@ import (
)
type Producer struct {
core.SuperProducer
core.Connection
rd *core.ReadBuffer
video, audio *core.Receiver
}
func Open(rd io.Reader) (*Producer, error) {
prod := &Producer{rd: core.NewReadBuffer(rd)}
prod := &Producer{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "flv",
Transport: rd,
},
rd: core.NewReadBuffer(rd),
}
if err := prod.probe(); err != nil {
return nil, err
}
prod.Type = "FLV producer"
return prod, nil
}
@@ -57,7 +63,7 @@ const (
)
func (c *Producer) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
receiver, _ := c.SuperProducer.GetTrack(media, codec)
receiver, _ := c.Connection.GetTrack(media, codec)
if media.Kind == core.KindVideo {
c.video = receiver
} else {
@@ -117,11 +123,6 @@ func (c *Producer) Start() error {
}
}
func (c *Producer) Stop() error {
_ = c.SuperProducer.Close()
return c.rd.Close()
}
func (c *Producer) probe() error {
if err := c.readHeader(); err != nil {
return err
+10 -3
View File
@@ -8,11 +8,10 @@ import (
"net/url"
"time"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/mpegts"
)
func Dial(rawURL string) (core.Producer, error) {
func Dial(rawURL string) (*mpegts.Producer, error) {
u, err := url.Parse(rawURL)
if err != nil {
return nil, err
@@ -32,7 +31,15 @@ func Dial(rawURL string) (core.Producer, error) {
return nil, err
}
return mpegts.Open(r)
prod, err := mpegts.Open(r)
if err != nil {
return nil, err
}
prod.FormatName = "gopro"
prod.RemoteAddr = u.Host
return prod, nil
}
type listener struct {
+3 -1
View File
@@ -61,8 +61,10 @@ func NewClient(rawURL string) (*Client, error) {
}
conn := webrtc.NewConn(pc)
conn.Desc = "Hass"
conn.FormatName = "hass/webrtc"
conn.Mode = core.ModeActiveProducer
conn.Protocol = "ws"
conn.URL = rawURL
// https://developers.google.com/nest/device-access/traits/device/camera-live-stream#generatewebrtcstream-request-fields
medias := []*core.Media{
+8 -3
View File
@@ -4,14 +4,19 @@ import (
"io"
"net/url"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/mpegts"
)
func OpenURL(u *url.URL, body io.ReadCloser) (core.Producer, error) {
func OpenURL(u *url.URL, body io.ReadCloser) (*mpegts.Producer, error) {
rd, err := NewReader(u, body)
if err != nil {
return nil, err
}
return mpegts.Open(rd)
prod, err := mpegts.Open(rd)
if err != nil {
return nil, err
}
prod.FormatName = "hls/mpegts"
prod.RemoteAddr = u.Host
return prod, nil
}
+25 -23
View File
@@ -16,7 +16,7 @@ import (
)
type Consumer struct {
core.SuperConsumer
core.Connection
conn net.Conn
srtp *srtp.Server
@@ -29,28 +29,31 @@ type Consumer struct {
}
func NewConsumer(conn net.Conn, server *srtp.Server) *Consumer {
return &Consumer{
SuperConsumer: core.SuperConsumer{
Type: "HomeKit passive consumer",
RemoteAddr: conn.RemoteAddr().String(),
Medias: []*core.Media{
{
Kind: core.KindVideo,
Direction: core.DirectionSendonly,
Codecs: []*core.Codec{
{Name: core.CodecH264},
},
},
{
Kind: core.KindAudio,
Direction: core.DirectionSendonly,
Codecs: []*core.Codec{
{Name: core.CodecOpus},
},
},
medias := []*core.Media{
{
Kind: core.KindVideo,
Direction: core.DirectionSendonly,
Codecs: []*core.Codec{
{Name: core.CodecH264},
},
},
{
Kind: core.KindAudio,
Direction: core.DirectionSendonly,
Codecs: []*core.Codec{
{Name: core.CodecOpus},
},
},
}
return &Consumer{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "homekit",
Protocol: "udp",
RemoteAddr: conn.RemoteAddr().String(),
Medias: medias,
Transport: conn,
},
conn: conn,
srtp: server,
}
@@ -175,11 +178,10 @@ func (c *Consumer) WriteTo(io.Writer) (int64, error) {
}
func (c *Consumer) Stop() error {
_ = c.SuperConsumer.Close()
if c.deadline != nil {
c.deadline.Reset(0)
}
return c.SuperConsumer.Close()
return c.Connection.Stop()
}
func (c *Consumer) srtpEndpoint() *srtp.Endpoint {
@@ -15,8 +15,9 @@ import (
"github.com/pion/rtp"
)
// Deprecated: rename to Producer
type Client struct {
core.SuperProducer
core.Connection
hap *hap.Client
srtp *srtp.Server
@@ -52,9 +53,13 @@ func Dial(rawURL string, server *srtp.Server) (*Client, error) {
}
client := &Client{
SuperProducer: core.SuperProducer{
Type: "HomeKit active producer",
URL: conn.URL(),
Connection: core.Connection{
ID: core.NewID(),
FormatName: "homekit",
Protocol: "udp",
RemoteAddr: conn.Conn.RemoteAddr().String(),
Source: rawURL,
Transport: conn,
},
hap: conn,
srtp: server,
@@ -93,7 +98,6 @@ func (c *Client) GetMedias() []*core.Media {
return nil
}
c.URL = c.hap.URL()
c.SDP = fmt.Sprintf("%+v\n%+v", c.videoConfig, c.audioConfig)
c.Medias = []*core.Media{
@@ -175,8 +179,6 @@ func (c *Client) Start() error {
}
func (c *Client) Stop() error {
_ = c.SuperProducer.Close()
if c.videoSession != nil && c.videoSession.Remote != nil {
c.srtp.DelSession(c.videoSession)
}
@@ -184,7 +186,7 @@ func (c *Client) Stop() error {
c.srtp.DelSession(c.audioSession)
}
return c.hap.Close()
return c.Connection.Stop()
}
func (c *Client) trackByKind(kind string) *core.Receiver {
+92
View File
@@ -0,0 +1,92 @@
package image
import (
"errors"
"io"
"net/http"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/tcp"
"github.com/pion/rtp"
)
type Producer struct {
core.Connection
closed bool
res *http.Response
}
func Open(res *http.Response) (*Producer, error) {
return &Producer{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "image",
Protocol: "http",
RemoteAddr: res.Request.URL.Host,
Transport: res.Body,
Medias: []*core.Media{
{
Kind: core.KindVideo,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{
{
Name: core.CodecJPEG,
ClockRate: 90000,
PayloadType: core.PayloadTypeRAW,
},
},
},
},
},
res: res,
}, nil
}
func (c *Producer) Start() error {
body, err := io.ReadAll(c.res.Body)
if err != nil {
return err
}
pkt := &rtp.Packet{
Header: rtp.Header{Timestamp: core.Now90000()},
Payload: body,
}
c.Receivers[0].WriteRTP(pkt)
c.Recv += len(body)
req := c.res.Request
for !c.closed {
res, err := tcp.Do(req)
if err != nil {
return err
}
if res.StatusCode != http.StatusOK {
return errors.New("wrong status: " + res.Status)
}
body, err = io.ReadAll(res.Body)
if err != nil {
return err
}
c.Recv += len(body)
pkt = &rtp.Packet{
Header: rtp.Header{Timestamp: core.Now90000()},
Payload: body,
}
c.Receivers[0].WriteRTP(pkt)
}
return nil
}
func (c *Producer) Stop() error {
c.closed = true
return c.Connection.Stop()
}
@@ -2,6 +2,7 @@ package isapi
import (
"encoding/json"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/pion/rtp"
)
@@ -51,10 +52,15 @@ func (c *Client) Stop() (err error) {
}
func (c *Client) MarshalJSON() ([]byte, error) {
info := &core.Info{
Type: "ISAPI active consumer",
Medias: c.medias,
Send: c.send,
info := &core.Connection{
ID: core.ID(c),
FormatName: "isapi",
Protocol: "http",
Medias: c.medias,
Send: c.send,
}
if c.conn != nil {
info.RemoteAddr = c.conn.RemoteAddr().String()
}
if c.sender != nil {
info.Senders = []*core.Sender{c.sender}
+7 -2
View File
@@ -11,6 +11,7 @@ import (
"github.com/AlexxIT/go2rtc/pkg/tcp"
)
// Deprecated: should be rewritten to core.Connection
type Client struct {
core.Listener
@@ -23,7 +24,7 @@ type Client struct {
send int
}
func NewClient(rawURL string) (*Client, error) {
func Dial(rawURL string) (*Client, error) {
// check if url is valid url
u, err := url.Parse(rawURL)
if err != nil {
@@ -33,7 +34,11 @@ func NewClient(rawURL string) (*Client, error) {
u.Scheme = "http"
u.Path = ""
return &Client{url: u.String()}, nil
client := &Client{url: u.String()}
if err = client.Dial(); err != nil {
return nil, err
}
return client, err
}
func (c *Client) Dial() (err error) {
+8 -2
View File
@@ -26,6 +26,7 @@ const (
StateHandle
)
// Deprecated: should be rewritten to core.Connection
type Client struct {
core.Listener
@@ -46,8 +47,13 @@ type Client struct {
recv int
}
func NewClient(id string) *Client {
return &Client{ID: id}
func Dial(source string) (*Client, error) {
id := strings.Replace(source[8:], "/", ":", 1)
client := &Client{ID: id}
if err := client.Dial(); err != nil {
return nil, err
}
return client, nil
}
func (c *Client) Dial() (err error) {
+11 -5
View File
@@ -2,6 +2,7 @@ package ivideon
import (
"encoding/json"
"github.com/AlexxIT/go2rtc/pkg/core"
)
@@ -32,11 +33,16 @@ func (c *Client) Stop() error {
}
func (c *Client) MarshalJSON() ([]byte, error) {
info := &core.Info{
Type: "Ivideon active producer",
URL: c.ID,
Medias: c.medias,
Recv: c.recv,
info := &core.Connection{
ID: core.ID(c),
FormatName: "ivideon",
Protocol: "ws",
URL: c.ID,
Medias: c.medias,
Recv: c.recv,
}
if c.conn != nil {
info.RemoteAddr = c.conn.RemoteAddr().String()
}
if c.receiver != nil {
info.Receivers = []*core.Receiver{c.receiver}
+13 -11
View File
@@ -12,13 +12,13 @@ import (
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/h264"
"github.com/AlexxIT/go2rtc/pkg/h264/annexb"
"github.com/AlexxIT/go2rtc/pkg/multipart"
"github.com/AlexxIT/go2rtc/pkg/mpjpeg"
"github.com/AlexxIT/go2rtc/pkg/tcp"
"github.com/pion/rtp"
)
type Producer struct {
core.SuperProducer
core.Connection
rd *core.ReadBuffer
reader *bufio.Reader
@@ -65,11 +65,18 @@ func Dial(url string) (*Producer, error) {
rd.Reader = httputil.NewChunkedReader(buf)
}
prod := &Producer{rd: core.NewReadBuffer(rd)}
prod := &Producer{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "kasa",
Protocol: "http",
Transport: rd,
},
rd: core.NewReadBuffer(rd),
}
if err = prod.probe(); err != nil {
return nil, err
}
prod.Type = "Kasa producer"
return prod, nil
}
@@ -90,7 +97,7 @@ func (c *Producer) Start() error {
}
for {
header, body, err := multipart.Next(c.reader)
header, body, err := mpjpeg.Next(c.reader)
if err != nil {
return err
}
@@ -128,11 +135,6 @@ func (c *Producer) Start() error {
}
}
func (c *Producer) Stop() error {
_ = c.SuperProducer.Close()
return c.rd.Close()
}
const (
MimeVideo = "video/x-h264"
MimeG711U = "audio/g711u"
@@ -151,7 +153,7 @@ func (c *Producer) probe() error {
timeout := time.Now().Add(core.ProbeTimeout)
for (waitVideo || waitAudio) && time.Now().Before(timeout) {
header, body, err := multipart.Next(c.reader)
header, body, err := mpjpeg.Next(c.reader)
if err != nil {
return err
}
+14 -10
View File
@@ -13,7 +13,7 @@ import (
)
type Producer struct {
core.SuperProducer
core.Connection
rd *core.ReadBuffer
}
@@ -28,26 +28,35 @@ func Open(r io.Reader) (*Producer, error) {
buf = annexb.EncodeToAVCC(buf, false) // won't break original buffer
var codec *core.Codec
var format string
switch {
case h264.NALUType(buf) == h264.NALUTypeSPS:
codec = h264.AVCCToCodec(buf)
format = "h264"
case h265.NALUType(buf) == h265.NALUTypeVPS:
codec = h265.AVCCToCodec(buf)
format = "hevc"
default:
return nil, errors.New("bitstream: unsupported header: " + hex.EncodeToString(buf[:8]))
}
prod := &Producer{rd: rd}
prod.Type = "Bitstream producer"
prod.Medias = []*core.Media{
medias := []*core.Media{
{
Kind: core.KindVideo,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{codec},
},
}
return prod, nil
return &Producer{
Connection: core.Connection{
ID: core.NewID(),
FormatName: format,
Medias: medias,
Transport: r,
},
rd: rd,
}, nil
}
func (c *Producer) Start() error {
@@ -84,8 +93,3 @@ func (c *Producer) Start() error {
}
}
}
func (c *Producer) Stop() error {
_ = c.SuperProducer.Close()
return c.rd.Close()
}
+20 -19
View File
@@ -12,26 +12,32 @@ import (
)
type Keyframe struct {
core.SuperConsumer
core.Connection
wr *core.WriteBuffer
}
// Deprecated: should be rewritten
func NewKeyframe() *Keyframe {
return &Keyframe{
core.SuperConsumer{
Medias: []*core.Media{
{
Kind: core.KindVideo,
Direction: core.DirectionSendonly,
Codecs: []*core.Codec{
{Name: core.CodecJPEG},
{Name: core.CodecH264},
{Name: core.CodecH265},
},
},
medias := []*core.Media{
{
Kind: core.KindVideo,
Direction: core.DirectionSendonly,
Codecs: []*core.Codec{
{Name: core.CodecJPEG},
{Name: core.CodecH264},
{Name: core.CodecH265},
},
},
core.NewWriteBuffer(nil),
}
wr := core.NewWriteBuffer(nil)
return &Keyframe{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "keyframe",
Medias: medias,
Transport: wr,
},
wr: wr,
}
}
@@ -98,8 +104,3 @@ func (k *Keyframe) CodecName() string {
func (k *Keyframe) WriteTo(wr io.Writer) (int64, error) {
return k.wr.WriteTo(wr)
}
func (k *Keyframe) Stop() error {
_ = k.SuperConsumer.Close()
return k.wr.Close()
}
+11 -10
View File
@@ -9,14 +9,12 @@ import (
)
type Producer struct {
core.SuperProducer
core.Connection
rd *core.ReadBuffer
}
func Open(rd io.Reader) (*Producer, error) {
prod := &Producer{rd: core.NewReadBuffer(rd)}
prod.Type = "MJPEG producer"
prod.Medias = []*core.Media{
medias := []*core.Media{
{
Kind: core.KindVideo,
Direction: core.DirectionRecvonly,
@@ -29,7 +27,15 @@ func Open(rd io.Reader) (*Producer, error) {
},
},
}
return prod, nil
return &Producer{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "mjpeg",
Medias: medias,
Transport: rd,
},
rd: core.NewReadBuffer(rd),
}, nil
}
func (c *Producer) Start() error {
@@ -70,8 +76,3 @@ func (c *Producer) Start() error {
buf = buf[i:]
}
}
func (c *Producer) Stop() error {
_ = c.SuperProducer.Close()
return c.rd.Close()
}
+18 -16
View File
@@ -13,7 +13,7 @@ import (
"github.com/AlexxIT/go2rtc/pkg/magic/bitstream"
"github.com/AlexxIT/go2rtc/pkg/magic/mjpeg"
"github.com/AlexxIT/go2rtc/pkg/mpegts"
"github.com/AlexxIT/go2rtc/pkg/multipart"
"github.com/AlexxIT/go2rtc/pkg/mpjpeg"
"github.com/AlexxIT/go2rtc/pkg/wav"
"github.com/AlexxIT/go2rtc/pkg/y4m"
)
@@ -26,29 +26,31 @@ func Open(r io.Reader) (core.Producer, error) {
return nil, err
}
switch {
case string(b) == annexb.StartCode:
switch string(b) {
case annexb.StartCode:
return bitstream.Open(rd)
case string(b) == wav.FourCC:
case wav.FourCC:
return wav.Open(rd)
case string(b) == y4m.FourCC:
case y4m.FourCC:
return y4m.Open(rd)
}
case bytes.HasPrefix(b, []byte{0xFF, 0xD8}):
return mjpeg.Open(rd)
case bytes.HasPrefix(b, []byte(flv.Signature)):
switch string(b[:3]) {
case flv.Signature:
return flv.Open(rd)
}
case bytes.HasPrefix(b, []byte("--")):
return multipart.Open(rd)
case b[0] == 0xFF && (b[1] == 0xF1 || b[1] == 0xF9):
switch string(b[:2]) {
case "\xFF\xD8":
return mjpeg.Open(rd)
case "\xFF\xF1", "\xFF\xF9":
return aac.Open(rd)
case "--":
return mpjpeg.Open(rd)
}
case b[0] == mpegts.SyncByte:
switch b[0] {
case mpegts.SyncByte:
return mpegts.Open(rd)
}
-75
View File
@@ -1,75 +0,0 @@
package mjpeg
import (
"errors"
"io"
"net/http"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/tcp"
"github.com/pion/rtp"
)
type Client struct {
core.Listener
UserAgent string
RemoteAddr string
closed bool
res *http.Response
medias []*core.Media
receiver *core.Receiver
recv int
}
func NewClient(res *http.Response) *Client {
return &Client{res: res}
}
func (c *Client) Handle() error {
body, err := io.ReadAll(c.res.Body)
if err != nil {
return err
}
pkt := &rtp.Packet{
Header: rtp.Header{Timestamp: core.Now90000()},
Payload: body,
}
c.receiver.WriteRTP(pkt)
c.recv += len(body)
req := c.res.Request
for !c.closed {
res, err := tcp.Do(req)
if err != nil {
return err
}
if res.StatusCode != http.StatusOK {
return errors.New("wrong status: " + res.Status)
}
body, err = io.ReadAll(res.Body)
if err != nil {
return err
}
c.recv += len(body)
if c.receiver != nil {
pkt = &rtp.Packet{
Header: rtp.Header{Timestamp: core.Now90000()},
Payload: body,
}
c.receiver.WriteRTP(pkt)
}
}
return nil
}
+18 -19
View File
@@ -8,26 +8,30 @@ import (
)
type Consumer struct {
core.SuperConsumer
core.Connection
wr *core.WriteBuffer
}
func NewConsumer() *Consumer {
return &Consumer{
core.SuperConsumer{
Type: "MJPEG passive consumer",
Medias: []*core.Media{
{
Kind: core.KindVideo,
Direction: core.DirectionSendonly,
Codecs: []*core.Codec{
{Name: core.CodecJPEG},
{Name: core.CodecRAW},
},
},
medias := []*core.Media{
{
Kind: core.KindVideo,
Direction: core.DirectionSendonly,
Codecs: []*core.Codec{
{Name: core.CodecJPEG},
{Name: core.CodecRAW},
},
},
core.NewWriteBuffer(nil),
}
wr := core.NewWriteBuffer(nil)
return &Consumer{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "mjpeg",
Medias: medias,
Transport: wr,
},
wr: wr,
}
}
@@ -53,8 +57,3 @@ func (c *Consumer) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiv
func (c *Consumer) WriteTo(wr io.Writer) (int64, error) {
return c.wr.WriteTo(wr)
}
func (c *Consumer) Stop() error {
_ = c.SuperConsumer.Close()
return c.wr.Close()
}
-61
View File
@@ -1,61 +0,0 @@
package mjpeg
import (
"encoding/json"
"github.com/AlexxIT/go2rtc/pkg/core"
)
func (c *Client) GetMedias() []*core.Media {
if c.medias == nil {
c.medias = []*core.Media{{
Kind: core.KindVideo,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{
{
Name: core.CodecJPEG,
ClockRate: 90000,
PayloadType: core.PayloadTypeRAW,
},
},
}}
}
return c.medias
}
func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
if c.receiver == nil {
c.receiver = core.NewReceiver(media, codec)
}
return c.receiver, nil
}
func (c *Client) Start() error {
// https://github.com/AlexxIT/go2rtc/issues/278
return c.Handle()
}
func (c *Client) Stop() error {
if c.receiver != nil {
c.receiver.Close()
}
// important for close reader/writer gorutines
_ = c.res.Body.Close()
c.closed = true
return nil
}
func (c *Client) MarshalJSON() ([]byte, error) {
info := &core.Info{
Type: "JPEG active producer",
URL: c.res.Request.URL.String(),
RemoteAddr: c.RemoteAddr,
UserAgent: c.UserAgent,
Medias: c.medias,
Recv: c.recv,
}
if c.receiver != nil {
info.Receivers = []*core.Receiver{c.receiver}
}
return json.Marshal(info)
}
+10 -10
View File
@@ -14,7 +14,7 @@ import (
)
type Consumer struct {
core.SuperConsumer
core.Connection
wr *core.WriteBuffer
muxer *Muxer
mu sync.Mutex
@@ -47,12 +47,17 @@ func NewConsumer(medias []*core.Media) *Consumer {
}
}
cons := &Consumer{
wr := core.NewWriteBuffer(nil)
return &Consumer{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "mp4",
Medias: medias,
Transport: wr,
},
muxer: &Muxer{},
wr: core.NewWriteBuffer(nil),
wr: wr,
}
cons.Medias = medias
return cons
}
func (c *Consumer) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiver) error {
@@ -182,8 +187,3 @@ func (c *Consumer) WriteTo(wr io.Writer) (int64, error) {
return c.wr.WriteTo(wr)
}
func (c *Consumer) Stop() error {
_ = c.SuperConsumer.Close()
return c.wr.Close()
}
+9 -7
View File
@@ -10,11 +10,12 @@ import (
)
type Keyframe struct {
core.SuperConsumer
core.Connection
wr *core.WriteBuffer
muxer *Muxer
}
// Deprecated: should be rewritten
func NewKeyframe(medias []*core.Media) *Keyframe {
if medias == nil {
medias = []*core.Media{
@@ -29,9 +30,15 @@ func NewKeyframe(medias []*core.Media) *Keyframe {
}
}
wr := core.NewWriteBuffer(nil)
cons := &Keyframe{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "mp4",
Transport: wr,
},
muxer: &Muxer{},
wr: core.NewWriteBuffer(nil),
wr: wr,
}
cons.Medias = medias
return cons
@@ -95,8 +102,3 @@ func (c *Keyframe) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiv
func (c *Keyframe) WriteTo(wr io.Writer) (int64, error) {
return c.wr.WriteTo(wr)
}
func (c *Keyframe) Stop() error {
_ = c.SuperConsumer.Close()
return c.wr.Close()
}
+19 -18
View File
@@ -11,17 +11,13 @@ import (
)
type Consumer struct {
core.SuperConsumer
core.Connection
muxer *Muxer
wr *core.WriteBuffer
}
func NewConsumer() *Consumer {
c := &Consumer{
muxer: NewMuxer(),
wr: core.NewWriteBuffer(nil),
}
c.Medias = []*core.Media{
medias := []*core.Media{
{
Kind: core.KindVideo,
Direction: core.DirectionSendonly,
@@ -38,7 +34,17 @@ func NewConsumer() *Consumer {
},
},
}
return c
wr := core.NewWriteBuffer(nil)
return &Consumer{
core.Connection{
ID: core.NewID(),
FormatName: "mpegts",
Medias: medias,
Transport: wr,
},
NewMuxer(),
wr,
}
}
func (c *Consumer) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error {
@@ -110,14 +116,9 @@ func (c *Consumer) WriteTo(wr io.Writer) (int64, error) {
return c.wr.WriteTo(wr)
}
func (c *Consumer) Stop() error {
_ = c.SuperConsumer.Close()
return c.wr.Close()
}
func TimestampFromRTP(rtp *rtp.Packet, codec *core.Codec) {
if codec.ClockRate == ClockRate {
return
}
rtp.Timestamp = uint32(float64(rtp.Timestamp) / float64(codec.ClockRate) * ClockRate)
}
//func TimestampFromRTP(rtp *rtp.Packet, codec *core.Codec) {
// if codec.ClockRate == ClockRate {
// return
// }
// rtp.Timestamp = uint32(float64(rtp.Timestamp) / float64(codec.ClockRate) * ClockRate)
//}
+12 -8
View File
@@ -13,12 +13,19 @@ import (
)
type Producer struct {
core.SuperProducer
core.Connection
rd *core.ReadBuffer
}
func Open(rd io.Reader) (*Producer, error) {
prod := &Producer{rd: core.NewReadBuffer(rd)}
prod := &Producer{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "mpegts",
Transport: rd,
},
rd: core.NewReadBuffer(rd),
}
if err := prod.probe(); err != nil {
return nil, err
}
@@ -26,7 +33,7 @@ func Open(rd io.Reader) (*Producer, error) {
}
func (c *Producer) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
receiver, _ := c.SuperProducer.GetTrack(media, codec)
receiver, _ := c.Connection.GetTrack(media, codec)
receiver.ID = StreamType(codec)
return receiver, nil
}
@@ -40,6 +47,8 @@ func (c *Producer) Start() error {
return err
}
c.Recv += len(pkt.Payload)
//log.Printf("[mpegts] size: %6d, muxer: %10d, pt: %2d", len(pkt.Payload), pkt.Timestamp, pkt.PayloadType)
for _, receiver := range c.Receivers {
@@ -52,11 +61,6 @@ func (c *Producer) Start() error {
}
}
func (c *Producer) Stop() error {
_ = c.SuperProducer.Close()
return c.rd.Close()
}
func (c *Producer) probe() error {
c.rd.BufferSize = core.ProbeSize
defer c.rd.Reset()
@@ -1,4 +1,4 @@
package multipart
package mpjpeg
import (
"bufio"
+65
View File
@@ -0,0 +1,65 @@
package mpjpeg
import (
"bufio"
"errors"
"io"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/pion/rtp"
)
type Producer struct {
core.Connection
rd *bufio.Reader
}
func Open(rd io.Reader) (*Producer, error) {
return &Producer{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "mpjpeg", // Multipart JPEG
Transport: rd,
Medias: []*core.Media{
{
Kind: core.KindVideo,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{
{
Name: core.CodecJPEG,
ClockRate: 90000,
PayloadType: core.PayloadTypeRAW,
},
},
},
},
},
}, nil
}
func (c *Producer) Start() error {
if len(c.Receivers) != 1 {
return errors.New("mjpeg: no receivers")
}
rd := bufio.NewReader(c.Transport.(io.Reader))
mjpeg := c.Receivers[0]
for {
_, body, err := Next(rd)
if err != nil {
return err
}
c.Recv += len(body)
if mjpeg != nil {
packet := &rtp.Packet{
Header: rtp.Header{Timestamp: core.Now90000()},
Payload: body,
}
mjpeg.WriteRTP(packet)
}
}
}
-68
View File
@@ -1,68 +0,0 @@
package multipart
import (
"bufio"
"errors"
"io"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/pion/rtp"
)
type Producer struct {
core.SuperProducer
closer io.Closer
reader *bufio.Reader
}
func Open(rd io.Reader) (*Producer, error) {
prod := &Producer{
closer: rd.(io.Closer),
reader: bufio.NewReader(rd),
}
prod.Medias = []*core.Media{
{
Kind: core.KindVideo,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{
{
Name: core.CodecJPEG,
ClockRate: 90000,
PayloadType: core.PayloadTypeRAW,
},
},
},
}
prod.Type = "Multipart producer"
return prod, nil
}
func (c *Producer) Start() error {
if len(c.Receivers) != 1 {
return errors.New("mjpeg: no receivers")
}
mjpeg := c.Receivers[0]
for {
_, body, err := Next(c.reader)
if err != nil {
return err
}
c.Recv += len(body)
if mjpeg != nil {
packet := &rtp.Packet{
Header: rtp.Header{Timestamp: core.Now90000()},
Payload: body,
}
mjpeg.WriteRTP(packet)
}
}
}
func (c *Producer) Stop() error {
_ = c.SuperProducer.Close()
return c.closer.Close()
}
+4 -2
View File
@@ -14,7 +14,7 @@ type Client struct {
api *API
}
func NewClient(rawURL string) (*Client, error) {
func Dial(rawURL string) (*Client, error) {
u, err := url.Parse(rawURL)
if err != nil {
return nil, err
@@ -48,8 +48,10 @@ func NewClient(rawURL string) (*Client, error) {
}
conn := webrtc.NewConn(pc)
conn.Desc = "Nest"
conn.FormatName = "nest/webrtc"
conn.Mode = core.ModeActiveProducer
conn.Protocol = "http"
conn.URL = rawURL
// https://developers.google.com/nest/device-access/traits/device/camera-live-stream#generatewebrtcstream-request-fields
medias := []*core.Media{
+10 -10
View File
@@ -8,17 +8,11 @@ import (
)
type Probe struct {
Type string `json:"type,omitempty"`
RemoteAddr string `json:"remote_addr,omitempty"`
UserAgent string `json:"user_agent,omitempty"`
Medias []*core.Media `json:"medias,omitempty"`
Receivers []*core.Receiver `json:"receivers,omitempty"`
Senders []*core.Sender `json:"senders,omitempty"`
core.Connection
}
func NewProbe(query url.Values) *Probe {
c := &Probe{Type: "probe"}
c.Medias = core.ParseQuery(query)
medias := core.ParseQuery(query)
for _, value := range query["microphone"] {
media := &core.Media{Kind: core.KindAudio, Direction: core.DirectionRecvonly}
@@ -32,10 +26,16 @@ func NewProbe(query url.Values) *Probe {
media.Codecs = append(media.Codecs, &core.Codec{Name: name})
}
c.Medias = append(c.Medias, media)
medias = append(medias, media)
}
return c
return &Probe{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "probe",
Medias: medias,
},
}
}
func (p *Probe) GetMedias() []*core.Media {
+13 -3
View File
@@ -18,6 +18,7 @@ import (
pion "github.com/pion/webrtc/v3"
)
// Deprecated: should be rewritten to core.Connection
type Client struct {
core.Listener
@@ -34,8 +35,15 @@ type Client struct {
backchannel bool
}
func NewClient(url string) *Client {
return &Client{url: url}
func Dial(rawURL string) (*Client, error) {
client := &Client{url: rawURL}
if err := client.Dial(); err != nil {
return nil, err
}
if err := client.Connect(); err != nil {
return nil, err
}
return client, nil
}
func (c *Client) Dial() error {
@@ -103,8 +111,10 @@ func (c *Client) Connect() error {
var sendOffer sync.WaitGroup
c.conn = webrtc.NewConn(pc)
c.conn.Desc = "Roborock"
c.conn.FormatName = "roborock"
c.conn.Mode = core.ModeActiveProducer
c.conn.Protocol = "mqtt"
c.conn.URL = c.url
c.conn.Listen(func(msg any) {
switch msg := msg.(type) {
case *pion.ICECandidate:
+5 -4
View File
@@ -8,10 +8,11 @@ import (
"strings"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/flv"
"github.com/AlexxIT/go2rtc/pkg/tcp"
)
func DialPlay(rawURL string) (core.Producer, error) {
func DialPlay(rawURL string) (*flv.Producer, error) {
u, err := url.Parse(rawURL)
if err != nil {
return nil, err
@@ -22,16 +23,16 @@ func DialPlay(rawURL string) (core.Producer, error) {
return nil, err
}
rtmpConn, err := NewClient(conn, u)
client, err := NewClient(conn, u)
if err != nil {
return nil, err
}
if err = rtmpConn.play(); err != nil {
if err = client.play(); err != nil {
return nil, err
}
return rtmpConn.Producer()
return client.Producer()
}
func DialPublish(rawURL string) (io.Writer, error) {
+12 -3
View File
@@ -1,11 +1,10 @@
package rtmp
import (
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/flv"
)
func (c *Conn) Producer() (core.Producer, error) {
func (c *Conn) Producer() (*flv.Producer, error) {
c.rdBuf = []byte{
'F', 'L', 'V', // signature
1, // version
@@ -13,7 +12,17 @@ func (c *Conn) Producer() (core.Producer, error) {
0, 0, 0, 9, // header size
}
return flv.Open(c)
prod, err := flv.Open(c)
if err != nil {
return nil, err
}
prod.FormatName = "rtmp"
prod.Protocol = "rtmp"
prod.RemoteAddr = c.conn.RemoteAddr().String()
prod.URL = c.url
return prod, nil
}
// Read - convert RTMP to FLV format
+14 -2
View File
@@ -20,7 +20,13 @@ import (
var Timeout = time.Second * 5
func NewClient(uri string) *Conn {
return &Conn{uri: uri}
return &Conn{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "rtsp",
},
uri: uri,
}
}
func (c *Conn) Dial() (err error) {
@@ -36,8 +42,10 @@ func (c *Conn) Dial() (err error) {
timeout = time.Second * time.Duration(c.Timeout)
}
conn, err = tcp.Dial(c.URL, timeout)
c.Protocol = "rtsp+tcp"
} else {
conn, err = websocket.Dial(c.Transport)
c.Protocol = "ws"
}
if err != nil {
return
@@ -53,6 +61,10 @@ func (c *Conn) Dial() (err error) {
c.sequence = 0
c.state = StateConn
c.Connection.RemoteAddr = conn.RemoteAddr().String()
c.Connection.Transport = conn
c.Connection.URL = c.uri
return nil
}
@@ -143,7 +155,7 @@ func (c *Conn) Describe() error {
}
}
c.sdp = string(res.Body) // for info
c.SDP = string(res.Body) // for info
medias, err := UnmarshalSDP(res.Body)
if err != nil {
+5 -15
View File
@@ -18,6 +18,7 @@ import (
)
type Conn struct {
core.Connection
core.Listener
// public
@@ -30,9 +31,7 @@ type Conn struct {
Timeout int
Transport string // custom transport support, ex. RTSP over WebSocket
Medias []*core.Media
UserAgent string
URL *url.URL
URL *url.URL
// internal
@@ -44,19 +43,10 @@ type Conn struct {
reader *bufio.Reader
sequence int
session string
sdp string
uri string
state State
stateMu sync.Mutex
receivers []*core.Receiver
senders []*core.Sender
// stats
recv int
send int
}
const (
@@ -114,7 +104,7 @@ func (c *Conn) Handle() (err error) {
// polling frames from remote RTSP Server (ex Camera)
timeout = time.Second * 5
if len(c.receivers) == 0 {
if len(c.Receivers) == 0 {
// if we only send audio to camera
// https://github.com/AlexxIT/go2rtc/issues/659
timeout += keepaliveDT
@@ -239,7 +229,7 @@ func (c *Conn) Handle() (err error) {
return
}
c.recv += int(size)
c.Recv += int(size)
if channelID&1 == 0 {
packet := &rtp.Packet{}
@@ -247,7 +237,7 @@ func (c *Conn) Handle() (err error) {
return
}
for _, receiver := range c.receivers {
for _, receiver := range c.Receivers {
if receiver.ID == channelID {
receiver.WriteRTP(packet)
break
+4 -13
View File
@@ -18,15 +18,6 @@ func (c *Conn) GetMedias() []*core.Media {
}
func (c *Conn) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) (err error) {
core.Assert(media.Direction == core.DirectionSendonly)
for _, sender := range c.senders {
if sender.Codec == codec {
sender.HandleRTP(track)
return
}
}
var channel byte
switch c.mode {
@@ -47,12 +38,12 @@ func (c *Conn) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiv
c.state = StateSetup
case core.ModePassiveConsumer:
channel = byte(len(c.senders)) * 2
channel = byte(len(c.Senders)) * 2
// for consumer is better to use original track codec
codec = track.Codec.Clone()
// generate new payload type, starting from 96
codec.PayloadType = byte(96 + len(c.senders))
codec.PayloadType = byte(96 + len(c.Senders))
default:
panic(core.Caller())
@@ -70,7 +61,7 @@ func (c *Conn) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiv
sender.HandleRTP(track)
c.senders = append(c.senders, sender)
c.Senders = append(c.Senders, sender)
return nil
}
@@ -99,7 +90,7 @@ func (c *Conn) packetWriter(codec *core.Codec, channel, payloadType uint8) core.
}
//log.Printf("[rtsp] channel:%2d write_size:%6d buffer_size:%6d", channel, n, len(buf))
if _, err := c.conn.Write(buf[:n]); err == nil {
c.send += n
c.Send += n
}
n = 0
}
+7 -25
View File
@@ -10,7 +10,7 @@ import (
func (c *Conn) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
core.Assert(media.Direction == core.DirectionRecvonly)
for _, track := range c.receivers {
for _, track := range c.Receivers {
if track.Codec == codec {
return track, nil
}
@@ -34,7 +34,7 @@ func (c *Conn) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, e
track := core.NewReceiver(media, codec)
track.ID = channel
c.receivers = append(c.receivers, track)
c.Receivers = append(c.Receivers, track)
return track, nil
}
@@ -81,10 +81,10 @@ func (c *Conn) Start() (err error) {
}
func (c *Conn) Stop() (err error) {
for _, receiver := range c.receivers {
for _, receiver := range c.Receivers {
receiver.Close()
}
for _, sender := range c.senders {
for _, sender := range c.Senders {
sender.Close()
}
@@ -99,25 +99,7 @@ func (c *Conn) Stop() (err error) {
}
func (c *Conn) MarshalJSON() ([]byte, error) {
info := &core.Info{
Type: "RTSP " + c.mode.String(),
SDP: c.sdp,
UserAgent: c.UserAgent,
Medias: c.Medias,
Receivers: c.receivers,
Senders: c.senders,
Recv: c.recv,
Send: c.send,
}
if c.URL != nil {
info.URL = c.URL.String()
}
if c.conn != nil {
info.RemoteAddr = c.conn.RemoteAddr().String()
}
return json.Marshal(info)
return json.Marshal(c.Connection)
}
func (c *Conn) Reconnect() error {
@@ -135,12 +117,12 @@ func (c *Conn) Reconnect() error {
}
// restore previous medias
for _, receiver := range c.receivers {
for _, receiver := range c.Receivers {
if _, err := c.SetupMedia(receiver.Media); err != nil {
return err
}
}
for _, sender := range c.senders {
for _, sender := range c.Senders {
if _, err := c.SetupMedia(sender.Media); err != nil {
return err
}
+18 -12
View File
@@ -14,10 +14,16 @@ import (
)
func NewServer(conn net.Conn) *Conn {
c := new(Conn)
c.conn = conn
c.reader = bufio.NewReader(conn)
return c
return &Conn{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "rtsp",
Protocol: "rtsp+tcp",
RemoteAddr: conn.RemoteAddr().String(),
},
conn: conn,
reader: bufio.NewReader(conn),
}
}
func (c *Conn) Auth(username, password string) {
@@ -70,7 +76,7 @@ func (c *Conn) Accept() error {
return errors.New("wrong content type")
}
c.sdp = string(req.Body) // for info
c.SDP = string(req.Body) // for info
c.Medias, err = UnmarshalSDP(req.Body)
if err != nil {
@@ -81,7 +87,7 @@ func (c *Conn) Accept() error {
for i, media := range c.Medias {
track := core.NewReceiver(media, media.Codecs[0])
track.ID = byte(i * 2)
c.receivers = append(c.receivers, track)
c.Receivers = append(c.Receivers, track)
}
c.mode = core.ModePassiveProducer
@@ -96,7 +102,7 @@ func (c *Conn) Accept() error {
c.mode = core.ModePassiveConsumer
c.Fire(MethodDescribe)
if c.senders == nil {
if c.Senders == nil {
res := &tcp.Response{
Status: "404 Not Found",
Request: req,
@@ -113,7 +119,7 @@ func (c *Conn) Accept() error {
// convert tracks to real output medias medias
var medias []*core.Media
for i, track := range c.senders {
for i, track := range c.Senders {
media := &core.Media{
Kind: core.GetKind(track.Codec.Name),
Direction: core.DirectionRecvonly,
@@ -128,7 +134,7 @@ func (c *Conn) Accept() error {
return err
}
c.sdp = string(res.Body) // for info
c.SDP = string(res.Body) // for info
if err = c.WriteResponse(res); err != nil {
return err
@@ -148,9 +154,9 @@ func (c *Conn) Accept() error {
c.state = StateSetup
if c.mode == core.ModePassiveConsumer {
if i := reqTrackID(req); i >= 0 && i < len(c.senders) {
if i := reqTrackID(req); i >= 0 && i < len(c.Senders) {
// mark sender as SETUP
c.senders[i].Media.ID = MethodSetup
c.Senders[i].Media.ID = MethodSetup
tr = fmt.Sprintf("RTP/AVP/TCP;unicast;interleaved=%d-%d", i*2, i*2+1)
res.Header.Set("Transport", tr)
} else {
@@ -170,7 +176,7 @@ func (c *Conn) Accept() error {
case MethodRecord, MethodPlay:
if c.mode == core.ModePassiveConsumer {
// stop unconfigured senders
for _, track := range c.senders {
for _, track := range c.Senders {
if track.Media.ID != MethodSetup {
track.Close()
}
@@ -49,10 +49,12 @@ func (c *Client) Stop() (err error) {
}
func (c *Client) MarshalJSON() ([]byte, error) {
info := &core.Info{
Type: "Exec active consumer",
Medias: c.medias,
Send: c.send,
info := &core.Connection{
ID: core.ID(c),
FormatName: "exec",
Protocol: "pipe",
Medias: c.medias,
Send: c.send,
}
if c.sender != nil {
info.Senders = []*core.Sender{c.sender}
+1
View File
@@ -6,6 +6,7 @@ import (
"github.com/AlexxIT/go2rtc/pkg/core"
)
// Deprecated: should be rewritten to core.Connection
type Client struct {
cmd *exec.Cmd
+1
View File
@@ -23,6 +23,7 @@ import (
"github.com/AlexxIT/go2rtc/pkg/tcp"
)
// Deprecated: should be rewritten to core.Connection
type Client struct {
core.Listener
+12 -6
View File
@@ -2,6 +2,7 @@ package tapo
import (
"encoding/json"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/mpegts"
)
@@ -74,15 +75,20 @@ func (c *Client) Stop() error {
}
func (c *Client) MarshalJSON() ([]byte, error) {
info := &core.Info{
Type: "Tapo active producer",
Medias: c.medias,
Recv: c.recv,
Receivers: c.receivers,
Send: c.send,
info := &core.Connection{
ID: core.ID(c),
FormatName: "tapo",
Protocol: "http",
Medias: c.medias,
Recv: c.recv,
Receivers: c.receivers,
Send: c.send,
}
if c.sender != nil {
info.Senders = []*core.Sender{c.sender}
}
if c.conn1 != nil {
info.RemoteAddr = c.conn1.RemoteAddr().String()
}
return json.Marshal(info)
}
-12
View File
@@ -1,12 +0,0 @@
package tcp
import (
"net/http"
)
func RemoteAddr(r *http.Request) string {
if remote := r.Header.Get("X-Forwarded-For"); remote != "" {
return remote + ", " + r.RemoteAddr
}
return r.RemoteAddr
}
+18 -2
View File
@@ -19,11 +19,11 @@ func Do(req *http.Request) (*http.Response, error) {
switch req.URL.Scheme {
case "httpx":
secure = &tls.Config{InsecureSkipVerify: true}
secure = insecureConfig
req.URL.Scheme = "https"
case "https":
if hostname := req.URL.Hostname(); IsIP(hostname) {
secure = &tls.Config{InsecureSkipVerify: true}
secure = insecureConfig
}
}
@@ -144,6 +144,22 @@ type key string
var connKey = key("conn")
var secureKey = key("secure")
var insecureConfig = &tls.Config{
InsecureSkipVerify: true,
CipherSuites: []uint16{
tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305, tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305,
tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA, tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,
tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA, tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
// this cipher suites disabled starting from https://tip.golang.org/doc/go1.22
// but cameras can't work without them https://github.com/AlexxIT/go2rtc/issues/1172
tls.TLS_RSA_WITH_AES_128_GCM_SHA256, // insecure
tls.TLS_RSA_WITH_AES_256_GCM_SHA384, // insecure
},
}
func WithConn() (context.Context, *net.Conn) {
pconn := new(net.Conn)
return context.WithValue(context.Background(), connKey, pconn), pconn
+11 -11
View File
@@ -54,22 +54,27 @@ func Open(r io.Reader) (*Producer, error) {
return nil, errors.New("waw: unsupported codec")
}
prod := &Producer{rd: rd, cl: r.(io.Closer)}
prod.Type = "WAV producer"
prod.Medias = []*core.Media{
medias := []*core.Media{
{
Kind: core.KindAudio,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{codec},
},
}
return prod, nil
return &Producer{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "wav",
Medias: medias,
Transport: r,
},
rd: rd,
}, nil
}
type Producer struct {
core.SuperProducer
core.Connection
rd *bufio.Reader
cl io.Closer
}
func (c *Producer) Start() error {
@@ -106,11 +111,6 @@ func (c *Producer) Start() error {
}
}
func (c *Producer) Stop() error {
_ = c.SuperProducer.Close()
return c.cl.Close()
}
func readChunk(r io.Reader) (chunkID string, data []byte, err error) {
b := make([]byte, 8)
if _, err = io.ReadFull(r, b); err != nil {
+1 -1
View File
@@ -71,7 +71,7 @@ func (c *Conn) SetAnswer(answer string) (err error) {
return
}
c.medias = UnmarshalMedias(sd.MediaDescriptions)
c.Medias = UnmarshalMedias(sd.MediaDescriptions)
return nil
}
+35 -16
View File
@@ -1,6 +1,9 @@
package webrtc
import (
"encoding/json"
"fmt"
"strings"
"time"
"github.com/AlexxIT/go2rtc/pkg/core"
@@ -10,28 +13,25 @@ import (
)
type Conn struct {
core.Connection
core.Listener
UserAgent string
Desc string
Mode core.Mode
Mode core.Mode `json:"mode"`
pc *webrtc.PeerConnection
medias []*core.Media
receivers []*core.Receiver
senders []*core.Sender
recv int
send int
offer string
remote string
closed core.Waiter
}
func NewConn(pc *webrtc.PeerConnection) *Conn {
c := &Conn{pc: pc}
c := &Conn{
Connection: core.Connection{
ID: core.NewID(),
FormatName: "webrtc",
},
pc: pc,
}
pc.OnICECandidate(func(candidate *webrtc.ICECandidate) {
// last candidate will be empty
@@ -50,7 +50,15 @@ func NewConn(pc *webrtc.PeerConnection) *Conn {
}
pc.SCTP().Transport().ICETransport().OnSelectedCandidatePairChange(
func(pair *webrtc.ICECandidatePair) {
c.remote = pair.Remote.String()
c.Protocol += "+" + pair.Remote.Protocol.String()
c.RemoteAddr = fmt.Sprintf(
"%s:%d %s", sanitizeIP6(pair.Remote.Address), pair.Remote.Port, pair.Remote.Typ,
)
if pair.Remote.RelatedAddress != "" {
c.RemoteAddr += fmt.Sprintf(
" %s:%d", sanitizeIP6(pair.Remote.RelatedAddress), pair.Remote.RelatedPort,
)
}
},
)
})
@@ -92,7 +100,7 @@ func NewConn(pc *webrtc.PeerConnection) *Conn {
return
}
c.recv += n
c.Recv += n
packet := &rtp.Packet{}
if err := packet.Unmarshal(b[:n]); err != nil {
@@ -121,7 +129,7 @@ func NewConn(pc *webrtc.PeerConnection) *Conn {
switch state {
case webrtc.PeerConnectionStateConnected:
for _, sender := range c.senders {
for _, sender := range c.Senders {
sender.Start()
}
case webrtc.PeerConnectionStateDisconnected, webrtc.PeerConnectionStateFailed, webrtc.PeerConnectionStateClosed:
@@ -134,6 +142,10 @@ func NewConn(pc *webrtc.PeerConnection) *Conn {
return c
}
func (c *Conn) MarshalJSON() ([]byte, error) {
return json.Marshal(c.Connection)
}
func (c *Conn) Close() error {
c.closed.Done(nil)
return c.pc.Close()
@@ -172,7 +184,7 @@ func (c *Conn) getMediaCodec(remote *webrtc.TrackRemote) (*core.Media, *core.Cod
}
// search Media for this MID
for _, media := range c.medias {
for _, media := range c.Medias {
if media.ID != tr.Mid() || media.Direction != core.DirectionRecvonly {
continue
}
@@ -194,3 +206,10 @@ func (c *Conn) getMediaCodec(remote *webrtc.TrackRemote) (*core.Media, *core.Cod
return nil, nil
}
func sanitizeIP6(host string) string {
if strings.IndexByte(host, ':') > 0 {
return "[" + host + "]"
}
return host
}

Some files were not shown because too many files have changed in this diff Show More