diff --git a/backend/dropbox/dropbox.go b/backend/dropbox/dropbox.go index 6ce8c2efd..4a996806e 100755 --- a/backend/dropbox/dropbox.go +++ b/backend/dropbox/dropbox.go @@ -22,7 +22,6 @@ of path_display and all will be well. */ import ( - "bytes" "context" "fmt" "io" @@ -1608,104 +1607,70 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read // uploadChunked uploads the object in parts // -// Will work optimally if size is >= uploadChunkSize. If the size is either -// unknown (i.e. -1) or smaller than uploadChunkSize, the method incurs an -// avoidable request to the Dropbox API that does not carry payload. +// Will introduce two additional network requests to start and finish the session. +// If the size is unknown (i.e. -1) the method incurs one additional +// request to the Dropbox API that does not carry a payload to close the append session. func (o *Object) uploadChunked(ctx context.Context, in0 io.Reader, commitInfo *files.CommitInfo, size int64) (entry *files.FileMetadata, err error) { - batching := o.fs.batcher.Batching() - chunkSize := int64(o.fs.opt.ChunkSize) - chunks := 0 - if size >= 0 { - chunks = int(size/chunkSize) + 1 - } - in := readers.NewCountingReader(in0) - buf := make([]byte, int(chunkSize)) - - fmtChunk := func(cur int, last bool) { - if chunks == 0 && last { - fs.Debugf(o, "Streaming chunk %d/%d", cur, cur) - } else if chunks == 0 { - fs.Debugf(o, "Streaming chunk %d/unknown", cur) - } else if chunks != 1 { - fs.Debugf(o, "Uploading chunk %d/%d", cur, chunks) - } - } - - appendArg := files.UploadSessionAppendArg{ - Close: chunks == 1, - } - - // write the first chunk - fmtChunk(1, false) + // start upload var res *files.UploadSessionStartResult - chunk := readers.NewRepeatableLimitReaderBuffer(in, buf, chunkSize) err = o.fs.pacer.Call(func() (bool, error) { - // seek to the start in case this is a retry - if _, err = chunk.Seek(0, io.SeekStart); err != nil { - return false, nil - } - arg := files.UploadSessionStartArg{ - Close: appendArg.Close, - } - res, err = o.fs.srv.UploadSessionStart(&arg, chunk) + res, err = o.fs.srv.UploadSessionStart(&files.UploadSessionStartArg{}, nil) return shouldRetry(ctx, err) }) if err != nil { return nil, err } + chunkSize := int64(o.fs.opt.ChunkSize) + chunks, remainder := size/chunkSize, size%chunkSize + if remainder > 0 { + chunks++ + } + + // write chunks + in := readers.NewCountingReader(in0) + buf := make([]byte, int(chunkSize)) cursor := files.UploadSessionCursor{ SessionId: res.SessionId, Offset: 0, } - appendArg.Cursor = &cursor - - // write more whole chunks (if any, and if !batching), if - // batching write the last chunk also. - currentChunk := 2 - for { - if chunks > 0 { - // Size known - if currentChunk == chunks { - // Last chunk - if !batching { - // if the size is known, only upload full chunks. Remaining bytes are uploaded with - // the UploadSessionFinish request. - break - } - appendArg.Close = true - } else if currentChunk > chunks { - break - } - } else { - // Size unknown - lastReadWasShort := in.BytesRead()-cursor.Offset < uint64(chunkSize) - if lastReadWasShort { - // if the size is unknown, upload as long as we can read full chunks from the reader. - // The UploadSessionFinish request will not contain any payload. - // This is also what we want if batching - break - } - } + appendArg := files.UploadSessionAppendArg{Cursor: &cursor} + for currentChunk := 1; ; currentChunk++ { cursor.Offset = in.BytesRead() - fmtChunk(currentChunk, false) - chunk = readers.NewRepeatableLimitReaderBuffer(in, buf, chunkSize) + + if chunks < 0 { + fs.Debugf(o, "Streaming chunk %d/unknown", currentChunk) + } else { + fs.Debugf(o, "Uploading chunk %d/%d", currentChunk, chunks) + } + + chunk := readers.NewRepeatableLimitReaderBuffer(in, buf, chunkSize) err = o.fs.pacer.Call(func() (bool, error) { // seek to the start in case this is a retry if _, err = chunk.Seek(0, io.SeekStart); err != nil { return false, nil } err = o.fs.srv.UploadSessionAppendV2(&appendArg, chunk) - // after the first chunk is uploaded, we retry everything + // after session is started, we retry everything return err != nil, err }) if err != nil { return nil, err } - currentChunk++ + if appendArg.Close { + break + } + + if size > 0 { + // if size is known, check if next chunk is final + appendArg.Close = uint64(size)-in.BytesRead() <= uint64(chunkSize) + } else { + // if size is unknown, upload as long as we can read full chunks from the reader + appendArg.Close = in.BytesRead()-cursor.Offset < uint64(chunkSize) + } } - // write the remains + // finish upload cursor.Offset = in.BytesRead() args := &files.UploadSessionFinishArg{ Cursor: &cursor, @@ -1713,32 +1678,12 @@ func (o *Object) uploadChunked(ctx context.Context, in0 io.Reader, commitInfo *f } // If we are batching then we should have written all the data now // store the commit info now for a batch commit - if batching { - // If we haven't closed the session then we need to - if !appendArg.Close { - appendArg.Close = true - fs.Debugf(o, "Closing session") - var empty bytes.Buffer - err = o.fs.pacer.Call(func() (bool, error) { - err = o.fs.srv.UploadSessionAppendV2(&appendArg, &empty) - // after the first chunk is uploaded, we retry everything - return err != nil, err - }) - if err != nil { - return nil, err - } - } + if o.fs.batcher.Batching() { return o.fs.batcher.Commit(ctx, args) } - fmtChunk(currentChunk, true) - chunk = readers.NewRepeatableReaderBuffer(in, buf) err = o.fs.pacer.Call(func() (bool, error) { - // seek to the start in case this is a retry - if _, err = chunk.Seek(0, io.SeekStart); err != nil { - return false, nil - } - entry, err = o.fs.srv.UploadSessionFinish(args, chunk) + entry, err = o.fs.srv.UploadSessionFinish(args, nil) // If error is insufficient space then don't retry if e, ok := err.(files.UploadSessionFinishAPIError); ok { if e.EndpointError != nil && e.EndpointError.Path != nil && e.EndpointError.Path.Tag == files.WriteErrorInsufficientSpace {