b2: Implement server side copy for files > 5GB - fixes #3991

This factors copy out of SetModTime and Copy so it can be called from
both places.

This also reworks all the multipart uploading to use sync.Errgroup and
memory pooling like the other backends. This makes it more memory
efficient and handle errors better.

See: https://forum.rclone.org/t/copying-files-within-a-b2-bucket/16680/10
This commit is contained in:
Nick Craig-Wood 2020-05-29 12:36:31 +01:00
parent 4c7f7582fd
commit fb61ed8506
3 changed files with 339 additions and 228 deletions

View File

@ -337,3 +337,11 @@ type CopyFileRequest struct {
Info map[string]string `json:"fileInfo,omitempty"` // This field stores the metadata that will be stored with the file. (REPLACE only) Info map[string]string `json:"fileInfo,omitempty"` // This field stores the metadata that will be stored with the file. (REPLACE only)
DestBucketID string `json:"destinationBucketId,omitempty"` // The destination ID of the bucket if set, if not the source bucket will be used DestBucketID string `json:"destinationBucketId,omitempty"` // The destination ID of the bucket if set, if not the source bucket will be used
} }
// CopyPartRequest is the request for b2_copy_part - the response is UploadPartResponse
type CopyPartRequest struct {
SourceID string `json:"sourceFileId"` // The ID of the source file being copied.
LargeFileID string `json:"largeFileId"` // The ID of the large file the part will belong to, as returned by b2_start_large_file.
PartNumber int64 `json:"partNumber"` // Which part this is (starting from 1)
Range string `json:"range,omitempty"` // The range of bytes to copy. If not provided, the whole source file will be copied.
}

View File

