diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index c802df63..6015efa0 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -124,7 +124,7 @@ jobs: uses: docker/metadata-action@v5 with: images: | - ${{ github.repository }} + name=${{ github.repository }},enable=${{ github.event.repository.fork == false }} ghcr.io/${{ github.repository }} tags: | type=ref,event=branch @@ -138,14 +138,14 @@ jobs: uses: docker/setup-buildx-action@v3 - name: Login to DockerHub - if: github.event_name != 'pull_request' + if: github.event_name == 'push' && github.event.repository.fork == false uses: docker/login-action@v3 with: username: ${{ secrets.DOCKERHUB_USERNAME }} password: ${{ secrets.DOCKERHUB_TOKEN }} - name: Login to GitHub Container Registry - if: github.event_name != 'pull_request' + if: github.event_name == 'push' uses: docker/login-action@v3 with: registry: ghcr.io @@ -181,7 +181,7 @@ jobs: uses: docker/metadata-action@v5 with: images: | - ${{ github.repository }} + name=${{ github.repository }},enable=${{ github.event.repository.fork == false }} ghcr.io/${{ github.repository }} flavor: | suffix=-hardware,onlatest=true @@ -198,14 +198,14 @@ jobs: uses: docker/setup-buildx-action@v3 - name: Login to DockerHub - if: github.event_name != 'pull_request' + if: github.event_name == 'push' && github.event.repository.fork == false uses: docker/login-action@v3 with: username: ${{ secrets.DOCKERHUB_USERNAME }} password: ${{ secrets.DOCKERHUB_TOKEN }} - + - name: Login to GitHub Container Registry - if: github.event_name != 'pull_request' + if: github.event_name == 'push' uses: docker/login-action@v3 with: registry: ghcr.io @@ -236,7 +236,7 @@ jobs: uses: docker/metadata-action@v5 with: images: | - ${{ github.repository }} + name=${{ github.repository }},enable=${{ github.event.repository.fork == false }} ghcr.io/${{ github.repository }} flavor: | suffix=-rockchip,onlatest=true @@ -253,14 +253,14 @@ jobs: uses: docker/setup-buildx-action@v3 - name: Login to DockerHub - if: github.event_name != 'pull_request' + if: github.event_name == 'push' && github.event.repository.fork == false uses: docker/login-action@v3 with: username: ${{ secrets.DOCKERHUB_USERNAME }} password: ${{ secrets.DOCKERHUB_TOKEN }} - name: Login to GitHub Container Registry - if: github.event_name != 'pull_request' + if: github.event_name == 'push' uses: docker/login-action@v3 with: registry: ghcr.io diff --git a/README.md b/README.md index 74c71a0b..783b87b5 100644 --- a/README.md +++ b/README.md @@ -702,7 +702,7 @@ Supports [Amazon Kinesis Video Streams](https://aws.amazon.com/kinesis/video-str **switchbot** -Support connection to [SwitchBot](https://us.switch-bot.com/) cameras that are based on Kinesis Video Streams. Specifically, this includes [Pan/Tilt Cam Plus 2K](https://us.switch-bot.com/pages/switchbot-pan-tilt-cam-plus-2k) and [Pan/Tilt Cam Plus 3K](https://us.switch-bot.com/pages/switchbot-pan-tilt-cam-plus-3k). `Outdoor Spotlight Cam 1080P`, `Outdoor Spotlight Cam 2K`, `Pan/Tilt Cam`, `Pan/Tilt Cam 2K`, `Indoor Cam` are based on Tuya, so this feature is not available. +Support connection to [SwitchBot](https://us.switch-bot.com/) cameras that are based on Kinesis Video Streams. Specifically, this includes [Pan/Tilt Cam Plus 2K](https://us.switch-bot.com/pages/switchbot-pan-tilt-cam-plus-2k) and [Pan/Tilt Cam Plus 3K](https://us.switch-bot.com/pages/switchbot-pan-tilt-cam-plus-3k) and [Smart Video Doorbell](https://www.switchbot.jp/products/switchbot-smart-video-doorbell). `Outdoor Spotlight Cam 1080P`, `Outdoor Spotlight Cam 2K`, `Pan/Tilt Cam`, `Pan/Tilt Cam 2K`, `Indoor Cam` are based on Tuya, so this feature is not available. ```yaml streams: @@ -711,7 +711,7 @@ streams: webrtc-openipc: webrtc:ws://192.168.1.123/webrtc_ws#format=openipc#ice_servers=[{"urls":"stun:stun.kinesisvideo.eu-north-1.amazonaws.com:443"}] webrtc-wyze: webrtc:http://192.168.1.123:5000/signaling/camera1?kvs#format=wyze webrtc-kinesis: webrtc:wss://...amazonaws.com/?...#format=kinesis#client_id=...#ice_servers=[{...},{...}] - webrtc-switchbot: webrtc:wss://...amazonaws.com/?...#format=switchbot#resolution=hd#client_id=...#ice_servers=[{...},{...}] + webrtc-switchbot: webrtc:wss://...amazonaws.com/?...#format=switchbot#resolution=hd#play_type=0#client_id=...#ice_servers=[{...},{...}] ``` **PS.** For `kinesis` sources, you can use [echo](#source-echo) to get connection params using `bash`, `python` or any other script language. diff --git a/docker/Dockerfile b/docker/Dockerfile index 854ea6c9..8d064f21 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -1,7 +1,7 @@ # syntax=docker/dockerfile:labs # 0. Prepare images -ARG PYTHON_VERSION="3.11" +ARG PYTHON_VERSION="3.13" ARG GO_VERSION="1.25" diff --git a/internal/app/config.go b/internal/app/config.go index f0eb36e0..0f95894a 100644 --- a/internal/app/config.go +++ b/internal/app/config.go @@ -7,7 +7,7 @@ import ( "strings" "sync" - "github.com/AlexxIT/go2rtc/pkg/shell" + "github.com/AlexxIT/go2rtc/pkg/creds" "github.com/AlexxIT/go2rtc/pkg/yaml" ) @@ -71,13 +71,15 @@ func initConfig(confs flagConfig) { // config as file if ConfigPath == "" { ConfigPath = conf + initStorage() } if data, _ = os.ReadFile(conf); data == nil { continue } - data = []byte(shell.ReplaceEnvVars(string(data))) + loadEnv(data) + data = creds.ReplaceVars(data) configs = append(configs, data) } } diff --git a/internal/app/log.go b/internal/app/log.go index b8ca4aa5..9ec89a2c 100644 --- a/internal/app/log.go +++ b/internal/app/log.go @@ -6,6 +6,7 @@ import ( "strings" "sync" + "github.com/AlexxIT/go2rtc/pkg/creds" "github.com/mattn/go-isatty" "github.com/rs/zerolog" ) @@ -88,6 +89,8 @@ func initLogger() { writer = MemoryLog } + writer = creds.SecretWriter(writer) + lvl, _ := zerolog.ParseLevel(modules["level"]) Logger = zerolog.New(writer).Level(lvl) diff --git a/internal/app/storage.go b/internal/app/storage.go new file mode 100644 index 00000000..cfa1ca91 --- /dev/null +++ b/internal/app/storage.go @@ -0,0 +1,56 @@ +package app + +import ( + "sync" + + "github.com/AlexxIT/go2rtc/pkg/creds" + "github.com/AlexxIT/go2rtc/pkg/yaml" +) + +func initStorage() { + storage = &envStorage{data: make(map[string]string)} + creds.SetStorage(storage) +} + +func loadEnv(data []byte) { + var cfg struct { + Env map[string]string `yaml:"env"` + } + + if err := yaml.Unmarshal(data, &cfg); err != nil { + return + } + + storage.mu.Lock() + for name, value := range cfg.Env { + storage.data[name] = value + creds.AddSecret(value) + } + storage.mu.Unlock() +} + +var storage *envStorage + +type envStorage struct { + data map[string]string + mu sync.Mutex +} + +func (s *envStorage) SetValue(name, value string) error { + if err := PatchConfig([]string{"env", name}, value); err != nil { + return err + } + + s.mu.Lock() + s.data[name] = value + s.mu.Unlock() + + return nil +} + +func (s *envStorage) GetValue(name string) (value string, ok bool) { + s.mu.Lock() + value, ok = s.data[name] + s.mu.Unlock() + return +} diff --git a/internal/debug/stack.go b/internal/debug/stack.go index f8d62772..6bc735ad 100644 --- a/internal/debug/stack.go +++ b/internal/debug/stack.go @@ -29,8 +29,8 @@ var stackSkip = [][]byte{ []byte("created by github.com/AlexxIT/go2rtc/internal/homekit.Init"), // webrtc/api.go - []byte("created by github.com/pion/ice/v2.NewTCPMuxDefault"), - []byte("created by github.com/pion/ice/v2.NewUDPMuxDefault"), + []byte("created by github.com/pion/ice/v4.NewTCPMuxDefault"), + []byte("created by github.com/pion/ice/v4.NewUDPMuxDefault"), } func stackHandler(w http.ResponseWriter, r *http.Request) { diff --git a/internal/streams/api.go b/internal/streams/api.go index d162cdf9..28f09708 100644 --- a/internal/streams/api.go +++ b/internal/streams/api.go @@ -6,10 +6,13 @@ import ( "github.com/AlexxIT/go2rtc/internal/api" "github.com/AlexxIT/go2rtc/internal/app" "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/creds" "github.com/AlexxIT/go2rtc/pkg/probe" ) func apiStreams(w http.ResponseWriter, r *http.Request) { + w = creds.SecretResponse(w) + query := r.URL.Query() src := query.Get("src") @@ -121,6 +124,8 @@ func apiStreamsDOT(w http.ResponseWriter, r *http.Request) { } dot = append(dot, '}') + dot = []byte(creds.SecretString(string(dot))) + api.Response(w, dot, "text/vnd.graphviz") } diff --git a/internal/webrtc/switchbot.go b/internal/webrtc/switchbot.go index 5ece88ae..6f72e55d 100644 --- a/internal/webrtc/switchbot.go +++ b/internal/webrtc/switchbot.go @@ -33,8 +33,12 @@ func switchbotClient(rawURL string, query url.Values) (core.Producer, error) { v.Resolution = 0 case "sd": v.Resolution = 1 + case "auto": + v.Resolution = 2 } + v.PlayType = core.Atoi(query.Get("play_type")) // zero by default + return v, nil }) } diff --git a/pkg/creds/README.md b/pkg/creds/README.md new file mode 100644 index 00000000..1909a206 --- /dev/null +++ b/pkg/creds/README.md @@ -0,0 +1,7 @@ +# Credentials + +This module allows you to get variables: + +- from custom storage (ex. config file) +- from [credential files](https://systemd.io/CREDENTIALS/) +- from environment variables diff --git a/pkg/creds/creds.go b/pkg/creds/creds.go new file mode 100644 index 00000000..84bc275a --- /dev/null +++ b/pkg/creds/creds.go @@ -0,0 +1,79 @@ +package creds + +import ( + "errors" + "os" + "path/filepath" + "regexp" + "strings" +) + +type Storage interface { + SetValue(name, value string) error + GetValue(name string) (string, bool) +} + +var storage Storage + +func SetStorage(s Storage) { + storage = s +} + +func SetValue(name, value string) error { + if storage == nil { + return errors.New("credentials: storage not initialized") + } + if err := storage.SetValue(name, value); err != nil { + return err + } + AddSecret(value) + return nil +} + +func GetValue(name string) (value string, ok bool) { + value, ok = getValue(name) + AddSecret(value) + return +} + +func getValue(name string) (string, bool) { + if storage != nil { + if value, ok := storage.GetValue(name); ok { + return value, true + } + } + + if dir, ok := os.LookupEnv("CREDENTIALS_DIRECTORY"); ok { + if value, _ := os.ReadFile(filepath.Join(dir, name)); value != nil { + return strings.TrimSpace(string(value)), true + } + } + + return os.LookupEnv(name) +} + +// ReplaceVars - support format ${CAMERA_PASSWORD} and ${RTSP_USER:admin} +func ReplaceVars(data []byte) []byte { + re := regexp.MustCompile(`\${([^}{]+)}`) + return re.ReplaceAllFunc(data, func(match []byte) []byte { + key := string(match[2 : len(match)-1]) + + var def string + var defok bool + + if i := strings.IndexByte(key, ':'); i > 0 { + key, def = key[:i], key[i+1:] + defok = true + } + + if value, ok := GetValue(key); ok { + return []byte(value) + } + + if defok { + return []byte(def) + } + + return match + }) +} diff --git a/pkg/creds/secrets.go b/pkg/creds/secrets.go new file mode 100644 index 00000000..a9a0094e --- /dev/null +++ b/pkg/creds/secrets.go @@ -0,0 +1,83 @@ +package creds + +import ( + "io" + "net/http" + "slices" + "strings" + "sync" +) + +func AddSecret(value string) { + if value == "" { + return + } + + secretsMu.Lock() + defer secretsMu.Unlock() + + if slices.Contains(secrets, value) { + return + } + + secrets = append(secrets, value) + secretsReplacer = nil +} + +var secrets []string +var secretsMu sync.Mutex +var secretsReplacer *strings.Replacer + +func getReplacer() *strings.Replacer { + secretsMu.Lock() + defer secretsMu.Unlock() + + if secretsReplacer == nil { + oldnew := make([]string, 0, 2*len(secrets)) + for _, s := range secrets { + oldnew = append(oldnew, s, "***") + } + secretsReplacer = strings.NewReplacer(oldnew...) + } + + return secretsReplacer +} + +func SecretString(s string) string { + re := getReplacer() + return re.Replace(s) +} + +func SecretWriter(w io.Writer) io.Writer { + return &secretWriter{w} +} + +type secretWriter struct { + w io.Writer +} + +func (s *secretWriter) Write(b []byte) (int, error) { + re := getReplacer() + return re.WriteString(s.w, string(b)) +} + +type secretResponse struct { + w http.ResponseWriter +} + +func (s *secretResponse) Header() http.Header { + return s.w.Header() +} + +func (s *secretResponse) Write(b []byte) (int, error) { + re := getReplacer() + return re.WriteString(s.w, string(b)) +} + +func (s *secretResponse) WriteHeader(statusCode int) { + s.w.WriteHeader(statusCode) +} + +func SecretResponse(w http.ResponseWriter) http.ResponseWriter { + return &secretResponse{w} +} diff --git a/pkg/creds/secrets_test.go b/pkg/creds/secrets_test.go new file mode 100644 index 00000000..83f1908a --- /dev/null +++ b/pkg/creds/secrets_test.go @@ -0,0 +1,15 @@ +package creds + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestString(t *testing.T) { + AddSecret("admin") + AddSecret("pa$$word") + + s := SecretString("rtsp://admin:pa$$word@192.168.1.123/stream1") + require.Equal(t, "rtsp://***:***@192.168.1.123/stream1", s) +} diff --git a/pkg/doorbird/backchannel.go b/pkg/doorbird/backchannel.go index 82379383..4d252228 100644 --- a/pkg/doorbird/backchannel.go +++ b/pkg/doorbird/backchannel.go @@ -88,6 +88,8 @@ func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Rece } func (c *Client) Start() (err error) { - _, err = c.conn.Read(nil) + // just block until c.conn closed + b := make([]byte, 1) + _, err = c.conn.Read(b) return } diff --git a/pkg/h265/rtp.go b/pkg/h265/rtp.go index 72d2c02f..9c571ec5 100644 --- a/pkg/h265/rtp.go +++ b/pkg/h265/rtp.go @@ -14,6 +14,7 @@ func RTPDepay(codec *core.Codec, handler core.HandlerFunc) core.HandlerFunc { buf := make([]byte, 0, 512*1024) // 512K var nuStart int + var seqNum uint16 return func(packet *rtp.Packet) { data := packet.Payload @@ -34,9 +35,19 @@ func RTPDepay(codec *core.Codec, handler core.HandlerFunc) core.HandlerFunc { } } + // when we collect data into one buffer, we need to make sure + // that all of it falls into the same sequence + if len(buf) > 0 && packet.SequenceNumber-seqNum != 1 { + //log.Printf("broken H265 sequence") + buf = buf[:0] // drop data + return + } + + seqNum = packet.SequenceNumber + if nuType == NALUTypeFU { switch data[2] >> 6 { - case 2: // begin + case 0b10: // begin nuType = data[2] & 0x3F // push PS data before keyframe @@ -49,13 +60,30 @@ func RTPDepay(codec *core.Codec, handler core.HandlerFunc) core.HandlerFunc { buf = append(buf, (data[0]&0x81)|(nuType<<1), data[1]) buf = append(buf, data[3:]...) return - case 0: // continue + case 0b00: // continue + if len(buf) == 0 { + //log.Printf("broken H265 fragment") + return + } + buf = append(buf, data[3:]...) return - case 1: // end + case 0b01: // end + if len(buf) == 0 { + //log.Printf("broken H265 fragment") + return + } + buf = append(buf, data[3:]...) + + if nuStart > len(buf)+4 { + //log.Printf("broken H265 fragment") + buf = buf[:0] // drop data + return + } + binary.BigEndian.PutUint32(buf[nuStart:], uint32(len(buf)-nuStart-4)) - case 3: // wrong RFC 7798 realisation from OpenIPC project + case 0b11: // wrong RFC 7798 realisation from OpenIPC project // A non-fragmented NAL unit MUST NOT be transmitted in one FU; i.e., // the Start bit and End bit must not both be set to 1 in the same FU // header. @@ -65,10 +93,8 @@ func RTPDepay(codec *core.Codec, handler core.HandlerFunc) core.HandlerFunc { buf = append(buf, data[3:]...) } } else { - nuStart = len(buf) - buf = append(buf, 0, 0, 0, 0) // NAL unit size + buf = binary.BigEndian.AppendUint32(buf, uint32(len(data))) // NAL unit size buf = append(buf, data...) - binary.BigEndian.PutUint32(buf[nuStart:], uint32(len(data))) } // collect all NAL Units for Access Unit diff --git a/pkg/onvif/client.go b/pkg/onvif/client.go index 936f65f6..77bbe0ff 100644 --- a/pkg/onvif/client.go +++ b/pkg/onvif/client.go @@ -3,9 +3,10 @@ package onvif import ( "bytes" "errors" + "fmt" "html" "io" - "net/http" + "net" "net/url" "regexp" "strings" @@ -43,7 +44,14 @@ func NewClient(rawURL string) (*Client, error) { } client.mediaURL = FindTagValue(b, "Media.+?XAddr") + if client.mediaURL == "" { + client.mediaURL = baseURL + "/onvif/media_service" + } + client.imaginURL = FindTagValue(b, "Imaging.+?XAddr") + if client.imaginURL == "" { + client.imaginURL = baseURL + "/onvif/imaging_service" + } return client, nil } @@ -175,26 +183,62 @@ func (c *Client) MediaRequest(operation string) ([]byte, error) { return c.Request(c.mediaURL, operation) } -func (c *Client) Request(url, body string) ([]byte, error) { - if url == "" { +func (c *Client) Request(rawUrl, body string) ([]byte, error) { + if rawUrl == "" { return nil, errors.New("onvif: unsupported service") } e := NewEnvelopeWithUser(c.url.User) e.Append(body) - client := &http.Client{Timeout: time.Second * 5000} - res, err := client.Post(url, `application/soap+xml;charset=utf-8`, bytes.NewReader(e.Bytes())) + u, err := url.Parse(rawUrl) if err != nil { return nil, err } - // need to close body with eny response status - b, err := io.ReadAll(res.Body) - - if err == nil && res.StatusCode != http.StatusOK { - err = errors.New("onvif: " + res.Status + " for " + url) + host := u.Host + if u.Port() == "" { + host += ":80" } - return b, err + conn, err := net.DialTimeout("tcp", host, 5*time.Second) + if err != nil { + return nil, err + } + defer conn.Close() + + reqBody := e.Bytes() + rawReq := fmt.Appendf(nil, "POST %s HTTP/1.1\r\n"+ + "Host: %s\r\n"+ + "Content-Type: application/soap+xml;charset=utf-8\r\n"+ + "Content-Length: %d\r\n"+ + "Connection: close\r\n"+ + "\r\n", u.Path, u.Host, len(reqBody)) + rawReq = append(rawReq, reqBody...) + + if _, err = conn.Write(rawReq); err != nil { + return nil, err + } + + rawRes, err := io.ReadAll(conn) + if err != nil { + return nil, err + } + + // Look for XML in complete response + if i := bytes.Index(rawRes, []byte(" 0 { + return rawRes[i:], nil + } + + // No XML found - might be an error response + if i := bytes.Index(rawRes, []byte("\r\n\r\n")); i > 0 { + if bytes.Contains(rawRes[:i], []byte("chunked")) { + return nil, errors.New("onvif: TODO: support chunked encoding") + } + + // Return body after headers + return rawRes[i+4:], nil + } + + return rawRes, nil } diff --git a/pkg/probe/consumer.go b/pkg/probe/consumer.go index c6aa4478..a1ca7ca5 100644 --- a/pkg/probe/consumer.go +++ b/pkg/probe/consumer.go @@ -47,3 +47,7 @@ func (p *Probe) AddTrack(media *core.Media, codec *core.Codec, track *core.Recei p.Senders = append(p.Senders, sender) return nil } + +func (p *Probe) Start() error { + return nil +} diff --git a/pkg/rtsp/client.go b/pkg/rtsp/client.go index 7fc134fc..4e891213 100644 --- a/pkg/rtsp/client.go +++ b/pkg/rtsp/client.go @@ -9,6 +9,7 @@ import ( "net/url" "strconv" "strings" + "sync" "time" "github.com/AlexxIT/go2rtc/pkg/tcp/websocket" @@ -36,14 +37,22 @@ func (c *Conn) Dial() (err error) { var conn net.Conn - if c.Transport == "" { - timeout := core.ConnDialTimeout + switch c.Transport { + case "", "tcp", "udp": + var timeout time.Duration if c.Timeout != 0 { timeout = time.Second * time.Duration(c.Timeout) + } else { + timeout = core.ConnDialTimeout } conn, err = tcp.Dial(c.URL, timeout) - c.Protocol = "rtsp+tcp" - } else { + + if c.Transport != "udp" { + c.Protocol = "rtsp+tcp" + } else { + c.Protocol = "rtsp+udp" + } + default: conn, err = websocket.Dial(c.Transport) c.Protocol = "ws" } @@ -61,6 +70,9 @@ func (c *Conn) Dial() (err error) { c.sequence = 0 c.state = StateConn + c.udpConn = nil + c.udpAddr = nil + c.Connection.RemoteAddr = conn.RemoteAddr().String() c.Connection.Transport = conn c.Connection.URL = c.uri @@ -218,15 +230,27 @@ func (c *Conn) Record() (err error) { func (c *Conn) SetupMedia(media *core.Media) (byte, error) { var transport string - // try to use media position as channel number - for i, m := range c.Medias { - if m.Equal(media) { - transport = fmt.Sprintf( - // i - RTP (data channel) - // i+1 - RTCP (control channel) - "RTP/AVP/TCP;unicast;interleaved=%d-%d", i*2, i*2+1, - ) - break + if c.Transport == "udp" { + conn1, conn2, err := ListenUDPPair() + if err != nil { + return 0, err + } + + c.udpConn = append(c.udpConn, conn1, conn2) + + port := conn1.LocalAddr().(*net.UDPAddr).Port + transport = fmt.Sprintf("RTP/AVP;unicast;client_port=%d-%d", port, port+1) + } else { + // try to use media position as channel number + for i, m := range c.Medias { + if m.Equal(media) { + transport = fmt.Sprintf( + // i - RTP (data channel) + // i+1 - RTCP (control channel) + "RTP/AVP/TCP;unicast;interleaved=%d-%d", i*2, i*2+1, + ) + break + } } } @@ -286,27 +310,53 @@ func (c *Conn) SetupMedia(media *core.Media) (byte, error) { } } - // we send our `interleaved`, but camera can answer with another - - // Transport: RTP/AVP/TCP;unicast;interleaved=10-11;ssrc=10117CB7 - // Transport: RTP/AVP/TCP;unicast;destination=192.168.1.111;source=192.168.1.222;interleaved=0 - // Transport: RTP/AVP/TCP;ssrc=22345682;interleaved=0-1 + // Parse server response transport = res.Header.Get("Transport") - if !strings.HasPrefix(transport, "RTP/AVP/TCP;") { + + if c.Transport == "udp" { + channel := byte(len(c.udpConn) - 2) + + // Dahua: RTP/AVP/UDP;unicast;client_port=49292-49293;server_port=43670-43671;ssrc=7CB694B4 + // OpenIPC: RTP/AVP/UDP;unicast;client_port=59612-59613 + if s := core.Between(transport, "server_port=", ";"); s != "" { + s1, s2, _ := strings.Cut(s, "-") + port1 := core.Atoi(s1) + port2 := core.Atoi(s2) + // TODO: more smart handling empty server ports + if port1 > 0 && port2 > 0 { + remoteIP := c.conn.RemoteAddr().(*net.TCPAddr).IP + c.udpAddr = append(c.udpAddr, + &net.UDPAddr{IP: remoteIP, Port: port1}, + &net.UDPAddr{IP: remoteIP, Port: port2}, + ) + + go func() { + // Try to open a hole in the NAT router (to allow incoming UDP packets) + // by send a UDP packet for RTP and RTCP to the remote RTSP server. + // https://github.com/FFmpeg/FFmpeg/blob/aa91ae25b88e195e6af4248e0ab30605735ca1cd/libavformat/rtpdec.c#L416-L438 + _, _ = c.WriteToUDP([]byte{0x80, 0x00, 0x00, 0x00}, channel) + _, _ = c.WriteToUDP([]byte{0x80, 0xC8, 0x00, 0x01}, channel+1) + }() + } + } + + return channel, nil + } else { + // we send our `interleaved`, but camera can answer with another + + // Transport: RTP/AVP/TCP;unicast;interleaved=10-11;ssrc=10117CB7 + // Transport: RTP/AVP/TCP;unicast;destination=192.168.1.111;source=192.168.1.222;interleaved=0 + // Transport: RTP/AVP/TCP;ssrc=22345682;interleaved=0-1 // Escam Q6 has a bug: // Transport: RTP/AVP;unicast;destination=192.168.1.111;source=192.168.1.222;interleaved=0-1 - if !strings.Contains(transport, ";interleaved=") { + s := core.Between(transport, "interleaved=", "-") + i, err := strconv.Atoi(s) + if err != nil { return 0, fmt.Errorf("wrong transport: %s", transport) } - } - channel := core.Between(transport, "interleaved=", "-") - i, err := strconv.Atoi(channel) - if err != nil { - return 0, err + return byte(i), nil } - - return byte(i), nil } func (c *Conn) Play() (err error) { @@ -327,5 +377,56 @@ func (c *Conn) Close() error { if c.OnClose != nil { _ = c.OnClose() } + for _, conn := range c.udpConn { + _ = conn.Close() + } return c.conn.Close() } + +func (c *Conn) WriteToUDP(b []byte, channel byte) (int, error) { + return c.udpConn[channel].WriteToUDP(b, c.udpAddr[channel]) +} + +const listenUDPAttemps = 10 + +var listenUDPMu sync.Mutex + +func ListenUDPPair() (*net.UDPConn, *net.UDPConn, error) { + listenUDPMu.Lock() + defer listenUDPMu.Unlock() + + for i := 0; i < listenUDPAttemps; i++ { + // Get a random even port from the OS + ln1, err := net.ListenUDP("udp", &net.UDPAddr{IP: nil, Port: 0}) + if err != nil { + continue + } + + var port1 = ln1.LocalAddr().(*net.UDPAddr).Port + var port2 int + + // 11. RTP over Network and Transport Protocols (https://www.ietf.org/rfc/rfc3550.txt) + // For UDP and similar protocols, + // RTP SHOULD use an even destination port number and the corresponding + // RTCP stream SHOULD use the next higher (odd) destination port number + if port1&1 > 0 { + port2 = port1 - 1 + } else { + port2 = port1 + 1 + } + + ln2, err := net.ListenUDP("udp", &net.UDPAddr{IP: nil, Port: port2}) + if err != nil { + _ = ln1.Close() + continue + } + + if port1 < port2 { + return ln1, ln2, nil + } else { + return ln2, ln1, nil + } + } + + return nil, nil, fmt.Errorf("can't open two UDP ports") +} diff --git a/pkg/rtsp/conn.go b/pkg/rtsp/conn.go index 0c2009d7..2984c781 100644 --- a/pkg/rtsp/conn.go +++ b/pkg/rtsp/conn.go @@ -2,6 +2,7 @@ package rtsp import ( "bufio" + "context" "encoding/binary" "fmt" "io" @@ -13,7 +14,6 @@ import ( "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/tcp" - "github.com/pion/rtcp" "github.com/pion/rtp" ) @@ -40,6 +40,7 @@ type Conn struct { keepalive int mode core.Mode playOK bool + playErr error reader *bufio.Reader sequence int session string @@ -47,6 +48,9 @@ type Conn struct { state State stateMu sync.Mutex + + udpConn []*net.UDPConn + udpAddr []*net.UDPAddr } const ( @@ -68,7 +72,6 @@ func (s State) String() string { case StateNone: return "NONE" case StateConn: - return "CONN" case StateSetup: return MethodSetup @@ -88,23 +91,25 @@ const ( func (c *Conn) Handle() (err error) { var timeout time.Duration - var keepaliveDT time.Duration - var keepaliveTS time.Time - switch c.mode { case core.ModeActiveProducer: + var keepaliveDT time.Duration + if c.keepalive > 5 { keepaliveDT = time.Duration(c.keepalive-5) * time.Second } else { keepaliveDT = 25 * time.Second } - keepaliveTS = time.Now().Add(keepaliveDT) + + ctx, cancel := context.WithCancel(context.Background()) + go c.handleKeepalive(ctx, keepaliveDT) + defer cancel() if c.Timeout == 0 { // polling frames from remote RTSP Server (ex Camera) timeout = time.Second * 5 - if len(c.Receivers) == 0 { + if len(c.Receivers) == 0 || c.Transport == "udp" { // if we only send audio to camera // https://github.com/AlexxIT/go2rtc/issues/659 timeout += keepaliveDT @@ -129,148 +134,190 @@ func (c *Conn) Handle() (err error) { return fmt.Errorf("wrong RTSP conn mode: %d", c.mode) } + for i := 0; i < len(c.udpConn); i++ { + go c.handleUDPData(byte(i)) + } + for c.state != StateNone { ts := time.Now() - if err = c.conn.SetReadDeadline(ts.Add(timeout)); err != nil { + _ = c.conn.SetReadDeadline(ts.Add(timeout)) + + if err = c.handleTCPData(); err != nil { return } - - // we can read: - // 1. RTP interleaved: `$` + 1B channel number + 2B size - // 2. RTSP response: RTSP/1.0 200 OK - // 3. RTSP request: OPTIONS ... - var buf4 []byte // `$` + 1B channel number + 2B size - buf4, err = c.reader.Peek(4) - if err != nil { - return - } - - var channelID byte - var size uint16 - - if buf4[0] != '$' { - switch string(buf4) { - case "RTSP": - var res *tcp.Response - if res, err = c.ReadResponse(); err != nil { - return - } - c.Fire(res) - // for playing backchannel only after OK response on play - c.playOK = true - continue - - case "OPTI", "TEAR", "DESC", "SETU", "PLAY", "PAUS", "RECO", "ANNO", "GET_", "SET_": - var req *tcp.Request - if req, err = c.ReadRequest(); err != nil { - return - } - c.Fire(req) - if req.Method == MethodOptions { - res := &tcp.Response{Request: req} - if err = c.WriteResponse(res); err != nil { - return - } - } - continue - - default: - c.Fire("RTSP wrong input") - - for i := 0; ; i++ { - // search next start symbol - if _, err = c.reader.ReadBytes('$'); err != nil { - return err - } - - if channelID, err = c.reader.ReadByte(); err != nil { - return err - } - - // TODO: better check maximum good channel ID - if channelID >= 20 { - continue - } - - buf4 = make([]byte, 2) - if _, err = io.ReadFull(c.reader, buf4); err != nil { - return err - } - - // check if size good for RTP - size = binary.BigEndian.Uint16(buf4) - if size <= 1500 { - break - } - - // 10 tries to find good packet - if i >= 10 { - return fmt.Errorf("RTSP wrong input") - } - } - } - } else { - // hope that the odd channels are always RTCP - channelID = buf4[1] - - // get data size - size = binary.BigEndian.Uint16(buf4[2:]) - - // skip 4 bytes from c.reader.Peek - if _, err = c.reader.Discard(4); err != nil { - return - } - } - - // init memory for data - buf := make([]byte, size) - if _, err = io.ReadFull(c.reader, buf); err != nil { - return - } - - c.Recv += int(size) - - if channelID&1 == 0 { - packet := &rtp.Packet{} - if err = packet.Unmarshal(buf); err != nil { - return - } - - for _, receiver := range c.Receivers { - if receiver.ID == channelID { - receiver.WriteRTP(packet) - break - } - } - } else { - msg := &RTCP{Channel: channelID} - - if err = msg.Header.Unmarshal(buf); err != nil { - continue - } - - msg.Packets, err = rtcp.Unmarshal(buf) - if err != nil { - continue - } - - c.Fire(msg) - } - - if keepaliveDT != 0 && ts.After(keepaliveTS) { - req := &tcp.Request{Method: MethodOptions, URL: c.URL} - if err = c.WriteRequest(req); err != nil { - return - } - - keepaliveTS = ts.Add(keepaliveDT) - } } return } +func (c *Conn) handleKeepalive(ctx context.Context, d time.Duration) { + ticker := time.NewTicker(d) + for { + select { + case <-ticker.C: + req := &tcp.Request{Method: MethodOptions, URL: c.URL} + if err := c.WriteRequest(req); err != nil { + return + } + case <-ctx.Done(): + return + } + } +} + +func (c *Conn) handleUDPData(channel byte) { + // TODO: handle timeouts and drop TCP connection after any error + conn := c.udpConn[channel] + + for { + // TP-Link Tapo camera has crazy 10000 bytes packet size + buf := make([]byte, 10240) + + n, _, err := conn.ReadFromUDP(buf) + if err != nil { + return + } + + if err = c.handleRawPacket(channel, buf[:n]); err != nil { + return + } + } +} + +func (c *Conn) handleTCPData() error { + // we can read: + // 1. RTP interleaved: `$` + 1B channel number + 2B size + // 2. RTSP response: RTSP/1.0 200 OK + // 3. RTSP request: OPTIONS ... + var buf4 []byte // `$` + 1B channel number + 2B size + var err error + + buf4, err = c.reader.Peek(4) + if err != nil { + return err + } + + var channel byte + var size uint16 + + if buf4[0] != '$' { + switch string(buf4) { + case "RTSP": + var res *tcp.Response + if res, err = c.ReadResponse(); err != nil { + return err + } + c.Fire(res) + // for playing backchannel only after OK response on play + c.playOK = true + return nil + + case "OPTI", "TEAR", "DESC", "SETU", "PLAY", "PAUS", "RECO", "ANNO", "GET_", "SET_": + var req *tcp.Request + if req, err = c.ReadRequest(); err != nil { + return err + } + c.Fire(req) + if req.Method == MethodOptions { + res := &tcp.Response{Request: req} + if err = c.WriteResponse(res); err != nil { + return err + } + } + return nil + + default: + c.Fire("RTSP wrong input") + + for i := 0; ; i++ { + // search next start symbol + if _, err = c.reader.ReadBytes('$'); err != nil { + return err + } + + if channel, err = c.reader.ReadByte(); err != nil { + return err + } + + // TODO: better check maximum good channel ID + if channel >= 20 { + continue + } + + buf4 = make([]byte, 2) + if _, err = io.ReadFull(c.reader, buf4); err != nil { + return err + } + + // check if size good for RTP + size = binary.BigEndian.Uint16(buf4) + if size <= 1500 { + break + } + + // 10 tries to find good packet + if i >= 10 { + return fmt.Errorf("RTSP wrong input") + } + } + } + } else { + // hope that the odd channels are always RTCP + channel = buf4[1] + + // get data size + size = binary.BigEndian.Uint16(buf4[2:]) + + // skip 4 bytes from c.reader.Peek + if _, err = c.reader.Discard(4); err != nil { + return err + } + } + + // init memory for data + buf := make([]byte, size) + if _, err = io.ReadFull(c.reader, buf); err != nil { + return err + } + + c.Recv += int(size) + + return c.handleRawPacket(channel, buf) +} + +func (c *Conn) handleRawPacket(channel byte, buf []byte) error { + if channel&1 == 0 { + packet := &rtp.Packet{} + if err := packet.Unmarshal(buf); err != nil { + return err + } + + for _, receiver := range c.Receivers { + if receiver.ID == channel { + receiver.WriteRTP(packet) + break + } + } + } else { + msg := &RTCP{Channel: channel} + + if err := msg.Header.Unmarshal(buf); err != nil { + return nil + } + + //var err error + //msg.Packets, err = rtcp.Unmarshal(buf) + //if err != nil { + // return nil + //} + + c.Fire(msg) + } + + return nil +} + func (c *Conn) WriteRequest(req *tcp.Request) error { if req.Proto == "" { req.Proto = ProtoRTSP diff --git a/pkg/rtsp/consumer.go b/pkg/rtsp/consumer.go index 860ed113..e6525d96 100644 --- a/pkg/rtsp/consumer.go +++ b/pkg/rtsp/consumer.go @@ -85,11 +85,8 @@ func (c *Conn) packetWriter(codec *core.Codec, channel, payloadType uint8) core. } flushBuf := func() { - if err := c.conn.SetWriteDeadline(time.Now().Add(Timeout)); err != nil { - return - } //log.Printf("[rtsp] channel:%2d write_size:%6d buffer_size:%6d", channel, n, len(buf)) - if _, err := c.conn.Write(buf[:n]); err == nil { + if err := c.writeInterleavedData(buf[:n]); err != nil { c.Send += n } n = 0 @@ -177,3 +174,25 @@ func (c *Conn) packetWriter(codec *core.Codec, channel, payloadType uint8) core. return handlerFunc } + +func (c *Conn) writeInterleavedData(data []byte) error { + if c.Transport != "udp" { + _ = c.conn.SetWriteDeadline(time.Now().Add(Timeout)) + _, err := c.conn.Write(data) + return err + } + + for len(data) >= 4 && data[0] == '$' { + channel := data[1] + size := uint16(data[2])<<8 | uint16(data[3]) + rtpData := data[4 : 4+size] + + if _, err := c.WriteToUDP(rtpData, channel); err != nil { + return err + } + + data = data[4+size:] + } + + return nil +} diff --git a/pkg/shell/shell.go b/pkg/shell/shell.go index 75df671f..e04a58c4 100644 --- a/pkg/shell/shell.go +++ b/pkg/shell/shell.go @@ -3,8 +3,6 @@ package shell import ( "os" "os/signal" - "path/filepath" - "regexp" "strings" "syscall" ) @@ -38,39 +36,6 @@ func QuoteSplit(s string) []string { return a } -// ReplaceEnvVars - support format ${CAMERA_PASSWORD} and ${RTSP_USER:admin} -func ReplaceEnvVars(text string) string { - re := regexp.MustCompile(`\${([^}{]+)}`) - return re.ReplaceAllStringFunc(text, func(match string) string { - key := match[2 : len(match)-1] - - var def string - var dok bool - - if i := strings.IndexByte(key, ':'); i > 0 { - key, def = key[:i], key[i+1:] - dok = true - } - - if dir, vok := os.LookupEnv("CREDENTIALS_DIRECTORY"); vok { - value, err := os.ReadFile(filepath.Join(dir, key)) - if err == nil { - return strings.TrimSpace(string(value)) - } - } - - if value, vok := os.LookupEnv(key); vok { - return value - } - - if dok { - return def - } - - return match - }) -} - func RunUntilSignal() { sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)