s3: implement alternate hashes - fixes #7445 FIXME WIP

Nick Craig-Wood 2024-10-07 15:02:11 +01:00
parent d65d1a44b3
commit bdf1777185
3 changed files with 94 additions and 12 deletions

View File

@@ -102,4 +102,5 @@ import (
 	genSetFrom(new(s3.CreateMultipartUploadInput), new(s3.PutObjectInput))
 	genSetFrom(new(s3.HeadObjectOutput), new(s3.PutObjectInput))
 	genSetFrom(new(s3.CopyObjectInput), new(s3.PutObjectInput))
+	genSetFrom(new(types.CompletedPart), new(s3.UploadPartOutput))
 }
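The genSetFrom pair added here teaches the generator to emit the setFrom_typesCompletedPart_s3UploadPartOutput helper shown in the last file of this commit, which copies the per-part checksum fields and the ETag from each s3.UploadPartOutput into its types.CompletedPart.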

View File

@@ -2724,6 +2724,26 @@ use |-vv| to see the debug level logs.
 `, "|", "`"),
 			Default:  sdkLogMode(0),
 			Advanced: true,
+		}, {
+			Name: "hash",
+			Help: strings.ReplaceAll(`Set to change the hash/checksum in use.
+
+This can be set to one of:
+
+- |`+hash.MD5.String()+`|
+- |`+hash.SHA1.String()+`|
+- |`+hash.SHA256.String()+`|
+- |`+hash.CRC32.String()+`|
+
+This chooses the checksum algorithm S3 uses to validate your data.
+Once the data is uploaded the checksum algorithm can only be changed by copying the data.
+
+This also sets the hash that rclone uses.
+
+See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html
+`, "|", "`"),
+			Default:  hash.MD5,
+			Advanced: true,
 		}},
 	}})
 }
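Assuming the new option surfaces on the command line in rclone's usual backend-option form (the flag name here is inferred, not shown in this commit), it would be used like `rclone copy --s3-hash sha256 /path/to/files remote:bucket`, after which `rclone hashsum sha256 remote:bucket` can read the stored checksums back through the new Hash path below.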
@@ -2878,6 +2898,7 @@ type Options struct {
 	UseUnsignedPayload fs.Tristate `config:"use_unsigned_payload"`
 	SDKLogMode         sdkLogMode  `config:"sdk_log_mode"`
 	DirectoryBucket    bool        `config:"directory_bucket"`
+	Hash               hash.Type   `config:"hash"`
 }

 // Fs represents a remote s3 server
@@ -2897,8 +2918,9 @@ type Fs struct {
 	srvRest        *rest.Client // the rest connection to the server
 	etagIsNotMD5   bool         // if set ETags are not MD5s
 	versioningMu   sync.Mutex
 	versioning     fs.Tristate  // if set bucket is using versions
 	warnCompressed sync.Once    // warn once about compressed files
+	checksum       types.ChecksumAlgorithm // for uploading new objects
 }

 // Object describes a s3 object
@@ -2922,6 +2944,7 @@ type Object struct {
 	contentDisposition *string // Content-Disposition: header
 	contentEncoding    *string // Content-Encoding: header
 	contentLanguage    *string // Content-Language: header
+	hash               *string // the alternate hash from the hash option, if known
 }

 // safely dereference the pointer, returning a zero T if nil
@@ -3662,6 +3685,18 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, error) {
 		// The normal API doesn't work for creating directory buckets, so don't try
 		f.opt.NoCheckBucket = true
 	}
+	switch opt.Hash {
+	case hash.MD5:
+		f.checksum = ""
+	case hash.CRC32:
+		f.checksum = types.ChecksumAlgorithmCrc32
+	case hash.SHA1:
+		f.checksum = types.ChecksumAlgorithmSha1
+	case hash.SHA256:
+		f.checksum = types.ChecksumAlgorithmSha256
+	default:
+		return nil, fmt.Errorf("%v is not supported as a hash/checksum algorithm", opt.Hash)
+	}
 	f.setRoot(root)
 	f.features = (&fs.Features{
 		ReadMimeType: true,
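Note that hash.MD5 deliberately maps to the empty checksum algorithm: plain MD5 is already handled through Content-MD5 and the ETag, and S3's additional-checksum feature only offers CRC32, CRC32C, SHA-1 and SHA-256, so there is nothing extra to request in that case.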
@@ -4895,7 +4930,7 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
 // Hashes returns the supported hash sets.
 func (f *Fs) Hashes() hash.Set {
-	return hash.Set(hash.MD5)
+	return hash.Set(hash.MD5 | f.opt.Hash)
 }

 // PublicLink generates a public link to the remote path (usually readable by anyone)
@@ -5608,6 +5643,25 @@ func (o *Object) setMD5FromEtag(etag string) {
 // Hash returns the Md5sum of an object returning a lowercase hex string
 func (o *Object) Hash(ctx context.Context, t hash.Type) (string, error) {
 	if t != hash.MD5 {
+		if t == o.fs.opt.Hash {
+			if o.hash == nil {
+				err := o.readMetaData(ctx)
+				if err != nil {
+					return "", err
+				}
+			}
+			if o.hash == nil {
+				return "", nil
+			}
+			// FIXME check multipart
+			hashBytes, err := base64.StdEncoding.DecodeString(*o.hash)
+			if err != nil {
+				return "", fmt.Errorf("failed to read hash from response %q: %w", *o.hash, err)
+			} else if 2*len(hashBytes) != hash.Width(o.fs.opt.Hash, false) {
+				return "", fmt.Errorf("failed to read hash from response %q: wrong length", *o.hash)
+			}
+			return hex.EncodeToString(hashBytes), nil
+		}
 		return "", hash.ErrUnsupported
 	}
 	// If decompressing, erase the hash
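S3 returns these checksums base64 encoded while rclone's hash interface deals in lowercase hex, hence the decode and re-encode above. The FIXME is presumably about multipart uploads, where S3 reports a composite checksum-of-checksums with a trailing -<parts> suffix (like multipart ETags) that would fail this base64 decode. A minimal standalone sketch of the conversion, not rclone code - the sample value is the base64 SHA-256 of empty content:

    package main

    import (
    	"encoding/base64"
    	"encoding/hex"
    	"fmt"
    	"log"
    )

    func main() {
    	// ChecksumSHA256 as S3 would return it in a HeadObject response:
    	// base64 of the raw 32 byte digest (here, the digest of no data).
    	s3Checksum := "47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZG3hSuFU="

    	raw, err := base64.StdEncoding.DecodeString(s3Checksum)
    	if err != nil {
    		log.Fatalf("failed to read hash from response %q: %v", s3Checksum, err)
    	}
    	// A SHA-256 digest is 32 bytes = 64 hex digits, which is what the
    	// 2*len(hashBytes) != hash.Width check in the commit verifies.
    	if len(raw) != 32 {
    		log.Fatalf("wrong length %d for hash %q", len(raw), s3Checksum)
    	}
    	fmt.Println(hex.EncodeToString(raw))
    	// e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855
    }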
@@ -5652,6 +5706,9 @@ func (f *Fs) headObject(ctx context.Context, req *s3.HeadObjectInput) (resp *s3.HeadObjectOutput, err error) {
 	if f.opt.SSECustomerKeyMD5 != "" {
 		req.SSECustomerKeyMD5 = &f.opt.SSECustomerKeyMD5
 	}
+	if f.checksum != "" {
+		req.ChecksumMode = types.ChecksumModeEnabled
+	}
 	err = f.pacer.Call(func() (bool, error) {
 		var err error
 		resp, err = f.c.HeadObject(ctx, req)
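This is needed because S3 only includes the Checksum* fields in HeadObject (and GetObject) responses when the request sets x-amz-checksum-mode: ENABLED, which is what ChecksumModeEnabled asks for; the same change is made to Open below.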
@@ -5741,11 +5798,21 @@ func (o *Object) setMetaData(resp *s3.HeadObjectOutput) {
 	o.contentDisposition = resp.ContentDisposition
 	o.contentEncoding = resp.ContentEncoding
 	o.contentLanguage = resp.ContentLanguage
+	if o.fs.opt.Hash == hash.CRC32 {
+		o.hash = resp.ChecksumCRC32
+	}
+	if o.fs.opt.Hash == hash.SHA1 {
+		o.hash = resp.ChecksumSHA1
+	}
+	if o.fs.opt.Hash == hash.SHA256 {
+		o.hash = resp.ChecksumSHA256
+	}

 	// If decompressing then size and md5sum are unknown
 	if o.fs.opt.Decompress && deref(o.contentEncoding) == "gzip" {
 		o.bytes = -1
 		o.md5 = ""
+		o.hash = nil
 	}
 }
@@ -5910,6 +5977,9 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.ReadCloser, err error) {
 	if o.fs.opt.SSECustomerKeyMD5 != "" {
 		req.SSECustomerKeyMD5 = &o.fs.opt.SSECustomerKeyMD5
 	}
+	if o.fs.checksum != "" {
+		req.ChecksumMode = types.ChecksumModeEnabled
+	}
 	// httpReq, err := s3.NewPresignClient(o.fs.c).PresignGetObject(ctx, &req)
 	// if err != nil {
 	// 	return nil, err
@@ -6088,13 +6158,13 @@ func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectInfo, options ...fs.OpenOption) (info fs.ChunkWriterInfo, writer fs.ChunkWriter, err error) {
 }

 // add a part number and etag to the completed parts
-func (w *s3ChunkWriter) addCompletedPart(partNum *int32, eTag *string) {
+func (w *s3ChunkWriter) addCompletedPart(partNum *int32, uout *s3.UploadPartOutput) {
 	w.completedPartsMu.Lock()
 	defer w.completedPartsMu.Unlock()
-	w.completedParts = append(w.completedParts, types.CompletedPart{
-		PartNumber: partNum,
-		ETag:       eTag,
-	})
+	var part types.CompletedPart
+	setFrom_typesCompletedPart_s3UploadPartOutput(&part, uout)
+	part.PartNumber = partNum
+	w.completedParts = append(w.completedParts, part)
 }

 // addMd5 adds a binary md5 to the md5 calculated so far
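Carrying the whole s3.UploadPartOutput through matters because, once a multipart upload is started with a ChecksumAlgorithm, CompleteMultipartUpload expects each part's checksum to be echoed back alongside its ETag and rejects the completion otherwise; the generated setFrom helper copies all four possible checksum fields so this holds for every supported algorithm.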
@@ -6158,6 +6228,7 @@ func (w *s3ChunkWriter) WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (int64, error) {
 		SSECustomerAlgorithm: w.multiPartUploadInput.SSECustomerAlgorithm,
 		SSECustomerKey:       w.multiPartUploadInput.SSECustomerKey,
 		SSECustomerKeyMD5:    w.multiPartUploadInput.SSECustomerKeyMD5,
+		ChecksumAlgorithm:    w.f.checksum,
 	}
 	if w.f.opt.DirectoryBucket {
 		// Directory buckets do not support "Content-Md5" header
@@ -6184,7 +6255,7 @@ func (w *s3ChunkWriter) WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (int64, error) {
 		return -1, fmt.Errorf("failed to upload chunk %d with %v bytes: %w", chunkNumber+1, currentChunkSize, err)
 	}

-	w.addCompletedPart(s3PartNumber, uout.ETag)
+	w.addCompletedPart(s3PartNumber, uout)

 	fs.Debugf(w.o, "multipart upload wrote chunk %d with %v bytes and etag %v", chunkNumber+1, currentChunkSize, *uout.ETag)
 	return currentChunkSize, err
@@ -6368,9 +6439,10 @@ func (o *Object) prepareUpload(ctx context.Context, src fs.ObjectInfo, options []fs.OpenOption) (ui uploadInfo, err error) {
 	modTime := src.ModTime(ctx)

 	ui.req = &s3.PutObjectInput{
-		Bucket: &bucket,
-		ACL:    types.ObjectCannedACL(o.fs.opt.ACL),
-		Key:    &bucketPath,
+		Bucket:            &bucket,
+		ACL:               types.ObjectCannedACL(o.fs.opt.ACL),
+		Key:               &bucketPath,
+		ChecksumAlgorithm: o.fs.checksum,
 	}

 	// Fetch metadata if --metadata is in use

View File

@@ -285,3 +285,12 @@ func setFrom_s3CopyObjectInput_s3PutObjectInput(a *s3.CopyObjectInput, b *s3.PutObjectInput) {
 	a.Tagging = b.Tagging
 	a.WebsiteRedirectLocation = b.WebsiteRedirectLocation
 }
+
+// setFrom_typesCompletedPart_s3UploadPartOutput copies matching elements from b to a
+func setFrom_typesCompletedPart_s3UploadPartOutput(a *types.CompletedPart, b *s3.UploadPartOutput) {
+	a.ChecksumCRC32 = b.ChecksumCRC32
+	a.ChecksumCRC32C = b.ChecksumCRC32C
+	a.ChecksumSHA1 = b.ChecksumSHA1
+	a.ChecksumSHA256 = b.ChecksumSHA256
+	a.ETag = b.ETag
+}