2016-06-15 19:49:11 +02:00
// Upload large files for b2
//
// Docs - https://www.backblaze.com/b2/docs/large_files.html
package b2
import (
"bytes"
2019-06-17 10:34:30 +02:00
"context"
2016-06-15 19:49:11 +02:00
"crypto/sha1"
2017-08-12 12:57:34 +02:00
"encoding/hex"
2016-06-15 19:49:11 +02:00
"fmt"
2018-01-12 17:30:54 +01:00
gohash "hash"
2016-06-15 19:49:11 +02:00
"io"
2017-08-12 12:57:34 +02:00
"strings"
2016-06-15 19:49:11 +02:00
"sync"
"github.com/pkg/errors"
2019-07-28 19:47:38 +02:00
"github.com/rclone/rclone/backend/b2/api"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/accounting"
"github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/lib/rest"
2016-06-15 19:49:11 +02:00
)
2017-08-12 12:57:34 +02:00
// hashAppendingReader wraps a reader and, once that reader is exhausted,
// serves the hex encoded hash of everything that was read from it.
type hashAppendingReader struct {
	h         gohash.Hash // hash fed with every byte read from in
	in        io.Reader   // original reader teed into h; set to nil once it has hit EOF
	hexSum    string      // hex encoded sum; empty until in has hit EOF
	hexReader io.Reader   // serves hexSum; nil until in has hit EOF
}

// Read returns all bytes from the original reader, then the hex sum
// of what was read so far, then EOF.
//
// When the original reader signals EOF without delivering any data the
// hex sum is served in the same call, so that a non-empty b never
// results in a (0, nil) return at the switch-over point.
func (har *hashAppendingReader) Read(b []byte) (int, error) {
	if har.hexReader == nil {
		n, err := har.in.Read(b)
		if err != io.EOF {
			return n, err
		}
		har.in = nil // allow GC
		har.hexSum = hex.EncodeToString(har.h.Sum(nil))
		har.hexReader = strings.NewReader(har.hexSum)
		if n > 0 || len(b) == 0 {
			// Hand back the tail of the data with a nil error so the
			// caller keeps reading and picks up the hex sum next.
			return n, nil
		}
		// No data came with the EOF - carry straight on with the hex sum
	}
	return har.hexReader.Read(b)
}

// AdditionalLength returns how many bytes the appended hex sum will take up.
func (har *hashAppendingReader) AdditionalLength() int {
	return hex.EncodedLen(har.h.Size())
}

// HexSum returns the hash sum as hex. It's only available after the original
// reader has EOF'd. It's an empty string before that.
func (har *hashAppendingReader) HexSum() string {
	return har.hexSum
}

