Add event fields (#3390)

Co-authored-by: Maycon Santos <mlsmaycon@gmail.com>
This commit is contained in:
Viktor Liu 2025-02-26 12:06:06 +01:00 committed by GitHub
parent 6a775217cf
commit e943203ae2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 55 additions and 13 deletions

View File

@ -3,20 +3,51 @@ package flowstore
import ( import (
"context" "context"
"io" "io"
"net/netip"
"sync" "sync"
"time"
"github.com/google/uuid"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
type Type int
const (
TypeStart = iota
TypeEnd
)
type Direction int
const (
Ingress = iota
Egress
)
type Event struct { type Event struct {
ID string ID string
FlowID string Timestamp time.Time
EventFields
}
type EventFields struct {
FlowID uuid.UUID
Type Type
Direction Direction
Protocol uint8
SourceIP netip.Addr
DestIP netip.Addr
SourcePort uint16
DestPort uint16
ICMPType uint8
ICMPCode uint8
} }
type Store interface { type Store interface {
io.Closer io.Closer
// stores a flow event // stores a flow event
StoreEvent(flowEvent Event) StoreEvent(flowEvent EventFields)
// returns all stored events // returns all stored events
GetEvents() []*Event GetEvents() []*Event
} }
@ -25,7 +56,7 @@ func New(ctx context.Context) Store {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
store := &memory{ store := &memory{
events: make(map[string]*Event), events: make(map[string]*Event),
rcvChan: make(chan *Event, 100), rcvChan: make(chan *EventFields, 100),
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
} }
@ -36,7 +67,7 @@ func New(ctx context.Context) Store {
type memory struct { type memory struct {
mux sync.Mutex mux sync.Mutex
events map[string]*Event events map[string]*Event
rcvChan chan *Event rcvChan chan *EventFields
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
} }
@ -47,15 +78,22 @@ func (m *memory) startReceiver() {
case <-m.ctx.Done(): case <-m.ctx.Done():
log.Info("flow memory store receiver stopped") log.Info("flow memory store receiver stopped")
return return
case event := <-m.rcvChan: case eventFields := <-m.rcvChan:
id := uuid.NewString()
event := Event{
ID: id,
EventFields: *eventFields,
Timestamp: time.Now(),
}
m.mux.Lock() m.mux.Lock()
m.events[event.ID] = event m.events[id] = &event
m.mux.Unlock() m.mux.Unlock()
} }
} }
} }
func (m *memory) StoreEvent(flowEvent Event) { func (m *memory) StoreEvent(flowEvent EventFields) {
select { select {
case m.rcvChan <- &flowEvent: case m.rcvChan <- &flowEvent:
default: default:

View File

@ -4,6 +4,8 @@ import (
"context" "context"
"testing" "testing"
"github.com/google/uuid"
"github.com/netbirdio/netbird/client/internal/flowstore" "github.com/netbirdio/netbird/client/internal/flowstore"
) )
@ -13,16 +15,18 @@ func TestStore(t *testing.T) {
store.Close() store.Close()
}) })
event := flowstore.Event{ event := flowstore.EventFields{
ID: "1", FlowID: uuid.New(),
FlowID: "1", Type: flowstore.TypeStart,
Direction: flowstore.Ingress,
Protocol: 6,
} }
store.StoreEvent(event) store.StoreEvent(event)
allEvents := store.GetEvents() allEvents := store.GetEvents()
for _, e := range allEvents { for _, e := range allEvents {
if e.ID != event.ID { if e.EventFields.FlowID != event.FlowID {
t.Errorf("expected event ID %s, got %s", event.ID, e.ID) t.Errorf("expected event ID %s, got %s", event.FlowID, e.ID)
} }
} }
} }