From dd066ba04060dfb4bfdea22679a259e39787cbee Mon Sep 17 00:00:00 2001 From: Alexey Khit Date: Thu, 17 Aug 2023 06:55:59 +0300 Subject: [PATCH] Add HLS client --- internal/http/http.go | 23 ++++--- pkg/hls/producer.go | 17 +++++ pkg/hls/reader.go | 146 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 176 insertions(+), 10 deletions(-) create mode 100644 pkg/hls/producer.go create mode 100644 pkg/hls/reader.go diff --git a/internal/http/http.go b/internal/http/http.go index f7461902..9d63990a 100644 --- a/internal/http/http.go +++ b/internal/http/http.go @@ -9,6 +9,7 @@ import ( "github.com/AlexxIT/go2rtc/internal/streams" "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/hls" "github.com/AlexxIT/go2rtc/pkg/magic" "github.com/AlexxIT/go2rtc/pkg/mjpeg" "github.com/AlexxIT/go2rtc/pkg/multipart" @@ -39,27 +40,29 @@ func handleHTTP(url string) (core.Producer, error) { return nil, errors.New(res.Status) } + // 1. Guess format from content type ct := res.Header.Get("Content-Type") if i := strings.IndexByte(ct, ';'); i > 0 { ct = ct[:i] } - switch ct { - case "image/jpeg": + var ext string + if i := strings.LastIndexByte(req.URL.Path, '.'); i > 0 { + ext = req.URL.Path[i+1:] + } + + switch { + case ct == "image/jpeg": return mjpeg.NewClient(res), nil - case "multipart/x-mixed-replace": + case ct == "multipart/x-mixed-replace": return multipart.NewClient(res) - default: // "video/mpeg": + case ct == "application/vnd.apple.mpegurl" || ext == "m3u8": + return hls.OpenURL(req.URL, res.Body) } - client, err := magic.Open(res.Body) - if err != nil { - return nil, err - } - - return client, nil + return magic.Open(res.Body) } func handleTCP(rawURL string) (core.Producer, error) { diff --git a/pkg/hls/producer.go b/pkg/hls/producer.go new file mode 100644 index 00000000..410e771a --- /dev/null +++ b/pkg/hls/producer.go @@ -0,0 +1,17 @@ +package hls + +import ( + "io" + "net/url" + + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/mpegts" +) + +func OpenURL(u *url.URL, body io.ReadCloser) (core.Producer, error) { + rd, err := NewReader(u, body) + if err != nil { + return nil, err + } + return mpegts.Open(rd) +} diff --git a/pkg/hls/reader.go b/pkg/hls/reader.go new file mode 100644 index 00000000..a0e79b9e --- /dev/null +++ b/pkg/hls/reader.go @@ -0,0 +1,146 @@ +package hls + +import ( + "bytes" + "errors" + "io" + "net/http" + "net/url" + "regexp" + "time" + + "github.com/AlexxIT/go2rtc/pkg/core" +) + +type reader struct { + client *http.Client + request *http.Request + + lastSegment []byte + lastTime time.Time + + buf []byte +} + +func NewReader(u *url.URL, body io.ReadCloser) (io.Reader, error) { + b, err := io.ReadAll(body) + if err != nil { + return nil, err + } + + re := regexp.MustCompile(`#EXT-X-STREAM-INF.+?\n(\S+)`) + m := re.FindSubmatch(b) + if m == nil { + return nil, errors.New("hls: wrong playlist: " + string(b)) + } + + ref, err := url.Parse(string(m[1])) + if err != nil { + return nil, err + } + + req, err := http.NewRequest("GET", u.ResolveReference(ref).String(), nil) + if err != nil { + return nil, err + } + + rd := &reader{ + client: &http.Client{Timeout: core.ConnDialTimeout}, + request: req, + } + return rd, nil +} + +func (r *reader) Read(dst []byte) (n int, err error) { + // 1. Check temporary tempbuffer + if len(r.buf) == 0 { + src, err2 := r.getSegment() + if err2 != nil { + return 0, err2 + } + + // 2. Check if the message fits in the buffer + if len(src) <= len(dst) { + return copy(dst, src), nil + } + + // 3. Put the message into a temporary buffer + r.buf = src + } + + // 4. Send temporary buffer + n = copy(dst, r.buf) + r.buf = r.buf[n:] + return +} + +func (r *reader) getSegment() ([]byte, error) { + for { + if wait := time.Second - time.Since(r.lastTime); wait > 0 { + time.Sleep(wait) + } + + // 1. Load playlist + res, err := r.client.Do(r.request) + if err != nil { + return nil, err + } + + playlist, err := io.ReadAll(res.Body) + if err != nil { + return nil, err + } + + r.lastTime = time.Now() + + //log.Printf("[hls] load playlist\n%s", playlist) + + // 2. Remove all previous segments from playlist + if i := bytes.Index(playlist, r.lastSegment); i > 0 { + playlist = playlist[i:] + } + + for playlist != nil { + // 3. Get link to new segment + var segment []byte + if segment, playlist = getSegment(playlist); segment == nil { + break + } + + //log.Printf("[hls] load segment: %s", segment) + + ref, err2 := url.Parse(string(segment)) + if err2 != nil { + return nil, err2 + } + + ref = r.request.URL.ResolveReference(ref) + if res, err2 = r.client.Get(ref.String()); err2 != nil { + return nil, err2 + } + + r.lastSegment = segment + + return io.ReadAll(res.Body) + } + } +} + +func getSegment(src []byte) (segment, left []byte) { + for ok := false; !ok; { + ok = bytes.HasPrefix(src, []byte("#EXTINF")) + + i := bytes.IndexByte(src, '\n') + 1 + if i == 0 { + return nil, nil + } + + src = src[i:] + } + + if i := bytes.IndexByte(src, '\n'); i > 0 { + return src[:i], src[i+1:] + } + + return src, nil +}