Rewrite multipart source
This commit is contained in:
@@ -56,7 +56,7 @@ func handleHTTP(url string) (core.Producer, error) {
|
||||
return mjpeg.NewClient(res), nil
|
||||
|
||||
case ct == "multipart/x-mixed-replace":
|
||||
return multipart.NewClient(res)
|
||||
return multipart.Open(res.Body)
|
||||
|
||||
case ct == "application/vnd.apple.mpegurl" || ext == "m3u8":
|
||||
return hls.OpenURL(req.URL, res.Body)
|
||||
|
||||
@@ -1,259 +0,0 @@
|
||||
package multipart
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"errors"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/textproto"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/AlexxIT/go2rtc/pkg/core"
|
||||
"github.com/AlexxIT/go2rtc/pkg/h264/annexb"
|
||||
"github.com/pion/rtp"
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
core.Listener
|
||||
|
||||
UserAgent string
|
||||
RemoteAddr string
|
||||
|
||||
res *http.Response
|
||||
|
||||
boundary string
|
||||
reader *bufio.Reader
|
||||
|
||||
medias []*core.Media
|
||||
receivers []*core.Receiver
|
||||
|
||||
recv int
|
||||
}
|
||||
|
||||
func NewClient(res *http.Response) (*Client, error) {
|
||||
ct := res.Header.Get("Content-Type")
|
||||
|
||||
// added in go1.18
|
||||
_, boundary, ok := strings.Cut(ct, "boundary=")
|
||||
if !ok {
|
||||
return nil, errors.New("multipart: wrong Content-Type: " + ct)
|
||||
}
|
||||
|
||||
// some cameras add prefix to boundary header:
|
||||
// https://github.com/TheTimeWalker/wallpanel-android
|
||||
if !strings.HasPrefix(boundary, "--") {
|
||||
boundary = "--" + boundary
|
||||
}
|
||||
|
||||
c := &Client{
|
||||
boundary: boundary,
|
||||
reader: bufio.NewReader(res.Body),
|
||||
res: res,
|
||||
}
|
||||
|
||||
if err := c.probe(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (c *Client) Handle() error {
|
||||
if len(c.receivers) == 0 {
|
||||
return errors.New("multipart: no receivers")
|
||||
}
|
||||
|
||||
var mjpeg, video, audio *core.Receiver
|
||||
|
||||
for _, receiver := range c.receivers {
|
||||
switch receiver.Codec.Name {
|
||||
case core.CodecH264:
|
||||
video = receiver
|
||||
case core.CodecPCMU:
|
||||
audio = receiver
|
||||
default:
|
||||
mjpeg = receiver
|
||||
}
|
||||
}
|
||||
|
||||
for {
|
||||
header, body, err := c.Next()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.recv += len(body)
|
||||
|
||||
if mjpeg != nil {
|
||||
packet := &rtp.Packet{
|
||||
Header: rtp.Header{Timestamp: core.Now90000()},
|
||||
Payload: body,
|
||||
}
|
||||
mjpeg.WriteRTP(packet)
|
||||
continue
|
||||
}
|
||||
|
||||
ct := header.Get("Content-Type")
|
||||
switch ct {
|
||||
case MimeVideo:
|
||||
if video != nil {
|
||||
ts := GetTimestamp(header)
|
||||
pkt := &rtp.Packet{
|
||||
Header: rtp.Header{
|
||||
Timestamp: uint32(ts * 90000),
|
||||
},
|
||||
Payload: annexb.EncodeToAVCC(body, false),
|
||||
}
|
||||
video.WriteRTP(pkt)
|
||||
}
|
||||
|
||||
case MimeG711U:
|
||||
if audio != nil {
|
||||
ts := GetTimestamp(header)
|
||||
pkt := &rtp.Packet{
|
||||
Header: rtp.Header{
|
||||
Version: 2,
|
||||
Marker: true,
|
||||
Timestamp: uint32(ts * 8000),
|
||||
},
|
||||
Payload: body,
|
||||
}
|
||||
audio.WriteRTP(pkt)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) Next() (http.Header, []byte, error) {
|
||||
for {
|
||||
// search next boundary and skip empty lines
|
||||
s, err := c.reader.ReadString('\n')
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
if strings.HasPrefix(s, c.boundary) {
|
||||
break
|
||||
}
|
||||
|
||||
if s == "\r\n" {
|
||||
continue
|
||||
}
|
||||
|
||||
return nil, nil, errors.New("multipart: wrong boundary: " + s)
|
||||
}
|
||||
|
||||
tp := textproto.NewReader(c.reader)
|
||||
header, err := tp.ReadMIMEHeader()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
s := header.Get("Content-Length")
|
||||
if s == "" {
|
||||
return nil, nil, errors.New("multipart: no content length")
|
||||
}
|
||||
|
||||
size, err := strconv.Atoi(s)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
buf := make([]byte, size)
|
||||
if _, err = io.ReadFull(c.reader, buf); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
_, _ = c.reader.Discard(2) // skip "\r\n"
|
||||
|
||||
return http.Header(header), buf, nil
|
||||
}
|
||||
|
||||
const (
|
||||
MimeVideo = "video/x-h264"
|
||||
MimeG711U = "audio/g711u"
|
||||
)
|
||||
|
||||
func (c *Client) probe() error {
|
||||
waitVideo := true
|
||||
waitAudio := true
|
||||
|
||||
for waitVideo || waitAudio {
|
||||
header, _, err := c.Next()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var media *core.Media
|
||||
|
||||
ct := header.Get("Content-Type")
|
||||
switch ct {
|
||||
case MimeVideo:
|
||||
if !waitVideo {
|
||||
return nil
|
||||
}
|
||||
|
||||
media = &core.Media{
|
||||
Kind: core.KindVideo,
|
||||
Direction: core.DirectionRecvonly,
|
||||
Codecs: []*core.Codec{
|
||||
{
|
||||
Name: core.CodecH264,
|
||||
ClockRate: 90000,
|
||||
PayloadType: core.PayloadTypeRAW,
|
||||
},
|
||||
},
|
||||
}
|
||||
waitVideo = false
|
||||
|
||||
case MimeG711U:
|
||||
if !waitAudio {
|
||||
return nil
|
||||
}
|
||||
|
||||
media = &core.Media{
|
||||
Kind: core.KindAudio,
|
||||
Direction: core.DirectionRecvonly,
|
||||
Codecs: []*core.Codec{
|
||||
{
|
||||
Name: core.CodecPCMU,
|
||||
ClockRate: 8000,
|
||||
},
|
||||
},
|
||||
}
|
||||
waitAudio = false
|
||||
|
||||
default:
|
||||
media = &core.Media{
|
||||
Kind: core.KindVideo,
|
||||
Direction: core.DirectionRecvonly,
|
||||
Codecs: []*core.Codec{
|
||||
{
|
||||
Name: core.CodecJPEG,
|
||||
ClockRate: 90000,
|
||||
PayloadType: core.PayloadTypeRAW,
|
||||
},
|
||||
},
|
||||
}
|
||||
waitVideo = false
|
||||
waitAudio = false
|
||||
}
|
||||
|
||||
c.medias = append(c.medias, media)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetTimestamp - return timestamp in seconds
|
||||
func GetTimestamp(header http.Header) float64 {
|
||||
if s := header.Get("X-Timestamp"); s != "" {
|
||||
if f, _ := strconv.ParseFloat(s, 32); f != 0 {
|
||||
return f
|
||||
}
|
||||
}
|
||||
|
||||
return float64(time.Duration(time.Now().UnixNano()) / time.Second)
|
||||
}
|
||||
+229
-29
@@ -1,48 +1,248 @@
|
||||
package multipart
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"bufio"
|
||||
"errors"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/textproto"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/AlexxIT/go2rtc/pkg/core"
|
||||
"github.com/AlexxIT/go2rtc/pkg/h264/annexb"
|
||||
"github.com/pion/rtp"
|
||||
)
|
||||
|
||||
func (c *Client) GetMedias() []*core.Media {
|
||||
return c.medias
|
||||
type Producer struct {
|
||||
core.SuperProducer
|
||||
rd *core.ReadBuffer
|
||||
|
||||
boundary string
|
||||
reader *bufio.Reader
|
||||
}
|
||||
|
||||
func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
|
||||
for _, track := range c.receivers {
|
||||
if track.Codec == codec {
|
||||
return track, nil
|
||||
func Open(rd io.Reader) (*Producer, error) {
|
||||
prod := &Producer{rd: core.NewReadBuffer(rd)}
|
||||
if err := prod.probe(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
prod.Type = "Multipart producer"
|
||||
return prod, nil
|
||||
}
|
||||
|
||||
func (c *Producer) Start() error {
|
||||
if len(c.Receivers) == 0 {
|
||||
return errors.New("multipart: no receivers")
|
||||
}
|
||||
|
||||
var mjpeg, video, audio *core.Receiver
|
||||
|
||||
for _, receiver := range c.Receivers {
|
||||
switch receiver.Codec.Name {
|
||||
case core.CodecH264:
|
||||
video = receiver
|
||||
case core.CodecPCMU:
|
||||
audio = receiver
|
||||
default:
|
||||
mjpeg = receiver
|
||||
}
|
||||
}
|
||||
track := core.NewReceiver(media, codec)
|
||||
c.receivers = append(c.receivers, track)
|
||||
return track, nil
|
||||
}
|
||||
|
||||
func (c *Client) Start() error {
|
||||
return c.Handle()
|
||||
}
|
||||
for {
|
||||
header, body, err := c.next()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *Client) Stop() error {
|
||||
for _, receiver := range c.receivers {
|
||||
receiver.Close()
|
||||
c.Recv += len(body)
|
||||
|
||||
if mjpeg != nil {
|
||||
packet := &rtp.Packet{
|
||||
Header: rtp.Header{Timestamp: core.Now90000()},
|
||||
Payload: body,
|
||||
}
|
||||
mjpeg.WriteRTP(packet)
|
||||
continue
|
||||
}
|
||||
|
||||
ct := header.Get("Content-Type")
|
||||
switch ct {
|
||||
case MimeVideo:
|
||||
if video != nil {
|
||||
ts := GetTimestamp(header)
|
||||
pkt := &rtp.Packet{
|
||||
Header: rtp.Header{
|
||||
Timestamp: uint32(ts * 90000),
|
||||
},
|
||||
Payload: annexb.EncodeToAVCC(body, false),
|
||||
}
|
||||
video.WriteRTP(pkt)
|
||||
}
|
||||
|
||||
case MimeG711U:
|
||||
if audio != nil {
|
||||
ts := GetTimestamp(header)
|
||||
pkt := &rtp.Packet{
|
||||
Header: rtp.Header{
|
||||
Version: 2,
|
||||
Marker: true,
|
||||
Timestamp: uint32(ts * 8000),
|
||||
},
|
||||
Payload: body,
|
||||
}
|
||||
audio.WriteRTP(pkt)
|
||||
}
|
||||
}
|
||||
}
|
||||
// important for close reader/writer gorutines
|
||||
_ = c.res.Body.Close()
|
||||
}
|
||||
|
||||
func (c *Producer) Stop() error {
|
||||
_ = c.SuperProducer.Close()
|
||||
return c.rd.Close()
|
||||
}
|
||||
|
||||
func (c *Producer) next() (http.Header, []byte, error) {
|
||||
for {
|
||||
// search next boundary and skip empty lines
|
||||
s, err := c.reader.ReadString('\n')
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// auto guess boundary
|
||||
if c.boundary == "" && strings.HasPrefix(s, "--") {
|
||||
c.boundary = s
|
||||
break
|
||||
} else if strings.HasPrefix(s, c.boundary) {
|
||||
break
|
||||
}
|
||||
|
||||
if s == "\r\n" {
|
||||
continue
|
||||
}
|
||||
|
||||
return nil, nil, errors.New("multipart: wrong boundary: " + s)
|
||||
}
|
||||
|
||||
tp := textproto.NewReader(c.reader)
|
||||
header, err := tp.ReadMIMEHeader()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
s := header.Get("Content-Length")
|
||||
if s == "" {
|
||||
return nil, nil, errors.New("multipart: no content length")
|
||||
}
|
||||
|
||||
size, err := strconv.Atoi(s)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
buf := make([]byte, size)
|
||||
if _, err = io.ReadFull(c.reader, buf); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
_, _ = c.reader.Discard(2) // skip "\r\n"
|
||||
|
||||
return http.Header(header), buf, nil
|
||||
}
|
||||
|
||||
const (
|
||||
MimeVideo = "video/x-h264"
|
||||
MimeG711U = "audio/g711u"
|
||||
)
|
||||
|
||||
func (c *Producer) probe() error {
|
||||
c.rd.BufferSize = core.ProbeSize
|
||||
c.reader = bufio.NewReader(c.rd)
|
||||
|
||||
defer func() {
|
||||
c.rd.Reset()
|
||||
c.reader = bufio.NewReader(c.rd)
|
||||
}()
|
||||
|
||||
waitVideo := true
|
||||
waitAudio := true
|
||||
|
||||
for waitVideo || waitAudio {
|
||||
header, _, err := c.next()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var media *core.Media
|
||||
|
||||
ct := header.Get("Content-Type")
|
||||
switch ct {
|
||||
case MimeVideo:
|
||||
if !waitVideo {
|
||||
return nil
|
||||
}
|
||||
|
||||
media = &core.Media{
|
||||
Kind: core.KindVideo,
|
||||
Direction: core.DirectionRecvonly,
|
||||
Codecs: []*core.Codec{
|
||||
{
|
||||
Name: core.CodecH264,
|
||||
ClockRate: 90000,
|
||||
PayloadType: core.PayloadTypeRAW,
|
||||
},
|
||||
},
|
||||
}
|
||||
waitVideo = false
|
||||
|
||||
case MimeG711U:
|
||||
if !waitAudio {
|
||||
return nil
|
||||
}
|
||||
|
||||
media = &core.Media{
|
||||
Kind: core.KindAudio,
|
||||
Direction: core.DirectionRecvonly,
|
||||
Codecs: []*core.Codec{
|
||||
{
|
||||
Name: core.CodecPCMU,
|
||||
ClockRate: 8000,
|
||||
},
|
||||
},
|
||||
}
|
||||
waitAudio = false
|
||||
|
||||
default:
|
||||
media = &core.Media{
|
||||
Kind: core.KindVideo,
|
||||
Direction: core.DirectionRecvonly,
|
||||
Codecs: []*core.Codec{
|
||||
{
|
||||
Name: core.CodecJPEG,
|
||||
ClockRate: 90000,
|
||||
PayloadType: core.PayloadTypeRAW,
|
||||
},
|
||||
},
|
||||
}
|
||||
waitVideo = false
|
||||
waitAudio = false
|
||||
}
|
||||
|
||||
c.Medias = append(c.Medias, media)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) MarshalJSON() ([]byte, error) {
|
||||
info := &core.Info{
|
||||
Type: "HTTP/mixed active producer",
|
||||
URL: c.res.Request.URL.String(),
|
||||
RemoteAddr: c.RemoteAddr,
|
||||
UserAgent: c.UserAgent,
|
||||
Medias: c.medias,
|
||||
Receivers: c.receivers,
|
||||
Recv: c.recv,
|
||||
// GetTimestamp - return timestamp in seconds
|
||||
func GetTimestamp(header http.Header) float64 {
|
||||
if s := header.Get("X-Timestamp"); s != "" {
|
||||
if f, _ := strconv.ParseFloat(s, 32); f != 0 {
|
||||
return f
|
||||
}
|
||||
}
|
||||
return json.Marshal(info)
|
||||
|
||||
return float64(time.Duration(time.Now().UnixNano()) / time.Second)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user