Compare commits

..

18 Commits

Author SHA1 Message Date
Alexey Khit 71051e7dcf Update version to 1.3.1 2023-03-26 15:00:47 +03:00
Alex X cdb3ee45cf Merge pull request #271 from skrashevich/testing-ci
Testing action CI
2023-03-26 11:23:15 +03:00
Alex X ae99c1da03 Merge pull request #273 from skrashevich/fix-urlencoding-in-delete-request
Fix double url-encoding in streams DELETE request
2023-03-26 11:15:51 +03:00
Alexey Khit 863cc0c1d7 Add tests for FFmpeg parse args 2023-03-26 11:13:36 +03:00
Alexey Khit 40494ab87c Code refactoring 2023-03-26 11:13:23 +03:00
Alex X bffe5f0aa2 Merge pull request #280 from horttorrell32/master
Add HW ROTATION support to vaapi engine
2023-03-26 11:02:25 +03:00
Alexey Khit 8241af8b9d Fix GetMedias on stream reconnection issue 2023-03-26 08:09:54 +03:00
Alexey Khit 5c164de393 Fix listening on hassio interface 2023-03-26 07:27:29 +03:00
Alexey Khit 8bf5c85b79 Add support X-Forwarded-For 2023-03-25 11:59:55 +03:00
Alexey Khit a42c3e21c9 Fix input browser via WebTorrent 2023-03-25 11:41:15 +03:00
Alexey Khit 7016289f14 Adds dynamic timeouts on reconnect 2023-03-25 11:39:29 +03:00
Alexey Khit 54302d3bda Fix json locked 2023-03-25 07:36:21 +03:00
Alexey Khit af6b8a400d Adds about pin for Roborock source 2023-03-23 15:06:45 +03:00
Alexey Khit a1b5eae653 Update readme 2023-03-23 14:07:29 +03:00
Galindo, Alex 1912a43679 Add HW ROTATION support to vaapi engine 2023-03-09 12:22:47 +01:00
Sergey Krashevich eca311717a Update index.html 2023-02-27 02:02:32 +03:00
Sergey Krashevich d3b2b8fdae add docker testing 2023-02-21 18:24:41 +03:00
Sergey Krashevich 3b9a0059df Create test.yml 2023-02-21 17:54:55 +03:00
16 changed files with 413 additions and 68 deletions
+103
View File
@@ -0,0 +1,103 @@
name: Test Build and Run
on:
push:
branches:
- '*'
pull_request:
merge_group:
workflow_dispatch:
jobs:
build-test:
strategy:
matrix:
os: [windows-latest, ubuntu-latest, macos-latest]
arch: [amd64, arm64]
runs-on: ${{ matrix.os }}
continue-on-error: true
env:
GOARCH: ${{ matrix.arch }}
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Setup Go
uses: actions/setup-go@v2
with:
go-version: '1.19'
- name: Build Go binary
run: go build -ldflags "-s -w" -trimpath -o ./go2rtc
- name: Test Go binary on linux
if: matrix.os == 'ubuntu-latest'
run: |
if [ "${{ matrix.arch }}" = "amd64" ]; then
./go2rtc -version
else
sudo apt-get update && sudo apt-get install -y qemu-user-static
sudo cp /usr/bin/qemu-aarch64-static .
sudo chown $USER:$USER ./qemu-aarch64-static
qemu-aarch64-static ./go2rtc -version
fi
- name: Test Go binary on macos
if: matrix.os == 'macos-latest'
run: |
if [ "${{ matrix.arch }}" = "amd64" ]; then
./go2rtc -version
else
echo "ARM64 architecture is not yet supported on macOS"
fi
- name: Test Go binary on windows
if: matrix.os == 'windows-latest'
run: |
if ("${{ matrix.arch }}" -eq "amd64") {
.\go2rtc* -version
} else {
Write-Host "ARM64 architecture is not yet supported on Windows"
}
docker-test:
strategy:
matrix:
platform:
- amd64
- "386"
- arm/v7
- arm64/v8
continue-on-error: true
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Set up QEMU
uses: docker/setup-qemu-action@v2
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
- name: Build and push
uses: docker/build-push-action@v3
with:
context: .
platforms: linux/${{ matrix.platform }}
push: false
load: true
tags: go2rtc-${{ matrix.platform }}
- name: test run
run: |
docker run --platform=linux/${{ matrix.platform }} --rm go2rtc-${{ matrix.platform }} go2rtc -version
- name: Build and push Hardware
if: matrix.platform == 'amd64'
uses: docker/build-push-action@v3
with:
context: .
file: hardware.Dockerfile
platforms: linux/amd64
push: false
load: true
tags: go2rtc-${{ matrix.platform }}-hardware
- name: test run
if: matrix.platform == 'amd64'
run: |
docker run --platform=linux/${{ matrix.platform }} --rm go2rtc-${{ matrix.platform }}-hardware go2rtc -version
+103 -5
View File
@@ -51,11 +51,16 @@ Ultimate camera streaming application with support RTSP, WebRTC, HomeKit, FFmpeg
* [Source: Tapo](#source-tapo)
* [Source: Ivideon](#source-ivideon)
* [Source: Hass](#source-hass)
* [Source: ISAPI](#source-isapi)
* [Source: Roborock](#source-roborock)
* [Source: WebRTC](#source-webrtc)
* [Source: WebTorrent](#source-webtorrent)
* [Incoming sources](#incoming-sources)
* [Stream to camera](#stream-to-camera)
* [Module: API](#module-api)
* [Module: RTSP](#module-rtsp)
* [Module: WebRTC](#module-webrtc)
* [Module: WebTorrent](#module-webtorrent)
* [Module: Ngrok](#module-ngrok)
* [Module: Hass](#module-hass)
* [Module: MP4](#module-mp4)
@@ -161,6 +166,10 @@ Available source types:
- [tapo](#source-tapo) - TP-Link Tapo cameras with [two way audio](#two-way-audio) support
- [ivideon](#source-ivideon) - public cameras from [Ivideon](https://tv.ivideon.com/) service
- [hass](#source-hass) - Home Assistant integration
- [isapi](#source-isapi) - two way audio for Hikvision (ISAPI) cameras
- [roborock](#source-roborock) - Roborock vacuums with cameras
- [webrtc](#source-webrtc) - WebRTC/WHEP sources
- [webtorrent](#source-webtorrent) - WebTorrent source from another go2rtc
Read more about [incoming sources](#incoming-sources)
@@ -168,8 +177,11 @@ Read more about [incoming sources](#incoming-sources)
Supported for sources:
- RTSP cameras with [ONVIF Profile T](https://www.onvif.org/specs/stream/ONVIF-Streaming-Spec.pdf) (back channel connection)
- TP-Link Tapo cameras
- [RTSP cameras](#source-rtsp) with [ONVIF Profile T](https://www.onvif.org/specs/stream/ONVIF-Streaming-Spec.pdf) (back channel connection)
- [TP-Link Tapo](#source-tapo) cameras
- [Hikvision ISAPI](#source-isapi) cameras
- [Roborock vacuums](#source-roborock) models with cameras
- [Any Browser](#incoming-browser) as IP-camera
Two way audio can be used in browser with [WebRTC](#module-webrtc) technology. The browser will give access to the microphone only for HTTPS sites ([read more](https://stackoverflow.com/questions/52759992/how-to-access-camera-and-microphone-in-chrome-without-https)).
@@ -414,11 +426,55 @@ streams:
More cameras, like [Tuya](https://www.home-assistant.io/integrations/tuya/), [ONVIF](https://www.home-assistant.io/integrations/onvif/), and possibly others can also be imported by using [this method](https://github.com/felipecrs/hass-expose-camera-stream-source#importing-home-assistant-cameras-to-go2rtc-andor-frigate).
### Incoming sources
#### Source: ISAPI
This source type support only backchannel audio for Hikvision ISAPI protocol. So it should be used as second source in addition to the RTSP protocol.
```yaml
streams:
hikvision1:
- rtsp://admin:password@192.168.1.123:554/Streaming/Channels/101
- isapi://admin:password@192.168.1.123:80/
```
#### Source: Roborock
This source type support Roborock vacuums with cameras. Known working models:
- Roborock S6 MaxV - only video (the vacuum has no microphone)
- Roborock S7 MaxV - video and two way audio
Source support load Roborock credentials from Home Assistant [custom integration](https://github.com/humbertogontijo/homeassistant-roborock). Otherwise, you need to log in to your Roborock account (MiHome account is not supported). Go to: go2rtc WebUI > Add webpage. Copy `roborock://...` source for your vacuum and paste it to `go2rtc.yaml` config.
If you have graphic pin for your vacuum - add it as numeric pin (lines: 123, 456, 678) to the end of the roborock-link.
#### Source: WebRTC
This source type support two connection formats:
- [WebRTC/WHEP](https://www.ietf.org/id/draft-murillo-whep-01.html) - is an unapproved standard for WebRTC video/audio viewers. But it may already be supported in some third-party software. It is supported in go2rtc.
- `go2rtc/WebSocket` - This format is only supported in go2rtc. Unlike WHEP it supports asynchronous WebRTC connection and two way audio.
```yaml
streams:
webrtc1: webrtc:http://192.168.1.123:1984/api/webrtc?src=dahua1
webrtc2: webrtc:ws://192.168.1.123:1984/api/ws?src=dahua1
```
#### Source: WebTorrent
This source can get a stream from another go2rtc via [WebTorrent](#module-webtorrent) protocol.
```yaml
streams:
webtorrent1: webtorrent:?share=huofssuxaty00izc&pwd=k3l2j9djeg8v8r7e
```
#### Incoming sources
By default, go2rtc establishes a connection to the source when any client requests it. Go2rtc drops the connection to the source when it has no clients left.
- Go2rtc also can accepts incoming sources in [RTSP](#source-rtsp) and [HTTP](#source-http) formats
- Go2rtc also can accepts incoming sources in [RTSP](#source-rtsp), [HTTP](#source-http) and **WebRTC/WHIP** formats
- Go2rtc won't stop such a source if it has no clients
- You can push data only to existing stream (create stream with empty source in config)
- You can push multiple incoming sources to same stream
@@ -443,9 +499,25 @@ By default, go2rtc establishes a connection to the source when any client reques
ffmpeg -re -i BigBuckBunny.mp4 -c copy -f mpegts http://localhost:1984/api/stream.ts?dst=camera1
```
#### Incoming: Browser
You can turn the browser of any PC or mobile into an IP-camera with support video and two way audio. Or even broadcast your PC screen:
1. Create empty stream in the `go2rtc.yaml`
2. Go to go2rtc WebUI
3. Open `links` page for you stream
4. Select `camera+microphone` or `display+speaker` option
5. Open `webrtc` local page (your go2rtc **should work over HTTPS!**) or `share link` via [WebTorrent](#module-webtorrent) technology (work over HTTPS by default)
#### Incoming: WebRTC/WHIP
You can use **OBS Studio** or any other broadcast software with [WHIP](https://www.ietf.org/archive/id/draft-ietf-wish-whip-01.html) protocol support. This standard has not yet been approved. But you can download OBS Studio [dev version](https://github.com/obsproject/obs-studio/actions/runs/3969201209):
- Settings > Stream > Service: WHIP > http://192.168.1.123:1984/api/webrtc?dst=camera1
#### Stream to camera
go2rtc support play audio files (ex. music or [TTS](https://www.home-assistant.io/integrations/#text-to-speech)) and live streams (ex. radio) on cameras with [two way audio](#two-way-audio) support.
go2rtc support play audio files (ex. music or [TTS](https://www.home-assistant.io/integrations/#text-to-speech)) and live streams (ex. radio) on cameras with [two way audio](#two-way-audio) support (RTSP/ONVIF cameras, TP-Link Tapo, Hikvision ISAPI, Roborock vacuums, any Browser).
API example:
@@ -606,6 +678,32 @@ webrtc:
credential: your_pass
```
### Module: WebTorrent
This module support:
- Share any local stream via [WebTorrent](https://webtorrent.io/) technology
- Get any [incoming stream](#incoming-browser) from PC or mobile via [WebTorrent](https://webtorrent.io/) technology
- Get any remote [go2rtc source](#source-webtorrent) via [WebTorrent](https://webtorrent.io/) technology
Securely and free. You do not need to open a public access to the go2rtc server. But in some cases (Symmetric NAT) you may need to set up external access to [WebRTC module](#module-webrtc).
To generate sharing link or incoming link - goto go2rtc WebUI (stream links page). This link is **temporary** and will stop working after go2rtc is restarted!
You can create permanent external links in go2rtc config:
```yaml
webtorrent:
shares:
super-secret-share: # share name, should be unique among all go2rtc users!
pwd: super-secret-password
src: rtsp-dahua1 # stream name from streams section
```
Link example: https://alexxit.github.io/go2rtc/#share=02SNtgjKXY&pwd=wznEQqznxW&media=video+audio
TODO: article how it works...
### Module: Ngrok
With Ngrok integration you can get external access to your streams in situation when you have Internet with private IP-address.
+8 -5
View File
@@ -54,28 +54,31 @@ func Init() {
log.Info().Str("addr", cfg.Mod.Listen).Msg("[api] listen")
s := http.Server{}
s.Handler = http.DefaultServeMux // 4th
Handler = http.DefaultServeMux // 4th
if cfg.Mod.Origin == "*" {
s.Handler = middlewareCORS(s.Handler) // 3rd
Handler = middlewareCORS(Handler) // 3rd
}
if cfg.Mod.Username != "" {
s.Handler = middlewareAuth(cfg.Mod.Username, cfg.Mod.Password, s.Handler) // 2nd
Handler = middlewareAuth(cfg.Mod.Username, cfg.Mod.Password, Handler) // 2nd
}
if log.Trace().Enabled() {
s.Handler = middlewareLog(s.Handler) // 1st
Handler = middlewareLog(Handler) // 1st
}
go func() {
s := http.Server{}
s.Handler = Handler
if err = s.Serve(listener); err != nil {
log.Fatal().Err(err).Msg("[api] serve")
}
}()
}
var Handler http.Handler
// HandleFunc handle pattern with relative path:
// - "api/streams" => "{basepath}/api/streams"
// - "/streams" => "/streams"
+14 -5
View File
@@ -2,19 +2,21 @@ package app
import (
"flag"
"github.com/AlexxIT/go2rtc/pkg/shell"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"gopkg.in/yaml.v3"
"fmt"
"io"
"os"
"path/filepath"
"runtime"
"strings"
"time"
"github.com/AlexxIT/go2rtc/pkg/shell"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"gopkg.in/yaml.v3"
)
var Version = "1.3.0"
var Version = "1.3.1"
var UserAgent = "go2rtc/" + Version
var ConfigPath string
@@ -24,10 +26,17 @@ var Info = map[string]any{
func Init() {
var confs Config
var version bool
flag.Var(&confs, "config", "go2rtc config (path to file or raw text), support multiple")
flag.BoolVar(&version, "version", false, "Print the version of the application and exit")
flag.Parse()
if version {
fmt.Println("Current version: ", Version)
os.Exit(0)
}
if confs == nil {
confs = []string{"go2rtc.yaml"}
}
+14
View File
@@ -0,0 +1,14 @@
package ffmpeg
import (
"github.com/stretchr/testify/assert"
"testing"
)
func TestParseArgs(t *testing.T) {
args := parseArgs("rtsp://example.com#video=h264#rotate=180")
assert.Equal(t, "ffmpeg -hide_banner -allowed_media_types video -fflags nobuffer -flags low_delay -timeout 5000000 -user_agent go2rtc/ffmpeg -rtsp_transport tcp -i rtsp://example.com -c:v libx264 -g 50 -profile:v high -level:v 4.1 -preset:v superfast -tune:v zerolatency -an -vf transpose=1,transpose=1 -user_agent ffmpeg/go2rtc -rtsp_transport tcp -f rtsp {output}", args.String())
args = parseArgs("rtsp://example.com#video=h264#rotate=180#hardware=vaapi")
assert.Equal(t, "ffmpeg -hide_banner -hwaccel vaapi -hwaccel_output_format vaapi -allowed_media_types video -fflags nobuffer -flags low_delay -timeout 5000000 -user_agent go2rtc/ffmpeg -rtsp_transport tcp -i rtsp://example.com -c:v h264_vaapi -g 50 -bf 0 -profile:v high -level:v 4.1 -sei:v 0 -an -vf format=vaapi|nv12,hwupload,transpose_vaapi=4 -user_agent ffmpeg/go2rtc -rtsp_transport tcp -f rtsp {output}", args.String())
}
+9 -1
View File
@@ -1,9 +1,10 @@
package ffmpeg
import (
"github.com/rs/zerolog/log"
"os/exec"
"strings"
"github.com/rs/zerolog/log"
)
const (
@@ -54,6 +55,13 @@ func MakeHardware(args *Args, engine string) {
if strings.HasPrefix(filter, "scale=") {
args.filters[i] = "scale_vaapi=" + filter[6:]
}
if strings.HasPrefix(filter, "transpose=") {
if filter == "transpose=1,transpose=1" { // 180 degrees half-turn
args.filters[i] = "transpose_vaapi=4" // reversal
} else {
args.filters[i] = "transpose_vaapi=" + filter[10:]
}
}
}
// fix if input doesn't support hwaccel, do nothing when support
+20
View File
@@ -6,6 +6,7 @@ import (
"github.com/AlexxIT/go2rtc/cmd/api"
"github.com/AlexxIT/go2rtc/cmd/streams"
"github.com/AlexxIT/go2rtc/cmd/webrtc"
"net"
"net/http"
"net/url"
"strings"
@@ -133,6 +134,25 @@ func initAPI() {
})
}
func HassioAddr() string {
ints, _ := net.Interfaces()
for _, i := range ints {
if i.Name != "hassio" {
continue
}
addrs, _ := i.Addrs()
for _, addr := range addrs {
if addr, ok := addr.(*net.IPNet); ok {
return addr.IP.String()
}
}
}
return ""
}
func rtspStream(url string) *streams.Stream {
if strings.HasPrefix(url, "rtsp://") {
if i := strings.IndexByte(url[7:], '/'); i > 0 {
+52 -25
View File
@@ -17,6 +17,9 @@ import (
func Init() {
var conf struct {
API struct {
Listen string `json:"listen"`
} `yaml:"api"`
Mod struct {
Config string `yaml:"config"`
} `yaml:"hass"`
@@ -28,35 +31,68 @@ func Init() {
initAPI()
// support load cameras from Hass config file
filename := path.Join(conf.Mod.Config, ".storage/core.config_entries")
b, err := os.ReadFile(filename)
if err != nil {
entries := importEntries(conf.Mod.Config)
if entries == nil {
api.HandleFunc("api/hass", func(w http.ResponseWriter, _ *http.Request) {
http.Error(w, "no hass config", http.StatusNotFound)
})
return
}
storage := new(entries)
if err = json.Unmarshal(b, storage); err != nil {
return
}
urls := map[string]string{}
api.HandleFunc("api/hass", func(w http.ResponseWriter, r *http.Request) {
api.HandleFunc("api/hass", func(w http.ResponseWriter, _ *http.Request) {
var items []api.Stream
for name, url := range urls {
for name, url := range entries {
items = append(items, api.Stream{Name: name, URL: url})
}
api.ResponseStreams(w, items)
})
streams.HandleFunc("hass", func(url string) (core.Producer, error) {
if hurl := urls[url[5:]]; hurl != "" {
if hurl := entries[url[5:]]; hurl != "" {
return streams.GetProducer(hurl)
}
return nil, fmt.Errorf("can't get url: %s", url)
})
// for Addon listen on hassio interface, so WebUI feature will work
if conf.API.Listen == "127.0.0.1:1984" {
if addr := HassioAddr(); addr != "" {
addr += ":1984"
go func() {
log.Info().Str("addr", addr).Msg("[hass] listen")
if err := http.ListenAndServe(addr, api.Handler); err != nil {
log.Error().Err(err).Caller().Send()
}
}()
}
}
}
func importEntries(config string) map[string]string {
// support load cameras from Hass config file
filename := path.Join(config, ".storage/core.config_entries")
b, err := os.ReadFile(filename)
if err != nil {
return nil
}
var storage struct {
Data struct {
Entries []struct {
Title string `json:"title"`
Domain string `json:"domain"`
Data json.RawMessage `json:"data"`
Options json.RawMessage `json:"options"`
} `json:"entries"`
} `json:"data"`
}
if err = json.Unmarshal(b, &storage); err != nil {
return nil
}
urls := map[string]string{}
for _, entrie := range storage.Data.Entries {
switch entrie.Domain {
case "generic":
@@ -102,17 +138,8 @@ func Init() {
log.Info().Str("url", "hass:"+entrie.Title).Msg("[hass] load stream")
//streams.Get("hass:" + entrie.Title)
}
return urls
}
var log zerolog.Logger
type entries struct {
Data struct {
Entries []struct {
Title string `json:"title"`
Domain string `json:"domain"`
Data json.RawMessage `json:"data"`
Options json.RawMessage `json:"options"`
} `json:"entries"`
} `json:"data"`
}
+3 -2
View File
@@ -7,6 +7,7 @@ import (
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/mp4"
"github.com/AlexxIT/go2rtc/pkg/mpegts"
"github.com/AlexxIT/go2rtc/pkg/tcp"
"github.com/rs/zerolog/log"
"net/http"
"strconv"
@@ -70,13 +71,13 @@ func handlerStream(w http.ResponseWriter, r *http.Request) {
medias := mp4.ParseQuery(r.URL.Query())
if medias != nil {
cons = &mp4.Consumer{
RemoteAddr: r.RemoteAddr,
RemoteAddr: tcp.RemoteAddr(r),
UserAgent: r.UserAgent(),
Medias: medias,
}
} else {
cons = &mpegts.Consumer{
RemoteAddr: r.RemoteAddr,
RemoteAddr: tcp.RemoteAddr(r),
UserAgent: r.UserAgent(),
}
}
+4 -3
View File
@@ -5,6 +5,7 @@ import (
"github.com/AlexxIT/go2rtc/cmd/api"
"github.com/AlexxIT/go2rtc/cmd/streams"
"github.com/AlexxIT/go2rtc/pkg/mjpeg"
"github.com/AlexxIT/go2rtc/pkg/tcp"
"github.com/rs/zerolog/log"
"io"
"net/http"
@@ -29,7 +30,7 @@ func handlerKeyframe(w http.ResponseWriter, r *http.Request) {
exit := make(chan []byte)
cons := &mjpeg.Consumer{
RemoteAddr: r.RemoteAddr,
RemoteAddr: tcp.RemoteAddr(r),
UserAgent: r.UserAgent(),
}
cons.Listen(func(msg any) {
@@ -81,7 +82,7 @@ func outputMjpeg(w http.ResponseWriter, r *http.Request) {
flusher := w.(http.Flusher)
cons := &mjpeg.Consumer{
RemoteAddr: r.RemoteAddr,
RemoteAddr: tcp.RemoteAddr(r),
UserAgent: r.UserAgent(),
}
cons.Listen(func(msg any) {
@@ -146,7 +147,7 @@ func handlerWS(tr *api.Transport, _ *api.Message) error {
}
cons := &mjpeg.Consumer{
RemoteAddr: tr.Request.RemoteAddr,
RemoteAddr: tcp.RemoteAddr(tr.Request),
UserAgent: tr.Request.UserAgent(),
}
cons.Listen(func(msg any) {
+2 -1
View File
@@ -6,6 +6,7 @@ import (
"github.com/AlexxIT/go2rtc/cmd/streams"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/mp4"
"github.com/AlexxIT/go2rtc/pkg/tcp"
"github.com/rs/zerolog"
"net/http"
"strconv"
@@ -103,7 +104,7 @@ func handlerMP4(w http.ResponseWriter, r *http.Request) {
exit := make(chan error)
cons := &mp4.Consumer{
RemoteAddr: r.RemoteAddr,
RemoteAddr: tcp.RemoteAddr(r),
UserAgent: r.UserAgent(),
Medias: core.ParseQuery(r.URL.Query()),
}
+3 -2
View File
@@ -6,6 +6,7 @@ import (
"github.com/AlexxIT/go2rtc/cmd/streams"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/mp4"
"github.com/AlexxIT/go2rtc/pkg/tcp"
"strings"
)
@@ -17,7 +18,7 @@ func handlerWSMSE(tr *api.Transport, msg *api.Message) error {
}
cons := &mp4.Consumer{
RemoteAddr: tr.Request.RemoteAddr,
RemoteAddr: tcp.RemoteAddr(tr.Request),
UserAgent: tr.Request.UserAgent(),
}
@@ -64,7 +65,7 @@ func handlerWSMP4(tr *api.Transport, msg *api.Message) error {
}
cons := &mp4.Segment{
RemoteAddr: tr.Request.RemoteAddr,
RemoteAddr: tcp.RemoteAddr(tr.Request),
UserAgent: tr.Request.UserAgent(),
OnlyKeyframe: true,
}
+23 -11
View File
@@ -157,10 +157,10 @@ func (p *Producer) worker(conn core.Producer, workerID int) {
log.Warn().Err(err).Str("url", p.url).Caller().Send()
}
p.reconnect(workerID)
p.reconnect(workerID, 0)
}
func (p *Producer) reconnect(workerID int) {
func (p *Producer) reconnect(workerID, retry int) {
p.mu.Lock()
defer p.mu.Unlock()
@@ -169,18 +169,28 @@ func (p *Producer) reconnect(workerID int) {
return
}
log.Debug().Msgf("[streams] reconnect to url=%s", p.url)
log.Debug().Msgf("[streams] retry=%d to url=%s", retry, p.url)
if err := p.Dial(); err != nil {
conn, err := GetProducer(p.url)
if err != nil {
log.Debug().Msgf("[streams] producer=%s", err)
// TODO: dynamic timeout
time.AfterFunc(30*time.Second, func() {
p.reconnect(workerID)
timeout := time.Minute
if retry < 5 {
timeout = time.Second
} else if retry < 10 {
timeout = time.Second * 5
} else if retry < 20 {
timeout = time.Second * 10
}
time.AfterFunc(timeout, func() {
p.reconnect(workerID, retry+1)
})
return
}
for _, media := range p.conn.GetMedias() {
for _, media := range conn.GetMedias() {
switch media.Direction {
case core.DirectionRecvonly:
for _, receiver := range p.receivers {
@@ -189,7 +199,7 @@ func (p *Producer) reconnect(workerID int) {
continue
}
track, err := p.conn.GetTrack(media, codec)
track, err := conn.GetTrack(media, codec)
if err != nil {
continue
}
@@ -205,12 +215,14 @@ func (p *Producer) reconnect(workerID int) {
continue
}
_ = p.conn.(core.Consumer).AddTrack(media, codec, sender)
_ = conn.(core.Consumer).AddTrack(media, codec, sender)
}
}
}
go p.worker(p.conn, workerID)
p.conn = conn
go p.worker(conn, workerID)
}
func (p *Producer) stop() {
+41 -7
View File
@@ -201,13 +201,17 @@ func ExchangeSDP(stream *streams.Stream, offer, desc, userAgent string) (answer
// create new webrtc instance
conn := webrtc.NewConn(pc)
conn.Desc = desc
conn.Mode = core.ModePassiveConsumer
conn.UserAgent = userAgent
conn.Listen(func(msg any) {
switch msg := msg.(type) {
case pion.PeerConnectionState:
if msg == pion.PeerConnectionStateClosed {
if msg != pion.PeerConnectionStateClosed {
return
}
if conn.Mode == core.ModePassiveConsumer {
stream.RemoveConsumer(conn)
} else {
stream.RemoveProducer(conn)
}
}
})
@@ -220,11 +224,19 @@ func ExchangeSDP(stream *streams.Stream, offer, desc, userAgent string) (answer
return
}
// 2. AddConsumer, so we get new tracks
if err = stream.AddConsumer(conn); err != nil {
log.Warn().Err(err).Caller().Send()
_ = conn.Close()
return
if IsConsumer(conn) {
conn.Mode = core.ModePassiveConsumer
// 2. AddConsumer, so we get new tracks
if err = stream.AddConsumer(conn); err != nil {
log.Warn().Err(err).Caller().Send()
_ = conn.Close()
return
}
} else {
conn.Mode = core.ModePassiveProducer
stream.AddProducer(conn)
}
answer, err = conn.GetCompleteAnswer()
@@ -239,3 +251,25 @@ func ExchangeSDP(stream *streams.Stream, offer, desc, userAgent string) (answer
return
}
func IsConsumer(conn *webrtc.Conn) bool {
// if wants get video - consumer
for _, media := range conn.GetMedias() {
if media.Kind == core.KindVideo && media.Direction == core.DirectionSendonly {
return true
}
}
// if wants send video - producer
for _, media := range conn.GetMedias() {
if media.Kind == core.KindVideo && media.Direction == core.DirectionRecvonly {
return false
}
}
// if wants something - consumer
for _, media := range conn.GetMedias() {
if media.Direction == core.DirectionSendonly {
return true
}
}
return false
}
+12
View File
@@ -0,0 +1,12 @@
package tcp
import (
"net/http"
)
func RemoteAddr(r *http.Request) string {
if remote := r.Header.Get("X-Forwarded-For"); remote != "" {
return remote + ", " + r.RemoteAddr
}
return r.RemoteAddr
}
+2 -1
View File
@@ -112,7 +112,8 @@
ev.preventDefault();
const url = new URL("api/streams", location.href);
url.searchParams.set("src", ev.target.dataset.name);
const src = decodeURIComponent(ev.target.dataset.name);
url.searchParams.set("src", src);
fetch(url, {method: "DELETE"}).then(reload);
});