From d08b49d723b714f8d39fc2694e5bb0da23791c7e Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Wed, 13 Mar 2024 16:32:45 +0000 Subject: [PATCH] pool: Add ability to wait for a write to RW --- lib/pool/reader_writer.go | 45 ++++++++++++++++++++++++++++------ lib/pool/reader_writer_test.go | 8 +++--- 2 files changed, 43 insertions(+), 10 deletions(-) diff --git a/lib/pool/reader_writer.go b/lib/pool/reader_writer.go index c6cf2caea..f7c263bd1 100644 --- a/lib/pool/reader_writer.go +++ b/lib/pool/reader_writer.go @@ -1,9 +1,11 @@ package pool import ( + "context" "errors" "io" "sync" + "time" ) // RWAccount is a function which will be called after every read @@ -24,10 +26,11 @@ type RW struct { // Shared variables between Read and Write // Write updates these but Read reads from them // They must all stay in sync together - mu sync.Mutex // protect the shared variables - pages [][]byte // backing store - size int // size written - lastOffset int // size in last page + mu sync.Mutex // protect the shared variables + pages [][]byte // backing store + size int // size written + lastOffset int // size in last page + written chan struct{} // signalled when a write happens // Read side Variables out int // offset we are reading from @@ -48,10 +51,12 @@ var ( // // When writing it only appends data. Seek only applies to reading. func NewRW(pool *Pool) *RW { - return &RW{ - pool: pool, - pages: make([][]byte, 0, 16), + rw := &RW{ + pool: pool, + pages: make([][]byte, 0, 16), + written: make(chan struct{}, 1), } + return rw } // SetAccounting should be provided with a function which will be @@ -217,6 +222,7 @@ func (rw *RW) Write(p []byte) (n int, err error) { rw.size += nn rw.lastOffset += nn rw.mu.Unlock() + rw.signalWrite() // signal more data available } return n, nil } @@ -240,6 +246,7 @@ func (rw *RW) ReadFrom(r io.Reader) (n int64, err error) { rw.size += nn rw.lastOffset += nn rw.mu.Unlock() + rw.signalWrite() // signal more data available } if err == io.EOF { err = nil @@ -247,6 +254,29 @@ func (rw *RW) ReadFrom(r io.Reader) (n int64, err error) { return n, err } +// signal that a write has happened +func (rw *RW) signalWrite() { + select { + case rw.written <- struct{}{}: + default: + } +} + +// WaitWrite sleeps until a data is written to the RW or Close is +// called or the context is cancelled occurs or for a maximum of 1 +// Second then returns. +// +// This can be used when calling Read while the buffer is filling up. +func (rw *RW) WaitWrite(ctx context.Context) { + timer := time.NewTimer(time.Second) + select { + case <-timer.C: + case <-ctx.Done(): + case <-rw.written: + } + timer.Stop() +} + // Seek sets the offset for the next Read (not Write - this is always // appended) to offset, interpreted according to whence: SeekStart // means relative to the start of the file, SeekCurrent means relative @@ -286,6 +316,7 @@ func (rw *RW) Seek(offset int64, whence int) (int64, error) { func (rw *RW) Close() error { rw.mu.Lock() defer rw.mu.Unlock() + rw.signalWrite() // signal more data available for _, page := range rw.pages { rw.pool.Put(page) } diff --git a/lib/pool/reader_writer_test.go b/lib/pool/reader_writer_test.go index 7f5315fc6..d51c70b58 100644 --- a/lib/pool/reader_writer_test.go +++ b/lib/pool/reader_writer_test.go @@ -2,6 +2,7 @@ package pool import ( "bytes" + "context" "errors" "io" "sync" @@ -526,7 +527,7 @@ func TestRWConcurrency(t *testing.T) { } // Read the data back from inP and check it is OK - check := func(in io.Reader, size int64) { + check := func(in io.Reader, size int64, rw *RW) { ck := readers.NewPatternReader(size) ckBuf := make([]byte, bufSize) rwBuf := make([]byte, bufSize) @@ -549,6 +550,7 @@ func TestRWConcurrency(t *testing.T) { if nin >= len(rwBuf) || nn >= size || inErr != io.EOF { break } + rw.WaitWrite(context.Background()) } require.Equal(t, ckBuf[:nck], rwBuf[:nin]) if ckErr == io.EOF && inErr == io.EOF { @@ -560,7 +562,7 @@ func TestRWConcurrency(t *testing.T) { // Read the data back and check it is OK read := func(rw *RW, size int64) { - check(rw, size) + check(rw, size, rw) } // Read the data back and check it is OK in using WriteTo @@ -570,7 +572,7 @@ func TestRWConcurrency(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - check(in, size) + check(in, size, rw) }() var n int64 for n < size {