From 859cd1cbe63cf7acb445ce27bde47b5d72a748a6 Mon Sep 17 00:00:00 2001 From: seydx Date: Sun, 1 Jun 2025 01:44:01 +0300 Subject: [PATCH] support rtsp udp transport --- README.md | 2 + pkg/rtsp/client.go | 289 +++++++++++++++++++++++++++++--- pkg/rtsp/conn.go | 391 ++++++++++++++++++++++++++++++------------- pkg/rtsp/consumer.go | 19 ++- pkg/rtsp/ports.go | 75 +++++++++ 5 files changed, 626 insertions(+), 150 deletions(-) create mode 100644 pkg/rtsp/ports.go diff --git a/README.md b/README.md index 90a2537f..d1461206 100644 --- a/README.md +++ b/README.md @@ -259,6 +259,7 @@ Format: `rtsp...#{param1}#{param2}#{param3}` - Ignore audio - `#media=video` or ignore video - `#media=audio` - Ignore two way audio API `#backchannel=0` - important for some glitchy cameras - Use WebSocket transport `#transport=ws...` +- Use UDP transport `#transport=udp` **RTSP over WebSocket** @@ -268,6 +269,7 @@ streams: axis-rtsp-ws: rtsp://192.168.1.123:4567/axis-media/media.amp?overview=0&camera=1&resolution=1280x720&videoframeskipmode=empty&Axis-Orig-Sw=true#transport=ws://user:pass@192.168.1.123:4567/rtsp-over-websocket # WebSocket without authorization, RTSP - with dahua-rtsp-ws: rtsp://user:pass@192.168.1.123/cam/realmonitor?channel=1&subtype=1&proto=Private3#transport=ws://192.168.1.123/rtspoverwebsocket + udp_camera: rtsp://user:pass@192.168.1.345:554/stream1#transport=udp ``` #### Source: RTMP diff --git a/pkg/rtsp/client.go b/pkg/rtsp/client.go index 7fc134fc..ef5ebbfe 100644 --- a/pkg/rtsp/client.go +++ b/pkg/rtsp/client.go @@ -2,6 +2,7 @@ package rtsp import ( "bufio" + "encoding/binary" "errors" "fmt" "net" @@ -25,7 +26,13 @@ func NewClient(uri string) *Conn { ID: core.NewID(), FormatName: "rtsp", }, - uri: uri, + uri: uri, + udpRtpConns: make(map[byte]*UDPConnection), + udpRtcpConns: make(map[byte]*UDPConnection), + udpRtpListeners: make(map[byte]*UDPConnection), + udpRtcpListeners: make(map[byte]*UDPConnection), + portToChannel: make(map[int]byte), + channelCounter: 0, } } @@ -36,13 +43,20 @@ func (c *Conn) Dial() (err error) { var conn net.Conn - if c.Transport == "" { + if c.Transport == "" || c.Transport == "tcp" || c.Transport == "udp" { timeout := core.ConnDialTimeout if c.Timeout != 0 { timeout = time.Second * time.Duration(c.Timeout) } conn, err = tcp.Dial(c.URL, timeout) - c.Protocol = "rtsp+tcp" + + if c.Transport != "udp" { + c.Protocol = "rtsp+tcp" + c.transportMode = TransportTCP + } else { + c.Protocol = "rtsp+udp" + c.transportMode = TransportUDP + } } else { conn, err = websocket.Dial(c.Transport) c.Protocol = "ws" @@ -217,23 +231,64 @@ func (c *Conn) Record() (err error) { func (c *Conn) SetupMedia(media *core.Media) (byte, error) { var transport string + var mediaIndex int = -1 // try to use media position as channel number for i, m := range c.Medias { if m.Equal(media) { - transport = fmt.Sprintf( - // i - RTP (data channel) - // i+1 - RTCP (control channel) - "RTP/AVP/TCP;unicast;interleaved=%d-%d", i*2, i*2+1, - ) + mediaIndex = i break } } - if transport == "" { + if mediaIndex == -1 { return 0, fmt.Errorf("wrong media: %v", media) } + if c.transportMode == TransportUDP { + transport, err := c.setupUDPTransport() + if err == nil { + return c.sendSetupRequest(media, transport) + } + // Fall back to TCP if UDP fails + c.closeUDP() + c.transportMode = TransportTCP + } + + transport = c.setupTCPTransport(mediaIndex) + return c.sendSetupRequest(media, transport) +} + +func (c *Conn) setupTCPTransport(mediaIndex int) string { + channel := byte(mediaIndex * 2) + transport := fmt.Sprintf("RTP/AVP/TCP;unicast;interleaved=%d-%d", channel, channel+1) + return transport +} + +func (c *Conn) setupUDPTransport() (string, error) { + portPair, err := GetUDPPorts(nil, 10) + if err != nil { + return "", err + } + + rtpChannel := c.getChannelForPort(portPair.RTPPort) + rtcpChannel := c.getChannelForPort(portPair.RTCPPort) + + c.udpRtpListeners[rtpChannel] = &UDPConnection{ + Conn: *portPair.RTPListener, + Channel: rtpChannel, + } + + c.udpRtcpListeners[rtcpChannel] = &UDPConnection{ + Conn: *portPair.RTCPListener, + Channel: rtcpChannel, + } + + transport := fmt.Sprintf("RTP/AVP;unicast;client_port=%d-%d", portPair.RTPPort, portPair.RTCPPort) + return transport, nil +} + +func (c *Conn) sendSetupRequest(media *core.Media, transport string) (byte, error) { rawURL := media.ID // control if !strings.Contains(rawURL, "://") { rawURL = c.URL.String() @@ -286,27 +341,114 @@ func (c *Conn) SetupMedia(media *core.Media) (byte, error) { } } - // we send our `interleaved`, but camera can answer with another + // Parse server response + responseTransport := res.Header.Get("Transport") - // Transport: RTP/AVP/TCP;unicast;interleaved=10-11;ssrc=10117CB7 - // Transport: RTP/AVP/TCP;unicast;destination=192.168.1.111;source=192.168.1.222;interleaved=0 - // Transport: RTP/AVP/TCP;ssrc=22345682;interleaved=0-1 - transport = res.Header.Get("Transport") - if !strings.HasPrefix(transport, "RTP/AVP/TCP;") { - // Escam Q6 has a bug: - // Transport: RTP/AVP;unicast;destination=192.168.1.111;source=192.168.1.222;interleaved=0-1 - if !strings.Contains(transport, ";interleaved=") { - return 0, fmt.Errorf("wrong transport: %s", transport) + if c.transportMode == TransportUDP { + // Parse UDP response: client_ports=1234-1235;server_port=1234-1235 + var clientPorts []int + var serverPorts []int + + if strings.Contains(transport, "client_port=") { + parts := strings.Split(responseTransport, "client_port=") + if len(parts) > 1 { + portPart := strings.Split(strings.Split(parts[1], ";")[0], "-") + for _, p := range portPart { + if port, err := strconv.Atoi(p); err == nil { + clientPorts = append(clientPorts, port) + } + } + } } - } - channel := core.Between(transport, "interleaved=", "-") - i, err := strconv.Atoi(channel) - if err != nil { - return 0, err - } + if strings.Contains(responseTransport, "server_port=") { + parts := strings.Split(responseTransport, "server_port=") + if len(parts) > 1 { + portPart := strings.Split(strings.Split(parts[1], ";")[0], "-") + for _, p := range portPart { + if port, err := strconv.Atoi(p); err == nil { + serverPorts = append(serverPorts, port) + } + } + } + } - return byte(i), nil + // Create UDP connections for RTP and RTCP if we have both server ports + if len(serverPorts) >= 2 { + if host, _, err := net.SplitHostPort(c.Connection.RemoteAddr); err == nil { + rtpServerPort := serverPorts[0] + rtcpServerPort := serverPorts[1] + + cleanHost := host + if strings.Contains(cleanHost, ":") { + cleanHost = fmt.Sprintf("[%s]", host) + } + + remoteRtpAddr := fmt.Sprintf("%s:%d", cleanHost, rtpServerPort) + remoteRtcpAddr := fmt.Sprintf("%s:%d", cleanHost, rtcpServerPort) + + if rtpAddr, err := net.ResolveUDPAddr("udp", remoteRtpAddr); err == nil { + if rtpConn, err := net.DialUDP("udp", nil, rtpAddr); err == nil { + channel := c.getChannelForPort(rtpServerPort) + c.udpRtpConns[channel] = &UDPConnection{ + Conn: *rtpConn, + Channel: channel, + } + } + } + + if rtcpAddr, err := net.ResolveUDPAddr("udp", remoteRtcpAddr); err == nil { + if rtcpConn, err := net.DialUDP("udp", nil, rtcpAddr); err == nil { + channel := c.getChannelForPort(rtcpServerPort) + c.udpRtcpConns[channel] = &UDPConnection{ + Conn: *rtcpConn, + Channel: channel, + } + } + } + } + } + + // Try to open a hole in the NAT router (to allow incoming UDP packets) + // by send a UDP packet for RTP and RTCP to the remote RTSP server. + go c.tryHolePunching(clientPorts, serverPorts) + + var rtpPort string + if media.Direction == core.DirectionRecvonly { + rtpPort = core.Between(transport, "client_port=", "-") + } else { + rtpPort = core.Between(responseTransport, "server_port=", "-") + } + + i, err := strconv.Atoi(rtpPort) + if err != nil { + return 0, err + } + + return c.getChannelForPort(i), nil + + } else { + // we send our `interleaved`, but camera can answer with another + + // Transport: RTP/AVP/TCP;unicast;interleaved=10-11;ssrc=10117CB7 + // Transport: RTP/AVP/TCP;unicast;destination=192.168.1.111;source=192.168.1.222;interleaved=0 + // Transport: RTP/AVP/TCP;ssrc=22345682;interleaved=0-1 + if !strings.HasPrefix(responseTransport, "RTP/AVP/TCP;") { + // Escam Q6 has a bug: + // Transport: RTP/AVP;unicast;destination=192.168.1.111;source=192.168.1.222;interleaved=0-1 + if !strings.Contains(responseTransport, ";interleaved=") { + return 0, fmt.Errorf("wrong transport: %s", responseTransport) + } + } + + channel := core.Between(responseTransport, "interleaved=", "-") + i, err := strconv.Atoi(channel) + if err != nil { + return 0, err + } + + return byte(i), nil + } } func (c *Conn) Play() (err error) { @@ -321,11 +463,106 @@ func (c *Conn) Teardown() (err error) { } func (c *Conn) Close() error { + c.closeUDP() + if c.mode == core.ModeActiveProducer { _ = c.Teardown() } + if c.OnClose != nil { _ = c.OnClose() } + return c.conn.Close() } + +func (c *Conn) closeUDP() { + for _, listener := range c.udpRtpListeners { + _ = listener.Conn.Close() + } + for _, listener := range c.udpRtcpListeners { + _ = listener.Conn.Close() + } + for _, conn := range c.udpRtpConns { + _ = conn.Conn.Close() + } + for _, conn := range c.udpRtcpConns { + _ = conn.Conn.Close() + } + + c.udpRtpListeners = make(map[byte]*UDPConnection) + c.udpRtcpListeners = make(map[byte]*UDPConnection) + c.udpRtpConns = make(map[byte]*UDPConnection) + c.udpRtcpConns = make(map[byte]*UDPConnection) + c.portToChannel = make(map[int]byte) + c.channelCounter = 0 +} + +func (c *Conn) sendUDPRtpPacket(data []byte) error { + for len(data) >= 4 && data[0] == '$' { + channel := data[1] + size := binary.BigEndian.Uint16(data[2:4]) + + if len(data) < 4+int(size) { + return fmt.Errorf("incomplete RTP packet: %d < %d", len(data), 4+size) + } + + // Send RTP data without interleaved header + rtpData := data[4 : 4+size] + + if conn, ok := c.udpRtpConns[channel]; ok { + if err := conn.Conn.SetWriteDeadline(time.Now().Add(Timeout)); err != nil { + return nil + } + + if _, err := conn.Conn.Write(rtpData); err != nil { + return err + } + } + + data = data[4+size:] // Move to next packet + } + + return nil +} + +func (c *Conn) tryHolePunching(clientPorts, serverPorts []int) { + if len(clientPorts) < 2 || len(serverPorts) < 2 { + return + } + + host, _, _ := net.SplitHostPort(c.Connection.RemoteAddr) + if strings.Contains(host, ":") { + host = fmt.Sprintf("[%s]", host) + } + + // RTP hole punch + if rtpListener, ok := c.udpRtpListeners[c.getChannelForPort(clientPorts[0])]; ok { + if addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", host, serverPorts[0])); err == nil { + rtpListener.Conn.WriteToUDP([]byte{0x80, 0x00, 0x00, 0x00}, addr) + } + } + + // RTCP hole punch + if rtcpListener, ok := c.udpRtcpListeners[c.getChannelForPort(clientPorts[1])]; ok { + if addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", host, serverPorts[1])); err == nil { + rtcpListener.Conn.WriteToUDP([]byte{0x80, 0xC8, 0x00, 0x01}, addr) + } + } +} + +func (c *Conn) getChannelForPort(port int) byte { + if channel, exists := c.portToChannel[port]; exists { + return channel + } + + c.channelCounter++ + if c.channelCounter == 0 { + c.channelCounter = 1 + } + + channel := c.channelCounter + c.portToChannel[port] = channel + + return channel +} diff --git a/pkg/rtsp/conn.go b/pkg/rtsp/conn.go index 0c2009d7..a5f001c2 100644 --- a/pkg/rtsp/conn.go +++ b/pkg/rtsp/conn.go @@ -40,6 +40,7 @@ type Conn struct { keepalive int mode core.Mode playOK bool + playErr error reader *bufio.Reader sequence int session string @@ -47,8 +48,32 @@ type Conn struct { state State stateMu sync.Mutex + + transportMode TransportMode + + // UDP + + udpRtpConns map[byte]*UDPConnection + udpRtcpConns map[byte]*UDPConnection + udpRtpListeners map[byte]*UDPConnection + udpRtcpListeners map[byte]*UDPConnection + portToChannel map[int]byte + channelCounter byte } +type UDPConnection struct { + Conn net.UDPConn + Channel byte +} + +type TransportMode int + +const ( + TransportTCP TransportMode = iota + TransportUDP + ReceiveMTU = 1500 +) + const ( ProtoRTSP = "RTSP/1.0" MethodOptions = "OPTIONS" @@ -68,7 +93,6 @@ func (s State) String() string { case StateNone: return "NONE" case StateConn: - return "CONN" case StateSetup: return MethodSetup @@ -131,133 +155,22 @@ func (c *Conn) Handle() (err error) { for c.state != StateNone { ts := time.Now() + time := ts.Add(timeout) - if err = c.conn.SetReadDeadline(ts.Add(timeout)); err != nil { + if err = c.conn.SetReadDeadline(time); err != nil { return } - // we can read: - // 1. RTP interleaved: `$` + 1B channel number + 2B size - // 2. RTSP response: RTSP/1.0 200 OK - // 3. RTSP request: OPTIONS ... - var buf4 []byte // `$` + 1B channel number + 2B size - buf4, err = c.reader.Peek(4) - if err != nil { - return - } - - var channelID byte - var size uint16 - - if buf4[0] != '$' { - switch string(buf4) { - case "RTSP": - var res *tcp.Response - if res, err = c.ReadResponse(); err != nil { - return - } - c.Fire(res) - // for playing backchannel only after OK response on play - c.playOK = true - continue - - case "OPTI", "TEAR", "DESC", "SETU", "PLAY", "PAUS", "RECO", "ANNO", "GET_", "SET_": - var req *tcp.Request - if req, err = c.ReadRequest(); err != nil { - return - } - c.Fire(req) - if req.Method == MethodOptions { - res := &tcp.Response{Request: req} - if err = c.WriteResponse(res); err != nil { - return - } - } - continue - - default: - c.Fire("RTSP wrong input") - - for i := 0; ; i++ { - // search next start symbol - if _, err = c.reader.ReadBytes('$'); err != nil { - return err - } - - if channelID, err = c.reader.ReadByte(); err != nil { - return err - } - - // TODO: better check maximum good channel ID - if channelID >= 20 { - continue - } - - buf4 = make([]byte, 2) - if _, err = io.ReadFull(c.reader, buf4); err != nil { - return err - } - - // check if size good for RTP - size = binary.BigEndian.Uint16(buf4) - if size <= 1500 { - break - } - - // 10 tries to find good packet - if i >= 10 { - return fmt.Errorf("RTSP wrong input") - } - } + if c.transportMode == TransportUDP { + if err = c.handleUDPClientData(time); err != nil { + return err } } else { - // hope that the odd channels are always RTCP - channelID = buf4[1] - - // get data size - size = binary.BigEndian.Uint16(buf4[2:]) - - // skip 4 bytes from c.reader.Peek - if _, err = c.reader.Discard(4); err != nil { - return + if err = c.handleTCPClientData(); err != nil { + return err } } - // init memory for data - buf := make([]byte, size) - if _, err = io.ReadFull(c.reader, buf); err != nil { - return - } - - c.Recv += int(size) - - if channelID&1 == 0 { - packet := &rtp.Packet{} - if err = packet.Unmarshal(buf); err != nil { - return - } - - for _, receiver := range c.Receivers { - if receiver.ID == channelID { - receiver.WriteRTP(packet) - break - } - } - } else { - msg := &RTCP{Channel: channelID} - - if err = msg.Header.Unmarshal(buf); err != nil { - continue - } - - msg.Packets, err = rtcp.Unmarshal(buf) - if err != nil { - continue - } - - c.Fire(msg) - } - if keepaliveDT != 0 && ts.After(keepaliveTS) { req := &tcp.Request{Method: MethodOptions, URL: c.URL} if err = c.WriteRequest(req); err != nil { @@ -271,6 +184,246 @@ func (c *Conn) Handle() (err error) { return } +func (c *Conn) handleUDPClientData(time time.Time) error { + if c.playErr != nil { + return c.playErr + } + + if c.state == StatePlay && c.playOK { + return nil + } + + var buf4 []byte + + buf4, err := c.reader.Peek(4) + if err != nil { + return err + } + + switch string(buf4) { + case "RTSP": + var res *tcp.Response + if res, err = c.ReadResponse(); err != nil { + return err + } + + c.Fire(res) + c.playOK = true + + for _, listener := range c.udpRtpListeners { + go func(listener *UDPConnection) { + defer listener.Conn.Close() + + for c.state != StateNone { + if err := listener.Conn.SetReadDeadline(time); err != nil { + c.playErr = err + return + } + + buffer := make([]byte, ReceiveMTU) + n, _, err := listener.Conn.ReadFromUDP(buffer) + if err != nil { + c.playErr = err + break + } + + packet := &rtp.Packet{} + if err := packet.Unmarshal(buffer[:n]); err != nil { + c.playErr = err + return + } + + for _, receiver := range c.Receivers { + if receiver.ID == listener.Channel { + receiver.WriteRTP(packet) + break + } + } + + c.Recv += len(buffer[:n]) + } + }(listener) + } + + for _, listener := range c.udpRtcpListeners { + go func(listener *UDPConnection) { + defer listener.Conn.Close() + + for c.state != StateNone { + if err := listener.Conn.SetReadDeadline(time); err != nil { + return + } + + buffer := make([]byte, ReceiveMTU) + n, _, err := listener.Conn.ReadFromUDP(buffer) + if err != nil { + break + } + + msg := &RTCP{Channel: listener.Channel} + + if err := msg.Header.Unmarshal(buffer[:n]); err != nil { + continue + } + + msg.Packets, err = rtcp.Unmarshal(buffer[:n]) + if err != nil { + continue + } + + c.Fire(msg) + } + }(listener) + } + + case "OPTI", "TEAR", "DESC", "SETU", "PLAY", "PAUS", "RECO", "ANNO", "GET_", "SET_": + var req *tcp.Request + if req, err = c.ReadRequest(); err != nil { + return err + } + c.Fire(req) + if req.Method == MethodOptions { + res := &tcp.Response{Request: req} + if err = c.WriteResponse(res); err != nil { + return err + } + } + + default: + return fmt.Errorf("RTSP wrong input") + } + + return nil +} + +func (c *Conn) handleTCPClientData() error { + // we can read: + // 1. RTP interleaved: `$` + 1B channel number + 2B size + // 2. RTSP response: RTSP/1.0 200 OK + // 3. RTSP request: OPTIONS ... + var buf4 []byte // `$` + 1B channel number + 2B size + var err error + + buf4, err = c.reader.Peek(4) + if err != nil { + return err + } + + var channel byte + var size uint16 + + if buf4[0] != '$' { + switch string(buf4) { + case "RTSP": + var res *tcp.Response + if res, err = c.ReadResponse(); err != nil { + return err + } + c.Fire(res) + // for playing backchannel only after OK response on play + c.playOK = true + return nil + + case "OPTI", "TEAR", "DESC", "SETU", "PLAY", "PAUS", "RECO", "ANNO", "GET_", "SET_": + var req *tcp.Request + if req, err = c.ReadRequest(); err != nil { + return err + } + c.Fire(req) + if req.Method == MethodOptions { + res := &tcp.Response{Request: req} + if err = c.WriteResponse(res); err != nil { + return err + } + } + return nil + + default: + c.Fire("RTSP wrong input") + + for i := 0; ; i++ { + // search next start symbol + if _, err = c.reader.ReadBytes('$'); err != nil { + return err + } + + if channel, err = c.reader.ReadByte(); err != nil { + return err + } + + // TODO: better check maximum good channel ID + if channel >= 20 { + continue + } + + buf4 = make([]byte, 2) + if _, err = io.ReadFull(c.reader, buf4); err != nil { + return err + } + + // check if size good for RTP + size = binary.BigEndian.Uint16(buf4) + if size <= 1500 { + break + } + + // 10 tries to find good packet + if i >= 10 { + return fmt.Errorf("RTSP wrong input") + } + } + } + } else { + // hope that the odd channels are always RTCP + channel = buf4[1] + + // get data size + size = binary.BigEndian.Uint16(buf4[2:]) + + // skip 4 bytes from c.reader.Peek + if _, err = c.reader.Discard(4); err != nil { + return err + } + } + + // init memory for data + buf := make([]byte, size) + if _, err = io.ReadFull(c.reader, buf); err != nil { + return err + } + + c.Recv += int(size) + + if channel&1 == 0 { + packet := &rtp.Packet{} + if err = packet.Unmarshal(buf); err != nil { + return err + } + + for _, receiver := range c.Receivers { + if receiver.ID == channel { + receiver.WriteRTP(packet) + break + } + } + } else { + msg := &RTCP{Channel: channel} + + if err = msg.Header.Unmarshal(buf); err != nil { + return nil + } + + msg.Packets, err = rtcp.Unmarshal(buf) + if err != nil { + return nil + } + + c.Fire(msg) + } + + return nil +} + func (c *Conn) WriteRequest(req *tcp.Request) error { if req.Proto == "" { req.Proto = ProtoRTSP diff --git a/pkg/rtsp/consumer.go b/pkg/rtsp/consumer.go index 860ed113..b5827436 100644 --- a/pkg/rtsp/consumer.go +++ b/pkg/rtsp/consumer.go @@ -85,13 +85,22 @@ func (c *Conn) packetWriter(codec *core.Codec, channel, payloadType uint8) core. } flushBuf := func() { - if err := c.conn.SetWriteDeadline(time.Now().Add(Timeout)); err != nil { - return - } //log.Printf("[rtsp] channel:%2d write_size:%6d buffer_size:%6d", channel, n, len(buf)) - if _, err := c.conn.Write(buf[:n]); err == nil { - c.Send += n + + if c.transportMode == TransportUDP { + if err := c.sendUDPRtpPacket(buf[:n]); err == nil { + c.Send += n + } + } else { + if err := c.conn.SetWriteDeadline(time.Now().Add(Timeout)); err != nil { + return + } + + if _, err := c.conn.Write(buf[:n]); err == nil { + c.Send += n + } } + n = 0 } diff --git a/pkg/rtsp/ports.go b/pkg/rtsp/ports.go new file mode 100644 index 00000000..d280ac6d --- /dev/null +++ b/pkg/rtsp/ports.go @@ -0,0 +1,75 @@ +package rtsp + +import ( + "fmt" + "net" + "sync" +) + +var mu sync.Mutex + +type UDPPortPair struct { + RTPListener *net.UDPConn + RTCPListener *net.UDPConn + RTPPort int + RTCPPort int +} + +func (p *UDPPortPair) Close() { + if p.RTPListener != nil { + _ = p.RTPListener.Close() + } + if p.RTCPListener != nil { + _ = p.RTCPListener.Close() + } +} + +func GetUDPPorts(ip net.IP, maxAttempts int) (*UDPPortPair, error) { + mu.Lock() + defer mu.Unlock() + + if ip == nil { + ip = net.IPv4(0, 0, 0, 0) + } + + for i := 0; i < maxAttempts; i++ { + // Get a random even port from the OS + tempListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: ip, Port: 0}) + if err != nil { + continue + } + + addr := tempListener.LocalAddr().(*net.UDPAddr) + basePort := addr.Port + tempListener.Close() + + // 11. RTP over Network and Transport Protocols (https://www.ietf.org/rfc/rfc3550.txt) + // For UDP and similar protocols, + // RTP SHOULD use an even destination port number and the corresponding + // RTCP stream SHOULD use the next higher (odd) destination port number + if basePort%2 == 1 { + basePort-- + } + + // Try to bind both ports + rtpListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: ip, Port: basePort}) + if err != nil { + continue + } + + rtcpListener, err := net.ListenUDP("udp", &net.UDPAddr{IP: ip, Port: basePort + 1}) + if err != nil { + rtpListener.Close() + continue + } + + return &UDPPortPair{ + RTPListener: rtpListener, + RTCPListener: rtcpListener, + RTPPort: basePort, + RTCPPort: basePort + 1, + }, nil + } + + return nil, fmt.Errorf("failed to allocate consecutive UDP ports after %d attempts", maxAttempts) +}