2017-07-05 23:20:40 +02:00
// multipart upload for box
package box
import (
"bytes"
2019-09-04 21:00:37 +02:00
"context"
2017-07-05 23:20:40 +02:00
"crypto/sha1"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"
"sync"
"time"
"github.com/pkg/errors"
2019-07-28 19:47:38 +02:00
"github.com/rclone/rclone/backend/box/api"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/accounting"
"github.com/rclone/rclone/lib/rest"
2017-07-05 23:20:40 +02:00
)
// createUploadSession creates an upload session for the object
2019-09-04 21:00:37 +02:00
func ( o * Object ) createUploadSession ( ctx context . Context , leaf , directoryID string , size int64 ) ( response * api . UploadSessionResponse , err error ) {
2017-07-05 23:20:40 +02:00
opts := rest . Opts {
Method : "POST" ,
Path : "/files/upload_sessions" ,
RootURL : uploadURL ,
}
request := api . UploadSessionRequest {
FileSize : size ,
}
// If object has an ID then it is existing so create a new version
if o . id != "" {
opts . Path = "/files/" + o . id + "/upload_sessions"
} else {
opts . Path = "/files/upload_sessions"
request . FolderID = directoryID
2020-01-14 18:33:35 +01:00
request . FileName = o . fs . opt . Enc . FromStandardName ( leaf )
2017-07-05 23:20:40 +02:00
}
var resp * http . Response
err = o . fs . pacer . Call ( func ( ) ( bool , error ) {
2019-09-04 21:00:37 +02:00
resp , err = o . fs . srv . CallJSON ( ctx , & opts , & request , & response )
2017-07-05 23:20:40 +02:00
return shouldRetry ( resp , err )
} )
return
}
// sha1Digest formats an already computed SHA-1 sum as a Digest header
// value as per RFC3230: the "sha=" prefix followed by the standard
// base64 encoding of the sum.
func sha1Digest(digest []byte) string {
	const prefix = "sha="
	encoded := base64.StdEncoding.EncodeToString(digest)
	return prefix + encoded
}
// uploadPart uploads a part in an upload session
2020-03-21 22:49:12 +01:00
func ( o * Object ) uploadPart ( ctx context . Context , SessionID string , offset , totalSize int64 , chunk [ ] byte , wrap accounting . WrapFn , options ... fs . OpenOption ) ( response * api . UploadPartResponse , err error ) {
2017-07-05 23:20:40 +02:00
chunkSize := int64 ( len ( chunk ) )
sha1sum := sha1 . Sum ( chunk )
opts := rest . Opts {
Method : "PUT" ,
Path : "/files/upload_sessions/" + SessionID ,
RootURL : uploadURL ,
ContentType : "application/octet-stream" ,
ContentLength : & chunkSize ,
ContentRange : fmt . Sprintf ( "bytes %d-%d/%d" , offset , offset + chunkSize - 1 , totalSize ) ,
2020-03-21 22:49:12 +01:00
Options : options ,
2017-07-05 23:20:40 +02:00
ExtraHeaders : map [ string ] string {
"Digest" : sha1Digest ( sha1sum [ : ] ) ,
} ,
}
var resp * http . Response
err = o . fs . pacer . Call ( func ( ) ( bool , error ) {
2018-02-01 17:36:12 +01:00
opts . Body = wrap ( bytes . NewReader ( chunk ) )
2019-09-04 21:00:37 +02:00
resp , err = o . fs . srv . CallJSON ( ctx , & opts , nil , & response )
2017-07-05 23:20:40 +02:00
return shouldRetry ( resp , err )
} )
if err != nil {
return nil , err
}
return response , nil
}
// commitUpload finishes an upload session
2019-09-04 21:00:37 +02:00
func ( o * Object ) commitUpload ( ctx context . Context , SessionID string , parts [ ] api . Part , modTime time . Time , sha1sum [ ] byte ) ( result * api . FolderItems , err error ) {
2017-07-05 23:20:40 +02:00
opts := rest . Opts {
Method : "POST" ,
Path : "/files/upload_sessions/" + SessionID + "/commit" ,
RootURL : uploadURL ,
ExtraHeaders : map [ string ] string {
"Digest" : sha1Digest ( sha1sum ) ,
} ,
}
request := api . CommitUpload {
Parts : parts ,
}
request . Attributes . ContentModifiedAt = api . Time ( modTime )
request . Attributes . ContentCreatedAt = api . Time ( modTime )
var body [ ] byte
var resp * http . Response
2018-07-09 18:12:18 +02:00
// For discussion of this value see:
2019-07-28 19:47:38 +02:00
// https://github.com/rclone/rclone/issues/2054
2018-07-09 18:12:18 +02:00
maxTries := o . fs . opt . CommitRetries
2017-08-05 22:00:16 +02:00
const defaultDelay = 10
2017-07-05 23:20:40 +02:00
var tries int
outer :
for tries = 0 ; tries < maxTries ; tries ++ {
err = o . fs . pacer . Call ( func ( ) ( bool , error ) {
2019-09-04 21:00:37 +02:00
resp , err = o . fs . srv . CallJSON ( ctx , & opts , & request , nil )
2017-07-05 23:20:40 +02:00
if err != nil {
return shouldRetry ( resp , err )
}
body , err = rest . ReadBody ( resp )
return shouldRetry ( resp , err )
} )
2017-08-05 22:00:16 +02:00
delay := defaultDelay
2019-06-10 20:33:10 +02:00
var why string
2017-07-05 23:20:40 +02:00
if err != nil {
2017-08-05 22:00:16 +02:00
// Sometimes we get 400 Error with
// parts_mismatch immediately after uploading
// the last part. Ignore this error and wait.
if boxErr , ok := err . ( * api . Error ) ; ok && boxErr . Code == "parts_mismatch" {
why = err . Error ( )
} else {
return nil , err
}
} else {
switch resp . StatusCode {
case http . StatusOK , http . StatusCreated :
break outer
case http . StatusAccepted :
why = "not ready yet"
delayString := resp . Header . Get ( "Retry-After" )
if delayString != "" {
delay , err = strconv . Atoi ( delayString )
if err != nil {
fs . Debugf ( o , "Couldn't decode Retry-After header %q: %v" , delayString , err )
delay = defaultDelay
}
2017-07-05 23:20:40 +02:00
}
2017-08-05 22:00:16 +02:00
default :
return nil , errors . Errorf ( "unknown HTTP status return %q (%d)" , resp . Status , resp . StatusCode )
2017-07-05 23:20:40 +02:00
}
}
2017-08-05 22:00:16 +02:00
fs . Debugf ( o , "commit multipart upload failed %d/%d - trying again in %d seconds (%s)" , tries + 1 , maxTries , delay , why )
2017-07-05 23:20:40 +02:00
time . Sleep ( time . Duration ( delay ) * time . Second )
}
if tries >= maxTries {
2017-08-05 22:00:16 +02:00
return nil , errors . New ( "too many tries to commit multipart upload - increase --low-level-retries" )
2017-07-05 23:20:40 +02:00
}
err = json . Unmarshal ( body , & result )
if err != nil {
return nil , errors . Wrapf ( err , "couldn't decode commit response: %q" , body )
}
return result , nil
}
// abortUpload cancels an upload session
2019-09-04 21:00:37 +02:00
func ( o * Object ) abortUpload ( ctx context . Context , SessionID string ) ( err error ) {
2017-07-05 23:20:40 +02:00
opts := rest . Opts {
Method : "DELETE" ,
Path : "/files/upload_sessions/" + SessionID ,
RootURL : uploadURL ,
NoResponse : true ,
}
var resp * http . Response
err = o . fs . pacer . Call ( func ( ) ( bool , error ) {
2019-09-04 21:00:37 +02:00
resp , err = o . fs . srv . Call ( ctx , & opts )
2017-07-05 23:20:40 +02:00
return shouldRetry ( resp , err )
} )
return err
}
// uploadMultipart uploads a file using multipart upload
2020-03-21 22:49:12 +01:00
func ( o * Object ) uploadMultipart ( ctx context . Context , in io . Reader , leaf , directoryID string , size int64 , modTime time . Time , options ... fs . OpenOption ) ( err error ) {
2017-07-05 23:20:40 +02:00
// Create upload session
2019-09-04 21:00:37 +02:00
session , err := o . createUploadSession ( ctx , leaf , directoryID , size )
2017-07-05 23:20:40 +02:00
if err != nil {
return errors . Wrap ( err , "multipart upload create session failed" )
}
chunkSize := session . PartSize
fs . Debugf ( o , "Multipart upload session started for %d parts of size %v" , session . TotalParts , fs . SizeSuffix ( chunkSize ) )
// Cancel the session if something went wrong
defer func ( ) {
if err != nil {
fs . Debugf ( o , "Cancelling multipart upload: %v" , err )
2019-09-04 21:00:37 +02:00
cancelErr := o . abortUpload ( ctx , session . ID )
2017-07-05 23:20:40 +02:00
if cancelErr != nil {
fs . Logf ( o , "Failed to cancel multipart upload: %v" , err )
}
}
} ( )
2018-02-01 17:36:12 +01:00
// unwrap the accounting from the input, we use wrap to put it
// back on after the buffering
in , wrap := accounting . UnWrap ( in )
2017-07-05 23:20:40 +02:00
// Upload the chunks
remaining := size
position := int64 ( 0 )
parts := make ( [ ] api . Part , session . TotalParts )
hash := sha1 . New ( )
errs := make ( chan error , 1 )
var wg sync . WaitGroup
outer :
for part := 0 ; part < session . TotalParts ; part ++ {
// Check any errors
select {
case err = <- errs :
break outer
default :
}
reqSize := remaining
2019-01-11 18:17:46 +01:00
if reqSize >= chunkSize {
reqSize = chunkSize
2017-07-05 23:20:40 +02:00
}
// Make a block of memory
buf := make ( [ ] byte , reqSize )
// Read the chunk
_ , err = io . ReadFull ( in , buf )
if err != nil {
err = errors . Wrap ( err , "multipart upload failed to read source" )
break outer
}
// Make the global hash (must be done sequentially)
_ , _ = hash . Write ( buf )
// Transfer the chunk
wg . Add ( 1 )
2017-07-29 23:05:36 +02:00
o . fs . uploadToken . Get ( )
2017-07-05 23:20:40 +02:00
go func ( part int , position int64 ) {
defer wg . Done ( )
2017-07-29 23:05:36 +02:00
defer o . fs . uploadToken . Put ( )
2017-07-05 23:20:40 +02:00
fs . Debugf ( o , "Uploading part %d/%d offset %v/%v part size %v" , part + 1 , session . TotalParts , fs . SizeSuffix ( position ) , fs . SizeSuffix ( size ) , fs . SizeSuffix ( chunkSize ) )
2020-03-21 22:49:12 +01:00
partResponse , err := o . uploadPart ( ctx , session . ID , position , size , buf , wrap , options ... )
2017-07-05 23:20:40 +02:00
if err != nil {
err = errors . Wrap ( err , "multipart upload failed to upload part" )
select {
case errs <- err :
default :
}
return
}
parts [ part ] = partResponse . Part
} ( part , position )
// ready for next block
remaining -= chunkSize
position += chunkSize
}
wg . Wait ( )
if err == nil {
select {
case err = <- errs :
default :
}
}
if err != nil {
return err
}
// Finalise the upload session
2019-09-04 21:00:37 +02:00
result , err := o . commitUpload ( ctx , session . ID , parts , modTime , hash . Sum ( nil ) )
2017-07-05 23:20:40 +02:00
if err != nil {
return errors . Wrap ( err , "multipart upload failed to finalize" )
}
if result . TotalCount != 1 || len ( result . Entries ) != 1 {
return errors . Errorf ( "multipart upload failed %v - not sure why" , o )
}
return o . setMetaData ( & result . Entries [ 0 ] )
}