From 8778d7c9ab4f88486e862c0253657fa75cf47f62 Mon Sep 17 00:00:00 2001 From: Alexey Khit Date: Wed, 2 Aug 2023 17:57:33 +0400 Subject: [PATCH] Add support http/mixed video/audio #545 --- internal/http/http.go | 19 +-- pkg/mjpeg/client.go | 95 +++----------- pkg/mjpeg/producer.go | 17 +-- pkg/multipart/client.go | 258 ++++++++++++++++++++++++++++++++++++++ pkg/multipart/producer.go | 48 +++++++ 5 files changed, 334 insertions(+), 103 deletions(-) create mode 100644 pkg/multipart/client.go create mode 100644 pkg/multipart/producer.go diff --git a/internal/http/http.go b/internal/http/http.go index 6c158196..e83597f7 100644 --- a/internal/http/http.go +++ b/internal/http/http.go @@ -2,17 +2,19 @@ package http import ( "errors" - "github.com/AlexxIT/go2rtc/internal/streams" - "github.com/AlexxIT/go2rtc/pkg/core" - "github.com/AlexxIT/go2rtc/pkg/magic" - "github.com/AlexxIT/go2rtc/pkg/mjpeg" - "github.com/AlexxIT/go2rtc/pkg/rtmp" - "github.com/AlexxIT/go2rtc/pkg/tcp" "net" "net/http" "net/url" "strings" "time" + + "github.com/AlexxIT/go2rtc/internal/streams" + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/magic" + "github.com/AlexxIT/go2rtc/pkg/mjpeg" + "github.com/AlexxIT/go2rtc/pkg/multipart" + "github.com/AlexxIT/go2rtc/pkg/rtmp" + "github.com/AlexxIT/go2rtc/pkg/tcp" ) func Init() { @@ -45,9 +47,12 @@ func handleHTTP(url string) (core.Producer, error) { } switch ct { - case "image/jpeg", "multipart/x-mixed-replace": + case "image/jpeg": return mjpeg.NewClient(res), nil + case "multipart/x-mixed-replace": + return multipart.NewClient(res) + case "video/x-flv": var conn *rtmp.Client if conn, err = rtmp.Accept(res); err != nil { diff --git a/pkg/mjpeg/client.go b/pkg/mjpeg/client.go index a06d2c44..f16c42cd 100644 --- a/pkg/mjpeg/client.go +++ b/pkg/mjpeg/client.go @@ -1,14 +1,9 @@ package mjpeg import ( - "bufio" "errors" "io" "net/http" - "net/textproto" - "strconv" - "strings" - "time" "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/tcp" @@ -34,16 +29,19 @@ func NewClient(res *http.Response) *Client { return &Client{res: res} } -func (c *Client) startJPEG() error { - buf, err := io.ReadAll(c.res.Body) +func (c *Client) Handle() error { + body, err := io.ReadAll(c.res.Body) if err != nil { return err } - packet := &rtp.Packet{Header: rtp.Header{Timestamp: now()}, Payload: buf} - c.receiver.WriteRTP(packet) + pkt := &rtp.Packet{ + Header: rtp.Header{Timestamp: core.Now90000()}, + Payload: body, + } + c.receiver.WriteRTP(pkt) - c.recv += len(buf) + c.recv += len(body) req := c.res.Request @@ -57,86 +55,21 @@ func (c *Client) startJPEG() error { return errors.New("wrong status: " + res.Status) } - buf, err = io.ReadAll(res.Body) + body, err = io.ReadAll(res.Body) if err != nil { return err } + c.recv += len(body) + if c.receiver != nil { - packet = &rtp.Packet{Header: rtp.Header{Timestamp: now()}, Payload: buf} - c.receiver.WriteRTP(packet) - } - - c.recv += len(buf) - } - - return nil -} - -func (c *Client) startMJPEG(boundary string) error { - // some cameras add prefix to boundary header: - // https://github.com/TheTimeWalker/wallpanel-android - if !strings.HasPrefix(boundary, "--") { - boundary = "--" + boundary - } - - r := bufio.NewReader(c.res.Body) - tp := textproto.NewReader(r) - - for !c.closed { - s, err := tp.ReadLine() - if err != nil { - return err - } - - // fix leading empty line from esp32-cam-webserver - // https://github.com/AlexxIT/go2rtc/issues/545 - if s == "" { - continue - } - - if !strings.HasPrefix(s, boundary) { - return errors.New("wrong boundary: " + s) - } - - header, err := tp.ReadMIMEHeader() - if err != nil { - return err - } - - s = header.Get("Content-Length") - if s == "" { - return errors.New("no content length") - } - - size, err := strconv.Atoi(s) - if err != nil { - return err - } - - buf := make([]byte, size) - if _, err = io.ReadFull(r, buf); err != nil { - return err - } - - if c.receiver != nil { - packet := &rtp.Packet{ + pkt = &rtp.Packet{ Header: rtp.Header{Timestamp: core.Now90000()}, - Payload: buf, + Payload: body, } - c.receiver.WriteRTP(packet) - } - - c.recv += len(buf) - - if _, err = r.Discard(2); err != nil { - return err + c.receiver.WriteRTP(pkt) } } return nil } - -func now() uint32 { - return uint32(time.Now().UnixMilli() * 90) -} diff --git a/pkg/mjpeg/producer.go b/pkg/mjpeg/producer.go index 69d67faf..5b352252 100644 --- a/pkg/mjpeg/producer.go +++ b/pkg/mjpeg/producer.go @@ -2,8 +2,6 @@ package mjpeg import ( "encoding/json" - "errors" - "strings" "github.com/AlexxIT/go2rtc/pkg/core" ) @@ -33,19 +31,8 @@ func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, } func (c *Client) Start() error { - ct := c.res.Header.Get("Content-Type") - // https://github.com/AlexxIT/go2rtc/issues/278 - if strings.HasPrefix(ct, "image/jpeg") { - return c.startJPEG() - } - - // added in go1.18 - if _, s, ok := strings.Cut(ct, "boundary="); ok { - return c.startMJPEG(s) - } - - return errors.New("wrong Content-Type: " + ct) + return c.Handle() } func (c *Client) Stop() error { @@ -60,7 +47,7 @@ func (c *Client) Stop() error { func (c *Client) MarshalJSON() ([]byte, error) { info := &core.Info{ - Type: "MJPEG active producer", + Type: "JPEG active producer", URL: c.res.Request.URL.String(), RemoteAddr: c.RemoteAddr, UserAgent: c.UserAgent, diff --git a/pkg/multipart/client.go b/pkg/multipart/client.go new file mode 100644 index 00000000..b95c225e --- /dev/null +++ b/pkg/multipart/client.go @@ -0,0 +1,258 @@ +package multipart + +import ( + "bufio" + "errors" + "io" + "net/http" + "net/textproto" + "strconv" + "strings" + "time" + + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/h264" + "github.com/pion/rtp" +) + +type Client struct { + core.Listener + + UserAgent string + RemoteAddr string + + res *http.Response + + boundary string + reader *bufio.Reader + + medias []*core.Media + receivers []*core.Receiver + + recv int +} + +func NewClient(res *http.Response) (*Client, error) { + ct := res.Header.Get("Content-Type") + + // added in go1.18 + _, boundary, ok := strings.Cut(ct, "boundary=") + if !ok { + return nil, errors.New("multipart: wrong Content-Type: " + ct) + } + + // some cameras add prefix to boundary header: + // https://github.com/TheTimeWalker/wallpanel-android + if !strings.HasPrefix(boundary, "--") { + boundary = "--" + boundary + } + + c := &Client{ + boundary: boundary, + reader: bufio.NewReader(res.Body), + } + + if err := c.probe(); err != nil { + return nil, err + } + + return c, nil +} + +func (c *Client) Handle() error { + if len(c.receivers) == 0 { + return errors.New("multipart: no receivers") + } + + var mjpeg, video, audio *core.Receiver + + for _, receiver := range c.receivers { + switch receiver.Codec.Name { + case core.CodecH264: + video = receiver + case core.CodecPCMU: + audio = receiver + default: + mjpeg = receiver + } + } + + for { + header, body, err := c.Next() + if err != nil { + return err + } + + c.recv += len(body) + + if mjpeg != nil { + packet := &rtp.Packet{ + Header: rtp.Header{Timestamp: core.Now90000()}, + Payload: body, + } + mjpeg.WriteRTP(packet) + continue + } + + ct := header.Get("Content-Type") + switch ct { + case MimeVideo: + if video != nil { + ts := GetTimestamp(header) + pkt := &rtp.Packet{ + Header: rtp.Header{ + Timestamp: uint32(ts * 90000), + }, + Payload: h264.AnnexB2AVC(body), + } + video.WriteRTP(pkt) + } + + case MimeG711U: + if audio != nil { + ts := GetTimestamp(header) + pkt := &rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + Timestamp: uint32(ts * 8000), + }, + Payload: body, + } + audio.WriteRTP(pkt) + } + } + } +} + +func (c *Client) Next() (http.Header, []byte, error) { + for { + // search next boundary and skip empty lines + s, err := c.reader.ReadString('\n') + if err != nil { + return nil, nil, err + } + + if strings.HasPrefix(s, c.boundary) { + break + } + + if s == "\r\n" { + continue + } + + return nil, nil, errors.New("multipart: wrong boundary: " + s) + } + + tp := textproto.NewReader(c.reader) + header, err := tp.ReadMIMEHeader() + if err != nil { + return nil, nil, err + } + + s := header.Get("Content-Length") + if s == "" { + return nil, nil, errors.New("multipart: no content length") + } + + size, err := strconv.Atoi(s) + if err != nil { + return nil, nil, err + } + + buf := make([]byte, size) + if _, err = io.ReadFull(c.reader, buf); err != nil { + return nil, nil, err + } + + _, _ = c.reader.Discard(2) // skip "\r\n" + + return http.Header(header), buf, nil +} + +const ( + MimeVideo = "video/x-h264" + MimeG711U = "audio/g711u" +) + +func (c *Client) probe() error { + waitVideo := true + waitAudio := true + + for waitVideo || waitAudio { + header, _, err := c.Next() + if err != nil { + return err + } + + var media *core.Media + + ct := header.Get("Content-Type") + switch ct { + case MimeVideo: + if !waitVideo { + return nil + } + + media = &core.Media{ + Kind: core.KindVideo, + Direction: core.DirectionRecvonly, + Codecs: []*core.Codec{ + { + Name: core.CodecH264, + ClockRate: 90000, + PayloadType: core.PayloadTypeRAW, + }, + }, + } + waitVideo = false + + case MimeG711U: + if !waitAudio { + return nil + } + + media = &core.Media{ + Kind: core.KindAudio, + Direction: core.DirectionRecvonly, + Codecs: []*core.Codec{ + { + Name: core.CodecPCMU, + ClockRate: 8000, + }, + }, + } + waitAudio = false + + default: + media = &core.Media{ + Kind: core.KindVideo, + Direction: core.DirectionRecvonly, + Codecs: []*core.Codec{ + { + Name: core.CodecJPEG, + ClockRate: 90000, + PayloadType: core.PayloadTypeRAW, + }, + }, + } + waitVideo = false + waitAudio = false + } + + c.medias = append(c.medias, media) + } + + return nil +} + +// GetTimestamp - return timestamp in seconds +func GetTimestamp(header http.Header) float64 { + if s := header.Get("X-Timestamp"); s != "" { + if f, _ := strconv.ParseFloat(s, 32); f != 0 { + return f + } + } + + return float64(time.Duration(time.Now().UnixNano()) / time.Second) +} diff --git a/pkg/multipart/producer.go b/pkg/multipart/producer.go new file mode 100644 index 00000000..3b45dc32 --- /dev/null +++ b/pkg/multipart/producer.go @@ -0,0 +1,48 @@ +package multipart + +import ( + "encoding/json" + + "github.com/AlexxIT/go2rtc/pkg/core" +) + +func (c *Client) GetMedias() []*core.Media { + return c.medias +} + +func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { + for _, track := range c.receivers { + if track.Codec == codec { + return track, nil + } + } + track := core.NewReceiver(media, codec) + c.receivers = append(c.receivers, track) + return track, nil +} + +func (c *Client) Start() error { + return c.Handle() +} + +func (c *Client) Stop() error { + for _, receiver := range c.receivers { + receiver.Close() + } + // important for close reader/writer gorutines + _ = c.res.Body.Close() + return nil +} + +func (c *Client) MarshalJSON() ([]byte, error) { + info := &core.Info{ + Type: "HTTP/mixed active producer", + URL: c.res.Request.URL.String(), + RemoteAddr: c.RemoteAddr, + UserAgent: c.UserAgent, + Medias: c.medias, + Receivers: c.receivers, + Recv: c.recv, + } + return json.Marshal(info) +}