azureblob: implement multipart server side copy

This implements multipart server side copy to improve copying from one
azure region to another by orders of magnitude (from 30s for a 100M
file to 10s for a 10G file with --azureblob-upload-concurrency 500).

- Add `--azureblob-copy-cutoff` to control the cutoff from single to multipart copy
- Add `--azureblob-copy-concurrency` to control the copy concurrency
- Add ServerSideAcrossConfigs flag as this now works properly
- Implement multipart copy using put block list API
- Shortcut multipart copy for same storage account
- Override with `--azureblob-use-copy-blob`

Fixes #8249
This commit is contained in:
Nick Craig-Wood 2024-12-18 15:29:52 +00:00
parent 5a23230fa9
commit f67ac4d389
2 changed files with 319 additions and 55 deletions

View File

@ -44,11 +44,13 @@ import (
"github.com/rclone/rclone/fs/fshttp"
"github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/fs/walk"
"github.com/rclone/rclone/lib/atexit"
"github.com/rclone/rclone/lib/bucket"
"github.com/rclone/rclone/lib/encoder"
"github.com/rclone/rclone/lib/env"
"github.com/rclone/rclone/lib/multipart"
"github.com/rclone/rclone/lib/pacer"
"golang.org/x/sync/errgroup"
)
const (
@ -312,6 +314,47 @@ Note that chunks are stored in memory and there may be up to
in memory.`,
Default: 16,
Advanced: true,
}, {
Name: "copy_cutoff",
Help: `Cutoff for switching to multipart copy.
Any files larger than this that need to be server-side copied will be
copied in chunks of chunk_size using the put block list API.
Files smaller than this limit will be copied with the Copy Blob API.`,
Default: 8 * fs.Mebi,
Advanced: true,
}, {
Name: "copy_concurrency",
Help: `Concurrency for multipart copy.
This is the number of chunks of the same file that are copied
concurrently.
These chunks are not buffered in memory and Microsoft recommends
setting this value to greater than 1000 in the azcopy documentation.
https://learn.microsoft.com/en-us/azure/storage/common/storage-use-azcopy-optimize#increase-concurrency
In tests, copy speed increases almost linearly with copy
concurrency.`,
Default: 512,
Advanced: true,
}, {
Name: "use_copy_blob",
Help: `Whether to use the Copy Blob API when copying to the same storage account.
If true (the default) then rclone will use the Copy Blob API for
copies to the same storage account even when the size is above the
copy_cutoff.
Rclone assumes that the same storage account means the same config
and does not check for the same storage account in different configs.
There should be no need to change this value.
`,
Default: true,
Advanced: true,
}, {
Name: "list_chunk",
Help: `Size of blob list.
@ -478,6 +521,9 @@ type Options struct {
UseAZ bool `config:"use_az"`
Endpoint string `config:"endpoint"`
ChunkSize fs.SizeSuffix `config:"chunk_size"`
CopyCutoff fs.SizeSuffix `config:"copy_cutoff"`
CopyConcurrency int `config:"copy_concurrency"`
UseCopyBlob bool `config:"use_copy_blob"`
UploadConcurrency int `config:"upload_concurrency"`
ListChunkSize uint `config:"list_chunk"`
AccessTier string `config:"access_tier"`
@ -502,6 +548,9 @@ type Fs struct {
cntSVCcacheMu sync.Mutex // mutex to protect cntSVCcache
cntSVCcache map[string]*container.Client // reference to containerClient per container
svc *service.Client // client to access azblob
cred azcore.TokenCredential // how to generate tokens (may be nil)
sharedKeyCred *service.SharedKeyCredential // shared key credentials (may be nil)
anonymous bool // if this is anonymous access
rootContainer string // container part of root (if any)
rootDirectory string // directory part of root (if any)
isLimited bool // if limited to one container
@ -640,6 +689,14 @@ func (f *Fs) setUploadChunkSize(cs fs.SizeSuffix) (old fs.SizeSuffix, err error)
return
}
func (f *Fs) setCopyCutoff(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) {
err = checkUploadChunkSize(cs)
if err == nil {
old, f.opt.CopyCutoff = f.opt.CopyCutoff, cs
}
return
}
type servicePrincipalCredentials struct {
AppID string `json:"appId"`
Password string `json:"password"`
@ -725,12 +782,13 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
f.publicAccess = container.PublicAccessType(opt.PublicAccess)
f.setRoot(root)
f.features = (&fs.Features{
ReadMimeType: true,
WriteMimeType: true,
BucketBased: true,
BucketBasedRootOK: true,
SetTier: true,
GetTier: true,
ReadMimeType: true,
WriteMimeType: true,
BucketBased: true,
BucketBasedRootOK: true,
SetTier: true,
GetTier: true,
ServerSideAcrossConfigs: true,
}).Fill(ctx, f)
if opt.DirectoryMarkers {
f.features.CanHaveEmptyDirectories = true
@ -745,12 +803,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
ClientOptions: policyClientOptions,
}
// Here we auth by setting one of cred, sharedKeyCred, f.svc or anonymous
var (
cred azcore.TokenCredential
sharedKeyCred *service.SharedKeyCredential
anonymous = false
)
// Here we auth by setting one of f.cred, f.sharedKeyCred, f.svc or f.anonymous
switch {
case opt.EnvAuth:
// Read account from environment if needed
@ -762,7 +815,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
ClientOptions: policyClientOptions,
DisableInstanceDiscovery: opt.DisableInstanceDiscovery,
}
cred, err = azidentity.NewDefaultAzureCredential(&options)
f.cred, err = azidentity.NewDefaultAzureCredential(&options)
if err != nil {
return nil, fmt.Errorf("create azure environment credential failed: %w", err)
}
@ -776,12 +829,12 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
if opt.Endpoint == "" {
opt.Endpoint = emulatorBlobEndpoint
}
sharedKeyCred, err = service.NewSharedKeyCredential(opt.Account, opt.Key)
f.sharedKeyCred, err = service.NewSharedKeyCredential(opt.Account, opt.Key)
if err != nil {
return nil, fmt.Errorf("create new shared key credential for emulator failed: %w", err)
}
case opt.Account != "" && opt.Key != "":
sharedKeyCred, err = service.NewSharedKeyCredential(opt.Account, opt.Key)
f.sharedKeyCred, err = service.NewSharedKeyCredential(opt.Account, opt.Key)
if err != nil {
return nil, fmt.Errorf("create new shared key credential failed: %w", err)
}
@ -816,7 +869,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
options := azidentity.ClientSecretCredentialOptions{
ClientOptions: policyClientOptions,
}
cred, err = azidentity.NewClientSecretCredential(opt.Tenant, opt.ClientID, opt.ClientSecret, &options)
f.cred, err = azidentity.NewClientSecretCredential(opt.Tenant, opt.ClientID, opt.ClientSecret, &options)
if err != nil {
return nil, fmt.Errorf("error creating a client secret credential: %w", err)
}
@ -850,7 +903,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
ClientOptions: policyClientOptions,
SendCertificateChain: opt.ClientSendCertificateChain,
}
cred, err = azidentity.NewClientCertificateCredential(
f.cred, err = azidentity.NewClientCertificateCredential(
opt.Tenant, opt.ClientID, certs, key, &options,
)
if err != nil {
@ -865,7 +918,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
if err != nil {
return nil, fmt.Errorf("user password decode failed - did you obscure it?: %w", err)
}
cred, err = azidentity.NewUsernamePasswordCredential(
f.cred, err = azidentity.NewUsernamePasswordCredential(
opt.Tenant, opt.ClientID, opt.Username, password, &options,
)
if err != nil {
@ -884,7 +937,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
options := azidentity.ClientSecretCredentialOptions{
ClientOptions: policyClientOptions,
}
cred, err = azidentity.NewClientSecretCredential(parsedCreds.Tenant, parsedCreds.AppID, parsedCreds.Password, &options)
f.cred, err = azidentity.NewClientSecretCredential(parsedCreds.Tenant, parsedCreds.AppID, parsedCreds.Password, &options)
if err != nil {
return nil, fmt.Errorf("error creating a client secret credential: %w", err)
}
@ -906,19 +959,19 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
case opt.MSIResourceID != "":
options.ID = azidentity.ResourceID(opt.MSIResourceID)
}
cred, err = azidentity.NewManagedIdentityCredential(&options)
f.cred, err = azidentity.NewManagedIdentityCredential(&options)
if err != nil {
return nil, fmt.Errorf("failed to acquire MSI token: %w", err)
}
case opt.UseAZ:
var options = azidentity.AzureCLICredentialOptions{}
cred, err = azidentity.NewAzureCLICredential(&options)
f.cred, err = azidentity.NewAzureCLICredential(&options)
if err != nil {
return nil, fmt.Errorf("failed to create Azure CLI credentials: %w", err)
}
case opt.Account != "":
// Anonymous access
anonymous = true
f.anonymous = true
default:
return nil, errors.New("no authentication method configured")
}
@ -936,19 +989,19 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
}
opt.Endpoint = u.String()
}
if sharedKeyCred != nil {
if f.sharedKeyCred != nil {
// Shared key cred
f.svc, err = service.NewClientWithSharedKeyCredential(opt.Endpoint, sharedKeyCred, &clientOpt)
f.svc, err = service.NewClientWithSharedKeyCredential(opt.Endpoint, f.sharedKeyCred, &clientOpt)
if err != nil {
return nil, fmt.Errorf("create client with shared key failed: %w", err)
}
} else if cred != nil {
} else if f.cred != nil {
// Azidentity cred
f.svc, err = service.NewClient(opt.Endpoint, cred, &clientOpt)
f.svc, err = service.NewClient(opt.Endpoint, f.cred, &clientOpt)
if err != nil {
return nil, fmt.Errorf("create client failed: %w", err)
}
} else if anonymous {
} else if f.anonymous {
// Anonymous public access
f.svc, err = service.NewClientWithNoCredential(opt.Endpoint, &clientOpt)
if err != nil {
@ -1502,7 +1555,7 @@ func (f *Fs) makeContainer(ctx context.Context, container string) error {
// When a container is deleted, a container with the same name cannot be created
// for at least 30 seconds; the container may not be available for more than 30
// seconds if the service is still processing the request.
time.Sleep(6 * time.Second) // default 10 retries will be 60 seconds
time.Sleep(12 * time.Second) // default 10 retries will be 120 seconds
f.cache.MarkDeleted(container)
return true, err
case bloberror.AuthorizationFailure:
@ -1610,6 +1663,214 @@ func (f *Fs) Purge(ctx context.Context, dir string) error {
return f.deleteContainer(ctx, container)
}
// getAuth gets auth to copy o.
//
// tokenOK is used to signal that token based auth (Microsoft Entra
// ID) is acceptable.
//
// This will return srcURL to read the object, which may be a SAS URL.
//
// If noAuth is set then the srcURL returned will be a plain object
// URL (not a SAS) and token will be empty.
//
// If tokenOK is true it may also return a token for the auth.
func (o *Object) getAuth(ctx context.Context, tokenOK bool, noAuth bool) (srcURL string, token *string, err error) {
f := o.fs
srcBlobSVC := o.getBlobSVC()
srcURL = srcBlobSVC.URL()
switch {
case noAuth:
// If same storage account then no auth needed
case f.cred != nil:
if !tokenOK {
return srcURL, token, errors.New("not supported: Microsoft Entra ID")
}
options := policy.TokenRequestOptions{}
accessToken, err := f.cred.GetToken(ctx, options)
if err != nil {
return srcURL, token, fmt.Errorf("failed to create access token: %w", err)
}
token = &accessToken.Token
case f.sharedKeyCred != nil:
// Generate a short lived SAS URL if using shared key credentials
expiry := time.Now().Add(time.Hour)
sasOptions := blob.GetSASURLOptions{}
srcURL, err = srcBlobSVC.GetSASURL(sas.BlobPermissions{Read: true}, expiry, &sasOptions)
if err != nil {
return srcURL, token, fmt.Errorf("failed to create SAS URL: %w", err)
}
case f.anonymous || f.opt.SASURL != "":
// If using a SASURL or anonymous, no need for any extra auth
default:
return srcURL, token, errors.New("unknown authentication type")
}
return srcURL, token, nil
}
// Do multipart parallel copy.
//
// This uses these APIs:
//
// - PutBlockFromURL - https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-from-url
// - PutBlockList - https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list
func (f *Fs) copyMultipart(ctx context.Context, remote, dstContainer, dstPath string, src *Object) (dst fs.Object, err error) {
srcProperties, err := src.readMetaDataAlways(ctx)
if err != nil {
return nil, fmt.Errorf("multipart copy: failed to read source object: %w", err)
}
// Create the dst object by altering a copy of the src object
obj := *src
o := &obj
o.fs = f
o.remote = remote
srcURL, token, err := src.getAuth(ctx, true, false)
if err != nil {
return nil, fmt.Errorf("multipart copy: %w", err)
}
bic, err := newBlockIDCreator()
if err != nil {
return nil, err
}
dstBlockBlobSVC := f.getBlockBlobSVC(dstContainer, dstPath)
defer atexit.OnError(&err, func() {
// Try to abort the upload, but ignore the error.
fs.Debugf(o, "Cancelling multipart copy")
_ = o.clearUncomittedBlocks(ctx)
})()
var (
srcSize = src.size
partSize = int64(chunksize.Calculator(o, src.size, blockblob.MaxBlocks, f.opt.ChunkSize))
numParts = (srcSize-1)/partSize + 1
blockIDs = make([]string, numParts) // list of blocks for finalize
g, gCtx = errgroup.WithContext(ctx)
)
g.SetLimit(f.opt.CopyConcurrency)
fs.Debugf(o, "Starting multipart copy with %d parts of size %v", numParts, fs.SizeSuffix(partSize))
for partNum := uint64(0); partNum < uint64(numParts); partNum++ {
// Fail fast, in case an errgroup managed function returns an error
// gCtx is cancelled. There is no point in uploading all the other parts.
if gCtx.Err() != nil {
break
}
partNum := partNum // for closure
g.Go(func() error {
blockID := bic.newBlockID(partNum)
options := blockblob.StageBlockFromURLOptions{
Range: blob.HTTPRange{
Offset: int64(partNum) * partSize,
Count: partSize,
},
// Specifies the authorization scheme and signature for the copy source.
CopySourceAuthorization: token,
// CPKInfo *blob.CPKInfo
// CPKScopeInfo *blob.CPKScopeInfo
}
// Partial last block
if remaining := srcSize - options.Range.Offset; remaining < options.Range.Count {
options.Range.Count = remaining
}
fs.Debugf(o, "multipart copy: starting chunk %d size %v offset %v/%v", partNum, fs.SizeSuffix(options.Range.Count), fs.SizeSuffix(options.Range.Offset), fs.SizeSuffix(srcSize))
err := f.pacer.Call(func() (bool, error) {
_, err = dstBlockBlobSVC.StageBlockFromURL(ctx, blockID, srcURL, &options)
if err != nil {
return f.shouldRetry(ctx, err)
}
return false, nil
})
if err != nil {
return fmt.Errorf("multipart copy: failed to copy chunk %d with %v bytes: %w", partNum+1, -1, err)
}
blockIDs[partNum] = blockID
return nil
})
}
err = g.Wait()
if err != nil {
return nil, err
}
// Convert metadata from source object
options := blockblob.CommitBlockListOptions{
Metadata: srcProperties.Metadata,
Tier: parseTier(f.opt.AccessTier),
HTTPHeaders: &blob.HTTPHeaders{
BlobCacheControl: srcProperties.CacheControl,
BlobContentDisposition: srcProperties.ContentDisposition,
BlobContentEncoding: srcProperties.ContentEncoding,
BlobContentLanguage: srcProperties.ContentLanguage,
BlobContentMD5: srcProperties.ContentMD5,
BlobContentType: srcProperties.ContentType,
},
}
// Finalise the upload session
err = f.pacer.Call(func() (bool, error) {
_, err := dstBlockBlobSVC.CommitBlockList(ctx, blockIDs, &options)
return f.shouldRetry(ctx, err)
})
if err != nil {
return nil, fmt.Errorf("failed to complete multipart copy: %w", err)
}
fs.Debugf(o, "multipart copy finished")
return f.NewObject(ctx, remote)
}
// Do single part copy.
//
// This uses these APIs:
//
// - Copy Blob - https://docs.microsoft.com/rest/api/storageservices/copy-blob
// - Get Blob Properties - https://docs.microsoft.com/rest/api/storageservices/get-blob-properties
func (f *Fs) copySinglepart(ctx context.Context, remote, dstContainer, dstPath string, src *Object) (dst fs.Object, err error) {
dstBlobSVC := f.getBlobSVC(dstContainer, dstPath)
// Get the source auth - none needed for same storage account
srcURL, _, err := src.getAuth(ctx, false, f == src.fs)
if err != nil {
return nil, fmt.Errorf("single part copy: source auth: %w", err)
}
// Start the copy
options := blob.StartCopyFromURLOptions{
Tier: parseTier(f.opt.AccessTier),
}
var startCopy blob.StartCopyFromURLResponse
err = f.pacer.Call(func() (bool, error) {
startCopy, err = dstBlobSVC.StartCopyFromURL(ctx, srcURL, &options)
return f.shouldRetry(ctx, err)
})
if err != nil {
return nil, fmt.Errorf("single part copy: copy blob: %w", err)
}
// Poll for completion if necessary
//
// The for loop is never executed for same storage account copies.
copyStatus := startCopy.CopyStatus
getOptions := blob.GetPropertiesOptions{}
pollTime := 100 * time.Millisecond
for copyStatus != nil && string(*copyStatus) == string(container.CopyStatusTypePending) {
time.Sleep(pollTime)
getMetadata, err := dstBlobSVC.GetProperties(ctx, &getOptions)
if err != nil {
return nil, err
}
copyStatus = getMetadata.CopyStatus
pollTime = min(2*pollTime, time.Second)
}
return f.NewObject(ctx, remote)
}
// Copy src to this remote using server-side copy operations.
//
// This is stored with the remote path given.
@ -1630,36 +1891,29 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object,
fs.Debugf(src, "Can't copy - not same remote type")
return nil, fs.ErrorCantCopy
}
dstBlobSVC := f.getBlobSVC(dstContainer, dstPath)
srcBlobSVC := srcObj.getBlobSVC()
srcURL := srcBlobSVC.URL()
options := blob.StartCopyFromURLOptions{
Tier: parseTier(f.opt.AccessTier),
}
var startCopy blob.StartCopyFromURLResponse
err = f.pacer.Call(func() (bool, error) {
startCopy, err = dstBlobSVC.StartCopyFromURL(ctx, srcURL, &options)
return f.shouldRetry(ctx, err)
})
if err != nil {
return nil, err
// Assume we are copying to a different storage account if we
// are copying across configs.
sameStorageAccount := f == srcObj.fs
// If we are using Microsoft Entra ID token based auth then
// copySinglepart does not work
usingEntraID := f.cred != nil
// Use multipart copy if size > cutoff
// or using Entra ID and we are not using the same storage account
useMultiPart := srcObj.size >= int64(f.opt.CopyCutoff) || (usingEntraID && !sameStorageAccount)
// Force the use of copy blob if on the same storage account
// and the user hasn't forbidden it.
if f.opt.UseCopyBlob && sameStorageAccount {
useMultiPart = false
}
copyStatus := startCopy.CopyStatus
getOptions := blob.GetPropertiesOptions{}
pollTime := 100 * time.Millisecond
for copyStatus != nil && string(*copyStatus) == string(container.CopyStatusTypePending) {
time.Sleep(pollTime)
getMetadata, err := dstBlobSVC.GetProperties(ctx, &getOptions)
if err != nil {
return nil, err
}
copyStatus = getMetadata.CopyStatus
pollTime = min(2*pollTime, time.Second)
if useMultiPart {
return f.copyMultipart(ctx, remote, dstContainer, dstPath, srcObj)
}
return f.NewObject(ctx, remote)
return f.copySinglepart(ctx, remote, dstContainer, dstPath, srcObj)
}
// ------------------------------------------------------------

View File

@ -15,13 +15,17 @@ import (
// TestIntegration runs integration tests against the remote
func TestIntegration(t *testing.T) {
name := "TestAzureBlob"
fstests.Run(t, &fstests.Opt{
RemoteName: "TestAzureBlob:",
RemoteName: name + ":",
NilObject: (*Object)(nil),
TiersToTest: []string{"Hot", "Cool", "Cold"},
ChunkedUpload: fstests.ChunkedUploadConfig{
MinChunkSize: defaultChunkSize,
},
ExtraConfig: []fstests.ExtraConfigItem{
{Name: name, Key: "use_copy_blob", Value: "false"},
},
})
}
@ -40,6 +44,7 @@ func TestIntegration2(t *testing.T) {
},
ExtraConfig: []fstests.ExtraConfigItem{
{Name: name, Key: "directory_markers", Value: "true"},
{Name: name, Key: "use_copy_blob", Value: "false"},
},
})
}
@ -48,8 +53,13 @@ func (f *Fs) SetUploadChunkSize(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
return f.setUploadChunkSize(cs)
}
func (f *Fs) SetCopyCutoff(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
return f.setCopyCutoff(cs)
}
var (
_ fstests.SetUploadChunkSizer = (*Fs)(nil)
_ fstests.SetCopyCutoffer = (*Fs)(nil)
)
func TestValidateAccessTier(t *testing.T) {