diff --git a/cmd/rtmp/rtmp.go b/cmd/rtmp/rtmp.go index 9697aecd..be342ecd 100644 --- a/cmd/rtmp/rtmp.go +++ b/cmd/rtmp/rtmp.go @@ -8,6 +8,9 @@ import ( func Init() { streams.HandleFunc("rtmp", handle) + // RTMPT (flv over HTTP) + streams.HandleFunc("http", handle) + streams.HandleFunc("https", handle) } func handle(url string) (streamer.Producer, error) { diff --git a/pkg/rtmp/client.go b/pkg/rtmp/client.go index d2e11e34..7e2125b4 100644 --- a/pkg/rtmp/client.go +++ b/pkg/rtmp/client.go @@ -5,15 +5,24 @@ import ( "encoding/hex" "fmt" "github.com/AlexxIT/go2rtc/pkg/h264" + "github.com/AlexxIT/go2rtc/pkg/rtmpt" "github.com/AlexxIT/go2rtc/pkg/streamer" "github.com/deepch/vdk/av" "github.com/deepch/vdk/codec/aacparser" "github.com/deepch/vdk/codec/h264parser" "github.com/deepch/vdk/format/rtmp" "github.com/pion/rtp" + "strings" "time" ) +// Conn for RTMP and RTMPT (flv over HTTP) +type Conn interface { + Streams() (streams []av.CodecData, err error) + ReadPacket() (pkt av.Packet, err error) + Close() (err error) +} + type Client struct { streamer.Element @@ -22,7 +31,7 @@ type Client struct { medias []*streamer.Media tracks []*streamer.Track - conn *rtmp.Conn + conn Conn closed bool receive int @@ -33,7 +42,12 @@ func NewClient(uri string) *Client { } func (c *Client) Dial() (err error) { - c.conn, err = rtmp.Dial(c.URI) + if strings.HasPrefix(c.URI, "http") { + c.conn, err = rtmpt.Dial(c.URI) + } else { + c.conn, err = rtmp.Dial(c.URI) + } + if err != nil { return } diff --git a/pkg/rtmp/streamer.go b/pkg/rtmp/streamer.go index ce84b379..b5be3472 100644 --- a/pkg/rtmp/streamer.go +++ b/pkg/rtmp/streamer.go @@ -32,7 +32,7 @@ func (c *Client) MarshalJSON() ([]byte, error) { v := map[string]interface{}{ streamer.JSONReceive: c.receive, streamer.JSONType: "RTMP client producer", - streamer.JSONRemoteAddr: c.conn.NetConn().RemoteAddr().String(), + //streamer.JSONRemoteAddr: c.conn.NetConn().RemoteAddr().String(), "url": c.URI, } for i, media := range c.medias { diff --git a/pkg/rtmpt/README.md b/pkg/rtmpt/README.md new file mode 100644 index 00000000..d928944b --- /dev/null +++ b/pkg/rtmpt/README.md @@ -0,0 +1,3 @@ +## Useful links + +- https://medium.com/@nate510/don-t-use-go-s-default-http-client-4804cb19f779 diff --git a/pkg/rtmpt/rtmpt.go b/pkg/rtmpt/rtmpt.go new file mode 100644 index 00000000..9194c660 --- /dev/null +++ b/pkg/rtmpt/rtmpt.go @@ -0,0 +1,100 @@ +package rtmpt + +import ( + "bufio" + "errors" + "github.com/deepch/vdk/av" + "github.com/deepch/vdk/codec/h264parser" + "github.com/deepch/vdk/format/flv/flvio" + "github.com/deepch/vdk/utils/bits/pio" + "io" + "net/http" +) + +func Dial(uri string) (*Conn, error) { + req, err := http.NewRequest("GET", uri, nil) + if err != nil { + return nil, err + } + + res, err := http.DefaultClient.Do(req) + if err != nil { + return nil, err + } + + c := Conn{ + conn: res.Body, + reader: bufio.NewReaderSize(res.Body, pio.RecommendBufioSize), + buf: make([]byte, 256), + } + + if _, err = io.ReadFull(c.reader, c.buf[:flvio.FileHeaderLength]); err != nil { + return nil, err + } + + flags, n, err := flvio.ParseFileHeader(c.buf) + if err != nil { + return nil, err + } + + if flags&flvio.FILE_HAS_VIDEO == 0 { + return nil, errors.New("not supported") + } + + if _, err = c.reader.Discard(n); err != nil { + return nil, err + } + + return &c, nil +} + +type Conn struct { + conn io.ReadCloser + reader *bufio.Reader + buf []byte +} + +func (c *Conn) Streams() ([]av.CodecData, error) { + for { + tag, _, err := flvio.ReadTag(c.reader, c.buf) + if err != nil { + return nil, err + } + + if tag.Type != flvio.TAG_VIDEO || tag.AVCPacketType != flvio.AAC_SEQHDR { + continue + } + + stream, err := h264parser.NewCodecDataFromAVCDecoderConfRecord(tag.Data) + if err != nil { + return nil, err + } + + return []av.CodecData{stream}, nil + } +} + +func (c *Conn) ReadPacket() (av.Packet, error) { + for { + tag, ts, err := flvio.ReadTag(c.reader, c.buf) + if err != nil { + return av.Packet{}, err + } + + if tag.Type != flvio.TAG_VIDEO || tag.AVCPacketType != flvio.AVC_NALU { + continue + } + + return av.Packet{ + Idx: 0, + Data: tag.Data, + CompositionTime: flvio.TsToTime(tag.CompositionTime), + IsKeyFrame: tag.FrameType == flvio.FRAME_KEY, + Time: flvio.TsToTime(ts), + }, nil + } +} + +func (c *Conn) Close() (err error) { + return c.conn.Close() +}