swift: ensure partially uploaded large files are uploaded unless --swift-leave-parts-on-error

This makes sure that partially uploaded large files are removed
unless the `--swift-leave-parts-on-error` flag is supplied.

- refactor swift.go
- add unit test for swift with chunk
- add unit test for large object with fail case
- add "-" to white list char during encode.
This commit is contained in:
Nguyễn Hữu Luân 2021-01-29 00:09:41 +07:00 committed by GitHub
parent 6272ca74bc
commit 671dd047f7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 152 additions and 87 deletions

View File

@ -24,6 +24,7 @@ import (
"github.com/rclone/rclone/fs/hash" "github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/fs/operations" "github.com/rclone/rclone/fs/operations"
"github.com/rclone/rclone/fs/walk" "github.com/rclone/rclone/fs/walk"
"github.com/rclone/rclone/lib/atexit"
"github.com/rclone/rclone/lib/bucket" "github.com/rclone/rclone/lib/bucket"
"github.com/rclone/rclone/lib/encoder" "github.com/rclone/rclone/lib/encoder"
"github.com/rclone/rclone/lib/pacer" "github.com/rclone/rclone/lib/pacer"
@ -167,6 +168,11 @@ func init() {
Help: "Admin", Help: "Admin",
Value: "admin", Value: "admin",
}}, }},
}, {
Name: "leave_parts_on_error",
Help: `If true avoid calling abort upload on a failure. It should be set to true for resuming uploads across different sessions.`,
Default: false,
Advanced: true,
}, { }, {
Name: "storage_policy", Name: "storage_policy",
Help: `The storage policy to use when creating a new container Help: `The storage policy to use when creating a new container
@ -208,6 +214,7 @@ type Options struct {
ApplicationCredentialID string `config:"application_credential_id"` ApplicationCredentialID string `config:"application_credential_id"`
ApplicationCredentialName string `config:"application_credential_name"` ApplicationCredentialName string `config:"application_credential_name"`
ApplicationCredentialSecret string `config:"application_credential_secret"` ApplicationCredentialSecret string `config:"application_credential_secret"`
LeavePartsOnError bool `config:"leave_parts_on_error"`
StoragePolicy string `config:"storage_policy"` StoragePolicy string `config:"storage_policy"`
EndpointType string `config:"endpoint_type"` EndpointType string `config:"endpoint_type"`
ChunkSize fs.SizeSuffix `config:"chunk_size"` ChunkSize fs.SizeSuffix `config:"chunk_size"`
@ -1127,44 +1134,35 @@ func min(x, y int64) int64 {
return y return y
} }
// removeSegments removes any old segments from o func (o *Object) getSegmentsLargeObject() (map[string][]string, error) {
// container, objectName := o.split()
// if except is passed in then segments with that prefix won't be deleted segmentContainer, segmentObjects, err := o.fs.c.LargeObjectGetSegments(container, objectName)
func (o *Object) removeSegments(except string) error {
segmentsContainer, _, err := o.getSegmentsDlo()
if err != nil { if err != nil {
return err fs.Debugf(o, "Failed to get list segments of object: %v", err)
return nil, err
} }
except = path.Join(o.remote, except) var containerSegments = make(map[string][]string)
// fs.Debugf(o, "segmentsContainer %q prefix %q", segmentsContainer, prefix) for _, segment := range segmentObjects {
err = o.fs.listContainerRoot(segmentsContainer, o.remote, "", false, true, true, func(remote string, object *swift.Object, isDirectory bool) error { if _, ok := containerSegments[segmentContainer]; !ok {
if isDirectory { containerSegments[segmentContainer] = make([]string, 0, len(segmentObjects))
return nil
} }
if except != "" && strings.HasPrefix(remote, except) { segments, _ := containerSegments[segmentContainer]
// fs.Debugf(o, "Ignoring current segment file %q in container %q", remote, segmentsContainer) segments = append(segments, segment.Name)
return nil containerSegments[segmentContainer] = segments
}
fs.Debugf(o, "Removing segment file %q in container %q", remote, segmentsContainer)
var err error
return o.fs.pacer.Call(func() (bool, error) {
err = o.fs.c.ObjectDelete(segmentsContainer, remote)
return shouldRetry(err)
})
})
if err != nil {
return err
} }
// remove the segments container if empty, ignore errors return containerSegments, nil
err = o.fs.pacer.Call(func() (bool, error) { }
err = o.fs.c.ContainerDelete(segmentsContainer)
if err == swift.ContainerNotFound || err == swift.ContainerNotEmpty { func (o *Object) removeSegmentsLargeObject(containerSegments map[string][]string) error {
return false, err if containerSegments == nil || len(containerSegments) <= 0 {
return nil
}
for container, segments := range containerSegments {
_, err := o.fs.c.BulkDelete(container, segments)
if err != nil {
fs.Debugf(o, "Failed to delete bulk segments %v", err)
return err
} }
return shouldRetry(err)
})
if err == nil {
fs.Debugf(o, "Removed empty container %q", segmentsContainer)
} }
return nil return nil
} }
@ -1194,7 +1192,7 @@ func urlEncode(str string) string {
var buf bytes.Buffer var buf bytes.Buffer
for i := 0; i < len(str); i++ { for i := 0; i < len(str); i++ {
c := str[i] c := str[i]
if (c >= '0' && c <= '9') || (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || c == '/' || c == '.' { if (c >= '0' && c <= '9') || (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || c == '/' || c == '.' || c == '_' || c == '-' {
_ = buf.WriteByte(c) _ = buf.WriteByte(c)
} else { } else {
_, _ = buf.WriteString(fmt.Sprintf("%%%02X", c)) _, _ = buf.WriteString(fmt.Sprintf("%%%02X", c))
@ -1234,10 +1232,20 @@ func (o *Object) updateChunks(in0 io.Reader, headers swift.Headers, size int64,
uniquePrefix := fmt.Sprintf("%s/%d", swift.TimeToFloatString(time.Now()), size) uniquePrefix := fmt.Sprintf("%s/%d", swift.TimeToFloatString(time.Now()), size)
segmentsPath := path.Join(containerPath, uniquePrefix) segmentsPath := path.Join(containerPath, uniquePrefix)
in := bufio.NewReader(in0) in := bufio.NewReader(in0)
segmentInfos := make([]string, 0, ((size / int64(o.fs.opt.ChunkSize)) + 1)) segmentInfos := make([]string, 0, (size/int64(o.fs.opt.ChunkSize))+1)
defer atexit.OnError(&err, func() {
if o.fs.opt.LeavePartsOnError {
return
}
fs.Debugf(o, "Delete segments when err raise %v", err)
if segmentInfos == nil || len(segmentInfos) == 0 {
return
}
deleteChunks(o, segmentsContainer, segmentInfos)
})()
for { for {
// can we read at least one byte? // can we read at least one byte?
if _, err := in.Peek(1); err != nil { if _, err = in.Peek(1); err != nil {
if left > 0 { if left > 0 {
return "", err // read less than expected return "", err // read less than expected
} }
@ -1262,8 +1270,6 @@ func (o *Object) updateChunks(in0 io.Reader, headers swift.Headers, size int64,
return shouldRetryHeaders(rxHeaders, err) return shouldRetryHeaders(rxHeaders, err)
}) })
if err != nil { if err != nil {
deleteChunks(o, segmentsContainer, segmentInfos)
segmentInfos = nil
return "", err return "", err
} }
i++ i++
@ -1277,21 +1283,23 @@ func (o *Object) updateChunks(in0 io.Reader, headers swift.Headers, size int64,
rxHeaders, err = o.fs.c.ObjectPut(container, containerPath, emptyReader, true, "", contentType, headers) rxHeaders, err = o.fs.c.ObjectPut(container, containerPath, emptyReader, true, "", contentType, headers)
return shouldRetryHeaders(rxHeaders, err) return shouldRetryHeaders(rxHeaders, err)
}) })
if err != nil {
deleteChunks(o, segmentsContainer, segmentInfos) if err == nil {
//reset data
segmentInfos = nil segmentInfos = nil
} }
return uniquePrefix + "/", err return uniquePrefix + "/", err
} }
func deleteChunks(o *Object, segmentsContainer string, segmentInfos []string) { func deleteChunks(o *Object, segmentsContainer string, segmentInfos []string) {
if segmentInfos != nil && len(segmentInfos) > 0 { if segmentInfos == nil || len(segmentInfos) == 0 {
for _, v := range segmentInfos { return
fs.Debugf(o, "Delete segment file %q on %q", v, segmentsContainer) }
e := o.fs.c.ObjectDelete(segmentsContainer, v) for _, v := range segmentInfos {
if e != nil { fs.Debugf(o, "Delete segment file %q on %q", v, segmentsContainer)
fs.Errorf(o, "Error occurred in delete segment file %q on %q, error: %q", v, segmentsContainer, e) e := o.fs.c.ObjectDelete(segmentsContainer, v)
} if e != nil {
fs.Errorf(o, "Error occurred in delete segment file %q on %q, error: %q", v, segmentsContainer, e)
} }
} }
} }
@ -1312,20 +1320,26 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
modTime := src.ModTime(ctx) modTime := src.ModTime(ctx)
// Note whether this is a dynamic large object before starting // Note whether this is a dynamic large object before starting
isDynamicLargeObject, err := o.isDynamicLargeObject() isLargeObject, err := o.isLargeObject()
if err != nil { if err != nil {
return err return err
} }
//capture segments before upload
var segmentsContainer map[string][]string
if isLargeObject {
segmentsContainer, _ = o.getSegmentsLargeObject()
}
// Set the mtime // Set the mtime
m := swift.Metadata{} m := swift.Metadata{}
m.SetModTime(modTime) m.SetModTime(modTime)
contentType := fs.MimeType(ctx, src) contentType := fs.MimeType(ctx, src)
headers := m.ObjectHeaders() headers := m.ObjectHeaders()
fs.OpenOptionAddHeaders(options, headers) fs.OpenOptionAddHeaders(options, headers)
uniquePrefix := ""
if size > int64(o.fs.opt.ChunkSize) || (size == -1 && !o.fs.opt.NoChunk) { if size > int64(o.fs.opt.ChunkSize) || (size == -1 && !o.fs.opt.NoChunk) {
uniquePrefix, err = o.updateChunks(in, headers, size, contentType) _, err = o.updateChunks(in, headers, size, contentType)
if err != nil { if err != nil {
return err return err
} }
@ -1359,10 +1373,10 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
o.size = int64(inCount.BytesRead()) o.size = int64(inCount.BytesRead())
} }
} }
isInContainerVersioning, _ := o.isInContainerVersioning(container)
// If file was a dynamic large object then remove old/all segments // If file was a large object and the container is not enable versioning then remove old/all segments
if isDynamicLargeObject { if isLargeObject && len(segmentsContainer) > 0 && !isInContainerVersioning {
err = o.removeSegments(uniquePrefix) err := o.removeSegmentsLargeObject(segmentsContainer)
if err != nil { if err != nil {
fs.Logf(o, "Failed to remove old segments - carrying on with upload: %v", err) fs.Logf(o, "Failed to remove old segments - carrying on with upload: %v", err)
} }
@ -1389,15 +1403,10 @@ func (o *Object) Remove(ctx context.Context) (err error) {
return err return err
} }
} }
isStaticLargeObject, err := o.isStaticLargeObject() //capture segments object if this object is large object
if err != nil { var containerSegments map[string][]string
return err if isLargeObject {
} containerSegments, err = o.getSegmentsLargeObject()
var segmentContainer string
var segmentObjects []swift.Object
if isStaticLargeObject {
segmentContainer, segmentObjects, err = o.fs.c.LargeObjectGetSegments(container, containerPath)
if err != nil { if err != nil {
return err return err
} }
@ -1415,31 +1424,9 @@ func (o *Object) Remove(ctx context.Context) (err error) {
return nil return nil
} }
isDynamicLargeObject, err := o.isDynamicLargeObject() if isLargeObject {
if err != nil { return o.removeSegmentsLargeObject(containerSegments)
return err
} }
// ...then segments if required
//delete segment for dynamic large object
if isDynamicLargeObject {
return o.removeSegments("")
}
//delete segment for static large object
if isStaticLargeObject && len(segmentContainer) > 0 && segmentObjects != nil && len(segmentObjects) > 0 {
var segmentNames []string
for _, segmentObject := range segmentObjects {
if len(segmentObject.Name) == 0 {
continue
}
segmentNames = append(segmentNames, segmentObject.Name)
}
_, err := o.fs.c.BulkDelete(segmentContainer, segmentNames)
if err != nil {
return err
}
}
return nil return nil
} }

