diff --git a/cmd/http/http.go b/cmd/http/http.go new file mode 100644 index 00000000..11b9d417 --- /dev/null +++ b/cmd/http/http.go @@ -0,0 +1,56 @@ +package http + +import ( + "errors" + "fmt" + "github.com/AlexxIT/go2rtc/cmd/streams" + "github.com/AlexxIT/go2rtc/pkg/mjpeg" + "github.com/AlexxIT/go2rtc/pkg/rtmp" + "github.com/AlexxIT/go2rtc/pkg/streamer" + "github.com/AlexxIT/go2rtc/pkg/tcp" + "net/http" + "strings" +) + +func Init() { + streams.HandleFunc("http", handle) + streams.HandleFunc("https", handle) +} + +func handle(url string) (streamer.Producer, error) { + // first we get the Content-Type to define supported producer + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil, err + } + + res, err := tcp.Do(req) + if err != nil { + return nil, err + } + + if res.StatusCode != http.StatusOK { + return nil, errors.New(res.Status) + } + + ct := res.Header.Get("Content-Type") + if i := strings.IndexByte(ct, ';'); i > 0 { + ct = ct[:i] + } + + switch ct { + case "image/jpeg", "multipart/x-mixed-replace": + return mjpeg.NewClient(res), nil + case "video/x-flv": + var conn *rtmp.Client + if conn, err = rtmp.Accept(res); err != nil { + return nil, err + } + if err = conn.Describe(); err != nil { + return nil, err + } + return conn, nil + } + + return nil, fmt.Errorf("unsupported Content-Type: %s", ct) +} diff --git a/cmd/mjpeg/mjpeg.go b/cmd/mjpeg/mjpeg.go index cf3c96f0..54cbb31d 100644 --- a/cmd/mjpeg/mjpeg.go +++ b/cmd/mjpeg/mjpeg.go @@ -40,8 +40,12 @@ func handlerKeyframe(w http.ResponseWriter, r *http.Request) { stream.RemoveConsumer(cons) - w.Header().Set("Content-Type", "image/jpeg") - w.Header().Set("Content-Length", strconv.Itoa(len(data))) + h := w.Header() + h.Set("Content-Type", "image/jpeg") + h.Set("Content-Length", strconv.Itoa(len(data))) + h.Set("Cache-Control", "no-cache") + h.Set("Connection", "close") + h.Set("Pragma", "no-cache") if _, err := w.Write(data); err != nil { log.Error().Err(err).Caller().Send() @@ -57,20 +61,21 @@ func handlerStream(w http.ResponseWriter, r *http.Request) { return } - exit := make(chan struct{}) + flusher := w.(http.Flusher) cons := &mjpeg.Consumer{} cons.Listen(func(msg interface{}) { switch msg := msg.(type) { case []byte: data := []byte(header + strconv.Itoa(len(msg))) - data = append(data, 0x0D, 0x0A, 0x0D, 0x0A) + data = append(data, '\r', '\n', '\r', '\n') data = append(data, msg...) - data = append(data, 0x0D, 0x0A) + data = append(data, '\r', '\n') - if _, err := w.Write(data); err != nil { - exit <- struct{}{} - } + // Chrome bug: mjpeg image always shows the second to last image + // https://bugs.chromium.org/p/chromium/issues/detail?id=527446 + _, _ = w.Write(data) + flusher.Flush() } }) @@ -79,9 +84,13 @@ func handlerStream(w http.ResponseWriter, r *http.Request) { return } - w.Header().Set("Content-Type", `multipart/x-mixed-replace; boundary=frame`) + h := w.Header() + h.Set("Content-Type", "multipart/x-mixed-replace; boundary=frame") + h.Set("Cache-Control", "no-cache") + h.Set("Connection", "close") + h.Set("Pragma", "no-cache") - <-exit + <-r.Context().Done() stream.RemoveConsumer(cons) diff --git a/cmd/rtmp/rtmp.go b/cmd/rtmp/rtmp.go index b413f175..38c3b240 100644 --- a/cmd/rtmp/rtmp.go +++ b/cmd/rtmp/rtmp.go @@ -8,8 +8,6 @@ import ( func Init() { streams.HandleFunc("rtmp", handle) - streams.HandleFunc("http", handle) - streams.HandleFunc("https", handle) } func handle(url string) (streamer.Producer, error) { @@ -17,5 +15,8 @@ func handle(url string) (streamer.Producer, error) { if err := conn.Dial(); err != nil { return nil, err } + if err := conn.Describe(); err != nil { + return nil, err + } return conn, nil } diff --git a/main.go b/main.go index 0ca26111..62ff105d 100644 --- a/main.go +++ b/main.go @@ -9,6 +9,7 @@ import ( "github.com/AlexxIT/go2rtc/cmd/ffmpeg" "github.com/AlexxIT/go2rtc/cmd/hass" "github.com/AlexxIT/go2rtc/cmd/homekit" + "github.com/AlexxIT/go2rtc/cmd/http" "github.com/AlexxIT/go2rtc/cmd/ivideon" "github.com/AlexxIT/go2rtc/cmd/mjpeg" "github.com/AlexxIT/go2rtc/cmd/mp4" @@ -40,6 +41,7 @@ func main() { webrtc.Init() mp4.Init() mjpeg.Init() + http.Init() srtp.Init() homekit.Init() diff --git a/pkg/httpflv/httpflv.go b/pkg/httpflv/httpflv.go index 823f7b26..54034df5 100644 --- a/pkg/httpflv/httpflv.go +++ b/pkg/httpflv/httpflv.go @@ -22,13 +22,17 @@ func Dial(uri string) (*Conn, error) { return nil, err } + return Accept(res) +} + +func Accept(res *http.Response) (*Conn, error) { c := Conn{ conn: res.Body, reader: bufio.NewReaderSize(res.Body, pio.RecommendBufioSize), buf: make([]byte, 256), } - if _, err = io.ReadFull(c.reader, c.buf[:flvio.FileHeaderLength]); err != nil { + if _, err := io.ReadFull(c.reader, c.buf[:flvio.FileHeaderLength]); err != nil { return nil, err } @@ -49,9 +53,9 @@ func Dial(uri string) (*Conn, error) { } type Conn struct { - conn io.ReadCloser - reader *bufio.Reader - buf []byte + conn io.ReadCloser + reader *bufio.Reader + buf []byte } func (c *Conn) Streams() ([]av.CodecData, error) { diff --git a/pkg/ivideon/client.go b/pkg/ivideon/client.go index a49bf274..d89cacc6 100644 --- a/pkg/ivideon/client.go +++ b/pkg/ivideon/client.go @@ -165,7 +165,7 @@ func (c *Client) getTracks() error { Name: streamer.CodecH264, ClockRate: 90000, FmtpLine: "profile-level-id=" + msg.CodecString[i+1:], - PayloadType: streamer.PayloadTypeMP4, + PayloadType: streamer.PayloadTypeRAW, } i = bytes.Index(msg.Data, []byte("avcC")) - 4 diff --git a/pkg/mjpeg/README.md b/pkg/mjpeg/README.md index f23c88c2..fa8c4e1c 100644 --- a/pkg/mjpeg/README.md +++ b/pkg/mjpeg/README.md @@ -2,3 +2,4 @@ - https://www.rfc-editor.org/rfc/rfc2435 - https://github.com/GStreamer/gst-plugins-good/blob/master/gst/rtp/gstrtpjpegdepay.c +- https://mjpeg.sanford.io/ diff --git a/pkg/mjpeg/client.go b/pkg/mjpeg/client.go new file mode 100644 index 00000000..36d26987 --- /dev/null +++ b/pkg/mjpeg/client.go @@ -0,0 +1,147 @@ +package mjpeg + +import ( + "bufio" + "errors" + "github.com/AlexxIT/go2rtc/pkg/streamer" + "github.com/AlexxIT/go2rtc/pkg/tcp" + "github.com/pion/rtp" + "io" + "net/http" + "net/textproto" + "strconv" + "strings" +) + +type Client struct { + streamer.Element + + UserAgent string + RemoteAddr string + + closed bool + res *http.Response + + track *streamer.Track +} + +func NewClient(res *http.Response) *Client { + codec := &streamer.Codec{ + Name: streamer.CodecJPEG, ClockRate: 90000, PayloadType: streamer.PayloadTypeRAW, + } + return &Client{ + res: res, + track: streamer.NewTrack(codec, streamer.DirectionSendonly), + } +} + +func (c *Client) GetMedias() []*streamer.Media { + return []*streamer.Media{{ + Kind: streamer.KindVideo, + Direction: streamer.DirectionSendonly, + Codecs: []*streamer.Codec{c.track.Codec}, + }} +} + +func (c *Client) GetTrack(media *streamer.Media, codec *streamer.Codec) *streamer.Track { + return c.track +} + +func (c *Client) Start() error { + ct := c.res.Header.Get("Content-Type") + + if ct == "image/jpeg" { + return c.startJPEG() + } + + // added in go1.18 + if _, s, ok := strings.Cut(ct, "boundary="); ok { + return c.startMJPEG(s) + } + + return errors.New("wrong Content-Type: " + ct) +} + +func (c *Client) Stop() error { + c.closed = true + return nil +} + +func (c *Client) startJPEG() error { + buf, err := io.ReadAll(c.res.Body) + if err != nil { + return err + } + + packet := &rtp.Packet{Payload: buf} + _ = c.track.WriteRTP(packet) + + req := c.res.Request + + for !c.closed { + res, err := tcp.Do(req) + if err != nil { + return err + } + + if res.StatusCode != http.StatusOK { + return errors.New("wrong status: " + res.Status) + } + + buf, err = io.ReadAll(res.Body) + if err != nil { + return err + } + + packet = &rtp.Packet{Payload: buf} + _ = c.track.WriteRTP(packet) + } + + return nil +} + +func (c *Client) startMJPEG(boundary string) error { + boundary = "--" + boundary + + r := bufio.NewReader(c.res.Body) + tp := textproto.NewReader(r) + + for !c.closed { + s, err := tp.ReadLine() + if err != nil { + return err + } + if s != boundary { + return errors.New("wrong boundary: " + s) + } + + header, err := tp.ReadMIMEHeader() + if err != nil { + return err + } + + s = header.Get("Content-Length") + if s == "" { + return errors.New("no content length") + } + + size, err := strconv.Atoi(s) + if err != nil { + return err + } + + buf := make([]byte, size) + if _, err = io.ReadFull(r, buf); err != nil { + return err + } + + packet := &rtp.Packet{Payload: buf} + _ = c.track.WriteRTP(packet) + + if _, err = r.Discard(2); err != nil { + return err + } + } + + return nil +} diff --git a/pkg/mjpeg/consumer.go b/pkg/mjpeg/consumer.go index 8b057f20..206136ab 100644 --- a/pkg/mjpeg/consumer.go +++ b/pkg/mjpeg/consumer.go @@ -26,70 +26,15 @@ func (c *Consumer) GetMedias() []*streamer.Media { } func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *streamer.Track { - var header, payload []byte - push := func(packet *rtp.Packet) error { - //fmt.Printf( - // "[RTP] codec: %s, size: %6d, ts: %10d, pt: %2d, ssrc: %d, seq: %d, mark: %v\n", - // track.Codec.Name, len(packet.Payload), packet.Timestamp, - // packet.PayloadType, packet.SSRC, packet.SequenceNumber, packet.Marker, - //) - - // https://www.rfc-editor.org/rfc/rfc2435#section-3.1 - b := packet.Payload - - // 3.1. JPEG header - t := b[4] - - // 3.1.7. Restart Marker header - if 64 <= t && t <= 127 { - b = b[12:] // skip it - } else { - b = b[8:] - } - - if header == nil { - var lqt, cqt []byte - - // 3.1.8. Quantization Table header - q := packet.Payload[5] - if q >= 128 { - lqt = b[4:68] - cqt = b[68:132] - b = b[132:] - } else { - lqt, cqt = MakeTables(q) - } - - // https://www.rfc-editor.org/rfc/rfc2435#section-3.1.5 - // The maximum width is 2040 pixels. - w := uint16(packet.Payload[6]) << 3 - h := uint16(packet.Payload[7]) << 3 - - // fix 2560x1920 and 2560x1440 - if w == 512 && (h == 1920 || h == 1440) { - w = 2560 - } - - //fmt.Printf("t: %d, q: %d, w: %d, h: %d\n", t, q, w, h) - header = MakeHeaders(t, w, h, lqt, cqt) - } - - // 3.1.9. JPEG Payload - payload = append(payload, b...) - - if packet.Marker { - b = append(header, payload...) - if end := b[len(b)-2:]; end[0] != 0xFF && end[1] != 0xD9 { - b = append(b, 0xFF, 0xD9) - } - c.Fire(b) - - header = nil - payload = nil - } - + c.Fire(packet.Payload) return nil } + + if track.Codec.IsRTP() { + wrapper := RTPDepay(track) + push = wrapper(push) + } + return track.Bind(push) } diff --git a/pkg/mjpeg/rtp.go b/pkg/mjpeg/rtp.go new file mode 100644 index 00000000..a926d7c9 --- /dev/null +++ b/pkg/mjpeg/rtp.go @@ -0,0 +1,78 @@ +package mjpeg + +import ( + "github.com/AlexxIT/go2rtc/pkg/streamer" + "github.com/pion/rtp" +) + +func RTPDepay(track *streamer.Track) streamer.WrapperFunc { + var header, payload []byte + + return func(push streamer.WriterFunc) streamer.WriterFunc { + return func(packet *rtp.Packet) error { + //fmt.Printf( + // "[RTP] codec: %s, size: %6d, ts: %10d, pt: %2d, ssrc: %d, seq: %d, mark: %v\n", + // track.Codec.Name, len(packet.Payload), packet.Timestamp, + // packet.PayloadType, packet.SSRC, packet.SequenceNumber, packet.Marker, + //) + + // https://www.rfc-editor.org/rfc/rfc2435#section-3.1 + b := packet.Payload + + // 3.1. JPEG header + t := b[4] + + // 3.1.7. Restart Marker header + if 64 <= t && t <= 127 { + b = b[12:] // skip it + } else { + b = b[8:] + } + + if header == nil { + var lqt, cqt []byte + + // 3.1.8. Quantization Table header + q := packet.Payload[5] + if q >= 128 { + lqt = b[4:68] + cqt = b[68:132] + b = b[132:] + } else { + lqt, cqt = MakeTables(q) + } + + // https://www.rfc-editor.org/rfc/rfc2435#section-3.1.5 + // The maximum width is 2040 pixels. + w := uint16(packet.Payload[6]) << 3 + h := uint16(packet.Payload[7]) << 3 + + // fix 2560x1920 and 2560x1440 + if w == 512 && (h == 1920 || h == 1440) { + w = 2560 + } + + //fmt.Printf("t: %d, q: %d, w: %d, h: %d\n", t, q, w, h) + header = MakeHeaders(t, w, h, lqt, cqt) + } + + // 3.1.9. JPEG Payload + payload = append(payload, b...) + + if !packet.Marker { + return nil + } + + b = append(header, payload...) + if end := b[len(b)-2:]; end[0] != 0xFF && end[1] != 0xD9 { + b = append(b, 0xFF, 0xD9) + } + + header = nil + payload = nil + + packet.Payload = b + return push(packet) + } + } +} diff --git a/pkg/mp4/consumer.go b/pkg/mp4/consumer.go index f6a26657..bbfbe40c 100644 --- a/pkg/mp4/consumer.go +++ b/pkg/mp4/consumer.go @@ -72,10 +72,10 @@ func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *strea } var wrapper streamer.WrapperFunc - if codec.IsMP4() { - wrapper = h264.RepairAVC(track) - } else { + if codec.IsRTP() { wrapper = h264.RTPDepay(track) + } else { + wrapper = h264.RepairAVC(track) } push = wrapper(push) @@ -98,7 +98,7 @@ func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *strea return nil } - if !codec.IsMP4() { + if codec.IsRTP() { wrapper := h265.RTPDepay(track) push = wrapper(push) } @@ -118,7 +118,7 @@ func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *strea return nil } - if !codec.IsMP4() { + if codec.IsRTP() { wrapper := aac.RTPDepay(track) push = wrapper(push) } diff --git a/pkg/mp4/keyframe.go b/pkg/mp4/keyframe.go index fd9c97f4..058d98b1 100644 --- a/pkg/mp4/keyframe.go +++ b/pkg/mp4/keyframe.go @@ -52,10 +52,10 @@ func (c *Keyframe) AddTrack(media *streamer.Media, track *streamer.Track) *strea } var wrapper streamer.WrapperFunc - if track.Codec.IsMP4() { - wrapper = h264.RepairAVC(track) - } else { + if track.Codec.IsRTP() { wrapper = h264.RTPDepay(track) + } else { + wrapper = h264.RepairAVC(track) } push = wrapper(push) @@ -73,7 +73,7 @@ func (c *Keyframe) AddTrack(media *streamer.Media, track *streamer.Track) *strea return nil } - if !track.Codec.IsMP4() { + if track.Codec.IsRTP() { wrapper := h265.RTPDepay(track) push = wrapper(push) } diff --git a/pkg/mp4f/consumer.go b/pkg/mp4f/consumer.go index 7a71d5af..994d0bbe 100644 --- a/pkg/mp4f/consumer.go +++ b/pkg/mp4f/consumer.go @@ -89,7 +89,7 @@ func (c *Consumer) AddTrack(media *streamer.Media, track *streamer.Track) *strea return nil } - if !codec.IsMP4() { + if !codec.IsRAW() { wrapper := h264.RTPDepay(track) push = wrapper(push) } @@ -146,7 +146,7 @@ func (c *Consumer) Init() ([]byte, error) { return data, nil } -func (c *Consumer) Start() { +func (c *Consumer) Start() { c.start = true } diff --git a/pkg/rtmp/client.go b/pkg/rtmp/client.go index 0cedfbdc..19f414cc 100644 --- a/pkg/rtmp/client.go +++ b/pkg/rtmp/client.go @@ -11,7 +11,7 @@ import ( "github.com/deepch/vdk/codec/h264parser" "github.com/deepch/vdk/format/rtmp" "github.com/pion/rtp" - "strings" + "net/http" "time" ) @@ -41,16 +41,20 @@ func NewClient(uri string) *Client { } func (c *Client) Dial() (err error) { - if strings.HasPrefix(c.URI, "http") { - c.conn, err = httpflv.Dial(c.URI) - } else { - c.conn, err = rtmp.Dial(c.URI) - } + c.conn, err = rtmp.Dial(c.URI) + return +} +// Accept - convert http.Response to Client +func Accept(res *http.Response) (*Client, error) { + conn, err := httpflv.Accept(res) if err != nil { - return + return nil, err } + return &Client{URI: res.Request.URL.String(), conn: conn}, nil +} +func (c *Client) Describe() (err error) { // important to get SPS/PPS streams, err := c.conn.Streams() if err != nil { @@ -73,7 +77,7 @@ func (c *Client) Dial() (err error) { Name: streamer.CodecH264, ClockRate: 90000, FmtpLine: fmtp, - PayloadType: streamer.PayloadTypeMP4, + PayloadType: streamer.PayloadTypeRAW, } media := &streamer.Media{ @@ -96,7 +100,7 @@ func (c *Client) Dial() (err error) { Channels: uint16(cd.Config.ChannelConfig), // a=fmtp:97 streamtype=5;profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config=1588 FmtpLine: "streamtype=5;profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config=" + hex.EncodeToString(cd.ConfigBytes), - PayloadType: streamer.PayloadTypeMP4, + PayloadType: streamer.PayloadTypeRAW, } media := &streamer.Media{ diff --git a/pkg/rtsp/conn.go b/pkg/rtsp/conn.go index aebc0280..49044eaf 100644 --- a/pkg/rtsp/conn.go +++ b/pkg/rtsp/conn.go @@ -794,7 +794,7 @@ func (c *Conn) bindTrack( return nil } - if track.Codec.IsMP4() { + if !track.Codec.IsRTP() { switch track.Codec.Name { case streamer.CodecH264: wrapper := h264.RTPPay(1500) diff --git a/pkg/streamer/media.go b/pkg/streamer/media.go index a737ebef..d295fb53 100644 --- a/pkg/streamer/media.go +++ b/pkg/streamer/media.go @@ -37,7 +37,7 @@ const ( CodecELD = "ELD" // AAC-ELD ) -const PayloadTypeMP4 byte = 255 +const PayloadTypeRAW byte = 255 func GetKind(name string) string { switch name { @@ -139,8 +139,8 @@ func (c *Codec) String() string { return s } -func (c *Codec) IsMP4() bool { - return c.PayloadType == PayloadTypeMP4 +func (c *Codec) IsRTP() bool { + return c.PayloadType != PayloadTypeRAW } func (c *Codec) Clone() *Codec { diff --git a/pkg/tcp/request.go b/pkg/tcp/request.go new file mode 100644 index 00000000..9e10a5ef --- /dev/null +++ b/pkg/tcp/request.go @@ -0,0 +1,68 @@ +package tcp + +import ( + "errors" + "fmt" + "net/http" + "strings" + "time" +) + +// Do - http.Client with support Digest Authorization +func Do(req *http.Request) (*http.Response, error) { + // need to create new client each time to reset timeout + client := http.Client{Timeout: time.Second * 5000} + res, err := client.Do(req) + if err != nil { + return nil, err + } + + if res.StatusCode == http.StatusUnauthorized && req.URL.User != nil { + auth := res.Header.Get("WWW-Authenticate") + if !strings.HasPrefix(auth, "Digest") { + return nil, errors.New("unsupported auth: " + auth) + } + + realm := Between(auth, `realm="`, `"`) + nonce := Between(auth, `nonce="`, `"`) + qop := Between(auth, `qop="`, `"`) + + user := req.URL.User + username := user.Username() + password, _ := user.Password() + ha1 := HexMD5(username, realm, password) + + uri := req.URL.RequestURI() + ha2 := HexMD5(req.Method, uri) + + var header string + + switch qop { + case "": + response := HexMD5(ha1, nonce, ha2) + header = fmt.Sprintf( + `Digest username="%s", realm="%s", nonce="%s", uri="%s", response="%s"`, + user, realm, nonce, uri, response, + ) + case "auth": + nc := "00000001" + cnonce := "00000001" // TODO: random... + response := HexMD5(ha1, nonce, nc, cnonce, qop, ha2) + header = fmt.Sprintf( + `Digest username="%s", realm="%s", nonce="%s", uri="%s", qop=%s, nc=%s, cnonce="%s", response="%s"`, + username, realm, nonce, uri, qop, nc, cnonce, response, + ) + default: + return nil, errors.New("unsupported qop: " + auth) + } + + req.Header.Set("Authorization", header) + + res, err = client.Do(req) + if err != nil { + return nil, err + } + } + + return res, nil +} diff --git a/pkg/webrtc/consumer.go b/pkg/webrtc/consumer.go index 7dab2133..50ad95a5 100644 --- a/pkg/webrtc/consumer.go +++ b/pkg/webrtc/consumer.go @@ -57,10 +57,10 @@ func (c *Conn) AddTrack(media *streamer.Media, track *streamer.Track) *streamer. wrapper := h264.RTPPay(1200) push = wrapper(push) - if codec.IsMP4() { - wrapper = h264.RepairAVC(track) - } else { + if codec.IsRTP() { wrapper = h264.RTPDepay(track) + } else { + wrapper = h264.RepairAVC(track) } push = wrapper(push)