azureblob: allow working with only list+create/write permissions (fix no_head_object/add no_read_for_metadata)

There are three reasons (each with its own option) why one would avoid HEAD requests on azureblob; this commit mainly implements the last one:

- no_head_object: Don't do a HEAD *before* writing to (or reading from) an object, to reduce the number of transactions and thus increase performance. **This already exists in azureblob, a small issue was fixed here where using a prefix would still require a HEAD request to the prefix itself.**
- no_head: Don't do a HEAD *after* writing to an object, for the same reason but at the cost of "increasing the chance for undetected upload failures".  **This is not yet implemented for azureblob and is quite complex, see s3.Object.Update()!**
- no_read_for_metadata: Instead of doing a HEAD to get metadata, use a list operation, to avoid requiring READ permissions for immutable/append-only applications (this seems to be quite azureblob-specific) - one could also use both no_head_object and no_head together to achieve this, but then uploads wouldn't be verified.

Fixes #6162 and #7027
This commit is contained in:
Moritz Marquardt 2024-04-04 13:31:25 +02:00
parent d84a4c9ac1
commit db14c3e68d
2 changed files with 118 additions and 4 deletions

View File

@ -401,6 +401,11 @@ rclone does if you know the container exists already.
Help: `If set, do not do HEAD before GET when getting objects.`,
Default: false,
Advanced: true,
}, {
Name: "no_read_for_metadata",
Help: `If set, use a list operation instead of the HEAD method on objects, to avoid requiring read permissions.`,
Default: false,
Advanced: true,
}, {
Name: "delete_snapshots",
Help: `Set to specify how to deal with snapshots on blob deletion.`,
@ -455,6 +460,7 @@ type Options struct {
DirectoryMarkers bool `config:"directory_markers"`
NoCheckContainer bool `config:"no_check_container"`
NoHeadObject bool `config:"no_head_object"`
NoReadForMetadata bool `config:"no_read_for_metadata"`
DeleteSnapshots string `config:"delete_snapshots"`
}
@ -909,7 +915,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
return nil, fmt.Errorf("internal error: auth failed to make credentials or client")
}
if f.rootContainer != "" && f.rootDirectory != "" {
if f.rootContainer != "" && f.rootDirectory != "" && !opt.NoHeadObject && !strings.HasSuffix(root, "/") {
// Check to see if the (container,directory) is actually an existing file
oldRoot := f.root
newRoot, leaf := path.Split(oldRoot)
@ -1838,12 +1844,109 @@ func (o *Object) clearMetaData() {
o.modTime = time.Time{}
}
// convertMetadataFromBlobItemToPropertiesResponse maps a blob entry returned
// by a container list request onto the response type returned by a
// per-blob GetProperties request.
//
// The two SDK types carry nearly the same information under slightly
// different field names and types, so the mapping is spelled out field by
// field. Fields that this function has no source for in the list entry
// (AcceptRanges, BlobCommittedBlockCount, ClientRequestID,
// ObjectReplicationPolicyID, ObjectReplicationRules, RequestID, Version and
// VersionID) are left at their zero value.
//
// currentBlob.Properties is dereferenced unconditionally and so must not be
// nil.
func convertMetadataFromBlobItemToPropertiesResponse(currentBlob container.BlobItem) blob.GetPropertiesResponse {
	props := currentBlob.Properties

	// TagCount is always set in the result, defaulting to zero when the
	// listing did not report one.
	var tagCount int64
	if n := props.TagCount; n != nil {
		tagCount = int64(*n)
	}

	// The entry is always reported as the current version.
	current := true

	var resp blob.GetPropertiesResponse

	// User metadata and tags.
	resp.Metadata = currentBlob.Metadata
	resp.TagCount = &tagCount

	// Identity, type and timestamps.
	resp.BlobType = props.BlobType
	resp.BlobSequenceNumber = props.BlobSequenceNumber
	resp.ETag = props.ETag
	resp.CreationTime = props.CreationTime
	resp.LastModified = props.LastModified
	resp.LastAccessed = props.LastAccessedOn
	resp.ExpiresOn = props.ExpiresOn
	// The list entry has no response date, so LastModified stands in for it.
	resp.Date = props.LastModified
	resp.IsCurrentVersion = &current
	resp.IsSealed = props.IsSealed

	// Content headers.
	resp.CacheControl = props.CacheControl
	resp.ContentDisposition = props.ContentDisposition
	resp.ContentEncoding = props.ContentEncoding
	resp.ContentLanguage = props.ContentLanguage
	resp.ContentLength = props.ContentLength
	resp.ContentMD5 = props.ContentMD5
	resp.ContentType = props.ContentType

	// Access tier and rehydration state.
	resp.AccessTier = (*string)(props.AccessTier)
	resp.AccessTierChangeTime = props.AccessTierChangeTime
	resp.AccessTierInferred = props.AccessTierInferred
	resp.ArchiveStatus = (*string)(props.ArchiveStatus)
	resp.RehydratePriority = (*string)(props.RehydratePriority)

	// Copy state.
	resp.CopyCompletionTime = props.CopyCompletionTime
	resp.CopyID = props.CopyID
	resp.CopyProgress = props.CopyProgress
	resp.CopySource = props.CopySource
	resp.CopyStatus = props.CopyStatus
	resp.CopyStatusDescription = props.CopyStatusDescription
	resp.DestinationSnapshot = props.DestinationSnapshot
	resp.IsIncrementalCopy = props.IncrementalCopy

	// Encryption.
	resp.EncryptionKeySHA256 = props.CustomerProvidedKeySHA256
	resp.EncryptionScope = props.EncryptionScope
	resp.IsServerEncrypted = props.ServerEncrypted

	// Leases, immutability and legal hold.
	resp.LeaseDuration = props.LeaseDuration
	resp.LeaseState = props.LeaseState
	resp.LeaseStatus = props.LeaseStatus
	resp.ImmutabilityPolicyExpiresOn = props.ImmutabilityPolicyExpiresOn
	resp.ImmutabilityPolicyMode = props.ImmutabilityPolicyMode
	resp.LegalHold = props.LegalHold

	return resp
}
// readMetadataUsingList gets the metadata of a single blob using a
// ListBlobsFlatPager instead of a request on the blob itself, to avoid
// requiring read access on the blob. It is used by Fs.readMetaData if
// no_read_for_metadata is set.
//
// The listing is restricted to names starting with containerPath, and the
// first entry whose name equals containerPath exactly is converted and
// returned. Other entries sharing the prefix are skipped, so the call may
// page through several results before finding (or not finding) the blob.
//
// It returns fs.ErrorObjectNotFound if no entry matches exactly, or the
// error from the pager if fetching a page fails.
func (f *Fs) readMetadataUsingList(ctx context.Context, containerName, containerPath string) (blobProperties blob.GetPropertiesResponse, err error) {
	blobsPager := f.cntSVC(containerName).NewListBlobsFlatPager(&container.ListBlobsFlatOptions{
		Include: container.ListBlobsInclude{
			Metadata: true,
		},
		Prefix: &containerPath,
	})
	for blobsPager.More() {
		// Use a distinct name so the named result err is not shadowed.
		page, pageErr := blobsPager.NextPage(ctx)
		if pageErr != nil {
			return blobProperties, pageErr
		}
		segment := page.ListBlobsFlatSegmentResponse.Segment
		if segment == nil {
			// Nothing to inspect on this page.
			continue
		}
		for _, currentBlob := range segment.BlobItems {
			// Guard against malformed entries rather than panicking on a
			// nil dereference.
			if currentBlob == nil || currentBlob.Name == nil {
				continue
			}
			if *currentBlob.Name != containerPath {
				continue
			}
			return convertMetadataFromBlobItemToPropertiesResponse(*currentBlob), nil
		}
	}
	return blobProperties, fs.ErrorObjectNotFound
}
// readMetaData gets the metadata if it hasn't already been fetched
func (f *Fs) readMetaData(ctx context.Context, container, containerPath string) (blobProperties blob.GetPropertiesResponse, err error) {
if !f.containerOK(container) {
func (f *Fs) readMetaData(ctx context.Context, containerName, containerPath string) (blobProperties blob.GetPropertiesResponse, err error) {
if !f.containerOK(containerName) {
return blobProperties, fs.ErrorObjectNotFound
}
blb := f.getBlobSVC(container, containerPath)
if f.opt.NoReadForMetadata {
// When not using HEAD (which requires read permissions), we need to use a list operation.
// By using the filename itself as a prefix, we hopefully only get that one file, but if not it is possible that this takes a bit longer than the HEAD operation.
err = f.pacer.Call(func() (bool, error) {
blobProperties, err = f.readMetadataUsingList(ctx, containerName, containerPath)
return f.shouldRetry(ctx, err)
})
return blobProperties, err
}
blb := f.getBlobSVC(containerName, containerPath)
// Read metadata (this includes metadata)
options := blob.GetPropertiesOptions{}

View File

@ -831,6 +831,17 @@ Properties:
- Type: bool
- Default: false
#### --azureblob-no-read-for-metadata
If set, use a list operation instead of the HEAD method on objects, to avoid requiring read permissions.
Properties:
- Config: no_read_for_metadata
- Env Var: RCLONE_AZUREBLOB_NO_READ_FOR_METADATA
- Type: bool
- Default: false
#### --azureblob-delete-snapshots
Set to specify how to deal with snapshots on blob deletion.