Add new Waiter class
This commit is contained in:
@@ -0,0 +1,53 @@
|
|||||||
|
package core
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Waiter support:
|
||||||
|
// - autotart on first Wait
|
||||||
|
// - block new waiters after last Done
|
||||||
|
// - safe Done after finish
|
||||||
|
type Waiter struct {
|
||||||
|
sync.WaitGroup
|
||||||
|
mu sync.Mutex
|
||||||
|
state int // state < 0 means finish
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *Waiter) Add(delta int) {
|
||||||
|
w.mu.Lock()
|
||||||
|
if w.state >= 0 {
|
||||||
|
w.state += delta
|
||||||
|
w.WaitGroup.Add(delta)
|
||||||
|
}
|
||||||
|
w.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *Waiter) Wait() {
|
||||||
|
w.mu.Lock()
|
||||||
|
// first wait auto start waiter
|
||||||
|
if w.state == 0 {
|
||||||
|
w.state++
|
||||||
|
w.WaitGroup.Add(1)
|
||||||
|
}
|
||||||
|
w.mu.Unlock()
|
||||||
|
|
||||||
|
w.WaitGroup.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *Waiter) Done() {
|
||||||
|
w.mu.Lock()
|
||||||
|
|
||||||
|
// safe run Done only when have tasks
|
||||||
|
if w.state > 0 {
|
||||||
|
w.state--
|
||||||
|
w.WaitGroup.Done()
|
||||||
|
}
|
||||||
|
|
||||||
|
// block waiter for any operations after last done
|
||||||
|
if w.state == 0 {
|
||||||
|
w.state = -1
|
||||||
|
}
|
||||||
|
|
||||||
|
w.mu.Unlock()
|
||||||
|
}
|
||||||
+5
-8
@@ -1,6 +1,7 @@
|
|||||||
package webrtc
|
package webrtc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/AlexxIT/go2rtc/pkg/core"
|
||||||
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
||||||
"github.com/pion/webrtc/v3"
|
"github.com/pion/webrtc/v3"
|
||||||
)
|
)
|
||||||
@@ -18,12 +19,12 @@ type Conn struct {
|
|||||||
receive int
|
receive int
|
||||||
send int
|
send int
|
||||||
|
|
||||||
offer string
|
offer string
|
||||||
start chan struct{}
|
closed core.Waiter
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewConn(pc *webrtc.PeerConnection) *Conn {
|
func NewConn(pc *webrtc.PeerConnection) *Conn {
|
||||||
c := &Conn{pc: pc, start: make(chan struct{})}
|
c := &Conn{pc: pc}
|
||||||
|
|
||||||
pc.OnICECandidate(func(candidate *webrtc.ICECandidate) {
|
pc.OnICECandidate(func(candidate *webrtc.ICECandidate) {
|
||||||
c.Fire(candidate)
|
c.Fire(candidate)
|
||||||
@@ -77,11 +78,7 @@ func NewConn(pc *webrtc.PeerConnection) *Conn {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Conn) Close() error {
|
func (c *Conn) Close() error {
|
||||||
// unblocked write to chan
|
c.closed.Done()
|
||||||
select {
|
|
||||||
case c.start <- struct{}{}:
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
return c.pc.Close()
|
return c.pc.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -17,7 +17,7 @@ func (c *Conn) GetTrack(media *streamer.Media, codec *streamer.Codec) *streamer.
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Conn) Start() error {
|
func (c *Conn) Start() error {
|
||||||
<-c.start
|
c.closed.Wait()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user