fs/accounting: rework to enable accounting to work with crypt and b2

This removes the old system of part accounting and replaces it with a
system of popping the accounting reader off the stream and wrapping
new readers back up as necessary.

This makes it much easier to carry the accounting down the chain of
wrapped readers and to apply the bandwidth limiting as near as
possible to the output.  This makes the accounting more accurate and
the bandwidth limiting smoother.

Fixes #2029 and Fixes #1443
Nick Craig-Wood, 2018-02-01 15:41:58 +00:00
commit d0d6b83a7a
parent bea02fcf52
4 changed files with 280 additions and 78 deletions
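
The pattern the diffs below implement can be sketched on its own. This is an
illustrative example rather than code from the commit: xorReader and
wrapWithXOR are hypothetical, and only accounting.UnWrap and accounting.WrapFn
come from the new API shown in the accounting diff. A wrapping reader pops the
accounting off its input, wraps the raw stream, then puts the accounting back
on the outside, so bytes are counted and rate limited as they leave the
outermost reader.

    package main

    import (
        "io"
        "os"
        "strings"

        "github.com/ncw/rclone/fs/accounting"
    )

    // xorReader is a stand-in for any reader that transforms its input,
    // such as crypt's encrypter or an async buffer.
    type xorReader struct {
        in  io.Reader
        key byte
    }

    func (r *xorReader) Read(p []byte) (n int, err error) {
        n, err = r.in.Read(p)
        for i := 0; i < n; i++ {
            p[i] ^= r.key
        }
        return n, err
    }

    // wrapWithXOR follows the pattern from this commit: pop the accounting
    // off the input, wrap the raw stream, then put the accounting back on
    // the output so the transformed bytes are what gets counted and limited.
    func wrapWithXOR(in io.Reader, key byte) io.Reader {
        in, wrap := accounting.UnWrap(in) // wrap is a no-op if in isn't accounted
        return wrap(&xorReader{in: in, key: key})
    }

    func main() {
        // With a plain reader UnWrap returns an identity wrap function, so
        // the same code works whether or not accounting is present.
        out := wrapWithXOR(strings.NewReader("hello"), 0x20)
        _, _ = io.Copy(os.Stdout, out)
    }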

@@ -70,6 +70,7 @@ type largeUpload struct {
     f     *Fs               // parent Fs
     o     *Object           // object being uploaded
     in    io.Reader         // read the data from here
+    wrap  accounting.WrapFn // account parts being transferred
     id    string            // ID of the file being uploaded
     size  int64             // total size
     parts int64             // calculated number of parts, if known
@@ -126,10 +127,14 @@ func (f *Fs) newLargeUpload(o *Object, in io.Reader, src fs.ObjectInfo) (up *lar
     if err != nil {
         return nil, err
     }
+    // unwrap the accounting from the input, we use wrap to put it
+    // back on after the buffering
+    in, wrap := accounting.UnWrap(in)
     up = &largeUpload{
         f:     f,
         o:     o,
         in:    in,
+        wrap:  wrap,
         id:    response.ID,
         size:  size,
         parts: parts,
@@ -221,7 +226,7 @@ func (up *largeUpload) transferChunk(part int64, body []byte) error {
     opts := rest.Opts{
         Method:  "POST",
         RootURL: upload.UploadURL,
-        Body:    accounting.AccountPart(up.o, in),
+        Body:    up.wrap(in),
         ExtraHeaders: map[string]string{
             "Authorization":    upload.AuthorizationToken,
             "X-Bz-Part-Number": fmt.Sprintf("%d", part),
@@ -331,7 +336,6 @@ func (up *largeUpload) Stream(initialUploadBlock []byte) (err error) {
     errs := make(chan error, 1)
     hasMoreParts := true
     var wg sync.WaitGroup
-    accounting.AccountByPart(up.o) // Cancel whole file accounting before reading

     // Transfer initial chunk
     up.size = int64(len(initialUploadBlock))
@@ -392,7 +396,6 @@ func (up *largeUpload) Upload() error {
     errs := make(chan error, 1)
     var wg sync.WaitGroup
     var err error
-    accounting.AccountByPart(up.o) // Cancel whole file accounting before reading
 outer:
     for part := int64(1); part <= up.parts; part++ {
         // Check any errors

@@ -14,6 +14,7 @@ import (
     "unicode/utf8"

     "github.com/ncw/rclone/backend/crypt/pkcs7"
+    "github.com/ncw/rclone/fs/accounting"
     "github.com/pkg/errors"
     "golang.org/x/crypto/nacl/secretbox"
@@ -691,11 +692,12 @@ func (fh *encrypter) finish(err error) (int, error) {

 // Encrypt data encrypts the data stream
 func (c *cipher) EncryptData(in io.Reader) (io.Reader, error) {
+    in, wrap := accounting.UnWrap(in) // unwrap the accounting off the Reader
     out, err := c.newEncrypter(in, nil)
     if err != nil {
         return nil, err
     }
-    return out, nil
+    return wrap(out), nil // and wrap the accounting back on
 }

 // decrypter decrypts an io.ReaderCloser on the fly

@@ -20,8 +20,9 @@ type Account struct {
     // CancelRequest so this race can happen when it apparently
     // shouldn't.
     mu     sync.Mutex
-    in     io.ReadCloser
+    in     io.Reader
     origIn io.ReadCloser
+    close  io.Closer
     size   int64
     name   string
     statmu sync.Mutex // Separate mutex for stat values.
@@ -33,8 +34,6 @@ type Account struct {
     closed  bool          // set if the file is closed
     exit    chan struct{} // channel that will be closed when transfer is finished
     withBuf bool          // is using a buffered in
-
-    wholeFileDisabled bool // disables the whole file when doing parts
 }

 // NewAccountSizeName makes a Account reader for an io.ReadCloser of
@@ -42,6 +41,7 @@ type Account struct {
 func NewAccountSizeName(in io.ReadCloser, size int64, name string) *Account {
     acc := &Account{
         in:     in,
+        close:  in,
         origIn: in,
         size:   size,
         name:   name,
@@ -70,17 +70,18 @@ func (acc *Account) WithBuffer() *Account {
     }
     // On big files add a buffer
     if buffers > 0 {
-        in, err := asyncreader.New(acc.in, buffers)
+        rc, err := asyncreader.New(acc.origIn, buffers)
         if err != nil {
             fs.Errorf(acc.name, "Failed to make buffer: %v", err)
         } else {
-            acc.in = in
+            acc.in = rc
+            acc.close = rc
         }
     }
     return acc
 }

-// GetReader returns the underlying io.ReadCloser
+// GetReader returns the underlying io.ReadCloser under any Buffer
 func (acc *Account) GetReader() io.ReadCloser {
     acc.mu.Lock()
     defer acc.mu.Unlock()
@@ -94,29 +95,19 @@ func (acc *Account) StopBuffering() {
     }
 }

-// UpdateReader updates the underlying io.ReadCloser
+// UpdateReader updates the underlying io.ReadCloser stopping the
+// asynb buffer (if any) and re-adding it
 func (acc *Account) UpdateReader(in io.ReadCloser) {
     acc.mu.Lock()
     acc.StopBuffering()
     acc.in = in
+    acc.close = in
     acc.origIn = in
     acc.WithBuffer()
     acc.mu.Unlock()
 }

-// disableWholeFileAccounting turns off the whole file accounting
-func (acc *Account) disableWholeFileAccounting() {
-    acc.mu.Lock()
-    acc.wholeFileDisabled = true
-    acc.mu.Unlock()
-}
-
-// accountPart disables the whole file counter and returns an
-// io.Reader to wrap a segment of the transfer.
-func (acc *Account) accountPart(in io.Reader) io.Reader {
-    return newAccountStream(acc, in)
-}
-
+// averageLoop calculates averages for the stats in the background
 func (acc *Account) averageLoop() {
     tick := time.NewTicker(time.Second)
     defer tick.Stop()
@@ -165,16 +156,25 @@ func (acc *Account) read(in io.Reader, p []byte) (n int, err error) {
 func (acc *Account) Read(p []byte) (n int, err error) {
     acc.mu.Lock()
     defer acc.mu.Unlock()
-    if acc.wholeFileDisabled {
-        // Don't account
-        return acc.in.Read(p)
-    }
     return acc.read(acc.in, p)
 }

-// Progress returns bytes read as well as the size.
+// Close the object
+func (acc *Account) Close() error {
+    acc.mu.Lock()
+    defer acc.mu.Unlock()
+    if acc.closed {
+        return nil
+    }
+    acc.closed = true
+    close(acc.exit)
+    Stats.inProgress.clear(acc.name)
+    return acc.close.Close()
+}
+
+// progress returns bytes read as well as the size.
 // Size can be <= 0 if the size is unknown.
-func (acc *Account) Progress() (bytes, size int64) {
+func (acc *Account) progress() (bytes, size int64) {
     if acc == nil {
         return 0, 0
     }
@@ -184,10 +184,10 @@ func (acc *Account) Progress() (bytes, size int64) {
     return bytes, size
 }

-// Speed returns the speed of the current file transfer
+// speed returns the speed of the current file transfer
 // in bytes per second, as well a an exponentially weighted moving average
 // If no read has completed yet, 0 is returned for both values.
-func (acc *Account) Speed() (bps, current float64) {
+func (acc *Account) speed() (bps, current float64) {
     if acc == nil {
         return 0, 0
     }
@@ -203,10 +203,10 @@ func (acc *Account) Speed() (bps, current float64) {
     return
 }

-// ETA returns the ETA of the current operation,
+// eta returns the ETA of the current operation,
 // rounded to full seconds.
 // If the ETA cannot be determined 'ok' returns false.
-func (acc *Account) ETA() (eta time.Duration, ok bool) {
+func (acc *Account) eta() (eta time.Duration, ok bool) {
     if acc == nil || acc.size <= 0 {
         return 0, false
     }
@@ -230,9 +230,9 @@ func (acc *Account) ETA() (eta time.Duration, ok bool) {
 // String produces stats for this file
 func (acc *Account) String() string {
-    a, b := acc.Progress()
-    _, cur := acc.Speed()
-    eta, etaok := acc.ETA()
+    a, b := acc.progress()
+    _, cur := acc.speed()
+    eta, etaok := acc.eta()
     etas := "-"
     if etaok {
         if eta > 0 {
@@ -268,17 +268,27 @@ func (acc *Account) String() string {
     )
 }

-// Close the object
-func (acc *Account) Close() error {
+// OldStream returns the top io.Reader
+func (acc *Account) OldStream() io.Reader {
     acc.mu.Lock()
     defer acc.mu.Unlock()
-    if acc.closed {
-        return nil
+    return acc.in
+}
+
+// SetStream updates the top io.Reader
+func (acc *Account) SetStream(in io.Reader) {
+    acc.mu.Lock()
+    acc.in = in
+    acc.mu.Unlock()
+}
+
+// WrapStream wraps an io Reader so it will be accounted in the same
+// way as account
+func (acc *Account) WrapStream(in io.Reader) io.Reader {
+    return &accountStream{
+        acc: acc,
+        in:  in,
     }
-    acc.closed = true
-    close(acc.exit)
-    Stats.inProgress.clear(acc.name)
-    return acc.in.Close()
 }

 // accountStream accounts a single io.Reader into a parent *Account
@@ -287,12 +297,19 @@ type accountStream struct {
     in  io.Reader
 }

-// newAccountStream makes a new accountStream for an in
-func newAccountStream(acc *Account, in io.Reader) *accountStream {
-    return &accountStream{
-        acc: acc,
-        in:  in,
-    }
+// OldStream return the underlying stream
+func (a *accountStream) OldStream() io.Reader {
+    return a.in
+}
+
+// SetStream set the underlying stream
+func (a *accountStream) SetStream(in io.Reader) {
+    a.in = in
+}
+
+// WrapStream wrap in in an accounter
+func (a *accountStream) WrapStream(in io.Reader) io.Reader {
+    return a.acc.WrapStream(in)
 }

 // Read bytes from the object - see io.Reader
@@ -300,33 +317,30 @@ func (a *accountStream) Read(p []byte) (n int, err error) {
     return a.acc.read(a.in, p)
 }

-// AccountByPart turns off whole file accounting
-//
-// Returns the current account or nil if not found
-func AccountByPart(obj fs.Object) *Account {
-    acc := Stats.inProgress.get(obj.Remote())
-    if acc == nil {
-        fs.Debugf(obj, "Didn't find object to account part transfer")
-        return nil
-    }
-    acc.disableWholeFileAccounting()
-    return acc
-}
-
-// AccountPart accounts for part of a transfer
-//
-// It disables the whole file counter and returns an io.Reader to wrap
-// a segment of the transfer.
-func AccountPart(obj fs.Object, in io.Reader) io.Reader {
-    acc := AccountByPart(obj)
-    if acc == nil {
-        return in
-    }
-    return acc.accountPart(in)
-}
-
-// Check it satisfies the interface
-var (
-    _ io.ReadCloser = &Account{}
-    _ io.Reader     = &accountStream{}
-)
+// Accounter accounts a stream allowing the accounting to be removed and re-added
+type Accounter interface {
+    io.Reader
+    OldStream() io.Reader
+    SetStream(io.Reader)
+    WrapStream(io.Reader) io.Reader
+}
+
+// WrapFn wraps an io.Reader (for accounting purposes usually)
+type WrapFn func(io.Reader) io.Reader
+
+// UnWrap unwraps a reader returning unwrapped and wrap, a function to
+// wrap it back up again. If `in` is an Accounter then this function
+// will take the accounting unwrapped and wrap will put it back on
+// again the new Reader passed in.
+//
+// This allows functions which wrap io.Readers to move the accounting
+// to the end of the wrapped chain of readers. This is very important
+// if buffering is being introduced and if the Reader might be wrapped
+// again.
+func UnWrap(in io.Reader) (unwrapped io.Reader, wrap WrapFn) {
+    acc, ok := in.(Accounter)
+    if !ok {
+        return in, func(r io.Reader) io.Reader { return r }
+    }
+    return acc.OldStream(), acc.WrapStream
+}
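
The other consumer of this API is a caller that buffers or chunks its input,
as in the b2 changes above: UnWrap is called once before the buffering, and
the returned wrap function is re-applied to each chunk so the bytes are
accounted as they are actually sent. Below is a hedged, self-contained sketch
of that pattern; uploadChunks and send are invented for illustration, and only
accounting.UnWrap and the WrapFn it returns come from the diff.

    package main

    import (
        "bytes"
        "fmt"
        "io"
        "io/ioutil"
        "strings"

        "github.com/ncw/rclone/fs/accounting"
    )

    // uploadChunks is a hypothetical chunked uploader, not the b2 code.
    // Accounting is unwrapped from the input once, and each chunk is
    // re-wrapped so the bytes are accounted as they are actually sent.
    func uploadChunks(in io.Reader, chunkSize int, send func(io.Reader) error) error {
        in, wrap := accounting.UnWrap(in)
        buf := make([]byte, chunkSize)
        for {
            n, err := io.ReadFull(in, buf)
            if n > 0 {
                if sendErr := send(wrap(bytes.NewReader(buf[:n]))); sendErr != nil {
                    return sendErr
                }
            }
            if err == io.EOF || err == io.ErrUnexpectedEOF {
                return nil // all of the input has been sent
            }
            if err != nil {
                return err
            }
        }
    }

    func main() {
        // With a plain reader wrap is a no-op; with an accounted reader each
        // chunk would be counted and rate limited as it is sent.
        err := uploadChunks(strings.NewReader("some data to upload"), 8, func(chunk io.Reader) error {
            n, copyErr := io.Copy(ioutil.Discard, chunk)
            fmt.Printf("sent chunk of %d bytes\n", n)
            return copyErr
        })
        if err != nil {
            fmt.Println("upload failed:", err)
        }
    }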

@@ -0,0 +1,183 @@
+package accounting
+
+import (
+    "bytes"
+    "io"
+    "io/ioutil"
+    "strings"
+    "testing"
+
+    "github.com/ncw/rclone/fs/asyncreader"
+    "github.com/ncw/rclone/fstest/mockobject"
+    "github.com/stretchr/testify/assert"
+    "github.com/stretchr/testify/require"
+)
+
+// Check it satisfies the interfaces
+var (
+    _ io.ReadCloser = &Account{}
+    _ io.Reader     = &accountStream{}
+    _ Accounter     = &Account{}
+    _ Accounter     = &accountStream{}
+)
+
+func TestNewAccountSizeName(t *testing.T) {
+    in := ioutil.NopCloser(bytes.NewBuffer([]byte{1}))
+    acc := NewAccountSizeName(in, 1, "test")
+    assert.Equal(t, in, acc.in)
+    assert.Equal(t, acc, Stats.inProgress.get("test"))
+    err := acc.Close()
+    assert.NoError(t, err)
+    assert.Nil(t, Stats.inProgress.get("test"))
+}
+
+func TestNewAccount(t *testing.T) {
+    obj := mockobject.Object("test")
+    in := ioutil.NopCloser(bytes.NewBuffer([]byte{1}))
+    acc := NewAccount(in, obj)
+    assert.Equal(t, in, acc.in)
+    assert.Equal(t, acc, Stats.inProgress.get("test"))
+    err := acc.Close()
+    assert.NoError(t, err)
+    assert.Nil(t, Stats.inProgress.get("test"))
+}
+
+func TestAccountWithBuffer(t *testing.T) {
+    in := ioutil.NopCloser(bytes.NewBuffer([]byte{1}))
+
+    acc := NewAccountSizeName(in, -1, "test")
+    acc.WithBuffer()
+    // should have a buffer for an unknown size
+    _, ok := acc.in.(*asyncreader.AsyncReader)
+    require.True(t, ok)
+    assert.NoError(t, acc.Close())
+
+    acc = NewAccountSizeName(in, 1, "test")
+    acc.WithBuffer()
+    // should not have a buffer for a small size
+    _, ok = acc.in.(*asyncreader.AsyncReader)
+    require.False(t, ok)
+    assert.NoError(t, acc.Close())
+}
+
+func TestAccountGetUpdateReader(t *testing.T) {
+    in := ioutil.NopCloser(bytes.NewBuffer([]byte{1}))
+    acc := NewAccountSizeName(in, 1, "test")
+
+    assert.Equal(t, in, acc.GetReader())
+
+    in2 := ioutil.NopCloser(bytes.NewBuffer([]byte{1}))
+    acc.UpdateReader(in2)
+
+    assert.Equal(t, in2, acc.GetReader())
+
+    assert.NoError(t, acc.Close())
+}
+
+func TestAccountRead(t *testing.T) {
+    in := ioutil.NopCloser(bytes.NewBuffer([]byte{1, 2, 3}))
+    acc := NewAccountSizeName(in, 1, "test")
+
+    assert.True(t, acc.start.IsZero())
+    assert.Equal(t, 0, acc.lpBytes)
+    assert.Equal(t, int64(0), acc.bytes)
+    assert.Equal(t, int64(0), Stats.bytes)
+
+    var buf = make([]byte, 2)
+    n, err := acc.Read(buf)
+    assert.NoError(t, err)
+    assert.Equal(t, 2, n)
+    assert.Equal(t, []byte{1, 2}, buf[:n])
+
+    assert.False(t, acc.start.IsZero())
+    assert.Equal(t, 2, acc.lpBytes)
+    assert.Equal(t, int64(2), acc.bytes)
+    assert.Equal(t, int64(2), Stats.bytes)
+
+    n, err = acc.Read(buf)
+    assert.NoError(t, err)
+    assert.Equal(t, 1, n)
+    assert.Equal(t, []byte{3}, buf[:n])
+
+    n, err = acc.Read(buf)
+    assert.Equal(t, io.EOF, err)
+    assert.Equal(t, 0, n)
+
+    assert.NoError(t, acc.Close())
+}
+
+func TestAccountString(t *testing.T) {
+    in := ioutil.NopCloser(bytes.NewBuffer([]byte{1, 2, 3}))
+    acc := NewAccountSizeName(in, 3, "test")
+
+    // FIXME not an exhaustive test!
+
+    assert.Equal(t, "test: 0% /3, 0/s, -", strings.TrimSpace(acc.String()))
+
+    var buf = make([]byte, 2)
+    n, err := acc.Read(buf)
+    assert.NoError(t, err)
+    assert.Equal(t, 2, n)
+
+    assert.Equal(t, "test: 66% /3, 0/s, -", strings.TrimSpace(acc.String()))
+
+    assert.NoError(t, acc.Close())
+}
+
+// Test the Accounter interface methods on Account and accountStream
+func TestAccountAccounter(t *testing.T) {
+    in := ioutil.NopCloser(bytes.NewBuffer([]byte{1, 2, 3}))
+    acc := NewAccountSizeName(in, 3, "test")
+
+    assert.True(t, in == acc.OldStream())
+
+    in2 := ioutil.NopCloser(bytes.NewBuffer([]byte{2, 3, 4}))
+
+    acc.SetStream(in2)
+    assert.True(t, in2 == acc.OldStream())
+
+    r := acc.WrapStream(in)
+    as, ok := r.(Accounter)
+    require.True(t, ok)
+    assert.True(t, in == as.OldStream())
+    assert.True(t, in2 == acc.OldStream())
+    accs, ok := r.(*accountStream)
+    require.True(t, ok)
+    assert.Equal(t, acc, accs.acc)
+    assert.True(t, in == accs.in)
+
+    // Check Read on the accountStream
+    var buf = make([]byte, 2)
+    n, err := r.Read(buf)
+    assert.NoError(t, err)
+    assert.Equal(t, 2, n)
+    assert.Equal(t, []byte{1, 2}, buf[:n])
+
+    // Test that we can get another accountstream out
+    in3 := ioutil.NopCloser(bytes.NewBuffer([]byte{3, 1, 2}))
+    r2 := as.WrapStream(in3)
+    as2, ok := r2.(Accounter)
+    require.True(t, ok)
+    assert.True(t, in3 == as2.OldStream())
+    assert.True(t, in2 == acc.OldStream())
+    accs2, ok := r2.(*accountStream)
+    require.True(t, ok)
+    assert.Equal(t, acc, accs2.acc)
+    assert.True(t, in3 == accs2.in)
+
+    // Test we can set this new accountStream
+    as2.SetStream(in)
+    assert.True(t, in == as2.OldStream())
+
+    // Test UnWrap on accountStream
+    unwrapped, wrap := UnWrap(r2)
+    assert.True(t, unwrapped == in)
+    r3 := wrap(in2)
+    assert.True(t, in2 == r3.(Accounter).OldStream())
+
+    // TestUnWrap on a normal io.Reader
+    unwrapped, wrap = UnWrap(in2)
+    assert.True(t, unwrapped == in2)
+    assert.True(t, wrap(in3) == in3)
+}