diff --git a/client/internal/engine.go b/client/internal/engine.go index 3f0732be8..db0b8f38e 100644 --- a/client/internal/engine.go +++ b/client/internal/engine.go @@ -33,8 +33,9 @@ import ( "github.com/netbirdio/netbird/client/internal/acl" "github.com/netbirdio/netbird/client/internal/dns" "github.com/netbirdio/netbird/client/internal/dnsfwd" - "github.com/netbirdio/netbird/client/internal/flowstore" "github.com/netbirdio/netbird/client/internal/ingressgw" + "github.com/netbirdio/netbird/client/internal/netflow" + "github.com/netbirdio/netbird/client/internal/netflow/types" "github.com/netbirdio/netbird/client/internal/networkmonitor" "github.com/netbirdio/netbird/client/internal/peer" "github.com/netbirdio/netbird/client/internal/peer/guard" @@ -190,7 +191,7 @@ type Engine struct { persistNetworkMap bool latestNetworkMap *mgmProto.NetworkMap connSemaphore *semaphoregroup.SemaphoreGroup - flowStore flowstore.Store + flowManager types.FlowManager } // Peer is an instance of the Connection Peer @@ -233,6 +234,7 @@ func NewEngine( statusRecorder: statusRecorder, checks: checks, connSemaphore: semaphoregroup.NewSemaphoreGroup(connInitLimit), + flowManager: netflow.NewManager(clientCtx), } if runtime.GOOS == "ios" { if !fileExists(mobileDep.StateFilePath) { @@ -301,6 +303,8 @@ func (e *Engine) Stop() error { return fmt.Errorf("failed to remove all peers: %s", err) } + e.flowManager.Close() + if e.cancel != nil { e.cancel() } @@ -322,13 +326,6 @@ func (e *Engine) Stop() error { log.Errorf("failed to persist state: %v", err) } - if e.flowStore != nil { - if err := e.flowStore.Close(); err != nil { - e.flowStore = nil - log.Errorf("failed to close flow store: %v", err) - } - } - return nil } @@ -712,22 +709,29 @@ func (e *Engine) handleRelayUpdate(update *mgmProto.RelayConfig) error { return nil } -func (e *Engine) handleFlowUpdate(update *mgmProto.FlowConfig) error { - if update == nil { +func (e *Engine) handleFlowUpdate(config *mgmProto.FlowConfig) error { + if config == nil { return nil } - if update.GetEnabled() && e.flowStore == nil { - e.flowStore = flowstore.New(e.ctx) - return nil - } - - if !update.GetEnabled() && e.flowStore != nil { - err := e.flowStore.Close() - e.flowStore = nil + flowConfig, err := toFlowLoggerConfig(config) + if err != nil { return err } - return nil + return e.flowManager.Update(flowConfig) +} + +func toFlowLoggerConfig(config *mgmProto.FlowConfig) (*types.FlowConfig, error) { + if config.GetInterval() == nil { + return nil, errors.New("flow interval is nil") + } + return &types.FlowConfig{ + Enabled: config.GetEnabled(), + URL: config.GetUrl(), + TokenPayload: config.GetTokenPayload(), + TokenSignature: config.GetTokenSignature(), + Interval: config.GetInterval().AsDuration(), + }, nil } // updateChecksIfNew updates checks if there are changes and sync new meta with management diff --git a/client/internal/flowstore/store.go b/client/internal/flowstore/store.go deleted file mode 100644 index ddb0f7dda..000000000 --- a/client/internal/flowstore/store.go +++ /dev/null @@ -1,117 +0,0 @@ -package flowstore - -import ( - "context" - "io" - "net/netip" - "sync" - "time" - - "github.com/google/uuid" - log "github.com/sirupsen/logrus" -) - -type Type int - -const ( - TypeStart = iota - TypeEnd -) - -type Direction int - -const ( - Ingress = iota - Egress -) - -type Event struct { - ID string - Timestamp time.Time - EventFields -} - -type EventFields struct { - FlowID uuid.UUID - Type Type - Direction Direction - Protocol uint8 - SourceIP netip.Addr - DestIP netip.Addr - SourcePort uint16 - DestPort uint16 - ICMPType uint8 - ICMPCode uint8 -} - -type Store interface { - io.Closer - // stores a flow event - StoreEvent(flowEvent EventFields) - // returns all stored events - GetEvents() []*Event -} - -func New(ctx context.Context) Store { - ctx, cancel := context.WithCancel(ctx) - store := &memory{ - events: make(map[string]*Event), - rcvChan: make(chan *EventFields, 100), - ctx: ctx, - cancel: cancel, - } - go store.startReceiver() - return store -} - -type memory struct { - mux sync.Mutex - events map[string]*Event - rcvChan chan *EventFields - ctx context.Context - cancel context.CancelFunc -} - -func (m *memory) startReceiver() { - for { - select { - case <-m.ctx.Done(): - log.Info("flow memory store receiver stopped") - return - case eventFields := <-m.rcvChan: - id := uuid.NewString() - event := Event{ - ID: id, - EventFields: *eventFields, - Timestamp: time.Now(), - } - - m.mux.Lock() - m.events[id] = &event - m.mux.Unlock() - } - } -} - -func (m *memory) StoreEvent(flowEvent EventFields) { - select { - case m.rcvChan <- &flowEvent: - default: - log.Warn("flow memory store receiver is busy") - } -} - -func (m *memory) Close() error { - m.cancel() - return nil -} - -func (m *memory) GetEvents() []*Event { - m.mux.Lock() - defer m.mux.Unlock() - events := make([]*Event, 0, len(m.events)) - for _, event := range m.events { - events = append(events, event) - } - return events -} diff --git a/client/internal/flowstore/store_test.go b/client/internal/flowstore/store_test.go deleted file mode 100644 index 8584c28d1..000000000 --- a/client/internal/flowstore/store_test.go +++ /dev/null @@ -1,32 +0,0 @@ -package flowstore_test - -import ( - "context" - "testing" - - "github.com/google/uuid" - - "github.com/netbirdio/netbird/client/internal/flowstore" -) - -func TestStore(t *testing.T) { - store := flowstore.New(context.Background()) - t.Cleanup(func() { - store.Close() - }) - - event := flowstore.EventFields{ - FlowID: uuid.New(), - Type: flowstore.TypeStart, - Direction: flowstore.Ingress, - Protocol: 6, - } - - store.StoreEvent(event) - allEvents := store.GetEvents() - for _, e := range allEvents { - if e.EventFields.FlowID != event.FlowID { - t.Errorf("expected event ID %s, got %s", event.FlowID, e.ID) - } - } -} diff --git a/client/internal/netflow/logger/logger.go b/client/internal/netflow/logger/logger.go new file mode 100644 index 000000000..d1afa7667 --- /dev/null +++ b/client/internal/netflow/logger/logger.go @@ -0,0 +1,105 @@ +package logger + +import ( + "context" + "sync/atomic" + "time" + + "github.com/google/uuid" + log "github.com/sirupsen/logrus" + + "github.com/netbirdio/netbird/client/internal/netflow/store" + "github.com/netbirdio/netbird/client/internal/netflow/types" +) + +type rcvChan chan *types.EventFields +type Logger struct { + ctx context.Context + cancel context.CancelFunc + enabled atomic.Bool + rcvChan atomic.Pointer[rcvChan] + stopChan chan struct{} + Store types.Store +} + +func New(ctx context.Context) *Logger { + ctx, cancel := context.WithCancel(ctx) + return &Logger{ + ctx: ctx, + cancel: cancel, + Store: store.NewMemoryStore(), + stopChan: make(chan struct{}), + } +} + +func (l *Logger) StoreEvent(flowEvent types.EventFields) { + if !l.enabled.Load() { + return + } + + c := l.rcvChan.Load() + if c == nil { + return + } + + select { + case *c <- &flowEvent: + default: + // todo: we should collect or log on this + } +} + +func (l *Logger) Enable() { + go l.startReceiver() +} + +func (l *Logger) startReceiver() { + if l.enabled.Load() { + return + } + + c := make(rcvChan, 100) + l.rcvChan.Swap(&c) + l.enabled.Store(true) + + for { + select { + case <-l.ctx.Done(): + log.Info("flow Memory store receiver stopped") + return + case eventFields := <-c: + id := uuid.NewString() + event := types.Event{ + ID: id, + EventFields: *eventFields, + Timestamp: time.Now(), + } + l.Store.StoreEvent(&event) + case <-l.stopChan: + return + } + } +} + +func (l *Logger) Disable() { + l.stop() + l.Store.Close() +} + +func (l *Logger) stop() { + if !l.enabled.Load() { + return + } + + l.enabled.Store(false) + l.stopChan <- struct{}{} +} + +func (l *Logger) GetEvents() []*types.Event { + return l.Store.GetEvents() +} + +func (l *Logger) Close() { + l.stop() + l.cancel() +} diff --git a/client/internal/netflow/logger/logger_test.go b/client/internal/netflow/logger/logger_test.go new file mode 100644 index 000000000..cb0913639 --- /dev/null +++ b/client/internal/netflow/logger/logger_test.go @@ -0,0 +1,38 @@ +package logger_test + +import ( + "context" + "testing" + "time" + + "github.com/google/uuid" + + "github.com/netbirdio/netbird/client/internal/netflow/logger" + "github.com/netbirdio/netbird/client/internal/netflow/types" +) + +func TestStore(t *testing.T) { + logger := logger.New(context.Background()) + logger.Enable() + + event := types.EventFields{ + FlowID: uuid.New(), + Type: types.TypeStart, + Direction: types.Ingress, + Protocol: 6, + } + time.Sleep(time.Millisecond) + logger.StoreEvent(event) + time.Sleep(time.Millisecond) + + allEvents := logger.GetEvents() + matched := false + for _, e := range allEvents { + if e.EventFields.FlowID == event.FlowID { + matched = true + } + } + if !matched { + t.Errorf("didn't match any event") + } +} diff --git a/client/internal/netflow/manager.go b/client/internal/netflow/manager.go new file mode 100644 index 000000000..b3f2594eb --- /dev/null +++ b/client/internal/netflow/manager.go @@ -0,0 +1,48 @@ +package netflow + +import ( + "context" + "sync" + + "github.com/netbirdio/netbird/client/internal/netflow/logger" + "github.com/netbirdio/netbird/client/internal/netflow/types" +) + +type Manager struct { + mux sync.Mutex + logger types.FlowLogger + flowConfig *types.FlowConfig +} + +func NewManager(ctx context.Context) *Manager { + return &Manager{ + logger: logger.New(ctx), + } +} + +func (m *Manager) Update(update *types.FlowConfig) error { + m.mux.Lock() + defer m.mux.Unlock() + if update == nil { + return nil + } + + m.flowConfig = update + + if update.Enabled { + m.logger.Enable() + return nil + } + + m.logger.Disable() + + return nil +} + +func (m *Manager) Close() { + m.logger.Close() +} + +func (m *Manager) GetLogger() types.FlowLogger { + return m.logger +} diff --git a/client/internal/netflow/store/memory.go b/client/internal/netflow/store/memory.go new file mode 100644 index 000000000..b0dcbd6f8 --- /dev/null +++ b/client/internal/netflow/store/memory.go @@ -0,0 +1,48 @@ +package store + +import ( + "sync" + + "github.com/netbirdio/netbird/client/internal/netflow/types" +) + +func NewMemoryStore() *Memory { + return &Memory{ + events: make(map[string]*types.Event), + } +} + +type Memory struct { + mux sync.Mutex + events map[string]*types.Event +} + +func (m *Memory) StoreEvent(event *types.Event) { + m.mux.Lock() + defer m.mux.Unlock() + m.events[event.ID] = event +} + +func (m *Memory) Close() { + m.mux.Lock() + defer m.mux.Unlock() + m.events = make(map[string]*types.Event) +} + +func (m *Memory) GetEvents() []*types.Event { + m.mux.Lock() + defer m.mux.Unlock() + events := make([]*types.Event, 0, len(m.events)) + for _, event := range m.events { + events = append(events, event) + } + return events +} + +func (m *Memory) DeleteEvents(ids []string) { + m.mux.Lock() + defer m.mux.Unlock() + for _, id := range ids { + delete(m.events, id) + } +} diff --git a/client/internal/netflow/types/types.go b/client/internal/netflow/types/types.go new file mode 100644 index 000000000..41c2bc0f1 --- /dev/null +++ b/client/internal/netflow/types/types.go @@ -0,0 +1,82 @@ +package types + +import ( + "net/netip" + "time" + + "github.com/google/uuid" +) + +type Type int + +const ( + TypeStart = iota + TypeEnd +) + +type Direction int + +const ( + Ingress = iota + Egress +) + +type Event struct { + ID string + Timestamp time.Time + EventFields +} + +type EventFields struct { + FlowID uuid.UUID + Type Type + Direction Direction + Protocol uint8 + SourceIP netip.Addr + DestIP netip.Addr + SourcePort uint16 + DestPort uint16 + ICMPType uint8 + ICMPCode uint8 +} + +type FlowConfig struct { + URL string + Interval time.Duration + Enabled bool + TokenPayload string + TokenSignature string +} + +type FlowManager interface { + // FlowConfig handles network map updates + Update(update *FlowConfig) error + // Close closes the manager + Close() + // GetLogger returns a flow logger + GetLogger() FlowLogger +} + +type FlowLogger interface { + // StoreEvent stores a flow event + StoreEvent(flowEvent EventFields) + // GetEvents returns all stored events + GetEvents() []*Event + // Close closes the logger + Close() + // Enable enables the flow logger receiver + Enable() + // Disable disables the flow logger receiver + Disable() +} + +type Store interface { + // StoreEvent stores a flow event + StoreEvent(event *Event) + // GetEvents returns all stored events + GetEvents() []*Event + // DeleteEvents deletes events from the store + DeleteEvents([]string) + // Close closes the store + Close() +}