diff --git a/README.md b/README.md index cb3d6c8b..5b3754c9 100644 --- a/README.md +++ b/README.md @@ -76,6 +76,7 @@ Ultimate camera streaming application with support for RTSP, WebRTC, HomeKit, FF * [Incoming sources](#incoming-sources) * [Stream to camera](#stream-to-camera) * [Publish stream](#publish-stream) + * [Preload stream](#preload-stream) * [Module: API](#module-api) * [Module: RTSP](#module-rtsp) * [Module: RTMP](#module-rtmp) @@ -835,6 +836,26 @@ streams: - **Telegram Desktop App** > Any public or private channel or group (where you admin) > Live stream > Start with... > Start streaming. - **YouTube** > Create > Go live > Stream latency: Ultra low-latency > Copy: Stream URL + Stream key. +### Preload stream + +You can preload any stream on go2rtc start. This is useful for cameras that take a long time to start up. + +```yaml +preload: + camera1: # default: video&audio = ANY + camera2: "video" # preload only video track + camera3: "video=h264&audio=opus" # preload H264 video and OPUS audio + +streams: + camera1: + - rtsp://192.168.1.100/stream + camera2: + - rtsp://192.168.1.101/stream + camera3: + - rtsp://192.168.1.102/h265stream + - ffmpeg:camera3#video=h264#audio=opus#hardware +``` + ### Module: API The HTTP API is the main part for interacting with the application. Default address: `http://localhost:1984/`. diff --git a/api/openapi.yaml b/api/openapi.yaml index 618acb48..a2d66a87 100644 --- a/api/openapi.yaml +++ b/api/openapi.yaml @@ -237,6 +237,54 @@ paths: + /api/preload: + put: + summary: Preload new stream + tags: [ Streams list ] + parameters: + - name: src + in: query + description: Stream source (name) + required: true + schema: { type: string } + example: "camera1" + - name: video + in: query + description: Video codecs filter + required: false + schema: { type: string } + example: all,h264,h265,... + - name: audio + in: query + description: Audio codecs filter + required: false + schema: { type: string } + example: all,aac,opus,... + - name: microphone + in: query + description: Microphone codecs filter + required: false + schema: { type: string } + example: all,aac,opus,... + responses: + default: + description: Default response + delete: + summary: Delete preloaded stream + tags: [ Streams list ] + parameters: + - name: src + in: query + description: Stream source (name) + required: true + schema: { type: string } + example: "camera1" + responses: + default: + description: Default response + + + /api/streams?src={src}: get: summary: Get stream info in JSON format diff --git a/internal/debug/stack.go b/internal/debug/stack.go index f8d62772..6bc735ad 100644 --- a/internal/debug/stack.go +++ b/internal/debug/stack.go @@ -29,8 +29,8 @@ var stackSkip = [][]byte{ []byte("created by github.com/AlexxIT/go2rtc/internal/homekit.Init"), // webrtc/api.go - []byte("created by github.com/pion/ice/v2.NewTCPMuxDefault"), - []byte("created by github.com/pion/ice/v2.NewUDPMuxDefault"), + []byte("created by github.com/pion/ice/v4.NewTCPMuxDefault"), + []byte("created by github.com/pion/ice/v4.NewUDPMuxDefault"), } func stackHandler(w http.ResponseWriter, r *http.Request) { diff --git a/internal/streams/api.go b/internal/streams/api.go index 189178b6..28f09708 100644 --- a/internal/streams/api.go +++ b/internal/streams/api.go @@ -5,6 +5,7 @@ import ( "github.com/AlexxIT/go2rtc/internal/api" "github.com/AlexxIT/go2rtc/internal/app" + "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/creds" "github.com/AlexxIT/go2rtc/pkg/probe" ) @@ -30,7 +31,7 @@ func apiStreams(w http.ResponseWriter, r *http.Request) { return } - cons := probe.NewProbe(query) + cons := probe.Create("probe", query) if len(cons.Medias) != 0 { cons.WithRequest(r) if err := stream.AddConsumer(cons); err != nil { @@ -127,3 +128,51 @@ func apiStreamsDOT(w http.ResponseWriter, r *http.Request) { api.Response(w, dot, "text/vnd.graphviz") } + +func apiPreload(w http.ResponseWriter, r *http.Request) { + query := r.URL.Query() + src := query.Get("src") + + // check if stream exists + stream := Get(src) + if stream == nil { + http.Error(w, "", http.StatusNotFound) + return + } + + switch r.Method { + case "PUT": + // it's safe to delete from map while iterating + for k := range query { + switch k { + case core.KindVideo, core.KindAudio, "microphone": + default: + delete(query, k) + } + } + + rawQuery := query.Encode() + + if err := AddPreload(stream, rawQuery); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + if err := app.PatchConfig([]string{"preload", src}, rawQuery); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } + + case "DELETE": + if err := DelPreload(stream); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + if err := app.PatchConfig([]string{"preload", src}, nil); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } + + default: + http.Error(w, "", http.StatusMethodNotAllowed) + } +} diff --git a/internal/streams/preload.go b/internal/streams/preload.go new file mode 100644 index 00000000..527746ac --- /dev/null +++ b/internal/streams/preload.go @@ -0,0 +1,58 @@ +package streams + +import ( + "errors" + "net/url" + "sync" + + "github.com/AlexxIT/go2rtc/pkg/probe" +) + +var preloads = map[*Stream]*probe.Probe{} +var preloadsMu sync.Mutex + +func Preload(stream *Stream, rawQuery string) { + if err := AddPreload(stream, rawQuery); err != nil { + log.Error().Err(err).Caller().Send() + } +} + +func AddPreload(stream *Stream, rawQuery string) error { + if rawQuery == "" { + rawQuery = "video&audio" + } + + query, err := url.ParseQuery(rawQuery) + if err != nil { + return err + } + + preloadsMu.Lock() + defer preloadsMu.Unlock() + + if cons := preloads[stream]; cons != nil { + stream.RemoveConsumer(cons) + } + + cons := probe.Create("preload", query) + + if err = stream.AddConsumer(cons); err != nil { + return err + } + + preloads[stream] = cons + return nil +} + +func DelPreload(stream *Stream) error { + preloadsMu.Lock() + defer preloadsMu.Unlock() + + if cons := preloads[stream]; cons != nil { + stream.RemoveConsumer(cons) + delete(preloads, stream) + return nil + } + + return errors.New("streams: preload not found") +} diff --git a/internal/streams/streams.go b/internal/streams/streams.go index dcbaba28..a0b1ed68 100644 --- a/internal/streams/streams.go +++ b/internal/streams/streams.go @@ -14,8 +14,9 @@ import ( func Init() { var cfg struct { - Streams map[string]any `yaml:"streams"` - Publish map[string]any `yaml:"publish"` + Streams map[string]any `yaml:"streams"` + Publish map[string]any `yaml:"publish"` + Preload map[string]string `yaml:"preload"` } app.LoadConfig(&cfg) @@ -28,17 +29,24 @@ func Init() { api.HandleFunc("api/streams", apiStreams) api.HandleFunc("api/streams.dot", apiStreamsDOT) + api.HandleFunc("api/preload", apiPreload) - if cfg.Publish == nil { + if cfg.Publish == nil && cfg.Preload == nil { return } time.AfterFunc(time.Second, func() { + // range for nil map is OK for name, dst := range cfg.Publish { if stream := Get(name); stream != nil { Publish(stream, dst) } } + for name, rawQuery := range cfg.Preload { + if stream := Get(name); stream != nil { + Preload(stream, rawQuery) + } + } }) } diff --git a/pkg/doorbird/backchannel.go b/pkg/doorbird/backchannel.go index 82379383..4d252228 100644 --- a/pkg/doorbird/backchannel.go +++ b/pkg/doorbird/backchannel.go @@ -88,6 +88,8 @@ func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Rece } func (c *Client) Start() (err error) { - _, err = c.conn.Read(nil) + // just block until c.conn closed + b := make([]byte, 1) + _, err = c.conn.Read(b) return } diff --git a/pkg/probe/producer.go b/pkg/probe/consumer.go similarity index 63% rename from pkg/probe/producer.go rename to pkg/probe/consumer.go index 1fbd3efb..a1ca7ca5 100644 --- a/pkg/probe/producer.go +++ b/pkg/probe/consumer.go @@ -11,7 +11,7 @@ type Probe struct { core.Connection } -func NewProbe(query url.Values) *Probe { +func Create(name string, query url.Values) *Probe { medias := core.ParseQuery(query) for _, value := range query["microphone"] { @@ -32,39 +32,22 @@ func NewProbe(query url.Values) *Probe { return &Probe{ Connection: core.Connection{ ID: core.NewID(), - FormatName: "probe", + FormatName: name, Medias: medias, }, } } -func (p *Probe) GetMedias() []*core.Media { - return p.Medias -} - func (p *Probe) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { sender := core.NewSender(media, track.Codec) - sender.Bind(track) + sender.Handler = func(pkt *core.Packet) { + p.Send += len(pkt.Payload) + } + sender.HandleRTP(track) p.Senders = append(p.Senders, sender) return nil } -func (p *Probe) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { - receiver := core.NewReceiver(media, codec) - p.Receivers = append(p.Receivers, receiver) - return receiver, nil -} - func (p *Probe) Start() error { return nil } - -func (p *Probe) Stop() error { - for _, receiver := range p.Receivers { - receiver.Close() - } - for _, sender := range p.Senders { - sender.Close() - } - return nil -}