diff --git a/internal/http/http.go b/internal/http/http.go index 9d63990a..c486d226 100644 --- a/internal/http/http.go +++ b/internal/http/http.go @@ -56,7 +56,7 @@ func handleHTTP(url string) (core.Producer, error) { return mjpeg.NewClient(res), nil case ct == "multipart/x-mixed-replace": - return multipart.NewClient(res) + return multipart.Open(res.Body) case ct == "application/vnd.apple.mpegurl" || ext == "m3u8": return hls.OpenURL(req.URL, res.Body) diff --git a/pkg/multipart/client.go b/pkg/multipart/client.go deleted file mode 100644 index 6367f0df..00000000 --- a/pkg/multipart/client.go +++ /dev/null @@ -1,259 +0,0 @@ -package multipart - -import ( - "bufio" - "errors" - "io" - "net/http" - "net/textproto" - "strconv" - "strings" - "time" - - "github.com/AlexxIT/go2rtc/pkg/core" - "github.com/AlexxIT/go2rtc/pkg/h264/annexb" - "github.com/pion/rtp" -) - -type Client struct { - core.Listener - - UserAgent string - RemoteAddr string - - res *http.Response - - boundary string - reader *bufio.Reader - - medias []*core.Media - receivers []*core.Receiver - - recv int -} - -func NewClient(res *http.Response) (*Client, error) { - ct := res.Header.Get("Content-Type") - - // added in go1.18 - _, boundary, ok := strings.Cut(ct, "boundary=") - if !ok { - return nil, errors.New("multipart: wrong Content-Type: " + ct) - } - - // some cameras add prefix to boundary header: - // https://github.com/TheTimeWalker/wallpanel-android - if !strings.HasPrefix(boundary, "--") { - boundary = "--" + boundary - } - - c := &Client{ - boundary: boundary, - reader: bufio.NewReader(res.Body), - res: res, - } - - if err := c.probe(); err != nil { - return nil, err - } - - return c, nil -} - -func (c *Client) Handle() error { - if len(c.receivers) == 0 { - return errors.New("multipart: no receivers") - } - - var mjpeg, video, audio *core.Receiver - - for _, receiver := range c.receivers { - switch receiver.Codec.Name { - case core.CodecH264: - video = receiver - case core.CodecPCMU: - audio = receiver - default: - mjpeg = receiver - } - } - - for { - header, body, err := c.Next() - if err != nil { - return err - } - - c.recv += len(body) - - if mjpeg != nil { - packet := &rtp.Packet{ - Header: rtp.Header{Timestamp: core.Now90000()}, - Payload: body, - } - mjpeg.WriteRTP(packet) - continue - } - - ct := header.Get("Content-Type") - switch ct { - case MimeVideo: - if video != nil { - ts := GetTimestamp(header) - pkt := &rtp.Packet{ - Header: rtp.Header{ - Timestamp: uint32(ts * 90000), - }, - Payload: annexb.EncodeToAVCC(body, false), - } - video.WriteRTP(pkt) - } - - case MimeG711U: - if audio != nil { - ts := GetTimestamp(header) - pkt := &rtp.Packet{ - Header: rtp.Header{ - Version: 2, - Marker: true, - Timestamp: uint32(ts * 8000), - }, - Payload: body, - } - audio.WriteRTP(pkt) - } - } - } -} - -func (c *Client) Next() (http.Header, []byte, error) { - for { - // search next boundary and skip empty lines - s, err := c.reader.ReadString('\n') - if err != nil { - return nil, nil, err - } - - if strings.HasPrefix(s, c.boundary) { - break - } - - if s == "\r\n" { - continue - } - - return nil, nil, errors.New("multipart: wrong boundary: " + s) - } - - tp := textproto.NewReader(c.reader) - header, err := tp.ReadMIMEHeader() - if err != nil { - return nil, nil, err - } - - s := header.Get("Content-Length") - if s == "" { - return nil, nil, errors.New("multipart: no content length") - } - - size, err := strconv.Atoi(s) - if err != nil { - return nil, nil, err - } - - buf := make([]byte, size) - if _, err = io.ReadFull(c.reader, buf); err != nil { - return nil, nil, err - } - - _, _ = c.reader.Discard(2) // skip "\r\n" - - return http.Header(header), buf, nil -} - -const ( - MimeVideo = "video/x-h264" - MimeG711U = "audio/g711u" -) - -func (c *Client) probe() error { - waitVideo := true - waitAudio := true - - for waitVideo || waitAudio { - header, _, err := c.Next() - if err != nil { - return err - } - - var media *core.Media - - ct := header.Get("Content-Type") - switch ct { - case MimeVideo: - if !waitVideo { - return nil - } - - media = &core.Media{ - Kind: core.KindVideo, - Direction: core.DirectionRecvonly, - Codecs: []*core.Codec{ - { - Name: core.CodecH264, - ClockRate: 90000, - PayloadType: core.PayloadTypeRAW, - }, - }, - } - waitVideo = false - - case MimeG711U: - if !waitAudio { - return nil - } - - media = &core.Media{ - Kind: core.KindAudio, - Direction: core.DirectionRecvonly, - Codecs: []*core.Codec{ - { - Name: core.CodecPCMU, - ClockRate: 8000, - }, - }, - } - waitAudio = false - - default: - media = &core.Media{ - Kind: core.KindVideo, - Direction: core.DirectionRecvonly, - Codecs: []*core.Codec{ - { - Name: core.CodecJPEG, - ClockRate: 90000, - PayloadType: core.PayloadTypeRAW, - }, - }, - } - waitVideo = false - waitAudio = false - } - - c.medias = append(c.medias, media) - } - - return nil -} - -// GetTimestamp - return timestamp in seconds -func GetTimestamp(header http.Header) float64 { - if s := header.Get("X-Timestamp"); s != "" { - if f, _ := strconv.ParseFloat(s, 32); f != 0 { - return f - } - } - - return float64(time.Duration(time.Now().UnixNano()) / time.Second) -} diff --git a/pkg/multipart/producer.go b/pkg/multipart/producer.go index 3b45dc32..6766a885 100644 --- a/pkg/multipart/producer.go +++ b/pkg/multipart/producer.go @@ -1,48 +1,248 @@ package multipart import ( - "encoding/json" + "bufio" + "errors" + "io" + "net/http" + "net/textproto" + "strconv" + "strings" + "time" "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/h264/annexb" + "github.com/pion/rtp" ) -func (c *Client) GetMedias() []*core.Media { - return c.medias +type Producer struct { + core.SuperProducer + rd *core.ReadBuffer + + boundary string + reader *bufio.Reader } -func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { - for _, track := range c.receivers { - if track.Codec == codec { - return track, nil +func Open(rd io.Reader) (*Producer, error) { + prod := &Producer{rd: core.NewReadBuffer(rd)} + if err := prod.probe(); err != nil { + return nil, err + } + prod.Type = "Multipart producer" + return prod, nil +} + +func (c *Producer) Start() error { + if len(c.Receivers) == 0 { + return errors.New("multipart: no receivers") + } + + var mjpeg, video, audio *core.Receiver + + for _, receiver := range c.Receivers { + switch receiver.Codec.Name { + case core.CodecH264: + video = receiver + case core.CodecPCMU: + audio = receiver + default: + mjpeg = receiver } } - track := core.NewReceiver(media, codec) - c.receivers = append(c.receivers, track) - return track, nil -} -func (c *Client) Start() error { - return c.Handle() -} + for { + header, body, err := c.next() + if err != nil { + return err + } -func (c *Client) Stop() error { - for _, receiver := range c.receivers { - receiver.Close() + c.Recv += len(body) + + if mjpeg != nil { + packet := &rtp.Packet{ + Header: rtp.Header{Timestamp: core.Now90000()}, + Payload: body, + } + mjpeg.WriteRTP(packet) + continue + } + + ct := header.Get("Content-Type") + switch ct { + case MimeVideo: + if video != nil { + ts := GetTimestamp(header) + pkt := &rtp.Packet{ + Header: rtp.Header{ + Timestamp: uint32(ts * 90000), + }, + Payload: annexb.EncodeToAVCC(body, false), + } + video.WriteRTP(pkt) + } + + case MimeG711U: + if audio != nil { + ts := GetTimestamp(header) + pkt := &rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + Timestamp: uint32(ts * 8000), + }, + Payload: body, + } + audio.WriteRTP(pkt) + } + } } - // important for close reader/writer gorutines - _ = c.res.Body.Close() +} + +func (c *Producer) Stop() error { + _ = c.SuperProducer.Close() + return c.rd.Close() +} + +func (c *Producer) next() (http.Header, []byte, error) { + for { + // search next boundary and skip empty lines + s, err := c.reader.ReadString('\n') + if err != nil { + return nil, nil, err + } + + // auto guess boundary + if c.boundary == "" && strings.HasPrefix(s, "--") { + c.boundary = s + break + } else if strings.HasPrefix(s, c.boundary) { + break + } + + if s == "\r\n" { + continue + } + + return nil, nil, errors.New("multipart: wrong boundary: " + s) + } + + tp := textproto.NewReader(c.reader) + header, err := tp.ReadMIMEHeader() + if err != nil { + return nil, nil, err + } + + s := header.Get("Content-Length") + if s == "" { + return nil, nil, errors.New("multipart: no content length") + } + + size, err := strconv.Atoi(s) + if err != nil { + return nil, nil, err + } + + buf := make([]byte, size) + if _, err = io.ReadFull(c.reader, buf); err != nil { + return nil, nil, err + } + + _, _ = c.reader.Discard(2) // skip "\r\n" + + return http.Header(header), buf, nil +} + +const ( + MimeVideo = "video/x-h264" + MimeG711U = "audio/g711u" +) + +func (c *Producer) probe() error { + c.rd.BufferSize = core.ProbeSize + c.reader = bufio.NewReader(c.rd) + + defer func() { + c.rd.Reset() + c.reader = bufio.NewReader(c.rd) + }() + + waitVideo := true + waitAudio := true + + for waitVideo || waitAudio { + header, _, err := c.next() + if err != nil { + return err + } + + var media *core.Media + + ct := header.Get("Content-Type") + switch ct { + case MimeVideo: + if !waitVideo { + return nil + } + + media = &core.Media{ + Kind: core.KindVideo, + Direction: core.DirectionRecvonly, + Codecs: []*core.Codec{ + { + Name: core.CodecH264, + ClockRate: 90000, + PayloadType: core.PayloadTypeRAW, + }, + }, + } + waitVideo = false + + case MimeG711U: + if !waitAudio { + return nil + } + + media = &core.Media{ + Kind: core.KindAudio, + Direction: core.DirectionRecvonly, + Codecs: []*core.Codec{ + { + Name: core.CodecPCMU, + ClockRate: 8000, + }, + }, + } + waitAudio = false + + default: + media = &core.Media{ + Kind: core.KindVideo, + Direction: core.DirectionRecvonly, + Codecs: []*core.Codec{ + { + Name: core.CodecJPEG, + ClockRate: 90000, + PayloadType: core.PayloadTypeRAW, + }, + }, + } + waitVideo = false + waitAudio = false + } + + c.Medias = append(c.Medias, media) + } + return nil } -func (c *Client) MarshalJSON() ([]byte, error) { - info := &core.Info{ - Type: "HTTP/mixed active producer", - URL: c.res.Request.URL.String(), - RemoteAddr: c.RemoteAddr, - UserAgent: c.UserAgent, - Medias: c.medias, - Receivers: c.receivers, - Recv: c.recv, +// GetTimestamp - return timestamp in seconds +func GetTimestamp(header http.Header) float64 { + if s := header.Get("X-Timestamp"); s != "" { + if f, _ := strconv.ParseFloat(s, 32); f != 0 { + return f + } } - return json.Marshal(info) + + return float64(time.Duration(time.Now().UnixNano()) / time.Second) }