oracleobjectstorage: implement OpenChunkWriter and multi-thread uploads #7056

This commit is contained in:
Manoj Ghosh 2023-08-21 21:18:27 -07:00 committed by Nick Craig-Wood
parent ab803d1278
commit 25703ad20e
6 changed files with 390 additions and 462 deletions

View File

@ -4,43 +4,390 @@
package oracleobjectstorage
import (
"bytes"
"context"
"crypto/md5"
"encoding/base64"
"encoding/hex"
"fmt"
"io"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/ncw/swift/v2"
"github.com/rclone/rclone/lib/multipart"
"golang.org/x/net/http/httpguts"
"github.com/oracle/oci-go-sdk/v65/common"
"github.com/oracle/oci-go-sdk/v65/objectstorage"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/chunksize"
"github.com/rclone/rclone/lib/atexit"
"github.com/rclone/rclone/lib/pacer"
"github.com/rclone/rclone/lib/readers"
"golang.org/x/sync/errgroup"
"github.com/rclone/rclone/fs/hash"
)
var warnStreamUpload sync.Once
func (o *Object) uploadMultipart(
// Info needed for an upload
type uploadInfo struct {
req *objectstorage.PutObjectRequest
md5sumHex string
}
type objectChunkWriter struct {
chunkSize int64
size int64
f *Fs
bucket *string
key *string
uploadId *string
partsToCommit []objectstorage.CommitMultipartUploadPartDetails
partsToCommitMu sync.Mutex
existingParts map[int]objectstorage.MultipartUploadPartSummary
eTag string
md5sMu sync.Mutex
md5s []byte
ui uploadInfo
o *Object
}
func (o *Object) uploadMultipart(ctx context.Context, src fs.ObjectInfo, in io.Reader, options ...fs.OpenOption) error {
_, err := multipart.UploadMultipart(ctx, src, in, multipart.UploadMultipartOptions{
Open: o.fs,
Concurrency: o.fs.opt.UploadConcurrency,
LeavePartsOnError: o.fs.opt.LeavePartsOnError,
OpenOptions: options,
})
return err
}
// OpenChunkWriter returns the chunk size and a ChunkWriter
//
// Pass in the remote and the src object
// You can also use options to hint at the desired chunk size
func (f *Fs) OpenChunkWriter(
ctx context.Context,
putReq *objectstorage.PutObjectRequest,
in io.Reader,
src fs.ObjectInfo) (err error) {
uploadID, uploadedParts, err := o.createMultipartUpload(ctx, putReq)
remote string,
src fs.ObjectInfo,
options ...fs.OpenOption) (chunkSizeResult int64, writer fs.ChunkWriter, err error) {
// Temporary Object under construction
o := &Object{
fs: f,
remote: remote,
}
ui, err := o.prepareUpload(ctx, src, options)
if err != nil {
return -1, nil, fmt.Errorf("failed to prepare upload: %w", err)
}
uploadParts := f.opt.MaxUploadParts
if uploadParts < 1 {
uploadParts = 1
} else if uploadParts > maxUploadParts {
uploadParts = maxUploadParts
}
size := src.Size()
// calculate size of parts
chunkSize := f.opt.ChunkSize
// size can be -1 here meaning we don't know the size of the incoming file. We use ChunkSize
// buffers here (default 5 MiB). With a maximum number of parts (10,000) this will be a file of
// 48 GiB which seems like a not too unreasonable limit.
if size == -1 {
warnStreamUpload.Do(func() {
fs.Logf(f, "Streaming uploads using chunk size %v will have maximum file size of %v",
f.opt.ChunkSize, fs.SizeSuffix(int64(chunkSize)*int64(uploadParts)))
})
} else {
chunkSize = chunksize.Calculator(src, size, uploadParts, chunkSize)
}
uploadId, existingParts, err := o.createMultipartUpload(ctx, ui.req)
if err != nil {
return -1, nil, fmt.Errorf("create multipart upload request failed: %w", err)
}
bucketName, bucketPath := o.split()
chunkWriter := &objectChunkWriter{
chunkSize: int64(chunkSize),
size: size,
f: f,
bucket: &bucketName,
key: &bucketPath,
uploadId: &uploadId,
existingParts: existingParts,
ui: ui,
o: o,
}
fs.Debugf(o, "open chunk writer: started multipart upload: %v", uploadId)
return int64(chunkSize), chunkWriter, err
}
// WriteChunk will write chunk number with reader bytes, where chunk number >= 0
func (w *objectChunkWriter) WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (bytesWritten int64, err error) {
if chunkNumber < 0 {
err := fmt.Errorf("invalid chunk number provided: %v", chunkNumber)
return -1, err
}
m := md5.New()
currentChunkSize, err := io.Copy(m, reader)
if err != nil {
return -1, err
}
// If no data read, don't write the chunk
if currentChunkSize == 0 {
return 0, nil
}
md5sumBinary := m.Sum([]byte{})
w.addMd5(&md5sumBinary, int64(chunkNumber))
md5sum := base64.StdEncoding.EncodeToString(md5sumBinary[:])
// Object storage requires 1 <= PartNumber <= 10000
ossPartNumber := chunkNumber + 1
if existing, ok := w.existingParts[ossPartNumber]; ok {
if md5sum == *existing.Md5 {
fs.Debugf(w.o, "matched uploaded part found, part num %d, skipping part, md5=%v", *existing.PartNumber, md5sum)
w.addCompletedPart(existing.PartNumber, existing.Etag)
return currentChunkSize, nil
}
}
req := objectstorage.UploadPartRequest{
NamespaceName: common.String(w.f.opt.Namespace),
BucketName: w.bucket,
ObjectName: w.key,
UploadId: w.uploadId,
UploadPartNum: common.Int(ossPartNumber),
ContentLength: common.Int64(currentChunkSize),
ContentMD5: common.String(md5sum),
}
w.o.applyPartUploadOptions(w.ui.req, &req)
var resp objectstorage.UploadPartResponse
err = w.f.pacer.Call(func() (bool, error) {
// req.UploadPartBody = io.NopCloser(bytes.NewReader(buf))
// rewind the reader on retry and after reading md5
_, err = reader.Seek(0, io.SeekStart)
if err != nil {
return false, err
}
req.UploadPartBody = io.NopCloser(reader)
resp, err = w.f.srv.UploadPart(ctx, req)
if err != nil {
if ossPartNumber <= 8 {
return shouldRetry(ctx, resp.HTTPResponse(), err)
}
// retry all chunks once have done the first few
return true, err
}
return false, err
})
if err != nil {
fs.Errorf(w.o, "multipart upload failed to upload part:%d err: %v", ossPartNumber, err)
return -1, fmt.Errorf("multipart upload failed to upload part: %w", err)
}
w.addCompletedPart(&ossPartNumber, resp.ETag)
return currentChunkSize, err
}
// add a part number and etag to the completed parts
func (w *objectChunkWriter) addCompletedPart(partNum *int, eTag *string) {
w.partsToCommitMu.Lock()
defer w.partsToCommitMu.Unlock()
w.partsToCommit = append(w.partsToCommit, objectstorage.CommitMultipartUploadPartDetails{
PartNum: partNum,
Etag: eTag,
})
}
func (w *objectChunkWriter) Close(ctx context.Context) (err error) {
req := objectstorage.CommitMultipartUploadRequest{
NamespaceName: common.String(w.f.opt.Namespace),
BucketName: w.bucket,
ObjectName: w.key,
UploadId: w.uploadId,
}
req.PartsToCommit = w.partsToCommit
var resp objectstorage.CommitMultipartUploadResponse
err = w.f.pacer.Call(func() (bool, error) {
resp, err = w.f.srv.CommitMultipartUpload(ctx, req)
// if multipart is corrupted, we will abort the uploadId
if isMultiPartUploadCorrupted(err) {
fs.Debugf(w.o, "multipart uploadId %v is corrupted, aborting...", *w.uploadId)
_ = w.Abort(ctx)
return false, err
}
return shouldRetry(ctx, resp.HTTPResponse(), err)
})
if err != nil {
fs.Errorf(o, "failed to create multipart upload-id err: %v", err)
return err
}
return o.uploadParts(ctx, putReq, in, src, uploadID, uploadedParts)
w.eTag = *resp.ETag
hashOfHashes := md5.Sum(w.md5s)
wantMultipartMd5 := fmt.Sprintf("%s-%d", base64.StdEncoding.EncodeToString(hashOfHashes[:]), len(w.partsToCommit))
gotMultipartMd5 := *resp.OpcMultipartMd5
if wantMultipartMd5 != gotMultipartMd5 {
fs.Errorf(w.o, "multipart upload corrupted: multipart md5 differ: expecting %s but got %s", wantMultipartMd5, gotMultipartMd5)
return fmt.Errorf("multipart upload corrupted: md5 differ: expecting %s but got %s", wantMultipartMd5, gotMultipartMd5)
}
fs.Debugf(w.o, "multipart upload %v md5 matched: expecting %s and got %s", *w.uploadId, wantMultipartMd5, gotMultipartMd5)
return nil
}
func isMultiPartUploadCorrupted(err error) bool {
if err == nil {
return false
}
// Check if this oci-err object, and if it is multipart commit error
if ociError, ok := err.(common.ServiceError); ok {
// If it is a timeout then we want to retry that
if ociError.GetCode() == "InvalidUploadPart" {
return true
}
}
return false
}
func (w *objectChunkWriter) Abort(ctx context.Context) error {
fs.Debugf(w.o, "Cancelling multipart upload")
err := w.o.fs.abortMultiPartUpload(
ctx,
w.bucket,
w.key,
w.uploadId)
if err != nil {
fs.Debugf(w.o, "Failed to cancel multipart upload: %v", err)
} else {
fs.Debugf(w.o, "canceled and aborted multipart upload: %v", *w.uploadId)
}
return err
}
// addMd5 adds a binary md5 to the md5 calculated so far
func (w *objectChunkWriter) addMd5(md5binary *[]byte, chunkNumber int64) {
w.md5sMu.Lock()
defer w.md5sMu.Unlock()
start := chunkNumber * md5.Size
end := start + md5.Size
if extend := end - int64(len(w.md5s)); extend > 0 {
w.md5s = append(w.md5s, make([]byte, extend)...)
}
copy(w.md5s[start:end], (*md5binary)[:])
}
func (o *Object) prepareUpload(ctx context.Context, src fs.ObjectInfo, options []fs.OpenOption) (ui uploadInfo, err error) {
bucket, bucketPath := o.split()
ui.req = &objectstorage.PutObjectRequest{
NamespaceName: common.String(o.fs.opt.Namespace),
BucketName: common.String(bucket),
ObjectName: common.String(bucketPath),
}
// Set the mtime in the metadata
modTime := src.ModTime(ctx)
// Fetch metadata if --metadata is in use
meta, err := fs.GetMetadataOptions(ctx, src, options)
if err != nil {
return ui, fmt.Errorf("failed to read metadata from source object: %w", err)
}
ui.req.OpcMeta = make(map[string]string, len(meta)+2)
// merge metadata into request and user metadata
for k, v := range meta {
pv := common.String(v)
k = strings.ToLower(k)
switch k {
case "cache-control":
ui.req.CacheControl = pv
case "content-disposition":
ui.req.ContentDisposition = pv
case "content-encoding":
ui.req.ContentEncoding = pv
case "content-language":
ui.req.ContentLanguage = pv
case "content-type":
ui.req.ContentType = pv
case "tier":
// ignore
case "mtime":
// mtime in meta overrides source ModTime
metaModTime, err := time.Parse(time.RFC3339Nano, v)
if err != nil {
fs.Debugf(o, "failed to parse metadata %s: %q: %v", k, v, err)
} else {
modTime = metaModTime
}
case "btime":
// write as metadata since we can't set it
ui.req.OpcMeta[k] = v
default:
ui.req.OpcMeta[k] = v
}
}
// Set the mtime in the metadata
ui.req.OpcMeta[metaMtime] = swift.TimeToFloatString(modTime)
// read the md5sum if available
// - for non-multipart
// - so we can add a ContentMD5
// - so we can add the md5sum in the metadata as metaMD5Hash if using SSE/SSE-C
// - for multipart provided checksums aren't disabled
// - so we can add the md5sum in the metadata as metaMD5Hash
size := src.Size()
isMultipart := size < 0 || size >= int64(o.fs.opt.UploadCutoff)
var md5sumBase64 string
if !isMultipart || !o.fs.opt.DisableChecksum {
ui.md5sumHex, err = src.Hash(ctx, hash.MD5)
if err == nil && matchMd5.MatchString(ui.md5sumHex) {
hashBytes, err := hex.DecodeString(ui.md5sumHex)
if err == nil {
md5sumBase64 = base64.StdEncoding.EncodeToString(hashBytes)
if isMultipart && !o.fs.opt.DisableChecksum {
// Set the md5sum as metadata on the object if
// - a multipart upload
// - the ETag is not an MD5, e.g. when using SSE/SSE-C
// provided checksums aren't disabled
ui.req.OpcMeta[metaMD5Hash] = md5sumBase64
}
}
}
}
// Set the content type if it isn't set already
if ui.req.ContentType == nil {
ui.req.ContentType = common.String(fs.MimeType(ctx, src))
}
if size >= 0 {
ui.req.ContentLength = common.Int64(size)
}
if md5sumBase64 != "" {
ui.req.ContentMD5 = &md5sumBase64
}
o.applyPutOptions(ui.req, options...)
useBYOKPutObject(o.fs, ui.req)
if o.fs.opt.StorageTier != "" {
storageTier, ok := objectstorage.GetMappingPutObjectStorageTierEnum(o.fs.opt.StorageTier)
if !ok {
return ui, fmt.Errorf("not a valid storage tier: %v", o.fs.opt.StorageTier)
}
ui.req.StorageTier = storageTier
}
// Check metadata keys and values are valid
for key, value := range ui.req.OpcMeta {
if !httpguts.ValidHeaderFieldName(key) {
fs.Errorf(o, "Dropping invalid metadata key %q", key)
delete(ui.req.OpcMeta, key)
} else if value == "" {
fs.Errorf(o, "Dropping nil metadata value for key %q", key)
delete(ui.req.OpcMeta, key)
} else if !httpguts.ValidHeaderFieldValue(value) {
fs.Errorf(o, "Dropping invalid metadata value %q for key %q", value, key)
delete(ui.req.OpcMeta, key)
}
}
return ui, nil
}
func (o *Object) createMultipartUpload(ctx context.Context, putReq *objectstorage.PutObjectRequest) (
uploadID string, uploadedParts map[int]objectstorage.MultipartUploadPartSummary, err error) {
uploadID string, existingParts map[int]objectstorage.MultipartUploadPartSummary, err error) {
bucketName, bucketPath := o.split()
f := o.fs
if f.opt.AttemptResumeUpload {
@ -48,10 +395,10 @@ func (o *Object) createMultipartUpload(ctx context.Context, putReq *objectstorag
resumeUploads, err := o.fs.findLatestMultipartUpload(ctx, bucketName, bucketPath)
if err == nil && len(resumeUploads) > 0 {
uploadID = *resumeUploads[0].UploadId
uploadedParts, err = f.listMultipartUploadParts(ctx, bucketName, bucketPath, uploadID)
existingParts, err = f.listMultipartUploadParts(ctx, bucketName, bucketPath, uploadID)
if err == nil {
fs.Debugf(o, "resuming with existing upload id: %v", uploadID)
return uploadID, uploadedParts, err
return uploadID, existingParts, err
}
}
}
@ -75,260 +422,10 @@ func (o *Object) createMultipartUpload(ctx context.Context, putReq *objectstorag
return shouldRetry(ctx, resp.HTTPResponse(), err)
})
if err != nil {
return "", nil, err
return "", existingParts, err
}
existingParts = make(map[int]objectstorage.MultipartUploadPartSummary)
uploadID = *resp.UploadId
fs.Debugf(o, "created new upload id: %v", uploadID)
return uploadID, nil, err
}
func (o *Object) uploadParts(
ctx context.Context,
putReq *objectstorage.PutObjectRequest,
in io.Reader,
src fs.ObjectInfo,
uploadID string,
uploadedParts map[int]objectstorage.MultipartUploadPartSummary) (err error) {
bucketName, bucketPath := o.split()
f := o.fs
// make concurrency machinery
concurrency := f.opt.UploadConcurrency
if concurrency < 1 {
concurrency = 1
}
uploadParts := f.opt.MaxUploadParts
if uploadParts < 1 {
uploadParts = 1
} else if uploadParts > maxUploadParts {
uploadParts = maxUploadParts
}
// calculate size of parts
partSize := f.opt.ChunkSize
fileSize := src.Size()
// size can be -1 here meaning we don't know the size of the incoming file. We use ChunkSize
// buffers here (default 5 MiB). With a maximum number of parts (10,000) this will be a file of
// 48 GiB which seems like a not too unreasonable limit.
if fileSize == -1 {
warnStreamUpload.Do(func() {
fs.Logf(f, "Streaming uploads using chunk size %v will have maximum file size of %v",
f.opt.ChunkSize, fs.SizeSuffix(int64(partSize)*int64(uploadParts)))
})
} else {
partSize = chunksize.Calculator(o, fileSize, uploadParts, f.opt.ChunkSize)
}
uploadCtx, cancel := context.WithCancel(ctx)
defer atexit.OnError(&err, func() {
cancel()
if o.fs.opt.LeavePartsOnError {
return
}
fs.Debugf(o, "Cancelling multipart upload")
errCancel := o.fs.abortMultiPartUpload(
context.Background(),
bucketName,
bucketPath,
uploadID)
if errCancel != nil {
fs.Debugf(o, "Failed to cancel multipart upload: %v", errCancel)
} else {
fs.Debugf(o, "canceled and aborted multipart upload: %v", uploadID)
}
})()
var (
g, gCtx = errgroup.WithContext(uploadCtx)
finished = false
partsMu sync.Mutex // to protect parts
parts []*objectstorage.CommitMultipartUploadPartDetails
off int64
md5sMu sync.Mutex
md5s []byte
tokens = pacer.NewTokenDispenser(concurrency)
memPool = o.fs.getMemoryPool(int64(partSize))
)
addMd5 := func(md5binary *[md5.Size]byte, partNum int64) {
md5sMu.Lock()
defer md5sMu.Unlock()
start := partNum * md5.Size
end := start + md5.Size
if extend := end - int64(len(md5s)); extend > 0 {
md5s = append(md5s, make([]byte, extend)...)
}
copy(md5s[start:end], (*md5binary)[:])
}
for partNum := int64(1); !finished; partNum++ {
// Get a block of memory from the pool and token which limits concurrency.
tokens.Get()
buf := memPool.Get()
free := func() {
// return the memory and token
memPool.Put(buf)
tokens.Put()
}
// Fail fast, in case an errgroup managed function returns an error
// gCtx is cancelled. There is no point in uploading all the other parts.
if gCtx.Err() != nil {
free()
break
}
// Read the chunk
var n int
n, err = readers.ReadFill(in, buf) // this can never return 0, nil
if err == io.EOF {
if n == 0 && partNum != 1 { // end if no data and if not first chunk
free()
break
}
finished = true
} else if err != nil {
free()
return fmt.Errorf("multipart upload failed to read source: %w", err)
}
buf = buf[:n]
partNum := partNum
fs.Debugf(o, "multipart upload starting chunk %d size %v offset %v/%v", partNum, fs.SizeSuffix(n), fs.SizeSuffix(off), fs.SizeSuffix(fileSize))
off += int64(n)
g.Go(func() (err error) {
defer free()
partLength := int64(len(buf))
// create checksum of buffer for integrity checking
md5sumBinary := md5.Sum(buf)
addMd5(&md5sumBinary, partNum-1)
md5sum := base64.StdEncoding.EncodeToString(md5sumBinary[:])
if uploadedPart, ok := uploadedParts[int(partNum)]; ok {
if md5sum == *uploadedPart.Md5 {
fs.Debugf(o, "matched uploaded part found, part num %d, skipping part, md5=%v", partNum, md5sum)
partsMu.Lock()
parts = append(parts, &objectstorage.CommitMultipartUploadPartDetails{
PartNum: uploadedPart.PartNumber,
Etag: uploadedPart.Etag,
})
partsMu.Unlock()
return nil
}
}
req := objectstorage.UploadPartRequest{
NamespaceName: common.String(o.fs.opt.Namespace),
BucketName: common.String(bucketName),
ObjectName: common.String(bucketPath),
UploadId: common.String(uploadID),
UploadPartNum: common.Int(int(partNum)),
ContentLength: common.Int64(partLength),
ContentMD5: common.String(md5sum),
}
o.applyPartUploadOptions(putReq, &req)
var resp objectstorage.UploadPartResponse
err = f.pacer.Call(func() (bool, error) {
req.UploadPartBody = io.NopCloser(bytes.NewReader(buf))
resp, err = f.srv.UploadPart(gCtx, req)
if err != nil {
if partNum <= int64(concurrency) {
return shouldRetry(gCtx, resp.HTTPResponse(), err)
}
// retry all chunks once have done the first batch
return true, err
}
partsMu.Lock()
parts = append(parts, &objectstorage.CommitMultipartUploadPartDetails{
PartNum: common.Int(int(partNum)),
Etag: resp.ETag,
})
partsMu.Unlock()
return false, nil
})
if err != nil {
fs.Errorf(o, "multipart upload failed to upload part:%d err: %v", partNum, err)
return fmt.Errorf("multipart upload failed to upload part: %w", err)
}
return nil
})
}
err = g.Wait()
if err != nil {
return err
}
// sort the completed parts by part number
sort.Slice(parts, func(i, j int) bool {
return *parts[i].PartNum < *parts[j].PartNum
})
var resp objectstorage.CommitMultipartUploadResponse
resp, err = o.commitMultiPart(ctx, uploadID, parts)
if err != nil {
return err
}
fs.Debugf(o, "multipart upload %v committed.", uploadID)
hashOfHashes := md5.Sum(md5s)
wantMultipartMd5 := base64.StdEncoding.EncodeToString(hashOfHashes[:]) + "-" + strconv.Itoa(len(parts))
gotMultipartMd5 := *resp.OpcMultipartMd5
if wantMultipartMd5 != gotMultipartMd5 {
fs.Errorf(o, "multipart upload corrupted: multipart md5 differ: expecting %s but got %s", wantMultipartMd5, gotMultipartMd5)
return fmt.Errorf("multipart upload corrupted: md5 differ: expecting %s but got %s", wantMultipartMd5, gotMultipartMd5)
}
fs.Debugf(o, "multipart upload %v md5 matched: expecting %s and got %s", uploadID, wantMultipartMd5, gotMultipartMd5)
return nil
}
// commits the multipart upload
func (o *Object) commitMultiPart(ctx context.Context, uploadID string, parts []*objectstorage.CommitMultipartUploadPartDetails) (resp objectstorage.CommitMultipartUploadResponse, err error) {
bucketName, bucketPath := o.split()
req := objectstorage.CommitMultipartUploadRequest{
NamespaceName: common.String(o.fs.opt.Namespace),
BucketName: common.String(bucketName),
ObjectName: common.String(bucketPath),
UploadId: common.String(uploadID),
}
var partsToCommit []objectstorage.CommitMultipartUploadPartDetails
for _, part := range parts {
partsToCommit = append(partsToCommit, *part)
}
req.PartsToCommit = partsToCommit
err = o.fs.pacer.Call(func() (bool, error) {
resp, err = o.fs.srv.CommitMultipartUpload(ctx, req)
// if multipart is corrupted, we will abort the uploadId
if o.isMultiPartUploadCorrupted(err) {
fs.Debugf(o, "multipart uploadId %v is corrupted, aborting...", uploadID)
errCancel := o.fs.abortMultiPartUpload(
context.Background(),
bucketName,
bucketPath,
uploadID)
if errCancel != nil {
fs.Debugf(o, "Failed to abort multipart upload: %v, ignoring.", errCancel)
} else {
fs.Debugf(o, "aborted multipart upload: %v", uploadID)
}
return false, err
}
return shouldRetry(ctx, resp.HTTPResponse(), err)
})
return resp, err
}
func (o *Object) isMultiPartUploadCorrupted(err error) bool {
if err == nil {
return false
}
// Check if this ocierr object, and if it is multipart commit error
if ociError, ok := err.(common.ServiceError); ok {
// If it is a timeout then we want to retry that
if ociError.GetCode() == "InvalidUploadPart" {
return true
}
}
return false
return uploadID, existingParts, err
}

View File

@ -17,8 +17,6 @@ import (
"strings"
"time"
"golang.org/x/net/http/httpguts"
"github.com/ncw/swift/v2"
"github.com/oracle/oci-go-sdk/v65/common"
"github.com/oracle/oci-go-sdk/v65/objectstorage"
@ -390,7 +388,7 @@ func isZeroLength(streamReader io.Reader) bool {
// Update an object if it has changed
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
bucketName, bucketPath := o.split()
bucketName, _ := o.split()
err = o.fs.makeBucket(ctx, bucketName)
if err != nil {
return err
@ -398,129 +396,24 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
// determine if we like upload single or multipart.
size := src.Size()
multipart := size >= int64(o.fs.opt.UploadCutoff)
multipart := size < 0 || size >= int64(o.fs.opt.UploadCutoff)
if isZeroLength(in) {
multipart = false
}
req := objectstorage.PutObjectRequest{
NamespaceName: common.String(o.fs.opt.Namespace),
BucketName: common.String(bucketName),
ObjectName: common.String(bucketPath),
}
// Set the mtime in the metadata
modTime := src.ModTime(ctx)
// Fetch metadata if --metadata is in use
meta, err := fs.GetMetadataOptions(ctx, src, options)
if err != nil {
return fmt.Errorf("failed to read metadata from source object: %w", err)
}
req.OpcMeta = make(map[string]string, len(meta)+2)
// merge metadata into request and user metadata
for k, v := range meta {
pv := common.String(v)
k = strings.ToLower(k)
switch k {
case "cache-control":
req.CacheControl = pv
case "content-disposition":
req.ContentDisposition = pv
case "content-encoding":
req.ContentEncoding = pv
case "content-language":
req.ContentLanguage = pv
case "content-type":
req.ContentType = pv
case "tier":
// ignore
case "mtime":
// mtime in meta overrides source ModTime
metaModTime, err := time.Parse(time.RFC3339Nano, v)
if err != nil {
fs.Debugf(o, "failed to parse metadata %s: %q: %v", k, v, err)
} else {
modTime = metaModTime
}
case "btime":
// write as metadata since we can't set it
req.OpcMeta[k] = v
default:
req.OpcMeta[k] = v
}
}
// Set the mtime in the metadata
req.OpcMeta[metaMtime] = swift.TimeToFloatString(modTime)
// read the md5sum if available
// - for non-multipart
// - so we can add a ContentMD5
// - so we can add the md5sum in the metadata as metaMD5Hash if using SSE/SSE-C
// - for multipart provided checksums aren't disabled
// - so we can add the md5sum in the metadata as metaMD5Hash
var md5sumBase64 string
var md5sumHex string
if !multipart || !o.fs.opt.DisableChecksum {
md5sumHex, err = src.Hash(ctx, hash.MD5)
if err == nil && matchMd5.MatchString(md5sumHex) {
hashBytes, err := hex.DecodeString(md5sumHex)
if err == nil {
md5sumBase64 = base64.StdEncoding.EncodeToString(hashBytes)
if multipart && !o.fs.opt.DisableChecksum {
// Set the md5sum as metadata on the object if
// - a multipart upload
// - the ETag is not an MD5, e.g. when using SSE/SSE-C
// provided checksums aren't disabled
req.OpcMeta[metaMD5Hash] = md5sumBase64
}
}
}
}
// Set the content type if it isn't set already
if req.ContentType == nil {
req.ContentType = common.String(fs.MimeType(ctx, src))
}
if size >= 0 {
req.ContentLength = common.Int64(size)
}
if md5sumBase64 != "" {
req.ContentMD5 = &md5sumBase64
}
o.applyPutOptions(&req, options...)
useBYOKPutObject(o.fs, &req)
if o.fs.opt.StorageTier != "" {
storageTier, ok := objectstorage.GetMappingPutObjectStorageTierEnum(o.fs.opt.StorageTier)
if !ok {
return fmt.Errorf("not a valid storage tier: %v", o.fs.opt.StorageTier)
}
req.StorageTier = storageTier
}
// Check metadata keys and values are valid
for key, value := range req.OpcMeta {
if !httpguts.ValidHeaderFieldName(key) {
fs.Errorf(o, "Dropping invalid metadata key %q", key)
delete(req.OpcMeta, key)
} else if value == "" {
fs.Errorf(o, "Dropping nil metadata value for key %q", key)
delete(req.OpcMeta, key)
} else if !httpguts.ValidHeaderFieldValue(value) {
fs.Errorf(o, "Dropping invalid metadata value %q for key %q", value, key)
delete(req.OpcMeta, key)
}
}
if multipart {
err = o.uploadMultipart(ctx, &req, in, src)
err = o.uploadMultipart(ctx, src, in)
if err != nil {
return err
}
} else {
ui, err := o.prepareUpload(ctx, src, options)
if err != nil {
return fmt.Errorf("failed to prepare upload: %w", err)
}
var resp objectstorage.PutObjectResponse
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
req.PutObjectBody = io.NopCloser(in)
resp, err = o.fs.srv.PutObject(ctx, req)
ui.req.PutObjectBody = io.NopCloser(in)
resp, err = o.fs.srv.PutObject(ctx, *ui.req)
return shouldRetry(ctx, resp.HTTPResponse(), err)
})
if err != nil {

View File

@ -20,8 +20,6 @@ const (
maxUploadCutoff = fs.SizeSuffix(5 * 1024 * 1024 * 1024)
minSleep = 10 * time.Millisecond
defaultCopyTimeoutDuration = fs.Duration(time.Minute)
memoryPoolFlushTime = fs.Duration(time.Minute) // flush the cached buffers after this long
memoryPoolUseMmap = false
)
const (
@ -61,8 +59,6 @@ type Options struct {
MaxUploadParts int `config:"max_upload_parts"`
UploadConcurrency int `config:"upload_concurrency"`
DisableChecksum bool `config:"disable_checksum"`
MemoryPoolFlushTime fs.Duration `config:"memory_pool_flush_time"`
MemoryPoolUseMmap bool `config:"memory_pool_use_mmap"`
CopyCutoff fs.SizeSuffix `config:"copy_cutoff"`
CopyTimeout fs.Duration `config:"copy_timeout"`
StorageTier string `config:"storage_tier"`
@ -223,19 +219,6 @@ copied in chunks of this size.
The minimum is 0 and the maximum is 5 GiB.`,
Default: fs.SizeSuffix(maxSizeForCopy),
Advanced: true,
}, {
Name: "memory_pool_flush_time",
Default: memoryPoolFlushTime,
Advanced: true,
Help: `How often internal memory buffer pools will be flushed.
Uploads which requires additional buffers (f.e multipart) will use memory pool for allocations.
This option controls how often unused buffers will be removed from the pool.`,
}, {
Name: "memory_pool_use_mmap",
Default: memoryPoolUseMmap,
Advanced: true,
Help: `Whether to use mmap buffers in internal memory pool.`,
}, {
Name: "copy_timeout",
Help: `Timeout for copy.

View File

@ -23,7 +23,6 @@ import (
"github.com/rclone/rclone/fs/walk"
"github.com/rclone/rclone/lib/bucket"
"github.com/rclone/rclone/lib/pacer"
"github.com/rclone/rclone/lib/pool"
)
// Register with Fs
@ -50,7 +49,6 @@ type Fs struct {
rootDirectory string // directory part of root (if any)
cache *bucket.Cache // cache for bucket creation status
pacer *fs.Pacer // To pace the API calls
pool *pool.Pool // memory pool
}
// NewFs Initialize backend
@ -82,12 +80,6 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
srv: objectStorageClient,
cache: bucket.NewCache(),
pacer: pc,
pool: pool.New(
time.Duration(opt.MemoryPoolFlushTime),
int(opt.ChunkSize),
opt.UploadConcurrency*ci.Transfers,
opt.MemoryPoolUseMmap,
),
}
f.setRoot(root)
f.features = (&fs.Features{
@ -187,19 +179,6 @@ func (f *Fs) Hashes() hash.Set {
return hash.Set(hash.MD5)
}
func (f *Fs) getMemoryPool(size int64) *pool.Pool {
if size == int64(f.opt.ChunkSize) {
return f.pool
}
return pool.New(
time.Duration(f.opt.MemoryPoolFlushTime),
int(size),
f.opt.UploadConcurrency*f.ci.Transfers,
f.opt.MemoryPoolUseMmap,
)
}
// setRoot changes the root of the Fs
func (f *Fs) setRoot(root string) {
f.root = parsePath(root)
@ -339,7 +318,6 @@ func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBuck
remote := *object.Name
remote = f.opt.Enc.ToStandardPath(remote)
if !strings.HasPrefix(remote, prefix) {
// fs.Debugf(f, "Odd name received %v", object.Name)
continue
}
remote = remote[len(prefix):]
@ -579,15 +557,15 @@ func (f *Fs) Rmdir(ctx context.Context, dir string) error {
})
}
func (f *Fs) abortMultiPartUpload(ctx context.Context, bucketName, bucketPath, uploadID string) (err error) {
if uploadID == "" {
func (f *Fs) abortMultiPartUpload(ctx context.Context, bucketName, bucketPath, uploadID *string) (err error) {
if uploadID == nil || *uploadID == "" {
return nil
}
request := objectstorage.AbortMultipartUploadRequest{
NamespaceName: common.String(f.opt.Namespace),
BucketName: common.String(bucketName),
ObjectName: common.String(bucketPath),
UploadId: common.String(uploadID),
BucketName: bucketName,
ObjectName: bucketPath,
UploadId: uploadID,
}
err = f.pacer.Call(func() (bool, error) {
resp, err := f.srv.AbortMultipartUpload(ctx, request)
@ -610,7 +588,7 @@ func (f *Fs) cleanUpBucket(ctx context.Context, bucket string, maxAge time.Durat
if operations.SkipDestructive(ctx, what, "remove pending upload") {
continue
}
_ = f.abortMultiPartUpload(ctx, *upload.Bucket, *upload.Object, *upload.UploadId)
_ = f.abortMultiPartUpload(ctx, upload.Bucket, upload.Object, upload.UploadId)
}
} else {
fs.Infof(f, "MultipartUpload doesn't have sufficient details to abort.")
@ -705,12 +683,13 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (
// Check the interfaces are satisfied
var (
_ fs.Fs = &Fs{}
_ fs.Copier = &Fs{}
_ fs.PutStreamer = &Fs{}
_ fs.ListRer = &Fs{}
_ fs.Commander = &Fs{}
_ fs.CleanUpper = &Fs{}
_ fs.Fs = &Fs{}
_ fs.Copier = &Fs{}
_ fs.PutStreamer = &Fs{}
_ fs.ListRer = &Fs{}
_ fs.Commander = &Fs{}
_ fs.CleanUpper = &Fs{}
_ fs.OpenChunkWriter = &Fs{}
_ fs.Object = &Object{}
_ fs.MimeTyper = &Object{}

View File

@ -507,31 +507,6 @@ Properties:
- Type: SizeSuffix
- Default: 4.656Gi
#### --oos-memory-pool-flush-time
How often internal memory buffer pools will be flushed.
Uploads which requires additional buffers (f.e multipart) will use memory pool for allocations.
This option controls how often unused buffers will be removed from the pool.
Properties:
- Config: memory_pool_flush_time
- Env Var: RCLONE_OOS_MEMORY_POOL_FLUSH_TIME
- Type: Duration
- Default: 1m0s
#### --oos-memory-pool-use-mmap
Whether to use mmap buffers in internal memory pool.
Properties:
- Config: memory_pool_use_mmap
- Env Var: RCLONE_OOS_MEMORY_POOL_USE_MMAP
- Type: bool
- Default: false
#### --oos-copy-timeout
Timeout for copy.

View File

@ -428,3 +428,4 @@ backends:
- TestIntegration/FsMkdir/FsEncoding/trailing_CR
- TestIntegration/FsMkdir/FsEncoding/trailing_LF
- TestIntegration/FsMkdir/FsEncoding/leading_HT
- TestIntegration/FsMkdir/FsPutFiles/FsPutStream/0