Merge branch 'master' of https://github.com/AlexxIT/go2rtc into update-readme
This commit is contained in:
+10
-10
@@ -124,7 +124,7 @@ jobs:
|
|||||||
uses: docker/metadata-action@v5
|
uses: docker/metadata-action@v5
|
||||||
with:
|
with:
|
||||||
images: |
|
images: |
|
||||||
${{ github.repository }}
|
name=${{ github.repository }},enable=${{ github.event.repository.fork == false }}
|
||||||
ghcr.io/${{ github.repository }}
|
ghcr.io/${{ github.repository }}
|
||||||
tags: |
|
tags: |
|
||||||
type=ref,event=branch
|
type=ref,event=branch
|
||||||
@@ -138,14 +138,14 @@ jobs:
|
|||||||
uses: docker/setup-buildx-action@v3
|
uses: docker/setup-buildx-action@v3
|
||||||
|
|
||||||
- name: Login to DockerHub
|
- name: Login to DockerHub
|
||||||
if: github.event_name != 'pull_request'
|
if: github.event_name == 'push' && github.event.repository.fork == false
|
||||||
uses: docker/login-action@v3
|
uses: docker/login-action@v3
|
||||||
with:
|
with:
|
||||||
username: ${{ secrets.DOCKERHUB_USERNAME }}
|
username: ${{ secrets.DOCKERHUB_USERNAME }}
|
||||||
password: ${{ secrets.DOCKERHUB_TOKEN }}
|
password: ${{ secrets.DOCKERHUB_TOKEN }}
|
||||||
|
|
||||||
- name: Login to GitHub Container Registry
|
- name: Login to GitHub Container Registry
|
||||||
if: github.event_name != 'pull_request'
|
if: github.event_name == 'push'
|
||||||
uses: docker/login-action@v3
|
uses: docker/login-action@v3
|
||||||
with:
|
with:
|
||||||
registry: ghcr.io
|
registry: ghcr.io
|
||||||
@@ -181,7 +181,7 @@ jobs:
|
|||||||
uses: docker/metadata-action@v5
|
uses: docker/metadata-action@v5
|
||||||
with:
|
with:
|
||||||
images: |
|
images: |
|
||||||
${{ github.repository }}
|
name=${{ github.repository }},enable=${{ github.event.repository.fork == false }}
|
||||||
ghcr.io/${{ github.repository }}
|
ghcr.io/${{ github.repository }}
|
||||||
flavor: |
|
flavor: |
|
||||||
suffix=-hardware,onlatest=true
|
suffix=-hardware,onlatest=true
|
||||||
@@ -198,14 +198,14 @@ jobs:
|
|||||||
uses: docker/setup-buildx-action@v3
|
uses: docker/setup-buildx-action@v3
|
||||||
|
|
||||||
- name: Login to DockerHub
|
- name: Login to DockerHub
|
||||||
if: github.event_name != 'pull_request'
|
if: github.event_name == 'push' && github.event.repository.fork == false
|
||||||
uses: docker/login-action@v3
|
uses: docker/login-action@v3
|
||||||
with:
|
with:
|
||||||
username: ${{ secrets.DOCKERHUB_USERNAME }}
|
username: ${{ secrets.DOCKERHUB_USERNAME }}
|
||||||
password: ${{ secrets.DOCKERHUB_TOKEN }}
|
password: ${{ secrets.DOCKERHUB_TOKEN }}
|
||||||
|
|
||||||
- name: Login to GitHub Container Registry
|
- name: Login to GitHub Container Registry
|
||||||
if: github.event_name != 'pull_request'
|
if: github.event_name == 'push'
|
||||||
uses: docker/login-action@v3
|
uses: docker/login-action@v3
|
||||||
with:
|
with:
|
||||||
registry: ghcr.io
|
registry: ghcr.io
|
||||||
@@ -236,7 +236,7 @@ jobs:
|
|||||||
uses: docker/metadata-action@v5
|
uses: docker/metadata-action@v5
|
||||||
with:
|
with:
|
||||||
images: |
|
images: |
|
||||||
${{ github.repository }}
|
name=${{ github.repository }},enable=${{ github.event.repository.fork == false }}
|
||||||
ghcr.io/${{ github.repository }}
|
ghcr.io/${{ github.repository }}
|
||||||
flavor: |
|
flavor: |
|
||||||
suffix=-rockchip,onlatest=true
|
suffix=-rockchip,onlatest=true
|
||||||
@@ -253,14 +253,14 @@ jobs:
|
|||||||
uses: docker/setup-buildx-action@v3
|
uses: docker/setup-buildx-action@v3
|
||||||
|
|
||||||
- name: Login to DockerHub
|
- name: Login to DockerHub
|
||||||
if: github.event_name != 'pull_request'
|
if: github.event_name == 'push' && github.event.repository.fork == false
|
||||||
uses: docker/login-action@v3
|
uses: docker/login-action@v3
|
||||||
with:
|
with:
|
||||||
username: ${{ secrets.DOCKERHUB_USERNAME }}
|
username: ${{ secrets.DOCKERHUB_USERNAME }}
|
||||||
password: ${{ secrets.DOCKERHUB_TOKEN }}
|
password: ${{ secrets.DOCKERHUB_TOKEN }}
|
||||||
|
|
||||||
- name: Login to GitHub Container Registry
|
- name: Login to GitHub Container Registry
|
||||||
if: github.event_name != 'pull_request'
|
if: github.event_name == 'push'
|
||||||
uses: docker/login-action@v3
|
uses: docker/login-action@v3
|
||||||
with:
|
with:
|
||||||
registry: ghcr.io
|
registry: ghcr.io
|
||||||
|
|||||||
@@ -702,7 +702,7 @@ Supports [Amazon Kinesis Video Streams](https://aws.amazon.com/kinesis/video-str
|
|||||||
|
|
||||||
**switchbot**
|
**switchbot**
|
||||||
|
|
||||||
Support connection to [SwitchBot](https://us.switch-bot.com/) cameras that are based on Kinesis Video Streams. Specifically, this includes [Pan/Tilt Cam Plus 2K](https://us.switch-bot.com/pages/switchbot-pan-tilt-cam-plus-2k) and [Pan/Tilt Cam Plus 3K](https://us.switch-bot.com/pages/switchbot-pan-tilt-cam-plus-3k). `Outdoor Spotlight Cam 1080P`, `Outdoor Spotlight Cam 2K`, `Pan/Tilt Cam`, `Pan/Tilt Cam 2K`, `Indoor Cam` are based on Tuya, so this feature is not available.
|
Support connection to [SwitchBot](https://us.switch-bot.com/) cameras that are based on Kinesis Video Streams. Specifically, this includes [Pan/Tilt Cam Plus 2K](https://us.switch-bot.com/pages/switchbot-pan-tilt-cam-plus-2k) and [Pan/Tilt Cam Plus 3K](https://us.switch-bot.com/pages/switchbot-pan-tilt-cam-plus-3k) and [Smart Video Doorbell](https://www.switchbot.jp/products/switchbot-smart-video-doorbell). `Outdoor Spotlight Cam 1080P`, `Outdoor Spotlight Cam 2K`, `Pan/Tilt Cam`, `Pan/Tilt Cam 2K`, `Indoor Cam` are based on Tuya, so this feature is not available.
|
||||||
|
|
||||||
```yaml
|
```yaml
|
||||||
streams:
|
streams:
|
||||||
@@ -711,7 +711,7 @@ streams:
|
|||||||
webrtc-openipc: webrtc:ws://192.168.1.123/webrtc_ws#format=openipc#ice_servers=[{"urls":"stun:stun.kinesisvideo.eu-north-1.amazonaws.com:443"}]
|
webrtc-openipc: webrtc:ws://192.168.1.123/webrtc_ws#format=openipc#ice_servers=[{"urls":"stun:stun.kinesisvideo.eu-north-1.amazonaws.com:443"}]
|
||||||
webrtc-wyze: webrtc:http://192.168.1.123:5000/signaling/camera1?kvs#format=wyze
|
webrtc-wyze: webrtc:http://192.168.1.123:5000/signaling/camera1?kvs#format=wyze
|
||||||
webrtc-kinesis: webrtc:wss://...amazonaws.com/?...#format=kinesis#client_id=...#ice_servers=[{...},{...}]
|
webrtc-kinesis: webrtc:wss://...amazonaws.com/?...#format=kinesis#client_id=...#ice_servers=[{...},{...}]
|
||||||
webrtc-switchbot: webrtc:wss://...amazonaws.com/?...#format=switchbot#resolution=hd#client_id=...#ice_servers=[{...},{...}]
|
webrtc-switchbot: webrtc:wss://...amazonaws.com/?...#format=switchbot#resolution=hd#play_type=0#client_id=...#ice_servers=[{...},{...}]
|
||||||
```
|
```
|
||||||
|
|
||||||
**PS.** For `kinesis` sources, you can use [echo](#source-echo) to get connection params using `bash`, `python` or any other script language.
|
**PS.** For `kinesis` sources, you can use [echo](#source-echo) to get connection params using `bash`, `python` or any other script language.
|
||||||
|
|||||||
+1
-1
@@ -1,7 +1,7 @@
|
|||||||
# syntax=docker/dockerfile:labs
|
# syntax=docker/dockerfile:labs
|
||||||
|
|
||||||
# 0. Prepare images
|
# 0. Prepare images
|
||||||
ARG PYTHON_VERSION="3.11"
|
ARG PYTHON_VERSION="3.13"
|
||||||
ARG GO_VERSION="1.25"
|
ARG GO_VERSION="1.25"
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/AlexxIT/go2rtc/pkg/shell"
|
"github.com/AlexxIT/go2rtc/pkg/creds"
|
||||||
"github.com/AlexxIT/go2rtc/pkg/yaml"
|
"github.com/AlexxIT/go2rtc/pkg/yaml"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -71,13 +71,15 @@ func initConfig(confs flagConfig) {
|
|||||||
// config as file
|
// config as file
|
||||||
if ConfigPath == "" {
|
if ConfigPath == "" {
|
||||||
ConfigPath = conf
|
ConfigPath = conf
|
||||||
|
initStorage()
|
||||||
}
|
}
|
||||||
|
|
||||||
if data, _ = os.ReadFile(conf); data == nil {
|
if data, _ = os.ReadFile(conf); data == nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
data = []byte(shell.ReplaceEnvVars(string(data)))
|
loadEnv(data)
|
||||||
|
data = creds.ReplaceVars(data)
|
||||||
configs = append(configs, data)
|
configs = append(configs, data)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"github.com/AlexxIT/go2rtc/pkg/creds"
|
||||||
"github.com/mattn/go-isatty"
|
"github.com/mattn/go-isatty"
|
||||||
"github.com/rs/zerolog"
|
"github.com/rs/zerolog"
|
||||||
)
|
)
|
||||||
@@ -88,6 +89,8 @@ func initLogger() {
|
|||||||
writer = MemoryLog
|
writer = MemoryLog
|
||||||
}
|
}
|
||||||
|
|
||||||
|
writer = creds.SecretWriter(writer)
|
||||||
|
|
||||||
lvl, _ := zerolog.ParseLevel(modules["level"])
|
lvl, _ := zerolog.ParseLevel(modules["level"])
|
||||||
Logger = zerolog.New(writer).Level(lvl)
|
Logger = zerolog.New(writer).Level(lvl)
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,56 @@
|
|||||||
|
package app
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/AlexxIT/go2rtc/pkg/creds"
|
||||||
|
"github.com/AlexxIT/go2rtc/pkg/yaml"
|
||||||
|
)
|
||||||
|
|
||||||
|
func initStorage() {
|
||||||
|
storage = &envStorage{data: make(map[string]string)}
|
||||||
|
creds.SetStorage(storage)
|
||||||
|
}
|
||||||
|
|
||||||
|
func loadEnv(data []byte) {
|
||||||
|
var cfg struct {
|
||||||
|
Env map[string]string `yaml:"env"`
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := yaml.Unmarshal(data, &cfg); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
storage.mu.Lock()
|
||||||
|
for name, value := range cfg.Env {
|
||||||
|
storage.data[name] = value
|
||||||
|
creds.AddSecret(value)
|
||||||
|
}
|
||||||
|
storage.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
var storage *envStorage
|
||||||
|
|
||||||
|
type envStorage struct {
|
||||||
|
data map[string]string
|
||||||
|
mu sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *envStorage) SetValue(name, value string) error {
|
||||||
|
if err := PatchConfig([]string{"env", name}, value); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
s.mu.Lock()
|
||||||
|
s.data[name] = value
|
||||||
|
s.mu.Unlock()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *envStorage) GetValue(name string) (value string, ok bool) {
|
||||||
|
s.mu.Lock()
|
||||||
|
value, ok = s.data[name]
|
||||||
|
s.mu.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
@@ -29,8 +29,8 @@ var stackSkip = [][]byte{
|
|||||||
[]byte("created by github.com/AlexxIT/go2rtc/internal/homekit.Init"),
|
[]byte("created by github.com/AlexxIT/go2rtc/internal/homekit.Init"),
|
||||||
|
|
||||||
// webrtc/api.go
|
// webrtc/api.go
|
||||||
[]byte("created by github.com/pion/ice/v2.NewTCPMuxDefault"),
|
[]byte("created by github.com/pion/ice/v4.NewTCPMuxDefault"),
|
||||||
[]byte("created by github.com/pion/ice/v2.NewUDPMuxDefault"),
|
[]byte("created by github.com/pion/ice/v4.NewUDPMuxDefault"),
|
||||||
}
|
}
|
||||||
|
|
||||||
func stackHandler(w http.ResponseWriter, r *http.Request) {
|
func stackHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|||||||
@@ -6,10 +6,13 @@ import (
|
|||||||
"github.com/AlexxIT/go2rtc/internal/api"
|
"github.com/AlexxIT/go2rtc/internal/api"
|
||||||
"github.com/AlexxIT/go2rtc/internal/app"
|
"github.com/AlexxIT/go2rtc/internal/app"
|
||||||
"github.com/AlexxIT/go2rtc/pkg/core"
|
"github.com/AlexxIT/go2rtc/pkg/core"
|
||||||
|
"github.com/AlexxIT/go2rtc/pkg/creds"
|
||||||
"github.com/AlexxIT/go2rtc/pkg/probe"
|
"github.com/AlexxIT/go2rtc/pkg/probe"
|
||||||
)
|
)
|
||||||
|
|
||||||
func apiStreams(w http.ResponseWriter, r *http.Request) {
|
func apiStreams(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w = creds.SecretResponse(w)
|
||||||
|
|
||||||
query := r.URL.Query()
|
query := r.URL.Query()
|
||||||
src := query.Get("src")
|
src := query.Get("src")
|
||||||
|
|
||||||
@@ -121,6 +124,8 @@ func apiStreamsDOT(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
dot = append(dot, '}')
|
dot = append(dot, '}')
|
||||||
|
|
||||||
|
dot = []byte(creds.SecretString(string(dot)))
|
||||||
|
|
||||||
api.Response(w, dot, "text/vnd.graphviz")
|
api.Response(w, dot, "text/vnd.graphviz")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -33,8 +33,12 @@ func switchbotClient(rawURL string, query url.Values) (core.Producer, error) {
|
|||||||
v.Resolution = 0
|
v.Resolution = 0
|
||||||
case "sd":
|
case "sd":
|
||||||
v.Resolution = 1
|
v.Resolution = 1
|
||||||
|
case "auto":
|
||||||
|
v.Resolution = 2
|
||||||
}
|
}
|
||||||
|
|
||||||
|
v.PlayType = core.Atoi(query.Get("play_type")) // zero by default
|
||||||
|
|
||||||
return v, nil
|
return v, nil
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,7 @@
|
|||||||
|
# Credentials
|
||||||
|
|
||||||
|
This module allows you to get variables:
|
||||||
|
|
||||||
|
- from custom storage (ex. config file)
|
||||||
|
- from [credential files](https://systemd.io/CREDENTIALS/)
|
||||||
|
- from environment variables
|
||||||
@@ -0,0 +1,79 @@
|
|||||||
|
package creds
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"regexp"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Storage interface {
|
||||||
|
SetValue(name, value string) error
|
||||||
|
GetValue(name string) (string, bool)
|
||||||
|
}
|
||||||
|
|
||||||
|
var storage Storage
|
||||||
|
|
||||||
|
func SetStorage(s Storage) {
|
||||||
|
storage = s
|
||||||
|
}
|
||||||
|
|
||||||
|
func SetValue(name, value string) error {
|
||||||
|
if storage == nil {
|
||||||
|
return errors.New("credentials: storage not initialized")
|
||||||
|
}
|
||||||
|
if err := storage.SetValue(name, value); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
AddSecret(value)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetValue(name string) (value string, ok bool) {
|
||||||
|
value, ok = getValue(name)
|
||||||
|
AddSecret(value)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func getValue(name string) (string, bool) {
|
||||||
|
if storage != nil {
|
||||||
|
if value, ok := storage.GetValue(name); ok {
|
||||||
|
return value, true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if dir, ok := os.LookupEnv("CREDENTIALS_DIRECTORY"); ok {
|
||||||
|
if value, _ := os.ReadFile(filepath.Join(dir, name)); value != nil {
|
||||||
|
return strings.TrimSpace(string(value)), true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return os.LookupEnv(name)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReplaceVars - support format ${CAMERA_PASSWORD} and ${RTSP_USER:admin}
|
||||||
|
func ReplaceVars(data []byte) []byte {
|
||||||
|
re := regexp.MustCompile(`\${([^}{]+)}`)
|
||||||
|
return re.ReplaceAllFunc(data, func(match []byte) []byte {
|
||||||
|
key := string(match[2 : len(match)-1])
|
||||||
|
|
||||||
|
var def string
|
||||||
|
var defok bool
|
||||||
|
|
||||||
|
if i := strings.IndexByte(key, ':'); i > 0 {
|
||||||
|
key, def = key[:i], key[i+1:]
|
||||||
|
defok = true
|
||||||
|
}
|
||||||
|
|
||||||
|
if value, ok := GetValue(key); ok {
|
||||||
|
return []byte(value)
|
||||||
|
}
|
||||||
|
|
||||||
|
if defok {
|
||||||
|
return []byte(def)
|
||||||
|
}
|
||||||
|
|
||||||
|
return match
|
||||||
|
})
|
||||||
|
}
|
||||||
@@ -0,0 +1,83 @@
|
|||||||
|
package creds
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"slices"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
func AddSecret(value string) {
|
||||||
|
if value == "" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
secretsMu.Lock()
|
||||||
|
defer secretsMu.Unlock()
|
||||||
|
|
||||||
|
if slices.Contains(secrets, value) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
secrets = append(secrets, value)
|
||||||
|
secretsReplacer = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var secrets []string
|
||||||
|
var secretsMu sync.Mutex
|
||||||
|
var secretsReplacer *strings.Replacer
|
||||||
|
|
||||||
|
func getReplacer() *strings.Replacer {
|
||||||
|
secretsMu.Lock()
|
||||||
|
defer secretsMu.Unlock()
|
||||||
|
|
||||||
|
if secretsReplacer == nil {
|
||||||
|
oldnew := make([]string, 0, 2*len(secrets))
|
||||||
|
for _, s := range secrets {
|
||||||
|
oldnew = append(oldnew, s, "***")
|
||||||
|
}
|
||||||
|
secretsReplacer = strings.NewReplacer(oldnew...)
|
||||||
|
}
|
||||||
|
|
||||||
|
return secretsReplacer
|
||||||
|
}
|
||||||
|
|
||||||
|
func SecretString(s string) string {
|
||||||
|
re := getReplacer()
|
||||||
|
return re.Replace(s)
|
||||||
|
}
|
||||||
|
|
||||||
|
func SecretWriter(w io.Writer) io.Writer {
|
||||||
|
return &secretWriter{w}
|
||||||
|
}
|
||||||
|
|
||||||
|
type secretWriter struct {
|
||||||
|
w io.Writer
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *secretWriter) Write(b []byte) (int, error) {
|
||||||
|
re := getReplacer()
|
||||||
|
return re.WriteString(s.w, string(b))
|
||||||
|
}
|
||||||
|
|
||||||
|
type secretResponse struct {
|
||||||
|
w http.ResponseWriter
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *secretResponse) Header() http.Header {
|
||||||
|
return s.w.Header()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *secretResponse) Write(b []byte) (int, error) {
|
||||||
|
re := getReplacer()
|
||||||
|
return re.WriteString(s.w, string(b))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *secretResponse) WriteHeader(statusCode int) {
|
||||||
|
s.w.WriteHeader(statusCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
func SecretResponse(w http.ResponseWriter) http.ResponseWriter {
|
||||||
|
return &secretResponse{w}
|
||||||
|
}
|
||||||
@@ -0,0 +1,15 @@
|
|||||||
|
package creds
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestString(t *testing.T) {
|
||||||
|
AddSecret("admin")
|
||||||
|
AddSecret("pa$$word")
|
||||||
|
|
||||||
|
s := SecretString("rtsp://admin:pa$$word@192.168.1.123/stream1")
|
||||||
|
require.Equal(t, "rtsp://***:***@192.168.1.123/stream1", s)
|
||||||
|
}
|
||||||
@@ -88,6 +88,8 @@ func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Rece
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) Start() (err error) {
|
func (c *Client) Start() (err error) {
|
||||||
_, err = c.conn.Read(nil)
|
// just block until c.conn closed
|
||||||
|
b := make([]byte, 1)
|
||||||
|
_, err = c.conn.Read(b)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
+33
-7
@@ -14,6 +14,7 @@ func RTPDepay(codec *core.Codec, handler core.HandlerFunc) core.HandlerFunc {
|
|||||||
|
|
||||||
buf := make([]byte, 0, 512*1024) // 512K
|
buf := make([]byte, 0, 512*1024) // 512K
|
||||||
var nuStart int
|
var nuStart int
|
||||||
|
var seqNum uint16
|
||||||
|
|
||||||
return func(packet *rtp.Packet) {
|
return func(packet *rtp.Packet) {
|
||||||
data := packet.Payload
|
data := packet.Payload
|
||||||
@@ -34,9 +35,19 @@ func RTPDepay(codec *core.Codec, handler core.HandlerFunc) core.HandlerFunc {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// when we collect data into one buffer, we need to make sure
|
||||||
|
// that all of it falls into the same sequence
|
||||||
|
if len(buf) > 0 && packet.SequenceNumber-seqNum != 1 {
|
||||||
|
//log.Printf("broken H265 sequence")
|
||||||
|
buf = buf[:0] // drop data
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
seqNum = packet.SequenceNumber
|
||||||
|
|
||||||
if nuType == NALUTypeFU {
|
if nuType == NALUTypeFU {
|
||||||
switch data[2] >> 6 {
|
switch data[2] >> 6 {
|
||||||
case 2: // begin
|
case 0b10: // begin
|
||||||
nuType = data[2] & 0x3F
|
nuType = data[2] & 0x3F
|
||||||
|
|
||||||
// push PS data before keyframe
|
// push PS data before keyframe
|
||||||
@@ -49,13 +60,30 @@ func RTPDepay(codec *core.Codec, handler core.HandlerFunc) core.HandlerFunc {
|
|||||||
buf = append(buf, (data[0]&0x81)|(nuType<<1), data[1])
|
buf = append(buf, (data[0]&0x81)|(nuType<<1), data[1])
|
||||||
buf = append(buf, data[3:]...)
|
buf = append(buf, data[3:]...)
|
||||||
return
|
return
|
||||||
case 0: // continue
|
case 0b00: // continue
|
||||||
|
if len(buf) == 0 {
|
||||||
|
//log.Printf("broken H265 fragment")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
buf = append(buf, data[3:]...)
|
buf = append(buf, data[3:]...)
|
||||||
return
|
return
|
||||||
case 1: // end
|
case 0b01: // end
|
||||||
|
if len(buf) == 0 {
|
||||||
|
//log.Printf("broken H265 fragment")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
buf = append(buf, data[3:]...)
|
buf = append(buf, data[3:]...)
|
||||||
|
|
||||||
|
if nuStart > len(buf)+4 {
|
||||||
|
//log.Printf("broken H265 fragment")
|
||||||
|
buf = buf[:0] // drop data
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
binary.BigEndian.PutUint32(buf[nuStart:], uint32(len(buf)-nuStart-4))
|
binary.BigEndian.PutUint32(buf[nuStart:], uint32(len(buf)-nuStart-4))
|
||||||
case 3: // wrong RFC 7798 realisation from OpenIPC project
|
case 0b11: // wrong RFC 7798 realisation from OpenIPC project
|
||||||
// A non-fragmented NAL unit MUST NOT be transmitted in one FU; i.e.,
|
// A non-fragmented NAL unit MUST NOT be transmitted in one FU; i.e.,
|
||||||
// the Start bit and End bit must not both be set to 1 in the same FU
|
// the Start bit and End bit must not both be set to 1 in the same FU
|
||||||
// header.
|
// header.
|
||||||
@@ -65,10 +93,8 @@ func RTPDepay(codec *core.Codec, handler core.HandlerFunc) core.HandlerFunc {
|
|||||||
buf = append(buf, data[3:]...)
|
buf = append(buf, data[3:]...)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
nuStart = len(buf)
|
buf = binary.BigEndian.AppendUint32(buf, uint32(len(data))) // NAL unit size
|
||||||
buf = append(buf, 0, 0, 0, 0) // NAL unit size
|
|
||||||
buf = append(buf, data...)
|
buf = append(buf, data...)
|
||||||
binary.BigEndian.PutUint32(buf[nuStart:], uint32(len(data)))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// collect all NAL Units for Access Unit
|
// collect all NAL Units for Access Unit
|
||||||
|
|||||||
+55
-11
@@ -3,9 +3,10 @@ package onvif
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"html"
|
"html"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net"
|
||||||
"net/url"
|
"net/url"
|
||||||
"regexp"
|
"regexp"
|
||||||
"strings"
|
"strings"
|
||||||
@@ -43,7 +44,14 @@ func NewClient(rawURL string) (*Client, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
client.mediaURL = FindTagValue(b, "Media.+?XAddr")
|
client.mediaURL = FindTagValue(b, "Media.+?XAddr")
|
||||||
|
if client.mediaURL == "" {
|
||||||
|
client.mediaURL = baseURL + "/onvif/media_service"
|
||||||
|
}
|
||||||
|
|
||||||
client.imaginURL = FindTagValue(b, "Imaging.+?XAddr")
|
client.imaginURL = FindTagValue(b, "Imaging.+?XAddr")
|
||||||
|
if client.imaginURL == "" {
|
||||||
|
client.imaginURL = baseURL + "/onvif/imaging_service"
|
||||||
|
}
|
||||||
|
|
||||||
return client, nil
|
return client, nil
|
||||||
}
|
}
|
||||||
@@ -175,26 +183,62 @@ func (c *Client) MediaRequest(operation string) ([]byte, error) {
|
|||||||
return c.Request(c.mediaURL, operation)
|
return c.Request(c.mediaURL, operation)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) Request(url, body string) ([]byte, error) {
|
func (c *Client) Request(rawUrl, body string) ([]byte, error) {
|
||||||
if url == "" {
|
if rawUrl == "" {
|
||||||
return nil, errors.New("onvif: unsupported service")
|
return nil, errors.New("onvif: unsupported service")
|
||||||
}
|
}
|
||||||
|
|
||||||
e := NewEnvelopeWithUser(c.url.User)
|
e := NewEnvelopeWithUser(c.url.User)
|
||||||
e.Append(body)
|
e.Append(body)
|
||||||
|
|
||||||
client := &http.Client{Timeout: time.Second * 5000}
|
u, err := url.Parse(rawUrl)
|
||||||
res, err := client.Post(url, `application/soap+xml;charset=utf-8`, bytes.NewReader(e.Bytes()))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// need to close body with eny response status
|
host := u.Host
|
||||||
b, err := io.ReadAll(res.Body)
|
if u.Port() == "" {
|
||||||
|
host += ":80"
|
||||||
if err == nil && res.StatusCode != http.StatusOK {
|
|
||||||
err = errors.New("onvif: " + res.Status + " for " + url)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return b, err
|
conn, err := net.DialTimeout("tcp", host, 5*time.Second)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer conn.Close()
|
||||||
|
|
||||||
|
reqBody := e.Bytes()
|
||||||
|
rawReq := fmt.Appendf(nil, "POST %s HTTP/1.1\r\n"+
|
||||||
|
"Host: %s\r\n"+
|
||||||
|
"Content-Type: application/soap+xml;charset=utf-8\r\n"+
|
||||||
|
"Content-Length: %d\r\n"+
|
||||||
|
"Connection: close\r\n"+
|
||||||
|
"\r\n", u.Path, u.Host, len(reqBody))
|
||||||
|
rawReq = append(rawReq, reqBody...)
|
||||||
|
|
||||||
|
if _, err = conn.Write(rawReq); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
rawRes, err := io.ReadAll(conn)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Look for XML in complete response
|
||||||
|
if i := bytes.Index(rawRes, []byte("<?xml")); i > 0 {
|
||||||
|
return rawRes[i:], nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// No XML found - might be an error response
|
||||||
|
if i := bytes.Index(rawRes, []byte("\r\n\r\n")); i > 0 {
|
||||||
|
if bytes.Contains(rawRes[:i], []byte("chunked")) {
|
||||||
|
return nil, errors.New("onvif: TODO: support chunked encoding")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return body after headers
|
||||||
|
return rawRes[i+4:], nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return rawRes, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -47,3 +47,7 @@ func (p *Probe) AddTrack(media *core.Media, codec *core.Codec, track *core.Recei
|
|||||||
p.Senders = append(p.Senders, sender)
|
p.Senders = append(p.Senders, sender)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *Probe) Start() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|||||||
+128
-27
@@ -9,6 +9,7 @@ import (
|
|||||||
"net/url"
|
"net/url"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/AlexxIT/go2rtc/pkg/tcp/websocket"
|
"github.com/AlexxIT/go2rtc/pkg/tcp/websocket"
|
||||||
@@ -36,14 +37,22 @@ func (c *Conn) Dial() (err error) {
|
|||||||
|
|
||||||
var conn net.Conn
|
var conn net.Conn
|
||||||
|
|
||||||
if c.Transport == "" {
|
switch c.Transport {
|
||||||
timeout := core.ConnDialTimeout
|
case "", "tcp", "udp":
|
||||||
|
var timeout time.Duration
|
||||||
if c.Timeout != 0 {
|
if c.Timeout != 0 {
|
||||||
timeout = time.Second * time.Duration(c.Timeout)
|
timeout = time.Second * time.Duration(c.Timeout)
|
||||||
|
} else {
|
||||||
|
timeout = core.ConnDialTimeout
|
||||||
}
|
}
|
||||||
conn, err = tcp.Dial(c.URL, timeout)
|
conn, err = tcp.Dial(c.URL, timeout)
|
||||||
c.Protocol = "rtsp+tcp"
|
|
||||||
} else {
|
if c.Transport != "udp" {
|
||||||
|
c.Protocol = "rtsp+tcp"
|
||||||
|
} else {
|
||||||
|
c.Protocol = "rtsp+udp"
|
||||||
|
}
|
||||||
|
default:
|
||||||
conn, err = websocket.Dial(c.Transport)
|
conn, err = websocket.Dial(c.Transport)
|
||||||
c.Protocol = "ws"
|
c.Protocol = "ws"
|
||||||
}
|
}
|
||||||
@@ -61,6 +70,9 @@ func (c *Conn) Dial() (err error) {
|
|||||||
c.sequence = 0
|
c.sequence = 0
|
||||||
c.state = StateConn
|
c.state = StateConn
|
||||||
|
|
||||||
|
c.udpConn = nil
|
||||||
|
c.udpAddr = nil
|
||||||
|
|
||||||
c.Connection.RemoteAddr = conn.RemoteAddr().String()
|
c.Connection.RemoteAddr = conn.RemoteAddr().String()
|
||||||
c.Connection.Transport = conn
|
c.Connection.Transport = conn
|
||||||
c.Connection.URL = c.uri
|
c.Connection.URL = c.uri
|
||||||
@@ -218,15 +230,27 @@ func (c *Conn) Record() (err error) {
|
|||||||
func (c *Conn) SetupMedia(media *core.Media) (byte, error) {
|
func (c *Conn) SetupMedia(media *core.Media) (byte, error) {
|
||||||
var transport string
|
var transport string
|
||||||
|
|
||||||
// try to use media position as channel number
|
if c.Transport == "udp" {
|
||||||
for i, m := range c.Medias {
|
conn1, conn2, err := ListenUDPPair()
|
||||||
if m.Equal(media) {
|
if err != nil {
|
||||||
transport = fmt.Sprintf(
|
return 0, err
|
||||||
// i - RTP (data channel)
|
}
|
||||||
// i+1 - RTCP (control channel)
|
|
||||||
"RTP/AVP/TCP;unicast;interleaved=%d-%d", i*2, i*2+1,
|
c.udpConn = append(c.udpConn, conn1, conn2)
|
||||||
)
|
|
||||||
break
|
port := conn1.LocalAddr().(*net.UDPAddr).Port
|
||||||
|
transport = fmt.Sprintf("RTP/AVP;unicast;client_port=%d-%d", port, port+1)
|
||||||
|
} else {
|
||||||
|
// try to use media position as channel number
|
||||||
|
for i, m := range c.Medias {
|
||||||
|
if m.Equal(media) {
|
||||||
|
transport = fmt.Sprintf(
|
||||||
|
// i - RTP (data channel)
|
||||||
|
// i+1 - RTCP (control channel)
|
||||||
|
"RTP/AVP/TCP;unicast;interleaved=%d-%d", i*2, i*2+1,
|
||||||
|
)
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -286,27 +310,53 @@ func (c *Conn) SetupMedia(media *core.Media) (byte, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// we send our `interleaved`, but camera can answer with another
|
// Parse server response
|
||||||
|
|
||||||
// Transport: RTP/AVP/TCP;unicast;interleaved=10-11;ssrc=10117CB7
|
|
||||||
// Transport: RTP/AVP/TCP;unicast;destination=192.168.1.111;source=192.168.1.222;interleaved=0
|
|
||||||
// Transport: RTP/AVP/TCP;ssrc=22345682;interleaved=0-1
|
|
||||||
transport = res.Header.Get("Transport")
|
transport = res.Header.Get("Transport")
|
||||||
if !strings.HasPrefix(transport, "RTP/AVP/TCP;") {
|
|
||||||
|
if c.Transport == "udp" {
|
||||||
|
channel := byte(len(c.udpConn) - 2)
|
||||||
|
|
||||||
|
// Dahua: RTP/AVP/UDP;unicast;client_port=49292-49293;server_port=43670-43671;ssrc=7CB694B4
|
||||||
|
// OpenIPC: RTP/AVP/UDP;unicast;client_port=59612-59613
|
||||||
|
if s := core.Between(transport, "server_port=", ";"); s != "" {
|
||||||
|
s1, s2, _ := strings.Cut(s, "-")
|
||||||
|
port1 := core.Atoi(s1)
|
||||||
|
port2 := core.Atoi(s2)
|
||||||
|
// TODO: more smart handling empty server ports
|
||||||
|
if port1 > 0 && port2 > 0 {
|
||||||
|
remoteIP := c.conn.RemoteAddr().(*net.TCPAddr).IP
|
||||||
|
c.udpAddr = append(c.udpAddr,
|
||||||
|
&net.UDPAddr{IP: remoteIP, Port: port1},
|
||||||
|
&net.UDPAddr{IP: remoteIP, Port: port2},
|
||||||
|
)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
// Try to open a hole in the NAT router (to allow incoming UDP packets)
|
||||||
|
// by send a UDP packet for RTP and RTCP to the remote RTSP server.
|
||||||
|
// https://github.com/FFmpeg/FFmpeg/blob/aa91ae25b88e195e6af4248e0ab30605735ca1cd/libavformat/rtpdec.c#L416-L438
|
||||||
|
_, _ = c.WriteToUDP([]byte{0x80, 0x00, 0x00, 0x00}, channel)
|
||||||
|
_, _ = c.WriteToUDP([]byte{0x80, 0xC8, 0x00, 0x01}, channel+1)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return channel, nil
|
||||||
|
} else {
|
||||||
|
// we send our `interleaved`, but camera can answer with another
|
||||||
|
|
||||||
|
// Transport: RTP/AVP/TCP;unicast;interleaved=10-11;ssrc=10117CB7
|
||||||
|
// Transport: RTP/AVP/TCP;unicast;destination=192.168.1.111;source=192.168.1.222;interleaved=0
|
||||||
|
// Transport: RTP/AVP/TCP;ssrc=22345682;interleaved=0-1
|
||||||
// Escam Q6 has a bug:
|
// Escam Q6 has a bug:
|
||||||
// Transport: RTP/AVP;unicast;destination=192.168.1.111;source=192.168.1.222;interleaved=0-1
|
// Transport: RTP/AVP;unicast;destination=192.168.1.111;source=192.168.1.222;interleaved=0-1
|
||||||
if !strings.Contains(transport, ";interleaved=") {
|
s := core.Between(transport, "interleaved=", "-")
|
||||||
|
i, err := strconv.Atoi(s)
|
||||||
|
if err != nil {
|
||||||
return 0, fmt.Errorf("wrong transport: %s", transport)
|
return 0, fmt.Errorf("wrong transport: %s", transport)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
channel := core.Between(transport, "interleaved=", "-")
|
return byte(i), nil
|
||||||
i, err := strconv.Atoi(channel)
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return byte(i), nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Conn) Play() (err error) {
|
func (c *Conn) Play() (err error) {
|
||||||
@@ -327,5 +377,56 @@ func (c *Conn) Close() error {
|
|||||||
if c.OnClose != nil {
|
if c.OnClose != nil {
|
||||||
_ = c.OnClose()
|
_ = c.OnClose()
|
||||||
}
|
}
|
||||||
|
for _, conn := range c.udpConn {
|
||||||
|
_ = conn.Close()
|
||||||
|
}
|
||||||
return c.conn.Close()
|
return c.conn.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Conn) WriteToUDP(b []byte, channel byte) (int, error) {
|
||||||
|
return c.udpConn[channel].WriteToUDP(b, c.udpAddr[channel])
|
||||||
|
}
|
||||||
|
|
||||||
|
const listenUDPAttemps = 10
|
||||||
|
|
||||||
|
var listenUDPMu sync.Mutex
|
||||||
|
|
||||||
|
func ListenUDPPair() (*net.UDPConn, *net.UDPConn, error) {
|
||||||
|
listenUDPMu.Lock()
|
||||||
|
defer listenUDPMu.Unlock()
|
||||||
|
|
||||||
|
for i := 0; i < listenUDPAttemps; i++ {
|
||||||
|
// Get a random even port from the OS
|
||||||
|
ln1, err := net.ListenUDP("udp", &net.UDPAddr{IP: nil, Port: 0})
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
var port1 = ln1.LocalAddr().(*net.UDPAddr).Port
|
||||||
|
var port2 int
|
||||||
|
|
||||||
|
// 11. RTP over Network and Transport Protocols (https://www.ietf.org/rfc/rfc3550.txt)
|
||||||
|
// For UDP and similar protocols,
|
||||||
|
// RTP SHOULD use an even destination port number and the corresponding
|
||||||
|
// RTCP stream SHOULD use the next higher (odd) destination port number
|
||||||
|
if port1&1 > 0 {
|
||||||
|
port2 = port1 - 1
|
||||||
|
} else {
|
||||||
|
port2 = port1 + 1
|
||||||
|
}
|
||||||
|
|
||||||
|
ln2, err := net.ListenUDP("udp", &net.UDPAddr{IP: nil, Port: port2})
|
||||||
|
if err != nil {
|
||||||
|
_ = ln1.Close()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if port1 < port2 {
|
||||||
|
return ln1, ln2, nil
|
||||||
|
} else {
|
||||||
|
return ln2, ln1, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, nil, fmt.Errorf("can't open two UDP ports")
|
||||||
|
}
|
||||||
|
|||||||
+186
-139
@@ -2,6 +2,7 @@ package rtsp
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
|
"context"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
@@ -13,7 +14,6 @@ import (
|
|||||||
|
|
||||||
"github.com/AlexxIT/go2rtc/pkg/core"
|
"github.com/AlexxIT/go2rtc/pkg/core"
|
||||||
"github.com/AlexxIT/go2rtc/pkg/tcp"
|
"github.com/AlexxIT/go2rtc/pkg/tcp"
|
||||||
"github.com/pion/rtcp"
|
|
||||||
"github.com/pion/rtp"
|
"github.com/pion/rtp"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -40,6 +40,7 @@ type Conn struct {
|
|||||||
keepalive int
|
keepalive int
|
||||||
mode core.Mode
|
mode core.Mode
|
||||||
playOK bool
|
playOK bool
|
||||||
|
playErr error
|
||||||
reader *bufio.Reader
|
reader *bufio.Reader
|
||||||
sequence int
|
sequence int
|
||||||
session string
|
session string
|
||||||
@@ -47,6 +48,9 @@ type Conn struct {
|
|||||||
|
|
||||||
state State
|
state State
|
||||||
stateMu sync.Mutex
|
stateMu sync.Mutex
|
||||||
|
|
||||||
|
udpConn []*net.UDPConn
|
||||||
|
udpAddr []*net.UDPAddr
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@@ -68,7 +72,6 @@ func (s State) String() string {
|
|||||||
case StateNone:
|
case StateNone:
|
||||||
return "NONE"
|
return "NONE"
|
||||||
case StateConn:
|
case StateConn:
|
||||||
|
|
||||||
return "CONN"
|
return "CONN"
|
||||||
case StateSetup:
|
case StateSetup:
|
||||||
return MethodSetup
|
return MethodSetup
|
||||||
@@ -88,23 +91,25 @@ const (
|
|||||||
func (c *Conn) Handle() (err error) {
|
func (c *Conn) Handle() (err error) {
|
||||||
var timeout time.Duration
|
var timeout time.Duration
|
||||||
|
|
||||||
var keepaliveDT time.Duration
|
|
||||||
var keepaliveTS time.Time
|
|
||||||
|
|
||||||
switch c.mode {
|
switch c.mode {
|
||||||
case core.ModeActiveProducer:
|
case core.ModeActiveProducer:
|
||||||
|
var keepaliveDT time.Duration
|
||||||
|
|
||||||
if c.keepalive > 5 {
|
if c.keepalive > 5 {
|
||||||
keepaliveDT = time.Duration(c.keepalive-5) * time.Second
|
keepaliveDT = time.Duration(c.keepalive-5) * time.Second
|
||||||
} else {
|
} else {
|
||||||
keepaliveDT = 25 * time.Second
|
keepaliveDT = 25 * time.Second
|
||||||
}
|
}
|
||||||
keepaliveTS = time.Now().Add(keepaliveDT)
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
go c.handleKeepalive(ctx, keepaliveDT)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
if c.Timeout == 0 {
|
if c.Timeout == 0 {
|
||||||
// polling frames from remote RTSP Server (ex Camera)
|
// polling frames from remote RTSP Server (ex Camera)
|
||||||
timeout = time.Second * 5
|
timeout = time.Second * 5
|
||||||
|
|
||||||
if len(c.Receivers) == 0 {
|
if len(c.Receivers) == 0 || c.Transport == "udp" {
|
||||||
// if we only send audio to camera
|
// if we only send audio to camera
|
||||||
// https://github.com/AlexxIT/go2rtc/issues/659
|
// https://github.com/AlexxIT/go2rtc/issues/659
|
||||||
timeout += keepaliveDT
|
timeout += keepaliveDT
|
||||||
@@ -129,148 +134,190 @@ func (c *Conn) Handle() (err error) {
|
|||||||
return fmt.Errorf("wrong RTSP conn mode: %d", c.mode)
|
return fmt.Errorf("wrong RTSP conn mode: %d", c.mode)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for i := 0; i < len(c.udpConn); i++ {
|
||||||
|
go c.handleUDPData(byte(i))
|
||||||
|
}
|
||||||
|
|
||||||
for c.state != StateNone {
|
for c.state != StateNone {
|
||||||
ts := time.Now()
|
ts := time.Now()
|
||||||
|
|
||||||
if err = c.conn.SetReadDeadline(ts.Add(timeout)); err != nil {
|
_ = c.conn.SetReadDeadline(ts.Add(timeout))
|
||||||
|
|
||||||
|
if err = c.handleTCPData(); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// we can read:
|
|
||||||
// 1. RTP interleaved: `$` + 1B channel number + 2B size
|
|
||||||
// 2. RTSP response: RTSP/1.0 200 OK
|
|
||||||
// 3. RTSP request: OPTIONS ...
|
|
||||||
var buf4 []byte // `$` + 1B channel number + 2B size
|
|
||||||
buf4, err = c.reader.Peek(4)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var channelID byte
|
|
||||||
var size uint16
|
|
||||||
|
|
||||||
if buf4[0] != '$' {
|
|
||||||
switch string(buf4) {
|
|
||||||
case "RTSP":
|
|
||||||
var res *tcp.Response
|
|
||||||
if res, err = c.ReadResponse(); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
c.Fire(res)
|
|
||||||
// for playing backchannel only after OK response on play
|
|
||||||
c.playOK = true
|
|
||||||
continue
|
|
||||||
|
|
||||||
case "OPTI", "TEAR", "DESC", "SETU", "PLAY", "PAUS", "RECO", "ANNO", "GET_", "SET_":
|
|
||||||
var req *tcp.Request
|
|
||||||
if req, err = c.ReadRequest(); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
c.Fire(req)
|
|
||||||
if req.Method == MethodOptions {
|
|
||||||
res := &tcp.Response{Request: req}
|
|
||||||
if err = c.WriteResponse(res); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
|
|
||||||
default:
|
|
||||||
c.Fire("RTSP wrong input")
|
|
||||||
|
|
||||||
for i := 0; ; i++ {
|
|
||||||
// search next start symbol
|
|
||||||
if _, err = c.reader.ReadBytes('$'); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if channelID, err = c.reader.ReadByte(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: better check maximum good channel ID
|
|
||||||
if channelID >= 20 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
buf4 = make([]byte, 2)
|
|
||||||
if _, err = io.ReadFull(c.reader, buf4); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// check if size good for RTP
|
|
||||||
size = binary.BigEndian.Uint16(buf4)
|
|
||||||
if size <= 1500 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
// 10 tries to find good packet
|
|
||||||
if i >= 10 {
|
|
||||||
return fmt.Errorf("RTSP wrong input")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// hope that the odd channels are always RTCP
|
|
||||||
channelID = buf4[1]
|
|
||||||
|
|
||||||
// get data size
|
|
||||||
size = binary.BigEndian.Uint16(buf4[2:])
|
|
||||||
|
|
||||||
// skip 4 bytes from c.reader.Peek
|
|
||||||
if _, err = c.reader.Discard(4); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// init memory for data
|
|
||||||
buf := make([]byte, size)
|
|
||||||
if _, err = io.ReadFull(c.reader, buf); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
c.Recv += int(size)
|
|
||||||
|
|
||||||
if channelID&1 == 0 {
|
|
||||||
packet := &rtp.Packet{}
|
|
||||||
if err = packet.Unmarshal(buf); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, receiver := range c.Receivers {
|
|
||||||
if receiver.ID == channelID {
|
|
||||||
receiver.WriteRTP(packet)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
msg := &RTCP{Channel: channelID}
|
|
||||||
|
|
||||||
if err = msg.Header.Unmarshal(buf); err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
msg.Packets, err = rtcp.Unmarshal(buf)
|
|
||||||
if err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
c.Fire(msg)
|
|
||||||
}
|
|
||||||
|
|
||||||
if keepaliveDT != 0 && ts.After(keepaliveTS) {
|
|
||||||
req := &tcp.Request{Method: MethodOptions, URL: c.URL}
|
|
||||||
if err = c.WriteRequest(req); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
keepaliveTS = ts.Add(keepaliveDT)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Conn) handleKeepalive(ctx context.Context, d time.Duration) {
|
||||||
|
ticker := time.NewTicker(d)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ticker.C:
|
||||||
|
req := &tcp.Request{Method: MethodOptions, URL: c.URL}
|
||||||
|
if err := c.WriteRequest(req); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) handleUDPData(channel byte) {
|
||||||
|
// TODO: handle timeouts and drop TCP connection after any error
|
||||||
|
conn := c.udpConn[channel]
|
||||||
|
|
||||||
|
for {
|
||||||
|
// TP-Link Tapo camera has crazy 10000 bytes packet size
|
||||||
|
buf := make([]byte, 10240)
|
||||||
|
|
||||||
|
n, _, err := conn.ReadFromUDP(buf)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = c.handleRawPacket(channel, buf[:n]); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) handleTCPData() error {
|
||||||
|
// we can read:
|
||||||
|
// 1. RTP interleaved: `$` + 1B channel number + 2B size
|
||||||
|
// 2. RTSP response: RTSP/1.0 200 OK
|
||||||
|
// 3. RTSP request: OPTIONS ...
|
||||||
|
var buf4 []byte // `$` + 1B channel number + 2B size
|
||||||
|
var err error
|
||||||
|
|
||||||
|
buf4, err = c.reader.Peek(4)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var channel byte
|
||||||
|
var size uint16
|
||||||
|
|
||||||
|
if buf4[0] != '$' {
|
||||||
|
switch string(buf4) {
|
||||||
|
case "RTSP":
|
||||||
|
var res *tcp.Response
|
||||||
|
if res, err = c.ReadResponse(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
c.Fire(res)
|
||||||
|
// for playing backchannel only after OK response on play
|
||||||
|
c.playOK = true
|
||||||
|
return nil
|
||||||
|
|
||||||
|
case "OPTI", "TEAR", "DESC", "SETU", "PLAY", "PAUS", "RECO", "ANNO", "GET_", "SET_":
|
||||||
|
var req *tcp.Request
|
||||||
|
if req, err = c.ReadRequest(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
c.Fire(req)
|
||||||
|
if req.Method == MethodOptions {
|
||||||
|
res := &tcp.Response{Request: req}
|
||||||
|
if err = c.WriteResponse(res); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
|
||||||
|
default:
|
||||||
|
c.Fire("RTSP wrong input")
|
||||||
|
|
||||||
|
for i := 0; ; i++ {
|
||||||
|
// search next start symbol
|
||||||
|
if _, err = c.reader.ReadBytes('$'); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if channel, err = c.reader.ReadByte(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: better check maximum good channel ID
|
||||||
|
if channel >= 20 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
buf4 = make([]byte, 2)
|
||||||
|
if _, err = io.ReadFull(c.reader, buf4); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// check if size good for RTP
|
||||||
|
size = binary.BigEndian.Uint16(buf4)
|
||||||
|
if size <= 1500 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
// 10 tries to find good packet
|
||||||
|
if i >= 10 {
|
||||||
|
return fmt.Errorf("RTSP wrong input")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// hope that the odd channels are always RTCP
|
||||||
|
channel = buf4[1]
|
||||||
|
|
||||||
|
// get data size
|
||||||
|
size = binary.BigEndian.Uint16(buf4[2:])
|
||||||
|
|
||||||
|
// skip 4 bytes from c.reader.Peek
|
||||||
|
if _, err = c.reader.Discard(4); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// init memory for data
|
||||||
|
buf := make([]byte, size)
|
||||||
|
if _, err = io.ReadFull(c.reader, buf); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
c.Recv += int(size)
|
||||||
|
|
||||||
|
return c.handleRawPacket(channel, buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) handleRawPacket(channel byte, buf []byte) error {
|
||||||
|
if channel&1 == 0 {
|
||||||
|
packet := &rtp.Packet{}
|
||||||
|
if err := packet.Unmarshal(buf); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, receiver := range c.Receivers {
|
||||||
|
if receiver.ID == channel {
|
||||||
|
receiver.WriteRTP(packet)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
msg := &RTCP{Channel: channel}
|
||||||
|
|
||||||
|
if err := msg.Header.Unmarshal(buf); err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//var err error
|
||||||
|
//msg.Packets, err = rtcp.Unmarshal(buf)
|
||||||
|
//if err != nil {
|
||||||
|
// return nil
|
||||||
|
//}
|
||||||
|
|
||||||
|
c.Fire(msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Conn) WriteRequest(req *tcp.Request) error {
|
func (c *Conn) WriteRequest(req *tcp.Request) error {
|
||||||
if req.Proto == "" {
|
if req.Proto == "" {
|
||||||
req.Proto = ProtoRTSP
|
req.Proto = ProtoRTSP
|
||||||
|
|||||||
+23
-4
@@ -85,11 +85,8 @@ func (c *Conn) packetWriter(codec *core.Codec, channel, payloadType uint8) core.
|
|||||||
}
|
}
|
||||||
|
|
||||||
flushBuf := func() {
|
flushBuf := func() {
|
||||||
if err := c.conn.SetWriteDeadline(time.Now().Add(Timeout)); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
//log.Printf("[rtsp] channel:%2d write_size:%6d buffer_size:%6d", channel, n, len(buf))
|
//log.Printf("[rtsp] channel:%2d write_size:%6d buffer_size:%6d", channel, n, len(buf))
|
||||||
if _, err := c.conn.Write(buf[:n]); err == nil {
|
if err := c.writeInterleavedData(buf[:n]); err != nil {
|
||||||
c.Send += n
|
c.Send += n
|
||||||
}
|
}
|
||||||
n = 0
|
n = 0
|
||||||
@@ -177,3 +174,25 @@ func (c *Conn) packetWriter(codec *core.Codec, channel, payloadType uint8) core.
|
|||||||
|
|
||||||
return handlerFunc
|
return handlerFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Conn) writeInterleavedData(data []byte) error {
|
||||||
|
if c.Transport != "udp" {
|
||||||
|
_ = c.conn.SetWriteDeadline(time.Now().Add(Timeout))
|
||||||
|
_, err := c.conn.Write(data)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for len(data) >= 4 && data[0] == '$' {
|
||||||
|
channel := data[1]
|
||||||
|
size := uint16(data[2])<<8 | uint16(data[3])
|
||||||
|
rtpData := data[4 : 4+size]
|
||||||
|
|
||||||
|
if _, err := c.WriteToUDP(rtpData, channel); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
data = data[4+size:]
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -3,8 +3,6 @@ package shell
|
|||||||
import (
|
import (
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
"path/filepath"
|
|
||||||
"regexp"
|
|
||||||
"strings"
|
"strings"
|
||||||
"syscall"
|
"syscall"
|
||||||
)
|
)
|
||||||
@@ -38,39 +36,6 @@ func QuoteSplit(s string) []string {
|
|||||||
return a
|
return a
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReplaceEnvVars - support format ${CAMERA_PASSWORD} and ${RTSP_USER:admin}
|
|
||||||
func ReplaceEnvVars(text string) string {
|
|
||||||
re := regexp.MustCompile(`\${([^}{]+)}`)
|
|
||||||
return re.ReplaceAllStringFunc(text, func(match string) string {
|
|
||||||
key := match[2 : len(match)-1]
|
|
||||||
|
|
||||||
var def string
|
|
||||||
var dok bool
|
|
||||||
|
|
||||||
if i := strings.IndexByte(key, ':'); i > 0 {
|
|
||||||
key, def = key[:i], key[i+1:]
|
|
||||||
dok = true
|
|
||||||
}
|
|
||||||
|
|
||||||
if dir, vok := os.LookupEnv("CREDENTIALS_DIRECTORY"); vok {
|
|
||||||
value, err := os.ReadFile(filepath.Join(dir, key))
|
|
||||||
if err == nil {
|
|
||||||
return strings.TrimSpace(string(value))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if value, vok := os.LookupEnv(key); vok {
|
|
||||||
return value
|
|
||||||
}
|
|
||||||
|
|
||||||
if dok {
|
|
||||||
return def
|
|
||||||
}
|
|
||||||
|
|
||||||
return match
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func RunUntilSignal() {
|
func RunUntilSignal() {
|
||||||
sigs := make(chan os.Signal, 1)
|
sigs := make(chan os.Signal, 1)
|
||||||
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
|
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
|
||||||
|
|||||||
Reference in New Issue
Block a user