Compare commits
12 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 89eb653d67 | |||
| 0e49ffdfff | |||
| bd2fc1252d | |||
| 78ac88448c | |||
| 4cd9757e53 | |||
| f9cb6fd670 | |||
| 57fa6a5530 | |||
| 6906b56524 | |||
| c9b0806c84 | |||
| a9d1e64f88 | |||
| 9e9f07f3f7 | |||
| b51aabd3d9 |
+11
-4
@@ -1,24 +1,31 @@
|
||||
# syntax=docker/dockerfile:labs
|
||||
|
||||
# 0. Prepare images
|
||||
ARG PYTHON_VERSION="3.11"
|
||||
ARG GO_VERSION="1.19"
|
||||
ARG NGROK_VERSION="3"
|
||||
|
||||
FROM python:${PYTHON_VERSION}-alpine AS base
|
||||
FROM golang:${GO_VERSION}-alpine AS go
|
||||
FROM ngrok/ngrok:${NGROK_VERSION}-alpine AS ngrok
|
||||
|
||||
|
||||
# 1. Build go2rtc binary
|
||||
FROM go AS build
|
||||
FROM --platform=$BUILDPLATFORM golang:${GO_VERSION}-alpine AS build
|
||||
ARG TARGETPLATFORM
|
||||
ARG TARGETOS
|
||||
ARG TARGETARCH
|
||||
|
||||
ENV GOOS=${TARGETOS}
|
||||
ENV GOARCH=${TARGETARCH}
|
||||
|
||||
WORKDIR /build
|
||||
|
||||
# Cache dependencies
|
||||
COPY go.mod go.sum ./
|
||||
RUN go mod download
|
||||
RUN --mount=type=cache,target=/root/.cache/go-build go mod download
|
||||
|
||||
COPY . .
|
||||
RUN CGO_ENABLED=0 go build -ldflags "-s -w" -trimpath
|
||||
RUN --mount=type=cache,target=/root/.cache/go-build CGO_ENABLED=0 go build -ldflags "-s -w" -trimpath
|
||||
|
||||
|
||||
# 2. Collect all files
|
||||
|
||||
+1
-1
@@ -10,7 +10,7 @@ import (
|
||||
"runtime"
|
||||
)
|
||||
|
||||
var Version = "0.1-rc.6"
|
||||
var Version = "0.1-rc.7"
|
||||
var UserAgent = "go2rtc/" + Version
|
||||
|
||||
func Init() {
|
||||
|
||||
+11
-7
@@ -48,13 +48,18 @@ func (p *Producer) GetMedias() []*streamer.Media {
|
||||
|
||||
p.element, p.lastErr = GetProducer(p.url)
|
||||
if p.lastErr != nil || p.element == nil {
|
||||
log.Error().Err(p.lastErr).Caller().Send()
|
||||
log.Error().Err(p.lastErr).Str("url", p.url).Caller().Send()
|
||||
return nil
|
||||
}
|
||||
|
||||
p.state = stateMedias
|
||||
}
|
||||
|
||||
// if element in reconnect state
|
||||
if p.element == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return p.element.GetMedias()
|
||||
}
|
||||
|
||||
@@ -102,7 +107,7 @@ func (p *Producer) start() {
|
||||
go func() {
|
||||
// safe read element while mu locked
|
||||
if err := p.element.Start(); err != nil {
|
||||
log.Warn().Err(err).Caller().Send()
|
||||
log.Warn().Err(err).Str("url", p.url).Caller().Send()
|
||||
}
|
||||
p.reconnect()
|
||||
}()
|
||||
@@ -119,10 +124,9 @@ func (p *Producer) reconnect() {
|
||||
|
||||
log.Debug().Msgf("[streams] reconnect to url=%s", p.url)
|
||||
|
||||
var err error
|
||||
p.element, err = GetProducer(p.url)
|
||||
if err != nil || p.element == nil {
|
||||
log.Debug().Err(err).Caller().Send()
|
||||
p.element, p.lastErr = GetProducer(p.url)
|
||||
if p.lastErr != nil || p.element == nil {
|
||||
log.Debug().Err(p.lastErr).Caller().Send()
|
||||
// TODO: dynamic timeout
|
||||
p.restart = time.AfterFunc(30*time.Second, p.reconnect)
|
||||
return
|
||||
@@ -149,7 +153,7 @@ func (p *Producer) reconnect() {
|
||||
}
|
||||
|
||||
go func() {
|
||||
if err = p.element.Start(); err != nil {
|
||||
if err := p.element.Start(); err != nil {
|
||||
log.Debug().Err(err).Caller().Send()
|
||||
}
|
||||
p.reconnect()
|
||||
|
||||
@@ -18,6 +18,7 @@ type Stream struct {
|
||||
producers []*Producer
|
||||
consumers []*Consumer
|
||||
mu sync.Mutex
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
func NewStream(source interface{}) *Stream {
|
||||
@@ -59,6 +60,9 @@ func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) {
|
||||
|
||||
var codecs string
|
||||
|
||||
// support for multiple simultaneous requests from different consumers
|
||||
s.wg.Add(1)
|
||||
|
||||
// Step 1. Get consumer medias
|
||||
for icc, consMedia := range cons.GetMedias() {
|
||||
log.Trace().Stringer("media", consMedia).
|
||||
@@ -82,7 +86,7 @@ func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) {
|
||||
// Step 4. Get producer track
|
||||
prodTrack := prod.GetTrack(prodMedia, prodCodec)
|
||||
if prodTrack == nil {
|
||||
log.Warn().Msg("[stream] can't get track")
|
||||
log.Warn().Str("url", prod.url).Msg("[stream] can't get track")
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -97,6 +101,9 @@ func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) {
|
||||
}
|
||||
}
|
||||
|
||||
s.wg.Done()
|
||||
s.wg.Wait()
|
||||
|
||||
if len(producers) == 0 {
|
||||
s.stopProducers()
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ package h264
|
||||
import (
|
||||
"encoding/base64"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
||||
"strings"
|
||||
)
|
||||
@@ -51,6 +52,16 @@ func GetProfileLevelID(fmtp string) string {
|
||||
if fmtp == "" {
|
||||
return ""
|
||||
}
|
||||
|
||||
// some cameras has wrong profile-level-id
|
||||
// https://github.com/AlexxIT/go2rtc/issues/155
|
||||
if s := streamer.Between(fmtp, "sprop-parameter-sets=", ","); s != "" {
|
||||
sps, _ := base64.StdEncoding.DecodeString(s)
|
||||
if len(sps) >= 4 {
|
||||
return fmt.Sprintf("%06X", sps[1:4])
|
||||
}
|
||||
}
|
||||
|
||||
return streamer.Between(fmtp, "profile-level-id=", ";")
|
||||
}
|
||||
|
||||
|
||||
@@ -127,7 +127,9 @@ func (c *Client) Close() error {
|
||||
if c.conn == nil {
|
||||
return nil
|
||||
}
|
||||
close(c.buffer)
|
||||
if c.buffer != nil {
|
||||
close(c.buffer)
|
||||
}
|
||||
c.closed = true
|
||||
return c.conn.Close()
|
||||
}
|
||||
|
||||
+42
-8
@@ -20,6 +20,7 @@ import (
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -52,6 +53,7 @@ const (
|
||||
StateConn
|
||||
StateSetup
|
||||
StatePlay
|
||||
StateHandle
|
||||
)
|
||||
|
||||
type Conn struct {
|
||||
@@ -72,6 +74,7 @@ type Conn struct {
|
||||
conn net.Conn
|
||||
mode Mode
|
||||
state State
|
||||
stateMu sync.Mutex
|
||||
reader *bufio.Reader
|
||||
sequence int
|
||||
uri string
|
||||
@@ -340,6 +343,9 @@ func (c *Conn) Setup() error {
|
||||
func (c *Conn) SetupMedia(
|
||||
media *streamer.Media, codec *streamer.Codec,
|
||||
) (*streamer.Track, error) {
|
||||
c.stateMu.Lock()
|
||||
defer c.stateMu.Unlock()
|
||||
|
||||
ch := c.GetChannel(media)
|
||||
if ch < 0 {
|
||||
return nil, fmt.Errorf("wrong media: %v", media)
|
||||
@@ -461,12 +467,19 @@ func (c *Conn) SetupMedia(
|
||||
}
|
||||
|
||||
func (c *Conn) Play() (err error) {
|
||||
c.stateMu.Lock()
|
||||
defer c.stateMu.Unlock()
|
||||
|
||||
if c.state != StateSetup {
|
||||
return fmt.Errorf("RTSP PLAY from wrong state: %s", c.state)
|
||||
}
|
||||
|
||||
req := &tcp.Request{Method: MethodPlay, URL: c.URL}
|
||||
return c.Request(req)
|
||||
if err = c.Request(req); err == nil {
|
||||
c.state = StatePlay
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (c *Conn) Teardown() (err error) {
|
||||
@@ -476,12 +489,14 @@ func (c *Conn) Teardown() (err error) {
|
||||
}
|
||||
|
||||
func (c *Conn) Close() error {
|
||||
c.stateMu.Lock()
|
||||
defer c.stateMu.Unlock()
|
||||
|
||||
if c.state == StateNone {
|
||||
return nil
|
||||
}
|
||||
if err := c.Teardown(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_ = c.Teardown()
|
||||
c.state = StateNone
|
||||
return c.conn.Close()
|
||||
}
|
||||
@@ -614,7 +629,10 @@ func (c *Conn) Accept() error {
|
||||
|
||||
case MethodRecord, MethodPlay:
|
||||
res := &tcp.Response{Request: req}
|
||||
return c.Response(res)
|
||||
if err = c.Response(res); err == nil {
|
||||
c.state = StatePlay
|
||||
}
|
||||
return err
|
||||
|
||||
default:
|
||||
return fmt.Errorf("unsupported method: %s", req.Method)
|
||||
@@ -623,13 +641,29 @@ func (c *Conn) Accept() error {
|
||||
}
|
||||
|
||||
func (c *Conn) Handle() (err error) {
|
||||
if c.state != StateSetup {
|
||||
return fmt.Errorf("RTSP Handle from wrong state: %d", c.state)
|
||||
c.stateMu.Lock()
|
||||
|
||||
switch c.state {
|
||||
case StateNone: // Close after PLAY and before Handle is OK (because SETUP after PLAY)
|
||||
case StatePlay:
|
||||
c.state = StateHandle
|
||||
default:
|
||||
err = fmt.Errorf("RTSP Handle from wrong state: %d", c.state)
|
||||
|
||||
c.state = StateNone
|
||||
_ = c.conn.Close()
|
||||
}
|
||||
|
||||
c.state = StatePlay
|
||||
c.stateMu.Unlock()
|
||||
|
||||
if c.state != StateHandle {
|
||||
return
|
||||
}
|
||||
|
||||
defer func() {
|
||||
c.stateMu.Lock()
|
||||
defer c.stateMu.Unlock()
|
||||
|
||||
if c.state == StateNone {
|
||||
err = nil
|
||||
return
|
||||
|
||||
@@ -21,7 +21,8 @@ func (c *Conn) GetTrack(media *streamer.Media, codec *streamer.Codec) *streamer.
|
||||
}
|
||||
|
||||
// can't setup new tracks from play state - forcing a reconnection feature
|
||||
if c.state == StatePlay {
|
||||
switch c.state {
|
||||
case StatePlay, StateHandle:
|
||||
go c.Close()
|
||||
return streamer.NewTrack(codec, media.Direction)
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package webrtc
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/AlexxIT/go2rtc/pkg/streamer"
|
||||
"github.com/pion/ice/v2"
|
||||
@@ -13,10 +14,11 @@ import (
|
||||
)
|
||||
|
||||
func NewCandidate(address string) (string, error) {
|
||||
host, port, err := net.SplitHostPort(address)
|
||||
if err != nil {
|
||||
return "", err
|
||||
i := strings.LastIndexByte(address, ':')
|
||||
if i < 0 {
|
||||
return "", errors.New("wrong candidate: " + address)
|
||||
}
|
||||
host, port := address[:i], address[i+1:]
|
||||
|
||||
i, err := strconv.Atoi(port)
|
||||
if err != nil {
|
||||
|
||||
+28
-18
@@ -1,19 +1,23 @@
|
||||
import {VideoRTC} from "./video-rtc.js";
|
||||
|
||||
class VideoStream extends VideoRTC {
|
||||
constructor() {
|
||||
super();
|
||||
set divMode(value) {
|
||||
this.querySelector(".mode").innerText = value;
|
||||
this.querySelector(".status").innerText = "";
|
||||
}
|
||||
|
||||
/** @type {HTMLDivElement} */
|
||||
this.divMode = null;
|
||||
/** @type {HTMLDivElement} */
|
||||
this.divStatus = null;
|
||||
set divError(value) {
|
||||
const state = this.querySelector(".mode").innerText;
|
||||
if (state !== "loading") return;
|
||||
this.querySelector(".mode").innerText = "error";
|
||||
this.querySelector(".status").innerText = value;
|
||||
}
|
||||
|
||||
/**
|
||||
* Custom GUI
|
||||
*/
|
||||
oninit() {
|
||||
console.debug("stream.oninit");
|
||||
super.oninit();
|
||||
|
||||
this.innerHTML = `
|
||||
@@ -36,35 +40,36 @@ class VideoStream extends VideoRTC {
|
||||
</div>
|
||||
`;
|
||||
|
||||
this.divStatus = this.querySelector(".status");
|
||||
this.divMode = this.querySelector(".mode");
|
||||
|
||||
const info = this.querySelector(".info")
|
||||
this.insertBefore(this.video, info);
|
||||
}
|
||||
|
||||
onconnect() {
|
||||
console.debug("stream.onconnect");
|
||||
const result = super.onconnect();
|
||||
if (result) {
|
||||
this.divMode.innerText = "loading";
|
||||
}
|
||||
if (result) this.divMode = "loading";
|
||||
return result;
|
||||
}
|
||||
|
||||
ondisconnect() {
|
||||
console.debug("stream.ondisconnect");
|
||||
super.ondisconnect();
|
||||
}
|
||||
|
||||
onopen() {
|
||||
console.debug("stream.onopen");
|
||||
const result = super.onopen();
|
||||
|
||||
this.onmessage["stream"] = msg => {
|
||||
console.debug("stream.onmessge", msg);
|
||||
switch (msg.type) {
|
||||
case "error":
|
||||
this.divMode.innerText = "error";
|
||||
this.divStatus.innerText = msg.value;
|
||||
this.divError = msg.value;
|
||||
break;
|
||||
case "mse":
|
||||
case "mp4":
|
||||
case "mjpeg":
|
||||
this.divMode.innerText = msg.type.toUpperCase();
|
||||
this.divStatus.innerText = "";
|
||||
this.divMode = msg.type.toUpperCase();
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -72,12 +77,17 @@ class VideoStream extends VideoRTC {
|
||||
return result;
|
||||
}
|
||||
|
||||
onclose() {
|
||||
console.debug("stream.onclose");
|
||||
return super.onclose();
|
||||
}
|
||||
|
||||
onpcvideo(ev) {
|
||||
console.debug("stream.onpcvideo");
|
||||
super.onpcvideo(ev);
|
||||
|
||||
if (this.pcState !== WebSocket.CLOSED) {
|
||||
this.divMode.innerText = "RTC";
|
||||
this.divStatus.innerText = "";
|
||||
this.divMode = "RTC";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user