mirror of
https://github.com/netbirdio/netbird.git
synced 2025-03-26 15:36:57 +01:00
Delete peer (#114)
* feature: add peer deletion * feature: add peer deletion [CLIENT] * fix: lint error * test: fix sync block * test: fix management test * feature: add client stop after was deleted * chore: remove permission denied cancellation * chore: add larger signal backoff * feature: notify deleted peer of removal * fix: lint issue * chore: add 2nd default key - one off * test: fix account key check
This commit is contained in:
parent
a859f6c511
commit
ec759bc461
@ -37,8 +37,8 @@ func startManagement(config *mgmt.Config, t *testing.T) (*grpc.Server, net.Liste
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
accountManager := mgmt.NewManager(store)
|
||||
peersUpdateManager := mgmt.NewPeersUpdateManager()
|
||||
accountManager := mgmt.NewManager(store, peersUpdateManager)
|
||||
turnManager := mgmt.NewTimeBasedAuthSecretsManager(peersUpdateManager, config.TURNConfig)
|
||||
mgmtServer, err := mgmt.NewServer(config, accountManager, peersUpdateManager, turnManager)
|
||||
if err != nil {
|
||||
|
@ -5,7 +5,6 @@ import (
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/wiretrustee/wiretrustee/client/internal"
|
||||
"github.com/wiretrustee/wiretrustee/iface"
|
||||
mgm "github.com/wiretrustee/wiretrustee/management/client"
|
||||
mgmProto "github.com/wiretrustee/wiretrustee/management/proto"
|
||||
signal "github.com/wiretrustee/wiretrustee/signal/client"
|
||||
@ -38,8 +37,8 @@ var (
|
||||
log.Errorf("failed parsing Wireguard key %s: [%s]", config.PrivateKey, err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
mgmTlsEnabled := false
|
||||
if config.ManagementURL.Scheme == "https" {
|
||||
@ -67,7 +66,7 @@ var (
|
||||
}
|
||||
|
||||
// create start the Wiretrustee Engine that will connect to the Signal and Management streams and manage connections to remote peers.
|
||||
engine := internal.NewEngine(signalClient, mgmClient, engineConfig)
|
||||
engine := internal.NewEngine(signalClient, mgmClient, engineConfig, cancel)
|
||||
err = engine.Start()
|
||||
if err != nil {
|
||||
log.Errorf("error while starting Wiretrustee Connection Engine: %s", err)
|
||||
@ -75,7 +74,12 @@ var (
|
||||
}
|
||||
|
||||
SetupCloseHandler()
|
||||
<-stopCh
|
||||
|
||||
select {
|
||||
case <-stopCh:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
|
||||
log.Infof("receive signal to stop running")
|
||||
err = mgmClient.Close()
|
||||
if err != nil {
|
||||
@ -88,10 +92,9 @@ var (
|
||||
return err
|
||||
}
|
||||
|
||||
log.Debugf("removing Wiretrustee interface %s", config.WgIface)
|
||||
err = iface.Close()
|
||||
err = engine.Stop()
|
||||
if err != nil {
|
||||
log.Errorf("failed closing Wiretrustee interface %s %v", config.WgIface, err)
|
||||
log.Errorf("failed stopping engine %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -1,6 +1,7 @@
|
||||
package internal
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
ice "github.com/pion/ice/v2"
|
||||
@ -54,6 +55,8 @@ type Engine struct {
|
||||
STUNs []*ice.URL
|
||||
// TURNs is a list of STUN servers used by ICE
|
||||
TURNs []*ice.URL
|
||||
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
// Peer is an instance of the Connection Peer
|
||||
@ -63,7 +66,7 @@ type Peer struct {
|
||||
}
|
||||
|
||||
// NewEngine creates a new Connection Engine
|
||||
func NewEngine(signalClient *signal.Client, mgmClient *mgm.Client, config *EngineConfig) *Engine {
|
||||
func NewEngine(signalClient *signal.Client, mgmClient *mgm.Client, config *EngineConfig, cancel context.CancelFunc) *Engine {
|
||||
return &Engine{
|
||||
signal: signalClient,
|
||||
mgmClient: mgmClient,
|
||||
@ -73,9 +76,21 @@ func NewEngine(signalClient *signal.Client, mgmClient *mgm.Client, config *Engin
|
||||
config: config,
|
||||
STUNs: []*ice.URL{},
|
||||
TURNs: []*ice.URL{},
|
||||
cancel: cancel,
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Engine) Stop() error {
|
||||
log.Debugf("removing Wiretrustee interface %s", e.config.WgIface)
|
||||
err := iface.Close()
|
||||
if err != nil {
|
||||
log.Errorf("failed closing Wiretrustee interface %s %v", e.config.WgIface, err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Start creates a new Wireguard tunnel interface and listens to events from Signal and Management services
|
||||
// Connections to remote peers are not established here.
|
||||
// However, they will be established once an event with a list of peers to connect to will be received from Management Service
|
||||
@ -262,36 +277,42 @@ func signalAuth(uFrag string, pwd string, myKey wgtypes.Key, remoteKey wgtypes.K
|
||||
// receiveManagementEvents connects to the Management Service event stream to receive updates from the management service
|
||||
// E.g. when a new peer has been registered and we are allowed to connect to it.
|
||||
func (e *Engine) receiveManagementEvents() {
|
||||
go func() {
|
||||
err := e.mgmClient.Sync(func(update *mgmProto.SyncResponse) error {
|
||||
e.syncMsgMux.Lock()
|
||||
defer e.syncMsgMux.Unlock()
|
||||
|
||||
log.Debugf("connecting to Management Service updates stream")
|
||||
if update.GetWiretrusteeConfig() != nil {
|
||||
err := e.updateTURNs(update.GetWiretrusteeConfig().GetTurns())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
e.mgmClient.Sync(func(update *mgmProto.SyncResponse) error {
|
||||
e.syncMsgMux.Lock()
|
||||
defer e.syncMsgMux.Unlock()
|
||||
err = e.updateSTUNs(update.GetWiretrusteeConfig().GetStuns())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if update.GetWiretrusteeConfig() != nil {
|
||||
err := e.updateTURNs(update.GetWiretrusteeConfig().GetTurns())
|
||||
if err != nil {
|
||||
return err
|
||||
//todo update signal
|
||||
}
|
||||
|
||||
err = e.updateSTUNs(update.GetWiretrusteeConfig().GetStuns())
|
||||
if err != nil {
|
||||
return err
|
||||
if update.GetRemotePeers() != nil || update.GetRemotePeersIsEmpty() {
|
||||
// empty arrays are serialized by protobuf to null, but for our case empty array is a valid state.
|
||||
err := e.updatePeers(update.GetRemotePeers())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
//todo update signal
|
||||
}
|
||||
|
||||
err := e.updatePeers(update.GetRemotePeers())
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
e.cancel()
|
||||
return
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
log.Infof("connected to Management Service updates stream")
|
||||
log.Infof("connected to Management Service updates stream")
|
||||
}()
|
||||
log.Debugf("connecting to Management Service updates stream")
|
||||
}
|
||||
|
||||
func (e *Engine) updateSTUNs(stuns []*mgmProto.HostConfig) error {
|
||||
@ -333,10 +354,6 @@ func (e *Engine) updateTURNs(turns []*mgmProto.ProtectedHostConfig) error {
|
||||
}
|
||||
|
||||
func (e *Engine) updatePeers(remotePeers []*mgmProto.RemotePeerConfig) error {
|
||||
if len(remotePeers) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
log.Debugf("got peers update from Management Service, updating")
|
||||
remotePeerMap := make(map[string]struct{})
|
||||
for _, peer := range remotePeers {
|
||||
|
@ -64,54 +64,61 @@ func (c *Client) Close() error {
|
||||
return c.conn.Close()
|
||||
}
|
||||
|
||||
//defaultBackoff is a basic backoff mechanism for general issues
|
||||
func defaultBackoff() backoff.BackOff {
|
||||
return &backoff.ExponentialBackOff{
|
||||
InitialInterval: 800 * time.Millisecond,
|
||||
RandomizationFactor: backoff.DefaultRandomizationFactor,
|
||||
Multiplier: backoff.DefaultMultiplier,
|
||||
MaxInterval: 30 * time.Second,
|
||||
MaxElapsedTime: 24 * 3 * time.Hour, //stop after 3 days trying
|
||||
Stop: backoff.Stop,
|
||||
Clock: backoff.SystemClock,
|
||||
}
|
||||
}
|
||||
|
||||
// Sync wraps the real client's Sync endpoint call and takes care of retries and encryption/decryption of messages
|
||||
// Non blocking request (executed in go routine). The result will be sent via msgHandler callback function
|
||||
func (c *Client) Sync(msgHandler func(msg *proto.SyncResponse) error) {
|
||||
// Blocking request. The result will be sent via msgHandler callback function
|
||||
func (c *Client) Sync(msgHandler func(msg *proto.SyncResponse) error) error {
|
||||
|
||||
go func() {
|
||||
var backOff = defaultBackoff()
|
||||
|
||||
var backOff = &backoff.ExponentialBackOff{
|
||||
InitialInterval: 800 * time.Millisecond,
|
||||
RandomizationFactor: backoff.DefaultRandomizationFactor,
|
||||
Multiplier: backoff.DefaultMultiplier,
|
||||
MaxInterval: 3 * time.Second,
|
||||
MaxElapsedTime: time.Duration(0), //never stop retrying
|
||||
Stop: backoff.Stop,
|
||||
Clock: backoff.SystemClock,
|
||||
}
|
||||
operation := func() error {
|
||||
|
||||
operation := func() error {
|
||||
|
||||
// todo we already have it since we did the Login, maybe cache it locally?
|
||||
serverPubKey, err := c.GetServerPublicKey()
|
||||
if err != nil {
|
||||
log.Errorf("failed getting Management Service public key: %s", err)
|
||||
return err
|
||||
}
|
||||
|
||||
stream, err := c.connectToStream(*serverPubKey)
|
||||
if err != nil {
|
||||
log.Errorf("failed to open Management Service stream: %s", err)
|
||||
return err
|
||||
}
|
||||
|
||||
log.Infof("connected to the Management Service Stream")
|
||||
|
||||
// blocking until error
|
||||
err = c.receiveEvents(stream, *serverPubKey, msgHandler)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
backOff.Reset()
|
||||
return nil
|
||||
}
|
||||
|
||||
err := backoff.Retry(operation, backOff)
|
||||
// todo we already have it since we did the Login, maybe cache it locally?
|
||||
serverPubKey, err := c.GetServerPublicKey()
|
||||
if err != nil {
|
||||
log.Errorf("failed communicating with Management Service %s ", err)
|
||||
return
|
||||
log.Errorf("failed getting Management Service public key: %s", err)
|
||||
return err
|
||||
}
|
||||
}()
|
||||
|
||||
stream, err := c.connectToStream(*serverPubKey)
|
||||
if err != nil {
|
||||
log.Errorf("failed to open Management Service stream: %s", err)
|
||||
return err
|
||||
}
|
||||
|
||||
log.Infof("connected to the Management Service Stream")
|
||||
|
||||
// blocking until error
|
||||
err = c.receiveEvents(stream, *serverPubKey, msgHandler)
|
||||
if err != nil {
|
||||
/*if errStatus, ok := status.FromError(err); ok && errStatus.Code() == codes.PermissionDenied {
|
||||
//todo handle differently??
|
||||
}*/
|
||||
return err
|
||||
}
|
||||
backOff.Reset()
|
||||
return nil
|
||||
}
|
||||
|
||||
err := backoff.Retry(operation, backOff)
|
||||
if err != nil {
|
||||
log.Errorf("exiting Management Service connection retry loop due to unrecoverable error %s ", err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) connectToStream(serverPubKey wgtypes.Key) (proto.ManagementService_SyncClient, error) {
|
||||
@ -138,7 +145,7 @@ func (c *Client) receiveEvents(stream proto.ManagementService_SyncClient, server
|
||||
return err
|
||||
}
|
||||
if err != nil {
|
||||
log.Errorf("disconnected from Management Service syn stream: %v", err)
|
||||
log.Errorf("disconnected from Management Service sync stream: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -60,8 +60,8 @@ func startManagement(config *mgmt.Config, t *testing.T) (*grpc.Server, net.Liste
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
accountManager := mgmt.NewManager(store)
|
||||
peersUpdateManager := mgmt.NewPeersUpdateManager()
|
||||
accountManager := mgmt.NewManager(store, peersUpdateManager)
|
||||
turnManager := mgmt.NewTimeBasedAuthSecretsManager(peersUpdateManager, config.TURNConfig)
|
||||
mgmtServer, err := mgmt.NewServer(config, accountManager, peersUpdateManager, turnManager)
|
||||
if err != nil {
|
||||
@ -146,10 +146,15 @@ func TestClient_Sync(t *testing.T) {
|
||||
|
||||
ch := make(chan *mgmtProto.SyncResponse, 1)
|
||||
|
||||
tested.Sync(func(msg *mgmtProto.SyncResponse) error {
|
||||
ch <- msg
|
||||
return nil
|
||||
})
|
||||
go func() {
|
||||
err = tested.Sync(func(msg *mgmtProto.SyncResponse) error {
|
||||
ch <- msg
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case resp := <-ch:
|
||||
@ -162,6 +167,9 @@ func TestClient_Sync(t *testing.T) {
|
||||
if len(resp.GetRemotePeers()) != 1 {
|
||||
t.Errorf("expecting RemotePeers size %d got %d", 1, len(resp.GetRemotePeers()))
|
||||
}
|
||||
if resp.GetRemotePeersIsEmpty() == true {
|
||||
t.Error("expecting RemotePeers property to be false, got true")
|
||||
}
|
||||
if resp.GetRemotePeers()[0].GetWgPubKey() != remoteKey.PublicKey().String() {
|
||||
t.Errorf("expecting RemotePeer public key %s got %s", remoteKey.PublicKey().String(), resp.GetRemotePeers()[0].GetWgPubKey())
|
||||
}
|
||||
|
@ -64,7 +64,8 @@ var (
|
||||
if err != nil {
|
||||
log.Fatalf("failed creating a store: %s: %v", config.Datadir, err)
|
||||
}
|
||||
accountManager := server.NewManager(store)
|
||||
peersUpdateManager := server.NewPeersUpdateManager()
|
||||
accountManager := server.NewManager(store, peersUpdateManager)
|
||||
|
||||
var opts []grpc.ServerOption
|
||||
|
||||
@ -81,7 +82,6 @@ var (
|
||||
|
||||
opts = append(opts, grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp))
|
||||
grpcServer := grpc.NewServer(opts...)
|
||||
peersUpdateManager := server.NewPeersUpdateManager()
|
||||
turnManager := server.NewTimeBasedAuthSecretsManager(peersUpdateManager, config.TURNConfig)
|
||||
server, err := server.NewServer(config, accountManager, peersUpdateManager, turnManager)
|
||||
if err != nil {
|
||||
|
@ -181,6 +181,8 @@ type SyncResponse struct {
|
||||
WiretrusteeConfig *WiretrusteeConfig `protobuf:"bytes,1,opt,name=wiretrusteeConfig,proto3" json:"wiretrusteeConfig,omitempty"`
|
||||
PeerConfig *PeerConfig `protobuf:"bytes,2,opt,name=peerConfig,proto3" json:"peerConfig,omitempty"`
|
||||
RemotePeers []*RemotePeerConfig `protobuf:"bytes,3,rep,name=remotePeers,proto3" json:"remotePeers,omitempty"`
|
||||
// Indicates whether remotePeers array is empty or not to bypass protobuf null and empty array equality.
|
||||
RemotePeersIsEmpty bool `protobuf:"varint,4,opt,name=remotePeersIsEmpty,proto3" json:"remotePeersIsEmpty,omitempty"`
|
||||
}
|
||||
|
||||
func (x *SyncResponse) Reset() {
|
||||
@ -236,6 +238,13 @@ func (x *SyncResponse) GetRemotePeers() []*RemotePeerConfig {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (x *SyncResponse) GetRemotePeersIsEmpty() bool {
|
||||
if x != nil {
|
||||
return x.RemotePeersIsEmpty
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
type LoginRequest struct {
|
||||
state protoimpl.MessageState
|
||||
sizeCache protoimpl.SizeCache
|
||||
@ -860,7 +869,7 @@ var file_management_proto_rawDesc = []byte{
|
||||
0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x77, 0x67, 0x50, 0x75, 0x62, 0x4b, 0x65, 0x79, 0x12,
|
||||
0x12, 0x0a, 0x04, 0x62, 0x6f, 0x64, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x62,
|
||||
0x6f, 0x64, 0x79, 0x22, 0x0d, 0x0a, 0x0b, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x71, 0x75, 0x65,
|
||||
0x73, 0x74, 0x22, 0xd3, 0x01, 0x0a, 0x0c, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f,
|
||||
0x73, 0x74, 0x22, 0x83, 0x02, 0x0a, 0x0c, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f,
|
||||
0x6e, 0x73, 0x65, 0x12, 0x4b, 0x0a, 0x11, 0x77, 0x69, 0x72, 0x65, 0x74, 0x72, 0x75, 0x73, 0x74,
|
||||
0x65, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d,
|
||||
0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x2e, 0x57, 0x69, 0x72, 0x65,
|
||||
@ -873,7 +882,10 @@ var file_management_proto_rawDesc = []byte{
|
||||
0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1c, 0x2e,
|
||||
0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x2e, 0x52, 0x65, 0x6d, 0x6f, 0x74,
|
||||
0x65, 0x50, 0x65, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x0b, 0x72, 0x65, 0x6d,
|
||||
0x6f, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x73, 0x22, 0x5a, 0x0a, 0x0c, 0x4c, 0x6f, 0x67, 0x69,
|
||||
0x6f, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x73, 0x12, 0x2e, 0x0a, 0x12, 0x72, 0x65, 0x6d, 0x6f,
|
||||
0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x73, 0x49, 0x73, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x18, 0x04,
|
||||
0x20, 0x01, 0x28, 0x08, 0x52, 0x12, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72,
|
||||
0x73, 0x49, 0x73, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x5a, 0x0a, 0x0c, 0x4c, 0x6f, 0x67, 0x69,
|
||||
0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x65, 0x74, 0x75,
|
||||
0x70, 0x4b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x65, 0x74, 0x75,
|
||||
0x70, 0x4b, 0x65, 0x79, 0x12, 0x2e, 0x0a, 0x04, 0x6d, 0x65, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01,
|
||||
|
@ -42,6 +42,8 @@ message SyncResponse {
|
||||
PeerConfig peerConfig = 2;
|
||||
|
||||
repeated RemotePeerConfig remotePeers = 3;
|
||||
// Indicates whether remotePeers array is empty or not to bypass protobuf null and empty array equality.
|
||||
bool remotePeersIsEmpty = 4;
|
||||
}
|
||||
|
||||
message LoginRequest {
|
||||
|
@ -13,7 +13,8 @@ import (
|
||||
type AccountManager struct {
|
||||
Store Store
|
||||
// mutex to synchronise account operations (e.g. generating Peer IP address inside the Network)
|
||||
mux sync.Mutex
|
||||
mux sync.Mutex
|
||||
peersUpdateManager *PeersUpdateManager
|
||||
}
|
||||
|
||||
// Account represents a unique account of the system
|
||||
@ -25,19 +26,20 @@ type Account struct {
|
||||
}
|
||||
|
||||
// NewManager creates a new AccountManager with a provided Store
|
||||
func NewManager(store Store) *AccountManager {
|
||||
func NewManager(store Store, peersUpdateManager *PeersUpdateManager) *AccountManager {
|
||||
return &AccountManager{
|
||||
Store: store,
|
||||
mux: sync.Mutex{},
|
||||
Store: store,
|
||||
mux: sync.Mutex{},
|
||||
peersUpdateManager: peersUpdateManager,
|
||||
}
|
||||
}
|
||||
|
||||
//AddSetupKey generates a new setup key with a given name and type, and adds it to the specified account
|
||||
func (manager *AccountManager) AddSetupKey(accountId string, keyName string, keyType SetupKeyType, expiresIn time.Duration) (*SetupKey, error) {
|
||||
manager.mux.Lock()
|
||||
defer manager.mux.Unlock()
|
||||
func (am *AccountManager) AddSetupKey(accountId string, keyName string, keyType SetupKeyType, expiresIn time.Duration) (*SetupKey, error) {
|
||||
am.mux.Lock()
|
||||
defer am.mux.Unlock()
|
||||
|
||||
account, err := manager.Store.GetAccount(accountId)
|
||||
account, err := am.Store.GetAccount(accountId)
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.NotFound, "account not found")
|
||||
}
|
||||
@ -45,7 +47,7 @@ func (manager *AccountManager) AddSetupKey(accountId string, keyName string, key
|
||||
setupKey := GenerateSetupKey(keyName, keyType, expiresIn)
|
||||
account.SetupKeys[setupKey.Key] = setupKey
|
||||
|
||||
err = manager.Store.SaveAccount(account)
|
||||
err = am.Store.SaveAccount(account)
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "failed adding account key")
|
||||
}
|
||||
@ -54,11 +56,11 @@ func (manager *AccountManager) AddSetupKey(accountId string, keyName string, key
|
||||
}
|
||||
|
||||
//RevokeSetupKey marks SetupKey as revoked - becomes not valid anymore
|
||||
func (manager *AccountManager) RevokeSetupKey(accountId string, keyId string) (*SetupKey, error) {
|
||||
manager.mux.Lock()
|
||||
defer manager.mux.Unlock()
|
||||
func (am *AccountManager) RevokeSetupKey(accountId string, keyId string) (*SetupKey, error) {
|
||||
am.mux.Lock()
|
||||
defer am.mux.Unlock()
|
||||
|
||||
account, err := manager.Store.GetAccount(accountId)
|
||||
account, err := am.Store.GetAccount(accountId)
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.NotFound, "account not found")
|
||||
}
|
||||
@ -71,7 +73,7 @@ func (manager *AccountManager) RevokeSetupKey(accountId string, keyId string) (*
|
||||
keyCopy := setupKey.Copy()
|
||||
keyCopy.Revoked = true
|
||||
account.SetupKeys[keyCopy.Key] = keyCopy
|
||||
err = manager.Store.SaveAccount(account)
|
||||
err = am.Store.SaveAccount(account)
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "failed adding account key")
|
||||
}
|
||||
@ -80,11 +82,11 @@ func (manager *AccountManager) RevokeSetupKey(accountId string, keyId string) (*
|
||||
}
|
||||
|
||||
//RenameSetupKey renames existing setup key of the specified account.
|
||||
func (manager *AccountManager) RenameSetupKey(accountId string, keyId string, newName string) (*SetupKey, error) {
|
||||
manager.mux.Lock()
|
||||
defer manager.mux.Unlock()
|
||||
func (am *AccountManager) RenameSetupKey(accountId string, keyId string, newName string) (*SetupKey, error) {
|
||||
am.mux.Lock()
|
||||
defer am.mux.Unlock()
|
||||
|
||||
account, err := manager.Store.GetAccount(accountId)
|
||||
account, err := am.Store.GetAccount(accountId)
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.NotFound, "account not found")
|
||||
}
|
||||
@ -97,7 +99,7 @@ func (manager *AccountManager) RenameSetupKey(accountId string, keyId string, ne
|
||||
keyCopy := setupKey.Copy()
|
||||
keyCopy.Name = newName
|
||||
account.SetupKeys[keyCopy.Key] = keyCopy
|
||||
err = manager.Store.SaveAccount(account)
|
||||
err = am.Store.SaveAccount(account)
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "failed adding account key")
|
||||
}
|
||||
@ -106,11 +108,11 @@ func (manager *AccountManager) RenameSetupKey(accountId string, keyId string, ne
|
||||
}
|
||||
|
||||
//GetAccount returns an existing account or error (NotFound) if doesn't exist
|
||||
func (manager *AccountManager) GetAccount(accountId string) (*Account, error) {
|
||||
manager.mux.Lock()
|
||||
defer manager.mux.Unlock()
|
||||
func (am *AccountManager) GetAccount(accountId string) (*Account, error) {
|
||||
am.mux.Lock()
|
||||
defer am.mux.Unlock()
|
||||
|
||||
account, err := manager.Store.GetAccount(accountId)
|
||||
account, err := am.Store.GetAccount(accountId)
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.NotFound, "account not found")
|
||||
}
|
||||
@ -119,21 +121,21 @@ func (manager *AccountManager) GetAccount(accountId string) (*Account, error) {
|
||||
}
|
||||
|
||||
// GetOrCreateAccount returns an existing account or creates a new one if doesn't exist
|
||||
func (manager *AccountManager) GetOrCreateAccount(accountId string) (*Account, error) {
|
||||
manager.mux.Lock()
|
||||
defer manager.mux.Unlock()
|
||||
func (am *AccountManager) GetOrCreateAccount(accountId string) (*Account, error) {
|
||||
am.mux.Lock()
|
||||
defer am.mux.Unlock()
|
||||
|
||||
_, err := manager.Store.GetAccount(accountId)
|
||||
_, err := am.Store.GetAccount(accountId)
|
||||
if err != nil {
|
||||
if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound {
|
||||
return manager.createAccount(accountId)
|
||||
return am.createAccount(accountId)
|
||||
} else {
|
||||
// other error
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
account, err := manager.Store.GetAccount(accountId)
|
||||
account, err := am.Store.GetAccount(accountId)
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "failed retrieving account")
|
||||
}
|
||||
@ -142,12 +144,12 @@ func (manager *AccountManager) GetOrCreateAccount(accountId string) (*Account, e
|
||||
}
|
||||
|
||||
//AccountExists checks whether account exists (returns true) or not (returns false)
|
||||
func (manager *AccountManager) AccountExists(accountId string) (*bool, error) {
|
||||
manager.mux.Lock()
|
||||
defer manager.mux.Unlock()
|
||||
func (am *AccountManager) AccountExists(accountId string) (*bool, error) {
|
||||
am.mux.Lock()
|
||||
defer am.mux.Unlock()
|
||||
|
||||
var res bool
|
||||
_, err := manager.Store.GetAccount(accountId)
|
||||
_, err := am.Store.GetAccount(accountId)
|
||||
if err != nil {
|
||||
if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound {
|
||||
res = false
|
||||
@ -162,19 +164,19 @@ func (manager *AccountManager) AccountExists(accountId string) (*bool, error) {
|
||||
}
|
||||
|
||||
// AddAccount generates a new Account with a provided accountId and saves to the Store
|
||||
func (manager *AccountManager) AddAccount(accountId string) (*Account, error) {
|
||||
func (am *AccountManager) AddAccount(accountId string) (*Account, error) {
|
||||
|
||||
manager.mux.Lock()
|
||||
defer manager.mux.Unlock()
|
||||
am.mux.Lock()
|
||||
defer am.mux.Unlock()
|
||||
|
||||
return manager.createAccount(accountId)
|
||||
return am.createAccount(accountId)
|
||||
|
||||
}
|
||||
|
||||
func (manager *AccountManager) createAccount(accountId string) (*Account, error) {
|
||||
func (am *AccountManager) createAccount(accountId string) (*Account, error) {
|
||||
account, _ := newAccountWithId(accountId)
|
||||
|
||||
err := manager.Store.SaveAccount(account)
|
||||
err := am.Store.SaveAccount(account)
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "failed creating account")
|
||||
}
|
||||
@ -188,17 +190,19 @@ func newAccountWithId(accountId string) (*Account, *SetupKey) {
|
||||
log.Debugf("creating new account")
|
||||
|
||||
setupKeys := make(map[string]*SetupKey)
|
||||
setupKey := GenerateDefaultSetupKey()
|
||||
setupKeys[setupKey.Key] = setupKey
|
||||
defaultKey := GenerateDefaultSetupKey()
|
||||
oneOffKey := GenerateSetupKey("One-off key", SetupKeyOneOff, DefaultSetupKeyDuration)
|
||||
setupKeys[defaultKey.Key] = defaultKey
|
||||
setupKeys[oneOffKey.Key] = oneOffKey
|
||||
network := &Network{
|
||||
Id: uuid.New().String(),
|
||||
Net: net.IPNet{IP: net.ParseIP("100.64.0.0"), Mask: net.IPMask{255, 192, 0, 0}},
|
||||
Dns: ""}
|
||||
peers := make(map[string]*Peer)
|
||||
|
||||
log.Debugf("created new account %s with setup key %s", accountId, setupKey.Key)
|
||||
log.Debugf("created new account %s with setup key %s", accountId, defaultKey.Key)
|
||||
|
||||
return &Account{Id: accountId, SetupKeys: setupKeys, Network: network, Peers: peers}, setupKey
|
||||
return &Account{Id: accountId, SetupKeys: setupKeys, Network: network, Peers: peers}, defaultKey
|
||||
}
|
||||
|
||||
// newAccount creates a new Account with a default SetupKey (doesn't store in a Store)
|
||||
|
@ -17,7 +17,7 @@ func TestAccountManager_AddAccount(t *testing.T) {
|
||||
|
||||
expectedId := "test_account"
|
||||
expectedPeersSize := 0
|
||||
expectedSetupKeysSize := 1
|
||||
expectedSetupKeysSize := 2
|
||||
expectedNetwork := net.IPNet{
|
||||
IP: net.IP{100, 64, 0, 0},
|
||||
Mask: net.IPMask{255, 192, 0, 0},
|
||||
@ -201,7 +201,7 @@ func createManager(t *testing.T) (*AccountManager, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return NewManager(store), nil
|
||||
return NewManager(store, NewPeersUpdateManager()), nil
|
||||
}
|
||||
|
||||
func createStore(t *testing.T) (Store, error) {
|
||||
|
@ -190,6 +190,22 @@ func (s *FileStore) GetAccountBySetupKey(setupKey string) (*Account, error) {
|
||||
|
||||
return account, nil
|
||||
}
|
||||
func (s *FileStore) GetAccountPeers(accountId string) ([]*Peer, error) {
|
||||
s.mux.Lock()
|
||||
defer s.mux.Unlock()
|
||||
|
||||
account, err := s.GetAccount(accountId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var peers []*Peer
|
||||
for _, peer := range account.Peers {
|
||||
peers = append(peers, peer)
|
||||
}
|
||||
|
||||
return peers, nil
|
||||
}
|
||||
|
||||
func (s *FileStore) GetAccount(accountId string) (*Account, error) {
|
||||
|
||||
|
@ -118,6 +118,7 @@ func (s *Server) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_S
|
||||
if err != nil {
|
||||
return status.Errorf(codes.Internal, "failed sending update message")
|
||||
}
|
||||
log.Debugf("sent an update to peer %s", peerKey.String())
|
||||
// condition when client <-> server connection has been terminated
|
||||
case <-srv.Context().Done():
|
||||
// happens when connection drops, e.g. client disconnects
|
||||
@ -274,7 +275,7 @@ func toWiretrusteeConfig(config *Config, turnCredentials *TURNCredentials) *prot
|
||||
password = turnCredentials.Password
|
||||
} else {
|
||||
username = turn.Username
|
||||
password = string(turn.Password)
|
||||
password = turn.Password
|
||||
}
|
||||
turns = append(turns, &proto.ProtectedHostConfig{
|
||||
HostConfig: &proto.HostConfig{
|
||||
@ -302,13 +303,9 @@ func toPeerConfig(peer *Peer) *proto.PeerConfig {
|
||||
}
|
||||
}
|
||||
|
||||
func toSyncResponse(config *Config, peer *Peer, peers []*Peer, turnCredentials *TURNCredentials) *proto.SyncResponse {
|
||||
func toRemotePeerConfig(peers []*Peer) []*proto.RemotePeerConfig {
|
||||
|
||||
wtConfig := toWiretrusteeConfig(config, turnCredentials)
|
||||
|
||||
pConfig := toPeerConfig(peer)
|
||||
|
||||
remotePeers := make([]*proto.RemotePeerConfig, 0, len(peers))
|
||||
remotePeers := []*proto.RemotePeerConfig{}
|
||||
for _, rPeer := range peers {
|
||||
remotePeers = append(remotePeers, &proto.RemotePeerConfig{
|
||||
WgPubKey: rPeer.Key,
|
||||
@ -316,10 +313,23 @@ func toSyncResponse(config *Config, peer *Peer, peers []*Peer, turnCredentials *
|
||||
})
|
||||
}
|
||||
|
||||
return remotePeers
|
||||
|
||||
}
|
||||
|
||||
func toSyncResponse(config *Config, peer *Peer, peers []*Peer, turnCredentials *TURNCredentials) *proto.SyncResponse {
|
||||
|
||||
wtConfig := toWiretrusteeConfig(config, turnCredentials)
|
||||
|
||||
pConfig := toPeerConfig(peer)
|
||||
|
||||
remotePeers := toRemotePeerConfig(peers)
|
||||
|
||||
return &proto.SyncResponse{
|
||||
WiretrusteeConfig: wtConfig,
|
||||
PeerConfig: pConfig,
|
||||
RemotePeers: remotePeers,
|
||||
WiretrusteeConfig: wtConfig,
|
||||
PeerConfig: pConfig,
|
||||
RemotePeers: remotePeers,
|
||||
RemotePeersIsEmpty: len(remotePeers) == 0,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -492,8 +492,8 @@ func startServer(config *server.Config) (*grpc.Server, net.Listener) {
|
||||
if err != nil {
|
||||
log.Fatalf("failed creating a store: %s: %v", config.Datadir, err)
|
||||
}
|
||||
accountManager := server.NewManager(store)
|
||||
peersUpdateManager := server.NewPeersUpdateManager()
|
||||
accountManager := server.NewManager(store, peersUpdateManager)
|
||||
turnManager := server.NewTimeBasedAuthSecretsManager(peersUpdateManager, config.TURNConfig)
|
||||
mgmtServer, err := server.NewServer(config, accountManager, peersUpdateManager, turnManager)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
@ -1,6 +1,7 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"github.com/wiretrustee/wiretrustee/management/proto"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
"net"
|
||||
@ -55,11 +56,11 @@ func (p *Peer) Copy() *Peer {
|
||||
}
|
||||
|
||||
//GetPeer returns a peer from a Store
|
||||
func (manager *AccountManager) GetPeer(peerKey string) (*Peer, error) {
|
||||
manager.mux.Lock()
|
||||
defer manager.mux.Unlock()
|
||||
func (am *AccountManager) GetPeer(peerKey string) (*Peer, error) {
|
||||
am.mux.Lock()
|
||||
defer am.mux.Unlock()
|
||||
|
||||
peer, err := manager.Store.GetPeer(peerKey)
|
||||
peer, err := am.Store.GetPeer(peerKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -68,16 +69,16 @@ func (manager *AccountManager) GetPeer(peerKey string) (*Peer, error) {
|
||||
}
|
||||
|
||||
//MarkPeerConnected marks peer as connected (true) or disconnected (false)
|
||||
func (manager *AccountManager) MarkPeerConnected(peerKey string, connected bool) error {
|
||||
manager.mux.Lock()
|
||||
defer manager.mux.Unlock()
|
||||
func (am *AccountManager) MarkPeerConnected(peerKey string, connected bool) error {
|
||||
am.mux.Lock()
|
||||
defer am.mux.Unlock()
|
||||
|
||||
peer, err := manager.Store.GetPeer(peerKey)
|
||||
peer, err := am.Store.GetPeer(peerKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
account, err := manager.Store.GetPeerAccount(peerKey)
|
||||
account, err := am.Store.GetPeerAccount(peerKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -85,7 +86,7 @@ func (manager *AccountManager) MarkPeerConnected(peerKey string, connected bool)
|
||||
peerCopy := peer.Copy()
|
||||
peerCopy.Status.LastSeen = time.Now()
|
||||
peerCopy.Status.Connected = connected
|
||||
err = manager.Store.SavePeer(account.Id, peerCopy)
|
||||
err = am.Store.SavePeer(account.Id, peerCopy)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -93,18 +94,18 @@ func (manager *AccountManager) MarkPeerConnected(peerKey string, connected bool)
|
||||
}
|
||||
|
||||
//RenamePeer changes peer's name
|
||||
func (manager *AccountManager) RenamePeer(accountId string, peerKey string, newName string) (*Peer, error) {
|
||||
manager.mux.Lock()
|
||||
defer manager.mux.Unlock()
|
||||
func (am *AccountManager) RenamePeer(accountId string, peerKey string, newName string) (*Peer, error) {
|
||||
am.mux.Lock()
|
||||
defer am.mux.Unlock()
|
||||
|
||||
peer, err := manager.Store.GetPeer(peerKey)
|
||||
peer, err := am.Store.GetPeer(peerKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
peerCopy := peer.Copy()
|
||||
peerCopy.Name = newName
|
||||
err = manager.Store.SavePeer(accountId, peerCopy)
|
||||
err = am.Store.SavePeer(accountId, peerCopy)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -113,18 +114,60 @@ func (manager *AccountManager) RenamePeer(accountId string, peerKey string, newN
|
||||
}
|
||||
|
||||
//DeletePeer removes peer from the account by it's IP
|
||||
func (manager *AccountManager) DeletePeer(accountId string, peerKey string) (*Peer, error) {
|
||||
manager.mux.Lock()
|
||||
defer manager.mux.Unlock()
|
||||
return manager.Store.DeletePeer(accountId, peerKey)
|
||||
func (am *AccountManager) DeletePeer(accountId string, peerKey string) (*Peer, error) {
|
||||
am.mux.Lock()
|
||||
defer am.mux.Unlock()
|
||||
|
||||
peer, err := am.Store.DeletePeer(accountId, peerKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = am.peersUpdateManager.SendUpdate(peerKey,
|
||||
&UpdateMessage{
|
||||
Update: &proto.SyncResponse{
|
||||
RemotePeers: []*proto.RemotePeerConfig{},
|
||||
RemotePeersIsEmpty: true,
|
||||
}})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
//notify other peers of the change
|
||||
peers, err := am.Store.GetAccountPeers(accountId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, p := range peers {
|
||||
peersToSend := []*Peer{}
|
||||
for _, remote := range peers {
|
||||
if p.Key != remote.Key {
|
||||
peersToSend = append(peersToSend, remote)
|
||||
}
|
||||
}
|
||||
update := toRemotePeerConfig(peersToSend)
|
||||
err = am.peersUpdateManager.SendUpdate(p.Key,
|
||||
&UpdateMessage{
|
||||
Update: &proto.SyncResponse{
|
||||
RemotePeers: update,
|
||||
RemotePeersIsEmpty: len(update) == 0,
|
||||
}})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
am.peersUpdateManager.CloseChannel(peerKey)
|
||||
return peer, nil
|
||||
}
|
||||
|
||||
//GetPeerByIP returns peer by it's IP
|
||||
func (manager *AccountManager) GetPeerByIP(accountId string, peerIP string) (*Peer, error) {
|
||||
manager.mux.Lock()
|
||||
defer manager.mux.Unlock()
|
||||
func (am *AccountManager) GetPeerByIP(accountId string, peerIP string) (*Peer, error) {
|
||||
am.mux.Lock()
|
||||
defer am.mux.Unlock()
|
||||
|
||||
account, err := manager.Store.GetAccount(accountId)
|
||||
account, err := am.Store.GetAccount(accountId)
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.NotFound, "account not found")
|
||||
}
|
||||
@ -140,11 +183,11 @@ func (manager *AccountManager) GetPeerByIP(accountId string, peerIP string) (*Pe
|
||||
|
||||
// GetPeersForAPeer returns a list of peers available for a given peer (key)
|
||||
// Effectively all the peers of the original peer's account except for the peer itself
|
||||
func (manager *AccountManager) GetPeersForAPeer(peerKey string) ([]*Peer, error) {
|
||||
manager.mux.Lock()
|
||||
defer manager.mux.Unlock()
|
||||
func (am *AccountManager) GetPeersForAPeer(peerKey string) ([]*Peer, error) {
|
||||
am.mux.Lock()
|
||||
defer am.mux.Unlock()
|
||||
|
||||
account, err := manager.Store.GetPeerAccount(peerKey)
|
||||
account, err := am.Store.GetPeerAccount(peerKey)
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "Invalid peer key %s", peerKey)
|
||||
}
|
||||
@ -165,9 +208,9 @@ func (manager *AccountManager) GetPeersForAPeer(peerKey string) ([]*Peer, error)
|
||||
// Each new Peer will be assigned a new next net.IP from the Account.Network and Account.Network.LastIP will be updated (IP's are not reused).
|
||||
// If the specified setupKey is empty then a new Account will be created //todo remove this part
|
||||
// The peer property is just a placeholder for the Peer properties to pass further
|
||||
func (manager *AccountManager) AddPeer(setupKey string, peer Peer) (*Peer, error) {
|
||||
manager.mux.Lock()
|
||||
defer manager.mux.Unlock()
|
||||
func (am *AccountManager) AddPeer(setupKey string, peer Peer) (*Peer, error) {
|
||||
am.mux.Lock()
|
||||
defer am.mux.Unlock()
|
||||
|
||||
upperKey := strings.ToUpper(setupKey)
|
||||
|
||||
@ -178,7 +221,7 @@ func (manager *AccountManager) AddPeer(setupKey string, peer Peer) (*Peer, error
|
||||
// Empty setup key, create a new account for it.
|
||||
account, sk = newAccount()
|
||||
} else {
|
||||
account, err = manager.Store.GetAccountBySetupKey(upperKey)
|
||||
account, err = am.Store.GetAccountBySetupKey(upperKey)
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.NotFound, "unknown setupKey %s", upperKey)
|
||||
}
|
||||
@ -213,7 +256,7 @@ func (manager *AccountManager) AddPeer(setupKey string, peer Peer) (*Peer, error
|
||||
|
||||
account.Peers[newPeer.Key] = newPeer
|
||||
account.SetupKeys[sk.Key] = sk.IncrementUsage()
|
||||
err = manager.Store.SaveAccount(account)
|
||||
err = am.Store.SaveAccount(account)
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "failed adding peer")
|
||||
}
|
||||
|
@ -5,6 +5,7 @@ type Store interface {
|
||||
DeletePeer(accountId string, peerKey string) (*Peer, error)
|
||||
SavePeer(accountId string, peer *Peer) error
|
||||
GetAccount(accountId string) (*Account, error)
|
||||
GetAccountPeers(accountId string) ([]*Peer, error)
|
||||
GetPeerAccount(peerKey string) (*Account, error)
|
||||
GetAccountBySetupKey(setupKey string) (*Account, error)
|
||||
SaveAccount(account *Account) error
|
||||
|
@ -75,6 +75,19 @@ func NewClient(ctx context.Context, addr string, key wgtypes.Key, tlsEnabled boo
|
||||
}, nil
|
||||
}
|
||||
|
||||
//defaultBackoff is a basic backoff mechanism for general issues
|
||||
func defaultBackoff() backoff.BackOff {
|
||||
return &backoff.ExponentialBackOff{
|
||||
InitialInterval: 800 * time.Millisecond,
|
||||
RandomizationFactor: backoff.DefaultRandomizationFactor,
|
||||
Multiplier: backoff.DefaultMultiplier,
|
||||
MaxInterval: 30 * time.Second,
|
||||
MaxElapsedTime: 24 * 3 * time.Hour, //stop after 3 days trying
|
||||
Stop: backoff.Stop,
|
||||
Clock: backoff.SystemClock,
|
||||
}
|
||||
}
|
||||
|
||||
// Receive Connects to the Signal Exchange message stream and starts receiving messages.
|
||||
// The messages will be handled by msgHandler function provided.
|
||||
// This function runs a goroutine underneath and reconnects to the Signal Exchange if errors occur (e.g. Exchange restart)
|
||||
@ -83,15 +96,7 @@ func (c *Client) Receive(msgHandler func(msg *proto.Message) error) {
|
||||
c.connWg.Add(1)
|
||||
go func() {
|
||||
|
||||
var backOff = &backoff.ExponentialBackOff{
|
||||
InitialInterval: backoff.DefaultInitialInterval,
|
||||
RandomizationFactor: backoff.DefaultRandomizationFactor,
|
||||
Multiplier: backoff.DefaultMultiplier,
|
||||
MaxInterval: 3 * time.Second,
|
||||
MaxElapsedTime: time.Duration(0), //never stop
|
||||
Stop: backoff.Stop,
|
||||
Clock: backoff.SystemClock,
|
||||
}
|
||||
var backOff = defaultBackoff()
|
||||
|
||||
operation := func() error {
|
||||
err := c.connect(c.key.PublicKey().String(), msgHandler)
|
||||
|
Loading…
Reference in New Issue
Block a user