From 859cd1cbe63cf7acb445ce27bde47b5d72a748a6 Mon Sep 17 00:00:00 2001 From: seydx Date: Sun, 1 Jun 2025 01:44:01 +0300 Subject: [PATCH 1/4] support rtsp udp transport --- README.md | 2 + pkg/rtsp/client.go | 289 +++++++++++++++++++++++++++++--- pkg/rtsp/conn.go | 391 ++++++++++++++++++++++++++++++------------- pkg/rtsp/consumer.go | 19 ++- pkg/rtsp/ports.go | 75 +++++++++ 5 files changed, 626 insertions(+), 150 deletions(-) create mode 100644 pkg/rtsp/ports.go diff --git a/README.md b/README.md index 90a2537f..d1461206 100644 --- a/README.md +++ b/README.md @@ -259,6 +259,7 @@ Format: `rtsp...#{param1}#{param2}#{param3}` - Ignore audio - `#media=video` or ignore video - `#media=audio` - Ignore two way audio API `#backchannel=0` - important for some glitchy cameras - Use WebSocket transport `#transport=ws...` +- Use UDP transport `#transport=udp` **RTSP over WebSocket** @@ -268,6 +269,7 @@ streams: axis-rtsp-ws: rtsp://192.168.1.123:4567/axis-media/media.amp?overview=0&camera=1&resolution=1280x720&videoframeskipmode=empty&Axis-Orig-Sw=true#transport=ws://user:pass@192.168.1.123:4567/rtsp-over-websocket # WebSocket without authorization, RTSP - with dahua-rtsp-ws: rtsp://user:pass@192.168.1.123/cam/realmonitor?channel=1&subtype=1&proto=Private3#transport=ws://192.168.1.123/rtspoverwebsocket + udp_camera: rtsp://user:pass@192.168.1.345:554/stream1#transport=udp ``` #### Source: RTMP diff --git a/pkg/rtsp/client.go b/pkg/rtsp/client.go index 7fc134fc..ef5ebbfe 100644 --- a/pkg/rtsp/client.go +++ b/pkg/rtsp/client.go @@ -2,6 +2,7 @@ package rtsp import ( "bufio" + "encoding/binary" "errors" "fmt" "net" @@ -25,7 +26,13 @@ func NewClient(uri string) *Conn { ID: core.NewID(), FormatName: "rtsp", }, - uri: uri, + uri: uri, + udpRtpConns: make(map[byte]*UDPConnection), + udpRtcpConns: make(map[byte]*UDPConnection), + udpRtpListeners: make(map[byte]*UDPConnection), + udpRtcpListeners: make(map[byte]*UDPConnection), + portToChannel: make(map[int]byte), + channelCounter: 0, } } @@ -36,13 +43,20 @@ func (c *Conn) Dial() (err error) { var conn net.Conn - if c.Transport == "" { + if c.Transport == "" || c.Transport == "tcp" || c.Transport == "udp" { timeout := core.ConnDialTimeout if c.Timeout != 0 { timeout = time.Second * time.Duration(c.Timeout) } conn, err = tcp.Dial(c.URL, timeout) - c.Protocol = "rtsp+tcp" + + if c.Transport != "udp" { + c.Protocol = "rtsp+tcp" + c.transportMode = TransportTCP + } else { + c.Protocol = "rtsp+udp" + c.transportMode = TransportUDP + } } else { conn, err = websocket.Dial(c.Transport) c.Protocol = "ws" @@ -217,23 +231,64 @@ func (c *Conn) Record() (err error) { func (c *Conn) SetupMedia(media *core.Media) (byte, error) { var transport string + var mediaIndex int = -1 // try to use media position as channel number for i, m := range c.Medias { if m.Equal(media) { - transport = fmt.Sprintf( - // i - RTP (data channel) - // i+1 - RTCP (control channel) - "RTP/AVP/TCP;unicast;interleaved=%d-%d", i*2, i*2+1, - ) + mediaIndex = i break } } - if transport == "" { + if mediaIndex == -1 { return 0, fmt.Errorf("wrong media: %v", media) } + if c.transportMode == TransportUDP { + transport, err := c.setupUDPTransport() + if err == nil { + return c.sendSetupRequest(media, transport) + } + // Fall back to TCP if UDP fails + c.closeUDP() + c.transportMode = TransportTCP + } + + transport = c.setupTCPTransport(mediaIndex) + return c.sendSetupRequest(media, transport) +} + +func (c *Conn) setupTCPTransport(mediaIndex int) string { + channel := byte(mediaIndex * 2) + transport := fmt.Sprintf("RTP/AVP/TCP;unicast;interleaved=%d-%d", channel, channel+1) + return transport +} + +func (c *Conn) setupUDPTransport() (string, error) { + portPair, err := GetUDPPorts(nil, 10) + if err != nil { + return "", err + } + + rtpChannel := c.getChannelForPort(portPair.RTPPort) + rtcpChannel := c.getChannelForPort(portPair.RTCPPort) + + c.udpRtpListeners[rtpChannel] = &UDPConnection{ + Conn: *portPair.RTPListener, + Channel: rtpChannel, + } + + c.udpRtcpListeners[rtcpChannel] = &UDPConnection{ + Conn: *portPair.RTCPListener, + Channel: rtcpChannel, + } + + transport := fmt.Sprintf("RTP/AVP;unicast;client_port=%d-%d", portPair.RTPPort, portPair.RTCPPort) + return transport, nil +} + +func (c *Conn) sendSetupRequest(media *core.Media, transport string) (byte, error) { rawURL := media.ID // control if !strings.Contains(rawURL, "://") { rawURL = c.URL.String() @@ -286,27 +341,114 @@ func (c *Conn) SetupMedia(media *core.Media) (byte, error) { } } - // we send our `interleaved`, but camera can answer with another + // Parse server response + responseTransport := res.Header.Get("Transport") - // Transport: RTP/AVP/TCP;unicast;interleaved=10-11;ssrc=10117CB7 - // Transport: RTP/AVP/TCP;unicast;destination=192.168.1.111;source=192.168.1.222;interleaved=0 - // Transport: RTP/AVP/TCP;ssrc=22345682;interleaved=0-1 - transport = res.Header.Get("Transport") - if !strings.HasPrefix(transport, "RTP/AVP/TCP;") { - // Escam Q6 has a bug: - // Transport: RTP/AVP;unicast;destination=192.168.1.111;source=192.168.1.222;interleaved=0-1 - if !strings.Contains(transport, ";interleaved=") { - return 0, fmt.Errorf("wrong transport: %s", transport) + if c.transportMode == TransportUDP { + // Parse UDP response: client_ports=1234-1235;server_port=1234-1235 + var clientPorts []int + var serverPorts []int + + if strings.Contains(transport, "client_port=") { + parts := strings.Split(responseTransport, "client_port=") + if len(parts) > 1 { + portPart := strings.Split(strings.Split(parts[1], ";")[0], "-") + for _, p := range portPart { + if port, err := strconv.Atoi(p); err == nil { + clientPorts = append(clientPorts, port) + } + } + } } - } - channel := core.Between(transport, "interleaved=", "-") - i, err := strconv.Atoi(channel) - if err != nil { - return 0, err - } + if strings.Contains(responseTransport, "server_port=") { + parts := strings.Split(responseTransport, "server_port=") + if len(parts) > 1 { + portPart := strings.Split(strings.Split(parts[1], ";")[0], "-") + for _, p := range portPart { + if port, err := strconv.Atoi(p); err == nil { + serverPorts = append(serverPorts, port) + } + } + } + } - return byte(i), nil + // Create UDP connections for RTP and RTCP if we have both server ports + if len(serverPorts) >= 2 { + if host, _, err := net.SplitHostPort(c.Connection.RemoteAddr); err == nil { + rtpServerPort := serverPorts[0] + rtcpServerPort := serverPorts[1] + + cleanHost := host + if strings.Contains(cleanHost, ":") { + cleanHost = fmt.Sprintf("[%s]", host) + } + + remoteRtpAddr := fmt.Sprintf("%s:%d", cleanHost, rtpServerPort) + remoteRtcpAddr := fmt.Sprintf("%s:%d", cleanHost, rtcpServerPort) + + if rtpAddr, err := net.ResolveUDPAddr("udp", remoteRtpAddr); err == nil { + if rtpConn, err := net.DialUDP("udp", nil, rtpAddr); err == nil { + channel := c.getChannelForPort(rtpServerPort) + c.udpRtpConns[channel] = &UDPConnection{ + Conn: *rtpConn, + Channel: channel, + } + } + } + + if rtcpAddr, err := net.ResolveUDPAddr("udp", remoteRtcpAddr); err == nil { + if rtcpConn, err := net.DialUDP("udp", nil, rtcpAddr); err == nil { + channel := c.getChannelForPort(rtcpServerPort) + c.udpRtcpConns[channel] = &UDPConnection{ + Conn: *rtcpConn, + Channel: channel, + } + } + } + } + } + + // Try to open a hole in the NAT router (to allow incoming UDP packets) + // by send a UDP packet for RTP and RTCP to the remote RTSP server. + go c.tryHolePunching(clientPorts, serverPorts) + + var rtpPort string + if media.Direction == core.DirectionRecvonly { + rtpPort = core.Between(transport, "client_port=", "-") + } else { + rtpPort = core.Between(responseTransport, "server_port=", "-") + } + + i, err := strconv.Atoi(rtpPort) + if err != nil { + return 0, err + } + + return c.getChannelForPort(i), nil + + } else { + // we send our `interleaved`, but camera can answer with another + + // Transport: RTP/AVP/TCP;unicast;interleaved=10-11;ssrc=10117CB7 + // Transport: RTP/AVP/TCP;unicast;destination=192.168.1.111;source=192.168.1.222;interleaved=0 + // Transport: RTP/AVP/TCP;ssrc=22345682;interleaved=0-1 + if !strings.HasPrefix(responseTransport, "RTP/AVP/TCP;") { + // Escam Q6 has a bug: + // Transport: RTP/AVP;unicast;destination=192.168.1.111;source=192.168.1.222;interleaved=0-1 + if !strings.Contains(responseTransport, ";interleaved=") { + return 0, fmt.Errorf("wrong transport: %s", responseTransport) + } + } + + channel := core.Between(responseTransport, "interleaved=", "-") + i, err := strconv.Atoi(channel) + if err != nil { + return 0, err + } + + return byte(i), nil + } } func (c *Conn) Play() (err error) { @@ -321,11 +463,106 @@ func (c *Conn) Teardown() (err error) { } func (c *Conn) Close() error { + c.closeUDP() + if c.mode == core.ModeActiveProducer { _ = c.Teardown() } + if c.OnClose != nil { _ = c.OnClose() } + return c.conn.Close() } + +func (c *Conn) closeUDP() { + for _, listener := range c.udpRtpListeners { + _ = listener.Conn.Close() + } + for _, listener := range c.udpRtcpListeners { + _ = listener.Conn.Close() + } + for _, conn := range c.udpRtpConns { + _ = conn.Conn.Close() + } + for _, conn := range c.udpRtcpConns { + _ = conn.Conn.Close() + } + + c.udpRtpListeners = make(map[byte]*UDPConnection) + c.udpRtcpListeners = make(map[byte]*UDPConnection) + c.udpRtpConns = make(map[byte]*UDPConnection) + c.udpRtcpConns = make(map[byte]*UDPConnection) + c.portToChannel = make(map[int]byte) + c.channelCounter = 0 +} + +func (c *Conn) sendUDPRtpPacket(data []byte) error { + for len(data) >= 4 && data[0] == '$' { + channel := data[1] + size := binary.BigEndian.Uint16(data[2:4]) + + if len(data) < 4+int(size) { + return fmt.Errorf("incomplete RTP packet: %d < %d", len(data), 4+size) + } + + // Send RTP data without interleaved header + rtpData := data[4 : 4+size] + + if conn, ok := c.udpRtpConns[channel]; ok { + if err := conn.Conn.SetWriteDeadline(time.Now().Add(Timeout)); err != nil { + return nil + } + + if _, err := conn.Conn.Write(rtpData); err != nil { + return err + } + } + + data = data[4+size:] // Move to next packet + } + + return nil +} + +func (c *Conn) tryHolePunching(clientPorts, serverPorts []int) { + if len(clientPorts) < 2 || len(serverPorts) < 2 { + return + } + + host, _, _ := net.SplitHostPort(c.Connection.RemoteAddr) + if strings.Contains(host, ":") { + host = fmt.Sprintf("[%s]", host) + } + + // RTP hole punch + if rtpListener, ok := c.udpRtpListeners[c.getChannelForPort(clientPorts[0])]; ok { + if addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", host, serverPorts[0])); err == nil { + rtpListener.Conn.WriteToUDP([]byte{0x80, 0x00, 0x00, 0x00}, addr) + } + } + + // RTCP hole punch + if rtcpListener, ok := c.udpRtcpListeners[c.getChannelForPort(clientPorts[1])]; ok { + if addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", host, serverPorts[1])); err == nil { + rtcpListener.Conn.WriteToUDP([]byte{0x80, 0xC8, 0x00, 0x01}, addr) + } + } +} + +func (c *Conn) getChannelForPort(port int) byte { + if channel, exists := c.portToChannel[port]; exists { + return channel + } + + c.channelCounter++ + if c.channelCounter == 0 { + c.channelCounter = 1 + } + + channel := c.channelCounter + c.portToChannel[port] = channel + + return channel +} diff --git a/pkg/rtsp/conn.go b/pkg/rtsp/conn.go index 0c2009d7..a5f001c2 100644 --- a/pkg/rtsp/conn.go +++ b/pkg/rtsp/conn.go @@ -40,6 +40,7 @@ type Conn struct { keepalive int mode core.Mode playOK bool + playErr error reader *bufio.Reader sequence int session string @@ -47,8 +48,32 @@ type Conn struct { state State stateMu sync.Mutex + + transportMode TransportMode + + // UDP + + udpRtpConns map[byte]*UDPConnection + udpRtcpConns map[byte]*UDPConnection + udpRtpListeners map[byte]*UDPConnection + udpRtcpListeners map[byte]*UDPConnection + portToChannel map[int]byte + channelCounter byte } +type UDPConnection struct { + Conn net.UDPConn + Channel byte +} + +type TransportMode int + +const ( + TransportTCP TransportMode = iota + TransportUDP + ReceiveMTU = 1500 +) + const ( ProtoRTSP = "RTSP/1.0" MethodOptions = "OPTIONS" @@ -68,7 +93,6 @@ func (s State) String() string { case StateNone: return "NONE" case StateConn: - return "CONN" case StateSetup: return MethodSetup @@ -131,133 +155,22 @@ func (c *Conn) Handle() (err error) { for c.state != StateNone { ts := time.Now() + time := ts.Add(timeout) - if err = c.conn.SetReadDeadline(ts.Add(timeout)); err != nil { + if err = c.conn.SetReadDeadline(time); err != nil { return } - // we can read: - // 1. RTP interleaved: `$` + 1B channel number + 2B size - // 2. RTSP response: RTSP/1.0 200 OK - // 3. RTSP request: OPTIONS ... - var buf4 []byte // `$` + 1B channel number + 2B size - buf4, err = c.reader.Peek(4) - if err != nil { - return - } - - var channelID byte - var size uint16 - - if buf4[0] != '$' { - switch string(buf4) { - case "RTSP": - var res *tcp.Response - if res, err = c.ReadResponse(); err != nil { - return - } - c.Fire(res) - // for playing backchannel only after OK response on play - c.playOK = true - continue - - case "OPTI", "TEAR", "DESC", "SETU", "PLAY", "PAUS", "RECO", "ANNO", "GET_", "SET_": - var req *tcp.Request - if req, err = c.ReadRequest(); err != nil { - return - } - c.Fire(req) - if req.Method == MethodOptions { - res := &tcp.Response{Request: req} - if err = c.WriteResponse(res); err != nil { - return - } - } - continue - - default: - c.Fire("RTSP wrong input") - - for i := 0; ; i++ { - // search next start symbol - if _, err = c.reader.ReadBytes('$'); err != nil { - return err - } - - if channelID, err = c.reader.ReadByte(); err != nil { - return err - } - - // TODO: better check maximum good channel ID - if channelID >= 20 { - continue - } - - buf4 = make([]byte, 2) - if _, err = io.ReadFull(c.reader, buf4); err != nil { - return err - } - - // check if size good for RTP - size = binary.BigEndian.Uint16(buf4) - if size <= 1500 { - break - } - - // 10 tries to find good packet - if i >= 10 { - return fmt.Errorf("RTSP wrong input") - } - } + if c.transportMode == TransportUDP { + if err = c.handleUDPClientData(time); err != nil { + return err } } else { - // hope that the odd channels are always RTCP - channelID = buf4[1] - - // get data size - size = binary.BigEndian.Uint16(buf4[2:]) - - // skip 4 bytes from c.reader.Peek - if _, err = c.reader.Discard(4); err != nil { - return + if err = c.handleTCPClientData(); err != nil { + return err } } - // init memory for data - buf := make([]byte, size) - if _, err = io.ReadFull(c.reader, buf); err != nil { - return - } - - c.Recv += int(size) - - if channelID&1 == 0 { - packet := &rtp.Packet{} - if err = packet.Unmarshal(buf); err != nil { - return - } - - for _, receiver := range c.Receivers { - if receiver.ID == channelID { - receiver.WriteRTP(packet) - break - } - } - } else { - msg := &RTCP{Channel: channelID} - - if err = msg.Header.Unmarshal(buf); err != nil { - continue - } - - msg.Packets, err = rtcp.Unmarshal(buf) - if err != nil { - continue - } - - c.Fire(msg) - } - if keepaliveDT != 0 && ts.After(keepaliveTS) { req := &tcp.Request{Method: MethodOptions, URL: c.URL} if err = c.WriteRequest(req); err != nil { @@ -271,6 +184,246 @@ func (c *Conn) Handle() (err error) { return } +func (c *Conn) handleUDPClientData(time time.Time) error { + if c.playErr != nil { + return c.playErr + } + + if c.state == StatePlay && c.playOK { + return nil + } + + var buf4 []byte + + buf4, err := c.reader.Peek(4) + if err != nil { + return err + } + + switch string(buf4) { + case "RTSP": + var res *tcp.Response + if res, err = c.ReadResponse(); err != nil { + return err + } + + c.Fire(res) + c.playOK = true + + for _, listener := range c.udpRtpListeners { + go func(listener *UDPConnection) { + defer listener.Conn.Close() + + for c.state != StateNone { + if err := listener.Conn.SetReadDeadline(time); err != nil { + c.playErr = err + return + } + + buffer := make([]byte, ReceiveMTU) + n, _, err := listener.Conn.ReadFromUDP(buffer) + if err != nil { + c.playErr = err + break + } + + packet := &rtp.Packet{} + if err := packet.Unmarshal(buffer[:n]); err != nil { + c.playErr = err + return + } + + for _, receiver := range c.Receivers { + if receiver.ID == listener.Channel { + receiver.WriteRTP(packet) + break + } + } + + c.Recv += len(buffer[:n]) + } + }(listener) + } + + for _, listener := range c.udpRtcpListeners { + go func(listener *UDPConnection) { + defer listener.Conn.Close() + + for c.state != StateNone { + if err := listener.Conn.SetReadDeadline(time); err != nil { + return + } + + buffer := make([]byte, ReceiveMTU) + n, _, err := listener.Conn.ReadFromUDP(buffer) + if err != nil { + break + } + + msg := &RTCP{Channel: listener.Channel} + + if err := msg.Header.Unmarshal(buffer[:n]); err != nil { + continue + } + + msg.Packets, err = rtcp.Unmarshal(buffer[:n]) + if err != nil { + continue + } + + c.Fire(msg) + } + }(listener) + } + + case "OPTI", "TEAR", "DESC", "SETU", "PLAY", "PAUS", "RECO", "ANNO", "GET_", "SET_": + var req *tcp.Request + if req, err = c.ReadRequest(); err != nil { + return err + } + c.Fire(req) + if req.Method == MethodOptions { + res := &tcp.Response{Request: req} + if err = c.WriteResponse(res); err != nil { + return err + } + } + + default: + return fmt.Errorf("RTSP wrong input") + } + + return nil +} + +func (c *Conn) handleTCPClientData() error { + // we can read: + // 1. RTP interleaved: `$` + 1B channel number + 2B size + // 2. RTSP response: RTSP/1.0 200 OK + // 3. RTSP request: OPTIONS ... + var buf4 []byte // `$` + 1B channel number + 2B size + var err error + + buf4, err = c.reader.Peek(4) + if err != nil { + return err + } + + var channel byte + var size uint16 + + if buf4[0] != '$' { + switch string(buf4) { + case "RTSP": + var res *tcp.Response + if res, err = c.ReadResponse(); err != nil { + return err + } + c.Fire(res) + // for playing backchannel only after OK response on play + c.playOK = true + return nil + + case "OPTI", "TEAR", "DESC", "SETU", "PLAY", "PAUS", "RECO", "ANNO", "GET_", "SET_": + var req *tcp.Request + if req, err = c.ReadRequest(); err != nil { + return err + } + c.Fire(req) + if req.Method == MethodOptions { + res := &tcp.Response{Request: req} + if err = c.WriteResponse(res); err != nil { + return err + } + } + return nil + + default: + c.Fire("RTSP wrong input") + + for i := 0; ; i++ { + // search next start symbol + if _, err = c.reader.ReadBytes('$'); err != nil { + return err + } + + if channel, err = c.reader.ReadByte(); err != nil { + return err + } + + // TODO: better check maximum good channel ID + if channel >= 20 { + continue + } + + buf4 = make([]byte, 2) + if _, err = io.ReadFull(c.reader, buf4); err != nil { + return err + } + + // check if size good for RTP + size = binary.BigEndian.Uint16(buf4) + if size <= 1500 { + break + } + + // 10 tries to find good packet + if i >= 10 { + return fmt.Errorf("RTSP wrong input") + } + } + } + } else { + // hope that the odd channels are always RTCP + channel = buf4[1] + + // get data size + size = binary.BigEndian.Uint16(buf4[2:]) + + // skip 4 bytes from c.reader.Peek + if _, err = c.reader.Discard(4); err != nil { + return err + } + } + + // init memory for data + buf := make([]byte, size) + if _, err = io.ReadFull(c.reader, buf); err != nil { + return err + } + + c.Recv += int(size) + + if channel&1 == 0 { + packet := &rtp.Packet{} + if err = packet.Unmarshal(buf); err != nil { + return err + } + + for _, receiver := range c.Receivers { + if receiver.ID == channel { + receiver.WriteRTP(packet) + break + } + } + } else { + msg := &RTCP{Channel: channel} + + if err = msg.Header.Unmarshal(buf); err != nil { + return nil + } + + msg.Packets, err = rtcp.Unmarshal(buf) + if err != nil { + return nil + } + + c.Fire(msg) + } + + return nil +} + func (c *Conn) WriteRequest(req *tcp.Request) error { if req.Proto == "" { req.Proto = ProtoRTSP diff --git a/pkg/rtsp/consumer.go b/pkg/rtsp/consumer.go index 860ed113..b5827436 100644 --- a/pkg/rtsp/consumer.go +++ b/pkg/rtsp/consumer.go @@ -85,13 +85,22 @@ func (c *Conn) packetWriter(codec *core.Codec, channel, payloadType uint8) core. } flushBuf := func() { - if err := c.conn.SetWriteDeadline(time.Now().Add(Timeout)); err != nil { - return - } //log.Printf("[rtsp] channel:%2d write_size:%6d buffer_size:%6d", channel, n, len(buf)) - if _, err := c.conn.Write(buf[:n]); err == nil { - c.Send += n + + if c.transportMode == TransportUDP { + if err := c.sendUDPRtpPacket(buf[:n]); err == nil { + c.Send += n + } + } else { + if err := c.conn.SetWriteDeadline(time.Now().Add(Timeout)); err != nil { + return + } + + if _, err := c.conn.Write(buf[:n]); err == nil { + c.Send += n + } } + n = 0 } diff --git a/pkg/rtsp/ports.go b/pkg/rtsp/ports.go new file mode 100644 index 00000000..d280ac6d --- /dev/null +++ b/pkg/rtsp/ports.go @@ -0,0 +1,75 @@ +package rtsp + +import ( + "fmt" + "net" + "sync" +) + +var mu sync.Mutex + +type UDPPortPair struct { + RTPListener *net.UDPConn + RTCPListener *net.UDPConn + RTPPort int + RTCPPort int +} + +func (p *UDPPortPair) Close() { + if p.RTPListener != nil { + _ = p.RTPListener.Close() + } + if p.RTCPListener != nil { + _ = p.RTCPListener.Close() + } +} + +func GetUDPPorts(ip net.IP, maxAttempts int) (*UDPPortPair, error) { + mu.Lock() + defer mu.Unlock() + + if ip == nil { + ip = net.IPv4(0, 0, 0, 0) + } + + for i := 0; i < maxAttempts; i++ { + // Get a random even port from the OS + tempListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: ip, Port: 0}) + if err != nil { + continue + } + + addr := tempListener.LocalAddr().(*net.UDPAddr) + basePort := addr.Port + tempListener.Close() + + // 11. RTP over Network and Transport Protocols (https://www.ietf.org/rfc/rfc3550.txt) + // For UDP and similar protocols, + // RTP SHOULD use an even destination port number and the corresponding + // RTCP stream SHOULD use the next higher (odd) destination port number + if basePort%2 == 1 { + basePort-- + } + + // Try to bind both ports + rtpListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: ip, Port: basePort}) + if err != nil { + continue + } + + rtcpListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: ip, Port: basePort + 1}) + if err != nil { + rtpListener.Close() + continue + } + + return &UDPPortPair{ + RTPListener: rtpListener, + RTCPListener: rtcpListener, + RTPPort: basePort, + RTCPPort: basePort + 1, + }, nil + } + + return nil, fmt.Errorf("failed to allocate consecutive UDP ports after %d attempts", maxAttempts) +} From 24ca87e00d4d5be424de97ebd7a1f1eead8adb87 Mon Sep 17 00:00:00 2001 From: seydx Date: Sun, 1 Jun 2025 18:40:53 +0300 Subject: [PATCH 2/4] dont fallback to tcp if udp failes --- pkg/rtsp/client.go | 15 ++++++--------- pkg/rtsp/conn.go | 6 +----- pkg/rtsp/consumer.go | 2 +- 3 files changed, 8 insertions(+), 15 deletions(-) diff --git a/pkg/rtsp/client.go b/pkg/rtsp/client.go index ef5ebbfe..3d1bb0df 100644 --- a/pkg/rtsp/client.go +++ b/pkg/rtsp/client.go @@ -52,10 +52,8 @@ func (c *Conn) Dial() (err error) { if c.Transport != "udp" { c.Protocol = "rtsp+tcp" - c.transportMode = TransportTCP } else { c.Protocol = "rtsp+udp" - c.transportMode = TransportUDP } } else { conn, err = websocket.Dial(c.Transport) @@ -245,14 +243,13 @@ func (c *Conn) SetupMedia(media *core.Media) (byte, error) { return 0, fmt.Errorf("wrong media: %v", media) } - if c.transportMode == TransportUDP { + if c.Transport == "udp" { transport, err := c.setupUDPTransport() - if err == nil { - return c.sendSetupRequest(media, transport) + if err != nil { + return 0, err } - // Fall back to TCP if UDP fails - c.closeUDP() - c.transportMode = TransportTCP + + return c.sendSetupRequest(media, transport) } transport = c.setupTCPTransport(mediaIndex) @@ -344,7 +341,7 @@ func (c *Conn) sendSetupRequest(media *core.Media, transport string) (byte, erro // Parse server response responseTransport := res.Header.Get("Transport") - if c.transportMode == TransportUDP { + if c.Transport == "udp" { // Parse UDP response: client_ports=1234-1235;server_port=1234-1235 var clientPorts []int var serverPorts []int diff --git a/pkg/rtsp/conn.go b/pkg/rtsp/conn.go index a5f001c2..ddb15a74 100644 --- a/pkg/rtsp/conn.go +++ b/pkg/rtsp/conn.go @@ -49,8 +49,6 @@ type Conn struct { state State stateMu sync.Mutex - transportMode TransportMode - // UDP udpRtpConns map[byte]*UDPConnection @@ -69,8 +67,6 @@ type UDPConnection struct { type TransportMode int const ( - TransportTCP TransportMode = iota - TransportUDP ReceiveMTU = 1500 ) @@ -161,7 +157,7 @@ func (c *Conn) Handle() (err error) { return } - if c.transportMode == TransportUDP { + if c.Transport == "udp" { if err = c.handleUDPClientData(time); err != nil { return err } diff --git a/pkg/rtsp/consumer.go b/pkg/rtsp/consumer.go index b5827436..fde2684c 100644 --- a/pkg/rtsp/consumer.go +++ b/pkg/rtsp/consumer.go @@ -87,7 +87,7 @@ func (c *Conn) packetWriter(codec *core.Codec, channel, payloadType uint8) core. flushBuf := func() { //log.Printf("[rtsp] channel:%2d write_size:%6d buffer_size:%6d", channel, n, len(buf)) - if c.transportMode == TransportUDP { + if c.Transport == "udp" { if err := c.sendUDPRtpPacket(buf[:n]); err == nil { c.Send += n } From fde1fdc592832dcdfa360eb8762a0baba0b4436f Mon Sep 17 00:00:00 2001 From: Alex X Date: Thu, 9 Oct 2025 21:07:20 +0300 Subject: [PATCH 3/4] Code refactoring for #1758 --- pkg/rtsp/client.go | 325 +++++++++++++------------------------------ pkg/rtsp/conn.go | 218 ++++++++--------------------- pkg/rtsp/consumer.go | 38 +++-- pkg/rtsp/ports.go | 75 ---------- 4 files changed, 178 insertions(+), 478 deletions(-) delete mode 100644 pkg/rtsp/ports.go diff --git a/pkg/rtsp/client.go b/pkg/rtsp/client.go index 3d1bb0df..4e891213 100644 --- a/pkg/rtsp/client.go +++ b/pkg/rtsp/client.go @@ -2,7 +2,6 @@ package rtsp import ( "bufio" - "encoding/binary" "errors" "fmt" "net" @@ -10,6 +9,7 @@ import ( "net/url" "strconv" "strings" + "sync" "time" "github.com/AlexxIT/go2rtc/pkg/tcp/websocket" @@ -26,13 +26,7 @@ func NewClient(uri string) *Conn { ID: core.NewID(), FormatName: "rtsp", }, - uri: uri, - udpRtpConns: make(map[byte]*UDPConnection), - udpRtcpConns: make(map[byte]*UDPConnection), - udpRtpListeners: make(map[byte]*UDPConnection), - udpRtcpListeners: make(map[byte]*UDPConnection), - portToChannel: make(map[int]byte), - channelCounter: 0, + uri: uri, } } @@ -43,10 +37,13 @@ func (c *Conn) Dial() (err error) { var conn net.Conn - if c.Transport == "" || c.Transport == "tcp" || c.Transport == "udp" { - timeout := core.ConnDialTimeout + switch c.Transport { + case "", "tcp", "udp": + var timeout time.Duration if c.Timeout != 0 { timeout = time.Second * time.Duration(c.Timeout) + } else { + timeout = core.ConnDialTimeout } conn, err = tcp.Dial(c.URL, timeout) @@ -55,7 +52,7 @@ func (c *Conn) Dial() (err error) { } else { c.Protocol = "rtsp+udp" } - } else { + default: conn, err = websocket.Dial(c.Transport) c.Protocol = "ws" } @@ -73,6 +70,9 @@ func (c *Conn) Dial() (err error) { c.sequence = 0 c.state = StateConn + c.udpConn = nil + c.udpAddr = nil + c.Connection.RemoteAddr = conn.RemoteAddr().String() c.Connection.Transport = conn c.Connection.URL = c.uri @@ -229,63 +229,35 @@ func (c *Conn) Record() (err error) { func (c *Conn) SetupMedia(media *core.Media) (byte, error) { var transport string - var mediaIndex int = -1 - - // try to use media position as channel number - for i, m := range c.Medias { - if m.Equal(media) { - mediaIndex = i - break - } - } - - if mediaIndex == -1 { - return 0, fmt.Errorf("wrong media: %v", media) - } if c.Transport == "udp" { - transport, err := c.setupUDPTransport() + conn1, conn2, err := ListenUDPPair() if err != nil { return 0, err } - return c.sendSetupRequest(media, transport) + c.udpConn = append(c.udpConn, conn1, conn2) + + port := conn1.LocalAddr().(*net.UDPAddr).Port + transport = fmt.Sprintf("RTP/AVP;unicast;client_port=%d-%d", port, port+1) + } else { + // try to use media position as channel number + for i, m := range c.Medias { + if m.Equal(media) { + transport = fmt.Sprintf( + // i - RTP (data channel) + // i+1 - RTCP (control channel) + "RTP/AVP/TCP;unicast;interleaved=%d-%d", i*2, i*2+1, + ) + break + } + } } - transport = c.setupTCPTransport(mediaIndex) - return c.sendSetupRequest(media, transport) -} - -func (c *Conn) setupTCPTransport(mediaIndex int) string { - channel := byte(mediaIndex * 2) - transport := fmt.Sprintf("RTP/AVP/TCP;unicast;interleaved=%d-%d", channel, channel+1) - return transport -} - -func (c *Conn) setupUDPTransport() (string, error) { - portPair, err := GetUDPPorts(nil, 10) - if err != nil { - return "", err + if transport == "" { + return 0, fmt.Errorf("wrong media: %v", media) } - rtpChannel := c.getChannelForPort(portPair.RTPPort) - rtcpChannel := c.getChannelForPort(portPair.RTCPPort) - - c.udpRtpListeners[rtpChannel] = &UDPConnection{ - Conn: *portPair.RTPListener, - Channel: rtpChannel, - } - - c.udpRtcpListeners[rtcpChannel] = &UDPConnection{ - Conn: *portPair.RTCPListener, - Channel: rtcpChannel, - } - - transport := fmt.Sprintf("RTP/AVP;unicast;client_port=%d-%d", portPair.RTPPort, portPair.RTCPPort) - return transport, nil -} - -func (c *Conn) sendSetupRequest(media *core.Media, transport string) (byte, error) { rawURL := media.ID // control if !strings.Contains(rawURL, "://") { rawURL = c.URL.String() @@ -339,109 +311,48 @@ func (c *Conn) sendSetupRequest(media *core.Media, transport string) (byte, erro } // Parse server response - responseTransport := res.Header.Get("Transport") + transport = res.Header.Get("Transport") if c.Transport == "udp" { - // Parse UDP response: client_ports=1234-1235;server_port=1234-1235 - var clientPorts []int - var serverPorts []int + channel := byte(len(c.udpConn) - 2) - if strings.Contains(transport, "client_port=") { - parts := strings.Split(responseTransport, "client_port=") - if len(parts) > 1 { - portPart := strings.Split(strings.Split(parts[1], ";")[0], "-") - for _, p := range portPart { - if port, err := strconv.Atoi(p); err == nil { - clientPorts = append(clientPorts, port) - } - } + // Dahua: RTP/AVP/UDP;unicast;client_port=49292-49293;server_port=43670-43671;ssrc=7CB694B4 + // OpenIPC: RTP/AVP/UDP;unicast;client_port=59612-59613 + if s := core.Between(transport, "server_port=", ";"); s != "" { + s1, s2, _ := strings.Cut(s, "-") + port1 := core.Atoi(s1) + port2 := core.Atoi(s2) + // TODO: more smart handling empty server ports + if port1 > 0 && port2 > 0 { + remoteIP := c.conn.RemoteAddr().(*net.TCPAddr).IP + c.udpAddr = append(c.udpAddr, + &net.UDPAddr{IP: remoteIP, Port: port1}, + &net.UDPAddr{IP: remoteIP, Port: port2}, + ) + + go func() { + // Try to open a hole in the NAT router (to allow incoming UDP packets) + // by send a UDP packet for RTP and RTCP to the remote RTSP server. + // https://github.com/FFmpeg/FFmpeg/blob/aa91ae25b88e195e6af4248e0ab30605735ca1cd/libavformat/rtpdec.c#L416-L438 + _, _ = c.WriteToUDP([]byte{0x80, 0x00, 0x00, 0x00}, channel) + _, _ = c.WriteToUDP([]byte{0x80, 0xC8, 0x00, 0x01}, channel+1) + }() } } - if strings.Contains(responseTransport, "server_port=") { - parts := strings.Split(responseTransport, "server_port=") - if len(parts) > 1 { - portPart := strings.Split(strings.Split(parts[1], ";")[0], "-") - for _, p := range portPart { - if port, err := strconv.Atoi(p); err == nil { - serverPorts = append(serverPorts, port) - } - } - } - } - - // Create UDP connections for RTP and RTCP if we have both server ports - if len(serverPorts) >= 2 { - if host, _, err := net.SplitHostPort(c.Connection.RemoteAddr); err == nil { - rtpServerPort := serverPorts[0] - rtcpServerPort := serverPorts[1] - - cleanHost := host - if strings.Contains(cleanHost, ":") { - cleanHost = fmt.Sprintf("[%s]", host) - } - - remoteRtpAddr := fmt.Sprintf("%s:%d", cleanHost, rtpServerPort) - remoteRtcpAddr := fmt.Sprintf("%s:%d", cleanHost, rtcpServerPort) - - if rtpAddr, err := net.ResolveUDPAddr("udp", remoteRtpAddr); err == nil { - if rtpConn, err := net.DialUDP("udp", nil, rtpAddr); err == nil { - channel := c.getChannelForPort(rtpServerPort) - c.udpRtpConns[channel] = &UDPConnection{ - Conn: *rtpConn, - Channel: channel, - } - } - } - - if rtcpAddr, err := net.ResolveUDPAddr("udp", remoteRtcpAddr); err == nil { - if rtcpConn, err := net.DialUDP("udp", nil, rtcpAddr); err == nil { - channel := c.getChannelForPort(rtcpServerPort) - c.udpRtcpConns[channel] = &UDPConnection{ - Conn: *rtcpConn, - Channel: channel, - } - } - } - } - } - - // Try to open a hole in the NAT router (to allow incoming UDP packets) - // by send a UDP packet for RTP and RTCP to the remote RTSP server. - go c.tryHolePunching(clientPorts, serverPorts) - - var rtpPort string - if media.Direction == core.DirectionRecvonly { - rtpPort = core.Between(transport, "client_port=", "-") - } else { - rtpPort = core.Between(responseTransport, "server_port=", "-") - } - - i, err := strconv.Atoi(rtpPort) - if err != nil { - return 0, err - } - - return c.getChannelForPort(i), nil - + return channel, nil } else { // we send our `interleaved`, but camera can answer with another // Transport: RTP/AVP/TCP;unicast;interleaved=10-11;ssrc=10117CB7 // Transport: RTP/AVP/TCP;unicast;destination=192.168.1.111;source=192.168.1.222;interleaved=0 // Transport: RTP/AVP/TCP;ssrc=22345682;interleaved=0-1 - if !strings.HasPrefix(responseTransport, "RTP/AVP/TCP;") { - // Escam Q6 has a bug: - // Transport: RTP/AVP;unicast;destination=192.168.1.111;source=192.168.1.222;interleaved=0-1 - if !strings.Contains(responseTransport, ";interleaved=") { - return 0, fmt.Errorf("wrong transport: %s", responseTransport) - } - } - - channel := core.Between(responseTransport, "interleaved=", "-") - i, err := strconv.Atoi(channel) + // Escam Q6 has a bug: + // Transport: RTP/AVP;unicast;destination=192.168.1.111;source=192.168.1.222;interleaved=0-1 + s := core.Between(transport, "interleaved=", "-") + i, err := strconv.Atoi(s) if err != nil { - return 0, err + return 0, fmt.Errorf("wrong transport: %s", transport) } return byte(i), nil @@ -460,106 +371,62 @@ func (c *Conn) Teardown() (err error) { } func (c *Conn) Close() error { - c.closeUDP() - if c.mode == core.ModeActiveProducer { _ = c.Teardown() } - if c.OnClose != nil { _ = c.OnClose() } - + for _, conn := range c.udpConn { + _ = conn.Close() + } return c.conn.Close() } -func (c *Conn) closeUDP() { - for _, listener := range c.udpRtpListeners { - _ = listener.Conn.Close() - } - for _, listener := range c.udpRtcpListeners { - _ = listener.Conn.Close() - } - for _, conn := range c.udpRtpConns { - _ = conn.Conn.Close() - } - for _, conn := range c.udpRtcpConns { - _ = conn.Conn.Close() - } - - c.udpRtpListeners = make(map[byte]*UDPConnection) - c.udpRtcpListeners = make(map[byte]*UDPConnection) - c.udpRtpConns = make(map[byte]*UDPConnection) - c.udpRtcpConns = make(map[byte]*UDPConnection) - c.portToChannel = make(map[int]byte) - c.channelCounter = 0 +func (c *Conn) WriteToUDP(b []byte, channel byte) (int, error) { + return c.udpConn[channel].WriteToUDP(b, c.udpAddr[channel]) } -func (c *Conn) sendUDPRtpPacket(data []byte) error { - for len(data) >= 4 && data[0] == '$' { - channel := data[1] - size := binary.BigEndian.Uint16(data[2:4]) +const listenUDPAttemps = 10 - if len(data) < 4+int(size) { - return fmt.Errorf("incomplete RTP packet: %d < %d", len(data), 4+size) +var listenUDPMu sync.Mutex + +func ListenUDPPair() (*net.UDPConn, *net.UDPConn, error) { + listenUDPMu.Lock() + defer listenUDPMu.Unlock() + + for i := 0; i < listenUDPAttemps; i++ { + // Get a random even port from the OS + ln1, err := net.ListenUDP("udp", &net.UDPAddr{IP: nil, Port: 0}) + if err != nil { + continue } - // Send RTP data without interleaved header - rtpData := data[4 : 4+size] + var port1 = ln1.LocalAddr().(*net.UDPAddr).Port + var port2 int - if conn, ok := c.udpRtpConns[channel]; ok { - if err := conn.Conn.SetWriteDeadline(time.Now().Add(Timeout)); err != nil { - return nil - } - - if _, err := conn.Conn.Write(rtpData); err != nil { - return err - } + // 11. RTP over Network and Transport Protocols (https://www.ietf.org/rfc/rfc3550.txt) + // For UDP and similar protocols, + // RTP SHOULD use an even destination port number and the corresponding + // RTCP stream SHOULD use the next higher (odd) destination port number + if port1&1 > 0 { + port2 = port1 - 1 + } else { + port2 = port1 + 1 } - data = data[4+size:] // Move to next packet - } + ln2, err := net.ListenUDP("udp", &net.UDPAddr{IP: nil, Port: port2}) + if err != nil { + _ = ln1.Close() + continue + } - return nil -} - -func (c *Conn) tryHolePunching(clientPorts, serverPorts []int) { - if len(clientPorts) < 2 || len(serverPorts) < 2 { - return - } - - host, _, _ := net.SplitHostPort(c.Connection.RemoteAddr) - if strings.Contains(host, ":") { - host = fmt.Sprintf("[%s]", host) - } - - // RTP hole punch - if rtpListener, ok := c.udpRtpListeners[c.getChannelForPort(clientPorts[0])]; ok { - if addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", host, serverPorts[0])); err == nil { - rtpListener.Conn.WriteToUDP([]byte{0x80, 0x00, 0x00, 0x00}, addr) + if port1 < port2 { + return ln1, ln2, nil + } else { + return ln2, ln1, nil } } - // RTCP hole punch - if rtcpListener, ok := c.udpRtcpListeners[c.getChannelForPort(clientPorts[1])]; ok { - if addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", host, serverPorts[1])); err == nil { - rtcpListener.Conn.WriteToUDP([]byte{0x80, 0xC8, 0x00, 0x01}, addr) - } - } -} - -func (c *Conn) getChannelForPort(port int) byte { - if channel, exists := c.portToChannel[port]; exists { - return channel - } - - c.channelCounter++ - if c.channelCounter == 0 { - c.channelCounter = 1 - } - - channel := c.channelCounter - c.portToChannel[port] = channel - - return channel + return nil, nil, fmt.Errorf("can't open two UDP ports") } diff --git a/pkg/rtsp/conn.go b/pkg/rtsp/conn.go index ddb15a74..2984c781 100644 --- a/pkg/rtsp/conn.go +++ b/pkg/rtsp/conn.go @@ -2,6 +2,7 @@ package rtsp import ( "bufio" + "context" "encoding/binary" "fmt" "io" @@ -13,7 +14,6 @@ import ( "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/tcp" - "github.com/pion/rtcp" "github.com/pion/rtp" ) @@ -49,27 +49,10 @@ type Conn struct { state State stateMu sync.Mutex - // UDP - - udpRtpConns map[byte]*UDPConnection - udpRtcpConns map[byte]*UDPConnection - udpRtpListeners map[byte]*UDPConnection - udpRtcpListeners map[byte]*UDPConnection - portToChannel map[int]byte - channelCounter byte + udpConn []*net.UDPConn + udpAddr []*net.UDPAddr } -type UDPConnection struct { - Conn net.UDPConn - Channel byte -} - -type TransportMode int - -const ( - ReceiveMTU = 1500 -) - const ( ProtoRTSP = "RTSP/1.0" MethodOptions = "OPTIONS" @@ -108,23 +91,25 @@ const ( func (c *Conn) Handle() (err error) { var timeout time.Duration - var keepaliveDT time.Duration - var keepaliveTS time.Time - switch c.mode { case core.ModeActiveProducer: + var keepaliveDT time.Duration + if c.keepalive > 5 { keepaliveDT = time.Duration(c.keepalive-5) * time.Second } else { keepaliveDT = 25 * time.Second } - keepaliveTS = time.Now().Add(keepaliveDT) + + ctx, cancel := context.WithCancel(context.Background()) + go c.handleKeepalive(ctx, keepaliveDT) + defer cancel() if c.Timeout == 0 { // polling frames from remote RTSP Server (ex Camera) timeout = time.Second * 5 - if len(c.Receivers) == 0 { + if len(c.Receivers) == 0 || c.Transport == "udp" { // if we only send audio to camera // https://github.com/AlexxIT/go2rtc/issues/659 timeout += keepaliveDT @@ -149,150 +134,58 @@ func (c *Conn) Handle() (err error) { return fmt.Errorf("wrong RTSP conn mode: %d", c.mode) } + for i := 0; i < len(c.udpConn); i++ { + go c.handleUDPData(byte(i)) + } + for c.state != StateNone { ts := time.Now() - time := ts.Add(timeout) - if err = c.conn.SetReadDeadline(time); err != nil { + _ = c.conn.SetReadDeadline(ts.Add(timeout)) + + if err = c.handleTCPData(); err != nil { return } - - if c.Transport == "udp" { - if err = c.handleUDPClientData(time); err != nil { - return err - } - } else { - if err = c.handleTCPClientData(); err != nil { - return err - } - } - - if keepaliveDT != 0 && ts.After(keepaliveTS) { - req := &tcp.Request{Method: MethodOptions, URL: c.URL} - if err = c.WriteRequest(req); err != nil { - return - } - - keepaliveTS = ts.Add(keepaliveDT) - } } return } -func (c *Conn) handleUDPClientData(time time.Time) error { - if c.playErr != nil { - return c.playErr - } - - if c.state == StatePlay && c.playOK { - return nil - } - - var buf4 []byte - - buf4, err := c.reader.Peek(4) - if err != nil { - return err - } - - switch string(buf4) { - case "RTSP": - var res *tcp.Response - if res, err = c.ReadResponse(); err != nil { - return err - } - - c.Fire(res) - c.playOK = true - - for _, listener := range c.udpRtpListeners { - go func(listener *UDPConnection) { - defer listener.Conn.Close() - - for c.state != StateNone { - if err := listener.Conn.SetReadDeadline(time); err != nil { - c.playErr = err - return - } - - buffer := make([]byte, ReceiveMTU) - n, _, err := listener.Conn.ReadFromUDP(buffer) - if err != nil { - c.playErr = err - break - } - - packet := &rtp.Packet{} - if err := packet.Unmarshal(buffer[:n]); err != nil { - c.playErr = err - return - } - - for _, receiver := range c.Receivers { - if receiver.ID == listener.Channel { - receiver.WriteRTP(packet) - break - } - } - - c.Recv += len(buffer[:n]) - } - }(listener) - } - - for _, listener := range c.udpRtcpListeners { - go func(listener *UDPConnection) { - defer listener.Conn.Close() - - for c.state != StateNone { - if err := listener.Conn.SetReadDeadline(time); err != nil { - return - } - - buffer := make([]byte, ReceiveMTU) - n, _, err := listener.Conn.ReadFromUDP(buffer) - if err != nil { - break - } - - msg := &RTCP{Channel: listener.Channel} - - if err := msg.Header.Unmarshal(buffer[:n]); err != nil { - continue - } - - msg.Packets, err = rtcp.Unmarshal(buffer[:n]) - if err != nil { - continue - } - - c.Fire(msg) - } - }(listener) - } - - case "OPTI", "TEAR", "DESC", "SETU", "PLAY", "PAUS", "RECO", "ANNO", "GET_", "SET_": - var req *tcp.Request - if req, err = c.ReadRequest(); err != nil { - return err - } - c.Fire(req) - if req.Method == MethodOptions { - res := &tcp.Response{Request: req} - if err = c.WriteResponse(res); err != nil { - return err +func (c *Conn) handleKeepalive(ctx context.Context, d time.Duration) { + ticker := time.NewTicker(d) + for { + select { + case <-ticker.C: + req := &tcp.Request{Method: MethodOptions, URL: c.URL} + if err := c.WriteRequest(req); err != nil { + return } + case <-ctx.Done(): + return } - - default: - return fmt.Errorf("RTSP wrong input") } - - return nil } -func (c *Conn) handleTCPClientData() error { +func (c *Conn) handleUDPData(channel byte) { + // TODO: handle timeouts and drop TCP connection after any error + conn := c.udpConn[channel] + + for { + // TP-Link Tapo camera has crazy 10000 bytes packet size + buf := make([]byte, 10240) + + n, _, err := conn.ReadFromUDP(buf) + if err != nil { + return + } + + if err = c.handleRawPacket(channel, buf[:n]); err != nil { + return + } + } +} + +func (c *Conn) handleTCPData() error { // we can read: // 1. RTP interleaved: `$` + 1B channel number + 2B size // 2. RTSP response: RTSP/1.0 200 OK @@ -390,9 +283,13 @@ func (c *Conn) handleTCPClientData() error { c.Recv += int(size) + return c.handleRawPacket(channel, buf) +} + +func (c *Conn) handleRawPacket(channel byte, buf []byte) error { if channel&1 == 0 { packet := &rtp.Packet{} - if err = packet.Unmarshal(buf); err != nil { + if err := packet.Unmarshal(buf); err != nil { return err } @@ -405,14 +302,15 @@ func (c *Conn) handleTCPClientData() error { } else { msg := &RTCP{Channel: channel} - if err = msg.Header.Unmarshal(buf); err != nil { + if err := msg.Header.Unmarshal(buf); err != nil { return nil } - msg.Packets, err = rtcp.Unmarshal(buf) - if err != nil { - return nil - } + //var err error + //msg.Packets, err = rtcp.Unmarshal(buf) + //if err != nil { + // return nil + //} c.Fire(msg) } diff --git a/pkg/rtsp/consumer.go b/pkg/rtsp/consumer.go index fde2684c..e6525d96 100644 --- a/pkg/rtsp/consumer.go +++ b/pkg/rtsp/consumer.go @@ -86,21 +86,9 @@ func (c *Conn) packetWriter(codec *core.Codec, channel, payloadType uint8) core. flushBuf := func() { //log.Printf("[rtsp] channel:%2d write_size:%6d buffer_size:%6d", channel, n, len(buf)) - - if c.Transport == "udp" { - if err := c.sendUDPRtpPacket(buf[:n]); err == nil { - c.Send += n - } - } else { - if err := c.conn.SetWriteDeadline(time.Now().Add(Timeout)); err != nil { - return - } - - if _, err := c.conn.Write(buf[:n]); err == nil { - c.Send += n - } + if err := c.writeInterleavedData(buf[:n]); err != nil { + c.Send += n } - n = 0 } @@ -186,3 +174,25 @@ func (c *Conn) packetWriter(codec *core.Codec, channel, payloadType uint8) core. return handlerFunc } + +func (c *Conn) writeInterleavedData(data []byte) error { + if c.Transport != "udp" { + _ = c.conn.SetWriteDeadline(time.Now().Add(Timeout)) + _, err := c.conn.Write(data) + return err + } + + for len(data) >= 4 && data[0] == '$' { + channel := data[1] + size := uint16(data[2])<<8 | uint16(data[3]) + rtpData := data[4 : 4+size] + + if _, err := c.WriteToUDP(rtpData, channel); err != nil { + return err + } + + data = data[4+size:] + } + + return nil +} diff --git a/pkg/rtsp/ports.go b/pkg/rtsp/ports.go deleted file mode 100644 index d280ac6d..00000000 --- a/pkg/rtsp/ports.go +++ /dev/null @@ -1,75 +0,0 @@ -package rtsp - -import ( - "fmt" - "net" - "sync" -) - -var mu sync.Mutex - -type UDPPortPair struct { - RTPListener *net.UDPConn - RTCPListener *net.UDPConn - RTPPort int - RTCPPort int -} - -func (p *UDPPortPair) Close() { - if p.RTPListener != nil { - _ = p.RTPListener.Close() - } - if p.RTCPListener != nil { - _ = p.RTCPListener.Close() - } -} - -func GetUDPPorts(ip net.IP, maxAttempts int) (*UDPPortPair, error) { - mu.Lock() - defer mu.Unlock() - - if ip == nil { - ip = net.IPv4(0, 0, 0, 0) - } - - for i := 0; i < maxAttempts; i++ { - // Get a random even port from the OS - tempListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: ip, Port: 0}) - if err != nil { - continue - } - - addr := tempListener.LocalAddr().(*net.UDPAddr) - basePort := addr.Port - tempListener.Close() - - // 11. RTP over Network and Transport Protocols (https://www.ietf.org/rfc/rfc3550.txt) - // For UDP and similar protocols, - // RTP SHOULD use an even destination port number and the corresponding - // RTCP stream SHOULD use the next higher (odd) destination port number - if basePort%2 == 1 { - basePort-- - } - - // Try to bind both ports - rtpListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: ip, Port: basePort}) - if err != nil { - continue - } - - rtcpListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: ip, Port: basePort + 1}) - if err != nil { - rtpListener.Close() - continue - } - - return &UDPPortPair{ - RTPListener: rtpListener, - RTCPListener: rtcpListener, - RTPPort: basePort, - RTCPPort: basePort + 1, - }, nil - } - - return nil, fmt.Errorf("failed to allocate consecutive UDP ports after %d attempts", maxAttempts) -} From 98f88d037e4f1662d8324e10cd07f0bd7b504b7c Mon Sep 17 00:00:00 2001 From: Alex X Date: Fri, 10 Oct 2025 11:11:29 +0300 Subject: [PATCH 4/4] Remove UDP example from readme --- README.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/README.md b/README.md index d002de3a..cb3d6c8b 100644 --- a/README.md +++ b/README.md @@ -262,7 +262,6 @@ Format: `rtsp...#{param1}#{param2}#{param3}` - Ignore audio - `#media=video` or ignore video - `#media=audio` - Ignore two-way audio API `#backchannel=0` - important for some glitchy cameras - Use WebSocket transport `#transport=ws...` -- Use UDP transport `#transport=udp` **RTSP over WebSocket** @@ -272,7 +271,6 @@ streams: axis-rtsp-ws: rtsp://192.168.1.123:4567/axis-media/media.amp?overview=0&camera=1&resolution=1280x720&videoframeskipmode=empty&Axis-Orig-Sw=true#transport=ws://user:pass@192.168.1.123:4567/rtsp-over-websocket # WebSocket without authorization, RTSP - with dahua-rtsp-ws: rtsp://user:pass@192.168.1.123/cam/realmonitor?channel=1&subtype=1&proto=Private3#transport=ws://192.168.1.123/rtspoverwebsocket - udp_camera: rtsp://user:pass@192.168.1.345:554/stream1#transport=udp ``` #### Source: RTMP