diff --git a/.gitignore b/.gitignore index c14acf9..c295d4f 100644 --- a/.gitignore +++ b/.gitignore @@ -3,7 +3,6 @@ .vscode/ # Deps -glide.lock /vendor # Golang diff --git a/cameradar/cameradar.go b/cameradar/cameradar.go index db17b35..13d81ef 100644 --- a/cameradar/cameradar.go +++ b/cameradar/cameradar.go @@ -46,6 +46,7 @@ func main() { w := startSpinner(options.EnableLogs) + updateSpinner(w, "Loading dictionaries...", options.EnableLogs) gopath := os.Getenv("GOPATH") options.Credentials = strings.Replace(options.Credentials, "", gopath, 1) options.Routes = strings.Replace(options.Routes, "", gopath, 1) diff --git a/glide.lock b/glide.lock new file mode 100644 index 0000000..6c8f130 --- /dev/null +++ b/glide.lock @@ -0,0 +1,59 @@ +hash: f646c12961eac693464d18f1ab91a7dd734514a1de59dfce2fe6de1507512bf7 +updated: 2017-10-20T08:57:53.219213734+02:00 +imports: +- name: github.com/andelf/go-curl + version: f8b334df3789fbdf98df3b3b22815e958b956c19 +- name: github.com/davecgh/go-spew + version: 6d212800a42e8ab5c146b8ace3490ee17e5225f9 + subpackages: + - spew +- name: github.com/fatih/color + version: 570b54cabe6b8eb0bc2dfce68d964677d63b5260 +- name: github.com/gernest/wow + version: 8da164fc5bfb8099d4aceccb1c20d5934054723d + subpackages: + - spin +- name: github.com/go-playground/locales + version: e4cbcb5d0652150d40ad0646651076b6bd2be4f6 + subpackages: + - currency +- name: github.com/go-playground/universal-translator + version: b32fa301c9fe55953584134cb6853a13c87ec0a1 +- name: github.com/gorilla/websocket + version: ea4d1f681babbce9545c9c5f3d5194a789c89f5b +- name: github.com/jessevdk/go-flags + version: 96dc06278ce32a0e9d957d590bb987c81ee66407 +- name: github.com/mattn/go-colorable + version: 5411d3eea5978e6cdc258b30de592b60df6aba96 + repo: https://github.com/mattn/go-colorable +- name: github.com/mattn/go-isatty + version: 57fdcb988a5c543893cc61bce354a6e24ab70022 + repo: https://github.com/mattn/go-isatty +- name: github.com/pkg/errors + version: 645ef00459ed84a119197bfb8d8205042c6df63d +- name: github.com/pmezard/go-difflib + version: d8ed2627bdf02c080bf22230dbb337003b7aba2d + subpackages: + - difflib +- name: github.com/stretchr/objx + version: cbeaeb16a013161a98496fad62933b1d21786672 +- name: github.com/stretchr/testify + version: 69483b4bd14f5845b5a1e55bca19e954e827f1d0 + subpackages: + - assert + - mock +- name: golang.org/x/crypto + version: 9419663f5a44be8b34ca85f08abc5fe1be11f8a3 + subpackages: + - ssh/terminal +- name: golang.org/x/sys + version: e24f485414aeafb646f6fca458b0bf869c0880a1 + repo: https://go.googlesource.com/sys + subpackages: + - unix + - windows +- name: gopkg.in/go-playground/validator.v9 + version: a021b2ec9a8a8bb970f3f15bc42617cb520e8a64 +- name: gopkg.in/tylerb/graceful.v1 + version: 4654dfbb6ad53cb5e27f37d99b02e16c1872fbbb +testImports: [] diff --git a/glide.yaml b/glide.yaml index 318be5d..4dc20b8 100644 --- a/glide.yaml +++ b/glide.yaml @@ -6,18 +6,21 @@ import: - package: github.com/gernest/wow subpackages: - spin +- package: github.com/gorilla/websocket + version: ~1.2.0 - package: github.com/jessevdk/go-flags version: ~1.3.0 - package: github.com/pkg/errors version: ~0.8.0 -- package: gopkg.in/go-playground/validator.v9 - version: ~9.7.0 -- package: github.com/go-playground/universal-translator - version: ~0.16.0 -- package: github.com/go-playground/locales - version: ~0.11.1 -testImport: - package: github.com/stretchr/testify version: ~1.1.4 subpackages: - - assert + - mock +- package: gopkg.in/go-playground/validator.v9 + version: ~9.7.0 +- package: gopkg.in/tylerb/graceful.v1 + version: ~1.2.15 +- package: github.com/go-playground/universal-translator + version: ~0.16.0 +- package: github.com/go-playground/locales + version: ~0.11.2 diff --git a/server/actor/pubsub.go b/server/actor/pubsub.go new file mode 100644 index 0000000..e83838b --- /dev/null +++ b/server/actor/pubsub.go @@ -0,0 +1,48 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package actor + +// Sub/Unsub event type +const ( + SubscribeEvent = "subscribe" + UnsubscribeEvent = "unsubscribe" +) + +// Subscription contains a sub/unsub event +type Subscription struct { + Command string + Channel string + SubscribersCount uint +} + +// Publication contains a publish event +type Publication struct { + Channel string + Data string +} + +// PubSub is a generic interface for publishing data to subscribers using channels +// It exposes subscriptions events so the controller can create/delete +// data sources depending on the channels users subscribe to. +// ex: launch a camera stream only when users subscribe to it +type PubSub interface { + Run() + Sub() <-chan *Subscription + Pub() chan<- *Publication +} + +// ChannelAccessChecker allows to check for accesses on a given channel +type ChannelAccessChecker interface { + CheckAccess(channel, accessToken string) bool + ClearCache(accessToken string) +} diff --git a/server/actor/pubsub/mock.go b/server/actor/pubsub/mock.go new file mode 100644 index 0000000..853fe3f --- /dev/null +++ b/server/actor/pubsub/mock.go @@ -0,0 +1,57 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pubsub + +import ( + "github.com/stretchr/testify/mock" + + "github.com/EtixLabs/cameradar/server/actor" +) + +// Mock mocks a pubsub actor +type Mock struct { + mock.Mock +} + +// Sub mock +func (m *Mock) Sub() <-chan *actor.Subscription { + args := m.Called() + return args.Get(0).(<-chan *actor.Subscription) +} + +// Pub mock +func (m *Mock) Pub() chan<- *actor.Publication { + args := m.Called() + return args.Get(0).(chan<- *actor.Publication) +} + +// Run mock +func (m *Mock) Run() { + m.Called() +} + +// AccessCheckerMock mocks a channel access checker +type AccessCheckerMock struct { + mock.Mock +} + +// CheckAccess mocks an access check +func (m *AccessCheckerMock) CheckAccess(channel, accessToken string) bool { + args := m.Called(channel, accessToken) + return args.Bool(0) +} + +// ClearCache mocks a cache clear +func (m *AccessCheckerMock) ClearCache(accessToken string) { + m.Called(accessToken) +} diff --git a/server/actor/pubsub/webSocket.go b/server/actor/pubsub/webSocket.go new file mode 100644 index 0000000..93e79ad --- /dev/null +++ b/server/actor/pubsub/webSocket.go @@ -0,0 +1,238 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pubsub + +import ( + "fmt" + "net/http" + "strings" + + "github.com/EtixLabs/cameradar/server/actor" + "github.com/EtixLabs/cameradar/server/adaptor" +) + +// events allow to serialize in one go routine all events from the subscribers +const ( + eventSubscribe = "subscribe" + eventUnsubscribe = "unsubscribe" + eventDisconnect = "disconnect" +) + +type pubSubEvent struct { + name string + channel string + client adaptor.WebSocket +} + +// WebSocket manage pubsub communication using a websocket adaptor +type WebSocket struct { + wsf adaptor.WebSocketFactory + + subscriptions map[string][]adaptor.WebSocket + sub chan *actor.Subscription + pub chan *actor.Publication + events chan *pubSubEvent +} + +// NewWebSocket creates a PubSub actor that uses a websockets factory +func NewWebSocket( + wsf adaptor.WebSocketFactory, +) *WebSocket { + wsPubSub := &WebSocket{ + wsf: wsf, + + subscriptions: make(map[string][]adaptor.WebSocket), + sub: make(chan *actor.Subscription), + pub: make(chan *actor.Publication), + events: make(chan *pubSubEvent), + } + go wsPubSub.Run() + return wsPubSub +} + +// Sub return the chan where websocket event gonna be pushed +func (b *WebSocket) Sub() <-chan *actor.Subscription { + return b.sub +} + +// Pub return the chan where we consider publishement will be asked +func (b *WebSocket) Pub() chan<- *actor.Publication { + return b.pub +} + +// Run start to listen on pubsub events +func (b *WebSocket) Run() { + for { + select { + case event := <-b.events: + + client := event.client + channel := event.channel + + switch event.name { + case eventSubscribe: + b.handleSubscribe(client, channel) + case eventUnsubscribe: + b.handleUnsubscribe(client, channel) + case eventDisconnect: + b.handleDisconnect(client) + } + case publication := <-b.pub: + + subscribers := b.subscriptions[publication.Channel] + + // prepend channel name to message, so client knows from which channel + // the message comes from + message := fmt.Sprintf("%s/%s", publication.Channel, publication.Data) + + // broadcast message to subscribers + for _, client := range subscribers { + select { + case client.Write() <- message: + default: + // drop frame + } + } + } + } +} + +// Accept a new incoming connection and create a websocket using the factory +func (b *WebSocket) Accept(w http.ResponseWriter, req *http.Request) { + client, err := b.wsf.NewIncomingWebSocket(w, req) + if err != nil { + fmt.Printf("cannot accept incoming connection: %v\n", err) + return + } + + go b.readClient(client) +} + +func (b *WebSocket) readClient(client adaptor.WebSocket) { + for { + message, ok := <-client.Read() + if !ok { + // connection channel closed, disconnect client (in the main routine) + b.events <- &pubSubEvent{ + name: eventDisconnect, + client: client, + } + return + } + + // expect text message + command, ok := message.(string) + if !ok { + fmt.Printf("invalid non-text message: %v\n", message) + return + } + + // process command + // NOTE: if another protocol is needed, extract this behavior + if strings.HasPrefix(command, "s/") { + channel := strings.TrimPrefix(command, "s/") + + // process in main routine + b.events <- &pubSubEvent{ + name: eventSubscribe, + client: client, + channel: channel, + } + } else if strings.HasPrefix(command, "u/") { + channel := strings.TrimPrefix(command, "u/") + + // process in main routine + b.events <- &pubSubEvent{ + name: eventUnsubscribe, + client: client, + channel: channel, + } + } else { + fmt.Printf("invalid message '%s', should be [s|u]/{channel}\n", command) + } + } +} + +func (b *WebSocket) handleSubscribe(client adaptor.WebSocket, channel string) { + // if client is already subscribed, ignore + if b.alreadySubscribed(client, channel) { + return + } + + // add to subscribers map + subscribersCount := b.addSubscription(client, channel) + + // notify external world + b.sub <- &actor.Subscription{ + Command: actor.SubscribeEvent, + Channel: channel, + SubscribersCount: subscribersCount, + } +} + +func (b *WebSocket) handleUnsubscribe(client adaptor.WebSocket, channel string) { + // if client didn't subscribe, ignore + if !b.alreadySubscribed(client, channel) { + return + } + + // remove from map + subscribersCount := b.removeSubscription(client, channel) + + // notify external world + b.sub <- &actor.Subscription{ + Command: actor.UnsubscribeEvent, + Channel: channel, + SubscribersCount: subscribersCount, + } +} + +func (b *WebSocket) handleDisconnect(client adaptor.WebSocket) { + + // unsubscribe client from all its channels + for channel := range b.subscriptions { + b.handleUnsubscribe(client, channel) + } + + // close client write channel + close(client.Write()) +} + +func (b *WebSocket) alreadySubscribed(client adaptor.WebSocket, channel string) bool { + clients := b.subscriptions[channel] + for _, c := range clients { + if c == client { + return true + } + } + return false +} + +// addSubscription adds a subscription of a client to a channel and returns +// the new number of subscriptions on the given channel +func (b *WebSocket) addSubscription(client adaptor.WebSocket, channel string) uint { + b.subscriptions[channel] = append(b.subscriptions[channel], client) + return uint(len(b.subscriptions[channel])) +} + +func (b *WebSocket) removeSubscription(client adaptor.WebSocket, channel string) uint { + clients := b.subscriptions[channel] + for i, current := range clients { + if current == client { + clients = append(clients[:i], clients[i+1:]...) + break + } + } + b.subscriptions[channel] = clients + return uint(len(clients)) +} diff --git a/server/adaptor/websocket.go b/server/adaptor/websocket.go new file mode 100644 index 0000000..14de207 --- /dev/null +++ b/server/adaptor/websocket.go @@ -0,0 +1,20 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package adaptor + +// WebSocket is an interface that represents an authenticated websocket connection +type WebSocket interface { + AccessToken() string + Read() <-chan interface{} + Write() chan<- interface{} +} diff --git a/server/adaptor/websocket/factoryMock.go b/server/adaptor/websocket/factoryMock.go new file mode 100644 index 0000000..946f7fe --- /dev/null +++ b/server/adaptor/websocket/factoryMock.go @@ -0,0 +1,45 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package websocket + +import ( + "net/http" + + "github.com/EtixLabs/cameradar/server/adaptor" + + "github.com/stretchr/testify/mock" +) + +// FactoryMock mocks a websocket factory +type FactoryMock struct { + mock.Mock +} + +// NewIncomingWebSocket mocks the creation of a websocket adaptor +func (m *FactoryMock) NewIncomingWebSocket( + w http.ResponseWriter, + req *http.Request, +) (adaptor.WebSocket, error) { + args := m.Called(w, req) + return args.Get(0).(adaptor.WebSocket), args.Error(1) +} + +// NewWebSocket mocks the creation of a websocket adaptor +func (m *FactoryMock) NewWebSocket(url string) (adaptor.WebSocket, error) { + args := m.Called(url) + ws := args.Get(0) + if ws != nil { + return ws.(adaptor.WebSocket), args.Error(1) + } + return nil, args.Error(1) +} diff --git a/server/adaptor/websocket/gorilla.go b/server/adaptor/websocket/gorilla.go new file mode 100644 index 0000000..6c813da --- /dev/null +++ b/server/adaptor/websocket/gorilla.go @@ -0,0 +1,119 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package websocket + +import ( + "fmt" + "time" + + gorilla "github.com/gorilla/websocket" +) + +// Gorilla implements WebSocket interface using Gorilla library +type Gorilla struct { + conn *gorilla.Conn + accessToken string + + input chan interface{} + output chan interface{} +} + +// AccessToken returns the user authentication token +func (g *Gorilla) AccessToken() string { + return g.accessToken +} + +// Write return a chan to retrieve websocket inputs +func (g *Gorilla) Read() <-chan interface{} { + return g.input +} + +// Write returns a chan to write on websocket +func (g *Gorilla) Write() chan<- interface{} { + return g.output +} + +func (g *Gorilla) read(readLimit int64, pongWait time.Duration) { + defer (func() { + g.conn.Close() + close(g.input) + })() + + // setup read to timeout if no pong is received after `pongWait` + g.conn.SetReadDeadline(time.Now().Add(pongWait)) + g.conn.SetPongHandler(func(string) error { + g.conn.SetReadDeadline(time.Now().Add(pongWait)) + return nil + }) + + g.conn.SetReadLimit(readLimit) + for { + messageType, message, err := g.conn.ReadMessage() + if err != nil { + if _, ok := err.(*gorilla.CloseError); ok { + fmt.Printf("ws connection closed from %v (%v)\n", g.conn.RemoteAddr(), err) + } else { + // most of the time, a read error is not an error (connection closed, ...) + fmt.Printf("ws read error: %v\n", err) + } + break + } + + switch messageType { + case gorilla.TextMessage: + g.input <- string(message) + case gorilla.BinaryMessage: + g.input <- message + default: + fmt.Printf("received invalid message type: %v\n", messageType) + } + + } +} + +func (g *Gorilla) pingAndWrite(pingInterval time.Duration) { + defer g.conn.Close() + + pinger := time.NewTicker(pingInterval) + + for { + select { + case <-pinger.C: + if err := g.conn.WriteMessage(gorilla.PingMessage, []byte{}); err != nil { + fmt.Printf("ping write error: %v\n", err) + return + } + case message, ok := <-g.output: + if !ok { + // chan closed, stop write routine + return + } + + var err error + + switch msg := message.(type) { + case []byte: + err = g.conn.WriteMessage(gorilla.BinaryMessage, msg) + case string: + err = g.conn.WriteMessage(gorilla.TextMessage, []byte(msg)) + default: + err = fmt.Errorf("invalid message type: %T", msg) + } + + if err != nil { + fmt.Printf("write error: %v\n", err) + return + } + } + } +} diff --git a/server/adaptor/websocket/gorillaFactory.go b/server/adaptor/websocket/gorillaFactory.go new file mode 100644 index 0000000..f068df7 --- /dev/null +++ b/server/adaptor/websocket/gorillaFactory.go @@ -0,0 +1,132 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package websocket + +import ( + "fmt" + "net/http" + "time" + + "github.com/EtixLabs/cameradar/server/adaptor" + + gorilla "github.com/gorilla/websocket" + "github.com/pkg/errors" +) + +// GorillaFactory is a websocket Factory using Gorilla websocket client +type GorillaFactory struct { + readLimit int64 + pingInterval time.Duration + pongWait time.Duration + writeChanBufferSize uint + upgrader gorilla.Upgrader +} + +// NewGorillaFactory instantiates and returns a Gorilla Factory +func NewGorillaFactory(options ...func(*GorillaFactory)) *GorillaFactory { + gf := &GorillaFactory{ + // readLimit: default to 1MB + readLimit: 1024 * 1024, + pingInterval: 5 * time.Second, + pongWait: 10 * time.Second, + + // allow about 1 frame per grid cell to be buffered (grids contain about ~16 cameras) + // NOTE: this should be about the same size as the number of subcriptions the client has + writeChanBufferSize: 20, + + // default upgrader: don't check requests origin + upgrader: gorilla.Upgrader{ + CheckOrigin: func(r *http.Request) bool { return true }, + }, + } + // apply the options to the struct + for _, option := range options { + option(gf) + } + return gf +} + +// NewIncomingWebSocket instantiates a Gorilla websocket from an http incoming connection +func (gf *GorillaFactory) NewIncomingWebSocket(w http.ResponseWriter, req *http.Request) (adaptor.WebSocket, error) { + fmt.Printf("new ws connection from %v\n", req.RemoteAddr) + + // create WS connection + conn, err := gf.upgrader.Upgrade(w, req, nil) + if err != nil { + return nil, errors.Wrap(err, "cannot upgrade ws connection") + } + + g := &Gorilla{ + conn: conn, + accessToken: req.Header.Get("Sec-WebSocket-Protocol"), + + input: make(chan interface{}), + output: make(chan interface{}, gf.writeChanBufferSize), + } + + go g.pingAndWrite(gf.pingInterval) + go g.read(gf.readLimit, gf.pongWait) + + return g, nil +} + +// NewWebSocket attemps to connect to a ws server using Gorilla library +func (gf *GorillaFactory) NewWebSocket(url string) (adaptor.WebSocket, error) { + fmt.Printf("opening new ws connection to %v\n", url) + + // create WS connection + conn, _, err := gorilla.DefaultDialer.Dial(url, nil) + if err != nil { + return nil, errors.Wrap(err, "cannot open ws connection") + } + + g := &Gorilla{ + conn: conn, + + input: make(chan interface{}), + output: make(chan interface{}, gf.writeChanBufferSize), + } + + go g.pingAndWrite(gf.pingInterval) + go g.read(gf.readLimit, gf.pongWait) + + return g, nil +} + +// SetReadLimit sets the maximum size read from an incoming message +func SetReadLimit(limit int64) func(gf *GorillaFactory) { + return func(gf *GorillaFactory) { + gf.readLimit = limit + } +} + +// SetPingInterval sets the interval between pings +func SetPingInterval(interval time.Duration) func(gf *GorillaFactory) { + return func(gf *GorillaFactory) { + gf.pingInterval = interval + } +} + +// SetPongWait sets the time before read should timeout if no pong is received +func SetPongWait(pongWait time.Duration) func(gf *GorillaFactory) { + return func(gf *GorillaFactory) { + gf.pongWait = pongWait + } +} + +// SetWriteChanBufferSize sets the buffer size of the write channel +func SetWriteChanBufferSize(size uint) func(gf *GorillaFactory) { + return func(gf *GorillaFactory) { + gf.writeChanBufferSize = size + } +} diff --git a/server/adaptor/websocket/mock.go b/server/adaptor/websocket/mock.go new file mode 100644 index 0000000..5c6ea3a --- /dev/null +++ b/server/adaptor/websocket/mock.go @@ -0,0 +1,74 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package websocket + +import ( + "github.com/stretchr/testify/mock" +) + +// Mock mocks a websocket adaptor +type Mock struct { + mock.Mock + + Token string + ReadChan chan interface{} + WriteChan chan interface{} +} + +// NewMock create a new websocket adaptor mock, with helper read/write +// chans already created +func NewMock(accessToken string, writeChanBuffer uint) *Mock { + return &Mock{ + Token: accessToken, + ReadChan: make(chan interface{}), + WriteChan: make(chan interface{}, writeChanBuffer), + } +} + +// AccessToken mocks token getter +func (m *Mock) AccessToken() string { + args := m.Called() + return args.String(0) +} + +// OnAccessToken is a helper method to setup an "AccessToken()" handler +// with the mock accessToken +func (m *Mock) OnAccessToken() *mock.Call { + return m.On("AccessToken").Return(m.Token) +} + +// Read mocks read channel getter +func (m *Mock) Read() <-chan interface{} { + args := m.Called() + return args.Get(0).(<-chan interface{}) +} + +// OnRead is a helper method to setup a "Read()" handler +// with the mock readChan +func (m *Mock) OnRead() *mock.Call { + var readOnlyChan <-chan interface{} = m.ReadChan + return m.On("Read").Return(readOnlyChan) +} + +// Write mocks write channel getter +func (m *Mock) Write() chan<- interface{} { + args := m.Called() + return args.Get(0).(chan<- interface{}) +} + +// OnWrite is a helper method to setup a "Write()" handler +// with the mock writeChan +func (m *Mock) OnWrite() *mock.Call { + var writeOnlyChan chan<- interface{} = m.WriteChan + return m.On("Write").Return(writeOnlyChan) +} diff --git a/server/adaptor/websocketFactory.go b/server/adaptor/websocketFactory.go new file mode 100644 index 0000000..4c7567e --- /dev/null +++ b/server/adaptor/websocketFactory.go @@ -0,0 +1,21 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package adaptor + +import "net/http" + +// WebSocketFactory is an interface for creating generic websocket connections +type WebSocketFactory interface { + NewIncomingWebSocket(w http.ResponseWriter, req *http.Request) (WebSocket, error) + NewWebSocket(url string) (WebSocket, error) +} diff --git a/server/main.go b/server/main.go new file mode 100644 index 0000000..38ea58e --- /dev/null +++ b/server/main.go @@ -0,0 +1,47 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "fmt" + "net/http" + "os" + + "github.com/EtixLabs/cameradar/server/actor/pubsub" + "github.com/EtixLabs/cameradar/server/adaptor/websocket" + graceful "gopkg.in/tylerb/graceful.v1" +) + +func main() { + webSocketFactory := websocket.NewGorillaFactory() + pubsub := pubsub.NewWebSocket(webSocketFactory) + + // create and setup the http server + serverMux := http.NewServeMux() + serverMux.HandleFunc("/", pubsub.Accept) + + httpServer := &graceful.Server{ + NoSignalHandling: true, + Server: &http.Server{ + Addr: fmt.Sprintf("%v:%v", "0.0.0.0", 7000), + Handler: serverMux, + }, + } + + fmt.Printf("cameradar server listening on %v\n", httpServer.Addr) + err := httpServer.ListenAndServe() + if err != nil { + fmt.Printf("could not start server: %v\n", err) + os.Exit(1) + } +} diff --git a/server/service/service.go b/server/service/service.go new file mode 100644 index 0000000..283dbf5 --- /dev/null +++ b/server/service/service.go @@ -0,0 +1,131 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package service + +import ( + "fmt" + "time" + + "github.com/EtixLabs/cameradar" + "github.com/pkg/errors" +) + +// Cameradar is the service in charge of communicating with the GUI +type Cameradar struct { + Streams []cmrdr.Stream + + options *Options +} + +// Options contains all options needed to launch a complete cameradar scan +type Options struct { + Target string + Ports string + OutputFile string + Routes cmrdr.Routes + Credentials cmrdr.Credentials + Speed int + Timeout time.Duration +} + +// NewCameradar instanciates a new Cameradar service +func NewCameradar(routesFilePath, credentialsFilePath string) (*Cameradar, error) { + routes, err := cmrdr.LoadRoutes(routesFilePath) + if err != nil { + return nil, errors.Wrap(err, "can't load routes dictionary") + } + + credentials, err := cmrdr.LoadCredentials(credentialsFilePath) + if err != nil { + return nil, errors.Wrap(err, "can't load credentials dictionary") + } + + return &Cameradar{ + Streams: nil, + options: &Options{ + Ports: "554,8554", + Routes: routes, + Credentials: credentials, + Speed: 4, + Timeout: 2000, + }, + }, nil +} + +// Discover launches a Cameradar scan using the service's options +func (c *Cameradar) Discover() error { + streams, err := cmrdr.Discover(c.options.Target, c.options.Ports, c.options.OutputFile, c.options.Speed, true) + if err != nil { + return errors.Wrap(err, "could not discover streams") + } + c.Streams = streams + return nil +} + +// AttackRoute launches a Cameradar route attack using the service's options +func (c *Cameradar) AttackRoute() error { + streams, err := cmrdr.AttackRoute(c.Streams, c.options.Routes, c.options.Timeout, true) + if err != nil { + return errors.Wrap(err, "could not discover streams") + } + c.Streams = streams + return nil +} + +// AttackCredentials launches a Cameradar credential attack using the service's options +func (c *Cameradar) AttackCredentials() error { + streams, err := cmrdr.AttackCredentials(c.Streams, c.options.Credentials, c.options.Timeout, true) + if err != nil { + return errors.Wrap(err, "could not discover streams") + } + c.Streams = streams + return nil +} + +// SetNmapOutputFile sets the OutputFile option +func (c *Cameradar) SetNmapOutputFile(path string) { + c.options.OutputFile = path +} + +// SetRoutes overwrites the routes dictionary with new values +func (c *Cameradar) SetRoutes(routes string) { + c.options.Routes = cmrdr.ParseRoutesFromString(routes) +} + +// SetCredentials overwrites the routes dictionary with new values +func (c *Cameradar) SetCredentials(credentials string) error { + newCredentials, err := cmrdr.ParseCredentialsFromString(credentials) + if err != nil { + return errors.Wrap(err, "could not decode credentials") + } + c.options.Credentials = newCredentials + return nil +} + +// SetSpeed sets the Speed option +func (c *Cameradar) SetSpeed(speed int) error { + if speed < cmrdr.PARANOIAC || speed > cmrdr.INSANE { + return fmt.Errorf("invalid speed value '%d'. should be between '%d' and '%d'", speed, cmrdr.PARANOIAC, cmrdr.INSANE) + } + c.options.Speed = speed + return nil +} + +// SetTimeout sets the Timeout option +func (c *Cameradar) SetTimeout(timeout int) error { + if timeout < 0 { + return fmt.Errorf("invalid timeout value '%d'. should be superior to 0", timeout) + } + c.options.Timeout = time.Millisecond * time.Duration(timeout) + return nil +}