Fix concurrency in ivideon
This commit is contained in:
+36
-13
@@ -18,6 +18,14 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type State byte
|
||||||
|
|
||||||
|
const (
|
||||||
|
StateNone State = iota
|
||||||
|
StateConn
|
||||||
|
StateHandle
|
||||||
|
)
|
||||||
|
|
||||||
type Client struct {
|
type Client struct {
|
||||||
streamer.Element
|
streamer.Element
|
||||||
|
|
||||||
@@ -31,6 +39,7 @@ type Client struct {
|
|||||||
t0 time.Time
|
t0 time.Time
|
||||||
|
|
||||||
buffer chan []byte
|
buffer chan []byte
|
||||||
|
state State
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -69,16 +78,26 @@ func (c *Client) Dial() (err error) {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
c.state = StateConn
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) Handle() error {
|
func (c *Client) Handle() error {
|
||||||
c.buffer = make(chan []byte, 5)
|
|
||||||
// add delay to the stream for smooth playing (not a best solution)
|
// add delay to the stream for smooth playing (not a best solution)
|
||||||
c.t0 = time.Now().Add(time.Second)
|
c.t0 = time.Now().Add(time.Second)
|
||||||
|
|
||||||
// processing stream in separate thread for lower delay between packets
|
c.mu.Lock()
|
||||||
go c.worker()
|
|
||||||
|
if c.state == StateConn {
|
||||||
|
c.buffer = make(chan []byte, 5)
|
||||||
|
c.state = StateHandle
|
||||||
|
|
||||||
|
// processing stream in separate thread for lower delay between packets
|
||||||
|
go c.worker(c.buffer)
|
||||||
|
}
|
||||||
|
|
||||||
|
c.mu.Unlock()
|
||||||
|
|
||||||
_, data, err := c.conn.ReadMessage()
|
_, data, err := c.conn.ReadMessage()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -88,7 +107,7 @@ func (c *Client) Handle() error {
|
|||||||
track := c.tracks[c.msg.Track]
|
track := c.tracks[c.msg.Track]
|
||||||
if track != nil {
|
if track != nil {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
if c.buffer != nil {
|
if c.state == StateHandle {
|
||||||
c.buffer <- data
|
c.buffer <- data
|
||||||
}
|
}
|
||||||
c.mu.Unlock()
|
c.mu.Unlock()
|
||||||
@@ -119,7 +138,7 @@ func (c *Client) Handle() error {
|
|||||||
track = c.tracks[msg.Track]
|
track = c.tracks[msg.Track]
|
||||||
if track != nil {
|
if track != nil {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
if c.buffer != nil {
|
if c.state == StateHandle {
|
||||||
c.buffer <- data
|
c.buffer <- data
|
||||||
}
|
}
|
||||||
c.mu.Unlock()
|
c.mu.Unlock()
|
||||||
@@ -132,15 +151,19 @@ func (c *Client) Handle() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) Close() error {
|
func (c *Client) Close() error {
|
||||||
if c.conn == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
if c.buffer != nil {
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
|
switch c.state {
|
||||||
|
case StateNone:
|
||||||
|
return nil
|
||||||
|
case StateConn:
|
||||||
|
case StateHandle:
|
||||||
close(c.buffer)
|
close(c.buffer)
|
||||||
c.buffer = nil
|
|
||||||
}
|
}
|
||||||
c.mu.Unlock()
|
|
||||||
|
c.state = StateNone
|
||||||
|
|
||||||
return c.conn.Close()
|
return c.conn.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -220,13 +243,13 @@ func (c *Client) getTracks() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) worker() {
|
func (c *Client) worker(buffer chan []byte) {
|
||||||
var track *streamer.Track
|
var track *streamer.Track
|
||||||
for _, track = range c.tracks {
|
for _, track = range c.tracks {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
for data := range c.buffer {
|
for data := range buffer {
|
||||||
moof := &fmp4io.MovieFrag{}
|
moof := &fmp4io.MovieFrag{}
|
||||||
if _, err := moof.Unmarshal(data, 0); err != nil {
|
if _, err := moof.Unmarshal(data, 0); err != nil {
|
||||||
continue
|
continue
|
||||||
|
|||||||
Reference in New Issue
Block a user