From 8f164e4df53cf3435b83eae2474bbdc0fe2dd6d8 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Wed, 16 Feb 2022 17:50:11 +0000 Subject: [PATCH] s3: Use the ETag on multipart transfers to verify the transfer was OK Before this, rclone ignored the ETag on multipart uploads, which missed an opportunity for a whole-file integrity check. This adds that check: rclone now computes the expected multipart ETag from the MD5s of the uploaded parts and compares it with the ETag read back from the object, returning an error if they differ. See #5993 --- backend/s3/s3.go | 52 +++++++++++++++++++++++++++++++++++------------- 1 file changed, 38 insertions(+), 14 deletions(-) diff --git a/backend/s3/s3.go b/backend/s3/s3.go index 55b49503a..cf93287c9 100644 --- a/backend/s3/s3.go +++ b/backend/s3/s3.go @@ -3244,9 +3244,6 @@ func (o *Object) readMetaData(ctx context.Context) (err error) { if err != nil { return err } - if resp.LastModified == nil { - fs.Logf(o, "Failed to read last modified from HEAD: %v", err) - } o.setMetaData(resp.ETag, resp.ContentLength, resp.LastModified, resp.Metadata, resp.ContentType, resp.StorageClass) return nil } @@ -3276,6 +3273,7 @@ func (o *Object) setMetaData(etag *string, contentLength *int64, lastModified *t o.storageClass = aws.StringValue(storageClass) if lastModified == nil { o.lastModified = time.Now() + fs.Logf(o, "Failed to read last modified") } else { o.lastModified = *lastModified } @@ -3447,9 +3445,7 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read if err != nil { return nil, err } - if resp.LastModified == nil { - fs.Logf(o, "Failed to read last modified: %v", err) - } + // read size from ContentLength or ContentRange size := resp.ContentLength if resp.ContentRange != nil { @@ -3472,7 +3468,7 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read var warnStreamUpload sync.Once -func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, size int64, in io.Reader) (err error) { +func (o *Object) uploadMultipart(ctx context.Context, req 
*s3.PutObjectInput, size int64, in io.Reader) (etag string, err error) { f := o.fs // make concurrency machinery @@ -3519,7 +3515,7 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si return f.shouldRetry(ctx, err) }) if err != nil { - return fmt.Errorf("multipart upload failed to initialise: %w", err) + return etag, fmt.Errorf("multipart upload failed to initialise: %w", err) } uid := cout.UploadId @@ -3548,8 +3544,21 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si partsMu sync.Mutex // to protect parts parts []*s3.CompletedPart off int64 + md5sMu sync.Mutex + md5s []byte ) + addMd5 := func(md5binary *[md5.Size]byte, partNum int64) { + md5sMu.Lock() + defer md5sMu.Unlock() + start := partNum * md5.Size + end := start + md5.Size + if extend := end - int64(len(md5s)); extend > 0 { + md5s = append(md5s, make([]byte, extend)...) + } + copy(md5s[start:end], (*md5binary)[:]) + } + for partNum := int64(1); !finished; partNum++ { // Get a block of memory from the pool and token which limits concurrency. 
tokens.Get() @@ -3579,7 +3588,7 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si finished = true } else if err != nil { free() - return fmt.Errorf("multipart upload failed to read source: %w", err) + return etag, fmt.Errorf("multipart upload failed to read source: %w", err) } buf = buf[:n] @@ -3592,6 +3601,7 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si // create checksum of buffer for integrity checking md5sumBinary := md5.Sum(buf) + addMd5(&md5sumBinary, partNum-1) md5sum := base64.StdEncoding.EncodeToString(md5sumBinary[:]) err = f.pacer.Call(func() (bool, error) { @@ -3633,7 +3643,7 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si } err = g.Wait() if err != nil { - return err + return etag, err } // sort the completed parts by part number @@ -3654,9 +3664,11 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si return f.shouldRetry(ctx, err) }) if err != nil { - return fmt.Errorf("multipart upload failed to finalise: %w", err) + return etag, fmt.Errorf("multipart upload failed to finalise: %w", err) } - return nil + hashOfHashes := md5.Sum(md5s) + etag = fmt.Sprintf("%s-%d", hex.EncodeToString(hashOfHashes[:]), len(parts)) + return etag, nil } // Update the Object from in with modTime and size @@ -3765,8 +3777,9 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op } var resp *http.Response // response from PUT + var wantETag string // Multipart upload Etag to check if multipart { - err = o.uploadMultipart(ctx, &req, size, in) + wantETag, err = o.uploadMultipart(ctx, &req, size, in) if err != nil { return err } @@ -3846,7 +3859,18 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op // Read the metadata from the newly created object o.meta = nil // wipe old metadata - err = o.readMetaData(ctx) + head, err := o.headObject(ctx) + if err != nil { + return err + } + 
o.setMetaData(head.ETag, head.ContentLength, head.LastModified, head.Metadata, head.ContentType, head.StorageClass) + if !o.fs.etagIsNotMD5 && wantETag != "" && head.ETag != nil && *head.ETag != "" { + gotETag := strings.Trim(strings.ToLower(*head.ETag), `"`) + if wantETag != gotETag { + return fmt.Errorf("multipart upload corrupted: Etag differ: expecting %s but got %s", wantETag, gotETag) + } + fs.Debugf(o, "Multipart upload Etag: %s OK", wantETag) + } return err }