diff --git a/cmd/streams/producer.go b/cmd/streams/producer.go index 5e32e88f..7bc45905 100644 --- a/cmd/streams/producer.go +++ b/cmd/streams/producer.go @@ -140,7 +140,7 @@ func (p *Producer) reconnect() { // move sink from old track to new track newTrack := p.element.GetTrack(media, codec) - newTrack.Sink = oldTrack.Sink + newTrack.GetSink(oldTrack) p.tracks[i] = newTrack break diff --git a/cmd/streams/stream.go b/cmd/streams/stream.go index f0eef79a..451c6b4a 100644 --- a/cmd/streams/stream.go +++ b/cmd/streams/stream.go @@ -129,7 +129,7 @@ func (s *Stream) RemoveConsumer(cons streamer.Consumer) { var sink bool for _, track := range producer.tracks { - if len(track.Sink) > 0 { + if track.HasSink() { sink = true } } diff --git a/pkg/streamer/track.go b/pkg/streamer/track.go index d3359d5a..7892c91c 100644 --- a/pkg/streamer/track.go +++ b/pkg/streamer/track.go @@ -12,44 +12,54 @@ type WrapperFunc func(push WriterFunc) WriterFunc type Track struct { Codec *Codec Direction string - Sink map[*Track]WriterFunc - mx sync.Mutex + sink map[*Track]WriterFunc + sinkMu sync.Mutex } func (t *Track) String() string { s := t.Codec.String() - s += fmt.Sprintf(", sinks=%d", len(t.Sink)) + s += fmt.Sprintf(", sinks=%d", len(t.sink)) return s } func (t *Track) WriteRTP(p *rtp.Packet) error { - t.mx.Lock() - for _, f := range t.Sink { + t.sinkMu.Lock() + for _, f := range t.sink { _ = f(p) } - t.mx.Unlock() + t.sinkMu.Unlock() return nil } func (t *Track) Bind(w WriterFunc) *Track { - t.mx.Lock() + t.sinkMu.Lock() - if t.Sink == nil { - t.Sink = map[*Track]WriterFunc{} + if t.sink == nil { + t.sink = map[*Track]WriterFunc{} } clone := &Track{ - Codec: t.Codec, Direction: t.Direction, Sink: t.Sink, + Codec: t.Codec, Direction: t.Direction, sink: t.sink, } - t.Sink[clone] = w + t.sink[clone] = w - t.mx.Unlock() + t.sinkMu.Unlock() return clone } func (t *Track) Unbind() { - t.mx.Lock() - delete(t.Sink, t) - t.mx.Unlock() + t.sinkMu.Lock() + delete(t.sink, t) + t.sinkMu.Unlock() +} + +func (t *Track) GetSink(from *Track) { + t.sink = from.sink +} + +func (t *Track) HasSink() bool { + t.sinkMu.Lock() + defer t.sinkMu.Unlock() + return len(t.sink) > 0 }