Fix panic for concurrent streams map read and map write #1612

This commit is contained in:
Alex X
2025-02-24 21:02:33 +03:00
parent e55c2e9598
commit 90544ba713
5 changed files with 45 additions and 28 deletions
+4 -4
View File
@@ -112,7 +112,7 @@ func apiUnpair(id string) error {
return errors.New(api.StreamNotFound) return errors.New(api.StreamNotFound)
} }
rawURL := findHomeKitURL(stream) rawURL := findHomeKitURL(stream.Sources())
if rawURL == "" { if rawURL == "" {
return errors.New("not homekit source") return errors.New("not homekit source")
} }
@@ -128,10 +128,10 @@ func apiUnpair(id string) error {
func findHomeKitURLs() map[string]*url.URL { func findHomeKitURLs() map[string]*url.URL {
urls := map[string]*url.URL{} urls := map[string]*url.URL{}
for id, stream := range streams.Streams() { for name, sources := range streams.GetAllSources() {
if rawURL := findHomeKitURL(stream); rawURL != "" { if rawURL := findHomeKitURL(sources); rawURL != "" {
if u, err := url.Parse(rawURL); err == nil { if u, err := url.Parse(rawURL); err == nil {
urls[id] = u urls[name] = u
} }
} }
} }
+2 -3
View File
@@ -79,7 +79,7 @@ func Init() {
Handler: homekit.ServerHandler(srv), Handler: homekit.ServerHandler(srv),
} }
if url := findHomeKitURL(stream); url != "" { if url := findHomeKitURL(stream.Sources()); url != "" {
// 1. Act as transparent proxy for HomeKit camera // 1. Act as transparent proxy for HomeKit camera
dial := func() (net.Conn, error) { dial := func() (net.Conn, error) {
client, err := homekit.Dial(url, srtp.Server) client, err := homekit.Dial(url, srtp.Server)
@@ -186,8 +186,7 @@ func hapPairVerify(w http.ResponseWriter, r *http.Request) {
} }
} }
func findHomeKitURL(stream *streams.Stream) string { func findHomeKitURL(sources []string) string {
sources := stream.Sources()
if len(sources) == 0 { if len(sources) == 0 {
return "" return ""
} }
+2 -2
View File
@@ -99,11 +99,11 @@ func onvifDeviceService(w http.ResponseWriter, r *http.Request) {
}) })
case onvif.MediaGetVideoSources: case onvif.MediaGetVideoSources:
b = onvif.GetVideoSourcesResponse(streams.GetAll()) b = onvif.GetVideoSourcesResponse(streams.GetAllNames())
case onvif.MediaGetProfiles: case onvif.MediaGetProfiles:
// important for Hass: H264 codec, width, height // important for Hass: H264 codec, width, height
b = onvif.GetProfilesResponse(streams.GetAll()) b = onvif.GetProfilesResponse(streams.GetAllNames())
case onvif.MediaGetProfile: case onvif.MediaGetProfile:
token := onvif.FindTagValue(b, "ProfileToken") token := onvif.FindTagValue(b, "ProfileToken")
+3 -2
View File
@@ -47,11 +47,12 @@ func NewStream(source any) *Stream {
} }
} }
func (s *Stream) Sources() (sources []string) { func (s *Stream) Sources() []string {
sources := make([]string, 0, len(s.producers))
for _, prod := range s.producers { for _, prod := range s.producers {
sources = append(sources, prod.url) sources = append(sources, prod.url)
} }
return return sources
} }
func (s *Stream) SetSource(source string) { func (s *Stream) SetSource(source string) {
+33 -16
View File
@@ -42,10 +42,6 @@ func Init() {
}) })
} }
func Get(name string) *Stream {
return streams[name]
}
var sanitize = regexp.MustCompile(`\s`) var sanitize = regexp.MustCompile(`\s`)
// Validate - not allow creating dynamic streams with spaces in the source // Validate - not allow creating dynamic streams with spaces in the source
@@ -68,6 +64,7 @@ func New(name string, sources ...string) *Stream {
streamsMu.Lock() streamsMu.Lock()
streams[name] = stream streams[name] = stream
streamsMu.Unlock() streamsMu.Unlock()
return stream return stream
} }
@@ -124,7 +121,7 @@ func GetOrPatch(query url.Values) *Stream {
} }
// check if src is stream name // check if src is stream name
if stream, ok := streams[source]; ok { if stream := Get(source); stream != nil {
return stream return stream
} }
@@ -139,21 +136,41 @@ func GetOrPatch(query url.Values) *Stream {
return Patch(source, source) return Patch(source, source)
} }
func GetAll() (names []string) { var log zerolog.Logger
// streams map
var streams = map[string]*Stream{}
var streamsMu sync.Mutex
func Get(name string) *Stream {
streamsMu.Lock()
defer streamsMu.Unlock()
return streams[name]
}
func Delete(name string) {
streamsMu.Lock()
defer streamsMu.Unlock()
delete(streams, name)
}
func GetAllNames() []string {
streamsMu.Lock()
names := make([]string, 0, len(streams))
for name := range streams { for name := range streams {
names = append(names, name) names = append(names, name)
} }
return streamsMu.Unlock()
return names
} }
func Streams() map[string]*Stream { func GetAllSources() map[string][]string {
return streams streamsMu.Lock()
sources := make(map[string][]string, len(streams))
for name, stream := range streams {
sources[name] = stream.Sources()
} }
streamsMu.Unlock()
func Delete(id string) { return sources
delete(streams, id)
} }
var log zerolog.Logger
var streams = map[string]*Stream{}
var streamsMu sync.Mutex