diff --git a/loaders.go b/loaders.go index d506c80..9473e72 100644 --- a/loaders.go +++ b/loaders.go @@ -17,6 +17,7 @@ import ( "encoding/json" "io/ioutil" "os" + "strings" "github.com/pkg/errors" ) @@ -56,3 +57,21 @@ func LoadRoutes(path string) (Routes, error) { return routes, scanner.Err() } + +// ParseCredentialsFromString parses a dictionary string and returns its contents as a Credentials structure +func ParseCredentialsFromString(content string) (Credentials, error) { + var creds Credentials + + // Unmarshal content of JSON file into data structure + err := json.Unmarshal([]byte(content), &creds) + if err != nil { + return creds, err + } + + return creds, nil +} + +// ParseRoutesFromString parses a dictionary string and returns its contents as a Routes structure +func ParseRoutesFromString(content string) Routes { + return strings.Split(content, "\n") +} diff --git a/server/actor/pubsub.go b/server/actor/pubsub.go deleted file mode 100644 index e83838b..0000000 --- a/server/actor/pubsub.go +++ /dev/null @@ -1,48 +0,0 @@ -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package actor - -// Sub/Unsub event type -const ( - SubscribeEvent = "subscribe" - UnsubscribeEvent = "unsubscribe" -) - -// Subscription contains a sub/unsub event -type Subscription struct { - Command string - Channel string - SubscribersCount uint -} - -// Publication contains a publish event -type Publication struct { - Channel string - Data string -} - -// PubSub is a generic interface for publishing data to subscribers using channels -// It exposes subscriptions events so the controller can create/delete -// data sources depending on the channels users subscribe to. -// ex: launch a camera stream only when users subscribe to it -type PubSub interface { - Run() - Sub() <-chan *Subscription - Pub() chan<- *Publication -} - -// ChannelAccessChecker allows to check for accesses on a given channel -type ChannelAccessChecker interface { - CheckAccess(channel, accessToken string) bool - ClearCache(accessToken string) -} diff --git a/server/actor/pubsub/mock.go b/server/actor/pubsub/mock.go deleted file mode 100644 index 853fe3f..0000000 --- a/server/actor/pubsub/mock.go +++ /dev/null @@ -1,57 +0,0 @@ -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pubsub - -import ( - "github.com/stretchr/testify/mock" - - "github.com/EtixLabs/cameradar/server/actor" -) - -// Mock mocks a pubsub actor -type Mock struct { - mock.Mock -} - -// Sub mock -func (m *Mock) Sub() <-chan *actor.Subscription { - args := m.Called() - return args.Get(0).(<-chan *actor.Subscription) -} - -// Pub mock -func (m *Mock) Pub() chan<- *actor.Publication { - args := m.Called() - return args.Get(0).(chan<- *actor.Publication) -} - -// Run mock -func (m *Mock) Run() { - m.Called() -} - -// AccessCheckerMock mocks a channel access checker -type AccessCheckerMock struct { - mock.Mock -} - -// CheckAccess mocks an access check -func (m *AccessCheckerMock) CheckAccess(channel, accessToken string) bool { - args := m.Called(channel, accessToken) - return args.Bool(0) -} - -// ClearCache mocks a cache clear -func (m *AccessCheckerMock) ClearCache(accessToken string) { - m.Called(accessToken) -} diff --git a/server/actor/pubsub/webSocket.go b/server/actor/pubsub/webSocket.go deleted file mode 100644 index 93e79ad..0000000 --- a/server/actor/pubsub/webSocket.go +++ /dev/null @@ -1,238 +0,0 @@ -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pubsub - -import ( - "fmt" - "net/http" - "strings" - - "github.com/EtixLabs/cameradar/server/actor" - "github.com/EtixLabs/cameradar/server/adaptor" -) - -// events allow to serialize in one go routine all events from the subscribers -const ( - eventSubscribe = "subscribe" - eventUnsubscribe = "unsubscribe" - eventDisconnect = "disconnect" -) - -type pubSubEvent struct { - name string - channel string - client adaptor.WebSocket -} - -// WebSocket manage pubsub communication using a websocket adaptor -type WebSocket struct { - wsf adaptor.WebSocketFactory - - subscriptions map[string][]adaptor.WebSocket - sub chan *actor.Subscription - pub chan *actor.Publication - events chan *pubSubEvent -} - -// NewWebSocket creates a PubSub actor that uses a websockets factory -func NewWebSocket( - wsf adaptor.WebSocketFactory, -) *WebSocket { - wsPubSub := &WebSocket{ - wsf: wsf, - - subscriptions: make(map[string][]adaptor.WebSocket), - sub: make(chan *actor.Subscription), - pub: make(chan *actor.Publication), - events: make(chan *pubSubEvent), - } - go wsPubSub.Run() - return wsPubSub -} - -// Sub return the chan where websocket event gonna be pushed -func (b *WebSocket) Sub() <-chan *actor.Subscription { - return b.sub -} - -// Pub return the chan where we consider publishement will be asked -func (b *WebSocket) Pub() chan<- *actor.Publication { - return b.pub -} - -// Run start to listen on pubsub events -func (b *WebSocket) Run() { - for { - select { - case event := <-b.events: - - client := event.client - channel := event.channel - - switch event.name { - case eventSubscribe: - b.handleSubscribe(client, channel) - case eventUnsubscribe: - b.handleUnsubscribe(client, channel) - case eventDisconnect: - b.handleDisconnect(client) - } - case publication := <-b.pub: - - subscribers := b.subscriptions[publication.Channel] - - // prepend channel name to message, so client knows from which channel - // the message comes from - message := fmt.Sprintf("%s/%s", publication.Channel, publication.Data) - - // broadcast message to subscribers - for _, client := range subscribers { - select { - case client.Write() <- message: - default: - // drop frame - } - } - } - } -} - -// Accept a new incoming connection and create a websocket using the factory -func (b *WebSocket) Accept(w http.ResponseWriter, req *http.Request) { - client, err := b.wsf.NewIncomingWebSocket(w, req) - if err != nil { - fmt.Printf("cannot accept incoming connection: %v\n", err) - return - } - - go b.readClient(client) -} - -func (b *WebSocket) readClient(client adaptor.WebSocket) { - for { - message, ok := <-client.Read() - if !ok { - // connection channel closed, disconnect client (in the main routine) - b.events <- &pubSubEvent{ - name: eventDisconnect, - client: client, - } - return - } - - // expect text message - command, ok := message.(string) - if !ok { - fmt.Printf("invalid non-text message: %v\n", message) - return - } - - // process command - // NOTE: if another protocol is needed, extract this behavior - if strings.HasPrefix(command, "s/") { - channel := strings.TrimPrefix(command, "s/") - - // process in main routine - b.events <- &pubSubEvent{ - name: eventSubscribe, - client: client, - channel: channel, - } - } else if strings.HasPrefix(command, "u/") { - channel := strings.TrimPrefix(command, "u/") - - // process in main routine - b.events <- &pubSubEvent{ - name: eventUnsubscribe, - client: client, - channel: channel, - } - } else { - fmt.Printf("invalid message '%s', should be [s|u]/{channel}\n", command) - } - } -} - -func (b *WebSocket) handleSubscribe(client adaptor.WebSocket, channel string) { - // if client is already subscribed, ignore - if b.alreadySubscribed(client, channel) { - return - } - - // add to subscribers map - subscribersCount := b.addSubscription(client, channel) - - // notify external world - b.sub <- &actor.Subscription{ - Command: actor.SubscribeEvent, - Channel: channel, - SubscribersCount: subscribersCount, - } -} - -func (b *WebSocket) handleUnsubscribe(client adaptor.WebSocket, channel string) { - // if client didn't subscribe, ignore - if !b.alreadySubscribed(client, channel) { - return - } - - // remove from map - subscribersCount := b.removeSubscription(client, channel) - - // notify external world - b.sub <- &actor.Subscription{ - Command: actor.UnsubscribeEvent, - Channel: channel, - SubscribersCount: subscribersCount, - } -} - -func (b *WebSocket) handleDisconnect(client adaptor.WebSocket) { - - // unsubscribe client from all its channels - for channel := range b.subscriptions { - b.handleUnsubscribe(client, channel) - } - - // close client write channel - close(client.Write()) -} - -func (b *WebSocket) alreadySubscribed(client adaptor.WebSocket, channel string) bool { - clients := b.subscriptions[channel] - for _, c := range clients { - if c == client { - return true - } - } - return false -} - -// addSubscription adds a subscription of a client to a channel and returns -// the new number of subscriptions on the given channel -func (b *WebSocket) addSubscription(client adaptor.WebSocket, channel string) uint { - b.subscriptions[channel] = append(b.subscriptions[channel], client) - return uint(len(b.subscriptions[channel])) -} - -func (b *WebSocket) removeSubscription(client adaptor.WebSocket, channel string) uint { - clients := b.subscriptions[channel] - for i, current := range clients { - if current == client { - clients = append(clients[:i], clients[i+1:]...) - break - } - } - b.subscriptions[channel] = clients - return uint(len(clients)) -} diff --git a/server/actor/server.go b/server/actor/server.go new file mode 100644 index 0000000..5487644 --- /dev/null +++ b/server/actor/server.go @@ -0,0 +1,19 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package actor + +// Server is a generic interface for creating a bidirectional +// communication server through websocket. +type Server interface { + Run() +} diff --git a/server/actor/server/mock.go b/server/actor/server/mock.go new file mode 100644 index 0000000..e9d2d0c --- /dev/null +++ b/server/actor/server/mock.go @@ -0,0 +1,29 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "net/http" + + "github.com/stretchr/testify/mock" +) + +// Mock mocks a pubsub actor +type Mock struct { + mock.Mock +} + +// Accept mock +func (m *Mock) Accept(w http.ResponseWriter, req *http.Request) { + m.Called(w, req) +} diff --git a/server/actor/server/webSocket.go b/server/actor/server/webSocket.go new file mode 100644 index 0000000..9dd4d08 --- /dev/null +++ b/server/actor/server/webSocket.go @@ -0,0 +1,81 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "fmt" + "net/http" + + "github.com/EtixLabs/cameradar/server/adaptor" +) + +// WebSocket manages server communication using a websocket adaptor +type WebSocket struct { + wsf adaptor.WebSocketFactory + + client adaptor.WebSocket + + fromClient chan<- string + toClient <-chan string + disconnect chan interface{} +} + +// New creates a Server actor that uses a WebSocketFactory +func New( + wsf adaptor.WebSocketFactory, + fromClient chan string, + toClient chan string, +) *WebSocket { + wsServer := &WebSocket{ + wsf: wsf, + + fromClient: fromClient, + toClient: toClient, + } + return wsServer +} + +// Accept a new incoming connection and create a websocket using the factory +func (ws *WebSocket) Accept(w http.ResponseWriter, req *http.Request) { + client, err := ws.wsf.NewIncomingWebSocket(w, req) + if err != nil { + fmt.Printf("cannot accept incoming connection: %v\n", err) + return + } + + go ws.readClient(client) +} + +func (ws *WebSocket) readClient(client adaptor.WebSocket) { + for { + select { + case message, ok := <-client.Read(): + if !ok { + // connection channel closed, disconnect client (in the main routine) + ws.disconnect <- struct{}{} + println("client disconnected") + return + } + + // expect text message + msg, ok := message.(string) + if !ok { + fmt.Printf("invalid non-text message: %v\n", message) + return + } + ws.fromClient <- msg + case msg := <-ws.toClient: + client.Write() <- msg + } + } +} diff --git a/server/main.go b/server/main.go index 38ea58e..fcf1c98 100644 --- a/server/main.go +++ b/server/main.go @@ -17,18 +17,33 @@ import ( "net/http" "os" - "github.com/EtixLabs/cameradar/server/actor/pubsub" + "github.com/EtixLabs/cameradar/server/actor/server" "github.com/EtixLabs/cameradar/server/adaptor/websocket" + "github.com/EtixLabs/cameradar/server/service" graceful "gopkg.in/tylerb/graceful.v1" ) func main() { webSocketFactory := websocket.NewGorillaFactory() - pubsub := pubsub.NewWebSocket(webSocketFactory) + fromClient := make(chan string) + toClient := make(chan string) + + server := server.New(webSocketFactory, fromClient, toClient) + + _, err := service.New( + "/Users/ullaakut/Work/go/src/github.com/EtixLabs/cameradar/dictionaries/routes", + "/Users/ullaakut/Work/go/src/github.com/EtixLabs/cameradar/dictionaries/credentials.json", + fromClient, + toClient, + ) + if err != nil { + fmt.Printf("could not start service: %v", err) + os.Exit(1) + } // create and setup the http server serverMux := http.NewServeMux() - serverMux.HandleFunc("/", pubsub.Accept) + serverMux.HandleFunc("/", server.Accept) httpServer := &graceful.Server{ NoSignalHandling: true, @@ -39,7 +54,7 @@ func main() { } fmt.Printf("cameradar server listening on %v\n", httpServer.Addr) - err := httpServer.ListenAndServe() + err = httpServer.ListenAndServe() if err != nil { fmt.Printf("could not start server: %v\n", err) os.Exit(1) diff --git a/server/service/service.go b/server/service/service.go index 283dbf5..181b220 100644 --- a/server/service/service.go +++ b/server/service/service.go @@ -13,6 +13,7 @@ package service import ( + "encoding/json" "fmt" "time" @@ -25,6 +26,15 @@ type Cameradar struct { Streams []cmrdr.Stream options *Options + + toClient chan<- string + fromClient <-chan string +} + +type request struct { + Method string + Target string + Ports string } // Options contains all options needed to launch a complete cameradar scan @@ -38,8 +48,8 @@ type Options struct { Timeout time.Duration } -// NewCameradar instanciates a new Cameradar service -func NewCameradar(routesFilePath, credentialsFilePath string) (*Cameradar, error) { +// New instanciates a new Cameradar service +func New(routesFilePath, credentialsFilePath string, fromClient <-chan string, toClient chan<- string) (*Cameradar, error) { routes, err := cmrdr.LoadRoutes(routesFilePath) if err != nil { return nil, errors.Wrap(err, "can't load routes dictionary") @@ -50,7 +60,7 @@ func NewCameradar(routesFilePath, credentialsFilePath string) (*Cameradar, error return nil, errors.Wrap(err, "can't load credentials dictionary") } - return &Cameradar{ + cameradar := &Cameradar{ Streams: nil, options: &Options{ Ports: "554,8554", @@ -59,7 +69,41 @@ func NewCameradar(routesFilePath, credentialsFilePath string) (*Cameradar, error Speed: 4, Timeout: 2000, }, - }, nil + + fromClient: fromClient, + toClient: toClient, + } + + go cameradar.Run() + return cameradar, nil +} + +// Run launches the service that will automatically call the service methods +// using the instructions received over websocket +func (c *Cameradar) Run() { + for { + msg, ok := <-c.fromClient + if !ok { + println("disconnected") + return + } + + var req request + err := json.Unmarshal([]byte(msg), &req) + if err != nil { + c.toClient <- "invalid request: " + err.Error() + continue + } + + switch req.Method { + case "discover": + c.toClient <- "" + case "attack": + c.toClient <- "" + default: + c.toClient <- "invalid method: " + req.Method + } + } } // Discover launches a Cameradar scan using the service's options