From dfc1f45f974f8b182b1f9574dc8c10d2e0817cab Mon Sep 17 00:00:00 2001 From: seydx Date: Mon, 2 Jun 2025 22:06:47 +0300 Subject: [PATCH 01/24] support preloading streams --- internal/streams/preload.go | 30 ++++++++++++ internal/streams/streams.go | 17 +++++-- pkg/preload/producer.go | 92 +++++++++++++++++++++++++++++++++++++ 3 files changed, 135 insertions(+), 4 deletions(-) create mode 100644 internal/streams/preload.go create mode 100644 pkg/preload/producer.go diff --git a/internal/streams/preload.go b/internal/streams/preload.go new file mode 100644 index 00000000..c811cc5c --- /dev/null +++ b/internal/streams/preload.go @@ -0,0 +1,30 @@ +package streams + +import ( + "net/url" + "strings" + + "github.com/AlexxIT/go2rtc/pkg/preload" +) + +func (s *Stream) Preload(query url.Values) error { + cons := preload.NewPreload(query) + + if err := s.AddConsumer(cons); err != nil { + return err + } + + return nil +} + +func Preload(src string) { + name, rawQuery, _ := strings.Cut(src, "#") + query := ParseQuery(rawQuery) + + if stream := Get(name); stream != nil { + if err := stream.Preload(query); err != nil { + log.Error().Err(err).Caller().Send() + } + return + } +} diff --git a/internal/streams/streams.go b/internal/streams/streams.go index dcbaba28..7bbccace 100644 --- a/internal/streams/streams.go +++ b/internal/streams/streams.go @@ -16,6 +16,7 @@ func Init() { var cfg struct { Streams map[string]any `yaml:"streams"` Publish map[string]any `yaml:"publish"` + Preload []string `yaml:"preload"` } app.LoadConfig(&cfg) @@ -29,14 +30,22 @@ func Init() { api.HandleFunc("api/streams", apiStreams) api.HandleFunc("api/streams.dot", apiStreamsDOT) - if cfg.Publish == nil { + if cfg.Publish == nil && cfg.Preload == nil { return } time.AfterFunc(time.Second, func() { - for name, dst := range cfg.Publish { - if stream := Get(name); stream != nil { - Publish(stream, dst) + if cfg.Publish != nil { + for name, dst := range cfg.Publish { + if stream := Get(name); stream != nil { + Publish(stream, dst) + } + } + } + + if cfg.Preload != nil { + for _, src := range cfg.Preload { + Preload(src) } } }) diff --git a/pkg/preload/producer.go b/pkg/preload/producer.go new file mode 100644 index 00000000..811cf2e4 --- /dev/null +++ b/pkg/preload/producer.go @@ -0,0 +1,92 @@ +package preload + +import ( + "net/url" + "strings" + + "github.com/AlexxIT/go2rtc/pkg/core" +) + +type Preload struct { + core.Connection + + Closed core.Waiter +} + +func NewPreload(query url.Values) *Preload { + medias := core.ParseQuery(query) + + for _, value := range query["microphone"] { + media := &core.Media{Kind: core.KindAudio, Direction: core.DirectionRecvonly} + + for _, name := range strings.Split(value, ",") { + name = strings.ToUpper(name) + switch name { + case "", "COPY": + name = core.CodecAny + } + media.Codecs = append(media.Codecs, &core.Codec{Name: name}) + } + + medias = append(medias, media) + } + + if len(medias) == 0 { + medias = []*core.Media{ + { + Kind: core.KindVideo, + Direction: core.DirectionSendonly, + Codecs: []*core.Codec{{Name: core.CodecAny}}, + }, + { + Kind: core.KindAudio, + Direction: core.DirectionSendonly, + Codecs: []*core.Codec{{Name: core.CodecAny}}, + }, + } + } + + return &Preload{ + Connection: core.Connection{ + ID: core.NewID(), + FormatName: "preload", + Medias: medias, + Protocol: "native", + RemoteAddr: "localhost", + UserAgent: "go2rtc", + }, + } +} + +func (p *Preload) GetMedias() []*core.Media { + return p.Medias +} + +func (p *Preload) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { + sender := core.NewSender(media, track.Codec) + sender.Bind(track) + p.Senders = append(p.Senders, sender) + return nil +} + +func (p *Preload) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { + receiver := core.NewReceiver(media, codec) + p.Receivers = append(p.Receivers, receiver) + return receiver, nil +} + +func (p *Preload) Start() error { + p.Closed.Wait() + return nil +} + +func (p *Preload) Stop() error { + for _, receiver := range p.Receivers { + receiver.Close() + } + for _, sender := range p.Senders { + sender.Close() + } + p.Closed.Done(nil) + return nil +} From 020549ef60103724575aee52e20096e817cb694b Mon Sep 17 00:00:00 2001 From: seydx Date: Mon, 2 Jun 2025 22:16:43 +0300 Subject: [PATCH 02/24] readme --- README.md | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/README.md b/README.md index 90a2537f..09cafaeb 100644 --- a/README.md +++ b/README.md @@ -75,6 +75,7 @@ Ultimate camera streaming application with support RTSP, WebRTC, HomeKit, FFmpeg * [Incoming sources](#incoming-sources) * [Stream to camera](#stream-to-camera) * [Publish stream](#publish-stream) + * [Preload stream](#preload-stream) * [Module: API](#module-api) * [Module: RTSP](#module-rtsp) * [Module: RTMP](#module-rtmp) @@ -818,6 +819,25 @@ streams: - **Telegram Desktop App** > Any public or private channel or group (where you admin) > Live stream > Start with... > Start streaming. - **YouTube** > Create > Go live > Stream latency: Ultra low-latency > Copy: Stream URL + Stream key. +### Preload stream + +You can preload any stream on go2rtc start. This is useful for cameras that take a long time to start up. + +```yaml +preload: + - my_stream1 + - my_stream2#video#audio#microphone + - my_stream3#video=h265#audio=opus +streams: + my_stream1: + - rtsp://129.168.3.1:554/stream1 + my_stream2: + - rtsp://129.168.3.1:554/stream1 + my_stream3: + - rtsp://129.168.3.1:554/stream1 + - ffmpeg:my_stream3#video=copy#audio=opus +```` + ### Module: API The HTTP API is the main part for interacting with the application. Default address: `http://localhost:1984/`. From 493fa1ef6a1478e39c6a18f075c1434e43007242 Mon Sep 17 00:00:00 2001 From: seydx Date: Thu, 5 Jun 2025 11:33:03 +0300 Subject: [PATCH 03/24] add api endpoints and change config syntax --- internal/streams/api.go | 62 +++++++++++++++++++++++++++++++++++++ internal/streams/preload.go | 26 +++++++++------- internal/streams/streams.go | 11 ++++--- pkg/preload/producer.go | 6 ++-- 4 files changed, 85 insertions(+), 20 deletions(-) diff --git a/internal/streams/api.go b/internal/streams/api.go index 061e61c2..c0c6744b 100644 --- a/internal/streams/api.go +++ b/internal/streams/api.go @@ -122,3 +122,65 @@ func apiStreamsDOT(w http.ResponseWriter, r *http.Request) { api.Response(w, dot, "text/vnd.graphviz") } + +func apiPreload(w http.ResponseWriter, r *http.Request) { + query := r.URL.Query() + src := query.Get("src") + query.Del("src") + + videoQuery := query.Get("video") + audioQuery := query.Get("audio") + micQuery := query.Get("microphone") + + if src == "" { + http.Error(w, "no source", http.StatusBadRequest) + return + } + + switch r.Method { + case "PUT": + // check if stream exists + if stream := Get(src); stream == nil { + http.Error(w, "stream not found", http.StatusNotFound) + return + } + + // check if consumer exists + if cons, ok := preloads[src]; ok { + cons.Stop() + delete(preloads, src) + } + + var rawQuery string + if videoQuery != "" { + rawQuery += "video=" + videoQuery + "#" + } + if audioQuery != "" { + rawQuery += "audio=" + audioQuery + "#" + } + if micQuery != "" { + rawQuery += "microphone=" + micQuery + } + + if err := app.PatchConfig([]string{"preload", src}, rawQuery); err != nil { + log.Error().Err(err).Str("src", src).Msg("Failed to patch config for PUT") + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + Preload(src, rawQuery) + + case "DELETE": + if cons, ok := preloads[src]; ok { + cons.Stop() + delete(preloads, src) + } + + if err := app.PatchConfig([]string{"preload", src}, nil); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + } + + default: + http.Error(w, "", http.StatusMethodNotAllowed) + } +} diff --git a/internal/streams/preload.go b/internal/streams/preload.go index c811cc5c..7314df55 100644 --- a/internal/streams/preload.go +++ b/internal/streams/preload.go @@ -2,13 +2,15 @@ package streams import ( "net/url" - "strings" "github.com/AlexxIT/go2rtc/pkg/preload" ) -func (s *Stream) Preload(query url.Values) error { - cons := preload.NewPreload(query) +var preloads = map[string]*preload.Preload{} + +func (s *Stream) Preload(name string, query url.Values) error { + cons := preload.NewPreload(name, query) + preloads[name] = cons if err := s.AddConsumer(cons); err != nil { return err @@ -17,14 +19,16 @@ func (s *Stream) Preload(query url.Values) error { return nil } -func Preload(src string) { - name, rawQuery, _ := strings.Cut(src, "#") - query := ParseQuery(rawQuery) - - if stream := Get(name); stream != nil { - if err := stream.Preload(query); err != nil { - log.Error().Err(err).Caller().Send() - } +func Preload(src string, rawQuery string) { + // skip if exists + if _, ok := preloads[src]; ok { return } + + if stream := Get(src); stream != nil { + query := ParseQuery(rawQuery) + if err := stream.Preload(src, query); err != nil { + log.Error().Err(err).Caller().Send() + } + } } diff --git a/internal/streams/streams.go b/internal/streams/streams.go index 7bbccace..8f07ea12 100644 --- a/internal/streams/streams.go +++ b/internal/streams/streams.go @@ -14,9 +14,9 @@ import ( func Init() { var cfg struct { - Streams map[string]any `yaml:"streams"` - Publish map[string]any `yaml:"publish"` - Preload []string `yaml:"preload"` + Streams map[string]any `yaml:"streams"` + Publish map[string]any `yaml:"publish"` + Preload map[string]string `yaml:"preload"` } app.LoadConfig(&cfg) @@ -29,6 +29,7 @@ func Init() { api.HandleFunc("api/streams", apiStreams) api.HandleFunc("api/streams.dot", apiStreamsDOT) + api.HandleFunc("api/preload", apiPreload) if cfg.Publish == nil && cfg.Preload == nil { return @@ -44,8 +45,8 @@ func Init() { } if cfg.Preload != nil { - for _, src := range cfg.Preload { - Preload(src) + for name, rawQuery := range cfg.Preload { + Preload(name, rawQuery) } } }) diff --git a/pkg/preload/producer.go b/pkg/preload/producer.go index 811cf2e4..05a50d52 100644 --- a/pkg/preload/producer.go +++ b/pkg/preload/producer.go @@ -9,11 +9,10 @@ import ( type Preload struct { core.Connection - Closed core.Waiter } -func NewPreload(query url.Values) *Preload { +func NewPreload(name string, query url.Values) *Preload { medias := core.ParseQuery(query) for _, value := range query["microphone"] { @@ -49,11 +48,10 @@ func NewPreload(query url.Values) *Preload { return &Preload{ Connection: core.Connection{ ID: core.NewID(), - FormatName: "preload", Medias: medias, Protocol: "native", RemoteAddr: "localhost", - UserAgent: "go2rtc", + UserAgent: "go2rtc/preload", }, } } From 8ab7aeb8b25995199980e2411793b302ee9ab126 Mon Sep 17 00:00:00 2001 From: seydx Date: Thu, 5 Jun 2025 15:51:14 +0300 Subject: [PATCH 04/24] update readme --- README.md | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 09cafaeb..627d9bb9 100644 --- a/README.md +++ b/README.md @@ -825,18 +825,19 @@ You can preload any stream on go2rtc start. This is useful for cameras that take ```yaml preload: - - my_stream1 - - my_stream2#video#audio#microphone - - my_stream3#video=h265#audio=opus + camera1: # default: video&audio = ANY + camera2: "video" # preload only video track + camera3: "video=h264#audio=opus" # initialize transcoding pipeline + streams: - my_stream1: - - rtsp://129.168.3.1:554/stream1 - my_stream2: - - rtsp://129.168.3.1:554/stream1 - my_stream3: - - rtsp://129.168.3.1:554/stream1 - - ffmpeg:my_stream3#video=copy#audio=opus -```` + camera1: + - rtsp://192.168.1.100/stream + camera2: + - rtsp://192.168.1.101/stream + camera3: + - rtsp://192.168.1.102/h265stream + - ffmpeg:camera3#video=h264#audio=opus#hardware +``` ### Module: API From 91eeefec68405539d6d15996dbbb4d0d951b82fb Mon Sep 17 00:00:00 2001 From: seydx Date: Thu, 5 Jun 2025 16:01:49 +0300 Subject: [PATCH 05/24] openapi: add preload endpoints --- api/openapi.yaml | 48 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/api/openapi.yaml b/api/openapi.yaml index 618acb48..a2d66a87 100644 --- a/api/openapi.yaml +++ b/api/openapi.yaml @@ -237,6 +237,54 @@ paths: + /api/preload: + put: + summary: Preload new stream + tags: [ Streams list ] + parameters: + - name: src + in: query + description: Stream source (name) + required: true + schema: { type: string } + example: "camera1" + - name: video + in: query + description: Video codecs filter + required: false + schema: { type: string } + example: all,h264,h265,... + - name: audio + in: query + description: Audio codecs filter + required: false + schema: { type: string } + example: all,aac,opus,... + - name: microphone + in: query + description: Microphone codecs filter + required: false + schema: { type: string } + example: all,aac,opus,... + responses: + default: + description: Default response + delete: + summary: Delete preloaded stream + tags: [ Streams list ] + parameters: + - name: src + in: query + description: Stream source (name) + required: true + schema: { type: string } + example: "camera1" + responses: + default: + description: Default response + + + /api/streams?src={src}: get: summary: Get stream info in JSON format From b6579122d1037934bccf33e1007a15e76bd2f166 Mon Sep 17 00:00:00 2001 From: seydx Date: Fri, 6 Jun 2025 03:11:28 +0200 Subject: [PATCH 06/24] fix --- internal/streams/api.go | 12 +++++++++--- pkg/preload/producer.go | 6 +++--- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/internal/streams/api.go b/internal/streams/api.go index c0c6744b..47febeb4 100644 --- a/internal/streams/api.go +++ b/internal/streams/api.go @@ -140,14 +140,15 @@ func apiPreload(w http.ResponseWriter, r *http.Request) { switch r.Method { case "PUT": // check if stream exists - if stream := Get(src); stream == nil { + stream := Get(src) + if stream == nil { http.Error(w, "stream not found", http.StatusNotFound) return } // check if consumer exists if cons, ok := preloads[src]; ok { - cons.Stop() + stream.RemoveConsumer(cons) delete(preloads, src) } @@ -172,7 +173,12 @@ func apiPreload(w http.ResponseWriter, r *http.Request) { case "DELETE": if cons, ok := preloads[src]; ok { - cons.Stop() + if stream := Get(src); stream != nil { + stream.RemoveConsumer(cons) + } else { + cons.Stop() + } + delete(preloads, src) } diff --git a/pkg/preload/producer.go b/pkg/preload/producer.go index 05a50d52..932f5e29 100644 --- a/pkg/preload/producer.go +++ b/pkg/preload/producer.go @@ -9,7 +9,7 @@ import ( type Preload struct { core.Connection - Closed core.Waiter + closed core.Waiter } func NewPreload(name string, query url.Values) *Preload { @@ -74,7 +74,7 @@ func (p *Preload) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver } func (p *Preload) Start() error { - p.Closed.Wait() + p.closed.Wait() return nil } @@ -85,6 +85,6 @@ func (p *Preload) Stop() error { for _, sender := range p.Senders { sender.Close() } - p.Closed.Done(nil) + p.closed.Done(nil) return nil } From 6732e726d51b8433a413f81973e6ccd4a9c2848b Mon Sep 17 00:00:00 2001 From: seydx Date: Mon, 16 Jun 2025 00:33:16 +0200 Subject: [PATCH 07/24] update preload consumer to handle RTP packets --- pkg/preload/producer.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/preload/producer.go b/pkg/preload/producer.go index 932f5e29..8eb6aec2 100644 --- a/pkg/preload/producer.go +++ b/pkg/preload/producer.go @@ -5,6 +5,7 @@ import ( "strings" "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/pion/rtp" ) type Preload struct { @@ -62,7 +63,10 @@ func (p *Preload) GetMedias() []*core.Media { func (p *Preload) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { sender := core.NewSender(media, track.Codec) - sender.Bind(track) + sender.Handler = func(pkt *rtp.Packet) { + p.Send += pkt.MarshalSize() + } + sender.HandleRTP(track) p.Senders = append(p.Senders, sender) return nil } From 57714544004b19da46771baccc0c6296bde8ae51 Mon Sep 17 00:00:00 2001 From: seydx Date: Mon, 16 Jun 2025 00:50:48 +0200 Subject: [PATCH 08/24] use preload as format name --- pkg/preload/producer.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/preload/producer.go b/pkg/preload/producer.go index 8eb6aec2..748dff16 100644 --- a/pkg/preload/producer.go +++ b/pkg/preload/producer.go @@ -49,10 +49,9 @@ func NewPreload(name string, query url.Values) *Preload { return &Preload{ Connection: core.Connection{ ID: core.NewID(), + FormatName: "preload", Medias: medias, - Protocol: "native", RemoteAddr: "localhost", - UserAgent: "go2rtc/preload", }, } } From ef318f663e854d3ba5e8af3641947edfec43df4d Mon Sep 17 00:00:00 2001 From: seydx Date: Mon, 16 Jun 2025 09:32:07 +0200 Subject: [PATCH 09/24] fix preload queries --- internal/streams/api.go | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/internal/streams/api.go b/internal/streams/api.go index 47febeb4..1b91f906 100644 --- a/internal/streams/api.go +++ b/internal/streams/api.go @@ -128,10 +128,6 @@ func apiPreload(w http.ResponseWriter, r *http.Request) { src := query.Get("src") query.Del("src") - videoQuery := query.Get("video") - audioQuery := query.Get("audio") - micQuery := query.Get("microphone") - if src == "" { http.Error(w, "no source", http.StatusBadRequest) return @@ -152,15 +148,28 @@ func apiPreload(w http.ResponseWriter, r *http.Request) { delete(preloads, src) } + // parse query parameters var rawQuery string - if videoQuery != "" { - rawQuery += "video=" + videoQuery + "#" + if query.Has("video") { + if videoQuery := query.Get("video"); videoQuery != "" { + rawQuery += "video=" + videoQuery + "#" + } else { + rawQuery += "video#" + } } - if audioQuery != "" { - rawQuery += "audio=" + audioQuery + "#" + if query.Has("audio") { + if audioQuery := query.Get("audio"); audioQuery != "" { + rawQuery += "audio=" + audioQuery + "#" + } else { + rawQuery += "audio#" + } } - if micQuery != "" { - rawQuery += "microphone=" + micQuery + if query.Has("microphone") { + if micQuery := query.Get("microphone"); micQuery != "" { + rawQuery += "microphone=" + micQuery + "#" + } else { + rawQuery += "microphone#" + } } if err := app.PatchConfig([]string{"preload", src}, rawQuery); err != nil { From 647b2acf487a36b9d478c8659a6c5f43880f840a Mon Sep 17 00:00:00 2001 From: seydx Date: Mon, 16 Jun 2025 09:58:55 +0200 Subject: [PATCH 10/24] cleanup --- pkg/preload/{producer.go => consumer.go} | 11 ----------- 1 file changed, 11 deletions(-) rename pkg/preload/{producer.go => consumer.go} (84%) diff --git a/pkg/preload/producer.go b/pkg/preload/consumer.go similarity index 84% rename from pkg/preload/producer.go rename to pkg/preload/consumer.go index 748dff16..4d3735a8 100644 --- a/pkg/preload/producer.go +++ b/pkg/preload/consumer.go @@ -51,15 +51,10 @@ func NewPreload(name string, query url.Values) *Preload { ID: core.NewID(), FormatName: "preload", Medias: medias, - RemoteAddr: "localhost", }, } } -func (p *Preload) GetMedias() []*core.Media { - return p.Medias -} - func (p *Preload) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { sender := core.NewSender(media, track.Codec) sender.Handler = func(pkt *rtp.Packet) { @@ -70,12 +65,6 @@ func (p *Preload) AddTrack(media *core.Media, codec *core.Codec, track *core.Rec return nil } -func (p *Preload) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { - receiver := core.NewReceiver(media, codec) - p.Receivers = append(p.Receivers, receiver) - return receiver, nil -} - func (p *Preload) Start() error { p.closed.Wait() return nil From c68e3cafe4e385270080c1b0ede0132bdd79657a Mon Sep 17 00:00:00 2001 From: Oliver Eiber Date: Thu, 3 Jul 2025 23:35:58 +0200 Subject: [PATCH 11/24] fixes doorbird backchannel audio: - proper session handling - honor http status codes - prevent device from being flooded by limiting concurrent audio channels --- pkg/doorbird/backchannel.go | 64 +++++++++++++++++++++++++++----- pkg/doorbird/backchannel_lock.go | 5 +++ 2 files changed, 60 insertions(+), 9 deletions(-) create mode 100644 pkg/doorbird/backchannel_lock.go diff --git a/pkg/doorbird/backchannel.go b/pkg/doorbird/backchannel.go index 82379383..82ea31b4 100644 --- a/pkg/doorbird/backchannel.go +++ b/pkg/doorbird/backchannel.go @@ -1,21 +1,32 @@ package doorbird import ( + "bufio" "fmt" "net" "net/url" + "strconv" + "strings" "time" "github.com/AlexxIT/go2rtc/pkg/core" "github.com/pion/rtp" ) +var ( + clt Client +) + type Client struct { core.Connection conn net.Conn } func Dial(rawURL string) (*Client, error) { + if clt.conn != nil { + return &clt, nil + } + u, err := url.Parse(rawURL) if err != nil { return nil, err @@ -45,6 +56,23 @@ func Dial(rawURL string) (*Client, error) { return nil, err } + reader := bufio.NewReader(conn) + statusLine, _ := reader.ReadString('\n') + parts := strings.SplitN(statusLine, " ", 3) + if len(parts) >= 2 { + statusCode, err := strconv.Atoi(parts[1]) + if err == nil { + if statusCode == 204 { + conn.Close() + return nil, fmt.Errorf("DoorBird user has no api permission: %d", statusCode) + } + if statusCode == 503 { + conn.Close() + return nil, fmt.Errorf("DoorBird device is busy: %d", statusCode) + } + } + } + medias := []*core.Media{ { Kind: core.KindAudio, @@ -55,17 +83,19 @@ func Dial(rawURL string) (*Client, error) { }, } - return &Client{ + clt = Client{ core.Connection{ ID: core.NewID(), FormatName: "doorbird", Protocol: "http", URL: rawURL, Medias: medias, - Transport: conn, + // Transport: conn, }, conn, - }, nil + } + + return &clt, nil } func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { @@ -73,12 +103,18 @@ func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, } func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { + if len(c.Senders) > 0 { + return fmt.Errorf("DoorBird backchannel already in use") + } + sender := core.NewSender(media, track.Codec) sender.Handler = func(pkt *rtp.Packet) { - _ = c.conn.SetWriteDeadline(time.Now().Add(core.ConnDeadline)) - if n, err := c.conn.Write(pkt.Payload); err == nil { - c.Send += n + if c.conn != nil { + _ = c.conn.SetWriteDeadline(time.Now().Add(core.ConnDeadline)) + if n, err := c.conn.Write(pkt.Payload); err == nil { + c.Send += n + } } } @@ -87,7 +123,17 @@ func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Rece return nil } -func (c *Client) Start() (err error) { - _, err = c.conn.Read(nil) - return +func (c *Client) Start() error { + if c.conn == nil { + return nil + } + buf := make([]byte, 1) + for { + _, err := c.conn.Read(buf) + if err != nil { + c.conn.Close() + c.conn = nil + return err + } + } } diff --git a/pkg/doorbird/backchannel_lock.go b/pkg/doorbird/backchannel_lock.go new file mode 100644 index 00000000..758320dc --- /dev/null +++ b/pkg/doorbird/backchannel_lock.go @@ -0,0 +1,5 @@ +package doorbird + +import "sync" + +var backchannelMu sync.Mutex From e00d211619c437d1cbe920d7f08cbc558ef71c56 Mon Sep 17 00:00:00 2001 From: Oliver Eiber Date: Sun, 6 Jul 2025 22:33:25 +0200 Subject: [PATCH 12/24] ensure that doorbird errors where shown in logs --- pkg/doorbird/backchannel.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/doorbird/backchannel.go b/pkg/doorbird/backchannel.go index 82ea31b4..d338a445 100644 --- a/pkg/doorbird/backchannel.go +++ b/pkg/doorbird/backchannel.go @@ -2,6 +2,7 @@ package doorbird import ( "bufio" + "errors" "fmt" "net" "net/url" @@ -64,11 +65,11 @@ func Dial(rawURL string) (*Client, error) { if err == nil { if statusCode == 204 { conn.Close() - return nil, fmt.Errorf("DoorBird user has no api permission: %d", statusCode) + return nil, errors.New("DoorBird user has no api permission") } if statusCode == 503 { conn.Close() - return nil, fmt.Errorf("DoorBird device is busy: %d", statusCode) + return nil, errors.New("DoorBird device is busy") } } } @@ -104,7 +105,7 @@ func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { if len(c.Senders) > 0 { - return fmt.Errorf("DoorBird backchannel already in use") + return errors.New("DoorBird backchannel already in use") } sender := core.NewSender(media, track.Codec) From 56e61a85ee7be5c87e0313ec8e0dafc29a7c5d96 Mon Sep 17 00:00:00 2001 From: Oliver Eiber Date: Wed, 16 Jul 2025 21:07:34 +0200 Subject: [PATCH 13/24] proper error handling cleanup files --- pkg/doorbird/backchannel.go | 26 ++++++++++---------------- pkg/doorbird/backchannel_lock.go | 5 ----- 2 files changed, 10 insertions(+), 21 deletions(-) delete mode 100644 pkg/doorbird/backchannel_lock.go diff --git a/pkg/doorbird/backchannel.go b/pkg/doorbird/backchannel.go index d338a445..8a9a25d9 100644 --- a/pkg/doorbird/backchannel.go +++ b/pkg/doorbird/backchannel.go @@ -5,9 +5,8 @@ import ( "errors" "fmt" "net" + "net/http" "net/url" - "strconv" - "strings" "time" "github.com/AlexxIT/go2rtc/pkg/core" @@ -57,20 +56,15 @@ func Dial(rawURL string) (*Client, error) { return nil, err } - reader := bufio.NewReader(conn) - statusLine, _ := reader.ReadString('\n') - parts := strings.SplitN(statusLine, " ", 3) - if len(parts) >= 2 { - statusCode, err := strconv.Atoi(parts[1]) - if err == nil { - if statusCode == 204 { - conn.Close() - return nil, errors.New("DoorBird user has no api permission") - } - if statusCode == 503 { - conn.Close() - return nil, errors.New("DoorBird device is busy") - } + resp, _ := http.ReadResponse(bufio.NewReader(conn), nil) + if resp != nil { + switch resp.StatusCode { + case 204: + conn.Close() + return nil, errors.New("DoorBird user has no api permission") + case 503: + conn.Close() + return nil, errors.New("DoorBird device is busy") } } diff --git a/pkg/doorbird/backchannel_lock.go b/pkg/doorbird/backchannel_lock.go deleted file mode 100644 index 758320dc..00000000 --- a/pkg/doorbird/backchannel_lock.go +++ /dev/null @@ -1,5 +0,0 @@ -package doorbird - -import "sync" - -var backchannelMu sync.Mutex From a92e04b6e0885bec723b222f5d16d1ffa35a22a1 Mon Sep 17 00:00:00 2001 From: Oliver Eiber Date: Tue, 22 Jul 2025 20:54:24 +0200 Subject: [PATCH 14/24] added audio mixing capability to avoid device overload when multiple backchannel audio streams are connected --- pkg/doorbird/backchannel.go | 195 +++++++++++++++++++++++++++++++++--- 1 file changed, 179 insertions(+), 16 deletions(-) diff --git a/pkg/doorbird/backchannel.go b/pkg/doorbird/backchannel.go index 8a9a25d9..51b4c194 100644 --- a/pkg/doorbird/backchannel.go +++ b/pkg/doorbird/backchannel.go @@ -7,19 +7,140 @@ import ( "net" "net/http" "net/url" + "sync" "time" "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/pcm" "github.com/pion/rtp" ) -var ( - clt Client -) +var clt Client + +type AudioMixer struct { + mu sync.Mutex + streams map[string]chan []byte + output chan []byte + running bool +} + +func NewAudioMixer() *AudioMixer { + return &AudioMixer{ + streams: make(map[string]chan []byte), + output: make(chan []byte, 100), + } +} + +func (m *AudioMixer) AddStream(id string) chan []byte { + m.mu.Lock() + defer m.mu.Unlock() + + if !m.running { + m.running = true + go m.mixLoop() + } + + stream := make(chan []byte, 100) + m.streams[id] = stream + return stream +} + +func (m *AudioMixer) RemoveStream(id string) { + m.mu.Lock() + defer m.mu.Unlock() + + if stream, exists := m.streams[id]; exists { + close(stream) + delete(m.streams, id) + } +} + +func (m *AudioMixer) mixLoop() { + ticker := time.NewTicker(20 * time.Millisecond) + defer ticker.Stop() + + for range ticker.C { + m.mu.Lock() + if len(m.streams) == 0 { + m.mu.Unlock() + continue + } + + var pcmSamples [][]int16 + activeStreams := 0 + + for _, stream := range m.streams { + select { + case data := <-stream: + if len(data) > 0 { + samples := make([]int16, len(data)) + for i, sample := range data { + samples[i] = pcm.PCMUtoPCM(sample) + } + pcmSamples = append(pcmSamples, samples) + activeStreams++ + } + default: + } + } + m.mu.Unlock() + + if activeStreams == 0 { + continue + } + + var mixedLength int + for _, samples := range pcmSamples { + if len(samples) > mixedLength { + mixedLength = len(samples) + } + } + + if mixedLength == 0 { + continue + } + + mixed := make([]int16, mixedLength) + for i := 0; i < mixedLength; i++ { + var sum int32 + var count int32 + + for _, samples := range pcmSamples { + if i < len(samples) { + sum += int32(samples[i]) + count++ + } + } + + if count > 0 { + averaged := sum / count + if averaged > 32767 { + mixed[i] = 32767 + } else if averaged < -32768 { + mixed[i] = -32768 + } else { + mixed[i] = int16(averaged) + } + } + } + + output := make([]byte, len(mixed)) + for i, sample := range mixed { + output[i] = pcm.PCMtoPCMU(sample) + } + + select { + case m.output <- output: + default: + } + } +} type Client struct { core.Connection - conn net.Conn + conn net.Conn + mixer *AudioMixer + trackMap map[*core.Sender]string } func Dial(rawURL string) (*Client, error) { @@ -85,9 +206,10 @@ func Dial(rawURL string) (*Client, error) { Protocol: "http", URL: rawURL, Medias: medias, - // Transport: conn, }, conn, + NewAudioMixer(), + make(map[*core.Sender]string), } return &clt, nil @@ -98,22 +220,35 @@ func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, } func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { - if len(c.Senders) > 0 { - return errors.New("DoorBird backchannel already in use") - } - sender := core.NewSender(media, track.Codec) + trackID := fmt.Sprintf("%d", core.NewID()) + streamChan := c.mixer.AddStream(trackID) sender.Handler = func(pkt *rtp.Packet) { - if c.conn != nil { - _ = c.conn.SetWriteDeadline(time.Now().Add(core.ConnDeadline)) - if n, err := c.conn.Write(pkt.Payload); err == nil { - c.Send += n + if c.conn != nil && len(pkt.Payload) > 0 { + select { + case streamChan <- pkt.Payload: + default: } } } - sender.HandleRTP(track) + c.trackMap[sender] = trackID + + if len(c.Senders) == 0 { + go func() { + for mixedData := range c.mixer.output { + if c.conn != nil { + _ = c.conn.SetWriteDeadline(time.Now().Add(core.ConnDeadline)) + if n, err := c.conn.Write(mixedData); err == nil { + c.Send += n + } + } + } + }() + } + + sender.WithParent(track).Start() c.Senders = append(c.Senders, sender) return nil } @@ -126,9 +261,37 @@ func (c *Client) Start() error { for { _, err := c.conn.Read(buf) if err != nil { - c.conn.Close() - c.conn = nil + c.cleanup() return err } } } + +func (c *Client) cleanup() { + if c.conn != nil { + c.conn.Close() + c.conn = nil + } + + if c.mixer != nil { + c.mixer.mu.Lock() + for id := range c.mixer.streams { + if stream, exists := c.mixer.streams[id]; exists { + close(stream) + } + } + c.mixer.streams = make(map[string]chan []byte) + close(c.mixer.output) + c.mixer.running = false + c.mixer.mu.Unlock() + } + + c.trackMap = make(map[*core.Sender]string) +} + +func (c *Client) RemoveTrack(sender *core.Sender) { + if trackID, exists := c.trackMap[sender]; exists { + c.mixer.RemoveStream(trackID) + delete(c.trackMap, sender) + } +} From 7d2ad92c4b4c426062cb48f22c38110a6cc4ce30 Mon Sep 17 00:00:00 2001 From: Oliver Eiber Date: Mon, 28 Jul 2025 22:27:38 +0200 Subject: [PATCH 15/24] fix app crashes remove orphaned streams --- pkg/doorbird/backchannel.go | 107 +++++++++++++++++++++++++++++++----- 1 file changed, 92 insertions(+), 15 deletions(-) diff --git a/pkg/doorbird/backchannel.go b/pkg/doorbird/backchannel.go index 51b4c194..8cdd0136 100644 --- a/pkg/doorbird/backchannel.go +++ b/pkg/doorbird/backchannel.go @@ -22,6 +22,7 @@ type AudioMixer struct { streams map[string]chan []byte output chan []byte running bool + closed bool } func NewAudioMixer() *AudioMixer { @@ -35,6 +36,12 @@ func (m *AudioMixer) AddStream(id string) chan []byte { m.mu.Lock() defer m.mu.Unlock() + if m.closed { + ch := make(chan []byte) + close(ch) + return ch + } + if !m.running { m.running = true go m.mixLoop() @@ -138,9 +145,11 @@ func (m *AudioMixer) mixLoop() { type Client struct { core.Connection - conn net.Conn - mixer *AudioMixer - trackMap map[*core.Sender]string + conn net.Conn + mixer *AudioMixer + trackMap map[*core.Sender]string + senderStats map[*core.Sender]time.Time + mu sync.RWMutex } func Dial(rawURL string) (*Client, error) { @@ -200,16 +209,17 @@ func Dial(rawURL string) (*Client, error) { } clt = Client{ - core.Connection{ + Connection: core.Connection{ ID: core.NewID(), FormatName: "doorbird", Protocol: "http", URL: rawURL, Medias: medias, }, - conn, - NewAudioMixer(), - make(map[*core.Sender]string), + conn: conn, + mixer: NewAudioMixer(), + trackMap: make(map[*core.Sender]string), + senderStats: make(map[*core.Sender]time.Time), } return &clt, nil @@ -220,20 +230,31 @@ func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, } func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { + c.mu.Lock() + defer c.mu.Unlock() + sender := core.NewSender(media, track.Codec) trackID := fmt.Sprintf("%d", core.NewID()) streamChan := c.mixer.AddStream(trackID) sender.Handler = func(pkt *rtp.Packet) { - if c.conn != nil && len(pkt.Payload) > 0 { + c.mu.RLock() + conn := c.conn + c.mu.RUnlock() + + if conn != nil && len(pkt.Payload) > 0 { select { case streamChan <- pkt.Payload: + c.mu.Lock() + c.senderStats[sender] = time.Now() + c.mu.Unlock() default: } } } c.trackMap[sender] = trackID + c.senderStats[sender] = time.Now() if len(c.Senders) == 0 { go func() { @@ -257,6 +278,15 @@ func (c *Client) Start() error { if c.conn == nil { return nil } + + go func() { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + for range ticker.C { + c.cleanupOrphanedSenders() + } + }() + buf := make([]byte, 1) for { _, err := c.conn.Read(buf) @@ -268,6 +298,9 @@ func (c *Client) Start() error { } func (c *Client) cleanup() { + c.mu.Lock() + defer c.mu.Unlock() + if c.conn != nil { c.conn.Close() c.conn = nil @@ -275,23 +308,67 @@ func (c *Client) cleanup() { if c.mixer != nil { c.mixer.mu.Lock() - for id := range c.mixer.streams { - if stream, exists := c.mixer.streams[id]; exists { - close(stream) - } + c.mixer.closed = true + for id, stream := range c.mixer.streams { + close(stream) + delete(c.mixer.streams, id) + } + if c.mixer.running { + close(c.mixer.output) + c.mixer.running = false } - c.mixer.streams = make(map[string]chan []byte) - close(c.mixer.output) - c.mixer.running = false c.mixer.mu.Unlock() } c.trackMap = make(map[*core.Sender]string) + c.senderStats = make(map[*core.Sender]time.Time) +} + +func (c *Client) cleanupOrphanedSenders() { + c.mu.Lock() + defer c.mu.Unlock() + + now := time.Now() + removedCount := 0 + validIndex := 0 + + for i, sender := range c.Senders { + lastActivity, exists := c.senderStats[sender] + if sender.State() == "closed" || !exists || now.Sub(lastActivity) >= 5*time.Second { + if trackID, exists := c.trackMap[sender]; exists { + c.mixer.RemoveStream(trackID) + delete(c.trackMap, sender) + } + delete(c.senderStats, sender) + sender.Close() + removedCount++ + } else { + c.Senders[validIndex] = c.Senders[i] + validIndex++ + } + } + + c.Senders = c.Senders[:validIndex] + + if removedCount > 0 { + fmt.Printf("DoorBird: Cleaned up %d orphaned senders, %d remain active\n", removedCount, validIndex) + } } func (c *Client) RemoveTrack(sender *core.Sender) { + c.mu.Lock() + defer c.mu.Unlock() + if trackID, exists := c.trackMap[sender]; exists { c.mixer.RemoveStream(trackID) delete(c.trackMap, sender) } + delete(c.senderStats, sender) + + for i, s := range c.Senders { + if s == sender { + c.Senders = append(c.Senders[:i], c.Senders[i+1:]...) + break + } + } } From 3d38e5e567329d24d5ad87baf0729d841eaf3ad5 Mon Sep 17 00:00:00 2001 From: Oliver Eiber Date: Wed, 30 Jul 2025 23:37:06 +0200 Subject: [PATCH 16/24] fix unexpected close of backchannel streams --- pkg/doorbird/backchannel.go | 69 ++++++++++++++++++++++++++++++++----- 1 file changed, 60 insertions(+), 9 deletions(-) diff --git a/pkg/doorbird/backchannel.go b/pkg/doorbird/backchannel.go index 8cdd0136..5e5e8834 100644 --- a/pkg/doorbird/backchannel.go +++ b/pkg/doorbird/backchannel.go @@ -15,7 +15,10 @@ import ( "github.com/pion/rtp" ) -var clt Client +var ( + cltMu sync.Mutex + cltMap = make(map[string]*Client) +) type AudioMixer struct { mu sync.Mutex @@ -68,6 +71,11 @@ func (m *AudioMixer) mixLoop() { for range ticker.C { m.mu.Lock() + if m.closed { + m.mu.Unlock() + return + } + if len(m.streams) == 0 { m.mu.Unlock() continue @@ -153,8 +161,12 @@ type Client struct { } func Dial(rawURL string) (*Client, error) { - if clt.conn != nil { - return &clt, nil + cltMu.Lock() + defer cltMu.Unlock() + + // Check if we already have a client for this URL + if existingClient, exists := cltMap[rawURL]; exists && existingClient.conn != nil { + return existingClient, nil } u, err := url.Parse(rawURL) @@ -183,6 +195,7 @@ func Dial(rawURL string) (*Client, error) { _ = conn.SetWriteDeadline(time.Now().Add(core.ConnDeadline)) if _, err = conn.Write([]byte(s)); err != nil { + conn.Close() return nil, err } @@ -208,7 +221,7 @@ func Dial(rawURL string) (*Client, error) { }, } - clt = Client{ + client := &Client{ Connection: core.Connection{ ID: core.NewID(), FormatName: "doorbird", @@ -222,7 +235,10 @@ func Dial(rawURL string) (*Client, error) { senderStats: make(map[*core.Sender]time.Time), } - return &clt, nil + // Store the client in the map + cltMap[rawURL] = client + + return client, nil } func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { @@ -238,17 +254,22 @@ func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Rece streamChan := c.mixer.AddStream(trackID) sender.Handler = func(pkt *rtp.Packet) { + if len(pkt.Payload) == 0 { + return + } + c.mu.RLock() conn := c.conn c.mu.RUnlock() - if conn != nil && len(pkt.Payload) > 0 { + if conn != nil { select { case streamChan <- pkt.Payload: c.mu.Lock() c.senderStats[sender] = time.Now() c.mu.Unlock() default: + // Channel is full, skip this packet } } } @@ -258,11 +279,24 @@ func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Rece if len(c.Senders) == 0 { go func() { + defer func() { + if r := recover(); r != nil { + // Recover from any panics when mixer is closed + } + }() + for mixedData := range c.mixer.output { - if c.conn != nil { - _ = c.conn.SetWriteDeadline(time.Now().Add(core.ConnDeadline)) - if n, err := c.conn.Write(mixedData); err == nil { + c.mu.RLock() + conn := c.conn + c.mu.RUnlock() + + if conn != nil && len(mixedData) > 0 { + _ = conn.SetWriteDeadline(time.Now().Add(core.ConnDeadline)) + if n, err := conn.Write(mixedData); err == nil { c.Send += n + } else { + // Connection failed, break out of loop + break } } } @@ -289,9 +323,15 @@ func (c *Client) Start() error { buf := make([]byte, 1) for { + // Set read deadline to detect connection issues + _ = c.conn.SetReadDeadline(time.Now().Add(30 * time.Second)) _, err := c.conn.Read(buf) if err != nil { c.cleanup() + // Remove this client from the global map + cltMu.Lock() + delete(cltMap, c.URL) + cltMu.Unlock() return err } } @@ -320,8 +360,19 @@ func (c *Client) cleanup() { c.mixer.mu.Unlock() } + // Close all senders + for _, sender := range c.Senders { + sender.Close() + } + c.Senders = nil + c.trackMap = make(map[*core.Sender]string) c.senderStats = make(map[*core.Sender]time.Time) + + // Remove from global map + cltMu.Lock() + delete(cltMap, c.URL) + cltMu.Unlock() } func (c *Client) cleanupOrphanedSenders() { From 975a43d39276bd61ef3ed0f9c2eb1adb66b5e60d Mon Sep 17 00:00:00 2001 From: Oliver Eiber Date: Thu, 31 Jul 2025 21:07:45 +0200 Subject: [PATCH 17/24] reduce audio delay by lowering buffer size --- pkg/doorbird/backchannel.go | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/pkg/doorbird/backchannel.go b/pkg/doorbird/backchannel.go index 5e5e8834..dc66ee0e 100644 --- a/pkg/doorbird/backchannel.go +++ b/pkg/doorbird/backchannel.go @@ -15,6 +15,14 @@ import ( "github.com/pion/rtp" ) +const ( + AudioMixerInterval = 10 * time.Millisecond + AudioChannelBuffer = 10 + OutputChannelBuffer = 10 + SenderCleanupInterval = 5 * time.Second + SenderTimeoutDuration = 5 * time.Second +) + var ( cltMu sync.Mutex cltMap = make(map[string]*Client) @@ -31,7 +39,7 @@ type AudioMixer struct { func NewAudioMixer() *AudioMixer { return &AudioMixer{ streams: make(map[string]chan []byte), - output: make(chan []byte, 100), + output: make(chan []byte, OutputChannelBuffer), } } @@ -50,7 +58,7 @@ func (m *AudioMixer) AddStream(id string) chan []byte { go m.mixLoop() } - stream := make(chan []byte, 100) + stream := make(chan []byte, AudioChannelBuffer) m.streams[id] = stream return stream } @@ -66,7 +74,7 @@ func (m *AudioMixer) RemoveStream(id string) { } func (m *AudioMixer) mixLoop() { - ticker := time.NewTicker(20 * time.Millisecond) + ticker := time.NewTicker(AudioMixerInterval) defer ticker.Stop() for range ticker.C { @@ -164,7 +172,6 @@ func Dial(rawURL string) (*Client, error) { cltMu.Lock() defer cltMu.Unlock() - // Check if we already have a client for this URL if existingClient, exists := cltMap[rawURL]; exists && existingClient.conn != nil { return existingClient, nil } @@ -235,7 +242,6 @@ func Dial(rawURL string) (*Client, error) { senderStats: make(map[*core.Sender]time.Time), } - // Store the client in the map cltMap[rawURL] = client return client, nil @@ -269,7 +275,6 @@ func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Rece c.senderStats[sender] = time.Now() c.mu.Unlock() default: - // Channel is full, skip this packet } } } @@ -281,7 +286,6 @@ func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Rece go func() { defer func() { if r := recover(); r != nil { - // Recover from any panics when mixer is closed } }() @@ -295,7 +299,6 @@ func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Rece if n, err := conn.Write(mixedData); err == nil { c.Send += n } else { - // Connection failed, break out of loop break } } @@ -314,7 +317,7 @@ func (c *Client) Start() error { } go func() { - ticker := time.NewTicker(5 * time.Second) + ticker := time.NewTicker(SenderCleanupInterval) defer ticker.Stop() for range ticker.C { c.cleanupOrphanedSenders() @@ -323,12 +326,10 @@ func (c *Client) Start() error { buf := make([]byte, 1) for { - // Set read deadline to detect connection issues _ = c.conn.SetReadDeadline(time.Now().Add(30 * time.Second)) _, err := c.conn.Read(buf) if err != nil { c.cleanup() - // Remove this client from the global map cltMu.Lock() delete(cltMap, c.URL) cltMu.Unlock() @@ -360,7 +361,6 @@ func (c *Client) cleanup() { c.mixer.mu.Unlock() } - // Close all senders for _, sender := range c.Senders { sender.Close() } @@ -369,7 +369,6 @@ func (c *Client) cleanup() { c.trackMap = make(map[*core.Sender]string) c.senderStats = make(map[*core.Sender]time.Time) - // Remove from global map cltMu.Lock() delete(cltMap, c.URL) cltMu.Unlock() @@ -385,7 +384,7 @@ func (c *Client) cleanupOrphanedSenders() { for i, sender := range c.Senders { lastActivity, exists := c.senderStats[sender] - if sender.State() == "closed" || !exists || now.Sub(lastActivity) >= 5*time.Second { + if sender.State() == "closed" || !exists || now.Sub(lastActivity) >= SenderTimeoutDuration { if trackID, exists := c.trackMap[sender]; exists { c.mixer.RemoveStream(trackID) delete(c.trackMap, sender) From f2242e31c8d3757e589b8a7dff0e4b1ae8ab66fe Mon Sep 17 00:00:00 2001 From: Oliver Eiber Date: Tue, 19 Aug 2025 07:53:10 +0200 Subject: [PATCH 18/24] impove connection timeout to prevent reconnections after 30 seconds --- pkg/doorbird/backchannel.go | 41 +++++++++++++++++++++++++++++++++++-- 1 file changed, 39 insertions(+), 2 deletions(-) diff --git a/pkg/doorbird/backchannel.go b/pkg/doorbird/backchannel.go index dc66ee0e..a49130e5 100644 --- a/pkg/doorbird/backchannel.go +++ b/pkg/doorbird/backchannel.go @@ -21,6 +21,8 @@ const ( OutputChannelBuffer = 10 SenderCleanupInterval = 5 * time.Second SenderTimeoutDuration = 5 * time.Second + ConnectionReadTimeout = 5 * time.Minute + HeartbeatInterval = 30 * time.Second ) var ( @@ -244,6 +246,8 @@ func Dial(rawURL string) (*Client, error) { cltMap[rawURL] = client + fmt.Printf("DoorBird: New connection established to %s\n", rawURL) + return client, nil } @@ -299,6 +303,7 @@ func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Rece if n, err := conn.Write(mixedData); err == nil { c.Send += n } else { + fmt.Printf("DoorBird: Write error, breaking audio loop: %v\n", err) break } } @@ -324,17 +329,47 @@ func (c *Client) Start() error { } }() + // Start a heartbeat goroutine to periodically check connection health + go func() { + heartbeat := time.NewTicker(HeartbeatInterval) + defer heartbeat.Stop() + + for range heartbeat.C { + c.mu.RLock() + conn := c.conn + c.mu.RUnlock() + + if conn != nil { + // Try to write a small amount of silence to keep connection alive + silence := make([]byte, 160) // 20ms of silence at 8kHz + _ = conn.SetWriteDeadline(time.Now().Add(core.ConnDeadline)) + if _, err := conn.Write(silence); err != nil { + fmt.Printf("DoorBird: Heartbeat write failed: %v\n", err) + // Don't break here, let the main read loop handle it + } + } + } + }() + + // The main loop now just monitors for any unexpected data or connection errors + // DoorBird typically doesn't send data back, so we use a very long timeout buf := make([]byte, 1) + connectionStart := time.Now() for { - _ = c.conn.SetReadDeadline(time.Now().Add(30 * time.Second)) - _, err := c.conn.Read(buf) + _ = c.conn.SetReadDeadline(time.Now().Add(ConnectionReadTimeout)) + n, err := c.conn.Read(buf) if err != nil { + elapsed := time.Since(connectionStart) + fmt.Printf("DoorBird: Connection failed after %v, error: %v\n", elapsed, err) c.cleanup() cltMu.Lock() delete(cltMap, c.URL) cltMu.Unlock() return err } + if n > 0 { + fmt.Printf("DoorBird: Unexpected data received: %v\n", buf[:n]) + } } } @@ -342,6 +377,8 @@ func (c *Client) cleanup() { c.mu.Lock() defer c.mu.Unlock() + fmt.Printf("DoorBird: Starting cleanup for connection %s\n", c.URL) + if c.conn != nil { c.conn.Close() c.conn = nil From 22cc8ac2c49b636423ee785844a71c761634d736 Mon Sep 17 00:00:00 2001 From: Alex X Date: Wed, 1 Oct 2025 16:57:39 +0300 Subject: [PATCH 19/24] Code refactoring for #1762 --- internal/streams/api.go | 80 ++++++++----------------- internal/streams/preload.go | 56 +++++++++++++----- internal/streams/streams.go | 16 +++-- pkg/preload/consumer.go | 82 -------------------------- pkg/probe/{producer.go => consumer.go} | 33 ++--------- 5 files changed, 79 insertions(+), 188 deletions(-) delete mode 100644 pkg/preload/consumer.go rename pkg/probe/{producer.go => consumer.go} (60%) diff --git a/internal/streams/api.go b/internal/streams/api.go index 1b91f906..d162cdf9 100644 --- a/internal/streams/api.go +++ b/internal/streams/api.go @@ -5,6 +5,7 @@ import ( "github.com/AlexxIT/go2rtc/internal/api" "github.com/AlexxIT/go2rtc/internal/app" + "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/probe" ) @@ -27,7 +28,7 @@ func apiStreams(w http.ResponseWriter, r *http.Request) { return } - cons := probe.NewProbe(query) + cons := probe.Create("probe", query) if len(cons.Medias) != 0 { cons.WithRequest(r) if err := stream.AddConsumer(cons); err != nil { @@ -126,73 +127,44 @@ func apiStreamsDOT(w http.ResponseWriter, r *http.Request) { func apiPreload(w http.ResponseWriter, r *http.Request) { query := r.URL.Query() src := query.Get("src") - query.Del("src") - if src == "" { - http.Error(w, "no source", http.StatusBadRequest) + // check if stream exists + stream := Get(src) + if stream == nil { + http.Error(w, "", http.StatusNotFound) return } switch r.Method { case "PUT": - // check if stream exists - stream := Get(src) - if stream == nil { - http.Error(w, "stream not found", http.StatusNotFound) + // it's safe to delete from map while iterating + for k := range query { + switch k { + case core.KindVideo, core.KindAudio, "microphone": + default: + delete(query, k) + } + } + + rawQuery := query.Encode() + + if err := AddPreload(stream, rawQuery); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) return } - // check if consumer exists - if cons, ok := preloads[src]; ok { - stream.RemoveConsumer(cons) - delete(preloads, src) - } - - // parse query parameters - var rawQuery string - if query.Has("video") { - if videoQuery := query.Get("video"); videoQuery != "" { - rawQuery += "video=" + videoQuery + "#" - } else { - rawQuery += "video#" - } - } - if query.Has("audio") { - if audioQuery := query.Get("audio"); audioQuery != "" { - rawQuery += "audio=" + audioQuery + "#" - } else { - rawQuery += "audio#" - } - } - if query.Has("microphone") { - if micQuery := query.Get("microphone"); micQuery != "" { - rawQuery += "microphone=" + micQuery + "#" - } else { - rawQuery += "microphone#" - } - } - if err := app.PatchConfig([]string{"preload", src}, rawQuery); err != nil { - log.Error().Err(err).Str("src", src).Msg("Failed to patch config for PUT") - http.Error(w, err.Error(), http.StatusBadRequest) + http.Error(w, err.Error(), http.StatusInternalServerError) + } + + case "DELETE": + if err := DelPreload(stream); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) return } - Preload(src, rawQuery) - - case "DELETE": - if cons, ok := preloads[src]; ok { - if stream := Get(src); stream != nil { - stream.RemoveConsumer(cons) - } else { - cons.Stop() - } - - delete(preloads, src) - } - if err := app.PatchConfig([]string{"preload", src}, nil); err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) + http.Error(w, err.Error(), http.StatusInternalServerError) } default: diff --git a/internal/streams/preload.go b/internal/streams/preload.go index 7314df55..527746ac 100644 --- a/internal/streams/preload.go +++ b/internal/streams/preload.go @@ -1,34 +1,58 @@ package streams import ( + "errors" "net/url" + "sync" - "github.com/AlexxIT/go2rtc/pkg/preload" + "github.com/AlexxIT/go2rtc/pkg/probe" ) -var preloads = map[string]*preload.Preload{} +var preloads = map[*Stream]*probe.Probe{} +var preloadsMu sync.Mutex -func (s *Stream) Preload(name string, query url.Values) error { - cons := preload.NewPreload(name, query) - preloads[name] = cons +func Preload(stream *Stream, rawQuery string) { + if err := AddPreload(stream, rawQuery); err != nil { + log.Error().Err(err).Caller().Send() + } +} - if err := s.AddConsumer(cons); err != nil { +func AddPreload(stream *Stream, rawQuery string) error { + if rawQuery == "" { + rawQuery = "video&audio" + } + + query, err := url.ParseQuery(rawQuery) + if err != nil { return err } + preloadsMu.Lock() + defer preloadsMu.Unlock() + + if cons := preloads[stream]; cons != nil { + stream.RemoveConsumer(cons) + } + + cons := probe.Create("preload", query) + + if err = stream.AddConsumer(cons); err != nil { + return err + } + + preloads[stream] = cons return nil } -func Preload(src string, rawQuery string) { - // skip if exists - if _, ok := preloads[src]; ok { - return +func DelPreload(stream *Stream) error { + preloadsMu.Lock() + defer preloadsMu.Unlock() + + if cons := preloads[stream]; cons != nil { + stream.RemoveConsumer(cons) + delete(preloads, stream) + return nil } - if stream := Get(src); stream != nil { - query := ParseQuery(rawQuery) - if err := stream.Preload(src, query); err != nil { - log.Error().Err(err).Caller().Send() - } - } + return errors.New("streams: preload not found") } diff --git a/internal/streams/streams.go b/internal/streams/streams.go index 8f07ea12..a0b1ed68 100644 --- a/internal/streams/streams.go +++ b/internal/streams/streams.go @@ -36,17 +36,15 @@ func Init() { } time.AfterFunc(time.Second, func() { - if cfg.Publish != nil { - for name, dst := range cfg.Publish { - if stream := Get(name); stream != nil { - Publish(stream, dst) - } + // range for nil map is OK + for name, dst := range cfg.Publish { + if stream := Get(name); stream != nil { + Publish(stream, dst) } } - - if cfg.Preload != nil { - for name, rawQuery := range cfg.Preload { - Preload(name, rawQuery) + for name, rawQuery := range cfg.Preload { + if stream := Get(name); stream != nil { + Preload(stream, rawQuery) } } }) diff --git a/pkg/preload/consumer.go b/pkg/preload/consumer.go deleted file mode 100644 index 4d3735a8..00000000 --- a/pkg/preload/consumer.go +++ /dev/null @@ -1,82 +0,0 @@ -package preload - -import ( - "net/url" - "strings" - - "github.com/AlexxIT/go2rtc/pkg/core" - "github.com/pion/rtp" -) - -type Preload struct { - core.Connection - closed core.Waiter -} - -func NewPreload(name string, query url.Values) *Preload { - medias := core.ParseQuery(query) - - for _, value := range query["microphone"] { - media := &core.Media{Kind: core.KindAudio, Direction: core.DirectionRecvonly} - - for _, name := range strings.Split(value, ",") { - name = strings.ToUpper(name) - switch name { - case "", "COPY": - name = core.CodecAny - } - media.Codecs = append(media.Codecs, &core.Codec{Name: name}) - } - - medias = append(medias, media) - } - - if len(medias) == 0 { - medias = []*core.Media{ - { - Kind: core.KindVideo, - Direction: core.DirectionSendonly, - Codecs: []*core.Codec{{Name: core.CodecAny}}, - }, - { - Kind: core.KindAudio, - Direction: core.DirectionSendonly, - Codecs: []*core.Codec{{Name: core.CodecAny}}, - }, - } - } - - return &Preload{ - Connection: core.Connection{ - ID: core.NewID(), - FormatName: "preload", - Medias: medias, - }, - } -} - -func (p *Preload) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { - sender := core.NewSender(media, track.Codec) - sender.Handler = func(pkt *rtp.Packet) { - p.Send += pkt.MarshalSize() - } - sender.HandleRTP(track) - p.Senders = append(p.Senders, sender) - return nil -} - -func (p *Preload) Start() error { - p.closed.Wait() - return nil -} - -func (p *Preload) Stop() error { - for _, receiver := range p.Receivers { - receiver.Close() - } - for _, sender := range p.Senders { - sender.Close() - } - p.closed.Done(nil) - return nil -} diff --git a/pkg/probe/producer.go b/pkg/probe/consumer.go similarity index 60% rename from pkg/probe/producer.go rename to pkg/probe/consumer.go index 1fbd3efb..c6aa4478 100644 --- a/pkg/probe/producer.go +++ b/pkg/probe/consumer.go @@ -11,7 +11,7 @@ type Probe struct { core.Connection } -func NewProbe(query url.Values) *Probe { +func Create(name string, query url.Values) *Probe { medias := core.ParseQuery(query) for _, value := range query["microphone"] { @@ -32,39 +32,18 @@ func NewProbe(query url.Values) *Probe { return &Probe{ Connection: core.Connection{ ID: core.NewID(), - FormatName: "probe", + FormatName: name, Medias: medias, }, } } -func (p *Probe) GetMedias() []*core.Media { - return p.Medias -} - func (p *Probe) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { sender := core.NewSender(media, track.Codec) - sender.Bind(track) + sender.Handler = func(pkt *core.Packet) { + p.Send += len(pkt.Payload) + } + sender.HandleRTP(track) p.Senders = append(p.Senders, sender) return nil } - -func (p *Probe) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { - receiver := core.NewReceiver(media, codec) - p.Receivers = append(p.Receivers, receiver) - return receiver, nil -} - -func (p *Probe) Start() error { - return nil -} - -func (p *Probe) Stop() error { - for _, receiver := range p.Receivers { - receiver.Close() - } - for _, sender := range p.Senders { - sender.Close() - } - return nil -} From 4dd1f73a189865051c49d73cc6dbbc28b0185018 Mon Sep 17 00:00:00 2001 From: Alex X Date: Wed, 1 Oct 2025 17:03:32 +0300 Subject: [PATCH 20/24] Update readme for #1762 --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index dfbe79b0..5b3754c9 100644 --- a/README.md +++ b/README.md @@ -844,7 +844,7 @@ You can preload any stream on go2rtc start. This is useful for cameras that take preload: camera1: # default: video&audio = ANY camera2: "video" # preload only video track - camera3: "video=h264#audio=opus" # initialize transcoding pipeline + camera3: "video=h264&audio=opus" # preload H264 video and OPUS audio streams: camera1: From 54b95dced4ddee1a2e862f94365b7ad50ab1bff8 Mon Sep 17 00:00:00 2001 From: Alex X Date: Sat, 4 Oct 2025 19:18:36 +0300 Subject: [PATCH 21/24] Fix probing after #1762 --- pkg/probe/consumer.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/probe/consumer.go b/pkg/probe/consumer.go index c6aa4478..a1ca7ca5 100644 --- a/pkg/probe/consumer.go +++ b/pkg/probe/consumer.go @@ -47,3 +47,7 @@ func (p *Probe) AddTrack(media *core.Media, codec *core.Codec, track *core.Recei p.Senders = append(p.Senders, sender) return nil } + +func (p *Probe) Start() error { + return nil +} From ec08dfee9c0d4c0c6ae620ca1cc64c8d44be223d Mon Sep 17 00:00:00 2001 From: Alex X Date: Sat, 4 Oct 2025 19:19:01 +0300 Subject: [PATCH 22/24] Fix stack API for new pion version --- internal/debug/stack.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/debug/stack.go b/internal/debug/stack.go index f8d62772..6bc735ad 100644 --- a/internal/debug/stack.go +++ b/internal/debug/stack.go @@ -29,8 +29,8 @@ var stackSkip = [][]byte{ []byte("created by github.com/AlexxIT/go2rtc/internal/homekit.Init"), // webrtc/api.go - []byte("created by github.com/pion/ice/v2.NewTCPMuxDefault"), - []byte("created by github.com/pion/ice/v2.NewUDPMuxDefault"), + []byte("created by github.com/pion/ice/v4.NewTCPMuxDefault"), + []byte("created by github.com/pion/ice/v4.NewUDPMuxDefault"), } func stackHandler(w http.ResponseWriter, r *http.Request) { From 887f0f48905459d1ad6f2e94bbe33b2450980a8d Mon Sep 17 00:00:00 2001 From: Oliver Eiber Date: Sat, 4 Oct 2025 21:37:19 +0200 Subject: [PATCH 23/24] fix connection handling in conjunction with doorbird backchannel --- pkg/doorbird/backchannel.go | 397 ++---------------------------------- 1 file changed, 16 insertions(+), 381 deletions(-) diff --git a/pkg/doorbird/backchannel.go b/pkg/doorbird/backchannel.go index a49130e5..28eb5b69 100644 --- a/pkg/doorbird/backchannel.go +++ b/pkg/doorbird/backchannel.go @@ -1,183 +1,21 @@ package doorbird import ( - "bufio" - "errors" "fmt" "net" - "net/http" "net/url" - "sync" "time" "github.com/AlexxIT/go2rtc/pkg/core" - "github.com/AlexxIT/go2rtc/pkg/pcm" "github.com/pion/rtp" ) -const ( - AudioMixerInterval = 10 * time.Millisecond - AudioChannelBuffer = 10 - OutputChannelBuffer = 10 - SenderCleanupInterval = 5 * time.Second - SenderTimeoutDuration = 5 * time.Second - ConnectionReadTimeout = 5 * time.Minute - HeartbeatInterval = 30 * time.Second -) - -var ( - cltMu sync.Mutex - cltMap = make(map[string]*Client) -) - -type AudioMixer struct { - mu sync.Mutex - streams map[string]chan []byte - output chan []byte - running bool - closed bool -} - -func NewAudioMixer() *AudioMixer { - return &AudioMixer{ - streams: make(map[string]chan []byte), - output: make(chan []byte, OutputChannelBuffer), - } -} - -func (m *AudioMixer) AddStream(id string) chan []byte { - m.mu.Lock() - defer m.mu.Unlock() - - if m.closed { - ch := make(chan []byte) - close(ch) - return ch - } - - if !m.running { - m.running = true - go m.mixLoop() - } - - stream := make(chan []byte, AudioChannelBuffer) - m.streams[id] = stream - return stream -} - -func (m *AudioMixer) RemoveStream(id string) { - m.mu.Lock() - defer m.mu.Unlock() - - if stream, exists := m.streams[id]; exists { - close(stream) - delete(m.streams, id) - } -} - -func (m *AudioMixer) mixLoop() { - ticker := time.NewTicker(AudioMixerInterval) - defer ticker.Stop() - - for range ticker.C { - m.mu.Lock() - if m.closed { - m.mu.Unlock() - return - } - - if len(m.streams) == 0 { - m.mu.Unlock() - continue - } - - var pcmSamples [][]int16 - activeStreams := 0 - - for _, stream := range m.streams { - select { - case data := <-stream: - if len(data) > 0 { - samples := make([]int16, len(data)) - for i, sample := range data { - samples[i] = pcm.PCMUtoPCM(sample) - } - pcmSamples = append(pcmSamples, samples) - activeStreams++ - } - default: - } - } - m.mu.Unlock() - - if activeStreams == 0 { - continue - } - - var mixedLength int - for _, samples := range pcmSamples { - if len(samples) > mixedLength { - mixedLength = len(samples) - } - } - - if mixedLength == 0 { - continue - } - - mixed := make([]int16, mixedLength) - for i := 0; i < mixedLength; i++ { - var sum int32 - var count int32 - - for _, samples := range pcmSamples { - if i < len(samples) { - sum += int32(samples[i]) - count++ - } - } - - if count > 0 { - averaged := sum / count - if averaged > 32767 { - mixed[i] = 32767 - } else if averaged < -32768 { - mixed[i] = -32768 - } else { - mixed[i] = int16(averaged) - } - } - } - - output := make([]byte, len(mixed)) - for i, sample := range mixed { - output[i] = pcm.PCMtoPCMU(sample) - } - - select { - case m.output <- output: - default: - } - } -} - type Client struct { core.Connection - conn net.Conn - mixer *AudioMixer - trackMap map[*core.Sender]string - senderStats map[*core.Sender]time.Time - mu sync.RWMutex + conn net.Conn } func Dial(rawURL string) (*Client, error) { - cltMu.Lock() - defer cltMu.Unlock() - - if existingClient, exists := cltMap[rawURL]; exists && existingClient.conn != nil { - return existingClient, nil - } - u, err := url.Parse(rawURL) if err != nil { return nil, err @@ -204,22 +42,9 @@ func Dial(rawURL string) (*Client, error) { _ = conn.SetWriteDeadline(time.Now().Add(core.ConnDeadline)) if _, err = conn.Write([]byte(s)); err != nil { - conn.Close() return nil, err } - resp, _ := http.ReadResponse(bufio.NewReader(conn), nil) - if resp != nil { - switch resp.StatusCode { - case 204: - conn.Close() - return nil, errors.New("DoorBird user has no api permission") - case 503: - conn.Close() - return nil, errors.New("DoorBird device is busy") - } - } - medias := []*core.Media{ { Kind: core.KindAudio, @@ -230,25 +55,17 @@ func Dial(rawURL string) (*Client, error) { }, } - client := &Client{ - Connection: core.Connection{ + return &Client{ + core.Connection{ ID: core.NewID(), FormatName: "doorbird", Protocol: "http", URL: rawURL, Medias: medias, + Transport: conn, }, - conn: conn, - mixer: NewAudioMixer(), - trackMap: make(map[*core.Sender]string), - senderStats: make(map[*core.Sender]time.Time), - } - - cltMap[rawURL] = client - - fmt.Printf("DoorBird: New connection established to %s\n", rawURL) - - return client, nil + conn, + }, nil } func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { @@ -256,206 +73,24 @@ func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, } func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { - c.mu.Lock() - defer c.mu.Unlock() - sender := core.NewSender(media, track.Codec) - trackID := fmt.Sprintf("%d", core.NewID()) - streamChan := c.mixer.AddStream(trackID) sender.Handler = func(pkt *rtp.Packet) { - if len(pkt.Payload) == 0 { - return - } - - c.mu.RLock() - conn := c.conn - c.mu.RUnlock() - - if conn != nil { - select { - case streamChan <- pkt.Payload: - c.mu.Lock() - c.senderStats[sender] = time.Now() - c.mu.Unlock() - default: - } + _ = c.conn.SetWriteDeadline(time.Now().Add(core.ConnDeadline)) + if n, err := c.conn.Write(pkt.Payload); err == nil { + c.Send += n } } - c.trackMap[sender] = trackID - c.senderStats[sender] = time.Now() - - if len(c.Senders) == 0 { - go func() { - defer func() { - if r := recover(); r != nil { - } - }() - - for mixedData := range c.mixer.output { - c.mu.RLock() - conn := c.conn - c.mu.RUnlock() - - if conn != nil && len(mixedData) > 0 { - _ = conn.SetWriteDeadline(time.Now().Add(core.ConnDeadline)) - if n, err := conn.Write(mixedData); err == nil { - c.Send += n - } else { - fmt.Printf("DoorBird: Write error, breaking audio loop: %v\n", err) - break - } - } - } - }() - } - - sender.WithParent(track).Start() + sender.HandleRTP(track) c.Senders = append(c.Senders, sender) return nil } -func (c *Client) Start() error { - if c.conn == nil { - return nil - } - - go func() { - ticker := time.NewTicker(SenderCleanupInterval) - defer ticker.Stop() - for range ticker.C { - c.cleanupOrphanedSenders() - } - }() - - // Start a heartbeat goroutine to periodically check connection health - go func() { - heartbeat := time.NewTicker(HeartbeatInterval) - defer heartbeat.Stop() - - for range heartbeat.C { - c.mu.RLock() - conn := c.conn - c.mu.RUnlock() - - if conn != nil { - // Try to write a small amount of silence to keep connection alive - silence := make([]byte, 160) // 20ms of silence at 8kHz - _ = conn.SetWriteDeadline(time.Now().Add(core.ConnDeadline)) - if _, err := conn.Write(silence); err != nil { - fmt.Printf("DoorBird: Heartbeat write failed: %v\n", err) - // Don't break here, let the main read loop handle it - } - } - } - }() - - // The main loop now just monitors for any unexpected data or connection errors - // DoorBird typically doesn't send data back, so we use a very long timeout - buf := make([]byte, 1) - connectionStart := time.Now() - for { - _ = c.conn.SetReadDeadline(time.Now().Add(ConnectionReadTimeout)) - n, err := c.conn.Read(buf) - if err != nil { - elapsed := time.Since(connectionStart) - fmt.Printf("DoorBird: Connection failed after %v, error: %v\n", elapsed, err) - c.cleanup() - cltMu.Lock() - delete(cltMap, c.URL) - cltMu.Unlock() - return err - } - if n > 0 { - fmt.Printf("DoorBird: Unexpected data received: %v\n", buf[:n]) - } - } -} - -func (c *Client) cleanup() { - c.mu.Lock() - defer c.mu.Unlock() - - fmt.Printf("DoorBird: Starting cleanup for connection %s\n", c.URL) - - if c.conn != nil { - c.conn.Close() - c.conn = nil - } - - if c.mixer != nil { - c.mixer.mu.Lock() - c.mixer.closed = true - for id, stream := range c.mixer.streams { - close(stream) - delete(c.mixer.streams, id) - } - if c.mixer.running { - close(c.mixer.output) - c.mixer.running = false - } - c.mixer.mu.Unlock() - } - - for _, sender := range c.Senders { - sender.Close() - } - c.Senders = nil - - c.trackMap = make(map[*core.Sender]string) - c.senderStats = make(map[*core.Sender]time.Time) - - cltMu.Lock() - delete(cltMap, c.URL) - cltMu.Unlock() -} - -func (c *Client) cleanupOrphanedSenders() { - c.mu.Lock() - defer c.mu.Unlock() - - now := time.Now() - removedCount := 0 - validIndex := 0 - - for i, sender := range c.Senders { - lastActivity, exists := c.senderStats[sender] - if sender.State() == "closed" || !exists || now.Sub(lastActivity) >= SenderTimeoutDuration { - if trackID, exists := c.trackMap[sender]; exists { - c.mixer.RemoveStream(trackID) - delete(c.trackMap, sender) - } - delete(c.senderStats, sender) - sender.Close() - removedCount++ - } else { - c.Senders[validIndex] = c.Senders[i] - validIndex++ - } - } - - c.Senders = c.Senders[:validIndex] - - if removedCount > 0 { - fmt.Printf("DoorBird: Cleaned up %d orphaned senders, %d remain active\n", removedCount, validIndex) - } -} - -func (c *Client) RemoveTrack(sender *core.Sender) { - c.mu.Lock() - defer c.mu.Unlock() - - if trackID, exists := c.trackMap[sender]; exists { - c.mixer.RemoveStream(trackID) - delete(c.trackMap, sender) - } - delete(c.senderStats, sender) - - for i, s := range c.Senders { - if s == sender { - c.Senders = append(c.Senders[:i], c.Senders[i+1:]...) - break - } - } +func (c *Client) Start() (err error) { + _, err = c.conn.Read(nil) + // just block until c.conn closed + b := make([]byte, 1) + _, _ = c.conn.Read(b) + return } From 94b7c33485ec29052c6521e4646de9ea6162a438 Mon Sep 17 00:00:00 2001 From: Alex X Date: Sun, 5 Oct 2025 16:00:58 +0300 Subject: [PATCH 24/24] Update backchannel.go Code refactoring for #1895 --- pkg/doorbird/backchannel.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/doorbird/backchannel.go b/pkg/doorbird/backchannel.go index 28eb5b69..4d252228 100644 --- a/pkg/doorbird/backchannel.go +++ b/pkg/doorbird/backchannel.go @@ -88,9 +88,8 @@ func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Rece } func (c *Client) Start() (err error) { - _, err = c.conn.Read(nil) // just block until c.conn closed b := make([]byte, 1) - _, _ = c.conn.Read(b) + _, err = c.conn.Read(b) return }