package sync import ( "context" "sync" "sync/atomic" "testing" "github.com/rclone/rclone/fs" "github.com/rclone/rclone/fstest/mockobject" "github.com/stretchr/testify/assert" ) func TestPipe(t *testing.T) { var queueLength int var queueSize int64 stats := func(n int, size int64) { queueLength, queueSize = n, size } // Make a new pipe p := newPipe(stats, 10) checkStats := func(expectedN int, expectedSize int64) { n, size := p.Stats() assert.Equal(t, expectedN, n) assert.Equal(t, expectedSize, size) assert.Equal(t, expectedN, queueLength) assert.Equal(t, expectedSize, queueSize) } checkStats(0, 0) ctx := context.Background() obj1 := mockobject.New("potato").WithContent([]byte("hello"), mockobject.SeekModeNone) pair1 := fs.ObjectPair{Src: obj1, Dst: nil} // Put an object ok := p.Put(ctx, pair1) assert.Equal(t, true, ok) checkStats(1, 5) // Close the pipe showing reading on closed pipe is OK p.Close() // Read from pipe pair2, ok := p.Get(ctx) assert.Equal(t, pair1, pair2) assert.Equal(t, true, ok) checkStats(0, 0) // Check read on closed pipe pair2, ok = p.Get(ctx) assert.Equal(t, fs.ObjectPair{}, pair2) assert.Equal(t, false, ok) // Check panic on write to closed pipe assert.Panics(t, func() { p.Put(ctx, pair1) }) // Make a new pipe p = newPipe(stats, 10) ctx2, cancel := context.WithCancel(ctx) // cancel it in the background - check read ceases go cancel() pair2, ok = p.Get(ctx2) assert.Equal(t, fs.ObjectPair{}, pair2) assert.Equal(t, false, ok) // check we can't write ok = p.Put(ctx2, pair1) assert.Equal(t, false, ok) } // TestPipeConcurrent runs concurrent Get and Put to flush out any // race conditions and concurrency problems. func TestPipeConcurrent(t *testing.T) { const ( N = 1000 readWriters = 10 ) stats := func(n int, size int64) {} // Make a new pipe p := newPipe(stats, 10) var wg sync.WaitGroup obj1 := mockobject.New("potato").WithContent([]byte("hello"), mockobject.SeekModeNone) pair1 := fs.ObjectPair{Src: obj1, Dst: nil} ctx := context.Background() var count int64 for j := 0; j < readWriters; j++ { wg.Add(2) go func() { defer wg.Done() for i := 0; i < N; i++ { // Read from pipe pair2, ok := p.Get(ctx) assert.Equal(t, pair1, pair2) assert.Equal(t, true, ok) atomic.AddInt64(&count, -1) } }() go func() { defer wg.Done() for i := 0; i < N; i++ { // Put an object ok := p.Put(ctx, pair1) assert.Equal(t, true, ok) atomic.AddInt64(&count, 1) } }() } wg.Wait() assert.Equal(t, int64(0), count) }