@ -33,6 +33,7 @@ import (
"github.com/rclone/rclone/lib/bucket" "github.com/rclone/rclone/lib/bucket"
"github.com/rclone/rclone/lib/encoder" "github.com/rclone/rclone/lib/encoder"
"github.com/rclone/rclone/lib/pacer" "github.com/rclone/rclone/lib/pacer"
"github.com/rclone/rclone/lib/pool"
"github.com/rclone/rclone/lib/rest" "github.com/rclone/rclone/lib/rest"
) )
@ -54,6 +55,9 @@ const (
minChunkSize = 5 * fs.MebiByte minChunkSize = 5 * fs.MebiByte
defaultChunkSize = 96 * fs.MebiByte defaultChunkSize = 96 * fs.MebiByte
defaultUploadCutoff = 200 * fs.MebiByte defaultUploadCutoff = 200 * fs.MebiByte
largeFileCopyCutoff = 4 * fs.GibiByte // 5E9 is the max
memoryPoolFlushTime = fs.Duration(time.Minute) // flush the cached buffers after this long
memoryPoolUseMmap = false
) )
// Globals // Globals
@ -113,6 +117,16 @@ Files above this size will be uploaded in chunks of "--b2-chunk-size".
This value should be set no larger than 4.657GiB (== 5GB).`, This value should be set no larger than 4.657GiB (== 5GB).`,
Default: defaultUploadCutoff, Default: defaultUploadCutoff,
Advanced: true, Advanced: true,
}, {
Name: "copy_cutoff",
Help: `Cutoff for switching to multipart copy
Any files larger than this that need to be server side copied will be
copied in chunks of this size.
The minimum is 0 and the maximum is 4.6GB.`,
Default: largeFileCopyCutoff,
Advanced: true,
}, { }, {
Name: "chunk_size", Name: "chunk_size",
Help: `Upload chunk size. Must fit in memory. Help: `Upload chunk size. Must fit in memory.
@ -150,6 +164,18 @@ The duration before the download authorization token will expire.
The minimum value is 1 second. The maximum value is one week.`, The minimum value is 1 second. The maximum value is one week.`,
Default: fs.Duration(7 * 24 * time.Hour), Default: fs.Duration(7 * 24 * time.Hour),
Advanced: true, Advanced: true,
}, {
Name: "memory_pool_flush_time",
Default: memoryPoolFlushTime,
Advanced: true,
Help: `How often internal memory buffer pools will be flushed.
Uploads which requires additional buffers (f.e multipart) will use memory pool for allocations.
This option controls how often unused buffers will be removed from the pool.`,
}, {
Name: "memory_pool_use_mmap",
Default: memoryPoolUseMmap,
Advanced: true,
Help: `Whether to use mmap buffers in internal memory pool.`,
}, { }, {
Name: config.ConfigEncoding, Name: config.ConfigEncoding,
Help: config.ConfigEncodingHelp, Help: config.ConfigEncodingHelp,
@ -173,10 +199,13 @@ type Options struct {
Versions bool `config:"versions"` Versions bool `config:"versions"`
HardDelete bool `config:"hard_delete"` HardDelete bool `config:"hard_delete"`
UploadCutoff fs.SizeSuffix `config:"upload_cutoff"` UploadCutoff fs.SizeSuffix `config:"upload_cutoff"`
CopyCutoff fs.SizeSuffix `config:"copy_cutoff"`
ChunkSize fs.SizeSuffix `config:"chunk_size"` ChunkSize fs.SizeSuffix `config:"chunk_size"`
DisableCheckSum bool `config:"disable_checksum"` DisableCheckSum bool `config:"disable_checksum"`
DownloadURL string `config:"download_url"` DownloadURL string `config:"download_url"`
DownloadAuthorizationDuration fs.Duration `config:"download_auth_duration"` DownloadAuthorizationDuration fs.Duration `config:"download_auth_duration"`
MemoryPoolFlushTime fs.Duration `config:"memory_pool_flush_time"`
MemoryPoolUseMmap bool `config:"memory_pool_use_mmap"`
Enc encoder.MultiEncoder `config:"encoding"` Enc encoder.MultiEncoder `config:"encoding"`
} }
@ -199,7 +228,8 @@ type Fs struct {
uploads map[string][]*api.GetUploadURLResponse // Upload URLs by buckedID uploads map[string][]*api.GetUploadURLResponse // Upload URLs by buckedID
authMu sync.Mutex // lock for authorizing the account authMu sync.Mutex // lock for authorizing the account
pacer *fs.Pacer // To pace and retry the API calls pacer *fs.Pacer // To pace and retry the API calls
bufferTokens chan []byte // control concurrency of multipart uploads uploadToken *pacer.TokenDispenser // control concurrency
pool *pool.Pool // memory pool
} }
// Object describes a b2 object // Object describes a b2 object
@ -335,7 +365,6 @@ func (f *Fs) setUploadChunkSize(cs fs.SizeSuffix) (old fs.SizeSuffix, err error)
err = checkUploadChunkSize(cs) err = checkUploadChunkSize(cs)
if err == nil { if err == nil {
old, f.opt.ChunkSize = f.opt.ChunkSize, cs old, f.opt.ChunkSize = f.opt.ChunkSize, cs
f.fillBufferTokens() // reset the buffer tokens
} }
return return
} }
@ -396,6 +425,13 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
_bucketType: make(map[string]string, 1), _bucketType: make(map[string]string, 1),
uploads: make(map[string][]*api.GetUploadURLResponse), uploads: make(map[string][]*api.GetUploadURLResponse),
pacer: fs.NewPacer(pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))), pacer: fs.NewPacer(pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
uploadToken: pacer.NewTokenDispenser(fs.Config.Transfers),
pool: pool.New(
time.Duration(opt.MemoryPoolFlushTime),
int(opt.ChunkSize),
fs.Config.Transfers,
opt.MemoryPoolUseMmap,
),
} }
f.setRoot(root) f.setRoot(root)
f.features = (&fs.Features{ f.features = (&fs.Features{
@ -410,7 +446,6 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
f.srv.SetHeader(testModeHeader, testMode) f.srv.SetHeader(testModeHeader, testMode)
fs.Debugf(f, "Setting test header \"%s: %s\"", testModeHeader, testMode) fs.Debugf(f, "Setting test header \"%s: %s\"", testModeHeader, testMode)
} }
f.fillBufferTokens()
err = f.authorizeAccount(ctx) err = f.authorizeAccount(ctx)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to authorize account") return nil, errors.Wrap(err, "failed to authorize account")
@ -533,32 +568,25 @@ func (f *Fs) clearUploadURL(bucketID string) {
f.uploadMu.Unlock() f.uploadMu.Unlock()
} }
// Fill up (or reset) the buffer tokens // getBuf gets a buffer of f.opt.ChunkSize and an upload token
func (f *Fs) fillBufferTokens() { //
f.bufferTokens = make(chan []byte, fs.Config.Transfers) // If noBuf is set then it just gets an upload token
for i := 0; i < fs.Config.Transfers; i++ { func (f *Fs) getBuf(noBuf bool) (buf []byte) {
f.bufferTokens <- nil f.uploadToken.Get()
if !noBuf {
buf = f.pool.Get()
} }
}
// getUploadBlock gets a block from the pool of size chunkSize
func (f *Fs) getUploadBlock() []byte {
buf := <-f.bufferTokens
if buf == nil {
buf = make([]byte, f.opt.ChunkSize)
}
// fs.Debugf(f, "Getting upload block %p", buf)
return buf return buf
} }
// putUploadBlock returns a block to the pool of size chunkSize // putBuf returns a buffer to the memory pool and an upload token
func (f *Fs) putUploadBlock(buf []byte) { //
buf = buf[:cap(buf)] // If noBuf is set then it just returns the upload token
if len(buf) != int(f.opt.ChunkSize) { func (f *Fs) putBuf(buf []byte, noBuf bool) {
panic("bad blocksize returned to pool") if !noBuf {
f.pool.Put(buf)
} }
// fs.Debugf(f, "Returning upload block %p", buf) f.uploadToken.Put()
f.bufferTokens <- buf
} }
// Return an Object from a path // Return an Object from a path
@ -1205,6 +1233,63 @@ func (f *Fs) CleanUp(ctx context.Context) error {
return f.purge(ctx, f.rootBucket, f.rootDirectory, true) return f.purge(ctx, f.rootBucket, f.rootDirectory, true)
} }
// copy does a server side copy from dstObj <- srcObj
//
// If newInfo is nil then the metadata will be copied otherwise it
// will be replaced with newInfo
func (f *Fs) copy(ctx context.Context, dstObj *Object, srcObj *Object, newInfo *api.File) (err error) {
if srcObj.size >= int64(f.opt.CopyCutoff) {
if newInfo == nil {
newInfo, err = srcObj.getMetaData(ctx)
if err != nil {
return err
}
}
up, err := f.newLargeUpload(ctx, dstObj, nil, srcObj, f.opt.CopyCutoff, true, newInfo)
if err != nil {
return err
}
return up.Upload(ctx)
}
dstBucket, dstPath := dstObj.split()
err = f.makeBucket(ctx, dstBucket)
if err != nil {
return err
}
destBucketID, err := f.getBucketID(ctx, dstBucket)
if err != nil {
return err
}
opts := rest.Opts{
Method: "POST",
Path: "/b2_copy_file",
}
var request = api.CopyFileRequest{
SourceID: srcObj.id,
Name: f.opt.Enc.FromStandardPath(dstPath),
DestBucketID: destBucketID,
}
if newInfo == nil {
request.MetadataDirective = "COPY"
} else {
request.MetadataDirective = "REPLACE"
request.ContentType = newInfo.ContentType
request.Info = newInfo.Info
}
var response api.FileInfo
err = f.pacer.Call(func() (bool, error) {
resp, err := f.srv.CallJSON(ctx, &opts, &request, &response)
return f.shouldRetry(ctx, resp, err)
})
if err != nil {
return err
}
return dstObj.decodeMetaDataFileInfo(&response)
}
// Copy src to this remote using server side copy operations. // Copy src to this remote using server side copy operations.
// //
// This is stored with the remote path given // This is stored with the remote path given
@ -1215,47 +1300,21 @@ func (f *Fs) CleanUp(ctx context.Context) error {
// //
// If it isn't possible then return fs.ErrorCantCopy // If it isn't possible then return fs.ErrorCantCopy
func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) { func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
dstBucket, dstPath := f.split(remote)
err := f.makeBucket(ctx, dstBucket)
if err != nil {
return nil, err
}
srcObj, ok := src.(*Object) srcObj, ok := src.(*Object)
if !ok { if !ok {
fs.Debugf(src, "Can't copy - not same remote type") fs.Debugf(src, "Can't copy - not same remote type")
return nil, fs.ErrorCantCopy return nil, fs.ErrorCantCopy
} }
destBucketID, err := f.getBucketID(ctx, dstBucket) // Temporary Object under construction
if err != nil { dstObj := &Object{
return nil, err
}
opts := rest.Opts{
Method: "POST",
Path: "/b2_copy_file",
}
var request = api.CopyFileRequest{
SourceID: srcObj.id,
Name: f.opt.Enc.FromStandardPath(dstPath),
MetadataDirective: "COPY",
DestBucketID: destBucketID,
}
var response api.FileInfo
err = f.pacer.Call(func() (bool, error) {
resp, err := f.srv.CallJSON(ctx, &opts, &request, &response)
return f.shouldRetry(ctx, resp, err)
})
if err != nil {
return nil, err
}
o := &Object{
fs: f, fs: f,
remote: remote, remote: remote,
} }
err = o.decodeMetaDataFileInfo(&response) err := f.copy(ctx, dstObj, srcObj, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return o, nil return dstObj, nil
} }
// Hashes returns the supported hash sets. // Hashes returns the supported hash sets.
@ -1526,28 +1585,10 @@ func (o *Object) SetModTime(ctx context.Context, modTime time.Time) error {
if err != nil { if err != nil {
return err return err
} }
_, bucketPath := o.split()
info.Info[timeKey] = timeString(modTime) info.Info[timeKey] = timeString(modTime)
opts := rest.Opts{
Method: "POST", // Copy to the same name, overwriting the metadata only
Path: "/b2_copy_file", return o.fs.copy(ctx, o, o, info)
}
var request = api.CopyFileRequest{
SourceID: o.id,
Name: o.fs.opt.Enc.FromStandardPath(bucketPath), // copy to same name
MetadataDirective: "REPLACE",
ContentType: info.ContentType,
Info: info.Info,
}
var response api.FileInfo
err = o.fs.pacer.Call(func() (bool, error) {
resp, err := o.fs.srv.CallJSON(ctx, &opts, &request, &response)
return o.fs.shouldRetry(ctx, resp, err)
})
if err != nil {
return err
}
return o.decodeMetaDataFileInfo(&response)
} }
// Storable returns if this object is storable // Storable returns if this object is storable
@ -1723,7 +1764,8 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
} }
if size == -1 { if size == -1 {
// Check if the file is large enough for a chunked upload (needs to be at least two chunks) // Check if the file is large enough for a chunked upload (needs to be at least two chunks)
buf := o.fs.getUploadBlock() buf := o.fs.getBuf(false)
n, err := io.ReadFull(in, buf) n, err := io.ReadFull(in, buf)
if err == nil { if err == nil {
bufReader := bufio.NewReader(in) bufReader := bufio.NewReader(in)
@ -1733,22 +1775,24 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
if err == nil { if err == nil {
fs.Debugf(o, "File is big enough for chunked streaming") fs.Debugf(o, "File is big enough for chunked streaming")
up, err := o.fs.newLargeUpload(ctx, o, in, src) up, err := o.fs.newLargeUpload(ctx, o, in, src, o.fs.opt.ChunkSize, false, nil)
if err != nil { if err != nil {
o.fs.putUploadBlock(buf) o.fs.putBuf(buf, false)
return err return err
} }
// NB Stream returns the buffer and token
return up.Stream(ctx, buf) return up.Stream(ctx, buf)
} else if err == io.EOF || err == io.ErrUnexpectedEOF { } else if err == io.EOF || err == io.ErrUnexpectedEOF {
fs.Debugf(o, "File has %d bytes, which makes only one chunk. Using direct upload.", n) fs.Debugf(o, "File has %d bytes, which makes only one chunk. Using direct upload.", n)
defer o.fs.putUploadBlock(buf) defer o.fs.putBuf(buf, false)
size = int64(n) size = int64(n)
in = bytes.NewReader(buf[:n]) in = bytes.NewReader(buf[:n])
} else { } else {
o.fs.putBuf(buf, false)
return err return err
} }
} else if size > int64(o.fs.opt.UploadCutoff) { } else if size > int64(o.fs.opt.UploadCutoff) {
up, err := o.fs.newLargeUpload(ctx, o, in, src) up, err := o.fs.newLargeUpload(ctx, o, in, src, o.fs.opt.ChunkSize, false, nil)
if err != nil { if err != nil {
return err return err
} }

View File

@ -21,6 +21,7 @@ import (
"github.com/rclone/rclone/fs/accounting" "github.com/rclone/rclone/fs/accounting"
"github.com/rclone/rclone/fs/hash" "github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/lib/rest" "github.com/rclone/rclone/lib/rest"
"golang.org/x/sync/errgroup"
) )
type hashAppendingReader struct { type hashAppendingReader struct {
@ -68,20 +69,26 @@ func newHashAppendingReader(in io.Reader, h gohash.Hash) *hashAppendingReader {
// largeUpload is used to control the upload of large files which need chunking // largeUpload is used to control the upload of large files which need chunking
type largeUpload struct { type largeUpload struct {
f *Fs // parent Fs f *Fs // parent Fs
o *Object // object being uploaded o *Object // object being uploaded
in io.Reader // read the data from here doCopy bool // doing copy rather than upload
wrap accounting.WrapFn // account parts being transferred what string // text name of operation for logs
id string // ID of the file being uploaded in io.Reader // read the data from here
size int64 // total size wrap accounting.WrapFn // account parts being transferred
parts int64 // calculated number of parts, if known id string // ID of the file being uploaded
sha1s []string // slice of SHA1s for each part size int64 // total size
uploadMu sync.Mutex // lock for upload variable parts int64 // calculated number of parts, if known
uploads []*api.GetUploadPartURLResponse // result of get upload URL calls sha1s []string // slice of SHA1s for each part
uploadMu sync.Mutex // lock for upload variable
uploads []*api.GetUploadPartURLResponse // result of get upload URL calls
chunkSize int64 // chunk size to use
src *Object // if copying, object we are reading from
} }
// newLargeUpload starts an upload of object o from in with metadata in src // newLargeUpload starts an upload of object o from in with metadata in src
func (f *Fs) newLargeUpload(ctx context.Context, o *Object, in io.Reader, src fs.ObjectInfo) (up *largeUpload, err error) { //
// If newInfo is set then metadata from that will be used instead of reading it from src
func (f *Fs) newLargeUpload(ctx context.Context, o *Object, in io.Reader, src fs.ObjectInfo, chunkSize fs.SizeSuffix, doCopy bool, newInfo *api.File) (up *largeUpload, err error) {
remote := o.remote remote := o.remote
size := src.Size() size := src.Size()
parts := int64(0) parts := int64(0)
@ -89,8 +96,8 @@ func (f *Fs) newLargeUpload(ctx context.Context, o *Object, in io.Reader, src fs
if size == -1 { if size == -1 {
fs.Debugf(o, "Streaming upload with --b2-chunk-size %s allows uploads of up to %s and will fail only when that limit is reached.", f.opt.ChunkSize, maxParts*f.opt.ChunkSize) fs.Debugf(o, "Streaming upload with --b2-chunk-size %s allows uploads of up to %s and will fail only when that limit is reached.", f.opt.ChunkSize, maxParts*f.opt.ChunkSize)
} else { } else {
parts = size / int64(o.fs.opt.ChunkSize) parts = size / int64(chunkSize)
if size%int64(o.fs.opt.ChunkSize) != 0 { if size%int64(chunkSize) != 0 {
parts++ parts++
} }
if parts > maxParts { if parts > maxParts {
@ -99,7 +106,6 @@ func (f *Fs) newLargeUpload(ctx context.Context, o *Object, in io.Reader, src fs
sha1SliceSize = parts sha1SliceSize = parts
} }
modTime := src.ModTime(ctx)
opts := rest.Opts{ opts := rest.Opts{
Method: "POST", Method: "POST",
Path: "/b2_start_large_file", Path: "/b2_start_large_file",
@ -110,18 +116,24 @@ func (f *Fs) newLargeUpload(ctx context.Context, o *Object, in io.Reader, src fs
return nil, err return nil, err
} }
var request = api.StartLargeFileRequest{ var request = api.StartLargeFileRequest{
BucketID: bucketID, BucketID: bucketID,
Name: f.opt.Enc.FromStandardPath(bucketPath), Name: f.opt.Enc.FromStandardPath(bucketPath),
ContentType: fs.MimeType(ctx, src),
Info: map[string]string{
timeKey: timeString(modTime),
},
} }
// Set the SHA1 if known if newInfo == nil {
if !o.fs.opt.DisableCheckSum { modTime := src.ModTime(ctx)
if calculatedSha1, err := src.Hash(ctx, hash.SHA1); err == nil && calculatedSha1 != "" { request.ContentType = fs.MimeType(ctx, src)
request.Info[sha1Key] = calculatedSha1 request.Info = map[string]string{
timeKey: timeString(modTime),
} }
// Set the SHA1 if known
if !o.fs.opt.DisableCheckSum || doCopy {
if calculatedSha1, err := src.Hash(ctx, hash.SHA1); err == nil && calculatedSha1 != "" {
request.Info[sha1Key] = calculatedSha1
}
}
} else {
request.ContentType = newInfo.ContentType
request.Info = newInfo.Info
} }
var response api.StartLargeFileResponse var response api.StartLargeFileResponse
err = f.pacer.Call(func() (bool, error) { err = f.pacer.Call(func() (bool, error) {
@ -131,18 +143,24 @@ func (f *Fs) newLargeUpload(ctx context.Context, o *Object, in io.Reader, src fs
if err != nil { if err != nil {
return nil, err return nil, err
} }
up = &largeUpload{
f: f,
o: o,
doCopy: doCopy,
what: "upload",
id: response.ID,
size: size,
parts: parts,
sha1s: make([]string, sha1SliceSize),
chunkSize: int64(chunkSize),
}
// unwrap the accounting from the input, we use wrap to put it // unwrap the accounting from the input, we use wrap to put it
// back on after the buffering // back on after the buffering
in, wrap := accounting.UnWrap(in) if doCopy {
up = &largeUpload{ up.what = "copy"
f: f, up.src = src.(*Object)
o: o, } else {
in: in, up.in, up.wrap = accounting.UnWrap(in)
wrap: wrap,
id: response.ID,
size: size,
parts: parts,
sha1s: make([]string, sha1SliceSize),
} }
return up, nil return up, nil
} }
@ -256,9 +274,41 @@ func (up *largeUpload) transferChunk(ctx context.Context, part int64, body []byt
return err return err
} }
// Copy a chunk
func (up *largeUpload) copyChunk(ctx context.Context, part int64, partSize int64) error {
err := up.f.pacer.Call(func() (bool, error) {
fs.Debugf(up.o, "Copying chunk %d length %d", part, partSize)
opts := rest.Opts{
Method: "POST",
Path: "/b2_copy_part",
}
offset := (part - 1) * up.chunkSize // where we are in the source file
var request = api.CopyPartRequest{
SourceID: up.src.id,
LargeFileID: up.id,
PartNumber: part,
Range: fmt.Sprintf("bytes=%d-%d", offset, offset+partSize-1),
}
var response api.UploadPartResponse
resp, err := up.f.srv.CallJSON(ctx, &opts, &request, &response)
retry, err := up.f.shouldRetry(ctx, resp, err)
if err != nil {
fs.Debugf(up.o, "Error copying chunk %d (retry=%v): %v: %#v", part, retry, err, err)
}
up.sha1s[part-1] = response.SHA1
return retry, err
})
if err != nil {
fs.Debugf(up.o, "Error copying chunk %d: %v", part, err)
} else {
fs.Debugf(up.o, "Done copying chunk %d", part)
}
return err
}
// finish closes off the large upload // finish closes off the large upload
func (up *largeUpload) finish(ctx context.Context) error { func (up *largeUpload) finish(ctx context.Context) error {
fs.Debugf(up.o, "Finishing large file upload with %d parts", up.parts) fs.Debugf(up.o, "Finishing large file %s with %d parts", up.what, up.parts)
opts := rest.Opts{ opts := rest.Opts{
Method: "POST", Method: "POST",
Path: "/b2_finish_large_file", Path: "/b2_finish_large_file",
@ -295,136 +345,145 @@ func (up *largeUpload) cancel(ctx context.Context) error {
return err return err
} }
func (up *largeUpload) managedTransferChunk(ctx context.Context, wg *sync.WaitGroup, errs chan error, part int64, buf []byte) { // If the error pointer is not nil then cancel the transfer
wg.Add(1) func (up *largeUpload) cancelOnError(ctx context.Context, err *error) {
go func(part int64, buf []byte) { if *err == nil {
defer wg.Done() return
defer up.f.putUploadBlock(buf)
err := up.transferChunk(ctx, part, buf)
if err != nil {
select {
case errs <- err:
default:
}
}
}(part, buf)
}
func (up *largeUpload) finishOrCancelOnError(ctx context.Context, err error, errs chan error) error {
if err == nil {
select {
case err = <-errs:
default:
}
} }
if err != nil { fs.Debugf(up.o, "Cancelling large file %s due to error: %v", up.what, *err)
fs.Debugf(up.o, "Cancelling large file upload due to error: %v", err) cancelErr := up.cancel(ctx)
cancelErr := up.cancel(ctx) if cancelErr != nil {
if cancelErr != nil { fs.Errorf(up.o, "Failed to cancel large file %s: %v", up.what, cancelErr)
fs.Errorf(up.o, "Failed to cancel large file upload: %v", cancelErr)
}
return err
} }
return up.finish(ctx)
} }
// Stream uploads the chunks from the input, starting with a required initial // Stream uploads the chunks from the input, starting with a required initial
// chunk. Assumes the file size is unknown and will upload until the input // chunk. Assumes the file size is unknown and will upload until the input
// reaches EOF. // reaches EOF.
//
// Note that initialUploadBlock must be returned to f.putBuf()
func (up *largeUpload) Stream(ctx context.Context, initialUploadBlock []byte) (err error) { func (up *largeUpload) Stream(ctx context.Context, initialUploadBlock []byte) (err error) {
defer up.cancelOnError(ctx, &err)
fs.Debugf(up.o, "Starting streaming of large file (id %q)", up.id) fs.Debugf(up.o, "Starting streaming of large file (id %q)", up.id)
errs := make(chan error, 1) var (
hasMoreParts := true g, gCtx = errgroup.WithContext(ctx)
var wg sync.WaitGroup hasMoreParts = true
)
// Transfer initial chunk
up.size = int64(len(initialUploadBlock)) up.size = int64(len(initialUploadBlock))
up.managedTransferChunk(ctx, &wg, errs, 1, initialUploadBlock) g.Go(func() error {
for part := int64(1); hasMoreParts; part++ {
// Get a block of memory from the pool and token which limits concurrency.
var buf []byte
if part == 1 {
buf = initialUploadBlock
} else {
buf = up.f.getBuf(false)
}
outer: // Fail fast, in case an errgroup managed function returns an error
for part := int64(2); hasMoreParts; part++ { // gCtx is cancelled. There is no point in uploading all the other parts.
// Check any errors if gCtx.Err() != nil {
select { up.f.putBuf(buf, false)
case err = <-errs: return nil
break outer }
default:
// Read the chunk
var n int
if part == 1 {
n = len(buf)
} else {
n, err = io.ReadFull(up.in, buf)
if err == io.ErrUnexpectedEOF {
fs.Debugf(up.o, "Read less than a full chunk, making this the last one.")
buf = buf[:n]
hasMoreParts = false
} else if err == io.EOF {
fs.Debugf(up.o, "Could not read any more bytes, previous chunk was the last.")
up.f.putBuf(buf, false)
return nil
} else if err != nil {
// other kinds of errors indicate failure
up.f.putBuf(buf, false)
return err
}
}
// Keep stats up to date
up.parts = part
up.size += int64(n)
if part > maxParts {
up.f.putBuf(buf, false)
return errors.Errorf("%q too big (%d bytes so far) makes too many parts %d > %d - increase --b2-chunk-size", up.o, up.size, up.parts, maxParts)
}
part := part // for the closure
g.Go(func() (err error) {
defer up.f.putBuf(buf, false)
return up.transferChunk(gCtx, part, buf)
})
} }
return nil
// Get a block of memory })
buf := up.f.getUploadBlock() err = g.Wait()
if err != nil {
// Read the chunk return err
var n int
n, err = io.ReadFull(up.in, buf)
if err == io.ErrUnexpectedEOF {
fs.Debugf(up.o, "Read less than a full chunk, making this the last one.")
buf = buf[:n]
hasMoreParts = false
err = nil
} else if err == io.EOF {
fs.Debugf(up.o, "Could not read any more bytes, previous chunk was the last.")
up.f.putUploadBlock(buf)
err = nil
break outer
} else if err != nil {
// other kinds of errors indicate failure
up.f.putUploadBlock(buf)
break outer
}
// Keep stats up to date
up.parts = part
up.size += int64(n)
if part > maxParts {
err = errors.Errorf("%q too big (%d bytes so far) makes too many parts %d > %d - increase --b2-chunk-size", up.o, up.size, up.parts, maxParts)
break outer
}
// Transfer the chunk
up.managedTransferChunk(ctx, &wg, errs, part, buf)
} }
wg.Wait()
up.sha1s = up.sha1s[:up.parts] up.sha1s = up.sha1s[:up.parts]
return up.finish(ctx)
return up.finishOrCancelOnError(ctx, err, errs)
} }
// Upload uploads the chunks from the input // Upload uploads the chunks from the input
func (up *largeUpload) Upload(ctx context.Context) error { func (up *largeUpload) Upload(ctx context.Context) (err error) {
fs.Debugf(up.o, "Starting upload of large file in %d chunks (id %q)", up.parts, up.id) defer up.cancelOnError(ctx, &err)
remaining := up.size fs.Debugf(up.o, "Starting %s of large file in %d chunks (id %q)", up.what, up.parts, up.id)
errs := make(chan error, 1) var (
var wg sync.WaitGroup g, gCtx = errgroup.WithContext(ctx)
var err error remaining = up.size
outer: )
for part := int64(1); part <= up.parts; part++ { g.Go(func() error {
// Check any errors for part := int64(1); part <= up.parts; part++ {
select { // Get a block of memory from the pool and token which limits concurrency.
case err = <-errs: buf := up.f.getBuf(up.doCopy)
break outer
default: // Fail fast, in case an errgroup managed function returns an error
// gCtx is cancelled. There is no point in uploading all the other parts.
if gCtx.Err() != nil {
up.f.putBuf(buf, up.doCopy)
return nil
}
reqSize := remaining
if reqSize >= up.chunkSize {
reqSize = up.chunkSize
}
if !up.doCopy {
// Read the chunk
buf = buf[:reqSize]
_, err = io.ReadFull(up.in, buf)
if err != nil {
up.f.putBuf(buf, up.doCopy)
return err
}
}
part := part // for the closure
g.Go(func() (err error) {
defer up.f.putBuf(buf, up.doCopy)
if !up.doCopy {
err = up.transferChunk(gCtx, part, buf)
} else {
err = up.copyChunk(gCtx, part, reqSize)
}
return err
})
remaining -= reqSize
} }
return nil
reqSize := remaining })
if reqSize >= int64(up.f.opt.ChunkSize) { err = g.Wait()
reqSize = int64(up.f.opt.ChunkSize) if err != nil {
} return err
// Get a block of memory
buf := up.f.getUploadBlock()[:reqSize]
// Read the chunk
_, err = io.ReadFull(up.in, buf)
if err != nil {
up.f.putUploadBlock(buf)
break outer
}
// Transfer the chunk
up.managedTransferChunk(ctx, &wg, errs, part, buf)
remaining -= reqSize
} }
wg.Wait() return up.finish(ctx)
return up.finishOrCancelOnError(ctx, err, errs)
} }