Merge branch 'AlexxIT:master' into onvif-client

This commit is contained in:
seydx
2025-03-11 01:45:25 +01:00
committed by GitHub
87 changed files with 1700 additions and 754 deletions
+4
View File
@@ -141,6 +141,10 @@ func MarshalSDP(name string, medias []*Media) ([]byte, error) {
}
md.WithCodec(codec.PayloadType, name, codec.ClockRate, codec.Channels, codec.FmtpLine)
if media.Direction != "" {
md.WithPropertyAttribute(media.Direction)
}
if media.ID != "" {
md.WithValueAttribute("control", media.ID)
}
+7 -4
View File
@@ -140,6 +140,7 @@ func (s *Sender) Start() {
s.done = make(chan struct{})
go func() {
// for range on nil chan is OK
for packet := range s.buf {
s.Output(packet)
}
@@ -148,7 +149,7 @@ func (s *Sender) Start() {
}
func (s *Sender) Wait() {
if done := s.done; s.done != nil {
if done := s.done; done != nil {
<-done
}
}
@@ -165,10 +166,12 @@ func (s *Sender) State() string {
func (s *Sender) Close() {
// close buffer if exists
if buf := s.buf; buf != nil {
s.buf = nil
defer close(buf)
s.mu.Lock()
if s.buf != nil {
close(s.buf) // exit from for range loop
s.buf = nil // prevent writing to closed chan
}
s.mu.Unlock()
s.Node.Close()
}
+53
View File
@@ -0,0 +1,53 @@
package core
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestSenser(t *testing.T) {
recv := make(chan *Packet) // blocking receiver
sender := NewSender(nil, &Codec{})
sender.Output = func(packet *Packet) {
recv <- packet
}
require.Equal(t, "new", sender.State())
sender.Start()
require.Equal(t, "connected", sender.State())
sender.Input(&Packet{})
sender.Input(&Packet{})
require.Equal(t, 2, sender.Packets)
require.Equal(t, 0, sender.Drops)
// important to read one before close
// because goroutine in Start() can run with nil chan
// it's OK in real life, but bad for test
_, ok := <-recv
require.True(t, ok)
sender.Close()
require.Equal(t, "closed", sender.State())
sender.Input(&Packet{})
require.Equal(t, 2, sender.Packets)
require.Equal(t, 1, sender.Drops)
// read 2nd
_, ok = <-recv
require.True(t, ok)
// read 3rd
select {
case <-recv:
ok = true
default:
ok = false
}
require.False(t, ok)
}
+1
View File
@@ -3,6 +3,7 @@ package h265
import (
"encoding/base64"
"encoding/binary"
"github.com/AlexxIT/go2rtc/pkg/core"
)
+2 -1
View File
@@ -2,8 +2,9 @@ package h265
import (
"encoding/binary"
"github.com/AlexxIT/go2rtc/pkg/h264"
"math"
"github.com/AlexxIT/go2rtc/pkg/h264"
)
//
+9
View File
@@ -2,12 +2,21 @@ package camera
import (
"encoding/base64"
"strings"
"testing"
"github.com/AlexxIT/go2rtc/pkg/hap"
"github.com/stretchr/testify/require"
)
func TestNilCharacter(t *testing.T) {
var res SetupEndpoints
char := &hap.Character{}
err := char.ReadTLV8(&res)
require.NotNil(t, err)
require.NotNil(t, strings.Contains(err.Error(), "can't read value"))
}
type testTLV8 struct {
name string
value string
+10 -3
View File
@@ -3,6 +3,7 @@ package hap
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
@@ -126,11 +127,17 @@ func (c *Character) Write(v any) (err error) {
// ReadTLV8 value to right struct
func (c *Character) ReadTLV8(v any) (err error) {
return tlv8.UnmarshalBase64(c.Value.(string), v)
if s, ok := c.Value.(string); ok {
return tlv8.UnmarshalBase64(s, v)
}
return fmt.Errorf("hap: can't read value: %v", v)
}
func (c *Character) ReadBool() bool {
return c.Value.(bool)
func (c *Character) ReadBool() (bool, error) {
if v, ok := c.Value.(bool); ok {
return v, nil
}
return false, fmt.Errorf("hap: can't read value: %v", c.Value)
}
func (c *Character) String() string {
+15
View File
@@ -235,3 +235,18 @@ func WriteResponse(w *bufio.Writer, statusCode int, contentType string, body []b
}
return w.Flush()
}
func WriteBackoff(rw *bufio.ReadWriter) error {
plainM2 := struct {
State byte `tlv8:"6"`
Error byte `tlv8:"7"`
}{
State: StateM2,
Error: 3, // BackoffError
}
body, err := tlv8.Marshal(plainM2)
if err != nil {
return err
}
return WriteResponse(rw.Writer, http.StatusOK, MimeTLV8, body)
}
+2 -1
View File
@@ -2,8 +2,9 @@ package hass
import (
"errors"
"github.com/gorilla/websocket"
"os"
"github.com/gorilla/websocket"
)
type API struct {
+12 -37
View File
@@ -10,10 +10,14 @@ import (
"syscall"
"time"
"github.com/AlexxIT/go2rtc/pkg/xnet"
"github.com/miekg/dns" // awesome library for parsing mDNS records
)
const ServiceHAP = "_hap._tcp.local." // HomeKit Accessory Protocol
const (
ServiceDNSSD = "_services._dns-sd._udp.local."
ServiceHAP = "_hap._tcp.local." // HomeKit Accessory Protocol
)
type ServiceEntry struct {
Name string `json:"name,omitempty"`
@@ -153,6 +157,7 @@ type Browser struct {
Service string
Addr net.Addr
Nets []*net.IPNet
Recv net.PacketConn
Sends []net.PacketConn
@@ -165,7 +170,9 @@ type Browser struct {
// Receiver will get multicast responses on senders requests.
func (b *Browser) ListenMulticastUDP() error {
// 1. Collect IPv4 interfaces
ip4s, err := InterfacesIP4()
nets, err := xnet.IPNets(func(ip net.IP) bool {
return !xnet.Docker.Contains(ip)
})
if err != nil {
return err
}
@@ -182,11 +189,12 @@ func (b *Browser) ListenMulticastUDP() error {
ctx := context.Background()
for _, ip4 := range ip4s {
conn, err := lc1.ListenPacket(ctx, "udp4", ip4.String()+":5353") // same port important
for _, ipn := range nets {
conn, err := lc1.ListenPacket(ctx, "udp4", ipn.IP.String()+":5353") // same port important
if err != nil {
continue
}
b.Nets = append(b.Nets, ipn)
b.Sends = append(b.Sends, conn)
}
@@ -364,36 +372,3 @@ func NewServiceEntries(msg *dns.Msg, ip net.IP) (entries []*ServiceEntry) {
return
}
func InterfacesIP4() ([]net.IP, error) {
intfs, err := net.Interfaces()
if err != nil {
return nil, err
}
var ips []net.IP
loop:
for _, intf := range intfs {
if intf.Flags&net.FlagUp == 0 || intf.Flags&net.FlagLoopback != 0 {
continue
}
addrs, err := intf.Addrs()
if err != nil {
continue
}
for _, addr := range addrs {
switch v := addr.(type) {
case *net.IPNet:
if ip := v.IP.To4(); ip != nil {
ips = append(ips, ip)
continue loop
}
}
}
}
return ips, nil
}
+2 -1
View File
@@ -1,8 +1,9 @@
package mdns
import (
"github.com/stretchr/testify/require"
"testing"
"github.com/stretchr/testify/require"
)
func TestDiscovery(t *testing.T) {
+101 -96
View File
@@ -20,7 +20,11 @@ func Serve(service string, entries []*ServiceEntry) error {
}
func (b *Browser) Serve(entries []*ServiceEntry) error {
var msg dns.Msg
names := make(map[string]*ServiceEntry, len(entries))
for _, entry := range entries {
name := entry.name() + "." + b.Service
names[name] = entry
}
buf := make([]byte, 1500)
for {
@@ -29,129 +33,130 @@ func (b *Browser) Serve(entries []*ServiceEntry) error {
break
}
if err = msg.Unpack(buf[:n]); err != nil {
var req dns.Msg // request
if err = req.Unpack(buf[:n]); err != nil {
continue
}
if !HasQuestionPTP(&msg, b.Service) {
// skip messages without Questions
if req.Question == nil {
continue
}
remoteIP := addr.(*net.UDPAddr).IP
localIP := MatchLocalIP(remoteIP)
localIP := b.MatchLocalIP(remoteIP)
// skip messages from unknown networks (can be docker network)
if localIP == nil {
continue
}
answer, err := NewDNSAnswer(entries, b.Service, localIP).Pack()
var res dns.Msg // response
for _, q := range req.Question {
if q.Qtype != dns.TypePTR || q.Qclass != dns.ClassINET {
continue
}
if q.Name == ServiceDNSSD {
AppendDNSSD(&res, b.Service)
} else if q.Name == b.Service {
for _, entry := range entries {
AppendEntry(&res, entry, b.Service, localIP)
}
} else if entry, ok := names[q.Name]; ok {
AppendEntry(&res, entry, b.Service, localIP)
}
}
if res.Answer == nil {
continue
}
res.MsgHdr.Response = true
res.MsgHdr.Authoritative = true
data, err := res.Pack()
if err != nil {
continue
}
for _, send := range b.Sends {
_, _ = send.WriteTo(answer, MulticastAddr)
_, _ = send.WriteTo(data, MulticastAddr)
}
}
return nil
}
func HasQuestionPTP(msg *dns.Msg, name string) bool {
for _, q := range msg.Question {
if q.Qtype == dns.TypePTR && q.Name == name {
return true
func (b *Browser) MatchLocalIP(remote net.IP) net.IP {
for _, ipn := range b.Nets {
if ipn.Contains(remote) {
return ipn.IP
}
}
return false
return nil
}
func NewDNSAnswer(entries []*ServiceEntry, service string, ip net.IP) *dns.Msg {
msg := dns.Msg{
MsgHdr: dns.MsgHdr{
Response: true,
Authoritative: true,
func AppendDNSSD(msg *dns.Msg, service string) {
msg.Answer = append(
msg.Answer,
&dns.PTR{
Hdr: dns.RR_Header{
Name: ServiceDNSSD, // _services._dns-sd._udp.local.
Rrtype: dns.TypePTR, // 12
Class: dns.ClassINET, // 1
Ttl: 4500,
},
Ptr: service, // _home-assistant._tcp.local.
},
}
for _, entry := range entries {
ptrName := entry.name() + "." + service
srvName := entry.name() + ".local."
msg.Answer = append(
msg.Answer,
&dns.PTR{
Hdr: dns.RR_Header{
Name: service,
Rrtype: dns.TypePTR,
Class: dns.ClassINET,
Ttl: 4500,
},
Ptr: ptrName,
},
)
msg.Extra = append(
msg.Extra,
&dns.TXT{
Hdr: dns.RR_Header{
Name: ptrName,
Rrtype: dns.TypeTXT,
Class: ClassCacheFlush,
Ttl: 4500,
},
Txt: entry.TXT(),
},
&dns.SRV{
Hdr: dns.RR_Header{
Name: ptrName,
Rrtype: dns.TypeSRV,
Class: ClassCacheFlush,
Ttl: 120,
Rdlength: 0,
},
Port: entry.Port,
Target: srvName,
},
&dns.A{
Hdr: dns.RR_Header{
Name: srvName,
Rrtype: dns.TypeA,
Class: ClassCacheFlush,
Ttl: 120,
Rdlength: 0,
},
A: ip,
},
)
}
return &msg
)
}
func MatchLocalIP(remote net.IP) net.IP {
intfs, err := net.Interfaces()
if err != nil {
return nil
}
func AppendEntry(msg *dns.Msg, entry *ServiceEntry, service string, ip net.IP) {
ptrName := entry.name() + "." + service
srvName := entry.name() + ".local."
for _, intf := range intfs {
if intf.Flags&net.FlagUp == 0 || intf.Flags&net.FlagLoopback != 0 {
continue
}
addrs, err := intf.Addrs()
if err != nil {
continue
}
for _, addr := range addrs {
switch v := addr.(type) {
case *net.IPNet:
if local := v.IP.To4(); local != nil && v.Contains(remote) {
return local
}
}
}
}
return nil
msg.Answer = append(
msg.Answer,
&dns.PTR{
Hdr: dns.RR_Header{
Name: service, // _home-assistant._tcp.local.
Rrtype: dns.TypePTR, // 12
Class: dns.ClassINET, // 1
Ttl: 4500,
},
Ptr: ptrName, // Home\ Assistant._home-assistant._tcp.local.
},
)
msg.Extra = append(
msg.Extra,
&dns.TXT{
Hdr: dns.RR_Header{
Name: ptrName, // Home\ Assistant._home-assistant._tcp.local.
Rrtype: dns.TypeTXT, // 16
Class: ClassCacheFlush, // 32769
Ttl: 4500,
},
Txt: entry.TXT(),
},
&dns.SRV{
Hdr: dns.RR_Header{
Name: ptrName, // Home\ Assistant._home-assistant._tcp.local.
Rrtype: dns.TypeSRV, // 33
Class: ClassCacheFlush, // 32769
Ttl: 120,
},
Port: entry.Port, // 8123
Target: srvName, // 963f1fa82b7142809711cebe7c826322.local.
},
&dns.A{
Hdr: dns.RR_Header{
Name: srvName, // 963f1fa82b7142809711cebe7c826322.local.
Rrtype: dns.TypeA, // 1
Class: ClassCacheFlush, // 32769
Ttl: 120,
},
A: ip,
},
)
}
+3 -2
View File
@@ -3,10 +3,11 @@ package mjpeg
import (
"bytes"
"encoding/binary"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/pion/rtp"
"image"
"image/jpeg"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/pion/rtp"
)
func RTPDepay(handlerFunc core.HandlerFunc) core.HandlerFunc {
+151 -18
View File
@@ -17,9 +17,15 @@ type API struct {
StreamProjectID string
StreamDeviceID string
StreamSessionID string
StreamExpiresAt time.Time
// WebRTC
StreamSessionID string
// RTSP
StreamToken string
StreamExtensionToken string
extendTimer *time.Timer
}
@@ -27,6 +33,12 @@ type Auth struct {
AccessToken string
}
type DeviceInfo struct {
Name string
DeviceID string
Protocols []string
}
var cache = map[string]*API{}
var cacheMu sync.Mutex
@@ -80,7 +92,7 @@ func NewAPI(clientID, clientSecret, refreshToken string) (*API, error) {
return api, nil
}
func (a *API) GetDevices(projectID string) (map[string]string, error) {
func (a *API) GetDevices(projectID string) ([]DeviceInfo, error) {
uri := "https://smartdevicemanagement.googleapis.com/v1/enterprises/" + projectID + "/devices"
req, err := http.NewRequest("GET", uri, nil)
if err != nil {
@@ -108,24 +120,30 @@ func (a *API) GetDevices(projectID string) (map[string]string, error) {
return nil, err
}
devices := map[string]string{}
devices := make([]DeviceInfo, 0, len(resv.Devices))
for _, device := range resv.Devices {
// only RTSP and WEB_RTC available (both supported)
if len(device.Traits.SdmDevicesTraitsCameraLiveStream.SupportedProtocols) == 0 {
continue
}
if device.Traits.SdmDevicesTraitsCameraLiveStream.SupportedProtocols[0] != "WEB_RTC" {
continue
}
i := strings.LastIndexByte(device.Name, '/')
if i <= 0 {
continue
}
name := device.Traits.SdmDevicesTraitsInfo.CustomName
devices[name] = device.Name[i+1:]
// Devices configured through the Nest app use the container/room name as opposed to the customName trait
if name == "" && len(device.ParentRelations) > 0 {
name = device.ParentRelations[0].DisplayName
}
devices = append(devices, DeviceInfo{
Name: name,
DeviceID: device.Name[i+1:],
Protocols: device.Traits.SdmDevicesTraitsCameraLiveStream.SupportedProtocols,
})
}
return devices, nil
@@ -190,11 +208,20 @@ func (a *API) ExtendStream() error {
var reqv struct {
Command string `json:"command"`
Params struct {
MediaSessionID string `json:"mediaSessionId"`
MediaSessionID string `json:"mediaSessionId,omitempty"`
StreamExtensionToken string `json:"streamExtensionToken,omitempty"`
} `json:"params"`
}
reqv.Command = "sdm.devices.commands.CameraLiveStream.ExtendWebRtcStream"
reqv.Params.MediaSessionID = a.StreamSessionID
if a.StreamToken != "" {
// RTSP
reqv.Command = "sdm.devices.commands.CameraLiveStream.ExtendRtspStream"
reqv.Params.StreamExtensionToken = a.StreamExtensionToken
} else {
// WebRTC
reqv.Command = "sdm.devices.commands.CameraLiveStream.ExtendWebRtcStream"
reqv.Params.MediaSessionID = a.StreamSessionID
}
b, err := json.Marshal(reqv)
if err != nil {
@@ -223,8 +250,10 @@ func (a *API) ExtendStream() error {
var resv struct {
Results struct {
ExpiresAt time.Time `json:"expiresAt"`
MediaSessionID string `json:"mediaSessionId"`
ExpiresAt time.Time `json:"expiresAt"`
MediaSessionID string `json:"mediaSessionId"`
StreamExtensionToken string `json:"streamExtensionToken"`
StreamToken string `json:"streamToken"`
} `json:"results"`
}
@@ -234,6 +263,111 @@ func (a *API) ExtendStream() error {
a.StreamSessionID = resv.Results.MediaSessionID
a.StreamExpiresAt = resv.Results.ExpiresAt
a.StreamExtensionToken = resv.Results.StreamExtensionToken
a.StreamToken = resv.Results.StreamToken
return nil
}
func (a *API) GenerateRtspStream(projectID, deviceID string) (string, error) {
var reqv struct {
Command string `json:"command"`
Params struct{} `json:"params"`
}
reqv.Command = "sdm.devices.commands.CameraLiveStream.GenerateRtspStream"
b, err := json.Marshal(reqv)
if err != nil {
return "", err
}
uri := "https://smartdevicemanagement.googleapis.com/v1/enterprises/" +
projectID + "/devices/" + deviceID + ":executeCommand"
req, err := http.NewRequest("POST", uri, bytes.NewReader(b))
if err != nil {
return "", err
}
req.Header.Set("Authorization", "Bearer "+a.Token)
client := &http.Client{Timeout: time.Second * 5000}
res, err := client.Do(req)
if err != nil {
return "", err
}
if res.StatusCode != 200 {
return "", errors.New("nest: wrong status: " + res.Status)
}
var resv struct {
Results struct {
StreamURLs map[string]string `json:"streamUrls"`
StreamExtensionToken string `json:"streamExtensionToken"`
StreamToken string `json:"streamToken"`
ExpiresAt time.Time `json:"expiresAt"`
} `json:"results"`
}
if err = json.NewDecoder(res.Body).Decode(&resv); err != nil {
return "", err
}
if _, ok := resv.Results.StreamURLs["rtspUrl"]; !ok {
return "", errors.New("nest: failed to generate rtsp url")
}
a.StreamProjectID = projectID
a.StreamDeviceID = deviceID
a.StreamToken = resv.Results.StreamToken
a.StreamExtensionToken = resv.Results.StreamExtensionToken
a.StreamExpiresAt = resv.Results.ExpiresAt
return resv.Results.StreamURLs["rtspUrl"], nil
}
func (a *API) StopRTSPStream() error {
if a.StreamProjectID == "" || a.StreamDeviceID == "" {
return errors.New("nest: tried to stop rtsp stream without a project or device ID")
}
var reqv struct {
Command string `json:"command"`
Params struct {
StreamExtensionToken string `json:"streamExtensionToken"`
} `json:"params"`
}
reqv.Command = "sdm.devices.commands.CameraLiveStream.StopRtspStream"
reqv.Params.StreamExtensionToken = a.StreamExtensionToken
b, err := json.Marshal(reqv)
if err != nil {
return err
}
uri := "https://smartdevicemanagement.googleapis.com/v1/enterprises/" +
a.StreamProjectID + "/devices/" + a.StreamDeviceID + ":executeCommand"
req, err := http.NewRequest("POST", uri, bytes.NewReader(b))
if err != nil {
return err
}
req.Header.Set("Authorization", "Bearer "+a.Token)
client := &http.Client{Timeout: time.Second * 5000}
res, err := client.Do(req)
if err != nil {
return err
}
if res.StatusCode != 200 {
return errors.New("nest: wrong status: " + res.Status)
}
a.StreamProjectID = ""
a.StreamDeviceID = ""
a.StreamExtensionToken = ""
a.StreamToken = ""
return nil
}
@@ -266,10 +400,10 @@ type Device struct {
//SdmDevicesTraitsCameraClipPreview struct {
//} `json:"sdm.devices.traits.CameraClipPreview"`
} `json:"traits"`
//ParentRelations []struct {
// Parent string `json:"parent"`
// DisplayName string `json:"displayName"`
//} `json:"parentRelations"`
ParentRelations []struct {
Parent string `json:"parent"`
DisplayName string `json:"displayName"`
} `json:"parentRelations"`
}
func (a *API) StartExtendStreamTimer() {
@@ -282,7 +416,6 @@ func (a *API) StartExtendStreamTimer() {
duration = time.Until(a.StreamExpiresAt.Add(-30 * time.Second))
a.extendTimer.Reset(duration)
})
}
func (a *API) StopExtendStreamTimer() {
+71 -13
View File
@@ -3,18 +3,25 @@ package nest
import (
"errors"
"net/url"
"strings"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/rtsp"
"github.com/AlexxIT/go2rtc/pkg/webrtc"
pion "github.com/pion/webrtc/v3"
)
type Client struct {
type WebRTCClient struct {
conn *webrtc.Conn
api *API
}
func Dial(rawURL string) (*Client, error) {
type RTSPClient struct {
conn *rtsp.Conn
api *API
}
func Dial(rawURL string) (core.Producer, error) {
u, err := url.Parse(rawURL)
if err != nil {
return nil, err
@@ -36,6 +43,42 @@ func Dial(rawURL string) (*Client, error) {
return nil, err
}
protocols := strings.Split(query.Get("protocols"), ",")
if len(protocols) > 0 && protocols[0] == "RTSP" {
return rtspConn(nestAPI, rawURL, projectID, deviceID)
}
// Default to WEB_RTC for backwards compataiility
return rtcConn(nestAPI, rawURL, projectID, deviceID)
}
func (c *WebRTCClient) GetMedias() []*core.Media {
return c.conn.GetMedias()
}
func (c *WebRTCClient) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
return c.conn.GetTrack(media, codec)
}
func (c *WebRTCClient) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error {
return c.conn.AddTrack(media, codec, track)
}
func (c *WebRTCClient) Start() error {
c.api.StartExtendStreamTimer()
return c.conn.Start()
}
func (c *WebRTCClient) Stop() error {
c.api.StopExtendStreamTimer()
return c.conn.Stop()
}
func (c *WebRTCClient) MarshalJSON() ([]byte, error) {
return c.conn.MarshalJSON()
}
func rtcConn(nestAPI *API, rawURL, projectID, deviceID string) (*WebRTCClient, error) {
rtcAPI, err := webrtc.NewAPI()
if err != nil {
return nil, err
@@ -77,31 +120,46 @@ func Dial(rawURL string) (*Client, error) {
return nil, err
}
return &Client{conn: conn, api: nestAPI}, nil
return &WebRTCClient{conn: conn, api: nestAPI}, nil
}
func (c *Client) GetMedias() []*core.Media {
return c.conn.GetMedias()
func rtspConn(nestAPI *API, rawURL, projectID, deviceID string) (*RTSPClient, error) {
rtspURL, err := nestAPI.GenerateRtspStream(projectID, deviceID)
if err != nil {
return nil, err
}
rtspClient := rtsp.NewClient(rtspURL)
if err := rtspClient.Dial(); err != nil {
return nil, err
}
if err := rtspClient.Describe(); err != nil {
return nil, err
}
return &RTSPClient{conn: rtspClient, api: nestAPI}, nil
}
func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
func (c *RTSPClient) GetMedias() []*core.Media {
result := c.conn.GetMedias()
return result
}
func (c *RTSPClient) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
return c.conn.GetTrack(media, codec)
}
func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error {
return c.conn.AddTrack(media, codec, track)
}
func (c *Client) Start() error {
func (c *RTSPClient) Start() error {
c.api.StartExtendStreamTimer()
return c.conn.Start()
}
func (c *Client) Stop() error {
func (c *RTSPClient) Stop() error {
c.api.StopRTSPStream()
c.api.StopExtendStreamTimer()
return c.conn.Stop()
}
func (c *Client) MarshalJSON() ([]byte, error) {
func (c *RTSPClient) MarshalJSON() ([]byte, error) {
return c.conn.MarshalJSON()
}
+2 -1
View File
@@ -3,10 +3,11 @@ package ngrok
import (
"bufio"
"encoding/json"
"github.com/AlexxIT/go2rtc/pkg/core"
"io"
"os/exec"
"strings"
"github.com/AlexxIT/go2rtc/pkg/core"
)
type Ngrok struct {
+5 -2
View File
@@ -169,9 +169,12 @@ func (c *Client) GetServiceCapabilities() ([]byte, error) {
}
func (c *Client) DeviceRequest(operation string) ([]byte, error) {
if operation == DeviceGetServices {
switch operation {
case DeviceGetServices:
operation = `<tds:GetServices><tds:IncludeCapability>true</tds:IncludeCapability></tds:GetServices>`
} else {
case DeviceGetCapabilities:
operation = `<tds:GetCapabilities><tds:Category>All</tds:Category></tds:GetCapabilities>`
default:
operation = `<tds:` + operation + `/>`
}
return c.Request(c.deviceURL, operation)
+37 -11
View File
@@ -179,18 +179,35 @@ func appendProfile(e *Envelope, tag, name string) {
`)
}
func GetVideoSourceConfigurationsResponse(names []string) []byte {
e := NewEnvelope()
e.Append(`<trt:GetVideoSourceConfigurationsResponse>
`)
for _, name := range names {
appendProfile(e, "Configurations", name)
}
e.Append(`</trt:GetVideoSourceConfigurationsResponse>`)
return e.Bytes()
}
func GetVideoSourceConfigurationResponse(name string) []byte {
e := NewEnvelope()
e.Append(`<trt:GetVideoSourceConfigurationResponse>
<trt:Configuration token="`, name, `">
<tt:Name>VSC</tt:Name>
<tt:SourceToken>`, name, `</tt:SourceToken>
<tt:Bounds x="0" y="0" width="1920" height="1080"></tt:Bounds>
</trt:Configuration>
</trt:GetVideoSourceConfigurationResponse>`)
`)
appendVideoSourceConfiguration(e, "Configuration", name)
e.Append(`</trt:GetVideoSourceConfigurationResponse>`)
return e.Bytes()
}
func appendVideoSourceConfiguration(e *Envelope, tag, name string) {
e.Append(`<trt:`, tag, ` token="`, name, `" fixed="true">
<tt:Name>VSC</tt:Name>
<tt:SourceToken>`, name, `</tt:SourceToken>
<tt:Bounds x="0" y="0" width="1920" height="1080"></tt:Bounds>
</trt:`, tag, `>
`)
}
func GetVideoSourcesResponse(names []string) []byte {
e := NewEnvelope()
e.Append(`<trt:GetVideoSourcesResponse>
@@ -226,11 +243,7 @@ func StaticResponse(operation string) []byte {
e := NewEnvelope()
e.Append(responses[operation])
b := e.Bytes()
if operation == DeviceGetNetworkInterfaces {
println()
}
return b
return e.Bytes()
}
var responses = map[string]string{
@@ -249,4 +262,17 @@ var responses = map[string]string{
<tds:Scopes><tt:ScopeDef>Fixed</tt:ScopeDef><tt:ScopeItem>onvif://www.onvif.org/Profile/Streaming</tt:ScopeItem></tds:Scopes>
<tds:Scopes><tt:ScopeDef>Fixed</tt:ScopeDef><tt:ScopeItem>onvif://www.onvif.org/type/Network_Video_Transmitter</tt:ScopeItem></tds:Scopes>
</tds:GetScopesResponse>`,
MediaGetVideoEncoderConfigurations: `<trt:GetVideoEncoderConfigurationsResponse>
<tt:VideoEncoderConfiguration token="vec">
<tt:Name>VEC</tt:Name>
<tt:Encoding>H264</tt:Encoding>
<tt:Resolution><tt:Width>1920</tt:Width><tt:Height>1080</tt:Height></tt:Resolution>
<tt:RateControl />
</tt:VideoEncoderConfiguration>
</trt:GetVideoEncoderConfigurationsResponse>`,
MediaGetAudioEncoderConfigurations: `<trt:GetAudioEncoderConfigurationsResponse />`,
MediaGetAudioSources: `<trt:GetAudioSourcesResponse />`,
MediaGetAudioSourceConfigurations: `<trt:GetAudioSourceConfigurationsResponse />`,
}
+2 -1
View File
@@ -1,9 +1,10 @@
package v1
import (
"testing"
v2 "github.com/AlexxIT/go2rtc/pkg/pcm"
"github.com/stretchr/testify/require"
"testing"
)
func TestPCMUtoPCM(t *testing.T) {
+2 -3
View File
@@ -514,7 +514,6 @@ func (c *Client) Stop() error {
if c.prod != nil {
_ = c.prod.Stop()
c.prod = nil
}
if c.ws != nil {
@@ -537,6 +536,6 @@ func (c *Client) MarshalJSON() ([]byte, error) {
if webrtcProd, ok := c.prod.(*webrtc.Conn); ok {
return webrtcProd.MarshalJSON()
}
return nil, errors.New("ring: can't marshal")
return json.Marshal(c.prod)
}
+3 -5
View File
@@ -20,6 +20,7 @@ func NewSnapshotProducer(client *RingRestClient, camera *CameraData) *SnapshotPr
ID: core.NewID(),
FormatName: "ring/snapshot",
Protocol: "https",
RemoteAddr: "app-snaps.ring.com",
Medias: []*core.Media{
{
Kind: core.KindVideo,
@@ -43,7 +44,7 @@ func (p *SnapshotProducer) Start() error {
// Fetch snapshot
response, err := p.client.Request("GET", fmt.Sprintf("https://app-snaps.ring.com/snapshots/next/%d", int(p.camera.ID)), nil)
if err != nil {
return fmt.Errorf("failed to get snapshot: %w", err)
return err
}
pkt := &rtp.Packet{
@@ -51,10 +52,7 @@ func (p *SnapshotProducer) Start() error {
Payload: response,
}
// Send to all receivers
for _, receiver := range p.Receivers {
receiver.WriteRTP(pkt)
}
p.Receivers[0].WriteRTP(pkt)
return nil
}
+2 -1
View File
@@ -7,11 +7,12 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"github.com/AlexxIT/go2rtc/pkg/core"
"net/http"
"net/url"
"strconv"
"time"
"github.com/AlexxIT/go2rtc/pkg/core"
)
type UserInfo struct {
+7
View File
@@ -1,3 +1,10 @@
## Tests
- go2rtc rtmp client => Reolink
- go2rtc rtmp server <= Dahua
- go2rtc rtmp publish => YouTube
- go2rtc rtmp publish => Telegram
## Logs
```
+15 -6
View File
@@ -46,7 +46,7 @@ func (c *Conn) Close() error {
return c.conn.Close()
}
func (c *Conn) readResponse(transID float64) ([]any, error) {
func (c *Conn) readResponse(wait func(items []any) bool) ([]any, error) {
for {
msgType, _, b, err := c.readMessage()
if err != nil {
@@ -59,7 +59,7 @@ func (c *Conn) readResponse(transID float64) ([]any, error) {
c.rdPacketSize = binary.BigEndian.Uint32(b)
case TypeCommand:
items, _ := amf.NewReader(b).ReadItems()
if len(items) >= 3 && (items[1] == transID || items[1] == float64(0)) {
if wait(items) {
return items, nil
}
}
@@ -250,7 +250,9 @@ func (c *Conn) writeConnect() error {
return err
}
v, err := c.readResponse(1)
v, err := c.readResponse(func(items []any) bool {
return len(items) >= 3 && items[0] == "_result" && items[1] == float64(1)
})
if err != nil {
return err
}
@@ -280,7 +282,9 @@ func (c *Conn) writeCreateStream() error {
return err
}
v, err := c.readResponse(4)
v, err := c.readResponse(func(items []any) bool {
return len(items) >= 3 && items[0] == "_result" && items[1] == float64(4)
})
if err != nil {
return err
}
@@ -301,7 +305,10 @@ func (c *Conn) writePublish() error {
return err
}
v, err := c.readResponse(5)
// YouTube can response with "onBWDone 0"
v, err := c.readResponse(func(items []any) bool {
return len(items) >= 3 && items[0] == "onStatus"
})
if err != nil {
return nil
}
@@ -321,7 +328,9 @@ func (c *Conn) writePlay() error {
}
// Reolink response with ID=0, other software respose with ID=5
v, err := c.readResponse(5)
v, err := c.readResponse(func(items []any) bool {
return len(items) >= 3 && items[0] == "onStatus"
})
if err != nil {
return nil
}
+45 -7
View File
@@ -2,10 +2,13 @@ package rtmp
import (
"bufio"
"crypto/rand"
"encoding/binary"
"errors"
"fmt"
"io"
"net"
"time"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/flv/amf"
@@ -34,23 +37,54 @@ func NewServer(conn net.Conn) (*Conn, error) {
}
func (c *Conn) serverHandshake() error {
b := make([]byte, 1+1536)
// read C0+C1
// based on https://rtmp.veriskope.com/docs/spec/
_ = c.conn.SetDeadline(time.Now().Add(core.ConnDeadline))
// read C0
b := make([]byte, 1)
if _, err := io.ReadFull(c.rd, b); err != nil {
return err
}
// write S0+S1, skip random
if b[0] != 3 {
return errors.New("rtmp: wrong handshake")
}
// write S0
if _, err := c.conn.Write([]byte{3}); err != nil {
return err
}
b = make([]byte, 1536)
// write S1
tsS1 := nowMS()
binary.BigEndian.PutUint32(b, tsS1)
binary.BigEndian.PutUint32(b[4:], 0)
_, _ = rand.Read(b[8:])
if _, err := c.conn.Write(b); err != nil {
return err
}
// read S1, skip check
if _, err := io.ReadFull(c.rd, make([]byte, 1536)); err != nil {
// read C1
if _, err := io.ReadFull(c.rd, b); err != nil {
return err
}
// write C1
if _, err := c.conn.Write(b[1:]); err != nil {
// write S2
tsS2 := nowMS()
binary.BigEndian.PutUint32(b, tsS1)
binary.BigEndian.PutUint32(b[4:], tsS2)
if _, err := c.conn.Write(b); err != nil {
return err
}
// read C2
if _, err := io.ReadFull(c.rd, b); err != nil {
return err
}
_ = c.conn.SetDeadline(time.Time{})
return nil
}
@@ -161,3 +195,7 @@ func (c *Conn) WriteStart() error {
payload := amf.EncodeItems("onStatus", 0, nil, map[string]any{"code": code})
return c.writeMessage(3, TypeCommand, 0, payload)
}
func nowMS() uint32 {
return uint32(time.Now().UnixNano() / int64(time.Millisecond))
}
+2 -4
View File
@@ -237,13 +237,11 @@ func (c *Conn) SetupMedia(media *core.Media) (byte, error) {
rawURL := media.ID // control
if !strings.Contains(rawURL, "://") {
rawURL = c.URL.String()
if !strings.HasSuffix(rawURL, "/") {
// prefix check for https://github.com/AlexxIT/go2rtc/issues/1236
if !strings.HasSuffix(rawURL, "/") && !strings.HasPrefix(media.ID, "/") {
rawURL += "/"
}
rawURL += media.ID
} else if strings.HasPrefix(rawURL, "rtsp://rtsp://") {
// fix https://github.com/AlexxIT/go2rtc/issues/830
rawURL = rawURL[7:]
}
trackURL, err := urlParse(rawURL)
if err != nil {
+11
View File
@@ -75,6 +75,16 @@ func UnmarshalSDP(rawSDP []byte) ([]*core.Media, error) {
if codec.FmtpLine == "" {
codec.FmtpLine = findFmtpLine(codec.PayloadType, sd.MediaDescriptions)
}
case core.CodecH265:
if codec.FmtpLine != "" {
// all three parameters are needed for a valid fmtp line
// https://github.com/AlexxIT/go2rtc/pull/1588
if !strings.Contains(codec.FmtpLine, "sprop-vps=") ||
!strings.Contains(codec.FmtpLine, "sprop-sps=") ||
!strings.Contains(codec.FmtpLine, "sprop-pps=") {
codec.FmtpLine = ""
}
}
case core.CodecOpus:
// fix OPUS for some cameras https://datatracker.ietf.org/doc/html/rfc7587
codec.ClockRate = 48000
@@ -107,6 +117,7 @@ func findFmtpLine(payloadType uint8, descriptions []*sdp.MediaDescription) strin
// 1. Content-Base: rtsp://::ffff:192.168.1.123/onvif/profile.1/
// 2. Content-Base: rtsp://rtsp://turret2-cam.lan:554/stream1/
func urlParse(rawURL string) (*url.URL, error) {
// fix https://github.com/AlexxIT/go2rtc/issues/830
if strings.HasPrefix(rawURL, "rtsp://rtsp://") {
rawURL = rawURL[7:]
}
+19 -8
View File
@@ -19,19 +19,30 @@ func (c *Conn) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, e
c.stateMu.Lock()
defer c.stateMu.Unlock()
if c.state == StatePlay {
if err := c.Reconnect(); err != nil {
var channel byte
switch c.mode {
case core.ModeActiveProducer:
if c.state == StatePlay {
if err := c.Reconnect(); err != nil {
return nil, err
}
}
var err error
channel, err = c.SetupMedia(media)
if err != nil {
return nil, err
}
}
channel, err := c.SetupMedia(media)
if err != nil {
return nil, err
c.state = StateSetup
case core.ModePassiveConsumer:
// Backchannel
channel = byte(len(c.Senders)) * 2
default:
return nil, errors.New("rtsp: wrong mode for GetTrack")
}
c.state = StateSetup
track := core.NewReceiver(media, codec)
track.ID = channel
c.Receivers = append(c.Receivers, track)
+31 -12
View File
@@ -13,6 +13,8 @@ import (
"github.com/AlexxIT/go2rtc/pkg/tcp"
)
var FailedAuth = errors.New("failed authentication")
func NewServer(conn net.Conn) *Conn {
return &Conn{
Connection: core.Connection{
@@ -45,7 +47,7 @@ func (c *Conn) Accept() error {
c.Fire(req)
if !c.auth.Validate(req) {
if valid, empty := c.auth.Validate(req); !valid {
res := &tcp.Response{
Status: "401 Unauthorized",
Header: map[string][]string{"Www-Authenticate": {`Basic realm="go2rtc"`}},
@@ -54,7 +56,12 @@ func (c *Conn) Accept() error {
if err = c.WriteResponse(res); err != nil {
return err
}
continue
if empty {
// eliminate false positive: ffmpeg sends first request without
// authorization header even if the user provides credentials
continue
}
return FailedAuth
}
// Receiver: OPTIONS > DESCRIBE > SETUP... > PLAY > TEARDOWN
@@ -129,6 +136,16 @@ func (c *Conn) Accept() error {
medias = append(medias, media)
}
for i, track := range c.Receivers {
media := &core.Media{
Kind: core.GetKind(track.Codec.Name),
Direction: core.DirectionSendonly,
Codecs: []*core.Codec{track.Codec},
ID: "trackID=" + strconv.Itoa(i+len(c.Senders)),
}
medias = append(medias, media)
}
res.Body, err = core.MarshalSDP(c.SessionName, medias)
if err != nil {
return err
@@ -141,29 +158,31 @@ func (c *Conn) Accept() error {
}
case MethodSetup:
tr := req.Header.Get("Transport")
res := &tcp.Response{
Header: map[string][]string{},
Request: req,
}
const transport = "RTP/AVP/TCP;unicast;interleaved="
if tr = core.Between(tr, "interleaved=", ";"); tr != "" {
// Test if client requests TCP transport, otherwise return 461 Transport not supported
// This allows smart clients who initially requested UDP to fall back on TCP transport
if tr := req.Header.Get("Transport"); strings.HasPrefix(tr, "RTP/AVP/TCP") {
c.session = core.RandString(8, 10)
c.state = StateSetup
if c.mode == core.ModePassiveConsumer {
if i := reqTrackID(req); i >= 0 && i < len(c.Senders) {
// mark sender as SETUP
c.Senders[i].Media.ID = MethodSetup
tr = fmt.Sprintf("%d-%d", i*2, i*2+1)
res.Header.Set("Transport", transport+tr)
if i := reqTrackID(req); i >= 0 && i < len(c.Senders)+len(c.Receivers) {
if i < len(c.Senders) {
c.Senders[i].Media.ID = MethodSetup
} else {
c.Receivers[i-len(c.Senders)].Media.ID = MethodSetup
}
tr = fmt.Sprintf("RTP/AVP/TCP;unicast;interleaved=%d-%d", i*2, i*2+1)
res.Header.Set("Transport", tr)
} else {
res.Status = "400 Bad Request"
}
} else {
res.Header.Set("Transport", transport+tr)
res.Header.Set("Transport", tr)
}
} else {
res.Status = "461 Unsupported transport"
+59
View File
@@ -0,0 +1,59 @@
package shell
import (
"context"
"os/exec"
)
// Command like exec.Cmd, but with support:
// - io.Closer interface
// - Wait from multiple places
// - Done channel
type Command struct {
*exec.Cmd
ctx context.Context
cancel context.CancelFunc
err error
}
func NewCommand(s string) *Command {
ctx, cancel := context.WithCancel(context.Background())
args := QuoteSplit(s)
cmd := exec.CommandContext(ctx, args[0], args[1:]...)
cmd.SysProcAttr = procAttr
return &Command{cmd, ctx, cancel, nil}
}
func (c *Command) Start() error {
if err := c.Cmd.Start(); err != nil {
return err
}
go func() {
c.err = c.Cmd.Wait()
c.cancel() // release context resources
}()
return nil
}
func (c *Command) Wait() error {
<-c.ctx.Done()
return c.err
}
func (c *Command) Run() error {
if err := c.Start(); err != nil {
return err
}
return c.Wait()
}
func (c *Command) Done() <-chan struct{} {
return c.ctx.Done()
}
func (c *Command) Close() error {
c.cancel()
return nil
}
+7
View File
@@ -0,0 +1,7 @@
//go:build !linux
package shell
import "syscall"
var procAttr *syscall.SysProcAttr
+6
View File
@@ -0,0 +1,6 @@
package shell
import "syscall"
// will stop child if parent died (even with SIGKILL)
var procAttr = &syscall.SysProcAttr{Pdeathsig: syscall.SIGTERM}
+9 -2
View File
@@ -65,6 +65,13 @@ func (s *Server) DelSession(session *Session) {
s.mu.Unlock()
}
func (s *Server) GetSession(ssrc uint32) (session *Session) {
s.mu.Lock()
session = s.sessions[ssrc]
s.mu.Unlock()
return
}
func (s *Server) handle() error {
b := make([]byte, 2048)
for {
@@ -80,14 +87,14 @@ func (s *Server) handle() error {
case 99, 110, 0x80 | 99, 0x80 | 110:
// this is default position for SSRC in RTP packet
ssrc := binary.BigEndian.Uint32(b[8:])
if session, ok := s.sessions[ssrc]; ok {
if session := s.GetSession(ssrc); session != nil {
session.ReadRTP(b[:n])
}
case 200, 201, 202, 203, 204, 205, 206, 207:
// this is default position for SSRC in RTCP packet
ssrc := binary.BigEndian.Uint32(b[4:])
if session, ok := s.sessions[ssrc]; ok {
if session := s.GetSession(ssrc); session != nil {
session.ReadRTCP(b[:n])
}
}
+1 -5
View File
@@ -2,7 +2,6 @@ package stdin
import (
"encoding/json"
"errors"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/pion/rtp"
@@ -42,10 +41,7 @@ func (c *Client) Stop() (err error) {
if c.sender != nil {
c.sender.Close()
}
if c.cmd.Process == nil {
return nil
}
return errors.Join(c.cmd.Process.Kill(), c.cmd.Wait())
return c.cmd.Close()
}
func (c *Client) MarshalJSON() ([]byte, error) {
+3 -4
View File
@@ -1,21 +1,20 @@
package stdin
import (
"os/exec"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/shell"
)
// Deprecated: should be rewritten to core.Connection
type Client struct {
cmd *exec.Cmd
cmd *shell.Command
medias []*core.Media
sender *core.Sender
send int
}
func NewClient(cmd *exec.Cmd) (*Client, error) {
func NewClient(cmd *shell.Command) (*Client, error) {
c := &Client{
cmd: cmd,
medias: []*core.Media{
+1
View File
@@ -291,6 +291,7 @@ func dial(req *http.Request, brand, username, password string) (net.Conn, *http.
if err != nil {
return nil, nil, err
}
_, _ = io.Copy(io.Discard, res.Body) // discard leftovers
_ = res.Body.Close() // ignore response body
auth := res.Header.Get("WWW-Authenticate")
+4 -4
View File
@@ -85,14 +85,14 @@ func (a *Auth) Write(req *Request) {
}
}
func (a *Auth) Validate(req *Request) bool {
func (a *Auth) Validate(req *Request) (valid, empty bool) {
if a == nil {
return true
return true, true
}
header := req.Header.Get("Authorization")
if header == "" {
return false
return false, true
}
if a.Method == AuthUnknown {
@@ -100,7 +100,7 @@ func (a *Auth) Validate(req *Request) bool {
a.header = "Basic " + B64(a.user, a.pass)
}
return header == a.header
return header == a.header, false
}
func (a *Auth) ReadNone(res *Response) bool {
+2 -1
View File
@@ -5,10 +5,11 @@ import (
"crypto/sha1"
"encoding/base64"
"errors"
"github.com/AlexxIT/go2rtc/pkg/tcp"
"net"
"net/http"
"strings"
"github.com/AlexxIT/go2rtc/pkg/tcp"
)
func Dial(address string) (net.Conn, error) {
+48 -17
View File
@@ -4,6 +4,8 @@ import (
"net"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/xnet"
"github.com/pion/ice/v2"
"github.com/pion/interceptor"
"github.com/pion/webrtc/v3"
)
@@ -18,6 +20,7 @@ func NewAPI() (*webrtc.API, error) {
type Filters struct {
Candidates []string `yaml:"candidates"`
Loopback bool `yaml:"loopback"`
Interfaces []string `yaml:"interfaces"`
IPs []string `yaml:"ips"`
Networks []string `yaml:"networks"`
@@ -44,39 +47,53 @@ func NewServerAPI(network, address string, filters *Filters) (*webrtc.API, error
// fix https://github.com/pion/webrtc/pull/2407
s.SetDTLSInsecureSkipHelloVerify(true)
if filters != nil && filters.Loopback {
s.SetIncludeLoopbackCandidate(true)
}
var interfaceFilter func(name string) bool
if filters != nil && filters.Interfaces != nil {
s.SetIncludeLoopbackCandidate(true)
s.SetInterfaceFilter(func(name string) bool {
interfaceFilter = func(name string) bool {
return core.Contains(filters.Interfaces, name)
})
}
} else {
// disable listen on Hassio docker interfaces
s.SetInterfaceFilter(func(name string) bool {
return name != "hassio" && name != "docker0"
})
// default interfaces - all, except loopback
}
s.SetInterfaceFilter(interfaceFilter)
var ipFilter func(ip net.IP) bool
if filters != nil && filters.IPs != nil {
s.SetIncludeLoopbackCandidate(true)
s.SetIPFilter(func(ip net.IP) bool {
ipFilter = func(ip net.IP) bool {
return core.Contains(filters.IPs, ip.String())
})
}
} else {
// try filter all Docker-like interfaces
ipFilter = func(ip net.IP) bool {
return !xnet.Docker.Contains(ip)
}
// if there are no such interfaces - disable the filter
// the user will need to enable port forwarding
if nets, _ := xnet.IPNets(ipFilter); len(nets) == 0 {
ipFilter = nil
}
}
s.SetIPFilter(ipFilter)
var networkTypes []webrtc.NetworkType
if filters != nil && filters.Networks != nil {
var networkTypes []webrtc.NetworkType
for _, s := range filters.Networks {
if networkType, err := webrtc.NewNetworkType(s); err == nil {
networkTypes = append(networkTypes, networkType)
}
}
s.SetNetworkTypes(networkTypes)
} else {
s.SetNetworkTypes([]webrtc.NetworkType{
// default network types - all
networkTypes = []webrtc.NetworkType{
webrtc.NetworkTypeUDP4, webrtc.NetworkTypeUDP6,
webrtc.NetworkTypeTCP4, webrtc.NetworkTypeTCP6,
})
}
}
s.SetNetworkTypes(networkTypes)
if filters != nil && len(filters.UDPPorts) == 2 {
_ = s.SetEphemeralUDPPortRange(filters.UDPPorts[0], filters.UDPPorts[1])
@@ -100,10 +117,24 @@ func NewServerAPI(network, address string, filters *Filters) (*webrtc.API, error
}
if network == "" || network == "udp" {
if ln, err := net.ListenPacket("udp", address); err == nil {
udpMux := webrtc.NewICEUDPMux(nil, ln)
s.SetICEUDPMux(udpMux)
// UDPMuxDefault should not listening on unspecified address, use NewMultiUDPMuxFromPort instead
var udpMux ice.UDPMux
if port := xnet.ParseUnspecifiedPort(address); port != 0 {
var networks []ice.NetworkType
for _, ntype := range networkTypes {
networks = append(networks, ice.NetworkType(ntype))
}
udpMux, _ = ice.NewMultiUDPMuxFromPort(
port,
ice.UDPMuxFromPortWithInterfaceFilter(interfaceFilter),
ice.UDPMuxFromPortWithIPFilter(ipFilter),
ice.UDPMuxFromPortWithNetworks(networks...),
)
} else if ln, err := net.ListenPacket("udp", address); err == nil {
udpMux = ice.NewUDPMuxDefault(ice.UDPMuxParams{UDPConn: ln})
}
s.SetICEUDPMux(udpMux)
}
}
+4
View File
@@ -51,6 +51,10 @@ func NewConn(pc *webrtc.PeerConnection) *Conn {
}
pc.SCTP().Transport().ICETransport().OnSelectedCandidatePairChange(
func(pair *webrtc.ICECandidatePair) {
// fix situation when candidate pair changes multiple times
if i := strings.IndexByte(c.Protocol, '+'); i > 0 {
c.Protocol = c.Protocol[:i]
}
c.Protocol += "+" + pair.Remote.Protocol.String()
c.RemoteAddr = fmt.Sprintf(
"%s:%d %s", sanitizeIP6(pair.Remote.Address), pair.Remote.Port, pair.Remote.Typ,
+3 -2
View File
@@ -3,10 +3,11 @@ package webtorrent
import (
"encoding/base64"
"fmt"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/gorilla/websocket"
"sync"
"time"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/gorilla/websocket"
)
type Server struct {
+64
View File
@@ -0,0 +1,64 @@
package xnet
import (
"net"
"strconv"
)
// Docker has common docker addresses (class B):
// https://en.wikipedia.org/wiki/Private_network
// - docker0 172.17.0.1/16
// - br-xxxx 172.18.0.1/16
// - hassio 172.30.32.1/23
var Docker = net.IPNet{
IP: []byte{172, 16, 0, 0},
Mask: []byte{255, 240, 0, 0},
}
// ParseUnspecifiedPort will return port if address is unspecified
// ex. ":8555" or "0.0.0.0:8555"
func ParseUnspecifiedPort(address string) int {
host, port, err := net.SplitHostPort(address)
if err != nil {
return 0
}
if host != "" && host != "0.0.0.0" && host != "[::]" {
return 0
}
i, _ := strconv.Atoi(port)
return i
}
func IPNets(ipFilter func(ip net.IP) bool) ([]*net.IPNet, error) {
ifaces, err := net.Interfaces()
if err != nil {
return nil, err
}
var nets []*net.IPNet
for _, iface := range ifaces {
if iface.Flags&net.FlagUp == 0 || iface.Flags&net.FlagLoopback != 0 {
continue
}
addrs, _ := iface.Addrs() // range on nil slice is OK
for _, addr := range addrs {
switch v := addr.(type) {
case *net.IPNet:
ip := v.IP.To4()
if ip == nil {
continue
}
if ipFilter != nil && !ipFilter(ip) {
continue
}
nets = append(nets, v)
}
}
}
return nets, nil
}
+131 -105
View File
@@ -23,149 +23,157 @@ func Encode(v any, indent int) ([]byte, error) {
return b.Bytes(), nil
}
// Patch - change key/value pair in YAML file without break formatting
func Patch(src []byte, key string, value any, path ...string) ([]byte, error) {
nodeParent, err := FindParent(src, path...)
func Patch(in []byte, path []string, value any) ([]byte, error) {
out, err := patch(in, path, value)
if err != nil {
return nil, err
}
var dst []byte
if nodeParent != nil {
dst, err = AddOrReplace(src, key, value, nodeParent)
} else {
dst, err = AddToEnd(src, key, value, path...)
}
if err = yaml.Unmarshal(dst, map[string]any{}); err != nil {
// validate
if err = yaml.Unmarshal(out, map[string]any{}); err != nil {
return nil, err
}
return dst, nil
return out, nil
}
// FindParent - return YAML Node from path of keys (tree)
func FindParent(src []byte, path ...string) (*yaml.Node, error) {
if len(src) == 0 {
return nil, nil
}
func patch(in []byte, path []string, value any) ([]byte, error) {
var root yaml.Node
if err := yaml.Unmarshal(src, &root); err != nil {
if err := yaml.Unmarshal(in, &root); err != nil {
// invalid yaml
return nil, err
}
if root.Content == nil {
return nil, nil
// empty in
if len(root.Content) != 1 {
return addToEnd(in, path, value)
}
parent := root.Content[0] // yaml.DocumentNode
for _, name := range path {
if parent == nil {
break
}
_, parent = FindChild(parent, name)
// yaml is not dict
if root.Content[0].Kind != yaml.MappingNode {
return nil, errors.New("yaml: can't patch")
}
return parent, nil
// dict items list
nodes := root.Content[0].Content
n := len(path) - 1
// parent node key/value
pKey, pVal := findNode(nodes, path[:n])
if pKey == nil {
// no parent node
return addToEnd(in, path, value)
}
var paste []byte
if value != nil {
// nil value means delete key
var err error
v := map[string]any{path[n]: value}
if paste, err = Encode(v, 2); err != nil {
return nil, err
}
}
iKey, _ := findNode(pVal.Content, path[n:])
if iKey != nil {
// key item not nil (replace value)
paste = addIndent(paste, iKey.Column-1)
i0, i1 := nodeBounds(in, iKey)
return join(in[:i0], paste, in[i1:]), nil
}
if pVal.Content != nil {
// parent value not nil (use first child indent)
paste = addIndent(paste, pVal.Column-1)
} else {
// parent value is nil (use parent indent + 2)
paste = addIndent(paste, pKey.Column+1)
}
_, i1 := nodeBounds(in, pKey)
return join(in[:i1], paste, in[i1:]), nil
}
// FindChild - search and return YAML key/value pair for current Node
func FindChild(node *yaml.Node, name string) (key, value *yaml.Node) {
for i, child := range node.Content {
if child.Value != name {
continue
func findNode(nodes []*yaml.Node, keys []string) (key, value *yaml.Node) {
for i, name := range keys {
for j := 0; j < len(nodes); j += 2 {
if nodes[j].Value == name {
if i < len(keys)-1 {
nodes = nodes[j+1].Content
break
}
return nodes[j], nodes[j+1]
}
}
return child, node.Content[i+1]
}
return nil, nil
}
func FirstChild(node *yaml.Node) *yaml.Node {
if node.Content == nil {
return node
}
return node.Content[0]
}
func nodeBounds(in []byte, node *yaml.Node) (offset0, offset1 int) {
// start from next line after node
offset0 = lineOffset(in, node.Line)
offset1 = lineOffset(in, node.Line+1)
func LastChild(node *yaml.Node) *yaml.Node {
if node.Content == nil {
return node
}
return LastChild(node.Content[len(node.Content)-1])
}
func AddOrReplace(src []byte, key string, value any, nodeParent *yaml.Node) ([]byte, error) {
v := map[string]any{key: value}
put, err := Encode(v, 2)
if err != nil {
return nil, err
if offset1 < 0 {
return offset0, len(in)
}
if nodeKey, nodeValue := FindChild(nodeParent, key); nodeKey != nil {
put = AddIndent(put, nodeKey.Column-1)
i0 := LineOffset(src, nodeKey.Line)
i1 := LineOffset(src, LastChild(nodeValue).Line+1)
if i1 < 0 { // no new line on the end of file
if value != nil {
return append(src[:i0], put...), nil
for i := offset1; i < len(in); {
indent, length := parseLine(in[i:])
if indent+1 != length {
if node.Column < indent+1 {
offset1 = i + length
} else {
break
}
return src[:i0], nil
}
dst := make([]byte, 0, len(src)+len(put))
dst = append(dst, src[:i0]...)
if value != nil {
dst = append(dst, put...)
}
return append(dst, src[i1:]...), nil
i += length
}
put = AddIndent(put, FirstChild(nodeParent).Column-1)
i := LineOffset(src, LastChild(nodeParent).Line+1)
if i < 0 { // no new line on the end of file
src = append(src, '\n')
if value != nil {
src = append(src, put...)
}
return src, nil
}
dst := make([]byte, 0, len(src)+len(put))
dst = append(dst, src[:i]...)
if value != nil {
dst = append(dst, put...)
}
return append(dst, src[i:]...), nil
return
}
func AddToEnd(src []byte, key string, value any, path ...string) ([]byte, error) {
if len(path) > 1 || value == nil {
return nil, errors.New("config: path not exist")
func addToEnd(in []byte, path []string, value any) ([]byte, error) {
if len(path) != 2 || value == nil {
return nil, errors.New("yaml: path not exist")
}
v := map[string]map[string]any{
path[0]: {key: value},
path[0]: {path[1]: value},
}
put, err := Encode(v, 2)
paste, err := Encode(v, 2)
if err != nil {
return nil, err
}
dst := make([]byte, 0, len(src)+len(put)+10)
dst = append(dst, src...)
if l := len(src); l > 0 && src[l-1] != '\n' {
dst = append(dst, '\n')
}
return append(dst, put...), nil
return join(in, paste), nil
}
func AddPrefix(src, pre []byte) (dst []byte) {
func join(items ...[]byte) []byte {
n := len(items) - 1
for _, b := range items {
n += len(b)
}
buf := make([]byte, 0, n)
for _, b := range items {
if len(b) == 0 {
continue
}
if n = len(buf); n > 0 && buf[n-1] != '\n' {
buf = append(buf, '\n')
}
buf = append(buf, b...)
}
return buf
}
func addPrefix(src, pre []byte) (dst []byte) {
for len(src) > 0 {
dst = append(dst, pre...)
i := bytes.IndexByte(src, '\n') + 1
@@ -180,21 +188,21 @@ func AddPrefix(src, pre []byte) (dst []byte) {
return
}
func AddIndent(src []byte, indent int) (dst []byte) {
func addIndent(in []byte, indent int) (dst []byte) {
pre := make([]byte, indent)
for i := 0; i < indent; i++ {
pre[i] = ' '
}
return AddPrefix(src, pre)
return addPrefix(in, pre)
}
func LineOffset(b []byte, line int) (offset int) {
func lineOffset(in []byte, line int) (offset int) {
for l := 1; ; l++ {
if l == line {
return offset
}
i := bytes.IndexByte(b[offset:], '\n') + 1
i := bytes.IndexByte(in[offset:], '\n') + 1
if i == 0 {
break
}
@@ -202,3 +210,21 @@ func LineOffset(b []byte, line int) (offset int) {
}
return -1
}
func parseLine(b []byte) (indent int, length int) {
prefix := true
for ; length < len(b); length++ {
switch b[length] {
case ' ':
if prefix {
indent++
}
case '\n':
length++
return
default:
prefix = false
}
}
return
}
+99 -136
View File
@@ -7,140 +7,103 @@ import (
)
func TestPatch(t *testing.T) {
b := []byte(`# prefix`)
// 1. Add first
b, err := Patch(b, "camera1", "url1", "streams")
require.Nil(t, err)
require.Equal(t, `# prefix
streams:
camera1: url1
`, string(b))
// 2. Add second
b, err = Patch(b, "camera2", []string{"url2", "url3"}, "streams")
require.Nil(t, err)
require.Equal(t, `# prefix
streams:
camera1: url1
camera2:
- url2
- url3
`, string(b))
// 3. Replace first
b, err = Patch(b, "camera1", "url4", "streams")
require.Nil(t, err)
require.Equal(t, `# prefix
streams:
camera1: url4
camera2:
- url2
- url3
`, string(b))
// 4. Replace second
b, err = Patch(b, "camera2", "url5", "streams")
require.Nil(t, err)
require.Equal(t, `# prefix
streams:
camera1: url4
camera2: url5
`, string(b))
// 5. Delete first
b, err = Patch(b, "camera1", nil, "streams")
require.Nil(t, err)
require.Equal(t, `# prefix
streams:
camera2: url5
`, string(b))
}
func TestPatchParings(t *testing.T) {
b := []byte(`homekit:
camera1:
pin: 123-45-678
streams:
camera1: url1
`)
// 1. Add new key
pairings := []string{"client1", "client2"}
b, err := Patch(b, "pairings", pairings, "homekit", "camera1")
require.Nil(t, err)
require.Equal(t, `homekit:
camera1:
pin: 123-45-678
pairings:
- client1
- client2
streams:
camera1: url1
`, string(b))
}
func TestPatch2(t *testing.T) {
b := []byte(`streams:
camera1:
- url1
- url2
`)
b, err := Patch(b, "camera2", "url3", "streams")
require.Nil(t, err)
require.Equal(t, `streams:
camera1:
- url1
- url2
camera2: url3
`, string(b))
}
func TestNoNewLineEnd1(t *testing.T) {
b := []byte(`streams:
camera1: url4
camera2:
- url2
- url3`)
b, err := Patch(b, "camera2", "url5", "streams")
require.Nil(t, err)
require.Equal(t, `streams:
camera1: url4
camera2: url5
`, string(b))
}
func TestNoNewLineEnd2(t *testing.T) {
b := []byte(`streams:
camera1: url1
homekit:
camera1:
pin: 123-45-678`)
// 1. Add new key
pairings := []string{"client1", "client2"}
b, err := Patch(b, "pairings", pairings, "homekit", "camera1")
require.Nil(t, err)
require.Equal(t, `streams:
camera1: url1
homekit:
camera1:
pin: 123-45-678
pairings:
- client1
- client2
`, string(b))
tests := []struct {
name string
src string
path []string
value any
expect string
}{
{
name: "empty config",
src: "",
path: []string{"streams", "camera1"},
value: "val1",
expect: "streams:\n camera1: val1\n",
},
{
name: "empty main key",
src: "#dummy",
path: []string{"streams", "camera1"},
value: "val1",
expect: "#dummy\nstreams:\n camera1: val1\n",
},
{
name: "single line value",
src: "streams:\n camera1: url1\n camera2: url2",
path: []string{"streams", "camera1"},
value: "val1",
expect: "streams:\n camera1: val1\n camera2: url2",
},
{
name: "next line value",
src: "streams:\n camera1:\n url1\n camera2: url2",
path: []string{"streams", "camera1"},
value: "val1",
expect: "streams:\n camera1: val1\n camera2: url2",
},
{
name: "two lines value",
src: "streams:\n camera1: url1\n url2\n camera2: url2",
path: []string{"streams", "camera1"},
value: "val1",
expect: "streams:\n camera1: val1\n camera2: url2",
},
{
name: "next two lines value",
src: "streams:\n camera1:\n url1\n url2\n camera2: url2",
path: []string{"streams", "camera1"},
value: "val1",
expect: "streams:\n camera1: val1\n camera2: url2",
},
{
name: "add array",
src: "",
path: []string{"streams", "camera1"},
value: []string{"val1", "val2"},
expect: "streams:\n camera1:\n - val1\n - val2\n",
},
{
name: "remove value",
src: "streams:\n camera1: url1\n camera2: url2",
path: []string{"streams", "camera1"},
value: nil,
expect: "streams:\n camera2: url2",
},
{
name: "add pairings",
src: "homekit:\n camera1:\nstreams:\n camera1: url1",
path: []string{"homekit", "camera1", "pairings"},
value: []string{"val1"},
expect: "homekit:\n camera1:\n pairings:\n - val1\nstreams:\n camera1: url1",
},
{
name: "remove pairings",
src: "homekit:\n camera1:\n pairings:\n - val1\nstreams:\n camera1: url1",
path: []string{"homekit", "camera1", "pairings"},
value: nil,
expect: "homekit:\n camera1:\nstreams:\n camera1: url1",
},
{
name: "no new line",
src: "streams:\n camera1: url1",
path: []string{"streams", "camera1"},
value: "val1",
expect: "streams:\n camera1: val1\n",
},
{
name: "no new line",
src: "streams:\n camera1: url1\nhomekit:\n camera1:\n name: dummy",
path: []string{"homekit", "camera1", "pairings"},
value: []string{"val1"},
expect: "streams:\n camera1: url1\nhomekit:\n camera1:\n name: dummy\n pairings:\n - val1\n",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
b, err := Patch([]byte(tt.src), tt.path, tt.value)
require.NoError(t, err)
require.Equal(t, tt.expect, string(b))
})
}
}