// newHashAppendingReader takes a Reader and a Hash and will append the hex sum
// after the original reader reaches EOF. The increased size depends on the
// given hash, which may be queried through AdditionalLength()
func newHashAppendingReader(in io.Reader, h gohash.Hash) *hashAppendingReader {
	withHash := io.TeeReader(in, h)
	return &hashAppendingReader{h: h, in: withHash}
}
2016-06-15 19:49:11 +02:00
// largeUpload is used to control the upload of large files which need chunking
type largeUpload struct {
f * Fs // parent Fs
o * Object // object being uploaded
in io . Reader // read the data from here
2018-02-01 16:41:58 +01:00
wrap accounting . WrapFn // account parts being transferred
2016-06-15 19:49:11 +02:00
id string // ID of the file being uploaded
size int64 // total size
2017-09-16 22:43:48 +02:00
parts int64 // calculated number of parts, if known
2016-06-15 19:49:11 +02:00
sha1s [ ] string // slice of SHA1s for each part
uploadMu sync . Mutex // lock for upload variable
uploads [ ] * api . GetUploadPartURLResponse // result of get upload URL calls
}
// newLargeUpload starts an upload of object o from in with metadata in src
2019-06-17 10:34:30 +02:00
func ( f * Fs ) newLargeUpload ( ctx context . Context , o * Object , in io . Reader , src fs . ObjectInfo ) ( up * largeUpload , err error ) {
2016-07-13 16:28:39 +02:00
remote := o . remote
2016-06-15 19:49:11 +02:00
size := src . Size ( )
2017-09-16 22:43:48 +02:00
parts := int64 ( 0 )
sha1SliceSize := int64 ( maxParts )
if size == - 1 {
2018-05-14 19:06:57 +02:00
fs . Debugf ( o , "Streaming upload with --b2-chunk-size %s allows uploads of up to %s and will fail only when that limit is reached." , f . opt . ChunkSize , maxParts * f . opt . ChunkSize )
2017-09-16 22:43:48 +02:00
} else {
2018-05-14 19:06:57 +02:00
parts = size / int64 ( o . fs . opt . ChunkSize )
if size % int64 ( o . fs . opt . ChunkSize ) != 0 {
2017-09-16 22:43:48 +02:00
parts ++
}
if parts > maxParts {
return nil , errors . Errorf ( "%q too big (%d bytes) makes too many parts %d > %d - increase --b2-chunk-size" , remote , size , parts , maxParts )
}
sha1SliceSize = parts
2016-06-15 19:49:11 +02:00
}
2017-09-16 22:43:48 +02:00
2019-06-17 10:34:30 +02:00
modTime := src . ModTime ( ctx )
2016-06-15 19:49:11 +02:00
opts := rest . Opts {
Method : "POST" ,
Path : "/b2_start_large_file" ,
}
2019-08-09 16:19:02 +02:00
bucket , bucketPath := o . split ( )
2019-09-04 21:00:37 +02:00
bucketID , err := f . getBucketID ( ctx , bucket )
2016-06-15 19:49:11 +02:00
if err != nil {
return nil , err
}
var request = api . StartLargeFileRequest {
BucketID : bucketID ,
2020-01-14 18:33:35 +01:00
Name : f . opt . Enc . FromStandardPath ( bucketPath ) ,
2019-06-17 10:34:30 +02:00
ContentType : fs . MimeType ( ctx , src ) ,
2016-06-15 19:49:11 +02:00
Info : map [ string ] string {
timeKey : timeString ( modTime ) ,
} ,
}
// Set the SHA1 if known
2019-01-20 16:33:42 +01:00
if ! o . fs . opt . DisableCheckSum {
2019-06-17 10:34:30 +02:00
if calculatedSha1 , err := src . Hash ( ctx , hash . SHA1 ) ; err == nil && calculatedSha1 != "" {
2019-01-20 16:33:42 +01:00
request . Info [ sha1Key ] = calculatedSha1
}
2016-06-15 19:49:11 +02:00
}
var response api . StartLargeFileResponse
err = f . pacer . Call ( func ( ) ( bool , error ) {
2019-09-04 21:00:37 +02:00
resp , err := f . srv . CallJSON ( ctx , & opts , & request , & response )
return f . shouldRetry ( ctx , resp , err )
2016-06-15 19:49:11 +02:00
} )
if err != nil {
return nil , err
}
2018-02-01 16:41:58 +01:00
// unwrap the accounting from the input, we use wrap to put it
// back on after the buffering
in , wrap := accounting . UnWrap ( in )
2016-06-15 19:49:11 +02:00
up = & largeUpload {
f : f ,
o : o ,
in : in ,
2018-02-01 16:41:58 +01:00
wrap : wrap ,
2016-06-15 19:49:11 +02:00
id : response . ID ,
size : size ,
parts : parts ,
2017-09-16 22:43:48 +02:00
sha1s : make ( [ ] string , sha1SliceSize ) ,
2016-06-15 19:49:11 +02:00
}
return up , nil
}
// getUploadURL returns the upload info with the UploadURL and the AuthorizationToken
//
// This should be returned with returnUploadURL when finished
2019-09-04 21:00:37 +02:00
func ( up * largeUpload ) getUploadURL ( ctx context . Context ) ( upload * api . GetUploadPartURLResponse , err error ) {
2016-06-15 19:49:11 +02:00
up . uploadMu . Lock ( )
defer up . uploadMu . Unlock ( )
if len ( up . uploads ) == 0 {
opts := rest . Opts {
Method : "POST" ,
Path : "/b2_get_upload_part_url" ,
}
var request = api . GetUploadPartURLRequest {
ID : up . id ,
}
err := up . f . pacer . Call ( func ( ) ( bool , error ) {
2019-09-04 21:00:37 +02:00
resp , err := up . f . srv . CallJSON ( ctx , & opts , & request , & upload )
return up . f . shouldRetry ( ctx , resp , err )
2016-06-15 19:49:11 +02:00
} )
if err != nil {
return nil , errors . Wrap ( err , "failed to get upload URL" )
}
} else {
upload , up . uploads = up . uploads [ 0 ] , up . uploads [ 1 : ]
}
return upload , nil
}
// returnUploadURL puts upload back into the cache for reuse. A nil
// upload is ignored.
func (up *largeUpload) returnUploadURL(upload *api.GetUploadPartURLResponse) {
	if upload != nil {
		up.uploadMu.Lock()
		defer up.uploadMu.Unlock()
		up.uploads = append(up.uploads, upload)
	}
}
// Transfer a chunk
2019-09-04 21:00:37 +02:00
func ( up * largeUpload ) transferChunk ( ctx context . Context , part int64 , body [ ] byte ) error {
2016-06-15 19:49:11 +02:00
err := up . f . pacer . Call ( func ( ) ( bool , error ) {
2017-02-09 12:01:20 +01:00
fs . Debugf ( up . o , "Sending chunk %d length %d" , part , len ( body ) )
2016-06-15 19:49:11 +02:00
// Get upload URL
2019-09-04 21:00:37 +02:00
upload , err := up . getUploadURL ( ctx )
2016-06-15 19:49:11 +02:00
if err != nil {
return false , err
}
2017-12-13 11:11:20 +01:00
in := newHashAppendingReader ( bytes . NewReader ( body ) , sha1 . New ( ) )
size := int64 ( len ( body ) ) + int64 ( in . AdditionalLength ( ) )
2016-06-15 19:49:11 +02:00
// Authorization
//
// An upload authorization token, from b2_get_upload_part_url.
//
// X-Bz-Part-Number
//
// A number from 1 to 10000. The parts uploaded for one file
// must have contiguous numbers, starting with 1.
//
// Content-Length
//
// The number of bytes in the file being uploaded. Note that
// this header is required; you cannot leave it out and just
// use chunked encoding. The minimum size of every part but
// the last one is 100MB.
//
// X-Bz-Content-Sha1
//
// The SHA1 checksum of the this part of the file. B2 will
// check this when the part is uploaded, to make sure that the
// data arrived correctly. The same SHA1 checksum must be
// passed to b2_finish_large_file.
opts := rest . Opts {
2017-07-07 09:18:13 +02:00
Method : "POST" ,
RootURL : upload . UploadURL ,
2018-02-01 16:41:58 +01:00
Body : up . wrap ( in ) ,
2016-06-15 19:49:11 +02:00
ExtraHeaders : map [ string ] string {
"Authorization" : upload . AuthorizationToken ,
"X-Bz-Part-Number" : fmt . Sprintf ( "%d" , part ) ,
2017-08-12 12:57:34 +02:00
sha1Header : "hex_digits_at_end" ,
2016-06-15 19:49:11 +02:00
} ,
ContentLength : & size ,
}
var response api . UploadPartResponse
2019-09-04 21:00:37 +02:00
resp , err := up . f . srv . CallJSON ( ctx , & opts , nil , & response )
retry , err := up . f . shouldRetry ( ctx , resp , err )
2018-02-04 12:25:44 +01:00
if err != nil {
fs . Debugf ( up . o , "Error sending chunk %d (retry=%v): %v: %#v" , part , retry , err , err )
}
2016-07-01 17:23:23 +02:00
// On retryable error clear PartUploadURL
if retry {
2017-02-09 12:01:20 +01:00
fs . Debugf ( up . o , "Clearing part upload URL because of error: %v" , err )
2016-07-01 17:23:23 +02:00
upload = nil
2016-06-15 19:49:11 +02:00
}
up . returnUploadURL ( upload )
2017-12-13 11:11:20 +01:00
up . sha1s [ part - 1 ] = in . HexSum ( )
2016-07-01 17:23:23 +02:00
return retry , err
2016-06-15 19:49:11 +02:00
} )
2016-07-01 11:04:52 +02:00
if err != nil {
2017-02-09 12:01:20 +01:00
fs . Debugf ( up . o , "Error sending chunk %d: %v" , part , err )
2016-07-01 11:04:52 +02:00
} else {
2017-02-09 12:01:20 +01:00
fs . Debugf ( up . o , "Done sending chunk %d" , part )
2016-07-01 11:04:52 +02:00
}
2016-06-15 19:49:11 +02:00
return err
}
// finish closes off the large upload
2019-09-04 21:00:37 +02:00
func ( up * largeUpload ) finish ( ctx context . Context ) error {
2017-09-16 22:43:48 +02:00
fs . Debugf ( up . o , "Finishing large file upload with %d parts" , up . parts )
2016-06-15 19:49:11 +02:00
opts := rest . Opts {
Method : "POST" ,
Path : "/b2_finish_large_file" ,
}
var request = api . FinishLargeFileRequest {
ID : up . id ,
SHA1s : up . sha1s ,
}
var response api . FileInfo
err := up . f . pacer . Call ( func ( ) ( bool , error ) {
2019-09-04 21:00:37 +02:00
resp , err := up . f . srv . CallJSON ( ctx , & opts , & request , & response )
return up . f . shouldRetry ( ctx , resp , err )
2016-06-15 19:49:11 +02:00
} )
if err != nil {
return err
}
return up . o . decodeMetaDataFileInfo ( & response )
}
2016-07-01 11:04:52 +02:00
// cancel aborts the large upload
2019-09-04 21:00:37 +02:00
func ( up * largeUpload ) cancel ( ctx context . Context ) error {
2016-07-01 11:04:52 +02:00
opts := rest . Opts {
Method : "POST" ,
Path : "/b2_cancel_large_file" ,
}
var request = api . CancelLargeFileRequest {
ID : up . id ,
}
var response api . CancelLargeFileResponse
err := up . f . pacer . Call ( func ( ) ( bool , error ) {
2019-09-04 21:00:37 +02:00
resp , err := up . f . srv . CallJSON ( ctx , & opts , & request , & response )
return up . f . shouldRetry ( ctx , resp , err )
2016-07-01 11:04:52 +02:00
} )
return err
}
2019-09-04 21:00:37 +02:00
func ( up * largeUpload ) managedTransferChunk ( ctx context . Context , wg * sync . WaitGroup , errs chan error , part int64 , buf [ ] byte ) {
2017-09-16 22:43:48 +02:00
wg . Add ( 1 )
go func ( part int64 , buf [ ] byte ) {
defer wg . Done ( )
defer up . f . putUploadBlock ( buf )
2019-09-04 21:00:37 +02:00
err := up . transferChunk ( ctx , part , buf )
2017-09-16 22:43:48 +02:00
if err != nil {
select {
case errs <- err :
default :
}
}
} ( part , buf )
}
2019-09-04 21:00:37 +02:00
func ( up * largeUpload ) finishOrCancelOnError ( ctx context . Context , err error , errs chan error ) error {
2017-09-16 22:43:48 +02:00
if err == nil {
select {
case err = <- errs :
default :
}
}
if err != nil {
fs . Debugf ( up . o , "Cancelling large file upload due to error: %v" , err )
2019-09-04 21:00:37 +02:00
cancelErr := up . cancel ( ctx )
2017-09-16 22:43:48 +02:00
if cancelErr != nil {
fs . Errorf ( up . o , "Failed to cancel large file upload: %v" , cancelErr )
}
return err
}
2019-09-04 21:00:37 +02:00
return up . finish ( ctx )
2017-09-16 22:43:48 +02:00
}
// Stream uploads the chunks from the input, starting with a required initial
// chunk. Assumes the file size is unknown and will upload until the input
// reaches EOF.
2019-09-04 21:00:37 +02:00
func ( up * largeUpload ) Stream ( ctx context . Context , initialUploadBlock [ ] byte ) ( err error ) {
2017-09-16 22:43:48 +02:00
fs . Debugf ( up . o , "Starting streaming of large file (id %q)" , up . id )
errs := make ( chan error , 1 )
hasMoreParts := true
var wg sync . WaitGroup
// Transfer initial chunk
up . size = int64 ( len ( initialUploadBlock ) )
2019-09-04 21:00:37 +02:00
up . managedTransferChunk ( ctx , & wg , errs , 1 , initialUploadBlock )
2017-09-16 22:43:48 +02:00
outer :
for part := int64 ( 2 ) ; hasMoreParts ; part ++ {
// Check any errors
select {
case err = <- errs :
break outer
default :
}
// Get a block of memory
buf := up . f . getUploadBlock ( )
// Read the chunk
2018-05-04 16:19:50 +02:00
var n int
n , err = io . ReadFull ( up . in , buf )
2017-09-16 22:43:48 +02:00
if err == io . ErrUnexpectedEOF {
fs . Debugf ( up . o , "Read less than a full chunk, making this the last one." )
buf = buf [ : n ]
hasMoreParts = false
err = nil
} else if err == io . EOF {
fs . Debugf ( up . o , "Could not read any more bytes, previous chunk was the last." )
up . f . putUploadBlock ( buf )
err = nil
break outer
} else if err != nil {
// other kinds of errors indicate failure
up . f . putUploadBlock ( buf )
break outer
}
// Keep stats up to date
up . parts = part
up . size += int64 ( n )
if part > maxParts {
err = errors . Errorf ( "%q too big (%d bytes so far) makes too many parts %d > %d - increase --b2-chunk-size" , up . o , up . size , up . parts , maxParts )
break outer
}
// Transfer the chunk
2019-09-04 21:00:37 +02:00
up . managedTransferChunk ( ctx , & wg , errs , part , buf )
2017-09-16 22:43:48 +02:00
}
wg . Wait ( )
up . sha1s = up . sha1s [ : up . parts ]
2019-09-04 21:00:37 +02:00
return up . finishOrCancelOnError ( ctx , err , errs )
2017-09-16 22:43:48 +02:00
}
2016-06-15 19:49:11 +02:00
// Upload uploads the chunks from the input
2019-09-04 21:00:37 +02:00
func ( up * largeUpload ) Upload ( ctx context . Context ) error {
2017-02-09 12:01:20 +01:00
fs . Debugf ( up . o , "Starting upload of large file in %d chunks (id %q)" , up . parts , up . id )
2016-06-15 19:49:11 +02:00
remaining := up . size
2016-07-01 11:04:52 +02:00
errs := make ( chan error , 1 )
var wg sync . WaitGroup
var err error
outer :
2016-06-15 19:49:11 +02:00
for part := int64 ( 1 ) ; part <= up . parts ; part ++ {
2016-07-01 11:04:52 +02:00
// Check any errors
select {
case err = <- errs :
break outer
default :
}
2016-06-15 19:49:11 +02:00
reqSize := remaining
2018-05-14 19:06:57 +02:00
if reqSize >= int64 ( up . f . opt . ChunkSize ) {
reqSize = int64 ( up . f . opt . ChunkSize )
2016-06-15 19:49:11 +02:00
}
2017-01-29 23:21:39 +01:00
// Get a block of memory
buf := up . f . getUploadBlock ( ) [ : reqSize ]
2016-06-15 19:49:11 +02:00
// Read the chunk
2016-07-01 11:04:52 +02:00
_ , err = io . ReadFull ( up . in , buf )
2016-06-15 19:49:11 +02:00
if err != nil {
2017-01-29 23:21:39 +01:00
up . f . putUploadBlock ( buf )
2016-07-01 11:04:52 +02:00
break outer
2016-06-15 19:49:11 +02:00
}
// Transfer the chunk
2019-09-04 21:00:37 +02:00
up . managedTransferChunk ( ctx , & wg , errs , part , buf )
2016-06-15 19:49:11 +02:00
remaining -= reqSize
}
2016-07-01 11:04:52 +02:00
wg . Wait ( )
2017-09-16 22:43:48 +02:00
2019-09-04 21:00:37 +02:00
return up . finishOrCancelOnError ( ctx , err , errs )
2016-06-15 19:49:11 +02:00
}