Add support http/mixed video/audio #545

This commit is contained in:
Alexey Khit
2023-08-02 17:57:33 +04:00
parent 2faea1bb69
commit 8778d7c9ab
5 changed files with 334 additions and 103 deletions
+12 -7
View File
@@ -2,17 +2,19 @@ package http
import (
"errors"
"github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/magic"
"github.com/AlexxIT/go2rtc/pkg/mjpeg"
"github.com/AlexxIT/go2rtc/pkg/rtmp"
"github.com/AlexxIT/go2rtc/pkg/tcp"
"net"
"net/http"
"net/url"
"strings"
"time"
"github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/magic"
"github.com/AlexxIT/go2rtc/pkg/mjpeg"
"github.com/AlexxIT/go2rtc/pkg/multipart"
"github.com/AlexxIT/go2rtc/pkg/rtmp"
"github.com/AlexxIT/go2rtc/pkg/tcp"
)
func Init() {
@@ -45,9 +47,12 @@ func handleHTTP(url string) (core.Producer, error) {
}
switch ct {
case "image/jpeg", "multipart/x-mixed-replace":
case "image/jpeg":
return mjpeg.NewClient(res), nil
case "multipart/x-mixed-replace":
return multipart.NewClient(res)
case "video/x-flv":
var conn *rtmp.Client
if conn, err = rtmp.Accept(res); err != nil {
+14 -81
View File
@@ -1,14 +1,9 @@
package mjpeg
import (
"bufio"
"errors"
"io"
"net/http"
"net/textproto"
"strconv"
"strings"
"time"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/tcp"
@@ -34,16 +29,19 @@ func NewClient(res *http.Response) *Client {
return &Client{res: res}
}
func (c *Client) startJPEG() error {
buf, err := io.ReadAll(c.res.Body)
func (c *Client) Handle() error {
body, err := io.ReadAll(c.res.Body)
if err != nil {
return err
}
packet := &rtp.Packet{Header: rtp.Header{Timestamp: now()}, Payload: buf}
c.receiver.WriteRTP(packet)
pkt := &rtp.Packet{
Header: rtp.Header{Timestamp: core.Now90000()},
Payload: body,
}
c.receiver.WriteRTP(pkt)
c.recv += len(buf)
c.recv += len(body)
req := c.res.Request
@@ -57,86 +55,21 @@ func (c *Client) startJPEG() error {
return errors.New("wrong status: " + res.Status)
}
buf, err = io.ReadAll(res.Body)
body, err = io.ReadAll(res.Body)
if err != nil {
return err
}
c.recv += len(body)
if c.receiver != nil {
packet = &rtp.Packet{Header: rtp.Header{Timestamp: now()}, Payload: buf}
c.receiver.WriteRTP(packet)
}
c.recv += len(buf)
}
return nil
}
func (c *Client) startMJPEG(boundary string) error {
// some cameras add prefix to boundary header:
// https://github.com/TheTimeWalker/wallpanel-android
if !strings.HasPrefix(boundary, "--") {
boundary = "--" + boundary
}
r := bufio.NewReader(c.res.Body)
tp := textproto.NewReader(r)
for !c.closed {
s, err := tp.ReadLine()
if err != nil {
return err
}
// fix leading empty line from esp32-cam-webserver
// https://github.com/AlexxIT/go2rtc/issues/545
if s == "" {
continue
}
if !strings.HasPrefix(s, boundary) {
return errors.New("wrong boundary: " + s)
}
header, err := tp.ReadMIMEHeader()
if err != nil {
return err
}
s = header.Get("Content-Length")
if s == "" {
return errors.New("no content length")
}
size, err := strconv.Atoi(s)
if err != nil {
return err
}
buf := make([]byte, size)
if _, err = io.ReadFull(r, buf); err != nil {
return err
}
if c.receiver != nil {
packet := &rtp.Packet{
pkt = &rtp.Packet{
Header: rtp.Header{Timestamp: core.Now90000()},
Payload: buf,
Payload: body,
}
c.receiver.WriteRTP(packet)
}
c.recv += len(buf)
if _, err = r.Discard(2); err != nil {
return err
c.receiver.WriteRTP(pkt)
}
}
return nil
}
func now() uint32 {
return uint32(time.Now().UnixMilli() * 90)
}
+2 -15
View File
@@ -2,8 +2,6 @@ package mjpeg
import (
"encoding/json"
"errors"
"strings"
"github.com/AlexxIT/go2rtc/pkg/core"
)
@@ -33,19 +31,8 @@ func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver,
}
func (c *Client) Start() error {
ct := c.res.Header.Get("Content-Type")
// https://github.com/AlexxIT/go2rtc/issues/278
if strings.HasPrefix(ct, "image/jpeg") {
return c.startJPEG()
}
// added in go1.18
if _, s, ok := strings.Cut(ct, "boundary="); ok {
return c.startMJPEG(s)
}
return errors.New("wrong Content-Type: " + ct)
return c.Handle()
}
func (c *Client) Stop() error {
@@ -60,7 +47,7 @@ func (c *Client) Stop() error {
func (c *Client) MarshalJSON() ([]byte, error) {
info := &core.Info{
Type: "MJPEG active producer",
Type: "JPEG active producer",
URL: c.res.Request.URL.String(),
RemoteAddr: c.RemoteAddr,
UserAgent: c.UserAgent,
+258
View File
@@ -0,0 +1,258 @@
package multipart
import (
"bufio"
"errors"
"io"
"net/http"
"net/textproto"
"strconv"
"strings"
"time"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/h264"
"github.com/pion/rtp"
)
type Client struct {
core.Listener
UserAgent string
RemoteAddr string
res *http.Response
boundary string
reader *bufio.Reader
medias []*core.Media
receivers []*core.Receiver
recv int
}
func NewClient(res *http.Response) (*Client, error) {
ct := res.Header.Get("Content-Type")
// added in go1.18
_, boundary, ok := strings.Cut(ct, "boundary=")
if !ok {
return nil, errors.New("multipart: wrong Content-Type: " + ct)
}
// some cameras add prefix to boundary header:
// https://github.com/TheTimeWalker/wallpanel-android
if !strings.HasPrefix(boundary, "--") {
boundary = "--" + boundary
}
c := &Client{
boundary: boundary,
reader: bufio.NewReader(res.Body),
}
if err := c.probe(); err != nil {
return nil, err
}
return c, nil
}
func (c *Client) Handle() error {
if len(c.receivers) == 0 {
return errors.New("multipart: no receivers")
}
var mjpeg, video, audio *core.Receiver
for _, receiver := range c.receivers {
switch receiver.Codec.Name {
case core.CodecH264:
video = receiver
case core.CodecPCMU:
audio = receiver
default:
mjpeg = receiver
}
}
for {
header, body, err := c.Next()
if err != nil {
return err
}
c.recv += len(body)
if mjpeg != nil {
packet := &rtp.Packet{
Header: rtp.Header{Timestamp: core.Now90000()},
Payload: body,
}
mjpeg.WriteRTP(packet)
continue
}
ct := header.Get("Content-Type")
switch ct {
case MimeVideo:
if video != nil {
ts := GetTimestamp(header)
pkt := &rtp.Packet{
Header: rtp.Header{
Timestamp: uint32(ts * 90000),
},
Payload: h264.AnnexB2AVC(body),
}
video.WriteRTP(pkt)
}
case MimeG711U:
if audio != nil {
ts := GetTimestamp(header)
pkt := &rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
Timestamp: uint32(ts * 8000),
},
Payload: body,
}
audio.WriteRTP(pkt)
}
}
}
}
func (c *Client) Next() (http.Header, []byte, error) {
for {
// search next boundary and skip empty lines
s, err := c.reader.ReadString('\n')
if err != nil {
return nil, nil, err
}
if strings.HasPrefix(s, c.boundary) {
break
}
if s == "\r\n" {
continue
}
return nil, nil, errors.New("multipart: wrong boundary: " + s)
}
tp := textproto.NewReader(c.reader)
header, err := tp.ReadMIMEHeader()
if err != nil {
return nil, nil, err
}
s := header.Get("Content-Length")
if s == "" {
return nil, nil, errors.New("multipart: no content length")
}
size, err := strconv.Atoi(s)
if err != nil {
return nil, nil, err
}
buf := make([]byte, size)
if _, err = io.ReadFull(c.reader, buf); err != nil {
return nil, nil, err
}
_, _ = c.reader.Discard(2) // skip "\r\n"
return http.Header(header), buf, nil
}
const (
MimeVideo = "video/x-h264"
MimeG711U = "audio/g711u"
)
func (c *Client) probe() error {
waitVideo := true
waitAudio := true
for waitVideo || waitAudio {
header, _, err := c.Next()
if err != nil {
return err
}
var media *core.Media
ct := header.Get("Content-Type")
switch ct {
case MimeVideo:
if !waitVideo {
return nil
}
media = &core.Media{
Kind: core.KindVideo,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{
{
Name: core.CodecH264,
ClockRate: 90000,
PayloadType: core.PayloadTypeRAW,
},
},
}
waitVideo = false
case MimeG711U:
if !waitAudio {
return nil
}
media = &core.Media{
Kind: core.KindAudio,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{
{
Name: core.CodecPCMU,
ClockRate: 8000,
},
},
}
waitAudio = false
default:
media = &core.Media{
Kind: core.KindVideo,
Direction: core.DirectionRecvonly,
Codecs: []*core.Codec{
{
Name: core.CodecJPEG,
ClockRate: 90000,
PayloadType: core.PayloadTypeRAW,
},
},
}
waitVideo = false
waitAudio = false
}
c.medias = append(c.medias, media)
}
return nil
}
// GetTimestamp - return timestamp in seconds
func GetTimestamp(header http.Header) float64 {
if s := header.Get("X-Timestamp"); s != "" {
if f, _ := strconv.ParseFloat(s, 32); f != 0 {
return f
}
}
return float64(time.Duration(time.Now().UnixNano()) / time.Second)
}
+48
View File
@@ -0,0 +1,48 @@
package multipart
import (
"encoding/json"
"github.com/AlexxIT/go2rtc/pkg/core"
)
func (c *Client) GetMedias() []*core.Media {
return c.medias
}
func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) {
for _, track := range c.receivers {
if track.Codec == codec {
return track, nil
}
}
track := core.NewReceiver(media, codec)
c.receivers = append(c.receivers, track)
return track, nil
}
func (c *Client) Start() error {
return c.Handle()
}
func (c *Client) Stop() error {
for _, receiver := range c.receivers {
receiver.Close()
}
// important for close reader/writer gorutines
_ = c.res.Body.Close()
return nil
}
func (c *Client) MarshalJSON() ([]byte, error) {
info := &core.Info{
Type: "HTTP/mixed active producer",
URL: c.res.Request.URL.String(),
RemoteAddr: c.RemoteAddr,
UserAgent: c.UserAgent,
Medias: c.medias,
Receivers: c.receivers,
Recv: c.recv,
}
return json.Marshal(info)
}