Compare commits

..

16 Commits

Author SHA1 Message Date
Alexey Khit 12b712426d Fix busy RTSP backchannel 2022-08-22 15:41:25 +03:00
Alexey Khit a9af245ef8 Fix async requests to Producer 2022-08-22 15:40:28 +03:00
Alexey Khit f251129a2f Fix RTSP Transport header parsing 2022-08-22 14:46:39 +03:00
Alexey Khit d28debabe9 Update fix for parsing RTSP SDP 2022-08-22 14:44:33 +03:00
Alexey Khit 07bf00f9f6 Update readme 2022-08-22 13:40:58 +03:00
Alexey Khit be6ec7dbb9 Fix RTSP requests for some cameras 2022-08-22 13:38:26 +03:00
Alexey Khit 4e575d1356 Adds build file for win64 2022-08-22 11:43:42 +03:00
Alexey Khit 4cbacfec0c Adds empty response on RTSP error 2022-08-22 11:43:26 +03:00
Alexey Khit 31e24c6e03 Adds stop with empty producer warning 2022-08-22 11:33:38 +03:00
Alexey Khit 401bf85a10 Update RTSP error output 2022-08-22 09:09:18 +03:00
Alexey Khit f36851f83a Fix response with empty producer 2022-08-22 09:06:40 +03:00
Alexey Khit 67522dbb19 Update readme 2022-08-22 08:44:27 +03:00
Alexey Khit 26b5745f0a Adds keep-alive to RTSP connection 2022-08-22 06:54:58 +03:00
Alexey Khit 46f6a5d8e1 Return unmodified errors from RTSP 2022-08-22 06:54:42 +03:00
Alexey Khit 48f58d0669 Fix wrong stream name request 2022-08-22 06:54:08 +03:00
Alexey Khit fd0b8f3c39 Fix RTMP with audio 2022-08-22 05:46:22 +03:00
17 changed files with 189 additions and 82 deletions
+11 -10
View File
@@ -2,9 +2,9 @@
**go2rtc** - ultimate camera streaming application with support RTSP, WebRTC, FFmpeg, RTMP, etc.
- zero-dependency and zero-config small [app for all OS](#installation) (Windows, macOS, Linux, ARM)
- zero-dependency and zero-config small [app for all OS](#go2rtc-binary) (Windows, macOS, Linux, ARM)
- zero-delay for all supported protocols (lowest possible streaming latency)
- zero-load on CPU for supported codecs
- low CPU load for supported codecs
- on the fly transcoding for unsupported codecs via [FFmpeg](#source-ffmpeg)
- multi-source 2-way [codecs negotiation](#codecs-negotiation)
- streaming from private networks via [Ngrok](#module-webrtc)
@@ -14,7 +14,7 @@
- [webrtc](https://github.com/pion/webrtc) go library and whole [@pion](https://github.com/pion) team
- series of streaming projects from [@deepch](https://github.com/deepch)
- [rtsp-simple-server](https://github.com/aler9/rtsp-simple-server) idea from [@aler9](https://github.com/aler9)
- [GStreamer](https://gstreamer.freedesktop.org/) multimedia framework pipeline idea
- [GStreamer](https://gstreamer.freedesktop.org/) framework pipeline idea
- [MediaSoup](https://mediasoup.org/) framework routing idea
## Codecs negotiation
@@ -76,7 +76,7 @@ Download binary for your OS from [latest release](https://github.com/AlexxIT/go2
- `go2rtc_mac_amd64` - Mac with Intel
- `go2rtc_mac_arm64` - Mac with M1
Don't forget to fix the rights `chmod +x go2rtc_linux_xxx` on Linux and Mac.
Don't forget to fix the rights `chmod +x go2rtc_xxx_xxx` on Linux and Mac.
### go2rtc: Home Assistant Add-on
@@ -103,7 +103,7 @@ Create file `go2rtc.yaml` next to the app.
- `api` server will start on default **1984 port**
- `rtsp` server will start on default **8554 port**
- `webrtc` will use random UDP port for each connection
- `ffmpeg` will use default transcoding options (you need to install it [manually](https://ffmpeg.org/))
- `ffmpeg` will use default transcoding options (you may install it [manually](https://ffmpeg.org/))
Available modules:
@@ -118,7 +118,7 @@ Available modules:
### Module: Streams
**go2rtc** support different stream source types. You can config only one link as stream source or multiple.
**go2rtc** support different stream source types. You can config one or multiple link as stream source.
Available source types:
@@ -128,6 +128,8 @@ Available source types:
- [exec](#source-exec) - advanced FFmpeg and GStreamer integration
- [hass](#source-hass) - Home Assistant integration
**PS.** You can use sources like `MJPEG`, `HLS` and others via FFmpeg integration.
#### Source: RTSP
- Support **RTSP and RTSPS** links with multiple video and audio tracks
@@ -274,9 +276,9 @@ rtsp:
### Module: WebRTC
WebRTC usually works without problems in the local network. But external access may require additional settings. It depends on what type of internet do you have.
WebRTC usually works without problems in the local network. But external access may require additional settings. It depends on what type of Internet do you have.
- by default, WebRTC use two random UDP ports for each connection (for video and audio)
- by default, WebRTC use two random UDP ports for each connection (video and audio)
- you can enable one additional TCP port for all connections and use it for external access
**Static public IP**
@@ -405,10 +407,9 @@ In other cases you need to use IP-address of server with **go2rtc** application.
2. Add generic camera with RTSP link:
- Hass > Settings > Integrations > Add Integration > [Generic Camera](https://my.home-assistant.io/redirect/config_flow_start/?domain=generic) > `rtsp://...` or `rtmp://...`
3. Use Picture Entity or Picture Glance lovelace card
- you can use either direct RTSP links to cameras or take RTSP streams from **go2rtc**
4. Open full screen card - this is should be WebRTC stream
- you can use either direct RTSP links to cameras or take RTSP streams from **go2rtc**
PS. Default Home Assistant lovelace cards don't support 2-way audio. You can use 2-way audio from [Add-on Web UI](https://my.home-assistant.io/redirect/supervisor_addon/?addon=a889bffc_go2rtc&repository_url=https%3A%2F%2Fgithub.com%2FAlexxIT%2Fhassio-addons). But you need use HTTPS to access the microphone. This is a browser restriction and cannot be avoided.
### Module: Log
-10
View File
@@ -9,8 +9,6 @@ import (
"github.com/rs/zerolog"
"net"
"net/http"
"os"
"strconv"
)
func Init() {
@@ -39,9 +37,7 @@ func Init() {
HandleFunc("/api/frame.mp4", frameHandler)
HandleFunc("/api/frame.raw", frameHandler)
HandleFunc("/api/stack", stackHandler)
HandleFunc("/api/streams", streamsHandler)
HandleFunc("/api/exit", exitHandler)
HandleFunc("/api/ws", apiWS)
// ensure we can listen without errors
@@ -99,12 +95,6 @@ func streamsHandler(w http.ResponseWriter, r *http.Request) {
}
}
func exitHandler(w http.ResponseWriter, r *http.Request) {
s := r.URL.Query().Get("code")
code, _ := strconv.Atoi(s)
os.Exit(code)
}
func apiWS(w http.ResponseWriter, r *http.Request) {
ctx := new(Context)
if err := ctx.Upgrade(w, r); err != nil {
+27
View File
@@ -0,0 +1,27 @@
package debug
import (
"github.com/AlexxIT/go2rtc/cmd/api"
"github.com/AlexxIT/go2rtc/cmd/streams"
"github.com/AlexxIT/go2rtc/pkg/streamer"
"net/http"
"os"
"strconv"
)
func Init() {
api.HandleFunc("/api/stack", stackHandler)
api.HandleFunc("/api/exit", exitHandler)
streams.HandleFunc("null", nullHandler)
}
func exitHandler(_ http.ResponseWriter, r *http.Request) {
s := r.URL.Query().Get("code")
code, _ := strconv.Atoi(s)
os.Exit(code)
}
func nullHandler(string) (streamer.Producer, error) {
return nil, nil
}
+1 -1
View File
@@ -1,4 +1,4 @@
package api
package debug
import (
"bytes"
+10 -1
View File
@@ -65,8 +65,17 @@ func rtspHandler(url string) (streamer.Producer, error) {
if err = conn.Dial(); err != nil {
return nil, err
}
conn.Backchannel = true
if err = conn.Describe(); err != nil {
return nil, err
// second try without backchannel, we need to reconnect
if err = conn.Dial(); err != nil {
return nil, err
}
conn.Backchannel = false
if err = conn.Describe(); err != nil {
return nil, err
}
}
return conn, nil
+3
View File
@@ -19,6 +19,9 @@ func HandleFunc(scheme string, handler Handler) {
func HasProducer(url string) bool {
i := strings.IndexByte(url, ':')
if i <= 0 { // TODO: i < 4 ?
return false
}
return handlers[url[:i]] != nil
}
+22 -3
View File
@@ -2,6 +2,7 @@ package streams
import (
"github.com/AlexxIT/go2rtc/pkg/streamer"
"sync"
)
type state byte
@@ -21,15 +22,19 @@ type Producer struct {
tracks []*streamer.Track
state state
mx sync.Mutex
}
func (p *Producer) GetMedias() []*streamer.Media {
p.mx.Lock()
defer p.mx.Unlock()
if p.state == stateNone {
log.Debug().Str("url", p.url).Msg("[streams] probe producer")
var err error
p.element, err = GetProducer(p.url)
if err != nil {
if err != nil || p.element == nil {
log.Error().Err(err).Str("url", p.url).Msg("[streams] probe producer")
return nil
}
@@ -41,6 +46,9 @@ func (p *Producer) GetMedias() []*streamer.Media {
}
func (p *Producer) GetTrack(media *streamer.Media, codec *streamer.Codec) *streamer.Track {
p.mx.Lock()
defer p.mx.Unlock()
if p.state == stateMedias {
p.state = stateTracks
}
@@ -61,6 +69,9 @@ func (p *Producer) GetTrack(media *streamer.Media, codec *streamer.Codec) *strea
// internals
func (p *Producer) start() {
p.mx.Lock()
defer p.mx.Unlock()
if p.state != stateTracks {
return
}
@@ -72,10 +83,18 @@ func (p *Producer) start() {
}
func (p *Producer) stop() {
p.mx.Lock()
log.Debug().Str("url", p.url).Msg("[streams] stop producer")
_ = p.element.Stop()
p.element = nil
if p.element != nil {
_ = p.element.Stop()
p.element = nil
} else {
log.Warn().Str("url", p.url).Msg("[streams] stop empty producer")
}
p.tracks = nil
p.state = stateNone
p.mx.Unlock()
}
+2 -1
View File
@@ -2,6 +2,7 @@ package streams
import (
"encoding/json"
"errors"
"github.com/AlexxIT/go2rtc/pkg/streamer"
)
@@ -78,7 +79,7 @@ func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) {
// can't match tracks for consumer
if len(consumer.tracks) == 0 {
return nil
return errors.New("couldn't find the matching tracks")
}
s.consumers = append(s.consumers, consumer)
+2 -1
View File
@@ -2,6 +2,7 @@ package streams
import (
"github.com/AlexxIT/go2rtc/pkg/fake"
"github.com/AlexxIT/go2rtc/pkg/rtsp"
"github.com/AlexxIT/go2rtc/pkg/streamer"
"github.com/stretchr/testify/assert"
"testing"
@@ -103,7 +104,7 @@ a=control:streamid=0
func TestRouting(t *testing.T) {
prod := &fake.Producer{}
prod.Medias, _ = streamer.UnmarshalRTSPSDP([]byte(dahuaSimple))
prod.Medias, _ = rtsp.UnmarshalSDP([]byte(dahuaSimple))
assert.Len(t, prod.Medias, 3)
HandleFunc("fake", func(url string) (streamer.Producer, error) {
+1
View File
@@ -108,6 +108,7 @@ func offerHandler(ctx *api.Context, msg *streamer.Message) {
// 2. AddConsumer, so we get new tracks
if err = stream.AddConsumer(conn); err != nil {
log.Warn().Err(err).Msg("[api.webrtc] add consumer")
_ = conn.Conn.Close()
ctx.Error(err)
return
}
+2
View File
@@ -3,6 +3,7 @@ package main
import (
"github.com/AlexxIT/go2rtc/cmd/api"
"github.com/AlexxIT/go2rtc/cmd/app"
"github.com/AlexxIT/go2rtc/cmd/debug"
"github.com/AlexxIT/go2rtc/cmd/exec"
"github.com/AlexxIT/go2rtc/cmd/ffmpeg"
"github.com/AlexxIT/go2rtc/cmd/hass"
@@ -33,6 +34,7 @@ func main() {
mse.Init()
ngrok.Init()
debug.Init()
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
+32 -2
View File
@@ -3,9 +3,12 @@ package rtmp
import (
"encoding/base64"
"encoding/binary"
"encoding/hex"
"fmt"
"github.com/AlexxIT/go2rtc/pkg/h264"
"github.com/AlexxIT/go2rtc/pkg/streamer"
"github.com/deepch/vdk/av"
"github.com/deepch/vdk/codec/aacparser"
"github.com/deepch/vdk/codec/h264parser"
"github.com/deepch/vdk/format/rtmp"
"github.com/pion/rtp"
@@ -70,9 +73,36 @@ func (c *Client) Dial() (err error) {
c.tracks = append(c.tracks, track)
case av.AAC:
panic("not implemented")
// TODO: fix support
cd := stream.(aacparser.CodecData)
// a=fmtp:97 streamtype=5;profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config=1588
fmtp := fmt.Sprintf(
"config=%s",
hex.EncodeToString(cd.ConfigBytes),
)
codec := &streamer.Codec{
Name: streamer.CodecAAC,
ClockRate: uint32(cd.Config.SampleRate),
Channels: uint16(cd.Config.ChannelConfig),
FmtpLine: fmtp,
}
media := &streamer.Media{
Kind: streamer.KindAudio,
Direction: streamer.DirectionSendonly,
Codecs: []*streamer.Codec{codec},
}
c.medias = append(c.medias, media)
track := &streamer.Track{
Codec: codec, Direction: media.Direction,
}
c.tracks = append(c.tracks, track)
default:
panic("unsupported codec")
fmt.Printf("[rtmp] unsupported codec %+v\n", stream)
}
}
+62 -32
View File
@@ -43,11 +43,15 @@ const (
ModeServerConsumer
)
const KeepAlive = time.Second * 25
type Conn struct {
streamer.Element
// public
Backchannel bool
Medias []*streamer.Media
Session string
UserAgent string
@@ -104,6 +108,9 @@ func (c *Conn) Dial() (err error) {
//if c.state != StateClientInit {
// panic("wrong state")
//}
if c.conn != nil && c.auth != nil {
c.auth.Reset()
}
c.conn, err = net.DialTimeout(
"tcp", c.URL.Host, 10*time.Second,
@@ -144,7 +151,9 @@ func (c *Conn) Request(req *tcp.Request) error {
}
c.sequence++
req.Header.Set("CSeq", strconv.Itoa(c.sequence))
// important to send case sensitive CSeq
// https://github.com/AlexxIT/go2rtc/issues/7
req.Header["CSeq"] = []string{strconv.Itoa(c.sequence)}
c.auth.Write(req)
@@ -254,21 +263,17 @@ func (c *Conn) Describe() error {
Method: MethodDescribe,
URL: c.URL,
Header: map[string][]string{
"Accept": {"application/sdp"},
"Require": {"www.onvif.org/ver20/backchannel"},
"Accept": {"application/sdp"},
},
}
if c.Backchannel {
req.Header.Set("Require", "www.onvif.org/ver20/backchannel")
}
res, err := c.Do(req)
if err != nil {
if res != nil {
// if we have answer - give second chanse without onvif header
req.Header.Del("Require")
res, err = c.Do(req)
}
if err != nil {
return err
}
return err
}
if val := res.Header.Get("Content-Base"); val != "" {
@@ -278,13 +283,7 @@ func (c *Conn) Describe() error {
}
}
// fix bug in Sonoff camera SDP "o=- 1 1 IN IP4 rom t_rtsplin"
// TODO: make some universal fix
if i := bytes.Index(res.Body, []byte("rom t_rtsplin")); i > 0 {
res.Body[i+3] = '_'
}
c.Medias, err = streamer.UnmarshalRTSPSDP(res.Body)
c.Medias, err = UnmarshalSDP(res.Body)
if err != nil {
return err
}
@@ -370,9 +369,10 @@ func (c *Conn) SetupMedia(
// Transport: RTP/AVP/TCP;unicast;interleaved=10-11;ssrc=10117CB7
// Transport: RTP/AVP/TCP;unicast;destination=192.168.1.123;source=192.168.10.12;interleaved=0
// Transport: RTP/AVP/TCP;ssrc=22345682;interleaved=0-1
s := res.Header.Get("Transport")
// TODO: rewrite
if !strings.HasPrefix(s, "RTP/AVP/TCP;unicast") {
if !strings.HasPrefix(s, "RTP/AVP/TCP;") {
return nil, fmt.Errorf("wrong transport: %s", s)
}
@@ -475,7 +475,7 @@ func (c *Conn) Accept() error {
return errors.New("wrong content type")
}
c.Medias, err = streamer.UnmarshalRTSPSDP(req.Body)
c.Medias, err = UnmarshalSDP(req.Body)
if err != nil {
return err
}
@@ -575,6 +575,7 @@ func (c *Conn) Handle() (err error) {
}()
//c.Fire(streamer.StatePlaying)
ts := time.Now().Add(KeepAlive)
for {
// we can read:
@@ -629,7 +630,7 @@ func (c *Conn) Handle() (err error) {
if channelID&1 == 0 {
packet := &rtp.Packet{}
if err = packet.Unmarshal(buf); err != nil {
return errors.New("wrong RTP data")
return
}
track := c.channels[channelID]
@@ -643,16 +644,27 @@ func (c *Conn) Handle() (err error) {
msg := &RTCP{Channel: channelID}
if err = msg.Header.Unmarshal(buf); err != nil {
return errors.New("wrong RTCP data")
return
}
msg.Packets, err = rtcp.Unmarshal(buf)
if err != nil {
return errors.New("wrong RTCP data")
return
}
c.Fire(msg)
}
// keep-alive
now := time.Now()
if now.After(ts) {
req := &tcp.Request{Method: MethodOptions, URL: c.URL}
// don't need to wait respose on this request
if err = c.Request(req); err != nil {
return err
}
ts = now.Add(KeepAlive)
}
}
}
@@ -712,17 +724,35 @@ type RTCP struct {
Packets []rtcp.Packet
}
func between(s, sub1, sub2 string) (res string, ok1 bool, ok2 bool) {
i := strings.Index(s, sub1)
if i >= 0 {
ok1 = true
s = s[i+len(sub1):]
const sdpHeader = `v=0
o=- 0 0 IN IP4 0.0.0.0
s=-
t=0 0`
func UnmarshalSDP(rawSDP []byte) ([]*streamer.Media, error) {
medias, err := streamer.UnmarshalSDP(rawSDP)
if err != nil {
// fix SDP header for some cameras
i := bytes.Index(rawSDP, []byte("\nm="))
if i > 0 {
rawSDP = append([]byte(sdpHeader), rawSDP[i:]...)
medias, err = streamer.UnmarshalSDP(rawSDP)
}
if err != nil {
return nil, err
}
}
i = strings.Index(s, sub2)
if i >= 0 {
return s[:i], ok1, true
// fix bug in ONVIF spec
// https://www.onvif.org/specs/stream/ONVIF-Streaming-Spec-v241.pdf
for _, media := range medias {
switch media.Direction {
case streamer.DirectionRecvonly, "":
media.Direction = streamer.DirectionSendonly
case streamer.DirectionSendonly:
media.Direction = streamer.DirectionRecvonly
}
}
return s, ok1, false
return medias, nil
}
-20
View File
@@ -180,26 +180,6 @@ func UnmarshalSDP(rawSDP []byte) ([]*Media, error) {
return medias, nil
}
func UnmarshalRTSPSDP(rawSDP []byte) ([]*Media, error) {
medias, err := UnmarshalSDP(rawSDP)
if err != nil {
return nil, err
}
// fix bug in ONVIF spec
// https://www.onvif.org/specs/stream/ONVIF-Streaming-Spec-v241.pdf
for _, media := range medias {
switch media.Direction {
case DirectionRecvonly, "":
media.Direction = DirectionSendonly
case DirectionSendonly:
media.Direction = DirectionRecvonly
}
}
return medias, nil
}
func MarshalSDP(medias []*Media) ([]byte, error) {
sd := &sdp.SessionDescription{}
+6
View File
@@ -80,6 +80,12 @@ func (a *Auth) Write(req *Request) {
}
}
func (a *Auth) Reset() {
if a.Method == AuthDigest {
a.Method = AuthUnknown
}
}
func Between(s, sub1, sub2 string) string {
i := strings.Index(s, sub1)
if i < 0 {
+4 -1
View File
@@ -47,10 +47,13 @@ func ReadResponse(r *bufio.Reader) (*Response, error) {
if err != nil {
return nil, err
}
if line == "" {
return nil, errors.New("empty response on RTSP request")
}
ss := strings.SplitN(line, " ", 3)
if len(ss) != 3 {
return nil, errors.New("malformed response")
return nil, fmt.Errorf("malformed response: %s", line)
}
res := &Response{
+4
View File
@@ -0,0 +1,4 @@
@SET GOOS=windows
@SET GOARCH=amd64
cd ..
go build -ldflags "-s -w" -trimpath && upx-3.96 go2rtc.exe