Improve HLS processing

This commit is contained in:
Alexey Khit
2023-06-28 19:45:24 +03:00
parent af2398c072
commit fa763399c2
4 changed files with 165 additions and 51 deletions
+3
View File
@@ -0,0 +1,3 @@
## Useful links
- https://walterebert.com/playground/video/hls/
+40 -51
View File
@@ -1,8 +1,8 @@
package hls package hls
import ( import (
"fmt"
"github.com/AlexxIT/go2rtc/internal/api" "github.com/AlexxIT/go2rtc/internal/api"
"github.com/AlexxIT/go2rtc/internal/api/ws"
"github.com/AlexxIT/go2rtc/internal/streams" "github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/mp4" "github.com/AlexxIT/go2rtc/pkg/mp4"
@@ -25,6 +25,8 @@ func Init() {
// HLS (fMP4) // HLS (fMP4)
api.HandleFunc("api/hls/init.mp4", handlerInit) api.HandleFunc("api/hls/init.mp4", handlerInit)
api.HandleFunc("api/hls/segment.m4s", handlerSegmentMP4) api.HandleFunc("api/hls/segment.m4s", handlerSegmentMP4)
ws.HandleFunc("hls", handlerWSHLS)
} }
type Consumer interface { type Consumer interface {
@@ -35,16 +37,6 @@ type Consumer interface {
Start() Start()
} }
type Session struct {
cons Consumer
playlist string
init []byte
segment []byte
seq int
alive *time.Timer
mu sync.Mutex
}
const keepalive = 5 * time.Second const keepalive = 5 * time.Second
var sessions = map[string]*Session{} var sessions = map[string]*Session{}
@@ -86,21 +78,21 @@ func handlerStream(w http.ResponseWriter, r *http.Request) {
} }
} }
if err := stream.AddConsumer(cons); err != nil {
log.Error().Err(err).Caller().Send()
return
}
session := &Session{cons: cons} session := &Session{cons: cons}
cons.Listen(func(msg any) { cons.Listen(func(msg any) {
if data, ok := msg.([]byte); ok { if data, ok := msg.([]byte); ok {
session.mu.Lock() session.mu.Lock()
session.segment = append(session.segment, data...) session.buffer = append(session.buffer, data...)
session.mu.Unlock() session.mu.Unlock()
} }
}) })
if err := stream.AddConsumer(cons); err != nil {
log.Error().Err(err).Caller().Send()
return
}
session.alive = time.AfterFunc(keepalive, func() { session.alive = time.AfterFunc(keepalive, func() {
stream.RemoveConsumer(cons) stream.RemoveConsumer(cons)
}) })
@@ -112,7 +104,7 @@ func handlerStream(w http.ResponseWriter, r *http.Request) {
// two segments important for Chromecast // two segments important for Chromecast
if medias != nil { if medias != nil {
session.playlist = `#EXTM3U session.template = `#EXTM3U
#EXT-X-VERSION:6 #EXT-X-VERSION:6
#EXT-X-TARGETDURATION:1 #EXT-X-TARGETDURATION:1
#EXT-X-MEDIA-SEQUENCE:%d #EXT-X-MEDIA-SEQUENCE:%d
@@ -122,7 +114,7 @@ segment.m4s?id=` + sid + `&n=%d
#EXTINF:0.500, #EXTINF:0.500,
segment.m4s?id=` + sid + `&n=%d` segment.m4s?id=` + sid + `&n=%d`
} else { } else {
session.playlist = `#EXTM3U session.template = `#EXTM3U
#EXT-X-VERSION:3 #EXT-X-VERSION:3
#EXT-X-TARGETDURATION:1 #EXT-X-TARGETDURATION:1
#EXT-X-MEDIA-SEQUENCE:%d #EXT-X-MEDIA-SEQUENCE:%d
@@ -167,9 +159,7 @@ func handlerPlaylist(w http.ResponseWriter, r *http.Request) {
return return
} }
s := fmt.Sprintf(session.playlist, session.seq, session.seq, session.seq+1) if _, err := w.Write([]byte(session.Playlist())); err != nil {
if _, err := w.Write([]byte(s)); err != nil {
log.Error().Err(err).Caller().Send() log.Error().Err(err).Caller().Send()
} }
} }
@@ -194,22 +184,12 @@ func handlerSegmentTS(w http.ResponseWriter, r *http.Request) {
session.alive.Reset(keepalive) session.alive.Reset(keepalive)
var i byte data := session.Segment()
for len(session.segment) == 0 { if data == nil {
if i++; i > 10 { http.NotFound(w, r)
http.NotFound(w, r) return
return
}
time.Sleep(time.Millisecond * 100)
} }
session.mu.Lock()
data := session.segment
// important to start new segment with init
session.segment = session.init
session.seq++
session.mu.Unlock()
if _, err := w.Write(data); err != nil { if _, err := w.Write(data); err != nil {
log.Error().Err(err).Caller().Send() log.Error().Err(err).Caller().Send()
} }
@@ -233,7 +213,16 @@ func handlerInit(w http.ResponseWriter, r *http.Request) {
return return
} }
if _, err := w.Write(session.init); err != nil { data := session.init
session.init = nil
session.segment0 = session.Segment()
if session.segment0 == nil {
http.NotFound(w, r)
return
}
if _, err := w.Write(data); err != nil {
log.Error().Err(err).Caller().Send() log.Error().Err(err).Caller().Send()
} }
} }
@@ -243,11 +232,13 @@ func handlerSegmentMP4(w http.ResponseWriter, r *http.Request) {
w.Header().Add("Content-Type", "video/iso.segment") w.Header().Add("Content-Type", "video/iso.segment")
if r.Method == "OPTIONS" { if r.Method == "OPTIONS" {
w.Header().Set("Access-Control-Allow-Methods", "GET, OPTIONS") w.Header().Set("Access-Control-Allow-Methods", "GET")
return return
} }
sid := r.URL.Query().Get("id") query := r.URL.Query()
sid := query.Get("id")
sessionsMu.RLock() sessionsMu.RLock()
session := sessions[sid] session := sessions[sid]
sessionsMu.RUnlock() sessionsMu.RUnlock()
@@ -258,20 +249,18 @@ func handlerSegmentMP4(w http.ResponseWriter, r *http.Request) {
session.alive.Reset(keepalive) session.alive.Reset(keepalive)
var i byte var data []byte
for len(session.segment) == 0 {
if i++; i > 10 { if query.Get("n") != "0" {
http.NotFound(w, r) data = session.Segment()
return } else {
} data = session.segment0
time.Sleep(time.Millisecond * 100)
} }
session.mu.Lock() if data == nil {
data := session.segment http.NotFound(w, r)
session.segment = nil return
session.seq++ }
session.mu.Unlock()
if _, err := w.Write(data); err != nil { if _, err := w.Write(data); err != nil {
log.Error().Err(err).Caller().Send() log.Error().Err(err).Caller().Send()
+41
View File
@@ -0,0 +1,41 @@
package hls
import (
"fmt"
"sync"
"time"
)
type Session struct {
cons Consumer
template string
init []byte
segment0 []byte
buffer []byte
seq int
alive *time.Timer
mu sync.Mutex
}
func (s *Session) Playlist() string {
return fmt.Sprintf(s.template, s.seq, s.seq, s.seq+1)
}
func (s *Session) Segment() (segment []byte) {
for i := 0; i < 20 && segment == nil; i++ {
if i > 0 {
time.Sleep(50 * time.Millisecond)
}
s.mu.Lock()
if len(s.buffer) > 0 {
segment = s.buffer
// for TS important to start new segment with init
s.buffer = s.init
s.seq++
}
s.mu.Unlock()
}
return
}
+81
View File
@@ -0,0 +1,81 @@
package hls
import (
"errors"
"github.com/AlexxIT/go2rtc/internal/api"
"github.com/AlexxIT/go2rtc/internal/api/ws"
"github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/mp4"
"github.com/AlexxIT/go2rtc/pkg/tcp"
"github.com/rs/zerolog/log"
"strings"
"time"
)
func handlerWSHLS(tr *ws.Transport, msg *ws.Message) error {
src := tr.Request.URL.Query().Get("src")
stream := streams.Get(src)
if stream == nil {
return errors.New(api.StreamNotFound)
}
codecs := msg.String()
cons := &mp4.Consumer{
RemoteAddr: tcp.RemoteAddr(tr.Request),
UserAgent: tr.Request.UserAgent(),
Medias: mp4.ParseCodecs(codecs, true),
}
if err := stream.AddConsumer(cons); err != nil {
log.Error().Err(err).Caller().Send()
return err
}
session := &Session{cons: cons}
cons.Listen(func(msg any) {
if data, ok := msg.([]byte); ok {
session.mu.Lock()
session.buffer = append(session.buffer, data...)
session.mu.Unlock()
}
})
session.alive = time.AfterFunc(keepalive, func() {
stream.RemoveConsumer(cons)
})
session.init, _ = cons.Init()
cons.Start()
sid := core.RandString(8, 62)
// two segments important for Chromecast
session.template = `#EXTM3U
#EXT-X-VERSION:6
#EXT-X-TARGETDURATION:1
#EXT-X-MEDIA-SEQUENCE:%d
#EXT-X-MAP:URI="init.mp4?id=` + sid + `"
#EXTINF:0.500,
segment.m4s?id=` + sid + `&n=%d
#EXTINF:0.500,
segment.m4s?id=` + sid + `&n=%d`
sessionsMu.Lock()
sessions[sid] = session
sessionsMu.Unlock()
// Apple Safari can play FLAC codec, but fail it it in m3u8 playlist
codecs = strings.Replace(cons.MimeCodecs(), mp4.MimeFlac, mp4.MimeAAC, 1)
// bandwidth important for Safari, codecs useful for smooth playback
data := `#EXTM3U
#EXT-X-STREAM-INF:BANDWIDTH=1000000,CODECS="` + codecs + `"
hls/playlist.m3u8?id=` + sid
tr.Write(&ws.Message{Type: "hls", Value: data})
return nil
}