operations: fix accounting for multi-thread transfers

This stops the accounting moving in large chunks and gets it to move
as the data is read out of the buffer.
This commit is contained in:
Nick Craig-Wood 2023-08-24 17:16:01 +01:00
parent 2f424ceecf
commit 1b5b36523b

View File

@ -91,16 +91,19 @@ func (mc *multiThreadCopyState) copyStream(ctx context.Context, stream int, writ
var rs io.ReadSeeker
if mc.noSeek {
// Read directly if we are sure we aren't going to seek
rs = readers.NoSeeker{Reader: rc}
// and account with accounting
rs = readers.NoSeeker{Reader: mc.acc.WrapStream(rc)}
} else {
// Read the chunk into buffered reader
wr := multipart.NewRW()
defer fs.CheckClose(wr, &err)
_, err = io.CopyN(wr, rc, size)
rw := multipart.NewRW()
defer fs.CheckClose(rw, &err)
_, err = io.CopyN(rw, rc, size)
if err != nil {
return fmt.Errorf("multi-thread copy: failed to read chunk: %w", err)
}
rs = wr
// Account as we go
rw.SetAccounting(mc.acc.AccountRead)
rs = rw
}
// Write the chunk
@ -109,13 +112,6 @@ func (mc *multiThreadCopyState) copyStream(ctx context.Context, stream int, writ
return fmt.Errorf("multi-thread copy: failed to write chunk: %w", err)
}
// FIXME: Wrap ReadSeeker for Accounting
// However, to ensure reporting is correctly seeks have to be handled properly
errAccRead := mc.acc.AccountRead(int(bytesWritten))
if errAccRead != nil {
return errAccRead
}
fs.Debugf(mc.src, "multi-thread copy: stream %d/%d (%d-%d) size %v finished", stream+1, mc.numChunks, start, end, fs.SizeSuffix(bytesWritten))
return nil
}