mirror of
https://github.com/rclone/rclone.git
synced 2024-11-24 09:24:05 +01:00
505 lines
15 KiB
Go
505 lines
15 KiB
Go
// Upload large files for b2
|
|
//
|
|
// Docs - https://www.backblaze.com/docs/cloud-storage-large-files
|
|
|
|
package b2
|
|
|
|
import (
|
|
"context"
|
|
"crypto/sha1"
|
|
"encoding/hex"
|
|
"fmt"
|
|
gohash "hash"
|
|
"io"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/rclone/rclone/backend/b2/api"
|
|
"github.com/rclone/rclone/fs"
|
|
"github.com/rclone/rclone/fs/accounting"
|
|
"github.com/rclone/rclone/fs/chunksize"
|
|
"github.com/rclone/rclone/fs/hash"
|
|
"github.com/rclone/rclone/lib/atexit"
|
|
"github.com/rclone/rclone/lib/pool"
|
|
"github.com/rclone/rclone/lib/rest"
|
|
"golang.org/x/sync/errgroup"
|
|
)
|
|
|
|
// hashAppendingReader is an io.Reader which passes through the data of an
// underlying reader and, once that reader is exhausted, continues the stream
// with the hex encoded hash of everything that was read.
type hashAppendingReader struct {
	// hash which is fed with every byte read from in
	h gohash.Hash
	// source of the data, set to nil after it has returned EOF
	in io.Reader
	// hex encoded digest, filled in when in returns EOF
	hexSum string
	// serves hexSum once in is exhausted, nil until then
	hexReader io.Reader
}

// Read returns all the bytes from the original reader, then the hex sum
// of what was read, then EOF.
func (har *hashAppendingReader) Read(b []byte) (int, error) {
	// The source has already finished - hand out the digest.
	if har.hexReader != nil {
		return har.hexReader.Read(b)
	}

	n, err := har.in.Read(b)
	if err != io.EOF {
		return n, err
	}

	// The source is finished. Hide the EOF from the caller so that it
	// carries on reading and receives the hex sum next.
	har.in = nil // allow GC
	har.hexSum = hex.EncodeToString(har.h.Sum(nil))
	har.hexReader = strings.NewReader(har.hexSum)
	return n, nil
}

// AdditionalLength returns the number of bytes the appended hex sum adds to
// the stream.
func (har *hashAppendingReader) AdditionalLength() int {
	digestSize := har.h.Size()
	return hex.EncodedLen(digestSize)
}

// HexSum returns the hex encoded hash sum. It is the empty string until the
// original reader has returned EOF.
func (har *hashAppendingReader) HexSum() string {
	return har.hexSum
}