View File

@ -4,15 +4,19 @@ package swift
import ( import (
"bytes" "bytes"
"context" "context"
"errors"
"io" "io"
"io/ioutil"
"testing" "testing"
"github.com/ncw/swift"
"github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/hash" "github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/fs/object" "github.com/rclone/rclone/fs/object"
"github.com/rclone/rclone/fstest" "github.com/rclone/rclone/fstest"
"github.com/rclone/rclone/fstest/fstests" "github.com/rclone/rclone/fstest/fstests"
"github.com/rclone/rclone/lib/random" "github.com/rclone/rclone/lib/random"
"github.com/rclone/rclone/lib/readers"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -74,6 +78,80 @@ func (f *Fs) testNoChunk(t *testing.T) {
// Additional tests that aren't in the framework // Additional tests that aren't in the framework
func (f *Fs) InternalTest(t *testing.T) { func (f *Fs) InternalTest(t *testing.T) {
t.Run("NoChunk", f.testNoChunk) t.Run("NoChunk", f.testNoChunk)
t.Run("WithChunk", f.testWithChunk)
t.Run("WithChunkFail", f.testWithChunkFail)
}
func (f *Fs) testWithChunk(t *testing.T) {
preConfChunkSize := f.opt.ChunkSize
preConfChunk := f.opt.NoChunk
f.opt.NoChunk = false
f.opt.ChunkSize = 1024 * fs.Byte
defer func() {
//restore old config after test
f.opt.ChunkSize = preConfChunkSize
f.opt.NoChunk = preConfChunk
}()
file := fstest.Item{
ModTime: fstest.Time("2020-12-31T04:05:06.499999999Z"),
Path: "piped data chunk.txt",
Size: -1, // use unknown size during upload
}
const contentSize = 2048
contents := random.String(contentSize)
buf := bytes.NewBufferString(contents)
uploadHash := hash.NewMultiHasher()
in := io.TeeReader(buf, uploadHash)
file.Size = -1
obji := object.NewStaticObjectInfo(file.Path, file.ModTime, file.Size, true, nil, nil)
ctx := context.TODO()
obj, err := f.Features().PutStream(ctx, in, obji)
require.NoError(t, err)
require.NotEmpty(t, obj)
}
func (f *Fs) testWithChunkFail(t *testing.T) {
preConfChunkSize := f.opt.ChunkSize
preConfChunk := f.opt.NoChunk
f.opt.NoChunk = false
f.opt.ChunkSize = 1024 * fs.Byte
segmentContainer := f.root + "_segments"
defer func() {
//restore config
f.opt.ChunkSize = preConfChunkSize
f.opt.NoChunk = preConfChunk
}()
path := "piped data chunk with error.txt"
file := fstest.Item{
ModTime: fstest.Time("2021-01-04T03:46:00.499999999Z"),
Path: path,
Size: -1, // use unknown size during upload
}
const contentSize = 4096
const errPosition = 3072
contents := random.String(contentSize)
buf := bytes.NewBufferString(contents[:errPosition])
errMessage := "potato"
er := &readers.ErrorReader{Err: errors.New(errMessage)}
in := ioutil.NopCloser(io.MultiReader(buf, er))
file.Size = contentSize
obji := object.NewStaticObjectInfo(file.Path, file.ModTime, file.Size, true, nil, nil)
ctx := context.TODO()
_, err := f.Features().PutStream(ctx, in, obji)
// error is potato
require.NotNil(t, err)
require.Equal(t, errMessage, err.Error())
_, _, err = f.c.Object(f.rootContainer, path)
assert.Equal(t, swift.ObjectNotFound, err)
prefix := path
objs, err := f.c.Objects(segmentContainer, &swift.ObjectsOpts{
Prefix: prefix,
})
require.NoError(t, err)
require.Empty(t, objs)
} }
var _ fstests.InternalTester = (*Fs)(nil) var _ fstests.InternalTester = (*Fs)(nil)