fix logger stop (#3403)

* fix logger stop

* use context to stop receiver

* update test
This commit is contained in:
Maycon Santos 2025-02-28 00:28:17 +00:00 committed by GitHub
parent 6838f53f40
commit f48cfd52e9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 53 additions and 16 deletions

View File

@ -2,6 +2,7 @@ package logger
import ( import (
"context" "context"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -14,21 +15,21 @@ import (
type rcvChan chan *types.EventFields type rcvChan chan *types.EventFields
type Logger struct { type Logger struct {
ctx context.Context mux sync.Mutex
cancel context.CancelFunc ctx context.Context
enabled atomic.Bool cancel context.CancelFunc
rcvChan atomic.Pointer[rcvChan] enabled atomic.Bool
stopChan chan struct{} rcvChan atomic.Pointer[rcvChan]
Store types.Store cancelReceiver context.CancelFunc
Store types.Store
} }
func New(ctx context.Context) *Logger { func New(ctx context.Context) *Logger {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
return &Logger{ return &Logger{
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
Store: store.NewMemoryStore(), Store: store.NewMemoryStore(),
stopChan: make(chan struct{}),
} }
} }
@ -57,6 +58,10 @@ func (l *Logger) startReceiver() {
if l.enabled.Load() { if l.enabled.Load() {
return return
} }
l.mux.Lock()
ctx, cancel := context.WithCancel(l.ctx)
l.cancelReceiver = cancel
l.mux.Unlock()
c := make(rcvChan, 100) c := make(rcvChan, 100)
l.rcvChan.Swap(&c) l.rcvChan.Swap(&c)
@ -64,7 +69,7 @@ func (l *Logger) startReceiver() {
for { for {
select { select {
case <-l.ctx.Done(): case <-ctx.Done():
log.Info("flow Memory store receiver stopped") log.Info("flow Memory store receiver stopped")
return return
case eventFields := <-c: case eventFields := <-c:
@ -75,8 +80,6 @@ func (l *Logger) startReceiver() {
Timestamp: time.Now(), Timestamp: time.Now(),
} }
l.Store.StoreEvent(&event) l.Store.StoreEvent(&event)
case <-l.stopChan:
return
} }
} }
} }
@ -92,7 +95,12 @@ func (l *Logger) stop() {
} }
l.enabled.Store(false) l.enabled.Store(false)
l.stopChan <- struct{}{} l.mux.Lock()
if l.cancelReceiver != nil {
l.cancelReceiver()
l.cancelReceiver = nil
}
l.mux.Unlock()
} }
func (l *Logger) GetEvents() []*types.Event { func (l *Logger) GetEvents() []*types.Event {

View File

@ -21,9 +21,11 @@ func TestStore(t *testing.T) {
Direction: types.Ingress, Direction: types.Ingress,
Protocol: 6, Protocol: 6,
} }
time.Sleep(time.Millisecond)
wait := func() { time.Sleep(time.Millisecond) }
wait()
logger.StoreEvent(event) logger.StoreEvent(event)
time.Sleep(time.Millisecond) wait()
allEvents := logger.GetEvents() allEvents := logger.GetEvents()
matched := false matched := false
@ -35,4 +37,31 @@ func TestStore(t *testing.T) {
if !matched { if !matched {
t.Errorf("didn't match any event") t.Errorf("didn't match any event")
} }
// test disable
logger.Disable()
wait()
logger.StoreEvent(event)
wait()
allEvents = logger.GetEvents()
if len(allEvents) != 0 {
t.Errorf("expected 0 events, got %d", len(allEvents))
}
// test re-enable
logger.Enable()
wait()
logger.StoreEvent(event)
wait()
allEvents = logger.GetEvents()
matched = false
for _, e := range allEvents {
if e.EventFields.FlowID == event.FlowID {
matched = true
}
}
if !matched {
t.Errorf("didn't match any event")
}
} }