From 0ae171416f393ff67354608091bc177606f8d72a Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Mon, 25 Jul 2022 16:06:15 +0100 Subject: [PATCH] s3: implement --s3-versions flag - See #1776 --- backend/s3/s3.go | 304 +++++++++++++++++++++++++++------ backend/s3/s3_internal_test.go | 77 +++++++++ docs/content/s3.md | 73 ++++++++ 3 files changed, 402 insertions(+), 52 deletions(-) diff --git a/backend/s3/s3.go b/backend/s3/s3.go index f6fc6e956..88c4e3bf7 100644 --- a/backend/s3/s3.go +++ b/backend/s3/s3.go @@ -54,6 +54,7 @@ import ( "github.com/rclone/rclone/lib/readers" "github.com/rclone/rclone/lib/rest" "github.com/rclone/rclone/lib/structs" + "github.com/rclone/rclone/lib/version" "golang.org/x/sync/errgroup" ) @@ -1982,6 +1983,11 @@ circumstances or for testing. `, Default: false, Advanced: true, + }, { + Name: "versions", + Help: "Include old versions in directory listings.", + Default: false, + Advanced: true, }, }}) } @@ -2099,6 +2105,7 @@ type Options struct { DownloadURL string `config:"download_url"` UseMultipartEtag fs.Tristate `config:"use_multipart_etag"` UsePresignedRequest bool `config:"use_presigned_request"` + Versions bool `config:"versions"` } // Fs represents a remote s3 server @@ -2136,6 +2143,7 @@ type Object struct { lastModified time.Time // Last modified meta map[string]string // The object metadata if known - may be nil - with lower case keys mimeType string // MimeType of object - may be "" + versionID *string // If present this points to an object version // Metadata as pointers to strings as they often won't be present storageClass *string // e.g. GLACIER @@ -2237,7 +2245,17 @@ func (f *Fs) split(rootRelativePath string) (bucketName, bucketPath string) { // split returns bucket and bucketPath from the object func (o *Object) split() (bucket, bucketPath string) { - return o.fs.split(o.remote) + bucket, bucketPath = o.fs.split(o.remote) + // If there is an object version, then the path may have a + // version suffix, if so remove it. 
+ // + // If we are unlucky enough to have a file name with a valid + // version path where this wasn't required (eg using + // --s3-version-at) then this will go wrong. + if o.versionID != nil { + _, bucketPath = version.Remove(bucketPath) + } + return bucket, bucketPath } // getClient makes an http client according to the options @@ -2671,14 +2689,58 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e return f, nil } +// getMetaDataListing gets the metadata from the object unconditionally from the listing +// +// This is needed to find versioned objects from their paths. +// +// It may return info == nil and err == nil if a HEAD would be more appropriate +func (f *Fs) getMetaDataListing(ctx context.Context, wantRemote string) (info *s3.Object, versionID *string, err error) { + bucket, bucketPath := f.split(wantRemote) + timestamp, bucketPath := version.Remove(bucketPath) + + // If the path had no version string return no info, to force caller to look it up + if timestamp.IsZero() { + return nil, nil, nil + } + + err = f.list(ctx, bucket, bucketPath, "", false, true, f.opt.Versions, false, true, func(gotRemote string, object *s3.Object, objectVersionID *string, isDirectory bool) error { + if isDirectory { + return nil + } + if wantRemote != gotRemote { + return nil + } + info = object + versionID = objectVersionID + return errEndList // read only 1 item + }) + if err != nil { + if err == fs.ErrorDirNotFound { + return nil, nil, fs.ErrorObjectNotFound + } + return nil, nil, err + } + if info == nil { + return nil, nil, fs.ErrorObjectNotFound + } + return info, versionID, nil +} + // Return an Object from a path // // If it can't be found it returns the error ErrorObjectNotFound. 
-func (f *Fs) newObjectWithInfo(ctx context.Context, remote string, info *s3.Object) (fs.Object, error) { +func (f *Fs) newObjectWithInfo(ctx context.Context, remote string, info *s3.Object, versionID *string) (obj fs.Object, err error) { o := &Object{ fs: f, remote: remote, } + if info == nil && f.opt.Versions && version.Match(remote) { + // If versions, have to read the listing to find the version ID + info, versionID, err = f.getMetaDataListing(ctx, remote) + if err != nil { + return nil, err + } + } if info != nil { // Set info but not meta if info.LastModified == nil { @@ -2690,6 +2752,7 @@ func (f *Fs) newObjectWithInfo(ctx context.Context, remote string, info *s3.Obje o.setMD5FromEtag(aws.StringValue(info.ETag)) o.bytes = aws.Int64Value(info.Size) o.storageClass = info.StorageClass + o.versionID = versionID } else if !o.fs.opt.NoHeadObject { err := o.readMetaData(ctx) // reads info and meta, returning an error if err != nil { @@ -2702,7 +2765,7 @@ func (f *Fs) newObjectWithInfo(ctx context.Context, remote string, info *s3.Obje // NewObject finds the Object at remote. If it can't be found // it returns the error fs.ErrorObjectNotFound. 
func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) { - return f.newObjectWithInfo(ctx, remote, nil) + return f.newObjectWithInfo(ctx, remote, nil, nil) } // Gets the bucket location @@ -2752,7 +2815,7 @@ func (f *Fs) updateRegionForBucket(ctx context.Context, bucket string) error { // Common interface for bucket listers type bucketLister interface { - List(ctx context.Context) (resp *s3.ListObjectsV2Output, versionIDs []*string, err error) + List(ctx context.Context, hidden bool) (resp *s3.ListObjectsV2Output, versionIDs []*string, err error) URLEncodeListings(bool) } @@ -2773,7 +2836,7 @@ func (f *Fs) newV1List(req *s3.ListObjectsV2Input) bucketLister { } // List a bucket with V1 listing -func (ls *v1List) List(ctx context.Context) (resp *s3.ListObjectsV2Output, versionIDs []*string, err error) { +func (ls *v1List) List(ctx context.Context, hidden bool) (resp *s3.ListObjectsV2Output, versionIDs []*string, err error) { respv1, err := ls.f.c.ListObjectsWithContext(ctx, &ls.req) if err != nil { return nil, nil, err @@ -2829,7 +2892,7 @@ func (f *Fs) newV2List(req *s3.ListObjectsV2Input) bucketLister { } // Do a V2 listing -func (ls *v2List) List(ctx context.Context) (resp *s3.ListObjectsV2Output, versionIDs []*string, err error) { +func (ls *v2List) List(ctx context.Context, hidden bool) (resp *s3.ListObjectsV2Output, versionIDs []*string, err error) { resp, err = ls.f.c.ListObjectsV2WithContext(ctx, &ls.req) ls.req.ContinuationToken = resp.NextContinuationToken return resp, nil, err @@ -2844,8 +2907,104 @@ func (ls *v2List) URLEncodeListings(encode bool) { } } +// Versions bucket lister +type versionsList struct { + f *Fs + req s3.ListObjectVersionsInput +} + +// Create a new Versions bucket lister +func (f *Fs) newVersionsList(req *s3.ListObjectsV2Input) bucketLister { + l := &versionsList{ + f: f, + } + // Convert v2 req into withVersions req + structs.SetFrom(&l.req, req) + return l +} + +// Any s3.Objects with this as their size are 
delete markers +var isDeleteMarker = new(int64) + +// List a bucket with versions +func (ls *versionsList) List(ctx context.Context, hidden bool) (resp *s3.ListObjectsV2Output, versionIDs []*string, err error) { + respVersions, err := ls.f.c.ListObjectVersionsWithContext(ctx, &ls.req) + if err != nil { + return nil, nil, err + } + + // Set up the request for next time + ls.req.KeyMarker = respVersions.NextKeyMarker + ls.req.VersionIdMarker = respVersions.NextVersionIdMarker + + // If we are URL encoding then must decode the marker + if ls.req.KeyMarker != nil && ls.req.EncodingType != nil { + *ls.req.KeyMarker, err = url.QueryUnescape(*ls.req.KeyMarker) + if err != nil { + return nil, nil, fmt.Errorf("failed to URL decode KeyMarker %q: %w", *ls.req.KeyMarker, err) + } + } + + // convert Versions resp into v2 resp + resp = new(s3.ListObjectsV2Output) + structs.SetFrom(resp, respVersions) + + // Convert the Versions and the DeleteMarkers into an array of s3.Object + // + // These are returned in the order that they are stored with the most recent first. 
+ // With the annoyance that the Versions and DeleteMarkers are split into two + objs := make([]*s3.Object, 0, len(respVersions.Versions)) + for _, objVersion := range respVersions.Versions { + var obj = new(s3.Object) + structs.SetFrom(obj, objVersion) + // Adjust the file names + if !aws.BoolValue(objVersion.IsLatest) { + if obj.Key != nil && objVersion.LastModified != nil { + *obj.Key = version.Add(*obj.Key, *objVersion.LastModified) + } + } + objs = append(objs, obj) + versionIDs = append(versionIDs, objVersion.VersionId) + } + + // If hidden is set, put the delete markers in too, but set + // their sizes to a sentinel delete marker size + if hidden { + for _, deleteMarker := range respVersions.DeleteMarkers { + var obj = new(s3.Object) + structs.SetFrom(obj, deleteMarker) + obj.Size = isDeleteMarker + // Adjust the file names + if !aws.BoolValue(deleteMarker.IsLatest) { + if obj.Key != nil && deleteMarker.LastModified != nil { + *obj.Key = version.Add(*obj.Key, *deleteMarker.LastModified) + } + } + objs = append(objs, obj) + versionIDs = append(versionIDs, deleteMarker.VersionId) + + } + } + + resp.Contents = objs + return resp, versionIDs, nil +} + +// URL Encode the listings +func (ls *versionsList) URLEncodeListings(encode bool) { + if encode { + ls.req.EncodingType = aws.String(s3.EncodingTypeUrl) + } else { + ls.req.EncodingType = nil + } +} + // listFn is called from list to handle an object. -type listFn func(remote string, object *s3.Object, isDirectory bool) error +type listFn func(remote string, object *s3.Object, versionID *string, isDirectory bool) error + +// errEndList is a sentinel used to end the list iteration now. +// listFn should return it to end the iteration with no errors. +var errEndList = errors.New("end list") // list lists the objects into the function supplied from // the bucket and directory supplied. 
The remote has prefix @@ -2853,12 +3012,16 @@ type listFn func(remote string, object *s3.Object, isDirectory bool) error // bucket to the start. // // Set recurse to read sub directories -func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBucket bool, recurse bool, fn listFn) error { - if prefix != "" { - prefix += "/" - } - if directory != "" { - directory += "/" +// +// if findFile is set it will look for files called (bucket, directory) +func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBucket bool, recurse bool, withVersions bool, hidden bool, findFile bool, fn listFn) error { + if !findFile { + if prefix != "" { + prefix += "/" + } + if directory != "" { + directory += "/" + } } delimiter := "" if !recurse { @@ -2891,6 +3054,8 @@ func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBuck } var listBucket bucketLister switch { + case withVersions: + listBucket = f.newVersionsList(&req) case f.opt.ListVersion == 1: listBucket = f.newV1List(&req) default: @@ -2899,9 +3064,10 @@ func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBuck for { var resp *s3.ListObjectsV2Output var err error + var versionIDs []*string err = f.pacer.Call(func() (bool, error) { listBucket.URLEncodeListings(urlEncodeListings) - resp, _, err = listBucket.List(ctx) + resp, versionIDs, err = listBucket.List(ctx, hidden) if err != nil && !urlEncodeListings { if awsErr, ok := err.(awserr.RequestFailure); ok { if origErr := awsErr.OrigErr(); origErr != nil { @@ -2959,13 +3125,16 @@ func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBuck remote = path.Join(bucket, remote) } remote = strings.TrimSuffix(remote, "/") - err = fn(remote, &s3.Object{Key: &remote}, true) + err = fn(remote, &s3.Object{Key: &remote}, nil, true) if err != nil { + if err == errEndList { + return nil + } return err } } } - for _, object := range resp.Contents { + for i, object := range resp.Contents { 
remote := aws.StringValue(object.Key) if urlEncodeListings { remote, err = url.QueryUnescape(remote) @@ -2988,8 +3157,15 @@ func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBuck if isDirectory && object.Size != nil && *object.Size == 0 { continue // skip directory marker } - err = fn(remote, object, false) + if versionIDs != nil { + err = fn(remote, object, versionIDs[i], false) + } else { + err = fn(remote, object, nil, false) + } if err != nil { + if err == errEndList { + return nil + } return err } } @@ -3001,7 +3177,7 @@ func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBuck } // Convert a list item into a DirEntry -func (f *Fs) itemToDirEntry(ctx context.Context, remote string, object *s3.Object, isDirectory bool) (fs.DirEntry, error) { +func (f *Fs) itemToDirEntry(ctx context.Context, remote string, object *s3.Object, versionID *string, isDirectory bool) (fs.DirEntry, error) { if isDirectory { size := int64(0) if object.Size != nil { @@ -3010,7 +3186,7 @@ func (f *Fs) itemToDirEntry(ctx context.Context, remote string, object *s3.Objec d := fs.NewDir(remote, time.Time{}).SetSize(size) return d, nil } - o, err := f.newObjectWithInfo(ctx, remote, object) + o, err := f.newObjectWithInfo(ctx, remote, object, versionID) if err != nil { return nil, err } @@ -3020,8 +3196,8 @@ func (f *Fs) itemToDirEntry(ctx context.Context, remote string, object *s3.Objec // listDir lists files and directories to out func (f *Fs) listDir(ctx context.Context, bucket, directory, prefix string, addBucket bool) (entries fs.DirEntries, err error) { // List the objects and directories - err = f.list(ctx, bucket, directory, prefix, addBucket, false, func(remote string, object *s3.Object, isDirectory bool) error { - entry, err := f.itemToDirEntry(ctx, remote, object, isDirectory) + err = f.list(ctx, bucket, directory, prefix, addBucket, false, f.opt.Versions, false, false, func(remote string, object *s3.Object, versionID *string, 
isDirectory bool) error { + entry, err := f.itemToDirEntry(ctx, remote, object, versionID, isDirectory) if err != nil { return err } @@ -3098,8 +3274,8 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) ( bucket, directory := f.split(dir) list := walk.NewListRHelper(callback) listR := func(bucket, directory, prefix string, addBucket bool) error { - return f.list(ctx, bucket, directory, prefix, addBucket, true, func(remote string, object *s3.Object, isDirectory bool) error { - entry, err := f.itemToDirEntry(ctx, remote, object, isDirectory) + return f.list(ctx, bucket, directory, prefix, addBucket, true, f.opt.Versions, false, false, func(remote string, object *s3.Object, versionID *string, isDirectory bool) error { + entry, err := f.itemToDirEntry(ctx, remote, object, versionID, isDirectory) if err != nil { return err } @@ -3254,6 +3430,9 @@ func (f *Fs) copy(ctx context.Context, req *s3.CopyObjectInput, dstBucket, dstPa req.ACL = &f.opt.ACL req.Key = &dstPath source := pathEscape(path.Join(srcBucket, srcPath)) + if src.versionID != nil { + source += fmt.Sprintf("?versionId=%s", *src.versionID) + } req.CopySource = &source if f.opt.RequesterPays { req.RequestPayer = aws.String(s3.RequestPayerRequester) @@ -3448,17 +3627,20 @@ func (f *Fs) PublicLink(ctx context.Context, remote string, expire fs.Duration, if strings.HasSuffix(remote, "/") { return "", fs.ErrorCantShareDirectories } - if _, err := f.NewObject(ctx, remote); err != nil { + obj, err := f.NewObject(ctx, remote) + if err != nil { return "", err } + o := obj.(*Object) if expire > maxExpireDuration { fs.Logf(f, "Public Link: Reducing expiry to %v as %v is greater than the max time allowed", maxExpireDuration, expire) expire = maxExpireDuration } - bucket, bucketPath := f.split(remote) + bucket, bucketPath := o.split() httpReq, _ := f.c.GetObjectRequest(&s3.GetObjectInput{ - Bucket: &bucket, - Key: &bucketPath, + Bucket: &bucket, + Key: &bucketPath, + VersionId: o.versionID, }) 
return httpReq.Presign(time.Duration(expire)) @@ -3637,6 +3819,7 @@ func (f *Fs) Command(ctx context.Context, name string, arg []string, opt map[str reqCopy := req reqCopy.Bucket = &bucket reqCopy.Key = &bucketPath + reqCopy.VersionId = o.versionID err = f.pacer.Call(func() (bool, error) { _, err = f.c.RestoreObject(&reqCopy) return f.shouldRetry(ctx, err) @@ -3910,8 +4093,9 @@ func (o *Object) Size() int64 { func (o *Object) headObject(ctx context.Context) (resp *s3.HeadObjectOutput, err error) { bucket, bucketPath := o.split() req := s3.HeadObjectInput{ - Bucket: &bucket, - Key: &bucketPath, + Bucket: &bucket, + Key: &bucketPath, + VersionId: o.versionID, } if o.fs.opt.RequesterPays { req.RequestPayer = aws.String(s3.RequestPayerRequester) @@ -4151,8 +4335,9 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read } req := s3.GetObjectInput{ - Bucket: &bucket, - Key: &bucketPath, + Bucket: &bucket, + Key: &bucketPath, + VersionId: o.versionID, } if o.fs.opt.RequesterPays { req.RequestPayer = aws.String(s3.RequestPayerRequester) @@ -4222,7 +4407,7 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read var warnStreamUpload sync.Once -func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, size int64, in io.Reader) (etag string, err error) { +func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, size int64, in io.Reader) (etag string, versionID *string, err error) { f := o.fs // make concurrency machinery @@ -4265,7 +4450,7 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si return f.shouldRetry(ctx, err) }) if err != nil { - return etag, fmt.Errorf("multipart upload failed to initialise: %w", err) + return etag, nil, fmt.Errorf("multipart upload failed to initialise: %w", err) } uid := cout.UploadId @@ -4338,7 +4523,7 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si finished = true } else if err != 
nil { free() - return etag, fmt.Errorf("multipart upload failed to read source: %w", err) + return etag, nil, fmt.Errorf("multipart upload failed to read source: %w", err) } buf = buf[:n] @@ -4393,7 +4578,7 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si } err = g.Wait() if err != nil { - return etag, err + return etag, nil, err } // sort the completed parts by part number @@ -4401,8 +4586,9 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si return *parts[i].PartNumber < *parts[j].PartNumber }) + var resp *s3.CompleteMultipartUploadOutput err = f.pacer.Call(func() (bool, error) { - _, err := f.c.CompleteMultipartUploadWithContext(ctx, &s3.CompleteMultipartUploadInput{ + resp, err = f.c.CompleteMultipartUploadWithContext(ctx, &s3.CompleteMultipartUploadInput{ Bucket: req.Bucket, Key: req.Key, MultipartUpload: &s3.CompletedMultipartUpload{ @@ -4414,11 +4600,14 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si return f.shouldRetry(ctx, err) }) if err != nil { - return etag, fmt.Errorf("multipart upload failed to finalise: %w", err) + return etag, nil, fmt.Errorf("multipart upload failed to finalise: %w", err) } hashOfHashes := md5.Sum(md5s) etag = fmt.Sprintf("%s-%d", hex.EncodeToString(hashOfHashes[:]), len(parts)) - return etag, nil + if resp != nil { + versionID = resp.VersionId + } + return etag, versionID, nil } // unWrapAwsError unwraps AWS errors, looking for a non AWS error @@ -4445,7 +4634,7 @@ func unWrapAwsError(err error) (found bool, outErr error) { } // Upload a single part using PutObject -func (o *Object) uploadSinglepartPutObject(ctx context.Context, req *s3.PutObjectInput, size int64, in io.Reader) (etag string, lastModified time.Time, err error) { +func (o *Object) uploadSinglepartPutObject(ctx context.Context, req *s3.PutObjectInput, size int64, in io.Reader) (etag string, lastModified time.Time, versionID *string, err error) { r, resp := 
o.fs.c.PutObjectRequest(req) if req.ContentLength != nil && *req.ContentLength == 0 { // Can't upload zero length files like this for some reason @@ -4472,15 +4661,18 @@ func (o *Object) uploadSinglepartPutObject(ctx context.Context, req *s3.PutObjec err = newErr } } - return etag, lastModified, err + return etag, lastModified, nil, err } lastModified = time.Now() - etag = aws.StringValue(resp.ETag) - return etag, lastModified, nil + if resp != nil { + etag = aws.StringValue(resp.ETag) + versionID = resp.VersionId + } + return etag, lastModified, versionID, nil } // Upload a single part using a presigned request -func (o *Object) uploadSinglepartPresignedRequest(ctx context.Context, req *s3.PutObjectInput, size int64, in io.Reader) (etag string, lastModified time.Time, err error) { +func (o *Object) uploadSinglepartPresignedRequest(ctx context.Context, req *s3.PutObjectInput, size int64, in io.Reader) (etag string, lastModified time.Time, versionID *string, err error) { // Create the request putObj, _ := o.fs.c.PutObjectRequest(req) @@ -4490,7 +4682,7 @@ func (o *Object) uploadSinglepartPresignedRequest(ctx context.Context, req *s3.P // PutObject so we used this work-around. 
url, headers, err := putObj.PresignRequest(15 * time.Minute) if err != nil { - return etag, lastModified, fmt.Errorf("s3 upload: sign request: %w", err) + return etag, lastModified, nil, fmt.Errorf("s3 upload: sign request: %w", err) } if o.fs.opt.V2Auth && headers == nil { @@ -4505,7 +4697,7 @@ func (o *Object) uploadSinglepartPresignedRequest(ctx context.Context, req *s3.P // create the vanilla http request httpReq, err := http.NewRequestWithContext(ctx, "PUT", url, in) if err != nil { - return etag, lastModified, fmt.Errorf("s3 upload: new request: %w", err) + return etag, lastModified, nil, fmt.Errorf("s3 upload: new request: %w", err) } // set the headers we signed and the length @@ -4530,15 +4722,19 @@ func (o *Object) uploadSinglepartPresignedRequest(ctx context.Context, req *s3.P return fserrors.ShouldRetryHTTP(resp, retryErrorCodes), err }) if err != nil { - return etag, lastModified, err + return etag, lastModified, nil, err } if resp != nil { if date, err := http.ParseTime(resp.Header.Get("Date")); err != nil { lastModified = date } etag = resp.Header.Get("Etag") + vID := resp.Header.Get("x-amz-version-id") + if vID != "" { + versionID = &vID + } } - return etag, lastModified, nil + return etag, lastModified, versionID, nil } // Update the Object from in with modTime and size @@ -4692,18 +4888,20 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op var wantETag string // Multipart upload Etag to check var gotEtag string // Etag we got from the upload var lastModified time.Time // Time we got from the upload + var versionID *string // versionID we got from the upload if multipart { - wantETag, err = o.uploadMultipart(ctx, &req, size, in) + wantETag, versionID, err = o.uploadMultipart(ctx, &req, size, in) } else { if o.fs.opt.UsePresignedRequest { - gotEtag, lastModified, err = o.uploadSinglepartPresignedRequest(ctx, &req, size, in) + gotEtag, lastModified, versionID, err = o.uploadSinglepartPresignedRequest(ctx, &req, size, 
in) } else { - gotEtag, lastModified, err = o.uploadSinglepartPutObject(ctx, &req, size, in) + gotEtag, lastModified, versionID, err = o.uploadSinglepartPutObject(ctx, &req, size, in) } } if err != nil { return err } + o.versionID = versionID // User requested we don't HEAD the object after uploading it // so make up the object as best we can assuming it got @@ -4721,6 +4919,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op lastModified = time.Now() } head.LastModified = &lastModified + head.VersionId = versionID o.setMetaData(&head) return nil } @@ -4746,8 +4945,9 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op func (o *Object) Remove(ctx context.Context) error { bucket, bucketPath := o.split() req := s3.DeleteObjectInput{ - Bucket: &bucket, - Key: &bucketPath, + Bucket: &bucket, + Key: &bucketPath, + VersionId: o.versionID, } if o.fs.opt.RequesterPays { req.RequestPayer = aws.String(s3.RequestPayerRequester) diff --git a/backend/s3/s3_internal_test.go b/backend/s3/s3_internal_test.go index fcaf8915f..f3b881fc5 100644 --- a/backend/s3/s3_internal_test.go +++ b/backend/s3/s3_internal_test.go @@ -12,6 +12,7 @@ import ( "github.com/rclone/rclone/fstest" "github.com/rclone/rclone/fstest/fstests" "github.com/rclone/rclone/lib/random" + "github.com/rclone/rclone/lib/version" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -84,9 +85,85 @@ func (f *Fs) InternalTestNoHead(t *testing.T) { } +func (f *Fs) InternalTestVersions(t *testing.T) { + ctx := context.Background() + + // Enable versioning for this bucket during this test + _, err := f.setGetVersioning(ctx, "Enabled") + if err != nil { + t.Skipf("Couldn't enable versioning: %v", err) + } + defer func() { + // Disable versioning for this bucket + _, err := f.setGetVersioning(ctx, "Suspended") + assert.NoError(t, err) + }() + + // Create an object + const fileName = "test-versions.txt" + contents := random.String(100) 
+ item := fstest.NewItem(fileName, contents, fstest.Time("2001-05-06T04:05:06.499999999Z")) + obj := fstests.PutTestContents(ctx, t, f, &item, contents, true) + defer func() { + assert.NoError(t, obj.Remove(ctx)) + }() + + // Remove it + assert.NoError(t, obj.Remove(ctx)) + + // And create it with different size and contents + newContents := random.String(101) + newItem := fstest.NewItem(fileName, newContents, fstest.Time("2002-05-06T04:05:06.499999999Z")) + _ = fstests.PutTestContents(ctx, t, f, &newItem, newContents, true) + + // Add the expected version suffix to the old version + item.Path = version.Add(item.Path, obj.(*Object).lastModified) + + t.Run("S3Version", func(t *testing.T) { + // Set --s3-versions for this test + f.opt.Versions = true + defer func() { + f.opt.Versions = false + }() + + // Check listing + items := append([]fstest.Item{item, newItem}, fstests.InternalTestFiles...) + fstest.CheckListing(t, f, items) + + // Read the contents + entries, err := f.List(ctx, "") + require.NoError(t, err) + tests := 0 + for _, entry := range entries { + switch entry.Remote() { + case newItem.Path: + t.Run("ReadCurrent", func(t *testing.T) { + assert.Equal(t, newContents, fstests.ReadObject(ctx, t, entry.(fs.Object), -1)) + }) + tests++ + case item.Path: + t.Run("ReadVersion", func(t *testing.T) { + assert.Equal(t, contents, fstests.ReadObject(ctx, t, entry.(fs.Object), -1)) + }) + tests++ + } + } + assert.Equal(t, 2, tests) + + // Check we can read the object with a version suffix + t.Run("NewObject", func(t *testing.T) { + o, err := f.NewObject(ctx, item.Path) + require.NoError(t, err) + require.NotNil(t, o) + assert.Equal(t, int64(100), o.Size(), o.Remote()) + }) + }) +} + func (f *Fs) InternalTest(t *testing.T) { t.Run("Metadata", f.InternalTestMetadata) t.Run("NoHead", f.InternalTestNoHead) + t.Run("Versions", f.InternalTestVersions) } var _ fstests.InternalTester = (*Fs)(nil) diff --git a/docs/content/s3.md b/docs/content/s3.md index 6462dcf0d..e2089b1d7 
100644 --- a/docs/content/s3.md +++ b/docs/content/s3.md @@ -384,6 +384,68 @@ This will mean that these objects do not have an MD5 checksum. Note that reading this from the object takes an additional `HEAD` request as the metadata isn't returned in object listings. +### Versions + +When bucket versioning is enabled (this can be done with rclone with +the [`rclone backend versioning`](#versioning) command), when rclone +uploads a new version of a file it creates a +[new version of it](https://docs.aws.amazon.com/AmazonS3/latest/userguide/Versioning.html). +Likewise when you delete a file, the old version will be marked hidden +and still be available. + +Old versions of files, where available, are visible using the +`--s3-versions` flag. + +If you wish to remove all the old versions then you can use the +[`rclone backend cleanup-hidden remote:bucket`](#cleanup-hidden) +command which will delete all the old hidden versions of files, +leaving the current ones intact. You can also supply a path and only +old versions under that path will be deleted, e.g. +`rclone backend cleanup-hidden remote:bucket/path/to/stuff`. + +When you `purge` a bucket, the current and the old versions will be +deleted then the bucket will be deleted. + +However `delete` will cause the current versions of the files to +become hidden old versions. + +Here is a session showing the listing and retrieval of an old +version followed by a `cleanup-hidden` of the old versions. + +Show current version and all the versions with the `--s3-versions` flag. 
+ +``` +$ rclone -q ls s3:cleanup-test + 9 one.txt + +$ rclone -q --s3-versions ls s3:cleanup-test + 9 one.txt + 8 one-v2016-07-04-141032-000.txt + 16 one-v2016-07-04-141003-000.txt + 15 one-v2016-07-02-155621-000.txt +``` + +Retrieve an old version + +``` +$ rclone -q --s3-versions copy s3:cleanup-test/one-v2016-07-04-141003-000.txt /tmp + +$ ls -l /tmp/one-v2016-07-04-141003-000.txt +-rw-rw-r-- 1 ncw ncw 16 Jul 2 17:46 /tmp/one-v2016-07-04-141003-000.txt +``` + +Clean up all the old versions and show that they've gone. + +``` +$ rclone -q backend cleanup-hidden s3:cleanup-test + +$ rclone -q ls s3:cleanup-test + 9 one.txt + +$ rclone -q --s3-versions ls s3:cleanup-test + 9 one.txt +``` + ### Cleanup If you run `rclone cleanup s3:bucket` then it will remove all pending @@ -2562,6 +2624,17 @@ Properties: - Type: bool - Default: false +#### --s3-versions + +Include old versions in directory listings. + +Properties: + +- Config: versions +- Env Var: RCLONE_S3_VERSIONS +- Type: bool +- Default: false + ### Metadata User metadata is stored as x-amz-meta- keys. S3 metadata keys are case insensitive and are always returned in lower case.