From fa763399c2594b8ea0281f1460706e298482ddb6 Mon Sep 17 00:00:00 2001 From: Alexey Khit Date: Wed, 28 Jun 2023 19:45:24 +0300 Subject: [PATCH] Improve HLS processing --- internal/hls/README.md | 3 ++ internal/hls/hls.go | 91 ++++++++++++++++++----------------------- internal/hls/session.go | 41 +++++++++++++++++++ internal/hls/ws.go | 81 ++++++++++++++++++++++++++++++++++++ 4 files changed, 165 insertions(+), 51 deletions(-) create mode 100644 internal/hls/README.md create mode 100644 internal/hls/session.go create mode 100644 internal/hls/ws.go diff --git a/internal/hls/README.md b/internal/hls/README.md new file mode 100644 index 00000000..c14f0fdb --- /dev/null +++ b/internal/hls/README.md @@ -0,0 +1,3 @@ +## Useful links + +- https://walterebert.com/playground/video/hls/ diff --git a/internal/hls/hls.go b/internal/hls/hls.go index a92b46a7..de92047a 100644 --- a/internal/hls/hls.go +++ b/internal/hls/hls.go @@ -1,8 +1,8 @@ package hls import ( - "fmt" "github.com/AlexxIT/go2rtc/internal/api" + "github.com/AlexxIT/go2rtc/internal/api/ws" "github.com/AlexxIT/go2rtc/internal/streams" "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/mp4" @@ -25,6 +25,8 @@ func Init() { // HLS (fMP4) api.HandleFunc("api/hls/init.mp4", handlerInit) api.HandleFunc("api/hls/segment.m4s", handlerSegmentMP4) + + ws.HandleFunc("hls", handlerWSHLS) } type Consumer interface { @@ -35,16 +37,6 @@ type Consumer interface { Start() } -type Session struct { - cons Consumer - playlist string - init []byte - segment []byte - seq int - alive *time.Timer - mu sync.Mutex -} - const keepalive = 5 * time.Second var sessions = map[string]*Session{} @@ -86,21 +78,21 @@ func handlerStream(w http.ResponseWriter, r *http.Request) { } } + if err := stream.AddConsumer(cons); err != nil { + log.Error().Err(err).Caller().Send() + return + } + session := &Session{cons: cons} cons.Listen(func(msg any) { if data, ok := msg.([]byte); ok { session.mu.Lock() - session.segment = append(session.segment, data...) + session.buffer = append(session.buffer, data...) session.mu.Unlock() } }) - if err := stream.AddConsumer(cons); err != nil { - log.Error().Err(err).Caller().Send() - return - } - session.alive = time.AfterFunc(keepalive, func() { stream.RemoveConsumer(cons) }) @@ -112,7 +104,7 @@ func handlerStream(w http.ResponseWriter, r *http.Request) { // two segments important for Chromecast if medias != nil { - session.playlist = `#EXTM3U + session.template = `#EXTM3U #EXT-X-VERSION:6 #EXT-X-TARGETDURATION:1 #EXT-X-MEDIA-SEQUENCE:%d @@ -122,7 +114,7 @@ segment.m4s?id=` + sid + `&n=%d #EXTINF:0.500, segment.m4s?id=` + sid + `&n=%d` } else { - session.playlist = `#EXTM3U + session.template = `#EXTM3U #EXT-X-VERSION:3 #EXT-X-TARGETDURATION:1 #EXT-X-MEDIA-SEQUENCE:%d @@ -167,9 +159,7 @@ func handlerPlaylist(w http.ResponseWriter, r *http.Request) { return } - s := fmt.Sprintf(session.playlist, session.seq, session.seq, session.seq+1) - - if _, err := w.Write([]byte(s)); err != nil { + if _, err := w.Write([]byte(session.Playlist())); err != nil { log.Error().Err(err).Caller().Send() } } @@ -194,22 +184,12 @@ func handlerSegmentTS(w http.ResponseWriter, r *http.Request) { session.alive.Reset(keepalive) - var i byte - for len(session.segment) == 0 { - if i++; i > 10 { - http.NotFound(w, r) - return - } - time.Sleep(time.Millisecond * 100) + data := session.Segment() + if data == nil { + http.NotFound(w, r) + return } - session.mu.Lock() - data := session.segment - // important to start new segment with init - session.segment = session.init - session.seq++ - session.mu.Unlock() - if _, err := w.Write(data); err != nil { log.Error().Err(err).Caller().Send() } @@ -233,7 +213,16 @@ func handlerInit(w http.ResponseWriter, r *http.Request) { return } - if _, err := w.Write(session.init); err != nil { + data := session.init + session.init = nil + + session.segment0 = session.Segment() + if session.segment0 == nil { + http.NotFound(w, r) + return + } + + if _, err := w.Write(data); err != nil { log.Error().Err(err).Caller().Send() } } @@ -243,11 +232,13 @@ func handlerSegmentMP4(w http.ResponseWriter, r *http.Request) { w.Header().Add("Content-Type", "video/iso.segment") if r.Method == "OPTIONS" { - w.Header().Set("Access-Control-Allow-Methods", "GET, OPTIONS") + w.Header().Set("Access-Control-Allow-Methods", "GET") return } - sid := r.URL.Query().Get("id") + query := r.URL.Query() + + sid := query.Get("id") sessionsMu.RLock() session := sessions[sid] sessionsMu.RUnlock() @@ -258,20 +249,18 @@ func handlerSegmentMP4(w http.ResponseWriter, r *http.Request) { session.alive.Reset(keepalive) - var i byte - for len(session.segment) == 0 { - if i++; i > 10 { - http.NotFound(w, r) - return - } - time.Sleep(time.Millisecond * 100) + var data []byte + + if query.Get("n") != "0" { + data = session.Segment() + } else { + data = session.segment0 } - session.mu.Lock() - data := session.segment - session.segment = nil - session.seq++ - session.mu.Unlock() + if data == nil { + http.NotFound(w, r) + return + } if _, err := w.Write(data); err != nil { log.Error().Err(err).Caller().Send() diff --git a/internal/hls/session.go b/internal/hls/session.go new file mode 100644 index 00000000..6f4d66ed --- /dev/null +++ b/internal/hls/session.go @@ -0,0 +1,41 @@ +package hls + +import ( + "fmt" + "sync" + "time" +) + +type Session struct { + cons Consumer + template string + init []byte + segment0 []byte + buffer []byte + seq int + alive *time.Timer + mu sync.Mutex +} + +func (s *Session) Playlist() string { + return fmt.Sprintf(s.template, s.seq, s.seq, s.seq+1) +} + +func (s *Session) Segment() (segment []byte) { + for i := 0; i < 20 && segment == nil; i++ { + if i > 0 { + time.Sleep(50 * time.Millisecond) + } + + s.mu.Lock() + if len(s.buffer) > 0 { + segment = s.buffer + // for TS important to start new segment with init + s.buffer = s.init + s.seq++ + } + s.mu.Unlock() + } + + return +} diff --git a/internal/hls/ws.go b/internal/hls/ws.go new file mode 100644 index 00000000..adae0a57 --- /dev/null +++ b/internal/hls/ws.go @@ -0,0 +1,81 @@ +package hls + +import ( + "errors" + "github.com/AlexxIT/go2rtc/internal/api" + "github.com/AlexxIT/go2rtc/internal/api/ws" + "github.com/AlexxIT/go2rtc/internal/streams" + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/mp4" + "github.com/AlexxIT/go2rtc/pkg/tcp" + "github.com/rs/zerolog/log" + "strings" + "time" +) + +func handlerWSHLS(tr *ws.Transport, msg *ws.Message) error { + src := tr.Request.URL.Query().Get("src") + stream := streams.Get(src) + if stream == nil { + return errors.New(api.StreamNotFound) + } + + codecs := msg.String() + + cons := &mp4.Consumer{ + RemoteAddr: tcp.RemoteAddr(tr.Request), + UserAgent: tr.Request.UserAgent(), + Medias: mp4.ParseCodecs(codecs, true), + } + + if err := stream.AddConsumer(cons); err != nil { + log.Error().Err(err).Caller().Send() + return err + } + + session := &Session{cons: cons} + + cons.Listen(func(msg any) { + if data, ok := msg.([]byte); ok { + session.mu.Lock() + session.buffer = append(session.buffer, data...) + session.mu.Unlock() + } + }) + + session.alive = time.AfterFunc(keepalive, func() { + stream.RemoveConsumer(cons) + }) + session.init, _ = cons.Init() + + cons.Start() + + sid := core.RandString(8, 62) + + // two segments important for Chromecast + session.template = `#EXTM3U +#EXT-X-VERSION:6 +#EXT-X-TARGETDURATION:1 +#EXT-X-MEDIA-SEQUENCE:%d +#EXT-X-MAP:URI="init.mp4?id=` + sid + `" +#EXTINF:0.500, +segment.m4s?id=` + sid + `&n=%d +#EXTINF:0.500, +segment.m4s?id=` + sid + `&n=%d` + + sessionsMu.Lock() + sessions[sid] = session + sessionsMu.Unlock() + + // Apple Safari can play FLAC codec, but fail it it in m3u8 playlist + codecs = strings.Replace(cons.MimeCodecs(), mp4.MimeFlac, mp4.MimeAAC, 1) + + // bandwidth important for Safari, codecs useful for smooth playback + data := `#EXTM3U +#EXT-X-STREAM-INF:BANDWIDTH=1000000,CODECS="` + codecs + `" +hls/playlist.m3u8?id=` + sid + + tr.Write(&ws.Message{Type: "hls", Value: data}) + + return nil +}