diff --git a/pkg/rtsp/conn.go b/pkg/rtsp/client.go similarity index 73% rename from pkg/rtsp/conn.go rename to pkg/rtsp/client.go index f9e2866c..ba7a48b6 100644 --- a/pkg/rtsp/conn.go +++ b/pkg/rtsp/client.go @@ -6,10 +6,6 @@ import ( "encoding/binary" "errors" "fmt" - "github.com/AlexxIT/go2rtc/pkg/aac" - "github.com/AlexxIT/go2rtc/pkg/h264" - "github.com/AlexxIT/go2rtc/pkg/h265" - "github.com/AlexxIT/go2rtc/pkg/mjpeg" "github.com/AlexxIT/go2rtc/pkg/streamer" "github.com/AlexxIT/go2rtc/pkg/tcp" "github.com/pion/rtcp" @@ -112,19 +108,6 @@ func NewClient(uri string) (*Conn, error) { return c, c.parseURI() } -func NewServer(conn net.Conn) *Conn { - c := new(Conn) - c.conn = conn - c.mode = ModeServerUnknown - c.reader = bufio.NewReader(conn) - return c -} - -func (c *Conn) Auth(username, password string) { - info := url.UserPassword(username, password) - c.auth = tcp.NewAuth(info) -} - func (c *Conn) parseURI() (err error) { c.URL, err = url.Parse(c.uri) if err != nil { @@ -532,151 +515,6 @@ func (c *Conn) Close() error { return c.conn.Close() } -const transport = "RTP/AVP/TCP;unicast;interleaved=" - -func (c *Conn) Accept() error { - for { - req, err := tcp.ReadRequest(c.reader) - if err != nil { - return err - } - - if c.URL == nil { - c.URL = req.URL - c.UserAgent = req.Header.Get("User-Agent") - } - - c.Fire(req) - - if !c.auth.Validate(req) { - res := &tcp.Response{ - Status: "401 Unauthorized", - Header: map[string][]string{"Www-Authenticate": {`Basic realm="go2rtc"`}}, - } - if err = c.Response(res); err != nil { - return err - } - continue - } - - // Receiver: OPTIONS > DESCRIBE > SETUP... > PLAY > TEARDOWN - // Sender: OPTIONS > ANNOUNCE > SETUP... > RECORD > TEARDOWN - switch req.Method { - case MethodOptions: - res := &tcp.Response{ - Header: map[string][]string{ - "Public": {"OPTIONS, SETUP, TEARDOWN, DESCRIBE, PLAY, PAUSE, ANNOUNCE, RECORD"}, - }, - Request: req, - } - if err = c.Response(res); err != nil { - return err - } - - case MethodAnnounce: - if req.Header.Get("Content-Type") != "application/sdp" { - return errors.New("wrong content type") - } - - c.Medias, err = UnmarshalSDP(req.Body) - if err != nil { - return err - } - - // TODO: fix someday... - c.channels = map[byte]*streamer.Track{} - for i, media := range c.Medias { - track := streamer.NewTrack(media.Codecs[0], media.Direction) - c.tracks = append(c.tracks, track) - c.channels[byte(i<<1)] = track - } - - c.mode = ModeServerProducer - c.Fire(MethodAnnounce) - - res := &tcp.Response{Request: req} - if err = c.Response(res); err != nil { - return err - } - - case MethodDescribe: - c.mode = ModeServerConsumer - c.Fire(MethodDescribe) - - if c.tracks == nil { - res := &tcp.Response{ - Status: "404 Not Found", - Request: req, - } - return c.Response(res) - } - - res := &tcp.Response{ - Header: map[string][]string{ - "Content-Type": {"application/sdp"}, - }, - Request: req, - } - - // convert tracks to real output medias medias - var medias []*streamer.Media - for _, track := range c.tracks { - media := &streamer.Media{ - Kind: streamer.GetKind(track.Codec.Name), - Direction: streamer.DirectionSendonly, - Codecs: []*streamer.Codec{track.Codec}, - } - medias = append(medias, media) - } - - res.Body, err = streamer.MarshalSDP(c.SessionName, medias) - if err != nil { - return err - } - - if err = c.Response(res); err != nil { - return err - } - - case MethodSetup: - tr := req.Header.Get("Transport") - - res := &tcp.Response{ - Header: map[string][]string{}, - Request: req, - } - - if strings.HasPrefix(tr, transport) { - c.Session = "1" // TODO: fixme - c.state = StateSetup - res.Header.Set("Transport", tr[:len(transport)+3]) - } else { - res.Status = "461 Unsupported transport" - } - - if err = c.Response(res); err != nil { - return err - } - - case MethodRecord, MethodPlay: - res := &tcp.Response{Request: req} - if err = c.Response(res); err == nil { - c.state = StatePlay - } - return err - - case MethodTeardown: - res := &tcp.Response{Request: req} - _ = c.Response(res) - c.state = StateNone - return c.conn.Close() - - default: - return fmt.Errorf("unsupported method: %s", req.Method) - } - } -} - func (c *Conn) Handle() (err error) { c.stateMu.Lock() @@ -881,54 +719,3 @@ func (c *Conn) GetChannel(media *streamer.Media) int { } return -1 } - -func (c *Conn) bindTrack( - track *streamer.Track, channel uint8, payloadType uint8, -) *streamer.Track { - push := func(packet *rtp.Packet) error { - if c.state == StateNone { - return nil - } - packet.Header.PayloadType = payloadType - - size := packet.MarshalSize() - - //log.Printf("[RTP] codec: %s, size: %6d, ts: %10d, pt: %2d, ssrc: %d, seq: %d, mark: %v", track.Codec.Name, len(packet.Payload), packet.Timestamp, packet.PayloadType, packet.SSRC, packet.SequenceNumber, packet.Marker) - - data := make([]byte, 4+size) - data[0] = '$' - data[1] = channel - binary.BigEndian.PutUint16(data[2:], uint16(size)) - - if _, err := packet.MarshalTo(data[4:]); err != nil { - return nil - } - - if _, err := c.conn.Write(data); err != nil { - return err - } - - c.send += size - - return nil - } - - if !track.Codec.IsRTP() { - switch track.Codec.Name { - case streamer.CodecH264: - wrapper := h264.RTPPay(1500) - push = wrapper(push) - case streamer.CodecH265: - wrapper := h265.RTPPay(1500) - push = wrapper(push) - case streamer.CodecAAC: - wrapper := aac.RTPPay(1500) - push = wrapper(push) - case streamer.CodecJPEG: - wrapper := mjpeg.RTPPay() - push = wrapper(push) - } - } - - return track.Bind(push) -} diff --git a/pkg/rtsp/consumer.go b/pkg/rtsp/consumer.go new file mode 100644 index 00000000..35ac5ade --- /dev/null +++ b/pkg/rtsp/consumer.go @@ -0,0 +1,102 @@ +package rtsp + +import ( + "encoding/binary" + "github.com/AlexxIT/go2rtc/pkg/aac" + "github.com/AlexxIT/go2rtc/pkg/h264" + "github.com/AlexxIT/go2rtc/pkg/h265" + "github.com/AlexxIT/go2rtc/pkg/mjpeg" + "github.com/AlexxIT/go2rtc/pkg/streamer" + "github.com/pion/rtp" +) + +func (c *Conn) AddTrack(media *streamer.Media, track *streamer.Track) *streamer.Track { + switch track.Direction { + // send our track to RTSP consumer (ex. FFmpeg) + case streamer.DirectionSendonly: + i := len(c.tracks) + channelID := byte(i << 1) + + codec := track.Codec.Clone() + codec.PayloadType = uint8(96 + i) + + if media.MatchAll() { + // fill consumer medias list + c.Medias = append(c.Medias, &streamer.Media{ + Kind: media.Kind, Direction: media.Direction, + Codecs: []*streamer.Codec{codec}, + }) + } else { + // find consumer media and replace codec with right one + for i, m := range c.Medias { + if m == media { + media.Codecs = []*streamer.Codec{codec} + c.Medias[i] = media + break + } + } + } + + track = c.bindTrack(track, channelID, codec.PayloadType) + track.Codec = codec + c.tracks = append(c.tracks, track) + + return track + + case streamer.DirectionRecvonly: + panic("not implemented") + } + + panic("wrong direction") +} + +func (c *Conn) bindTrack( + track *streamer.Track, channel uint8, payloadType uint8, +) *streamer.Track { + push := func(packet *rtp.Packet) error { + if c.state == StateNone { + return nil + } + packet.Header.PayloadType = payloadType + + size := packet.MarshalSize() + + //log.Printf("[RTP] codec: %s, size: %6d, ts: %10d, pt: %2d, ssrc: %d, seq: %d, mark: %v", track.Codec.Name, len(packet.Payload), packet.Timestamp, packet.PayloadType, packet.SSRC, packet.SequenceNumber, packet.Marker) + + data := make([]byte, 4+size) + data[0] = '$' + data[1] = channel + binary.BigEndian.PutUint16(data[2:], uint16(size)) + + if _, err := packet.MarshalTo(data[4:]); err != nil { + return nil + } + + if _, err := c.conn.Write(data); err != nil { + return err + } + + c.send += size + + return nil + } + + if !track.Codec.IsRTP() { + switch track.Codec.Name { + case streamer.CodecH264: + wrapper := h264.RTPPay(1500) + push = wrapper(push) + case streamer.CodecH265: + wrapper := h265.RTPPay(1500) + push = wrapper(push) + case streamer.CodecAAC: + wrapper := aac.RTPPay(1500) + push = wrapper(push) + case streamer.CodecJPEG: + wrapper := mjpeg.RTPPay() + push = wrapper(push) + } + } + + return track.Bind(push) +} diff --git a/pkg/rtsp/streamer.go b/pkg/rtsp/producer.go similarity index 67% rename from pkg/rtsp/streamer.go rename to pkg/rtsp/producer.go index 7fea6d97..7a6139b3 100644 --- a/pkg/rtsp/streamer.go +++ b/pkg/rtsp/producer.go @@ -6,8 +6,6 @@ import ( "github.com/AlexxIT/go2rtc/pkg/streamer" ) -// Element Producer - func (c *Conn) GetMedias() []*streamer.Media { if c.Medias != nil { return c.Medias @@ -70,50 +68,6 @@ func (c *Conn) Stop() error { return c.Close() } -// Consumer - -func (c *Conn) AddTrack(media *streamer.Media, track *streamer.Track) *streamer.Track { - switch track.Direction { - // send our track to RTSP consumer (ex. FFmpeg) - case streamer.DirectionSendonly: - i := len(c.tracks) - channelID := byte(i << 1) - - codec := track.Codec.Clone() - codec.PayloadType = uint8(96 + i) - - if media.MatchAll() { - // fill consumer medias list - c.Medias = append(c.Medias, &streamer.Media{ - Kind: media.Kind, Direction: media.Direction, - Codecs: []*streamer.Codec{codec}, - }) - } else { - // find consumer media and replace codec with right one - for i, m := range c.Medias { - if m == media { - media.Codecs = []*streamer.Codec{codec} - c.Medias[i] = media - break - } - } - } - - track = c.bindTrack(track, channelID, codec.PayloadType) - track.Codec = codec - c.tracks = append(c.tracks, track) - - return track - - case streamer.DirectionRecvonly: - panic("not implemented") - } - - panic("wrong direction") -} - -// - func (c *Conn) MarshalJSON() ([]byte, error) { info := &streamer.Info{ UserAgent: c.UserAgent, diff --git a/pkg/rtsp/server.go b/pkg/rtsp/server.go new file mode 100644 index 00000000..59b2484b --- /dev/null +++ b/pkg/rtsp/server.go @@ -0,0 +1,170 @@ +package rtsp + +import ( + "bufio" + "errors" + "fmt" + "github.com/AlexxIT/go2rtc/pkg/streamer" + "github.com/AlexxIT/go2rtc/pkg/tcp" + "net" + "net/url" + "strings" +) + +func NewServer(conn net.Conn) *Conn { + c := new(Conn) + c.conn = conn + c.mode = ModeServerUnknown + c.reader = bufio.NewReader(conn) + return c +} + +func (c *Conn) Auth(username, password string) { + info := url.UserPassword(username, password) + c.auth = tcp.NewAuth(info) +} + +const transport = "RTP/AVP/TCP;unicast;interleaved=" + +func (c *Conn) Accept() error { + for { + req, err := tcp.ReadRequest(c.reader) + if err != nil { + return err + } + + if c.URL == nil { + c.URL = req.URL + c.UserAgent = req.Header.Get("User-Agent") + } + + c.Fire(req) + + if !c.auth.Validate(req) { + res := &tcp.Response{ + Status: "401 Unauthorized", + Header: map[string][]string{"Www-Authenticate": {`Basic realm="go2rtc"`}}, + } + if err = c.Response(res); err != nil { + return err + } + continue + } + + // Receiver: OPTIONS > DESCRIBE > SETUP... > PLAY > TEARDOWN + // Sender: OPTIONS > ANNOUNCE > SETUP... > RECORD > TEARDOWN + switch req.Method { + case MethodOptions: + res := &tcp.Response{ + Header: map[string][]string{ + "Public": {"OPTIONS, SETUP, TEARDOWN, DESCRIBE, PLAY, PAUSE, ANNOUNCE, RECORD"}, + }, + Request: req, + } + if err = c.Response(res); err != nil { + return err + } + + case MethodAnnounce: + if req.Header.Get("Content-Type") != "application/sdp" { + return errors.New("wrong content type") + } + + c.Medias, err = UnmarshalSDP(req.Body) + if err != nil { + return err + } + + // TODO: fix someday... + c.channels = map[byte]*streamer.Track{} + for i, media := range c.Medias { + track := streamer.NewTrack(media.Codecs[0], media.Direction) + c.tracks = append(c.tracks, track) + c.channels[byte(i<<1)] = track + } + + c.mode = ModeServerProducer + c.Fire(MethodAnnounce) + + res := &tcp.Response{Request: req} + if err = c.Response(res); err != nil { + return err + } + + case MethodDescribe: + c.mode = ModeServerConsumer + c.Fire(MethodDescribe) + + if c.tracks == nil { + res := &tcp.Response{ + Status: "404 Not Found", + Request: req, + } + return c.Response(res) + } + + res := &tcp.Response{ + Header: map[string][]string{ + "Content-Type": {"application/sdp"}, + }, + Request: req, + } + + // convert tracks to real output medias medias + var medias []*streamer.Media + for _, track := range c.tracks { + media := &streamer.Media{ + Kind: streamer.GetKind(track.Codec.Name), + Direction: streamer.DirectionSendonly, + Codecs: []*streamer.Codec{track.Codec}, + } + medias = append(medias, media) + } + + res.Body, err = streamer.MarshalSDP(c.SessionName, medias) + if err != nil { + return err + } + + if err = c.Response(res); err != nil { + return err + } + + case MethodSetup: + tr := req.Header.Get("Transport") + + res := &tcp.Response{ + Header: map[string][]string{}, + Request: req, + } + + if strings.HasPrefix(tr, transport) { + c.Session = "1" // TODO: fixme + c.state = StateSetup + res.Header.Set("Transport", tr[:len(transport)+3]) + } else { + res.Status = "461 Unsupported transport" + } + + if err = c.Response(res); err != nil { + return err + } + + case MethodRecord, MethodPlay: + res := &tcp.Response{Request: req} + if err = c.Response(res); err == nil { + c.state = StatePlay + } + return err + + case MethodTeardown: + res := &tcp.Response{Request: req} + _ = c.Response(res) + c.state = StateNone + return c.conn.Close() + + default: + return fmt.Errorf("unsupported method: %s", req.Method) + } + } +}