diff --git a/cmd/hls/hls.go b/cmd/hls/hls.go new file mode 100644 index 00000000..32618c64 --- /dev/null +++ b/cmd/hls/hls.go @@ -0,0 +1,261 @@ +package hls + +import ( + "fmt" + "github.com/AlexxIT/go2rtc/cmd/api" + "github.com/AlexxIT/go2rtc/cmd/streams" + "github.com/AlexxIT/go2rtc/pkg/mp4" + "github.com/AlexxIT/go2rtc/pkg/streamer" + "github.com/AlexxIT/go2rtc/pkg/ts" + "github.com/rs/zerolog/log" + "net/http" + "strconv" + "sync" + "time" +) + +func Init() { + api.HandleFunc("api/stream.m3u8", handlerStream) + api.HandleFunc("api/hls/playlist.m3u8", handlerPlaylist) + + // HLS (TS) + api.HandleFunc("api/hls/segment.ts", handlerSegmentTS) + + // HLS (fMP4) + api.HandleFunc("api/hls/init.mp4", handlerInit) + api.HandleFunc("api/hls/segment.m4s", handlerSegmentMP4) +} + +type Consumer interface { + streamer.Consumer + Init() ([]byte, error) + MimeCodecs() string + Start() +} + +type Session struct { + cons Consumer + playlist string + init []byte + segment []byte + seq int + alive *time.Timer + mu sync.Mutex +} + +const keepalive = 5 * time.Second + +var sessions = map[string]*Session{} + +func handlerStream(w http.ResponseWriter, r *http.Request) { + // CORS important for Chromecast + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Content-Type", "application/vnd.apple.mpegurl") + + if r.Method == "OPTIONS" { + w.Header().Set("Access-Control-Allow-Methods", "GET") + return + } + + src := r.URL.Query().Get("src") + stream := streams.GetOrNew(src) + if stream == nil { + http.Error(w, api.StreamNotFound, http.StatusNotFound) + return + } + + var cons Consumer + + // use fMP4 with codecs filter and TS without + medias := mp4.ParseQuery(r.URL.Query()) + if medias != nil { + cons = &mp4.Consumer{ + RemoteAddr: r.RemoteAddr, + UserAgent: r.UserAgent(), + Medias: medias, + } + } else { + cons = &ts.Consumer{ + RemoteAddr: r.RemoteAddr, + UserAgent: r.UserAgent(), + } + } + + session := &Session{cons: cons} + + cons.Listen(func(msg interface{}) { + if data, ok := msg.([]byte); ok { + session.mu.Lock() + session.segment = append(session.segment, data...) + session.mu.Unlock() + } + }) + + if err := stream.AddConsumer(cons); err != nil { + log.Error().Err(err).Caller().Send() + return + } + + session.alive = time.AfterFunc(keepalive, func() { + stream.RemoveConsumer(cons) + }) + session.init, _ = cons.Init() + + cons.Start() + + sid := strconv.FormatInt(time.Now().UnixNano(), 10) + + // two segments important for Chromecast + if medias != nil { + session.playlist = `#EXTM3U +#EXT-X-VERSION:6 +#EXT-X-TARGETDURATION:1 +#EXT-X-MEDIA-SEQUENCE:%d +#EXT-X-MAP:URI="init.mp4?id=` + sid + `" +#EXTINF:0.500, +segment.m4s?id=` + sid + `&n=%d +#EXTINF:0.500, +segment.m4s?id=` + sid + `&n=%d` + } else { + session.playlist = `#EXTM3U +#EXT-X-VERSION:3 +#EXT-X-TARGETDURATION:1 +#EXT-X-MEDIA-SEQUENCE:%d +#EXTINF:0.500, +segment.ts?id=` + sid + `&n=%d +#EXTINF:0.500, +segment.ts?id=` + sid + `&n=%d` + } + + sessions[sid] = session + + // bandwidth important for Safari, codecs useful for smooth playback + data := []byte(`#EXTM3U +#EXT-X-STREAM-INF:BANDWIDTH=1000000,CODECS="` + cons.MimeCodecs() + `" +hls/playlist.m3u8?id=` + sid) + + if _, err := w.Write(data); err != nil { + log.Error().Err(err).Caller().Send() + } +} + +func handlerPlaylist(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Content-Type", "application/vnd.apple.mpegurl") + + if r.Method == "OPTIONS" { + w.Header().Set("Access-Control-Allow-Methods", "GET") + return + } + + sid := r.URL.Query().Get("id") + session := sessions[sid] + if session == nil { + http.NotFound(w, r) + return + } + + s := fmt.Sprintf(session.playlist, session.seq, session.seq, session.seq+1) + + if _, err := w.Write([]byte(s)); err != nil { + log.Error().Err(err).Caller().Send() + } +} + +func handlerSegmentTS(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Content-Type", "video/mp2t") + + if r.Method == "OPTIONS" { + w.Header().Set("Access-Control-Allow-Methods", "GET") + return + } + + sid := r.URL.Query().Get("id") + session := sessions[sid] + if session == nil { + http.NotFound(w, r) + return + } + + session.alive.Reset(keepalive) + + var i byte + for len(session.segment) == 0 { + if i++; i > 10 { + http.NotFound(w, r) + return + } + time.Sleep(time.Millisecond * 100) + } + + session.mu.Lock() + data := session.segment + // important to start new segment with init + session.segment = session.init + session.seq++ + session.mu.Unlock() + + if _, err := w.Write(data); err != nil { + log.Error().Err(err).Caller().Send() + } +} + +func handlerInit(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Add("Content-Type", "video/mp4") + + if r.Method == "OPTIONS" { + w.Header().Set("Access-Control-Allow-Methods", "GET") + return + } + + sid := r.URL.Query().Get("id") + session := sessions[sid] + if session == nil { + http.NotFound(w, r) + return + } + + if _, err := w.Write(session.init); err != nil { + log.Error().Err(err).Caller().Send() + } +} + +func handlerSegmentMP4(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Add("Content-Type", "video/iso.segment") + + if r.Method == "OPTIONS" { + w.Header().Set("Access-Control-Allow-Methods", "GET, OPTIONS") + return + } + + sid := r.URL.Query().Get("id") + session := sessions[sid] + if session == nil { + http.NotFound(w, r) + return + } + + session.alive.Reset(keepalive) + + var i byte + for len(session.segment) == 0 { + if i++; i > 10 { + http.NotFound(w, r) + return + } + time.Sleep(time.Millisecond * 100) + } + + session.mu.Lock() + data := session.segment + session.segment = nil + session.seq++ + session.mu.Unlock() + + if _, err := w.Write(data); err != nil { + log.Error().Err(err).Caller().Send() + } +} diff --git a/main.go b/main.go index 63826914..63b7072e 100644 --- a/main.go +++ b/main.go @@ -8,6 +8,7 @@ import ( "github.com/AlexxIT/go2rtc/cmd/exec" "github.com/AlexxIT/go2rtc/cmd/ffmpeg" "github.com/AlexxIT/go2rtc/cmd/hass" + "github.com/AlexxIT/go2rtc/cmd/hls" "github.com/AlexxIT/go2rtc/cmd/homekit" "github.com/AlexxIT/go2rtc/cmd/http" "github.com/AlexxIT/go2rtc/cmd/ivideon" @@ -42,6 +43,7 @@ func main() { webrtc.Init() mp4.Init() + hls.Init() mjpeg.Init() http.Init() diff --git a/pkg/ts/ts.go b/pkg/ts/ts.go new file mode 100644 index 00000000..ad80e0ca --- /dev/null +++ b/pkg/ts/ts.go @@ -0,0 +1,200 @@ +package ts + +import ( + "bytes" + "encoding/hex" + "github.com/AlexxIT/go2rtc/pkg/aac" + "github.com/AlexxIT/go2rtc/pkg/h264" + "github.com/AlexxIT/go2rtc/pkg/streamer" + "github.com/deepch/vdk/av" + "github.com/deepch/vdk/codec/aacparser" + "github.com/deepch/vdk/codec/h264parser" + "github.com/deepch/vdk/format/ts" + "github.com/pion/rtp" + "sync/atomic" + "time" +) + +type Consumer struct { + streamer.Element + + UserAgent string + RemoteAddr string + + buf *bytes.Buffer + muxer *ts.Muxer + mimeType string + streams []av.CodecData + start bool + init []byte + + send uint32 +} + +func (c *Consumer) GetMedias() []*streamer.Media { + return []*streamer.Media{ + { + Kind: streamer.KindVideo, + Direction: streamer.DirectionRecvonly, + Codecs: []*streamer.Codec{ + {Name: streamer.CodecH264}, + }, + }, + //{ + // Kind: streamer.KindAudio, + // Direction: streamer.DirectionRecvonly, + // Codecs: []*streamer.Codec{ + // {Name: streamer.CodecAAC}, + // }, + //}, + } +} + +func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *streamer.Track { + codec := track.Codec + trackID := int8(len(c.streams)) + + switch codec.Name { + case streamer.CodecH264: + sps, pps := h264.GetParameterSet(codec.FmtpLine) + stream, err := h264parser.NewCodecDataFromSPSAndPPS(sps, pps) + if err != nil { + return nil + } + + if len(c.mimeType) > 0 { + c.mimeType += "," + } + + // TODO: fixme + // some devices won't play high level + if stream.RecordInfo.AVCLevelIndication <= 0x29 { + c.mimeType += "avc1." + h264.GetProfileLevelID(codec.FmtpLine) + } else { + c.mimeType += "avc1.640029" + } + + c.streams = append(c.streams, stream) + + pkt := av.Packet{Idx: trackID, CompositionTime: time.Millisecond} + + ts2time := time.Second / time.Duration(codec.ClockRate) + + push := func(packet *rtp.Packet) error { + if packet.Version != h264.RTPPacketVersionAVC { + return nil + } + + if !c.start { + return nil + } + + pkt.Data = packet.Payload + newTime := time.Duration(packet.Timestamp) * ts2time + if pkt.Time > 0 { + pkt.Duration = newTime - pkt.Time + } + pkt.Time = newTime + + if err = c.muxer.WritePacket(pkt); err != nil { + return err + } + + // clone bytes from buffer, so next packet won't overwrite it + buf := append([]byte{}, c.buf.Bytes()...) + atomic.AddUint32(&c.send, uint32(len(buf))) + c.Fire(buf) + + c.buf.Reset() + + return nil + } + + if codec.IsRTP() { + wrapper := h264.RTPDepay(track) + push = wrapper(push) + } + + return track.Bind(push) + + case streamer.CodecAAC: + s := streamer.Between(codec.FmtpLine, "config=", ";") + + b, err := hex.DecodeString(s) + if err != nil { + return nil + } + + stream, err := aacparser.NewCodecDataFromMPEG4AudioConfigBytes(b) + if err != nil { + return nil + } + + if len(c.mimeType) > 0 { + c.mimeType += "," + } + + c.mimeType += "mp4a.40.2" + c.streams = append(c.streams, stream) + + pkt := av.Packet{Idx: trackID, CompositionTime: time.Millisecond} + + ts2time := time.Second / time.Duration(codec.ClockRate) + + push := func(packet *rtp.Packet) error { + if !c.start { + return nil + } + + pkt.Data = packet.Payload + newTime := time.Duration(packet.Timestamp) * ts2time + if pkt.Time > 0 { + pkt.Duration = newTime - pkt.Time + } + pkt.Time = newTime + + if err := c.muxer.WritePacket(pkt); err != nil { + return err + } + + // clone bytes from buffer, so next packet won't overwrite it + buf := append([]byte{}, c.buf.Bytes()...) + atomic.AddUint32(&c.send, uint32(len(buf))) + c.Fire(buf) + + c.buf.Reset() + + return nil + } + + if codec.IsRTP() { + wrapper := aac.RTPDepay(track) + push = wrapper(push) + } + + return track.Bind(push) + } + + panic("unsupported codec") +} + +func (c *Consumer) MimeCodecs() string { + return c.mimeType +} + +func (c *Consumer) Init() ([]byte, error) { + c.buf = bytes.NewBuffer(nil) + c.muxer = ts.NewMuxer(c.buf) + + // first packet will be with header, it's ok + if err := c.muxer.WriteHeader(c.streams); err != nil { + return nil, err + } + data := append([]byte{}, c.buf.Bytes()...) + + return data, nil +} + +func (c *Consumer) Start() { + c.start = true +}