Update HLS stream processing

This commit is contained in:
Alexey Khit
2023-07-08 09:33:31 +03:00
parent fded87aa33
commit 01ef67153e
3 changed files with 18 additions and 5 deletions
+15 -3
View File
@@ -3,12 +3,13 @@ package hls
import ( import (
"github.com/AlexxIT/go2rtc/internal/api" "github.com/AlexxIT/go2rtc/internal/api"
"github.com/AlexxIT/go2rtc/internal/api/ws" "github.com/AlexxIT/go2rtc/internal/api/ws"
"github.com/AlexxIT/go2rtc/internal/app"
"github.com/AlexxIT/go2rtc/internal/streams" "github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/mp4" "github.com/AlexxIT/go2rtc/pkg/mp4"
"github.com/AlexxIT/go2rtc/pkg/mpegts" "github.com/AlexxIT/go2rtc/pkg/mpegts"
"github.com/AlexxIT/go2rtc/pkg/tcp" "github.com/AlexxIT/go2rtc/pkg/tcp"
"github.com/rs/zerolog/log" "github.com/rs/zerolog"
"net/http" "net/http"
"strings" "strings"
"sync" "sync"
@@ -16,6 +17,8 @@ import (
) )
func Init() { func Init() {
log = app.GetLogger("hls")
api.HandleFunc("api/stream.m3u8", handlerStream) api.HandleFunc("api/stream.m3u8", handlerStream)
api.HandleFunc("api/hls/playlist.m3u8", handlerPlaylist) api.HandleFunc("api/hls/playlist.m3u8", handlerPlaylist)
@@ -37,6 +40,8 @@ type Consumer interface {
Start() Start()
} }
var log zerolog.Logger
const keepalive = 5 * time.Second const keepalive = 5 * time.Second
var sessions = map[string]*Session{} var sessions = map[string]*Session{}
@@ -94,15 +99,19 @@ func handlerStream(w http.ResponseWriter, r *http.Request) {
} }
}) })
sid := core.RandString(8, 62)
session.alive = time.AfterFunc(keepalive, func() { session.alive = time.AfterFunc(keepalive, func() {
sessionsMu.Lock()
delete(sessions, sid)
sessionsMu.Unlock()
stream.RemoveConsumer(cons) stream.RemoveConsumer(cons)
}) })
session.init, _ = cons.Init() session.init, _ = cons.Init()
cons.Start() cons.Start()
sid := core.RandString(8, 62)
// two segments important for Chromecast // two segments important for Chromecast
if medias != nil { if medias != nil {
session.template = `#EXTM3U session.template = `#EXTM3U
@@ -187,6 +196,7 @@ func handlerSegmentTS(w http.ResponseWriter, r *http.Request) {
data := session.Segment() data := session.Segment()
if data == nil { if data == nil {
log.Warn().Msgf("[hls] can't get segment %s", r.URL.RawQuery)
http.NotFound(w, r) http.NotFound(w, r)
return return
} }
@@ -219,6 +229,7 @@ func handlerInit(w http.ResponseWriter, r *http.Request) {
session.segment0 = session.Segment() session.segment0 = session.Segment()
if session.segment0 == nil { if session.segment0 == nil {
log.Warn().Msgf("[hls] can't get init %s", r.URL.RawQuery)
http.NotFound(w, r) http.NotFound(w, r)
return return
} }
@@ -259,6 +270,7 @@ func handlerSegmentMP4(w http.ResponseWriter, r *http.Request) {
} }
if data == nil { if data == nil {
log.Warn().Msgf("[hls] can't get segment %s", r.URL.RawQuery)
http.NotFound(w, r) http.NotFound(w, r)
return return
} }
+1 -1
View File
@@ -22,7 +22,7 @@ func (s *Session) Playlist() string {
} }
func (s *Session) Segment() (segment []byte) { func (s *Session) Segment() (segment []byte) {
for i := 0; i < 20 && segment == nil; i++ { for i := 0; i < 60 && segment == nil; i++ {
if i > 0 { if i > 0 {
time.Sleep(50 * time.Millisecond) time.Sleep(50 * time.Millisecond)
} }
+2 -1
View File
@@ -8,7 +8,6 @@ import (
"github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/mp4" "github.com/AlexxIT/go2rtc/pkg/mp4"
"github.com/AlexxIT/go2rtc/pkg/tcp" "github.com/AlexxIT/go2rtc/pkg/tcp"
"github.com/rs/zerolog/log"
"strings" "strings"
"time" "time"
) )
@@ -22,6 +21,8 @@ func handlerWSHLS(tr *ws.Transport, msg *ws.Message) error {
codecs := msg.String() codecs := msg.String()
log.Trace().Msgf("[hls] new ws consumer codecs=%s", codecs)
cons := &mp4.Consumer{ cons := &mp4.Consumer{
Desc: "HLS/WebSocket", Desc: "HLS/WebSocket",
RemoteAddr: tcp.RemoteAddr(tr.Request), RemoteAddr: tcp.RemoteAddr(tr.Request),