diff --git a/client/internal/flowstore/store.go b/client/internal/flowstore/store.go index 27ff81718..ddb0f7dda 100644 --- a/client/internal/flowstore/store.go +++ b/client/internal/flowstore/store.go @@ -3,20 +3,51 @@ package flowstore import ( "context" "io" + "net/netip" "sync" + "time" + "github.com/google/uuid" log "github.com/sirupsen/logrus" ) +type Type int + +const ( + TypeStart = iota + TypeEnd +) + +type Direction int + +const ( + Ingress = iota + Egress +) + type Event struct { - ID string - FlowID string + ID string + Timestamp time.Time + EventFields +} + +type EventFields struct { + FlowID uuid.UUID + Type Type + Direction Direction + Protocol uint8 + SourceIP netip.Addr + DestIP netip.Addr + SourcePort uint16 + DestPort uint16 + ICMPType uint8 + ICMPCode uint8 } type Store interface { io.Closer // stores a flow event - StoreEvent(flowEvent Event) + StoreEvent(flowEvent EventFields) // returns all stored events GetEvents() []*Event } @@ -25,7 +56,7 @@ func New(ctx context.Context) Store { ctx, cancel := context.WithCancel(ctx) store := &memory{ events: make(map[string]*Event), - rcvChan: make(chan *Event, 100), + rcvChan: make(chan *EventFields, 100), ctx: ctx, cancel: cancel, } @@ -36,7 +67,7 @@ func New(ctx context.Context) Store { type memory struct { mux sync.Mutex events map[string]*Event - rcvChan chan *Event + rcvChan chan *EventFields ctx context.Context cancel context.CancelFunc } @@ -47,15 +78,22 @@ func (m *memory) startReceiver() { case <-m.ctx.Done(): log.Info("flow memory store receiver stopped") return - case event := <-m.rcvChan: + case eventFields := <-m.rcvChan: + id := uuid.NewString() + event := Event{ + ID: id, + EventFields: *eventFields, + Timestamp: time.Now(), + } + m.mux.Lock() - m.events[event.ID] = event + m.events[id] = &event m.mux.Unlock() } } } -func (m *memory) StoreEvent(flowEvent Event) { +func (m *memory) StoreEvent(flowEvent EventFields) { select { case m.rcvChan <- &flowEvent: default: diff --git a/client/internal/flowstore/store_test.go b/client/internal/flowstore/store_test.go index c0f2e1216..8584c28d1 100644 --- a/client/internal/flowstore/store_test.go +++ b/client/internal/flowstore/store_test.go @@ -4,6 +4,8 @@ import ( "context" "testing" + "github.com/google/uuid" + "github.com/netbirdio/netbird/client/internal/flowstore" ) @@ -13,16 +15,18 @@ func TestStore(t *testing.T) { store.Close() }) - event := flowstore.Event{ - ID: "1", - FlowID: "1", + event := flowstore.EventFields{ + FlowID: uuid.New(), + Type: flowstore.TypeStart, + Direction: flowstore.Ingress, + Protocol: 6, } store.StoreEvent(event) allEvents := store.GetEvents() for _, e := range allEvents { - if e.ID != event.ID { - t.Errorf("expected event ID %s, got %s", event.ID, e.ID) + if e.EventFields.FlowID != event.FlowID { + t.Errorf("expected event ID %s, got %s", event.FlowID, e.ID) } } }