diff --git a/internal/hass/api.go b/internal/hass/api.go index 5c8294eb..8824d47c 100644 --- a/internal/hass/api.go +++ b/internal/hass/api.go @@ -3,7 +3,6 @@ package hass import ( "encoding/base64" "encoding/json" - "github.com/AlexxIT/go2rtc/internal/api" "github.com/AlexxIT/go2rtc/internal/streams" "github.com/AlexxIT/go2rtc/internal/webrtc" "net" @@ -11,79 +10,69 @@ import ( "strings" ) -func initAPI() { - ok := func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") - _, _ = w.Write([]byte(`{"status":1,"payload":{}}`)) - } +func apiOK(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{"status":1,"payload":{}}`)) +} - // support https://www.home-assistant.io/integrations/rtsp_to_webrtc/ - api.HandleFunc("/static", func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - }) - - api.HandleFunc("/streams", ok) - - // api from RTSPtoWeb - api.HandleFunc("/stream/", func(w http.ResponseWriter, r *http.Request) { - switch { - // /stream/{id}/add - case strings.HasSuffix(r.RequestURI, "/add"): - var v addJSON - if err := json.NewDecoder(r.Body).Decode(&v); err != nil { - return - } - - // we can get three types of links: - // 1. link to go2rtc stream: rtsp://...:8554/{stream_name} - // 2. static link to Hass camera - // 3. dynamic link to Hass camera - stream := streams.Get(v.Name) - if stream == nil { - stream = streams.NewTemplate(v.Name, v.Channels.First.Url) - } - - stream.SetSource(v.Channels.First.Url) - - ok(w, r) - - // /stream/{id}/channel/0/webrtc - default: - i := strings.IndexByte(r.RequestURI[8:], '/') - if i <= 0 { - log.Warn().Msgf("wrong request: %s", r.RequestURI) - return - } - name := r.RequestURI[8 : 8+i] - - stream := streams.Get(name) - if stream == nil { - w.WriteHeader(http.StatusNotFound) - return - } - - if err := r.ParseForm(); err != nil { - log.Error().Err(err).Msg("[api.hass] parse form") - return - } - - s := r.FormValue("data") - offer, err := base64.StdEncoding.DecodeString(s) - if err != nil { - log.Error().Err(err).Msg("[api.hass] sdp64 decode") - return - } - - s, err = webrtc.ExchangeSDP(stream, string(offer), "WebRTC/Hass sync", r.UserAgent()) - if err != nil { - log.Error().Err(err).Msg("[api.hass] exchange SDP") - return - } - - s = base64.StdEncoding.EncodeToString([]byte(s)) - _, _ = w.Write([]byte(s)) +func apiStream(w http.ResponseWriter, r *http.Request) { + switch { + // /stream/{id}/add + case strings.HasSuffix(r.RequestURI, "/add"): + var v addJSON + if err := json.NewDecoder(r.Body).Decode(&v); err != nil { + return } - }) + + // we can get three types of links: + // 1. link to go2rtc stream: rtsp://...:8554/{stream_name} + // 2. static link to Hass camera + // 3. dynamic link to Hass camera + stream := streams.Get(v.Name) + if stream == nil { + stream = streams.NewTemplate(v.Name, v.Channels.First.Url) + } + + stream.SetSource(v.Channels.First.Url) + + apiOK(w, r) + + // /stream/{id}/channel/0/webrtc + default: + i := strings.IndexByte(r.RequestURI[8:], '/') + if i <= 0 { + log.Warn().Msgf("wrong request: %s", r.RequestURI) + return + } + name := r.RequestURI[8 : 8+i] + + stream := streams.Get(name) + if stream == nil { + w.WriteHeader(http.StatusNotFound) + return + } + + if err := r.ParseForm(); err != nil { + log.Error().Err(err).Msg("[api.hass] parse form") + return + } + + s := r.FormValue("data") + offer, err := base64.StdEncoding.DecodeString(s) + if err != nil { + log.Error().Err(err).Msg("[api.hass] sdp64 decode") + return + } + + s, err = webrtc.ExchangeSDP(stream, string(offer), "WebRTC/Hass sync", r.UserAgent()) + if err != nil { + log.Error().Err(err).Msg("[api.hass] exchange SDP") + return + } + + s = base64.StdEncoding.EncodeToString([]byte(s)) + _, _ = w.Write([]byte(s)) + } } func HassioAddr() string { diff --git a/internal/hass/hass.go b/internal/hass/hass.go index e9a9f9b4..67dbbf41 100644 --- a/internal/hass/hass.go +++ b/internal/hass/hass.go @@ -9,10 +9,12 @@ import ( "github.com/AlexxIT/go2rtc/internal/roborock" "github.com/AlexxIT/go2rtc/internal/streams" "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/hass" "github.com/rs/zerolog" "net/http" "os" "path" + "sync" ) func Init() { @@ -29,10 +31,15 @@ func Init() { log = app.GetLogger("hass") - initAPI() + // support API for https://www.home-assistant.io/integrations/rtsp_to_webrtc/ + api.HandleFunc("/static", apiOK) + api.HandleFunc("/streams", apiOK) + api.HandleFunc("/stream/", apiStream) + + // load static entries from Hass config + if err := importConfig(conf.Mod.Config); err != nil { + log.Debug().Msgf("[hass] can't import config: %s", err) - entries := importEntries(conf.Mod.Config) - if entries == nil { api.HandleFunc("api/hass", func(w http.ResponseWriter, _ *http.Request) { http.Error(w, "no hass config", http.StatusNotFound) }) @@ -40,18 +47,35 @@ func Init() { } api.HandleFunc("api/hass", func(w http.ResponseWriter, _ *http.Request) { + once.Do(func() { + // load WebRTC entities from Hass API, works only for add-on version + if token := hass.SupervisorToken(); token != "" { + if err := importWebRTC(token); err != nil { + log.Warn().Err(err).Caller().Send() + } + } + }) + var items []api.Stream - for name, url := range entries { + for name, url := range entities { items = append(items, api.Stream{Name: name, URL: url}) } api.ResponseStreams(w, items) }) streams.HandleFunc("hass", func(url string) (core.Producer, error) { - if hurl := entries[url[5:]]; hurl != "" { - return streams.GetProducer(hurl) + // check entity by name + if url2 := entities[url[5:]]; url2 != "" { + return streams.GetProducer(url2) } - return nil, fmt.Errorf("can't get url: %s", url) + + // support hass://supervisor?entity_id=camera.driveway_doorbell + client, err := hass.NewClient(url) + if err != nil { + return nil, err + } + + return client, nil }) // for Addon listen on hassio interface, so WebUI feature will work @@ -68,12 +92,12 @@ func Init() { } } -func importEntries(config string) map[string]string { +func importConfig(config string) error { // support load cameras from Hass config file filename := path.Join(config, ".storage/core.config_entries") b, err := os.ReadFile(filename) if err != nil { - return nil + return err } var storage struct { @@ -88,11 +112,9 @@ func importEntries(config string) map[string]string { } if err = json.Unmarshal(b, &storage); err != nil { - return nil + return err } - urls := map[string]string{} - for _, entrie := range storage.Data.Entries { switch entrie.Domain { case "generic": @@ -102,7 +124,7 @@ func importEntries(config string) map[string]string { if err = json.Unmarshal(entrie.Options, &options); err != nil { continue } - urls[entrie.Title] = options.StreamSource + entities[entrie.Title] = options.StreamSource case "homekit_controller": if !bytes.Contains(entrie.Data, []byte("iOSPairingId")) { @@ -121,7 +143,7 @@ func importEntries(config string) map[string]string { if err = json.Unmarshal(entrie.Data, &data); err != nil { continue } - urls[entrie.Title] = fmt.Sprintf( + entities[entrie.Title] = fmt.Sprintf( "homekit://%s:%d?client_id=%s&client_private=%s%s&device_id=%s&device_public=%s", data.DeviceHost, data.DevicePort, data.ClientID, data.ClientPrivate, data.ClientPublic, @@ -143,22 +165,48 @@ func importEntries(config string) map[string]string { } if data.Username != "" && data.Password != "" { - urls[entrie.Title] = fmt.Sprintf( + entities[entrie.Title] = fmt.Sprintf( "onvif://%s:%s@%s:%d", data.Username, data.Password, data.Host, data.Port, ) } else { - urls[entrie.Title] = fmt.Sprintf("onvif://%s:%d", data.Host, data.Port) + entities[entrie.Title] = fmt.Sprintf("onvif://%s:%d", data.Host, data.Port) } default: continue } - log.Info().Str("url", "hass:"+entrie.Title).Msg("[hass] load stream") + log.Debug().Str("url", "hass:"+entrie.Title).Msg("[hass] load config") //streams.Get("hass:" + entrie.Title) } - return urls + return nil } +func importWebRTC(token string) error { + hassAPI, err := hass.NewAPI("ws://supervisor/core/websocket", token) + if err != nil { + return err + } + + webrtcEntities, err := hassAPI.GetWebRTCEntities() + if err != nil { + return err + } + + if len(webrtcEntities) == 0 { + log.Debug().Msg("[hass] webrtc cameras not found") + } + + for name, entityID := range webrtcEntities { + entities[name] = "hass://supervisor?entity_id=" + entityID + + log.Debug().Msgf("[hass] load webrtc name=%s entity_id=%d", name, entityID) + } + + return nil +} + +var entities = map[string]string{} var log zerolog.Logger +var once sync.Once diff --git a/pkg/hass/api.go b/pkg/hass/api.go new file mode 100644 index 00000000..6d5a9204 --- /dev/null +++ b/pkg/hass/api.go @@ -0,0 +1,143 @@ +package hass + +import ( + "errors" + "github.com/gorilla/websocket" + "os" +) + +type API struct { + ws *websocket.Conn +} + +func NewAPI(url, token string) (*API, error) { + ws, _, err := websocket.DefaultDialer.Dial(url, nil) + if err != nil { + return nil, err + } + + api := &API{ws: ws} + if err = api.Auth(token); err != nil { + _ = ws.Close() + return nil, err + } + + return api, nil +} + +func (a *API) Auth(token string) error { + var res ResponseAuth + + if err := a.ws.ReadJSON(&res); err != nil { + return err + } + if res.Type != "auth_required" { + return errors.New("hass: wrong type: " + res.Type) + } + + s := `{"type":"auth","access_token":"` + token + `"}` + if err := a.ws.WriteMessage(websocket.TextMessage, []byte(s)); err != nil { + return err + } + if err := a.ws.ReadJSON(&res); err != nil { + return err + } + if res.Type != "auth_ok" { + return errors.New("hass: wrong type: " + res.Type) + } + + return nil +} + +func (a *API) Close() error { + return a.ws.Close() +} + +func (a *API) ExchangeSDP(entityID, offer string) (string, error) { + var msg = map[string]any{ + "id": 1, + "type": "camera/web_rtc_offer", + "entity_id": entityID, + "offer": offer, + } + if err := a.ws.WriteJSON(msg); err != nil { + return "", err + } + + var res ResponseOffer + if err := a.ws.ReadJSON(&res); err != nil { + return "", err + } + + if res.Type != "result" || !res.Success { + return "", errors.New("hass: wrong response") + } + + return res.Result.Answer, nil +} + +func (a *API) GetWebRTCEntities() (map[string]string, error) { + s := `{"id":1,"type":"get_states"}` + if err := a.ws.WriteMessage(websocket.TextMessage, []byte(s)); err != nil { + return nil, err + } + + var res ResponseStates + if err := a.ws.ReadJSON(&res); err != nil { + return nil, err + } + if res.Type != "result" || !res.Success { + return nil, errors.New("hass: wrong response") + } + + entities := map[string]string{} + + for _, entity := range res.Result { + if entity.Attributes.FrontendStreamType == "web_rtc" { + entities[entity.Attributes.FriendlyName] = entity.EntityId + } + } + + return entities, nil +} + +type ResponseAuth struct { + Type string `json:"type"` +} + +type ResponseStates struct { + //Id int `json:"id"` + Type string `json:"type"` + Success bool `json:"success"` + Result []struct { + EntityId string `json:"entity_id"` + //State string `json:"state"` + Attributes struct { + //ModelName string `json:"model_name"` + //Brand string `json:"brand"` + FrontendStreamType string `json:"frontend_stream_type"` + FriendlyName string `json:"friendly_name"` + //SupportedFeatures int `json:"supported_features"` + } `json:"attributes"` + //LastChanged time.Time `json:"last_changed"` + //LastUpdated time.Time `json:"last_updated"` + //Context struct { + // Id string `json:"id"` + // ParentId interface{} `json:"parent_id"` + // UserId interface{} `json:"user_id"` + //} `json:"context"` + } `json:"result"` +} + +type ResponseOffer struct { + //Id int `json:"id"` + Type string `json:"type"` + Success bool `json:"success"` + Result struct { + Answer string `json:"answer"` + } `json:"result"` +} + +func SupervisorToken() string { + return os.Getenv("SUPERVISOR_TOKEN") +} diff --git a/pkg/hass/client.go b/pkg/hass/client.go new file mode 100644 index 00000000..5b9a227a --- /dev/null +++ b/pkg/hass/client.go @@ -0,0 +1,115 @@ +package hass + +import ( + "errors" + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/webrtc" + pion "github.com/pion/webrtc/v3" + "net/url" +) + +type Client struct { + conn *webrtc.Conn +} + +func NewClient(rawURL string) (*Client, error) { + u, err := url.Parse(rawURL) + if err != nil { + return nil, err + } + + query := u.Query() + + entityID := query.Get("entity_id") + if entityID == "" { + return nil, errors.New("hass: no entity_id") + } + + var uri, token string + + if u.Host == "supervisor" { + uri = "ws://supervisor/core/websocket" + token = SupervisorToken() + } else { + uri = "ws://" + u.Host + "/api/websocket" + token = query.Get("token") + } + + if token == "" { + return nil, errors.New("hass: no token") + } + + // 1. Check connection to Hass + hassAPI, err := NewAPI(uri, token) + if err != nil { + return nil, err + } + + defer hassAPI.Close() + + // 2. Create WebRTC client + rtcAPI, err := webrtc.NewAPI("") + if err != nil { + return nil, err + } + + conf := pion.Configuration{} + pc, err := rtcAPI.NewPeerConnection(conf) + if err != nil { + return nil, err + } + + conn := webrtc.NewConn(pc) + conn.Desc = "Hass" + conn.Mode = core.ModeActiveProducer + + // https://developers.google.com/nest/device-access/traits/device/camera-live-stream#generatewebrtcstream-request-fields + medias := []*core.Media{ + {Kind: core.KindAudio, Direction: core.DirectionRecvonly}, + {Kind: core.KindVideo, Direction: core.DirectionRecvonly}, + {Kind: "app"}, // important for Nest + } + + // 3. Create offer with candidates + offer, err := conn.CreateCompleteOffer(medias) + if err != nil { + return nil, err + } + + // 4. Exchange SDP via Hass + answer, err := hassAPI.ExchangeSDP(entityID, offer) + if err != nil { + return nil, err + } + + // 5. Set answer with remote medias + if err = conn.SetAnswer(answer); err != nil { + return nil, err + } + + return &Client{conn: conn}, nil +} + +func (c *Client) GetMedias() []*core.Media { + return c.conn.GetMedias() +} + +func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { + return c.conn.GetTrack(media, codec) +} + +func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { + return c.conn.AddTrack(media, codec, track) +} + +func (c *Client) Start() error { + return c.conn.Start() +} + +func (c *Client) Stop() error { + return c.conn.Stop() +} + +func (c *Client) MarshalJSON() ([]byte, error) { + return c.conn.MarshalJSON() +} diff --git a/pkg/webrtc/client.go b/pkg/webrtc/client.go index 1e33fd10..50c7773d 100644 --- a/pkg/webrtc/client.go +++ b/pkg/webrtc/client.go @@ -24,6 +24,9 @@ func (c *Conn) CreateOffer(medias []*core.Media) (string, error) { case core.DirectionSendRecv: // default transceiver is sendrecv _, err = c.pc.AddTransceiverFromTrack(NewTrack(media.Kind)) + default: + // Nest cameras require data channel + _, err = c.pc.CreateDataChannel(media.Kind, nil) } if err != nil { diff --git a/pkg/webrtc/conn.go b/pkg/webrtc/conn.go index b7c3c628..e3b1c960 100644 --- a/pkg/webrtc/conn.go +++ b/pkg/webrtc/conn.go @@ -148,6 +148,17 @@ func (c *Conn) getTranseiver(mid string) *webrtc.RTPTransceiver { return nil } +func (c *Conn) getSenderTrack(mid string) *Track { + if tr := c.getTranseiver(mid); tr != nil { + if s := tr.Sender(); s != nil { + if t := s.Track().(*Track); t != nil { + return t + } + } + } + return nil +} + func (c *Conn) getMediaCodec(remote *webrtc.TrackRemote) (*core.Media, *core.Codec) { for _, tr := range c.pc.GetTransceivers() { // search Transeiver for this TrackRemote diff --git a/pkg/webrtc/consumer.go b/pkg/webrtc/consumer.go index b25cb7e3..070573c6 100644 --- a/pkg/webrtc/consumer.go +++ b/pkg/webrtc/consumer.go @@ -2,6 +2,7 @@ package webrtc import ( "encoding/json" + "errors" "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/h264" "github.com/AlexxIT/go2rtc/pkg/h265" @@ -31,7 +32,11 @@ func (c *Conn) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiv panic(core.Caller()) } - localTrack := c.getTranseiver(media.ID).Sender().Track().(*Track) + localTrack := c.getSenderTrack(media.ID) + if localTrack == nil { + return errors.New("webrtc: can't get track") + } + payloadType := codec.PayloadType sender := core.NewSender(media, codec)