// newHashAppendingReader returns a reader which yields the contents of in
// followed by the hex sum of those contents as calculated by h. The extra
// length this adds depends on h and can be queried with AdditionalLength().
func newHashAppendingReader(in io.Reader, h gohash.Hash) *hashAppendingReader {
	return &hashAppendingReader{
		h:  h,
		in: io.TeeReader(in, h), // everything read is also written to h
	}
}
|
|
|
|
// largeUpload is used to control the upload of large files which need chunking
|
|
type largeUpload struct {
|
|
f *Fs // parent Fs
|
|
o *Object // object being uploaded
|
|
doCopy bool // doing copy rather than upload
|
|
what string // text name of operation for logs
|
|
in io.Reader // read the data from here
|
|
wrap accounting.WrapFn // account parts being transferred
|
|
id string // ID of the file being uploaded
|
|
size int64 // total size
|
|
parts int // calculated number of parts, if known
|
|
sha1smu sync.Mutex // mutex to protect sha1s
|
|
sha1s []string // slice of SHA1s for each part
|
|
uploadMu sync.Mutex // lock for upload variable
|
|
uploads []*api.GetUploadPartURLResponse // result of get upload URL calls
|
|
chunkSize int64 // chunk size to use
|
|
src *Object // if copying, object we are reading from
|
|
info *api.FileInfo // final response with info about the object
|
|
}
|
|
|
|
// newLargeUpload starts an upload of object o from in with metadata in src
|
|
//
|
|
// If newInfo is set then metadata from that will be used instead of reading it from src
|
|
func (f *Fs) newLargeUpload(ctx context.Context, o *Object, in io.Reader, src fs.ObjectInfo, defaultChunkSize fs.SizeSuffix, doCopy bool, newInfo *api.File, options ...fs.OpenOption) (up *largeUpload, err error) {
|
|
size := src.Size()
|
|
parts := 0
|
|
chunkSize := defaultChunkSize
|
|
if size == -1 {
|
|
fs.Debugf(o, "Streaming upload with --b2-chunk-size %s allows uploads of up to %s and will fail only when that limit is reached.", f.opt.ChunkSize, maxParts*f.opt.ChunkSize)
|
|
} else {
|
|
chunkSize = chunksize.Calculator(o, size, maxParts, defaultChunkSize)
|
|
parts = int(size / int64(chunkSize))
|
|
if size%int64(chunkSize) != 0 {
|
|
parts++
|
|
}
|
|
}
|
|
bucket, bucketPath := o.split()
|
|
bucketID, err := f.getBucketID(ctx, bucket)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
var request = api.StartLargeFileRequest{
|
|
BucketID: bucketID,
|
|
Name: f.opt.Enc.FromStandardPath(bucketPath),
|
|
}
|
|
optionsToSend := make([]fs.OpenOption, 0, len(options))
|
|
if newInfo == nil {
|
|
modTime, err := o.getModTime(ctx, src, options)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
request.ContentType = fs.MimeType(ctx, src)
|
|
request.Info = map[string]string{
|
|
timeKey: timeString(modTime),
|
|
}
|
|
// Custom upload headers - remove header prefix since they are sent in the body
|
|
for _, option := range options {
|
|
k, v := option.Header()
|
|
k = strings.ToLower(k)
|
|
if strings.HasPrefix(k, headerPrefix) {
|
|
request.Info[k[len(headerPrefix):]] = v
|
|
} else {
|
|
optionsToSend = append(optionsToSend, option)
|
|
}
|
|
}
|
|
// Set the SHA1 if known
|
|
if !o.fs.opt.DisableCheckSum || doCopy {
|
|
if calculatedSha1, err := src.Hash(ctx, hash.SHA1); err == nil && calculatedSha1 != "" {
|
|
request.Info[sha1Key] = calculatedSha1
|
|
}
|
|
}
|
|
} else {
|
|
request.ContentType = newInfo.ContentType
|
|
request.Info = newInfo.Info
|
|
}
|
|
opts := rest.Opts{
|
|
Method: "POST",
|
|
Path: "/b2_start_large_file",
|
|
Options: optionsToSend,
|
|
}
|
|
var response api.StartLargeFileResponse
|
|
err = f.pacer.Call(func() (bool, error) {
|
|
resp, err := f.srv.CallJSON(ctx, &opts, &request, &response)
|
|
return f.shouldRetry(ctx, resp, err)
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
up = &largeUpload{
|
|
f: f,
|
|
o: o,
|
|
doCopy: doCopy,
|
|
what: "upload",
|
|
id: response.ID,
|
|
size: size,
|
|
parts: parts,
|
|
sha1s: make([]string, 0, 16),
|
|
chunkSize: int64(chunkSize),
|
|
}
|
|
// unwrap the accounting from the input, we use wrap to put it
|
|
// back on after the buffering
|
|
if doCopy {
|
|
up.what = "copy"
|
|
up.src = src.(*Object)
|
|
} else {
|
|
up.in, up.wrap = accounting.UnWrap(in)
|
|
}
|
|
return up, nil
|
|
}
|
|
|
|
// getUploadURL returns the upload info with the UploadURL and the AuthorizationToken
|
|
//
|
|
// This should be returned with returnUploadURL when finished
|
|
func (up *largeUpload) getUploadURL(ctx context.Context) (upload *api.GetUploadPartURLResponse, err error) {
|
|
up.uploadMu.Lock()
|
|
if len(up.uploads) > 0 {
|
|
upload, up.uploads = up.uploads[0], up.uploads[1:]
|
|
up.uploadMu.Unlock()
|
|
return upload, nil
|
|
}
|
|
up.uploadMu.Unlock()
|
|
|
|
opts := rest.Opts{
|
|
Method: "POST",
|
|
Path: "/b2_get_upload_part_url",
|
|
}
|
|
var request = api.GetUploadPartURLRequest{
|
|
ID: up.id,
|
|
}
|
|
err = up.f.pacer.Call(func() (bool, error) {
|
|
resp, err := up.f.srv.CallJSON(ctx, &opts, &request, &upload)
|
|
return up.f.shouldRetry(ctx, resp, err)
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get upload URL: %w", err)
|
|
}
|
|
return upload, nil
|
|
}
|
|
|
|
// returnUploadURL returns the UploadURL to the cache
|
|
func (up *largeUpload) returnUploadURL(upload *api.GetUploadPartURLResponse) {
|
|
if upload == nil {
|
|
return
|
|
}
|
|
up.uploadMu.Lock()
|
|
up.uploads = append(up.uploads, upload)
|
|
up.uploadMu.Unlock()
|
|
}
|
|
|
|
// Add an sha1 to the being built up sha1s
|
|
func (up *largeUpload) addSha1(chunkNumber int, sha1 string) {
|
|
up.sha1smu.Lock()
|
|
defer up.sha1smu.Unlock()
|
|
if len(up.sha1s) < chunkNumber+1 {
|
|
up.sha1s = append(up.sha1s, make([]string, chunkNumber+1-len(up.sha1s))...)
|
|
}
|
|
up.sha1s[chunkNumber] = sha1
|
|
}
|
|
|
|
// WriteChunk will write chunk number with reader bytes, where chunk number >= 0
|
|
func (up *largeUpload) WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (size int64, err error) {
|
|
// Only account after the checksum reads have been done
|
|
if do, ok := reader.(pool.DelayAccountinger); ok {
|
|
// To figure out this number, do a transfer and if the accounted size is 0 or a
|
|
// multiple of what it should be, increase or decrease this number.
|
|
do.DelayAccounting(1)
|
|
}
|
|
|
|
err = up.f.pacer.Call(func() (bool, error) {
|
|
// Discover the size by seeking to the end
|
|
size, err = reader.Seek(0, io.SeekEnd)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
// rewind the reader on retry and after reading size
|
|
_, err = reader.Seek(0, io.SeekStart)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
fs.Debugf(up.o, "Sending chunk %d length %d", chunkNumber, size)
|
|
|
|
// Get upload URL
|
|
upload, err := up.getUploadURL(ctx)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
in := newHashAppendingReader(reader, sha1.New())
|
|
sizeWithHash := size + int64(in.AdditionalLength())
|
|
|
|
// Authorization
|
|
//
|
|
// An upload authorization token, from b2_get_upload_part_url.
|
|
//
|
|
// X-Bz-Part-Number
|
|
//
|
|
// A number from 1 to 10000. The parts uploaded for one file
|
|
// must have contiguous numbers, starting with 1.
|
|
//
|
|
// Content-Length
|
|
//
|
|
// The number of bytes in the file being uploaded. Note that
|
|
// this header is required; you cannot leave it out and just
|
|
// use chunked encoding. The minimum size of every part but
|
|
// the last one is 100 MB (100,000,000 bytes)
|
|
//
|
|
// X-Bz-Content-Sha1
|
|
//
|
|
// The SHA1 checksum of the this part of the file. B2 will
|
|
// check this when the part is uploaded, to make sure that the
|
|
// data arrived correctly. The same SHA1 checksum must be
|
|
// passed to b2_finish_large_file.
|
|
opts := rest.Opts{
|
|
Method: "POST",
|
|
RootURL: upload.UploadURL,
|
|
Body: up.wrap(in),
|
|
ExtraHeaders: map[string]string{
|
|
"Authorization": upload.AuthorizationToken,
|
|
"X-Bz-Part-Number": fmt.Sprintf("%d", chunkNumber+1),
|
|
sha1Header: "hex_digits_at_end",
|
|
},
|
|
ContentLength: &sizeWithHash,
|
|
}
|
|
|
|
var response api.UploadPartResponse
|
|
|
|
resp, err := up.f.srv.CallJSON(ctx, &opts, nil, &response)
|
|
retry, err := up.f.shouldRetry(ctx, resp, err)
|
|
if err != nil {
|
|
fs.Debugf(up.o, "Error sending chunk %d (retry=%v): %v: %#v", chunkNumber, retry, err, err)
|
|
}
|
|
// On retryable error clear PartUploadURL
|
|
if retry {
|
|
fs.Debugf(up.o, "Clearing part upload URL because of error: %v", err)
|
|
upload = nil
|
|
}
|
|
up.returnUploadURL(upload)
|
|
up.addSha1(chunkNumber, in.HexSum())
|
|
return retry, err
|
|
})
|
|
if err != nil {
|
|
fs.Debugf(up.o, "Error sending chunk %d: %v", chunkNumber, err)
|
|
} else {
|
|
fs.Debugf(up.o, "Done sending chunk %d", chunkNumber)
|
|
}
|
|
return size, err
|
|
}
|
|
|
|
// Copy a chunk
|
|
func (up *largeUpload) copyChunk(ctx context.Context, part int, partSize int64) error {
|
|
err := up.f.pacer.Call(func() (bool, error) {
|
|
fs.Debugf(up.o, "Copying chunk %d length %d", part, partSize)
|
|
opts := rest.Opts{
|
|
Method: "POST",
|
|
Path: "/b2_copy_part",
|
|
}
|
|
offset := int64(part) * up.chunkSize // where we are in the source file
|
|
var request = api.CopyPartRequest{
|
|
SourceID: up.src.id,
|
|
LargeFileID: up.id,
|
|
PartNumber: int64(part + 1),
|
|
Range: fmt.Sprintf("bytes=%d-%d", offset, offset+partSize-1),
|
|
}
|
|
var response api.UploadPartResponse
|
|
resp, err := up.f.srv.CallJSON(ctx, &opts, &request, &response)
|
|
retry, err := up.f.shouldRetry(ctx, resp, err)
|
|
if err != nil {
|
|
fs.Debugf(up.o, "Error copying chunk %d (retry=%v): %v: %#v", part, retry, err, err)
|
|
}
|
|
up.addSha1(part, response.SHA1)
|
|
return retry, err
|
|
})
|
|
if err != nil {
|
|
fs.Debugf(up.o, "Error copying chunk %d: %v", part, err)
|
|
} else {
|
|
fs.Debugf(up.o, "Done copying chunk %d", part)
|
|
}
|
|
return err
|
|
}
|
|
|
|
// Close closes off the large upload
|
|
func (up *largeUpload) Close(ctx context.Context) error {
|
|
fs.Debugf(up.o, "Finishing large file %s with %d parts", up.what, up.parts)
|
|
opts := rest.Opts{
|
|
Method: "POST",
|
|
Path: "/b2_finish_large_file",
|
|
}
|
|
var request = api.FinishLargeFileRequest{
|
|
ID: up.id,
|
|
SHA1s: up.sha1s,
|
|
}
|
|
var response api.FileInfo
|
|
err := up.f.pacer.Call(func() (bool, error) {
|
|
resp, err := up.f.srv.CallJSON(ctx, &opts, &request, &response)
|
|
return up.f.shouldRetry(ctx, resp, err)
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
up.info = &response
|
|
return nil
|
|
}
|
|
|
|
// Abort aborts the large upload
|
|
func (up *largeUpload) Abort(ctx context.Context) error {
|
|
fs.Debugf(up.o, "Cancelling large file %s", up.what)
|
|
opts := rest.Opts{
|
|
Method: "POST",
|
|
Path: "/b2_cancel_large_file",
|
|
}
|
|
var request = api.CancelLargeFileRequest{
|
|
ID: up.id,
|
|
}
|
|
var response api.CancelLargeFileResponse
|
|
err := up.f.pacer.Call(func() (bool, error) {
|
|
resp, err := up.f.srv.CallJSON(ctx, &opts, &request, &response)
|
|
return up.f.shouldRetry(ctx, resp, err)
|
|
})
|
|
if err != nil {
|
|
fs.Errorf(up.o, "Failed to cancel large file %s: %v", up.what, err)
|
|
}
|
|
return err
|
|
}
|
|
|
|
// Stream uploads the chunks from the input, starting with a required initial
|
|
// chunk. Assumes the file size is unknown and will upload until the input
|
|
// reaches EOF.
|
|
//
|
|
// Note that initialUploadBlock must be returned to f.putBuf()
|
|
func (up *largeUpload) Stream(ctx context.Context, initialUploadBlock *pool.RW) (err error) {
|
|
defer atexit.OnError(&err, func() { _ = up.Abort(ctx) })()
|
|
fs.Debugf(up.o, "Starting streaming of large file (id %q)", up.id)
|
|
var (
|
|
g, gCtx = errgroup.WithContext(ctx)
|
|
hasMoreParts = true
|
|
)
|
|
up.size = initialUploadBlock.Size()
|
|
up.parts = 0
|
|
for part := 0; hasMoreParts; part++ {
|
|
// Get a block of memory from the pool and token which limits concurrency.
|
|
var rw *pool.RW
|
|
if part == 0 {
|
|
rw = initialUploadBlock
|
|
} else {
|
|
rw = up.f.getRW(false)
|
|
}
|
|
|
|
// Fail fast, in case an errgroup managed function returns an error
|
|
// gCtx is cancelled. There is no point in uploading all the other parts.
|
|
if gCtx.Err() != nil {
|
|
up.f.putRW(rw)
|
|
break
|
|
}
|
|
|
|
// Read the chunk
|
|
var n int64
|
|
if part == 0 {
|
|
n = rw.Size()
|
|
} else {
|
|
n, err = io.CopyN(rw, up.in, up.chunkSize)
|
|
if err == io.EOF {
|
|
if n == 0 {
|
|
fs.Debugf(up.o, "Not sending empty chunk after EOF - ending.")
|
|
up.f.putRW(rw)
|
|
break
|
|
} else {
|
|
fs.Debugf(up.o, "Read less than a full chunk %d, making this the last one.", n)
|
|
}
|
|
hasMoreParts = false
|
|
} else if err != nil {
|
|
// other kinds of errors indicate failure
|
|
up.f.putRW(rw)
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Keep stats up to date
|
|
up.parts += 1
|
|
up.size += n
|
|
if part > maxParts {
|
|
up.f.putRW(rw)
|
|
return fmt.Errorf("%q too big (%d bytes so far) makes too many parts %d > %d - increase --b2-chunk-size", up.o, up.size, up.parts, maxParts)
|
|
}
|
|
|
|
part := part // for the closure
|
|
g.Go(func() (err error) {
|
|
defer up.f.putRW(rw)
|
|
_, err = up.WriteChunk(gCtx, part, rw)
|
|
return err
|
|
})
|
|
}
|
|
err = g.Wait()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return up.Close(ctx)
|
|
}
|
|
|
|
// Copy the chunks from the source to the destination
|
|
func (up *largeUpload) Copy(ctx context.Context) (err error) {
|
|
defer atexit.OnError(&err, func() { _ = up.Abort(ctx) })()
|
|
fs.Debugf(up.o, "Starting %s of large file in %d chunks (id %q)", up.what, up.parts, up.id)
|
|
var (
|
|
g, gCtx = errgroup.WithContext(ctx)
|
|
remaining = up.size
|
|
)
|
|
g.SetLimit(up.f.opt.UploadConcurrency)
|
|
for part := 0; part < up.parts; part++ {
|
|
// Fail fast, in case an errgroup managed function returns an error
|
|
// gCtx is cancelled. There is no point in copying all the other parts.
|
|
if gCtx.Err() != nil {
|
|
break
|
|
}
|
|
|
|
reqSize := remaining
|
|
if reqSize >= up.chunkSize {
|
|
reqSize = up.chunkSize
|
|
}
|
|
|
|
part := part // for the closure
|
|
g.Go(func() (err error) {
|
|
return up.copyChunk(gCtx, part, reqSize)
|
|
})
|
|
remaining -= reqSize
|
|
}
|
|
err = g.Wait()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return up.Close(ctx)
|
|
}
|