From 06fc9717dfdfa0a75d064959f4af9600774b0b88 Mon Sep 17 00:00:00 2001 From: Alexey Khit Date: Wed, 1 Mar 2023 16:19:17 +0300 Subject: [PATCH] Add new Waiter class --- pkg/core/helpers.go | 53 ++++++++++++++++++++++++++++++++++++++++++ pkg/webrtc/conn.go | 13 ++++------- pkg/webrtc/producer.go | 2 +- 3 files changed, 59 insertions(+), 9 deletions(-) create mode 100644 pkg/core/helpers.go diff --git a/pkg/core/helpers.go b/pkg/core/helpers.go new file mode 100644 index 00000000..06fc65fd --- /dev/null +++ b/pkg/core/helpers.go @@ -0,0 +1,53 @@ +package core + +import ( + "sync" +) + +// Waiter support: +// - autotart on first Wait +// - block new waiters after last Done +// - safe Done after finish +type Waiter struct { + sync.WaitGroup + mu sync.Mutex + state int // state < 0 means finish +} + +func (w *Waiter) Add(delta int) { + w.mu.Lock() + if w.state >= 0 { + w.state += delta + w.WaitGroup.Add(delta) + } + w.mu.Unlock() +} + +func (w *Waiter) Wait() { + w.mu.Lock() + // first wait auto start waiter + if w.state == 0 { + w.state++ + w.WaitGroup.Add(1) + } + w.mu.Unlock() + + w.WaitGroup.Wait() +} + +func (w *Waiter) Done() { + w.mu.Lock() + + // safe run Done only when have tasks + if w.state > 0 { + w.state-- + w.WaitGroup.Done() + } + + // block waiter for any operations after last done + if w.state == 0 { + w.state = -1 + } + + w.mu.Unlock() +} diff --git a/pkg/webrtc/conn.go b/pkg/webrtc/conn.go index f11f4892..db4ee888 100644 --- a/pkg/webrtc/conn.go +++ b/pkg/webrtc/conn.go @@ -1,6 +1,7 @@ package webrtc import ( + "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/streamer" "github.com/pion/webrtc/v3" ) @@ -18,12 +19,12 @@ type Conn struct { receive int send int - offer string - start chan struct{} + offer string + closed core.Waiter } func NewConn(pc *webrtc.PeerConnection) *Conn { - c := &Conn{pc: pc, start: make(chan struct{})} + c := &Conn{pc: pc} pc.OnICECandidate(func(candidate *webrtc.ICECandidate) { c.Fire(candidate) @@ -77,11 +78,7 @@ func NewConn(pc *webrtc.PeerConnection) *Conn { } func (c *Conn) Close() error { - // unblocked write to chan - select { - case c.start <- struct{}{}: - default: - } + c.closed.Done() return c.pc.Close() } diff --git a/pkg/webrtc/producer.go b/pkg/webrtc/producer.go index 1055f803..0a7a913b 100644 --- a/pkg/webrtc/producer.go +++ b/pkg/webrtc/producer.go @@ -17,7 +17,7 @@ func (c *Conn) GetTrack(media *streamer.Media, codec *streamer.Codec) *streamer. } func (c *Conn) Start() error { - <-c.start + c.closed.Wait() return nil }