Add netflow manager (#3398)

* Add netflow manager

* fix linter issues
This commit is contained in:
Maycon Santos 2025-02-27 12:05:20 +00:00 committed by GitHub
parent 994b923d56
commit 8276236dfa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 345 additions and 169 deletions

View File

@ -33,8 +33,9 @@ import (
"github.com/netbirdio/netbird/client/internal/acl"
"github.com/netbirdio/netbird/client/internal/dns"
"github.com/netbirdio/netbird/client/internal/dnsfwd"
"github.com/netbirdio/netbird/client/internal/flowstore"
"github.com/netbirdio/netbird/client/internal/ingressgw"
"github.com/netbirdio/netbird/client/internal/netflow"
"github.com/netbirdio/netbird/client/internal/netflow/types"
"github.com/netbirdio/netbird/client/internal/networkmonitor"
"github.com/netbirdio/netbird/client/internal/peer"
"github.com/netbirdio/netbird/client/internal/peer/guard"
@ -190,7 +191,7 @@ type Engine struct {
persistNetworkMap bool
latestNetworkMap *mgmProto.NetworkMap
connSemaphore *semaphoregroup.SemaphoreGroup
flowStore flowstore.Store
flowManager types.FlowManager
}
// Peer is an instance of the Connection Peer
@ -233,6 +234,7 @@ func NewEngine(
statusRecorder: statusRecorder,
checks: checks,
connSemaphore: semaphoregroup.NewSemaphoreGroup(connInitLimit),
flowManager: netflow.NewManager(clientCtx),
}
if runtime.GOOS == "ios" {
if !fileExists(mobileDep.StateFilePath) {
@ -301,6 +303,8 @@ func (e *Engine) Stop() error {
return fmt.Errorf("failed to remove all peers: %s", err)
}
e.flowManager.Close()
if e.cancel != nil {
e.cancel()
}
@ -322,13 +326,6 @@ func (e *Engine) Stop() error {
log.Errorf("failed to persist state: %v", err)
}
if e.flowStore != nil {
if err := e.flowStore.Close(); err != nil {
e.flowStore = nil
log.Errorf("failed to close flow store: %v", err)
}
}
return nil
}
@ -712,22 +709,29 @@ func (e *Engine) handleRelayUpdate(update *mgmProto.RelayConfig) error {
return nil
}
func (e *Engine) handleFlowUpdate(update *mgmProto.FlowConfig) error {
if update == nil {
func (e *Engine) handleFlowUpdate(config *mgmProto.FlowConfig) error {
if config == nil {
return nil
}
if update.GetEnabled() && e.flowStore == nil {
e.flowStore = flowstore.New(e.ctx)
return nil
}
if !update.GetEnabled() && e.flowStore != nil {
err := e.flowStore.Close()
e.flowStore = nil
flowConfig, err := toFlowLoggerConfig(config)
if err != nil {
return err
}
return nil
return e.flowManager.Update(flowConfig)
}
func toFlowLoggerConfig(config *mgmProto.FlowConfig) (*types.FlowConfig, error) {
if config.GetInterval() == nil {
return nil, errors.New("flow interval is nil")
}
return &types.FlowConfig{
Enabled: config.GetEnabled(),
URL: config.GetUrl(),
TokenPayload: config.GetTokenPayload(),
TokenSignature: config.GetTokenSignature(),
Interval: config.GetInterval().AsDuration(),
}, nil
}
// updateChecksIfNew updates checks if there are changes and sync new meta with management

View File

@ -1,117 +0,0 @@
package flowstore
import (
"context"
"io"
"net/netip"
"sync"
"time"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
)
type Type int
const (
TypeStart = iota
TypeEnd
)
type Direction int
const (
Ingress = iota
Egress
)
type Event struct {
ID string
Timestamp time.Time
EventFields
}
type EventFields struct {
FlowID uuid.UUID
Type Type
Direction Direction
Protocol uint8
SourceIP netip.Addr
DestIP netip.Addr
SourcePort uint16
DestPort uint16
ICMPType uint8
ICMPCode uint8
}
type Store interface {
io.Closer
// stores a flow event
StoreEvent(flowEvent EventFields)
// returns all stored events
GetEvents() []*Event
}
func New(ctx context.Context) Store {
ctx, cancel := context.WithCancel(ctx)
store := &memory{
events: make(map[string]*Event),
rcvChan: make(chan *EventFields, 100),
ctx: ctx,
cancel: cancel,
}
go store.startReceiver()
return store
}
type memory struct {
mux sync.Mutex
events map[string]*Event
rcvChan chan *EventFields
ctx context.Context
cancel context.CancelFunc
}
func (m *memory) startReceiver() {
for {
select {
case <-m.ctx.Done():
log.Info("flow memory store receiver stopped")
return
case eventFields := <-m.rcvChan:
id := uuid.NewString()
event := Event{
ID: id,
EventFields: *eventFields,
Timestamp: time.Now(),
}
m.mux.Lock()
m.events[id] = &event
m.mux.Unlock()
}
}
}
func (m *memory) StoreEvent(flowEvent EventFields) {
select {
case m.rcvChan <- &flowEvent:
default:
log.Warn("flow memory store receiver is busy")
}
}
func (m *memory) Close() error {
m.cancel()
return nil
}
func (m *memory) GetEvents() []*Event {
m.mux.Lock()
defer m.mux.Unlock()
events := make([]*Event, 0, len(m.events))
for _, event := range m.events {
events = append(events, event)
}
return events
}

View File

@ -1,32 +0,0 @@
package flowstore_test
import (
"context"
"testing"
"github.com/google/uuid"
"github.com/netbirdio/netbird/client/internal/flowstore"
)
func TestStore(t *testing.T) {
store := flowstore.New(context.Background())
t.Cleanup(func() {
store.Close()
})
event := flowstore.EventFields{
FlowID: uuid.New(),
Type: flowstore.TypeStart,
Direction: flowstore.Ingress,
Protocol: 6,
}
store.StoreEvent(event)
allEvents := store.GetEvents()
for _, e := range allEvents {
if e.EventFields.FlowID != event.FlowID {
t.Errorf("expected event ID %s, got %s", event.FlowID, e.ID)
}
}
}

View File

@ -0,0 +1,105 @@
package logger
import (
"context"
"sync/atomic"
"time"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/client/internal/netflow/store"
"github.com/netbirdio/netbird/client/internal/netflow/types"
)
type rcvChan chan *types.EventFields
type Logger struct {
ctx context.Context
cancel context.CancelFunc
enabled atomic.Bool
rcvChan atomic.Pointer[rcvChan]
stopChan chan struct{}
Store types.Store
}
func New(ctx context.Context) *Logger {
ctx, cancel := context.WithCancel(ctx)
return &Logger{
ctx: ctx,
cancel: cancel,
Store: store.NewMemoryStore(),
stopChan: make(chan struct{}),
}
}
func (l *Logger) StoreEvent(flowEvent types.EventFields) {
if !l.enabled.Load() {
return
}
c := l.rcvChan.Load()
if c == nil {
return
}
select {
case *c <- &flowEvent:
default:
// todo: we should collect or log on this
}
}
func (l *Logger) Enable() {
go l.startReceiver()
}
func (l *Logger) startReceiver() {
if l.enabled.Load() {
return
}
c := make(rcvChan, 100)
l.rcvChan.Swap(&c)
l.enabled.Store(true)
for {
select {
case <-l.ctx.Done():
log.Info("flow Memory store receiver stopped")
return
case eventFields := <-c:
id := uuid.NewString()
event := types.Event{
ID: id,
EventFields: *eventFields,
Timestamp: time.Now(),
}
l.Store.StoreEvent(&event)
case <-l.stopChan:
return
}
}
}
func (l *Logger) Disable() {
l.stop()
l.Store.Close()
}
func (l *Logger) stop() {
if !l.enabled.Load() {
return
}
l.enabled.Store(false)
l.stopChan <- struct{}{}
}
func (l *Logger) GetEvents() []*types.Event {
return l.Store.GetEvents()
}
func (l *Logger) Close() {
l.stop()
l.cancel()
}

View File

@ -0,0 +1,38 @@
package logger_test
import (
"context"
"testing"
"time"
"github.com/google/uuid"
"github.com/netbirdio/netbird/client/internal/netflow/logger"
"github.com/netbirdio/netbird/client/internal/netflow/types"
)
func TestStore(t *testing.T) {
logger := logger.New(context.Background())
logger.Enable()
event := types.EventFields{
FlowID: uuid.New(),
Type: types.TypeStart,
Direction: types.Ingress,
Protocol: 6,
}
time.Sleep(time.Millisecond)
logger.StoreEvent(event)
time.Sleep(time.Millisecond)
allEvents := logger.GetEvents()
matched := false
for _, e := range allEvents {
if e.EventFields.FlowID == event.FlowID {
matched = true
}
}
if !matched {
t.Errorf("didn't match any event")
}
}

View File

@ -0,0 +1,48 @@
package netflow
import (
"context"
"sync"
"github.com/netbirdio/netbird/client/internal/netflow/logger"
"github.com/netbirdio/netbird/client/internal/netflow/types"
)
type Manager struct {
mux sync.Mutex
logger types.FlowLogger
flowConfig *types.FlowConfig
}
func NewManager(ctx context.Context) *Manager {
return &Manager{
logger: logger.New(ctx),
}
}
func (m *Manager) Update(update *types.FlowConfig) error {
m.mux.Lock()
defer m.mux.Unlock()
if update == nil {
return nil
}
m.flowConfig = update
if update.Enabled {
m.logger.Enable()
return nil
}
m.logger.Disable()
return nil
}
func (m *Manager) Close() {
m.logger.Close()
}
func (m *Manager) GetLogger() types.FlowLogger {
return m.logger
}

View File

@ -0,0 +1,48 @@
package store
import (
"sync"
"github.com/netbirdio/netbird/client/internal/netflow/types"
)
func NewMemoryStore() *Memory {
return &Memory{
events: make(map[string]*types.Event),
}
}
type Memory struct {
mux sync.Mutex
events map[string]*types.Event
}
func (m *Memory) StoreEvent(event *types.Event) {
m.mux.Lock()
defer m.mux.Unlock()
m.events[event.ID] = event
}
func (m *Memory) Close() {
m.mux.Lock()
defer m.mux.Unlock()
m.events = make(map[string]*types.Event)
}
func (m *Memory) GetEvents() []*types.Event {
m.mux.Lock()
defer m.mux.Unlock()
events := make([]*types.Event, 0, len(m.events))
for _, event := range m.events {
events = append(events, event)
}
return events
}
func (m *Memory) DeleteEvents(ids []string) {
m.mux.Lock()
defer m.mux.Unlock()
for _, id := range ids {
delete(m.events, id)
}
}

View File

@ -0,0 +1,82 @@
package types
import (
"net/netip"
"time"
"github.com/google/uuid"
)
type Type int
const (
TypeStart = iota
TypeEnd
)
type Direction int
const (
Ingress = iota
Egress
)
type Event struct {
ID string
Timestamp time.Time
EventFields
}
type EventFields struct {
FlowID uuid.UUID
Type Type
Direction Direction
Protocol uint8
SourceIP netip.Addr
DestIP netip.Addr
SourcePort uint16
DestPort uint16
ICMPType uint8
ICMPCode uint8
}
type FlowConfig struct {
URL string
Interval time.Duration
Enabled bool
TokenPayload string
TokenSignature string
}
type FlowManager interface {
// FlowConfig handles network map updates
Update(update *FlowConfig) error
// Close closes the manager
Close()
// GetLogger returns a flow logger
GetLogger() FlowLogger
}
type FlowLogger interface {
// StoreEvent stores a flow event
StoreEvent(flowEvent EventFields)
// GetEvents returns all stored events
GetEvents() []*Event
// Close closes the logger
Close()
// Enable enables the flow logger receiver
Enable()
// Disable disables the flow logger receiver
Disable()
}
type Store interface {
// StoreEvent stores a flow event
StoreEvent(event *Event)
// GetEvents returns all stored events
GetEvents() []*Event
// DeleteEvents deletes events from the store
DeleteEvents([]string)
// Close closes the store
Close()
}