Implement lightweight method to check is peer has update channel (#1351)

Instead of GetAllConnectedPeers that need to traverse the whole
connections map in order to find one channel there.
This commit is contained in:
Yury Gargay 2023-12-05 14:17:56 +01:00 committed by GitHub
parent 45fc89b2c9
commit 27ed88f918
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 49 additions and 49 deletions

View File

@ -114,6 +114,7 @@ type AccountManager interface {
LoginPeer(login PeerLogin) (*nbpeer.Peer, *NetworkMap, error) // used by peer gRPC API LoginPeer(login PeerLogin) (*nbpeer.Peer, *NetworkMap, error) // used by peer gRPC API
SyncPeer(sync PeerSync) (*nbpeer.Peer, *NetworkMap, error) // used by peer gRPC API SyncPeer(sync PeerSync) (*nbpeer.Peer, *NetworkMap, error) // used by peer gRPC API
GetAllConnectedPeers() (map[string]struct{}, error) GetAllConnectedPeers() (map[string]struct{}, error)
HasConnectedChannel(peerID string) bool
GetExternalCacheManager() ExternalCacheManager GetExternalCacheManager() ExternalCacheManager
} }
@ -1676,6 +1677,11 @@ func (am *DefaultAccountManager) GetAllConnectedPeers() (map[string]struct{}, er
return am.peersUpdateManager.GetAllConnectedPeers(), nil return am.peersUpdateManager.GetAllConnectedPeers(), nil
} }
// HasConnectedChannel returns true if peers has channel in update manager, otherwise false
func (am *DefaultAccountManager) HasConnectedChannel(peerID string) bool {
return am.peersUpdateManager.HasChannel(peerID)
}
var invalidDomainRegexp = regexp.MustCompile(`^([a-z0-9]+(-[a-z0-9]+)*\.)+[a-z]{2,}$`) var invalidDomainRegexp = regexp.MustCompile(`^([a-z0-9]+(-[a-z0-9]+)*\.)+[a-z]{2,}$`)
func isDomainValid(domain string) bool { func isDomainValid(domain string) bool {

View File

@ -35,14 +35,9 @@ func NewPeersHandler(accountManager server.AccountManager, authCfg AuthCfg) *Pee
func (h *PeersHandler) checkPeerStatus(peer *nbpeer.Peer) (*nbpeer.Peer, error) { func (h *PeersHandler) checkPeerStatus(peer *nbpeer.Peer) (*nbpeer.Peer, error) {
peerToReturn := peer.Copy() peerToReturn := peer.Copy()
if peer.Status.Connected { if peer.Status.Connected {
statuses, err := h.accountManager.GetAllConnectedPeers()
if err != nil {
return peerToReturn, err
}
// Although we have online status in store we do not yet have an updated channel so have to show it as disconnected // Although we have online status in store we do not yet have an updated channel so have to show it as disconnected
// This may happen after server restart when not all peers are yet connected // This may happen after server restart when not all peers are yet connected
if _, connected := statuses[peerToReturn.ID]; !connected { if !h.accountManager.HasConnectedChannel(peer.ID) {
peerToReturn.Status.Connected = false peerToReturn.Status.Connected = false
} }
} }

View File

@ -81,8 +81,7 @@ func initTestMetaData(peers ...*nbpeer.Peer) *PeersHandler {
}, },
}, user, nil }, user, nil
}, },
HasConnectedChannelFunc: func(peerID string) bool {
GetAllConnectedPeersFunc: func() (map[string]struct{}, error) {
statuses := make(map[string]struct{}) statuses := make(map[string]struct{})
for _, peer := range peers { for _, peer := range peers {
if peer.ID == noUpdateChannelTestPeerID { if peer.ID == noUpdateChannelTestPeerID {
@ -90,7 +89,8 @@ func initTestMetaData(peers ...*nbpeer.Peer) *PeersHandler {
} }
statuses[peer.ID] = struct{}{} statuses[peer.ID] = struct{}{}
} }
return statuses, nil _, ok := statuses[peerID]
return ok
}, },
}, },
claimsExtractor: jwtclaims.NewClaimsExtractor( claimsExtractor: jwtclaims.NewClaimsExtractor(

View File

@ -81,6 +81,7 @@ type MockAccountManager struct {
SyncPeerFunc func(sync server.PeerSync) (*nbpeer.Peer, *server.NetworkMap, error) SyncPeerFunc func(sync server.PeerSync) (*nbpeer.Peer, *server.NetworkMap, error)
InviteUserFunc func(accountID string, initiatorUserID string, targetUserEmail string) error InviteUserFunc func(accountID string, initiatorUserID string, targetUserEmail string) error
GetAllConnectedPeersFunc func() (map[string]struct{}, error) GetAllConnectedPeersFunc func() (map[string]struct{}, error)
HasConnectedChannelFunc func(peerID string) bool
GetExternalCacheManagerFunc func() server.ExternalCacheManager GetExternalCacheManagerFunc func() server.ExternalCacheManager
} }
@ -622,6 +623,14 @@ func (am *MockAccountManager) GetAllConnectedPeers() (map[string]struct{}, error
return nil, status.Errorf(codes.Unimplemented, "method GetAllConnectedPeers is not implemented") return nil, status.Errorf(codes.Unimplemented, "method GetAllConnectedPeers is not implemented")
} }
// HasconnectedChannel mocks HasConnectedChannel of the AccountManager interface
func (am *MockAccountManager) HasConnectedChannel(peerID string) bool {
if am.HasConnectedChannelFunc != nil {
return am.HasConnectedChannelFunc(peerID)
}
return false
}
// StoreEvent mocks StoreEvent of the AccountManager interface // StoreEvent mocks StoreEvent of the AccountManager interface
func (am *MockAccountManager) StoreEvent(initiatorID, targetID, accountID string, activityID activity.Activity, meta map[string]any) { func (am *MockAccountManager) StoreEvent(initiatorID, targetID, accountID string, activityID activity.Activity, meta map[string]any) {
if am.StoreEventFunc != nil { if am.StoreEventFunc != nil {

View File

@ -11,48 +11,29 @@ import (
// UpdateChannelMetrics represents all metrics related to the UpdateChannel // UpdateChannelMetrics represents all metrics related to the UpdateChannel
type UpdateChannelMetrics struct { type UpdateChannelMetrics struct {
createChannelDurationMs syncint64.Histogram
createChannelDurationMicro syncint64.Histogram createChannelDurationMicro syncint64.Histogram
closeChannelDurationMs syncint64.Histogram
closeChannelDurationMicro syncint64.Histogram closeChannelDurationMicro syncint64.Histogram
closeChannelsDurationMs syncint64.Histogram
closeChannelsDurationMicro syncint64.Histogram closeChannelsDurationMicro syncint64.Histogram
closeChannels syncint64.Histogram closeChannels syncint64.Histogram
sendUpdateDurationMs syncint64.Histogram
sendUpdateDurationMicro syncint64.Histogram sendUpdateDurationMicro syncint64.Histogram
getAllConnectedPeersDurationMs syncint64.Histogram
getAllConnectedPeersDurationMicro syncint64.Histogram getAllConnectedPeersDurationMicro syncint64.Histogram
getAllConnectedPeers syncint64.Histogram getAllConnectedPeers syncint64.Histogram
hasChannelDurationMicro syncint64.Histogram
ctx context.Context ctx context.Context
} }
// NewUpdateChannelMetrics creates an instance of UpdateChannel // NewUpdateChannelMetrics creates an instance of UpdateChannel
func NewUpdateChannelMetrics(ctx context.Context, meter metric.Meter) (*UpdateChannelMetrics, error) { func NewUpdateChannelMetrics(ctx context.Context, meter metric.Meter) (*UpdateChannelMetrics, error) {
createChannelDurationMs, err := meter.SyncInt64().Histogram("management.updatechannel.create.duration.ms")
if err != nil {
return nil, err
}
createChannelDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.create.duration.micro") createChannelDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.create.duration.micro")
if err != nil { if err != nil {
return nil, err return nil, err
} }
closeChannelDurationMs, err := meter.SyncInt64().Histogram("management.updatechannel.close.one.duration.ms")
if err != nil {
return nil, err
}
closeChannelDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.close.one.duration.micro") closeChannelDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.close.one.duration.micro")
if err != nil { if err != nil {
return nil, err return nil, err
} }
closeChannelsDurationMs, err := meter.SyncInt64().Histogram("management.updatechannel.close.multiple.duration.ms")
if err != nil {
return nil, err
}
closeChannelsDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.close.multiple.duration.micro") closeChannelsDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.close.multiple.duration.micro")
if err != nil { if err != nil {
return nil, err return nil, err
@ -63,21 +44,11 @@ func NewUpdateChannelMetrics(ctx context.Context, meter metric.Meter) (*UpdateCh
return nil, err return nil, err
} }
sendUpdateDurationMs, err := meter.SyncInt64().Histogram("management.updatechannel.send.duration.ms")
if err != nil {
return nil, err
}
sendUpdateDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.send.duration.micro") sendUpdateDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.send.duration.micro")
if err != nil { if err != nil {
return nil, err return nil, err
} }
getAllConnectedPeersDurationMs, err := meter.SyncInt64().Histogram("management.updatechannel.get.all.duration.ms")
if err != nil {
return nil, err
}
getAllConnectedPeersDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.get.all.duration.micro") getAllConnectedPeersDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.get.all.duration.micro")
if err != nil { if err != nil {
return nil, err return nil, err
@ -88,19 +59,20 @@ func NewUpdateChannelMetrics(ctx context.Context, meter metric.Meter) (*UpdateCh
return nil, err return nil, err
} }
hasChannelDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.haschannel.duration.micro")
if err != nil {
return nil, err
}
return &UpdateChannelMetrics{ return &UpdateChannelMetrics{
createChannelDurationMs: createChannelDurationMs,
createChannelDurationMicro: createChannelDurationMicro, createChannelDurationMicro: createChannelDurationMicro,
closeChannelDurationMs: closeChannelDurationMs,
closeChannelDurationMicro: closeChannelDurationMicro, closeChannelDurationMicro: closeChannelDurationMicro,
closeChannelsDurationMs: closeChannelsDurationMs,
closeChannelsDurationMicro: closeChannelsDurationMicro, closeChannelsDurationMicro: closeChannelsDurationMicro,
closeChannels: closeChannels, closeChannels: closeChannels,
sendUpdateDurationMs: sendUpdateDurationMs,
sendUpdateDurationMicro: sendUpdateDurationMicro, sendUpdateDurationMicro: sendUpdateDurationMicro,
getAllConnectedPeersDurationMs: getAllConnectedPeersDurationMs,
getAllConnectedPeersDurationMicro: getAllConnectedPeersDurationMicro, getAllConnectedPeersDurationMicro: getAllConnectedPeersDurationMicro,
getAllConnectedPeers: getAllConnectedPeers, getAllConnectedPeers: getAllConnectedPeers,
hasChannelDurationMicro: hasChannelDurationMicro,
ctx: ctx, ctx: ctx,
}, nil }, nil
} }
@ -108,19 +80,16 @@ func NewUpdateChannelMetrics(ctx context.Context, meter metric.Meter) (*UpdateCh
// CountCreateChannelDuration counts the duration of the CreateChannel method, // CountCreateChannelDuration counts the duration of the CreateChannel method,
// closed indicates if existing channel was closed before creation of a new one // closed indicates if existing channel was closed before creation of a new one
func (metrics *UpdateChannelMetrics) CountCreateChannelDuration(duration time.Duration, closed bool) { func (metrics *UpdateChannelMetrics) CountCreateChannelDuration(duration time.Duration, closed bool) {
metrics.createChannelDurationMs.Record(metrics.ctx, duration.Milliseconds(), attribute.Bool("closed", closed))
metrics.createChannelDurationMicro.Record(metrics.ctx, duration.Microseconds(), attribute.Bool("closed", closed)) metrics.createChannelDurationMicro.Record(metrics.ctx, duration.Microseconds(), attribute.Bool("closed", closed))
} }
// CountCloseChannelDuration counts the duration of the CloseChannel method // CountCloseChannelDuration counts the duration of the CloseChannel method
func (metrics *UpdateChannelMetrics) CountCloseChannelDuration(duration time.Duration) { func (metrics *UpdateChannelMetrics) CountCloseChannelDuration(duration time.Duration) {
metrics.closeChannelDurationMs.Record(metrics.ctx, duration.Milliseconds())
metrics.closeChannelDurationMicro.Record(metrics.ctx, duration.Microseconds()) metrics.closeChannelDurationMicro.Record(metrics.ctx, duration.Microseconds())
} }
// CountCloseChannelsDuration counts the duration of the CloseChannels method and the number of channels have been closed // CountCloseChannelsDuration counts the duration of the CloseChannels method and the number of channels have been closed
func (metrics *UpdateChannelMetrics) CountCloseChannelsDuration(duration time.Duration, channels int) { func (metrics *UpdateChannelMetrics) CountCloseChannelsDuration(duration time.Duration, channels int) {
metrics.closeChannelsDurationMs.Record(metrics.ctx, duration.Milliseconds())
metrics.closeChannelsDurationMicro.Record(metrics.ctx, duration.Microseconds()) metrics.closeChannelsDurationMicro.Record(metrics.ctx, duration.Microseconds())
metrics.closeChannels.Record(metrics.ctx, int64(channels)) metrics.closeChannels.Record(metrics.ctx, int64(channels))
} }
@ -129,13 +98,16 @@ func (metrics *UpdateChannelMetrics) CountCloseChannelsDuration(duration time.Du
// found indicates if peer had channel, dropped indicates if the message was dropped due channel buffer overload // found indicates if peer had channel, dropped indicates if the message was dropped due channel buffer overload
func (metrics *UpdateChannelMetrics) CountSendUpdateDuration(duration time.Duration, found, dropped bool) { func (metrics *UpdateChannelMetrics) CountSendUpdateDuration(duration time.Duration, found, dropped bool) {
attrs := []attribute.KeyValue{attribute.Bool("found", found), attribute.Bool("dropped", dropped)} attrs := []attribute.KeyValue{attribute.Bool("found", found), attribute.Bool("dropped", dropped)}
metrics.sendUpdateDurationMs.Record(metrics.ctx, duration.Milliseconds(), attrs...)
metrics.sendUpdateDurationMicro.Record(metrics.ctx, duration.Microseconds(), attrs...) metrics.sendUpdateDurationMicro.Record(metrics.ctx, duration.Microseconds(), attrs...)
} }
// CountGetAllConnectedPeersDuration counts the duration of the GetAllConnectedPeers method and the number of peers have been returned // CountGetAllConnectedPeersDuration counts the duration of the GetAllConnectedPeers method and the number of peers have been returned
func (metrics *UpdateChannelMetrics) CountGetAllConnectedPeersDuration(duration time.Duration, peers int) { func (metrics *UpdateChannelMetrics) CountGetAllConnectedPeersDuration(duration time.Duration, peers int) {
metrics.getAllConnectedPeersDurationMs.Record(metrics.ctx, duration.Milliseconds())
metrics.getAllConnectedPeersDurationMicro.Record(metrics.ctx, duration.Microseconds()) metrics.getAllConnectedPeersDurationMicro.Record(metrics.ctx, duration.Microseconds())
metrics.getAllConnectedPeers.Record(metrics.ctx, int64(peers)) metrics.getAllConnectedPeers.Record(metrics.ctx, int64(peers))
} }
// CountHasChannelDuration counts the duration of the HasChannel method
func (metrics *UpdateChannelMetrics) CountHasChannelDuration(duration time.Duration) {
metrics.hasChannelDurationMicro.Record(metrics.ctx, duration.Microseconds())
}

View File

@ -151,3 +151,21 @@ func (p *PeersUpdateManager) GetAllConnectedPeers() map[string]struct{} {
return m return m
} }
// HasChannel returns true if peers has channel in update manager, otherwise false
func (p *PeersUpdateManager) HasChannel(peerID string) bool {
start := time.Now()
p.channelsMux.Lock()
defer func() {
p.channelsMux.Unlock()
if p.metrics != nil {
p.metrics.UpdateChannelMetrics().CountHasChannelDuration(time.Since(start))
}
}()
_, ok := p.peerChannels[peerID]
return ok
}