Use bytes for flows event id (#3439)

This commit is contained in:
Viktor Liu 2025-03-07 16:12:47 +01:00 committed by GitHub
parent cb16d0f45f
commit 86370a0e7b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 23 additions and 20 deletions

View File

@ -74,7 +74,7 @@ func (l *Logger) startReceiver() {
log.Info("flow Memory store receiver stopped") log.Info("flow Memory store receiver stopped")
return return
case eventFields := <-c: case eventFields := <-c:
id := uuid.NewString() id := uuid.New()
event := types.Event{ event := types.Event{
ID: id, ID: id,
EventFields: *eventFields, EventFields: *eventFields,
@ -109,7 +109,7 @@ func (l *Logger) GetEvents() []*types.Event {
return l.Store.GetEvents() return l.Store.GetEvents()
} }
func (l *Logger) DeleteEvents(ids []string) { func (l *Logger) DeleteEvents(ids []uuid.UUID) {
l.Store.DeleteEvents(ids) l.Store.DeleteEvents(ids)
} }

View File

@ -8,6 +8,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/google/uuid"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"google.golang.org/protobuf/types/known/timestamppb" "google.golang.org/protobuf/types/known/timestamppb"
@ -169,7 +170,7 @@ func (m *Manager) startSender() {
func (m *Manager) receiveACKs(client *client.GRPCClient) { func (m *Manager) receiveACKs(client *client.GRPCClient) {
err := client.Receive(m.ctx, m.flowConfig.Interval, func(ack *proto.FlowEventAck) error { err := client.Receive(m.ctx, m.flowConfig.Interval, func(ack *proto.FlowEventAck) error {
log.Tracef("received flow event ack: %s", ack.EventId) log.Tracef("received flow event ack: %s", ack.EventId)
m.logger.DeleteEvents([]string{ack.EventId}) m.logger.DeleteEvents([]uuid.UUID{uuid.UUID(ack.EventId)})
return nil return nil
}) })
@ -192,7 +193,7 @@ func (m *Manager) send(event *nftypes.Event) error {
func toProtoEvent(publicKey []byte, event *nftypes.Event) *proto.FlowEvent { func toProtoEvent(publicKey []byte, event *nftypes.Event) *proto.FlowEvent {
protoEvent := &proto.FlowEvent{ protoEvent := &proto.FlowEvent{
EventId: event.ID, EventId: event.ID[:],
Timestamp: timestamppb.New(event.Timestamp), Timestamp: timestamppb.New(event.Timestamp),
PublicKey: publicKey, PublicKey: publicKey,
FlowFields: &proto.FlowFields{ FlowFields: &proto.FlowFields{

View File

@ -5,18 +5,20 @@ import (
"golang.org/x/exp/maps" "golang.org/x/exp/maps"
"github.com/google/uuid"
"github.com/netbirdio/netbird/client/internal/netflow/types" "github.com/netbirdio/netbird/client/internal/netflow/types"
) )
func NewMemoryStore() *Memory { func NewMemoryStore() *Memory {
return &Memory{ return &Memory{
events: make(map[string]*types.Event), events: make(map[uuid.UUID]*types.Event),
} }
} }
type Memory struct { type Memory struct {
mux sync.Mutex mux sync.Mutex
events map[string]*types.Event events map[uuid.UUID]*types.Event
} }
func (m *Memory) StoreEvent(event *types.Event) { func (m *Memory) StoreEvent(event *types.Event) {
@ -41,7 +43,7 @@ func (m *Memory) GetEvents() []*types.Event {
return events return events
} }
func (m *Memory) DeleteEvents(ids []string) { func (m *Memory) DeleteEvents(ids []uuid.UUID) {
m.mux.Lock() m.mux.Lock()
defer m.mux.Unlock() defer m.mux.Unlock()
for _, id := range ids { for _, id := range ids {

View File

@ -64,7 +64,7 @@ const (
) )
type Event struct { type Event struct {
ID string ID uuid.UUID
Timestamp time.Time Timestamp time.Time
EventFields EventFields
} }
@ -111,7 +111,7 @@ type FlowLogger interface {
// GetEvents returns all stored events // GetEvents returns all stored events
GetEvents() []*Event GetEvents() []*Event
// DeleteEvents deletes events from the store // DeleteEvents deletes events from the store
DeleteEvents([]string) DeleteEvents([]uuid.UUID)
// Close closes the logger // Close closes the logger
Close() Close()
// Enable enables the flow logger receiver // Enable enables the flow logger receiver
@ -126,7 +126,7 @@ type Store interface {
// GetEvents returns all stored events // GetEvents returns all stored events
GetEvents() []*Event GetEvents() []*Event
// DeleteEvents deletes events from the store // DeleteEvents deletes events from the store
DeleteEvents([]string) DeleteEvents([]uuid.UUID)
// Close closes the store // Close closes the store
Close() Close()
} }

View File

@ -130,7 +130,7 @@ type FlowEvent struct {
unknownFields protoimpl.UnknownFields unknownFields protoimpl.UnknownFields
// Unique client event identifier // Unique client event identifier
EventId string `protobuf:"bytes,1,opt,name=event_id,json=eventId,proto3" json:"event_id,omitempty"` EventId []byte `protobuf:"bytes,1,opt,name=event_id,json=eventId,proto3" json:"event_id,omitempty"`
// When the event occurred // When the event occurred
Timestamp *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"` Timestamp *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
// Public key of the sending peer // Public key of the sending peer
@ -170,11 +170,11 @@ func (*FlowEvent) Descriptor() ([]byte, []int) {
return file_flow_proto_rawDescGZIP(), []int{0} return file_flow_proto_rawDescGZIP(), []int{0}
} }
func (x *FlowEvent) GetEventId() string { func (x *FlowEvent) GetEventId() []byte {
if x != nil { if x != nil {
return x.EventId return x.EventId
} }
return "" return nil
} }
func (x *FlowEvent) GetTimestamp() *timestamppb.Timestamp { func (x *FlowEvent) GetTimestamp() *timestamppb.Timestamp {
@ -204,7 +204,7 @@ type FlowEventAck struct {
unknownFields protoimpl.UnknownFields unknownFields protoimpl.UnknownFields
// Unique client event identifier that has been ack'ed // Unique client event identifier that has been ack'ed
EventId string `protobuf:"bytes,1,opt,name=event_id,json=eventId,proto3" json:"event_id,omitempty"` EventId []byte `protobuf:"bytes,1,opt,name=event_id,json=eventId,proto3" json:"event_id,omitempty"`
} }
func (x *FlowEventAck) Reset() { func (x *FlowEventAck) Reset() {
@ -239,11 +239,11 @@ func (*FlowEventAck) Descriptor() ([]byte, []int) {
return file_flow_proto_rawDescGZIP(), []int{1} return file_flow_proto_rawDescGZIP(), []int{1}
} }
func (x *FlowEventAck) GetEventId() string { func (x *FlowEventAck) GetEventId() []byte {
if x != nil { if x != nil {
return x.EventId return x.EventId
} }
return "" return nil
} }
type FlowFields struct { type FlowFields struct {
@ -548,7 +548,7 @@ var file_flow_proto_rawDesc = []byte{
0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x22, 0xb2, 0x01, 0x0a, 0x09, 0x46, 0x6c, 0x6f, 0x77, 0x45, 0x76, 0x65, 0x6e, 0x6f, 0x74, 0x6f, 0x22, 0xb2, 0x01, 0x0a, 0x09, 0x46, 0x6c, 0x6f, 0x77, 0x45, 0x76, 0x65, 0x6e,
0x74, 0x12, 0x19, 0x0a, 0x08, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20,
0x01, 0x28, 0x09, 0x52, 0x07, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x38, 0x0a, 0x09, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x38, 0x0a, 0x09,
0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32,
0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75,
0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x74, 0x69, 0x6d,
@ -559,7 +559,7 @@ var file_flow_proto_rawDesc = []byte{
0x77, 0x2e, 0x46, 0x6c, 0x6f, 0x77, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x52, 0x0a, 0x66, 0x6c, 0x77, 0x2e, 0x46, 0x6c, 0x6f, 0x77, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x52, 0x0a, 0x66, 0x6c,
0x6f, 0x77, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x22, 0x29, 0x0a, 0x0c, 0x46, 0x6c, 0x6f, 0x77, 0x6f, 0x77, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x22, 0x29, 0x0a, 0x0c, 0x46, 0x6c, 0x6f, 0x77,
0x45, 0x76, 0x65, 0x6e, 0x74, 0x41, 0x63, 0x6b, 0x12, 0x19, 0x0a, 0x08, 0x65, 0x76, 0x65, 0x6e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x41, 0x63, 0x6b, 0x12, 0x19, 0x0a, 0x08, 0x65, 0x76, 0x65, 0x6e,
0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x65, 0x76, 0x65, 0x6e,
0x74, 0x49, 0x64, 0x22, 0xc4, 0x03, 0x0a, 0x0a, 0x46, 0x6c, 0x6f, 0x77, 0x46, 0x69, 0x65, 0x6c, 0x74, 0x49, 0x64, 0x22, 0xc4, 0x03, 0x0a, 0x0a, 0x46, 0x6c, 0x6f, 0x77, 0x46, 0x69, 0x65, 0x6c,
0x64, 0x73, 0x12, 0x17, 0x0a, 0x07, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x64, 0x73, 0x12, 0x17, 0x0a, 0x07, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20,
0x01, 0x28, 0x0c, 0x52, 0x06, 0x66, 0x6c, 0x6f, 0x77, 0x49, 0x64, 0x12, 0x1e, 0x0a, 0x04, 0x74, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x66, 0x6c, 0x6f, 0x77, 0x49, 0x64, 0x12, 0x1e, 0x0a, 0x04, 0x74,

View File

@ -13,7 +13,7 @@ service FlowService {
message FlowEvent { message FlowEvent {
// Unique client event identifier // Unique client event identifier
string event_id = 1; bytes event_id = 1;
// When the event occurred // When the event occurred
google.protobuf.Timestamp timestamp = 2; google.protobuf.Timestamp timestamp = 2;
@ -26,7 +26,7 @@ message FlowEvent {
message FlowEventAck { message FlowEventAck {
// Unique client event identifier that has been ack'ed // Unique client event identifier that has been ack'ed
string event_id = 1; bytes event_id = 1;
} }
message FlowFields { message FlowFields {