From c309bb83e7bdd674602d46758dd3acab522a6409 Mon Sep 17 00:00:00 2001 From: Alex X Date: Sun, 28 Apr 2024 07:09:01 +0300 Subject: [PATCH] Code refactoring for Milestone client --- internal/webrtc/client.go | 2 +- internal/webrtc/milestone.go | 362 ++++++++++++++--------------------- 2 files changed, 148 insertions(+), 216 deletions(-) diff --git a/internal/webrtc/client.go b/internal/webrtc/client.go index a2fa8ae6..29853932 100644 --- a/internal/webrtc/client.go +++ b/internal/webrtc/client.go @@ -49,7 +49,7 @@ func streamsHandler(rawURL string) (core.Producer, error) { case "http", "https": if format == "milestone" { - return milestoneClient(rawURL, query, "Milestone") + return milestoneClient(rawURL, query) } else if format == "wyze" { // https://github.com/mrlt8/docker-wyze-bridge return wyzeClient(rawURL) diff --git a/internal/webrtc/milestone.go b/internal/webrtc/milestone.go index f63f49e4..b4e695c9 100644 --- a/internal/webrtc/milestone.go +++ b/internal/webrtc/milestone.go @@ -2,16 +2,15 @@ package webrtc import ( "bytes" - "crypto/tls" "encoding/json" "errors" - "fmt" - "io" "net/http" "net/url" "strconv" + "strings" "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/tcp" "github.com/AlexxIT/go2rtc/pkg/webrtc" pion "github.com/pion/webrtc/v3" ) @@ -20,197 +19,166 @@ import ( // session creation, and session update with an SDP answer. It is designed to be used with // a specific URL format that encodes session parameters. For example: // webrtc:https://milestone-host/api#format=milestone#username=User#password=TestPassword#cameraId=a539f254-af05-4d67-a1bb-cd9b3c74d122 -// see: https://github.com/milestonesys/mipsdk-samples-protocol/tree/main/WebRTC_JavaScript +// +// https://github.com/milestonesys/mipsdk-samples-protocol/tree/main/WebRTC_JavaScript -// MilestoneClient manages the configurations of the server and client -type MilestoneClient struct { - ApiGatewayUrl string - Username string - Password string - ClientID string - Token string - GrantType string - InsecureTls bool - PeerConnection *pion.PeerConnection +type milestoneAPI struct { + url string + query url.Values + token string + sessionID string } -// WebRTCSessionDetails structures the session details for the WebRTC connection. -type WebRTCSessionDetails struct { - CameraId string `json:"cameraId"` - StreamId *string `json:"streamId,omitempty"` - PlaybackTimeNode *PlaybackTimeDetails `json:"playbackTimeNode,omitempty"` - ICEServers []string `json:"iceServers"` - Resolution string `json:"resolution"` -} - -// PlaybackTimeDetails holds optional playback parameters -type PlaybackTimeDetails struct { - PlaybackTime string `json:"playbackTime"` - SkipGaps *bool `json:"skipGaps,omitempty"` - Speed *float64 `json:"speed,omitempty"` -} - -func setupMilestoneClient(rawURL string, query url.Values) *MilestoneClient { - return &MilestoneClient{ - ApiGatewayUrl: rawURL, - Username: query.Get("username"), - Password: query.Get("password"), - ClientID: "GrantValidatorClient", - GrantType: "password", - } -} - -func parseSessionDetails(mc *MilestoneClient, query url.Values) WebRTCSessionDetails { - details := WebRTCSessionDetails{ - CameraId: query.Get("cameraId"), - Resolution: "notInUse", - ICEServers: []string{}, +func (m *milestoneAPI) GetToken() error { + data := url.Values{ + "client_id": {"GrantValidatorClient"}, + "grant_type": {"password"}, + "username": m.query["username"], + "password": m.query["password"], } - if streamId := query.Get("streamId"); streamId != "" { - details.StreamId = &streamId - } - - // Check for playback related details and construct PlaybackTimeNode if necessary - var playbackTimeNode PlaybackTimeDetails - hasPlaybackDetails := false - - if playbackTime := query.Get("playbackTime"); playbackTime != "" { - playbackTimeNode.PlaybackTime = playbackTime - hasPlaybackDetails = true - } - - if skipGaps := query.Get("skipGaps"); skipGaps != "" { - skipGapsBool, err := strconv.ParseBool(skipGaps) - if err == nil { - playbackTimeNode.SkipGaps = &skipGapsBool - hasPlaybackDetails = true - } - } - - if speed := query.Get("speed"); speed != "" { - speedFloat, err := strconv.ParseFloat(speed, 64) - if err == nil { - playbackTimeNode.Speed = &speedFloat - hasPlaybackDetails = true - } - } - - if insecureTls := query.Get("insecureTls"); insecureTls != "" { - insecureTlsBool, err := strconv.ParseBool(insecureTls) - if err == nil { - mc.InsecureTls = insecureTlsBool - } - } - - if hasPlaybackDetails { - details.PlaybackTimeNode = &playbackTimeNode - } - - return details -} - -// Helper function to create an HTTP client based on URL schema -func createHTTPClient(insecureTls bool) *http.Client { - tlsConfig := &tls.Config{} - - // Set InsecureSkipVerify true only for "https" schema - if insecureTls { - tlsConfig.InsecureSkipVerify = true // FIXME, use httpx protocol - } - - return &http.Client{ - Transport: &http.Transport{ - TLSClientConfig: tlsConfig, - }, - } -} - -func createWebRTCSession(mc *MilestoneClient, details WebRTCSessionDetails) (*http.Response, error) { - body, err := json.Marshal(details) - if err != nil { - return nil, err - } - - url := mc.ApiGatewayUrl + "/REST/v1/WebRTC/Session" - req, err := http.NewRequest("POST", url, bytes.NewBuffer(body)) - if err != nil { - return nil, err - } - req.Header.Set("Authorization", "Bearer "+mc.Token) - req.Header.Set("Content-Type", "application/json") - - client := createHTTPClient(mc.InsecureTls) - return client.Do(req) -} - -func updateWebRTCSession(mc *MilestoneClient, sessionID string, answer pion.SessionDescription) (*http.Response, error) { - sdpJSON, err := json.Marshal(answer) - if err != nil { - return nil, err - } - - payload := fmt.Sprintf(`{"answerSDP":%s}`, strconv.Quote(string(sdpJSON))) - url := fmt.Sprintf("%s/REST/v1/WebRTC/Session/%s", mc.ApiGatewayUrl, sessionID) - req, err := http.NewRequest("PATCH", url, bytes.NewBufferString(payload)) - if err != nil { - return nil, err - } - req.Header.Set("Authorization", "Bearer "+mc.Token) - req.Header.Set("Content-Type", "application/json") - - client := createHTTPClient(mc.InsecureTls) - return client.Do(req) -} - -func (mc *MilestoneClient) Authenticate() error { - formData := url.Values{ - "grant_type": {mc.GrantType}, - "username": {mc.Username}, - "password": {mc.Password}, - "client_id": {mc.ClientID}, - } - - client := createHTTPClient(mc.InsecureTls) - resp, err := client.PostForm(mc.ApiGatewayUrl+"/IDP/connect/token", formData) + req, err := http.NewRequest("POST", m.url+"/IDP/connect/token", strings.NewReader(data.Encode())) if err != nil { return err } - defer resp.Body.Close() + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("authentication failed: status code %d", resp.StatusCode) + // support httpx protocol + res, err := tcp.Do(req) + if err != nil { + return err + } + defer res.Body.Close() + + if res.StatusCode != http.StatusOK { + return errors.New("milesone: authentication failed: " + res.Status) } - var result map[string]interface{} - if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + var payload map[string]interface{} + if err = json.NewDecoder(res.Body).Decode(&payload); err != nil { return err } - token, ok := result["access_token"].(string) + token, ok := payload["access_token"].(string) if !ok { - return errors.New("token not found in the response") + return errors.New("milesone: token not found in the response") } - mc.Token = token + + m.token = token return nil } -func milestoneClient(rawURL string, query url.Values, desc string) (core.Producer, error) { - mc := setupMilestoneClient(rawURL, query) +func parseFloat(s string) float64 { + if s == "" { + return 0 + } + f, _ := strconv.ParseFloat(s, 64) + return f +} - details := parseSessionDetails(mc, query) +func (m *milestoneAPI) GetOffer() (string, error) { + request := struct { + CameraId string `json:"cameraId"` + StreamId string `json:"streamId,omitempty"` + PlaybackTimeNode struct { + PlaybackTime string `json:"playbackTime,omitempty"` + SkipGaps bool `json:"skipGaps,omitempty"` + Speed float64 `json:"speed,omitempty"` + } `json:"playbackTimeNode,omitempty"` + //ICEServers []string `json:"iceServers,omitempty"` + //Resolution string `json:"resolution,omitempty"` + }{ + CameraId: m.query.Get("cameraId"), + StreamId: m.query.Get("streamId"), + } + request.PlaybackTimeNode.PlaybackTime = m.query.Get("playbackTime") + request.PlaybackTimeNode.SkipGaps = m.query.Has("skipGaps") + request.PlaybackTimeNode.Speed = parseFloat(m.query.Get("speed")) - if err := mc.Authenticate(); err != nil { - return nil, err + data, err := json.Marshal(request) + if err != nil { + return "", err } - config := pion.Configuration{ - ICEServers: []pion.ICEServer{ - { - URLs: details.ICEServers, - }, - }, + req, err := http.NewRequest("POST", m.url+"/REST/v1/WebRTC/Session", bytes.NewBuffer(data)) + if err != nil { + return "", err + } + req.Header.Set("Authorization", "Bearer "+m.token) + req.Header.Set("Content-Type", "application/json") + + res, err := tcp.Do(req) + if err != nil { + return "", err + } + defer res.Body.Close() + + if res.StatusCode != http.StatusOK { + return "", errors.New("milesone: create session: " + res.Status) + } + + var response struct { + SessionId string `json:"sessionId"` + OfferSDP string `json:"offerSDP"` + } + if err = json.NewDecoder(res.Body).Decode(&response); err != nil { + return "", err + } + + var offer pion.SessionDescription + if err = json.Unmarshal([]byte(response.OfferSDP), &offer); err != nil { + return "", err + } + + m.sessionID = response.SessionId + + return offer.SDP, nil +} + +func (m *milestoneAPI) SetAnswer(sdp string) error { + answer := pion.SessionDescription{ + Type: pion.SDPTypeAnswer, + SDP: sdp, + } + data, err := json.Marshal(answer) + if err != nil { + return err + } + + request := struct { + AnswerSDP string `json:"answerSDP"` + }{ + AnswerSDP: string(data), + } + if data, err = json.Marshal(request); err != nil { + return err + } + + req, err := http.NewRequest("PATCH", m.url+"/REST/v1/WebRTC/Session/"+m.sessionID, bytes.NewBuffer(data)) + if err != nil { + return err + } + req.Header.Set("Authorization", "Bearer "+m.token) + req.Header.Set("Content-Type", "application/json") + + res, err := tcp.Do(req) + if err != nil { + return err + } + defer res.Body.Close() + + if res.StatusCode != http.StatusOK { + return errors.New("milesone: patch session: " + res.Status) + } + + return nil +} + +func milestoneClient(rawURL string, query url.Values) (core.Producer, error) { + mc := &milestoneAPI{url: rawURL, query: query} + if err := mc.GetToken(); err != nil { + return nil, err } api, err := webrtc.NewAPI() @@ -218,68 +186,32 @@ func milestoneClient(rawURL string, query url.Values, desc string) (core.Produce return nil, err } - mc.PeerConnection, err = api.NewPeerConnection(config) + conf := pion.Configuration{} + pc, err := api.NewPeerConnection(conf) if err != nil { return nil, err } - var sendOffer core.Waiter - defer sendOffer.Done(nil) - - prod := webrtc.NewConn(mc.PeerConnection) - prod.Desc = "WebRTC/OpenIPC" + prod := webrtc.NewConn(pc) + prod.Desc = "WebRTC/Milestone" prod.Mode = core.ModeActiveProducer - resp, err := createWebRTCSession(mc, details) - if err != nil { - return nil, err - } - defer resp.Body.Close() - - responseBody, err := io.ReadAll(resp.Body) + offer, err := mc.GetOffer() if err != nil { return nil, err } - var session struct { - SessionId string `json:"sessionId"` - OfferSDP string `json:"offerSDP"` - } - - if err := json.Unmarshal(responseBody, &session); err != nil { - return nil, fmt.Errorf("error parsing session response: %v", err) - } - - var offer pion.SessionDescription - if err := json.Unmarshal([]byte(session.OfferSDP), &offer); err != nil { - return nil, fmt.Errorf("failed to parse offer SDP: %v", err) - } - - if err = prod.SetOffer(string(offer.SDP)); err != nil { + if err = prod.SetOffer(offer); err != nil { return nil, err } - if err := mc.PeerConnection.SetRemoteDescription(offer); err != nil { - return nil, fmt.Errorf("failed to set remote description: %v", err) - } - - answer, err := mc.PeerConnection.CreateAnswer(nil) - if err != nil { - return nil, fmt.Errorf("failed to create answer: %v", err) - } - - if err := mc.PeerConnection.SetLocalDescription(answer); err != nil { - return nil, fmt.Errorf("failed to set local description: %v", err) - } - - resp, err = updateWebRTCSession(mc, session.SessionId, answer) + answer, err := prod.GetAnswer() if err != nil { return nil, err } - defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("server responded with non-OK status: %d", resp.StatusCode) + if err = mc.SetAnswer(answer); err != nil { + return nil, err } return prod, nil