mirror of
https://github.com/netbirdio/netbird.git
synced 2024-11-21 23:53:14 +01:00
Save Peer Status separately in the FileStore (#554)
Due to peer reconnects when restarting the Management service, there are lots of SaveStore operations to update peer status. Store.SavePeerStatus stores peer status separately and the FileStore implementation stores it in memory.
This commit is contained in:
parent
7e262572a4
commit
f37b43a542
@ -62,7 +62,7 @@ func startManagement(t *testing.T, config *mgmt.Config) (*grpc.Server, net.Liste
|
||||
t.Fatal(err)
|
||||
}
|
||||
s := grpc.NewServer()
|
||||
store, err := mgmt.NewStore(config.Datadir)
|
||||
store, err := mgmt.NewFileStore(config.Datadir)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -935,7 +935,7 @@ func startManagement(port int, dataDir string) (*grpc.Server, error) {
|
||||
return nil, err
|
||||
}
|
||||
s := grpc.NewServer(grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp))
|
||||
store, err := server.NewStore(config.Datadir)
|
||||
store, err := server.NewFileStore(config.Datadir)
|
||||
if err != nil {
|
||||
log.Fatalf("failed creating a store: %s: %v", config.Datadir, err)
|
||||
}
|
||||
|
@ -49,7 +49,7 @@ func startManagement(t *testing.T) (*grpc.Server, net.Listener) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
s := grpc.NewServer()
|
||||
store, err := mgmt.NewStore(config.Datadir)
|
||||
store, err := mgmt.NewFileStore(config.Datadir)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -116,7 +116,7 @@ var (
|
||||
}
|
||||
}
|
||||
|
||||
store, err := server.NewStore(config.Datadir)
|
||||
store, err := server.NewFileStore(config.Datadir)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed creating Store: %s: %v", config.Datadir, err)
|
||||
}
|
||||
@ -250,6 +250,7 @@ var (
|
||||
_ = certManager.Listener().Close()
|
||||
}
|
||||
gRPCAPIHandler.Stop()
|
||||
_ = store.Close()
|
||||
log.Infof("stopped Management Service")
|
||||
|
||||
return nil
|
||||
|
@ -1211,7 +1211,7 @@ func createManager(t *testing.T) (*DefaultAccountManager, error) {
|
||||
|
||||
func createStore(t *testing.T) (Store, error) {
|
||||
dataDir := t.TempDir()
|
||||
store, err := NewStore(dataDir)
|
||||
store, err := NewFileStore(dataDir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -37,8 +37,8 @@ type FileStore struct {
|
||||
|
||||
type StoredAccount struct{}
|
||||
|
||||
// NewStore restores a store from the file located in the datadir
|
||||
func NewStore(dataDir string) (*FileStore, error) {
|
||||
// NewFileStore restores a store from the file located in the datadir
|
||||
func NewFileStore(dataDir string) (*FileStore, error) {
|
||||
return restore(filepath.Join(dataDir, storeFileName))
|
||||
}
|
||||
|
||||
@ -198,7 +198,12 @@ func (s *FileStore) GetAccountByPrivateDomain(domain string) (*Account, error) {
|
||||
)
|
||||
}
|
||||
|
||||
return s.getAccount(accountID)
|
||||
account, err := s.getAccount(accountID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return account.Copy(), nil
|
||||
}
|
||||
|
||||
// GetAccountBySetupKey returns account by setup key id
|
||||
@ -211,7 +216,12 @@ func (s *FileStore) GetAccountBySetupKey(setupKey string) (*Account, error) {
|
||||
return nil, status.Errorf(codes.NotFound, "account not found: provided setup key doesn't exists")
|
||||
}
|
||||
|
||||
return s.getAccount(accountID)
|
||||
account, err := s.getAccount(accountID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return account.Copy(), nil
|
||||
}
|
||||
|
||||
// GetAllAccounts returns all accounts
|
||||
@ -225,13 +235,14 @@ func (s *FileStore) GetAllAccounts() (all []*Account) {
|
||||
return all
|
||||
}
|
||||
|
||||
// getAccount returns a reference to the Account. Should not return a copy.
|
||||
func (s *FileStore) getAccount(accountID string) (*Account, error) {
|
||||
account, accountFound := s.Accounts[accountID]
|
||||
if !accountFound {
|
||||
return nil, status.Errorf(codes.NotFound, "account not found")
|
||||
}
|
||||
|
||||
return account.Copy(), nil
|
||||
return account, nil
|
||||
}
|
||||
|
||||
// GetAccount returns an account for ID
|
||||
@ -239,7 +250,12 @@ func (s *FileStore) GetAccount(accountID string) (*Account, error) {
|
||||
s.mux.Lock()
|
||||
defer s.mux.Unlock()
|
||||
|
||||
return s.getAccount(accountID)
|
||||
account, err := s.getAccount(accountID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return account.Copy(), nil
|
||||
}
|
||||
|
||||
// GetAccountByUser returns a user account
|
||||
@ -252,7 +268,12 @@ func (s *FileStore) GetAccountByUser(userID string) (*Account, error) {
|
||||
return nil, status.Errorf(codes.NotFound, "account not found")
|
||||
}
|
||||
|
||||
return s.getAccount(accountID)
|
||||
account, err := s.getAccount(accountID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return account.Copy(), nil
|
||||
}
|
||||
|
||||
// GetAccountByPeerPubKey returns an account for a given peer WireGuard public key
|
||||
@ -265,7 +286,12 @@ func (s *FileStore) GetAccountByPeerPubKey(peerKey string) (*Account, error) {
|
||||
return nil, status.Errorf(codes.NotFound, "Provided peer key doesn't exists %s", peerKey)
|
||||
}
|
||||
|
||||
return s.getAccount(accountID)
|
||||
account, err := s.getAccount(accountID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return account.Copy(), nil
|
||||
}
|
||||
|
||||
// GetInstallationID returns the installation ID from the store
|
||||
@ -274,11 +300,42 @@ func (s *FileStore) GetInstallationID() string {
|
||||
}
|
||||
|
||||
// SaveInstallationID saves the installation ID
|
||||
func (s *FileStore) SaveInstallationID(id string) error {
|
||||
func (s *FileStore) SaveInstallationID(ID string) error {
|
||||
s.mux.Lock()
|
||||
defer s.mux.Unlock()
|
||||
|
||||
s.InstallationID = id
|
||||
s.InstallationID = ID
|
||||
|
||||
return s.persist(s.storeFile)
|
||||
}
|
||||
|
||||
// SavePeerStatus stores the PeerStatus in memory. It doesn't attempt to persist data to speed up things.
|
||||
// PeerStatus will be saved eventually when some other changes occur.
|
||||
func (s *FileStore) SavePeerStatus(accountID, peerKey string, peerStatus PeerStatus) error {
|
||||
s.mux.Lock()
|
||||
defer s.mux.Unlock()
|
||||
|
||||
account, err := s.getAccount(accountID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
peer := account.Peers[peerKey]
|
||||
if peer == nil {
|
||||
return status.Errorf(codes.NotFound, "peer %s not found", peerKey)
|
||||
}
|
||||
|
||||
peer.Status = &peerStatus
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close the FileStore persisting data to disk
|
||||
func (s *FileStore) Close() error {
|
||||
s.mux.Lock()
|
||||
defer s.mux.Unlock()
|
||||
|
||||
log.Infof("closing FileStore")
|
||||
|
||||
return s.persist(s.storeFile)
|
||||
}
|
||||
|
@ -93,7 +93,7 @@ func TestStore(t *testing.T) {
|
||||
return
|
||||
}
|
||||
|
||||
restored, err := NewStore(store.storeFile)
|
||||
restored, err := NewFileStore(store.storeFile)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@ -129,7 +129,7 @@ func TestRestore(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
store, err := NewStore(storeDir)
|
||||
store, err := NewFileStore(storeDir)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@ -161,7 +161,7 @@ func TestGetAccountByPrivateDomain(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
store, err := NewStore(storeDir)
|
||||
store, err := NewFileStore(storeDir)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@ -190,7 +190,7 @@ func TestFileStore_GetAccount(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
store, err := NewStore(storeDir)
|
||||
store, err := NewFileStore(storeDir)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -218,8 +218,59 @@ func TestFileStore_GetAccount(t *testing.T) {
|
||||
assert.Len(t, account.NameServerGroups, len(expected.NameServerGroups))
|
||||
}
|
||||
|
||||
func TestFileStore_SavePeerStatus(t *testing.T) {
|
||||
storeDir := t.TempDir()
|
||||
|
||||
err := util.CopyFileContents("testdata/store.json", filepath.Join(storeDir, "store.json"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
store, err := NewFileStore(storeDir)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
account, err := store.getAccount("bf1c8084-ba50-4ce7-9439-34653001fc3b")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// save status of non-existing peer
|
||||
newStatus := PeerStatus{Connected: true, LastSeen: time.Now()}
|
||||
err = store.SavePeerStatus(account.Id, "non-existing-peer", newStatus)
|
||||
assert.Error(t, err)
|
||||
|
||||
// save new status of existing peer
|
||||
account.Peers["testpeer"] = &Peer{
|
||||
Key: "peerkey",
|
||||
SetupKey: "peerkeysetupkey",
|
||||
IP: net.IP{127, 0, 0, 1},
|
||||
Meta: PeerSystemMeta{},
|
||||
Name: "peer name",
|
||||
Status: &PeerStatus{Connected: false, LastSeen: time.Now()},
|
||||
}
|
||||
|
||||
err = store.SaveAccount(account)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = store.SavePeerStatus(account.Id, "testpeer", newStatus)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
account, err = store.getAccount(account.Id)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
actual := account.Peers["testpeer"].Status
|
||||
assert.Equal(t, newStatus, *actual)
|
||||
}
|
||||
|
||||
func newStore(t *testing.T) *FileStore {
|
||||
store, err := NewStore(t.TempDir())
|
||||
store, err := NewFileStore(t.TempDir())
|
||||
if err != nil {
|
||||
t.Errorf("failed creating a new store")
|
||||
}
|
||||
|
@ -398,7 +398,7 @@ func startManagement(t *testing.T, port int, config *Config) (*grpc.Server, erro
|
||||
return nil, err
|
||||
}
|
||||
s := grpc.NewServer(grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp))
|
||||
store, err := NewStore(config.Datadir)
|
||||
store, err := NewFileStore(config.Datadir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -488,7 +488,7 @@ func startServer(config *server.Config) (*grpc.Server, net.Listener) {
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
s := grpc.NewServer()
|
||||
|
||||
store, err := server.NewStore(config.Datadir)
|
||||
store, err := server.NewFileStore(config.Datadir)
|
||||
if err != nil {
|
||||
log.Fatalf("failed creating a store: %s: %v", config.Datadir, err)
|
||||
}
|
||||
|
@ -1061,7 +1061,7 @@ func createNSManager(t *testing.T) (*DefaultAccountManager, error) {
|
||||
|
||||
func createNSStore(t *testing.T) (Store, error) {
|
||||
dataDir := t.TempDir()
|
||||
store, err := NewStore(dataDir)
|
||||
store, err := NewFileStore(dataDir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -74,6 +74,14 @@ func (p *Peer) Copy() *Peer {
|
||||
}
|
||||
}
|
||||
|
||||
// Copy PeerStatus
|
||||
func (p *PeerStatus) Copy() *PeerStatus {
|
||||
return &PeerStatus{
|
||||
LastSeen: p.LastSeen,
|
||||
Connected: p.Connected,
|
||||
}
|
||||
}
|
||||
|
||||
// GetPeer looks up peer by its public WireGuard key
|
||||
func (am *DefaultAccountManager) GetPeer(peerPubKey string) (*Peer, error) {
|
||||
|
||||
@ -133,12 +141,13 @@ func (am *DefaultAccountManager) MarkPeerConnected(peerPubKey string, connected
|
||||
return err
|
||||
}
|
||||
|
||||
peer.Status.LastSeen = time.Now()
|
||||
peer.Status.Connected = connected
|
||||
|
||||
newStatus := peer.Status.Copy()
|
||||
newStatus.LastSeen = time.Now()
|
||||
newStatus.Connected = connected
|
||||
peer.Status = newStatus
|
||||
account.UpdatePeer(peer)
|
||||
|
||||
err = am.Store.SaveAccount(account)
|
||||
err = am.Store.SavePeerStatus(account.Id, peerPubKey, *newStatus)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -788,7 +788,7 @@ func createRouterManager(t *testing.T) (*DefaultAccountManager, error) {
|
||||
|
||||
func createRouterStore(t *testing.T) (Store, error) {
|
||||
dataDir := t.TempDir()
|
||||
store, err := NewStore(dataDir)
|
||||
store, err := NewFileStore(dataDir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -9,9 +9,12 @@ type Store interface {
|
||||
GetAccountByPrivateDomain(domain string) (*Account, error)
|
||||
SaveAccount(account *Account) error
|
||||
GetInstallationID() string
|
||||
SaveInstallationID(id string) error
|
||||
SaveInstallationID(ID string) error
|
||||
// AcquireAccountLock should attempt to acquire account lock and return a function that releases the lock
|
||||
AcquireAccountLock(accountID string) func()
|
||||
// AcquireGlobalLock should attempt to acquire a global lock and return a function that releases the lock
|
||||
AcquireGlobalLock() func()
|
||||
SavePeerStatus(accountID, peerKey string, status PeerStatus) error
|
||||
// Close should close the store persisting all unsaved data.
|
||||
Close() error
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user