Compare commits

...

12 Commits

Author SHA1 Message Date
Alexey Khit 89eb653d67 Update version to 0.1-rc.7 2023-01-08 23:18:52 +03:00
Alexey Khit 0e49ffdfff Fix GetMedias for producer in reconnect state 2023-01-08 21:42:13 +03:00
Alexey Khit bd2fc1252d Update last error for reconnect stream 2023-01-08 21:36:28 +03:00
Alexey Khit 78ac88448c Fix close problem ivideon client 2023-01-08 21:35:45 +03:00
Alexey Khit 4cd9757e53 Fix status info in JS player 2023-01-08 21:05:50 +03:00
Alexey Khit f9cb6fd670 Fix wrong RTSP H264 profile for some cameras 2023-01-08 21:05:17 +03:00
Alexey Khit 57fa6a5530 Add support for simultaneous requests from different consumers 2023-01-08 20:31:00 +03:00
Alexey Khit 6906b56524 Fix double start for RTSP source 2023-01-08 20:01:38 +03:00
Alexey Khit c9b0806c84 Add producer url to logs 2023-01-08 20:00:48 +03:00
Alexey Khit a9d1e64f88 Fix STUN candidate in IPv6 format 2023-01-08 15:45:11 +03:00
Alex X 9e9f07f3f7 Merge pull request #150 from skrashevich/dockerfile-crossbuild
Speedup container building using Golang cross-building
2023-01-06 14:06:50 +03:00
Sergey Krashevich b51aabd3d9 Update Dockerfile 2023-01-06 11:52:09 +03:00
10 changed files with 122 additions and 44 deletions
+11 -4
View File
@@ -1,24 +1,31 @@
# syntax=docker/dockerfile:labs
# 0. Prepare images # 0. Prepare images
ARG PYTHON_VERSION="3.11" ARG PYTHON_VERSION="3.11"
ARG GO_VERSION="1.19" ARG GO_VERSION="1.19"
ARG NGROK_VERSION="3" ARG NGROK_VERSION="3"
FROM python:${PYTHON_VERSION}-alpine AS base FROM python:${PYTHON_VERSION}-alpine AS base
FROM golang:${GO_VERSION}-alpine AS go
FROM ngrok/ngrok:${NGROK_VERSION}-alpine AS ngrok FROM ngrok/ngrok:${NGROK_VERSION}-alpine AS ngrok
# 1. Build go2rtc binary # 1. Build go2rtc binary
FROM go AS build FROM --platform=$BUILDPLATFORM golang:${GO_VERSION}-alpine AS build
ARG TARGETPLATFORM
ARG TARGETOS
ARG TARGETARCH
ENV GOOS=${TARGETOS}
ENV GOARCH=${TARGETARCH}
WORKDIR /build WORKDIR /build
# Cache dependencies # Cache dependencies
COPY go.mod go.sum ./ COPY go.mod go.sum ./
RUN go mod download RUN --mount=type=cache,target=/root/.cache/go-build go mod download
COPY . . COPY . .
RUN CGO_ENABLED=0 go build -ldflags "-s -w" -trimpath RUN --mount=type=cache,target=/root/.cache/go-build CGO_ENABLED=0 go build -ldflags "-s -w" -trimpath
# 2. Collect all files # 2. Collect all files
+1 -1
View File
@@ -10,7 +10,7 @@ import (
"runtime" "runtime"
) )
var Version = "0.1-rc.6" var Version = "0.1-rc.7"
var UserAgent = "go2rtc/" + Version var UserAgent = "go2rtc/" + Version
func Init() { func Init() {
+11 -7
View File
@@ -48,13 +48,18 @@ func (p *Producer) GetMedias() []*streamer.Media {
p.element, p.lastErr = GetProducer(p.url) p.element, p.lastErr = GetProducer(p.url)
if p.lastErr != nil || p.element == nil { if p.lastErr != nil || p.element == nil {
log.Error().Err(p.lastErr).Caller().Send() log.Error().Err(p.lastErr).Str("url", p.url).Caller().Send()
return nil return nil
} }
p.state = stateMedias p.state = stateMedias
} }
// if element in reconnect state
if p.element == nil {
return nil
}
return p.element.GetMedias() return p.element.GetMedias()
} }
@@ -102,7 +107,7 @@ func (p *Producer) start() {
go func() { go func() {
// safe read element while mu locked // safe read element while mu locked
if err := p.element.Start(); err != nil { if err := p.element.Start(); err != nil {
log.Warn().Err(err).Caller().Send() log.Warn().Err(err).Str("url", p.url).Caller().Send()
} }
p.reconnect() p.reconnect()
}() }()
@@ -119,10 +124,9 @@ func (p *Producer) reconnect() {
log.Debug().Msgf("[streams] reconnect to url=%s", p.url) log.Debug().Msgf("[streams] reconnect to url=%s", p.url)
var err error p.element, p.lastErr = GetProducer(p.url)
p.element, err = GetProducer(p.url) if p.lastErr != nil || p.element == nil {
if err != nil || p.element == nil { log.Debug().Err(p.lastErr).Caller().Send()
log.Debug().Err(err).Caller().Send()
// TODO: dynamic timeout // TODO: dynamic timeout
p.restart = time.AfterFunc(30*time.Second, p.reconnect) p.restart = time.AfterFunc(30*time.Second, p.reconnect)
return return
@@ -149,7 +153,7 @@ func (p *Producer) reconnect() {
} }
go func() { go func() {
if err = p.element.Start(); err != nil { if err := p.element.Start(); err != nil {
log.Debug().Err(err).Caller().Send() log.Debug().Err(err).Caller().Send()
} }
p.reconnect() p.reconnect()
+8 -1
View File
@@ -18,6 +18,7 @@ type Stream struct {
producers []*Producer producers []*Producer
consumers []*Consumer consumers []*Consumer
mu sync.Mutex mu sync.Mutex
wg sync.WaitGroup
} }
func NewStream(source interface{}) *Stream { func NewStream(source interface{}) *Stream {
@@ -59,6 +60,9 @@ func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) {
var codecs string var codecs string
// support for multiple simultaneous requests from different consumers
s.wg.Add(1)
// Step 1. Get consumer medias // Step 1. Get consumer medias
for icc, consMedia := range cons.GetMedias() { for icc, consMedia := range cons.GetMedias() {
log.Trace().Stringer("media", consMedia). log.Trace().Stringer("media", consMedia).
@@ -82,7 +86,7 @@ func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) {
// Step 4. Get producer track // Step 4. Get producer track
prodTrack := prod.GetTrack(prodMedia, prodCodec) prodTrack := prod.GetTrack(prodMedia, prodCodec)
if prodTrack == nil { if prodTrack == nil {
log.Warn().Msg("[stream] can't get track") log.Warn().Str("url", prod.url).Msg("[stream] can't get track")
continue continue
} }
@@ -97,6 +101,9 @@ func (s *Stream) AddConsumer(cons streamer.Consumer) (err error) {
} }
} }
s.wg.Done()
s.wg.Wait()
if len(producers) == 0 { if len(producers) == 0 {
s.stopProducers() s.stopProducers()
+11
View File
@@ -3,6 +3,7 @@ package h264
import ( import (
"encoding/base64" "encoding/base64"
"encoding/binary" "encoding/binary"
"fmt"
"github.com/AlexxIT/go2rtc/pkg/streamer" "github.com/AlexxIT/go2rtc/pkg/streamer"
"strings" "strings"
) )
@@ -51,6 +52,16 @@ func GetProfileLevelID(fmtp string) string {
if fmtp == "" { if fmtp == "" {
return "" return ""
} }
// some cameras has wrong profile-level-id
// https://github.com/AlexxIT/go2rtc/issues/155
if s := streamer.Between(fmtp, "sprop-parameter-sets=", ","); s != "" {
sps, _ := base64.StdEncoding.DecodeString(s)
if len(sps) >= 4 {
return fmt.Sprintf("%06X", sps[1:4])
}
}
return streamer.Between(fmtp, "profile-level-id=", ";") return streamer.Between(fmtp, "profile-level-id=", ";")
} }
+3 -1
View File
@@ -127,7 +127,9 @@ func (c *Client) Close() error {
if c.conn == nil { if c.conn == nil {
return nil return nil
} }
close(c.buffer) if c.buffer != nil {
close(c.buffer)
}
c.closed = true c.closed = true
return c.conn.Close() return c.conn.Close()
} }
+42 -8
View File
@@ -20,6 +20,7 @@ import (
"net/url" "net/url"
"strconv" "strconv"
"strings" "strings"
"sync"
"time" "time"
) )
@@ -52,6 +53,7 @@ const (
StateConn StateConn
StateSetup StateSetup
StatePlay StatePlay
StateHandle
) )
type Conn struct { type Conn struct {
@@ -72,6 +74,7 @@ type Conn struct {
conn net.Conn conn net.Conn
mode Mode mode Mode
state State state State
stateMu sync.Mutex
reader *bufio.Reader reader *bufio.Reader
sequence int sequence int
uri string uri string
@@ -340,6 +343,9 @@ func (c *Conn) Setup() error {
func (c *Conn) SetupMedia( func (c *Conn) SetupMedia(
media *streamer.Media, codec *streamer.Codec, media *streamer.Media, codec *streamer.Codec,
) (*streamer.Track, error) { ) (*streamer.Track, error) {
c.stateMu.Lock()
defer c.stateMu.Unlock()
ch := c.GetChannel(media) ch := c.GetChannel(media)
if ch < 0 { if ch < 0 {
return nil, fmt.Errorf("wrong media: %v", media) return nil, fmt.Errorf("wrong media: %v", media)
@@ -461,12 +467,19 @@ func (c *Conn) SetupMedia(
} }
func (c *Conn) Play() (err error) { func (c *Conn) Play() (err error) {
c.stateMu.Lock()
defer c.stateMu.Unlock()
if c.state != StateSetup { if c.state != StateSetup {
return fmt.Errorf("RTSP PLAY from wrong state: %s", c.state) return fmt.Errorf("RTSP PLAY from wrong state: %s", c.state)
} }
req := &tcp.Request{Method: MethodPlay, URL: c.URL} req := &tcp.Request{Method: MethodPlay, URL: c.URL}
return c.Request(req) if err = c.Request(req); err == nil {
c.state = StatePlay
}
return
} }
func (c *Conn) Teardown() (err error) { func (c *Conn) Teardown() (err error) {
@@ -476,12 +489,14 @@ func (c *Conn) Teardown() (err error) {
} }
func (c *Conn) Close() error { func (c *Conn) Close() error {
c.stateMu.Lock()
defer c.stateMu.Unlock()
if c.state == StateNone { if c.state == StateNone {
return nil return nil
} }
if err := c.Teardown(); err != nil {
return err _ = c.Teardown()
}
c.state = StateNone c.state = StateNone
return c.conn.Close() return c.conn.Close()
} }
@@ -614,7 +629,10 @@ func (c *Conn) Accept() error {
case MethodRecord, MethodPlay: case MethodRecord, MethodPlay:
res := &tcp.Response{Request: req} res := &tcp.Response{Request: req}
return c.Response(res) if err = c.Response(res); err == nil {
c.state = StatePlay
}
return err
default: default:
return fmt.Errorf("unsupported method: %s", req.Method) return fmt.Errorf("unsupported method: %s", req.Method)
@@ -623,13 +641,29 @@ func (c *Conn) Accept() error {
} }
func (c *Conn) Handle() (err error) { func (c *Conn) Handle() (err error) {
if c.state != StateSetup { c.stateMu.Lock()
return fmt.Errorf("RTSP Handle from wrong state: %d", c.state)
switch c.state {
case StateNone: // Close after PLAY and before Handle is OK (because SETUP after PLAY)
case StatePlay:
c.state = StateHandle
default:
err = fmt.Errorf("RTSP Handle from wrong state: %d", c.state)
c.state = StateNone
_ = c.conn.Close()
} }
c.state = StatePlay c.stateMu.Unlock()
if c.state != StateHandle {
return
}
defer func() { defer func() {
c.stateMu.Lock()
defer c.stateMu.Unlock()
if c.state == StateNone { if c.state == StateNone {
err = nil err = nil
return return
+2 -1
View File
@@ -21,7 +21,8 @@ func (c *Conn) GetTrack(media *streamer.Media, codec *streamer.Codec) *streamer.
} }
// can't setup new tracks from play state - forcing a reconnection feature // can't setup new tracks from play state - forcing a reconnection feature
if c.state == StatePlay { switch c.state {
case StatePlay, StateHandle:
go c.Close() go c.Close()
return streamer.NewTrack(codec, media.Direction) return streamer.NewTrack(codec, media.Direction)
} }
+5 -3
View File
@@ -1,6 +1,7 @@
package webrtc package webrtc
import ( import (
"errors"
"fmt" "fmt"
"github.com/AlexxIT/go2rtc/pkg/streamer" "github.com/AlexxIT/go2rtc/pkg/streamer"
"github.com/pion/ice/v2" "github.com/pion/ice/v2"
@@ -13,10 +14,11 @@ import (
) )
func NewCandidate(address string) (string, error) { func NewCandidate(address string) (string, error) {
host, port, err := net.SplitHostPort(address) i := strings.LastIndexByte(address, ':')
if err != nil { if i < 0 {
return "", err return "", errors.New("wrong candidate: " + address)
} }
host, port := address[:i], address[i+1:]
i, err := strconv.Atoi(port) i, err := strconv.Atoi(port)
if err != nil { if err != nil {
+28 -18
View File
@@ -1,19 +1,23 @@
import {VideoRTC} from "./video-rtc.js"; import {VideoRTC} from "./video-rtc.js";
class VideoStream extends VideoRTC { class VideoStream extends VideoRTC {
constructor() { set divMode(value) {
super(); this.querySelector(".mode").innerText = value;
this.querySelector(".status").innerText = "";
}
/** @type {HTMLDivElement} */ set divError(value) {
this.divMode = null; const state = this.querySelector(".mode").innerText;
/** @type {HTMLDivElement} */ if (state !== "loading") return;
this.divStatus = null; this.querySelector(".mode").innerText = "error";
this.querySelector(".status").innerText = value;
} }
/** /**
* Custom GUI * Custom GUI
*/ */
oninit() { oninit() {
console.debug("stream.oninit");
super.oninit(); super.oninit();
this.innerHTML = ` this.innerHTML = `
@@ -36,35 +40,36 @@ class VideoStream extends VideoRTC {
</div> </div>
`; `;
this.divStatus = this.querySelector(".status");
this.divMode = this.querySelector(".mode");
const info = this.querySelector(".info") const info = this.querySelector(".info")
this.insertBefore(this.video, info); this.insertBefore(this.video, info);
} }
onconnect() { onconnect() {
console.debug("stream.onconnect");
const result = super.onconnect(); const result = super.onconnect();
if (result) { if (result) this.divMode = "loading";
this.divMode.innerText = "loading";
}
return result; return result;
} }
ondisconnect() {
console.debug("stream.ondisconnect");
super.ondisconnect();
}
onopen() { onopen() {
console.debug("stream.onopen");
const result = super.onopen(); const result = super.onopen();
this.onmessage["stream"] = msg => { this.onmessage["stream"] = msg => {
console.debug("stream.onmessge", msg);
switch (msg.type) { switch (msg.type) {
case "error": case "error":
this.divMode.innerText = "error"; this.divError = msg.value;
this.divStatus.innerText = msg.value;
break; break;
case "mse": case "mse":
case "mp4": case "mp4":
case "mjpeg": case "mjpeg":
this.divMode.innerText = msg.type.toUpperCase(); this.divMode = msg.type.toUpperCase();
this.divStatus.innerText = "";
break; break;
} }
} }
@@ -72,12 +77,17 @@ class VideoStream extends VideoRTC {
return result; return result;
} }
onclose() {
console.debug("stream.onclose");
return super.onclose();
}
onpcvideo(ev) { onpcvideo(ev) {
console.debug("stream.onpcvideo");
super.onpcvideo(ev); super.onpcvideo(ev);
if (this.pcState !== WebSocket.CLOSED) { if (this.pcState !== WebSocket.CLOSED) {
this.divMode.innerText = "RTC"; this.divMode = "RTC";
this.divStatus.innerText = "";
} }
} }
} }