backend/azureblob: Port new Azure Blob Storage SDK #2362

This change removes the older Azure Blob Storage SDK and brings the code to
parity with the latest Blob Storage SDK.
It is also a prerequisite for addressing #2091.
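For reviewers new to the 2017-07-29 `azblob` package: the old SDK's client/container objects give way to URL types (`ServiceURL` → `ContainerURL` → `BlobURL`) that share a request pipeline and take a `context.Context` on every call. Below is a minimal sketch of that pattern — not code from this commit; the account, key, container and blob names are placeholders:

```go
package main

import (
	"context"
	"fmt"
	"net/url"

	"github.com/Azure/azure-storage-blob-go/2017-07-29/azblob"
)

func main() {
	// Placeholder credentials - substitute real values
	account, key := "myaccount", "bXlrZXk="

	// The pipeline carries the credential and per-request policies (retries etc.)
	credential := azblob.NewSharedKeyCredential(account, key)
	pipeline := azblob.NewPipeline(credential, azblob.PipelineOptions{})

	// URL objects mirror the REST hierarchy: service -> container -> blob
	u, _ := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net", account))
	serviceURL := azblob.NewServiceURL(*u, pipeline)
	blobURL := serviceURL.NewContainerURL("mycontainer").NewBlobURL("hello.txt")

	// Every operation takes a context and access conditions
	props, err := blobURL.GetProperties(context.Background(), azblob.BlobAccessConditions{})
	if err != nil {
		panic(err)
	}
	fmt.Println(props.ContentLength())
}
```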
sandeepkru 2018-07-08 21:39:58 -07:00
parent 2cb79cb43d
commit e24fe27153
5 changed files with 267 additions and 230 deletions

View File

@@ -1,9 +1,12 @@
 // Package azureblob provides an interface to the Microsoft Azure blob object storage system
+
+// +build !freebsd,!netbsd,!openbsd,!plan9,!solaris,go1.8
+
 package azureblob
 
 import (
 	"bytes"
-	"crypto/md5"
+	"context"
 	"encoding/base64"
 	"encoding/binary"
 	"encoding/hex"
@@ -18,13 +21,11 @@ import (
 	"sync"
 	"time"
 
-	"github.com/Azure/azure-sdk-for-go/storage"
+	"github.com/Azure/azure-storage-blob-go/2017-07-29/azblob"
 	"github.com/ncw/rclone/fs"
-	"github.com/ncw/rclone/fs/accounting"
 	"github.com/ncw/rclone/fs/config"
 	"github.com/ncw/rclone/fs/config/flags"
 	"github.com/ncw/rclone/fs/fserrors"
-	"github.com/ncw/rclone/fs/fshttp"
 	"github.com/ncw/rclone/fs/hash"
 	"github.com/ncw/rclone/fs/walk"
 	"github.com/ncw/rclone/lib/pacer"
@@ -64,9 +65,6 @@ func init() {
 		}, {
 			Name: "key",
 			Help: "Storage Account Key (leave blank to use connection string or SAS URL)",
-		}, {
-			Name: "connection_string",
-			Help: "Connection string (leave blank if using account/key or SAS URL)",
 		}, {
 			Name: "sas_url",
 			Help: "SAS URL for container level access only\n(leave blank if using account/key or connection string)",
@@ -82,13 +80,13 @@ func init() {
 
 // Fs represents a remote azure server
 type Fs struct {
 	name          string       // name of this remote
 	root          string       // the path we are working on if any
 	features      *fs.Features // optional features
 	account       string       // account name
 	endpoint      string       // name of the starting api endpoint
-	bc            *storage.BlobStorageClient
-	cc            *storage.Container
+	svcURL        *azblob.ServiceURL   // reference to serviceURL
+	cntURL        *azblob.ContainerURL // reference to containerURL
 	container     string     // the container we are working on
 	containerOKMu sync.Mutex // mutex to protect container OK
 	containerOK   bool       // true if we have created the container
@@ -99,13 +97,14 @@ type Fs struct {
 
 // Object describes a azure object
 type Object struct {
 	fs       *Fs       // what this object is part of
 	remote   string    // The remote path
 	modTime  time.Time // The modified time of the object if known
 	md5      string    // MD5 hash if known
 	size     int64     // Size of the object
 	mimeType string    // Content-Type of the object
-	meta     map[string]string // blob metadata
+	accessTier azblob.AccessTierType // Blob Access Tier
+	meta       map[string]string     // blob metadata
 }
 
 // ------------------------------------------------------------
@@ -165,8 +164,8 @@ var retryErrorCodes = []int{
 // deserve to be retried. It returns the err as a convenience
 func (f *Fs) shouldRetry(err error) (bool, error) {
 	// FIXME interpret special errors - more to do here
-	if storageErr, ok := err.(storage.AzureStorageServiceError); ok {
-		statusCode := storageErr.StatusCode
+	if storageErr, ok := err.(azblob.StorageError); ok {
+		statusCode := storageErr.Response().StatusCode
 		for _, e := range retryErrorCodes {
 			if statusCode == e {
 				return true, err
@@ -190,44 +189,45 @@ func NewFs(name, root string) (fs.Fs, error) {
 	}
 	account := config.FileGet(name, "account")
 	key := config.FileGet(name, "key")
-	connectionString := config.FileGet(name, "connection_string")
 	sasURL := config.FileGet(name, "sas_url")
-	endpoint := config.FileGet(name, "endpoint", storage.DefaultBaseURL)
+	endpoint := config.FileGet(name, "endpoint", "blob.core.windows.net")
 	var (
-		oclient storage.Client
-		client  = &oclient
-		cc      *storage.Container
+		u            *url.URL
+		serviceURL   azblob.ServiceURL
+		containerURL azblob.ContainerURL
 	)
 	switch {
 	case account != "" && key != "":
-		oclient, err = storage.NewClient(account, key, endpoint, apiVersion, true)
+		credential := azblob.NewSharedKeyCredential(account, key)
+		u, err = url.Parse(fmt.Sprintf("https://%s.%s", account, endpoint))
 		if err != nil {
-			return nil, errors.Wrap(err, "failed to make azure storage client from account/key")
-		}
-	case connectionString != "":
-		oclient, err = storage.NewClientFromConnectionString(connectionString)
-		if err != nil {
-			return nil, errors.Wrap(err, "failed to make azure storage client from connection string")
+			return nil, errors.Wrap(err, "failed to make azure storage url from account and endpoint")
 		}
+		pipeline := azblob.NewPipeline(credential, azblob.PipelineOptions{})
+		serviceURL = azblob.NewServiceURL(*u, pipeline)
+		containerURL = serviceURL.NewContainerURL(container)
 	case sasURL != "":
-		URL, err := url.Parse(sasURL)
+		u, err = url.Parse(sasURL)
 		if err != nil {
 			return nil, errors.Wrapf(err, "failed to parse SAS URL")
 		}
-		cc, err = storage.GetContainerReferenceFromSASURI(*URL)
-		if err != nil {
-			return nil, errors.Wrapf(err, "failed to make azure storage client from SAS URL")
+		// use anonymous credentials in case of sas url
+		pipeline := azblob.NewPipeline(azblob.NewAnonymousCredential(), azblob.PipelineOptions{})
+		// Check if we have container level SAS or account level SAS
+		parts := azblob.NewBlobURLParts(*u)
+		if parts.ContainerName != "" {
+			if parts.ContainerName != container {
+				return nil, errors.New("Container name in SAS URL and container provided in command do not match")
+			}
+			containerURL = azblob.NewContainerURL(*u, pipeline)
+		} else {
+			serviceURL = azblob.NewServiceURL(*u, pipeline)
+			containerURL = serviceURL.NewContainerURL(container)
 		}
-		client = cc.Client()
 	default:
 		return nil, errors.New("Need account+key or connectionString or sasURL")
 	}
-	client.HTTPClient = fshttp.NewClient(fs.Config)
-	bc := client.GetBlobService()
-	if cc == nil {
-		cc = bc.GetContainerReference(container)
-	}
 
 	f := &Fs{
 		name: name,
@@ -235,8 +235,8 @@ func NewFs(name, root string) (fs.Fs, error) {
 		root:        directory,
 		account:     account,
 		endpoint:    endpoint,
-		bc:          &bc,
-		cc:          cc,
+		svcURL:      &serviceURL,
+		cntURL:      &containerURL,
 		pacer:       pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
 		uploadToken: pacer.NewTokenDispenser(fs.Config.Transfers),
 	}
@@ -274,22 +274,17 @@ func NewFs(name, root string) (fs.Fs, error) {
 // Return an Object from a path
 //
 // If it can't be found it returns the error fs.ErrorObjectNotFound.
-func (f *Fs) newObjectWithInfo(remote string, info *storage.Blob) (fs.Object, error) {
+func (f *Fs) newObjectWithInfo(remote string, info *azblob.Blob) (fs.Object, error) {
 	o := &Object{
 		fs:     f,
 		remote: remote,
 	}
-	if info != nil {
-		err := o.decodeMetaData(info)
-		if err != nil {
-			return nil, err
-		}
-	} else {
-		err := o.readMetaData() // reads info and headers, returning an error
-		if err != nil {
-			return nil, err
-		}
+	err := o.readMetaData() // reads info and headers, returning an error
+	if err != nil {
+		return nil, err
 	}
 	return o, nil
 }
@@ -300,13 +295,13 @@ func (f *Fs) NewObject(remote string) (fs.Object, error) {
 }
 
 // getBlobReference creates an empty blob reference with no metadata
-func (f *Fs) getBlobReference(remote string) *storage.Blob {
-	return f.cc.GetBlobReference(f.root + remote)
+func (f *Fs) getBlobReference(remote string) azblob.BlobURL {
+	return f.cntURL.NewBlobURL(f.root + remote)
 }
 
 // getBlobWithModTime adds the modTime passed in to o.meta and creates
 // a Blob from it.
-func (o *Object) getBlobWithModTime(modTime time.Time) *storage.Blob {
+func (o *Object) getBlobWithModTime(modTime time.Time) *azblob.BlobURL {
 	// Make sure o.meta is not nil
 	if o.meta == nil {
 		o.meta = make(map[string]string, 1)
@@ -316,12 +311,18 @@ func (o *Object) getBlobWithModTime(modTime time.Time) *azblob.BlobURL {
 	o.meta[modTimeKey] = modTime.Format(timeFormatOut)
 
 	blob := o.getBlobReference()
-	blob.Metadata = o.meta
-	return blob
+	ctx := context.Background()
+	_, err := blob.SetMetadata(ctx, o.meta, azblob.BlobAccessConditions{})
+	if err != nil {
+		return nil
+	}
+
+	return &blob
 }
 
 // listFn is called from list to handle an object
-type listFn func(remote string, object *storage.Blob, isDirectory bool) error
+type listFn func(remote string, object *azblob.Blob, isDirectory bool) error
 
 // list lists the objects into the function supplied from
 // the container and root supplied
@@ -342,32 +343,38 @@ func (f *Fs) list(dir string, recurse bool, maxResults uint, fn listFn) error {
 	if !recurse {
 		delimiter = "/"
 	}
-	params := storage.ListBlobsParameters{
-		MaxResults: maxResults,
-		Prefix:     root,
-		Delimiter:  delimiter,
-		Include: &storage.IncludeBlobDataset{
-			Snapshots:        false,
-			Metadata:         true,
-			UncommittedBlobs: false,
-			Copy:             false,
-		},
-	}
+
+	options := azblob.ListBlobsSegmentOptions{
+		Details: azblob.BlobListingDetails{
+			Copy:             false,
+			Metadata:         true,
+			Snapshots:        false,
+			UncommittedBlobs: false,
+			Deleted:          false,
+		},
+		Prefix:     root,
+		MaxResults: int32(maxResults),
+	}
 
-	for {
-		var response storage.BlobListResponse
+	ctx := context.Background()
+	for marker := (azblob.Marker{}); marker.NotDone(); {
+		var response *azblob.ListBlobsHierarchyResponse
 		err := f.pacer.Call(func() (bool, error) {
 			var err error
-			response, err = f.cc.ListBlobs(params)
+			response, err = f.cntURL.ListBlobsHierarchySegment(ctx, marker, delimiter, options)
 			return f.shouldRetry(err)
 		})
 		if err != nil {
-			if storageErr, ok := err.(storage.AzureStorageServiceError); ok && storageErr.StatusCode == http.StatusNotFound {
+			if storageErr, ok := err.(azblob.StorageError); ok && storageErr.ServiceCode() == azblob.ServiceCodeContainerNotFound {
 				return fs.ErrorDirNotFound
 			}
 			return err
 		}
-		for i := range response.Blobs {
-			file := &response.Blobs[i]
+		// Advance marker to next
+		marker = response.NextMarker
+		for i := range response.Blobs.Blob {
+			file := &response.Blobs.Blob[i]
 			// Finish if file name no longer has prefix
 			// if prefix != "" && !strings.HasPrefix(file.Name, prefix) {
 			// 	return nil
@@ -389,8 +396,8 @@ func (f *Fs) list(dir string, recurse bool, maxResults uint, fn listFn) error {
 			}
 		}
 		// Send the subdirectories
-		for _, remote := range response.BlobPrefixes {
-			remote := strings.TrimRight(remote, "/")
+		for _, remote := range response.Blobs.BlobPrefix {
+			remote := strings.TrimRight(remote.Name, "/")
 			if !strings.HasPrefix(remote, f.root) {
 				fs.Debugf(f, "Odd directory name received %q", remote)
 				continue
@@ -402,17 +409,12 @@ func (f *Fs) list(dir string, recurse bool, maxResults uint, fn listFn) error {
 				return err
 			}
 		}
-		// end if no NextFileName
-		if response.NextMarker == "" {
-			break
-		}
-		params.Marker = response.NextMarker
 	}
 	return nil
 }
 
 // Convert a list item into a DirEntry
-func (f *Fs) itemToDirEntry(remote string, object *storage.Blob, isDirectory bool) (fs.DirEntry, error) {
+func (f *Fs) itemToDirEntry(remote string, object *azblob.Blob, isDirectory bool) (fs.DirEntry, error) {
 	if isDirectory {
 		d := fs.NewDir(remote, time.Time{})
 		return d, nil
@@ -436,7 +438,7 @@ func (f *Fs) markContainerOK() {
 
 // listDir lists a single directory
 func (f *Fs) listDir(dir string) (entries fs.DirEntries, err error) {
-	err = f.list(dir, false, listChunkSize, func(remote string, object *storage.Blob, isDirectory bool) error {
+	err = f.list(dir, false, listChunkSize, func(remote string, object *azblob.Blob, isDirectory bool) error {
 		entry, err := f.itemToDirEntry(remote, object, isDirectory)
 		if err != nil {
 			return err
@@ -459,13 +461,8 @@ func (f *Fs) listContainers(dir string) (entries fs.DirEntries, err error) {
 	if dir != "" {
 		return nil, fs.ErrorListBucketRequired
 	}
-	err = f.listContainersToFn(func(container *storage.Container) error {
-		t, err := time.Parse(time.RFC1123, container.Properties.LastModified)
-		if err != nil {
-			fs.Debugf(f, "Failed to parse LastModified %q: %v", container.Properties.LastModified, err)
-			t = time.Time{}
-		}
-		d := fs.NewDir(container.Name, t)
+	err = f.listContainersToFn(func(container *azblob.Container) error {
+		d := fs.NewDir(container.Name, container.Properties.LastModified)
 		entries = append(entries, d)
 		return nil
 	})
@@ -512,7 +509,7 @@ func (f *Fs) ListR(dir string, callback fs.ListRCallback) (err error) {
 		return fs.ErrorListBucketRequired
 	}
 	list := walk.NewListRHelper(callback)
-	err = f.list(dir, true, listChunkSize, func(remote string, object *storage.Blob, isDirectory bool) error {
+	err = f.list(dir, true, listChunkSize, func(remote string, object *azblob.Blob, isDirectory bool) error {
 		entry, err := f.itemToDirEntry(remote, object, isDirectory)
 		if err != nil {
 			return err
@@ -528,27 +525,34 @@ func (f *Fs) ListR(dir string, callback fs.ListRCallback) (err error) {
 }
 
 // listContainerFn is called from listContainersToFn to handle a container
-type listContainerFn func(*storage.Container) error
+type listContainerFn func(*azblob.Container) error
 
 // listContainersToFn lists the containers to the function supplied
 func (f *Fs) listContainersToFn(fn listContainerFn) error {
-	// FIXME page the containers if necessary?
-	params := storage.ListContainersParameters{}
-	var response *storage.ContainerListResponse
-	err := f.pacer.Call(func() (bool, error) {
-		var err error
-		response, err = f.bc.ListContainers(params)
-		return f.shouldRetry(err)
-	})
-	if err != nil {
-		return err
+	params := azblob.ListContainersSegmentOptions{
+		MaxResults: int32(listChunkSize),
 	}
-	for i := range response.Containers {
-		err = fn(&response.Containers[i])
-		if err != nil {
-			return err
+	ctx := context.Background()
+	for marker := (azblob.Marker{}); marker.NotDone(); {
+		var response *azblob.ListContainersResponse
+		err := f.pacer.Call(func() (bool, error) {
+			var err error
+			response, err = f.svcURL.ListContainersSegment(ctx, marker, params)
+			return f.shouldRetry(err)
+		})
+		if err != nil {
+			return err
+		}
+		for i := range response.Containers {
+			err = fn(&response.Containers[i])
+			if err != nil {
+				return err
+			}
 		}
+		marker = response.NextMarker
 	}
 	return nil
 }
@@ -573,32 +577,20 @@ func (f *Fs) Mkdir(dir string) error {
 	if f.containerOK {
 		return nil
 	}
-	// List the container to see if it exists
-	err := f.list("", false, 1, func(remote string, object *storage.Blob, isDirectory bool) error {
-		return nil
-	})
-	if err == nil {
-		f.markContainerOK()
-		return nil
-	}
 	// now try to create the container
-	options := storage.CreateContainerOptions{
-		Access: storage.ContainerAccessTypePrivate,
-	}
-	err = f.pacer.Call(func() (bool, error) {
-		err := f.cc.Create(&options)
+	err := f.pacer.Call(func() (bool, error) {
+		ctx := context.Background()
+		_, err := f.cntURL.Create(ctx, azblob.Metadata{}, azblob.PublicAccessNone)
 		if err != nil {
-			if storageErr, ok := err.(storage.AzureStorageServiceError); ok {
-				switch storageErr.StatusCode {
-				case http.StatusConflict:
-					switch storageErr.Code {
-					case "ContainerAlreadyExists":
-						f.containerOK = true
-						return false, nil
-					case "ContainerBeingDeleted":
-						f.containerDeleted = true
-						return true, err
-					}
+			if storageErr, ok := err.(azblob.StorageError); ok {
+				switch storageErr.ServiceCode() {
+				case azblob.ServiceCodeContainerAlreadyExists:
+					f.containerOK = true
+					return false, nil
+				case azblob.ServiceCodeContainerBeingDeleted:
+					f.containerDeleted = true
+					return true, err
 				}
 			}
 		}
@@ -614,7 +606,7 @@ func (f *Fs) Mkdir(dir string) error {
 // isEmpty checks to see if a given directory is empty and returns an error if not
 func (f *Fs) isEmpty(dir string) (err error) {
 	empty := true
-	err = f.list("", true, 1, func(remote string, object *storage.Blob, isDirectory bool) error {
+	err = f.list("", true, 1, func(remote string, object *azblob.Blob, isDirectory bool) error {
 		empty = false
 		return nil
 	})
@@ -632,16 +624,16 @@ func (f *Fs) isEmpty(dir string) (err error) {
 func (f *Fs) deleteContainer() error {
 	f.containerOKMu.Lock()
 	defer f.containerOKMu.Unlock()
-	options := storage.DeleteContainerOptions{}
+	options := azblob.ContainerAccessConditions{}
+	ctx := context.Background()
 	err := f.pacer.Call(func() (bool, error) {
-		exists, err := f.cc.Exists()
+		_, err := f.cntURL.Delete(ctx, options)
 		if err != nil {
+			if storageErr, ok := err.(azblob.StorageError); ok && storageErr.ServiceCode() == azblob.ServiceCodeContainerNotFound {
+				return false, fs.ErrorDirNotFound
+			}
 			return f.shouldRetry(err)
 		}
-		if !exists {
-			return false, fs.ErrorDirNotFound
-		}
-		err = f.cc.Delete(&options)
 		return f.shouldRetry(err)
 	})
 	if err == nil {
@@ -704,17 +696,36 @@ func (f *Fs) Copy(src fs.Object, remote string) (fs.Object, error) {
 		fs.Debugf(src, "Can't copy - not same remote type")
 		return nil, fs.ErrorCantCopy
 	}
-	dstBlob := f.getBlobReference(remote)
-	srcBlob := srcObj.getBlobReference()
-	options := storage.CopyOptions{}
-	sourceBlobURL := srcBlob.GetURL()
+	dstBlobURL := f.getBlobReference(remote)
+	srcBlobURL := srcObj.getBlobReference()
+
+	source, err := url.Parse(srcBlobURL.String())
+	if err != nil {
+		return nil, err
+	}
+
+	options := azblob.BlobAccessConditions{}
+	ctx := context.Background()
+	var startCopy *azblob.BlobsStartCopyFromURLResponse
+
 	err = f.pacer.Call(func() (bool, error) {
-		err = dstBlob.Copy(sourceBlobURL, &options)
+		startCopy, err = dstBlobURL.StartCopyFromURL(ctx, *source, nil, options, options)
 		return f.shouldRetry(err)
 	})
 	if err != nil {
 		return nil, err
 	}
+
+	copyStatus := startCopy.CopyStatus()
+	for copyStatus == azblob.CopyStatusPending {
+		time.Sleep(1 * time.Second)
+		getMetadata, err := dstBlobURL.GetProperties(ctx, options)
+		if err != nil {
+			return nil, err
+		}
+		copyStatus = getMetadata.CopyStatus()
+	}
+
 	return f.NewObject(remote)
 }
@@ -759,7 +770,7 @@ func (o *Object) Size() int64 {
 	return o.size
 }
 
-// decodeMetaData sets the metadata from the data passed in
+// decodeMetaDataFromProperties sets the metadata from the data passed in
 //
 // Sets
 //	o.id
@@ -767,14 +778,17 @@ func (o *Object) Size() int64 {
 //	o.size
 //	o.md5
 //	o.meta
-func (o *Object) decodeMetaData(info *storage.Blob) (err error) {
-	o.md5 = info.Properties.ContentMD5
-	o.mimeType = info.Properties.ContentType
-	o.size = info.Properties.ContentLength
-	o.modTime = time.Time(info.Properties.LastModified)
-	if len(info.Metadata) > 0 {
-		o.meta = info.Metadata
-		if modTime, ok := info.Metadata[modTimeKey]; ok {
+func (o *Object) decodeMetaDataFromProperties(info *azblob.BlobsGetPropertiesResponse) (err error) {
+	// FIXME - The client library returns MD5 as a byte array; the Object struct should be
+	// changed to keep md5 as a simple byte array instead of a string
+	o.md5 = base64.StdEncoding.EncodeToString(info.ContentMD5())
+	o.mimeType = info.ContentType()
+	o.size = info.ContentLength()
+	o.modTime = info.LastModified()
+	metadata := info.NewMetadata()
+	if len(metadata) > 0 {
+		o.meta = metadata
+		if modTime, ok := metadata[modTimeKey]; ok {
 			when, err := time.Parse(timeFormatIn, modTime)
 			if err != nil {
 				fs.Debugf(o, "Couldn't parse %v = %q: %v", modTimeKey, modTime, err)
@@ -788,7 +802,7 @@ func (o *Object) decodeMetaDataFromProperties(info *azblob.BlobsGetPropertiesResponse) (err error) {
 }
 
 // getBlobReference creates an empty blob reference with no metadata
-func (o *Object) getBlobReference() *storage.Blob {
+func (o *Object) getBlobReference() azblob.BlobURL {
 	return o.fs.getBlobReference(o.remote)
 }
@@ -811,19 +825,22 @@ func (o *Object) readMetaData() (err error) {
 	blob := o.getBlobReference()
 
 	// Read metadata (this includes metadata)
-	getPropertiesOptions := storage.GetBlobPropertiesOptions{}
+	options := azblob.BlobAccessConditions{}
+	ctx := context.Background()
+	var blobProperties *azblob.BlobsGetPropertiesResponse
 	err = o.fs.pacer.Call(func() (bool, error) {
-		err = blob.GetProperties(&getPropertiesOptions)
+		blobProperties, err = blob.GetProperties(ctx, options)
 		return o.fs.shouldRetry(err)
 	})
 	if err != nil {
-		if storageErr, ok := err.(storage.AzureStorageServiceError); ok && storageErr.StatusCode == http.StatusNotFound {
+		// On directories GetProperties does not work, and the current SDK does not populate
+		// the service code correctly, hence check the regular http response as well
+		if storageErr, ok := err.(azblob.StorageError); ok && (storageErr.ServiceCode() == azblob.ServiceCodeBlobNotFound || storageErr.Response().StatusCode == http.StatusNotFound) {
 			return fs.ErrorObjectNotFound
 		}
 		return err
 	}
 
-	return o.decodeMetaData(blob)
+	return o.decodeMetaDataFromProperties(blobProperties)
 }
 
 // timeString returns modTime as the number of milliseconds
@@ -860,16 +877,14 @@ func (o *Object) ModTime() (result time.Time) {
 
 // SetModTime sets the modification time of the local fs object
 func (o *Object) SetModTime(modTime time.Time) error {
-	blob := o.getBlobWithModTime(modTime)
-	options := storage.SetBlobMetadataOptions{}
-	err := o.fs.pacer.Call(func() (bool, error) {
-		err := blob.SetMetadata(&options)
-		return o.fs.shouldRetry(err)
-	})
-	if err != nil {
-		return err
+	// Make sure o.meta is not nil
+	if o.meta == nil {
+		o.meta = make(map[string]string, 1)
 	}
+	// Set modTimeKey in it
+	o.meta[modTimeKey] = modTime.Format(timeFormatOut)
+
 	o.modTime = modTime
 	return nil
 }
@@ -880,10 +895,9 @@ func (o *Object) Storable() bool {
 
 // Open an object for read
 func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
-	getBlobOptions := storage.GetBlobOptions{}
-	getBlobRangeOptions := storage.GetBlobRangeOptions{
-		GetBlobOptions: &getBlobOptions,
-	}
+	// Offset and Count for range download
+	var offset int64
+	var count int64
 	for _, option := range options {
 		switch x := option.(type) {
 		case *fs.RangeOption:
@@ -895,14 +909,10 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
 				start = o.size - end
 				end = 0
 			}
-			getBlobRangeOptions.Range = &storage.BlobRange{
-				Start: uint64(start),
-				End:   uint64(end),
-			}
+			offset = start
+			count = end - start
 		case *fs.SeekOption:
-			getBlobRangeOptions.Range = &storage.BlobRange{
-				Start: uint64(x.Offset),
-			}
+			offset = x.Offset
 		default:
 			if option.Mandatory() {
 				fs.Logf(o, "Unsupported mandatory option: %v", option)
@@ -910,17 +920,17 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
 		}
 	}
 	blob := o.getBlobReference()
+	ctx := context.Background()
+	ac := azblob.BlobAccessConditions{}
+	var downloadResponse *azblob.DownloadResponse
 	err = o.fs.pacer.Call(func() (bool, error) {
-		if getBlobRangeOptions.Range == nil {
-			in, err = blob.Get(&getBlobOptions)
-		} else {
-			in, err = blob.GetRange(&getBlobRangeOptions)
-		}
+		downloadResponse, err = blob.Download(ctx, offset, count, ac, false)
 		return o.fs.shouldRetry(err)
 	})
 	if err != nil {
 		return nil, errors.Wrap(err, "failed to open for download")
 	}
+	in = downloadResponse.Body(azblob.RetryReaderOptions{})
 	return in, nil
 }
@@ -948,7 +958,7 @@ func init() {
 // uploadMultipart uploads a file using multipart upload
 //
 // Write a larger blob, using CreateBlockBlob, PutBlock, and PutBlockList.
-func (o *Object) uploadMultipart(in io.Reader, size int64, blob *storage.Blob, putBlobOptions *storage.PutBlobOptions) (err error) {
+func (o *Object) uploadMultipart(in io.Reader, size int64, blob *azblob.BlobURL, httpHeaders *azblob.BlobHTTPHeaders) (err error) {
 	// Calculate correct chunkSize
 	chunkSize := int64(chunkSize)
 	var totalParts int64
@@ -970,34 +980,41 @@ func (o *Object) uploadMultipart(in io.Reader, size int64, blob *azblob.BlobURL, httpHeaders *azblob.BlobHTTPHeaders) (err error) {
 	}
 	fs.Debugf(o, "Multipart upload session started for %d parts of size %v", totalParts, fs.SizeSuffix(chunkSize))
 
-	// Create an empty blob
-	err = o.fs.pacer.Call(func() (bool, error) {
-		err := blob.CreateBlockBlob(putBlobOptions)
-		return o.fs.shouldRetry(err)
-	})
+	// https://godoc.org/github.com/Azure/azure-storage-blob-go/2017-07-29/azblob#example-BlockBlobURL
+	// Utilities are cloned from the above example.
+	// These helper functions convert a binary or int block ID to a base-64 string.
+	// NOTE: The blockID must be <= 64 bytes and ALL blockIDs for the blob must be the same length.
+	blockIDBinaryToBase64 := func(blockID []byte) string { return base64.StdEncoding.EncodeToString(blockID) }
+	blockIDIntToBase64 := func(blockID uint64) string {
+		binaryBlockID := (&[8]byte{})[:] // All block IDs are 8 bytes long
+		binary.LittleEndian.PutUint64(binaryBlockID, blockID)
+		return blockIDBinaryToBase64(binaryBlockID)
+	}
 
 	// block ID variables
 	var (
 		rawID   uint64
-		bytesID = make([]byte, 8)
 		blockID = "" // id in base64 encoded form
-		blocks  = make([]storage.Block, 0, totalParts)
+		blocks  = make([]string, 0, totalParts)
 	)
 
 	// increment the blockID
 	nextID := func() {
 		rawID++
-		binary.LittleEndian.PutUint64(bytesID, rawID)
-		blockID = base64.StdEncoding.EncodeToString(bytesID)
-		blocks = append(blocks, storage.Block{
-			ID:     blockID,
-			Status: storage.BlockStatusLatest,
-		})
+		blockID = blockIDIntToBase64(rawID)
+		blocks = append(blocks, blockID)
 	}
 
+	// Get BlockBlobURL, we will use the default pipeline here
+	blockBlobURL := blob.ToBlockBlobURL()
+	ctx := context.Background()
+	ac := azblob.LeaseAccessConditions{} // Use default lease access conditions
+
+	// FIXME - Accounting
 	// unwrap the accounting from the input, we use wrap to put it
 	// back on after the buffering
-	in, wrap := accounting.UnWrap(in)
+	// in, wrap := accounting.UnWrap(in)
 
 	// Upload the chunks
 	remaining := size
@@ -1037,13 +1054,8 @@ outer:
 			defer o.fs.uploadToken.Put()
 			fs.Debugf(o, "Uploading part %d/%d offset %v/%v part size %v", part+1, totalParts, fs.SizeSuffix(position), fs.SizeSuffix(size), fs.SizeSuffix(chunkSize))
 
-			// Upload the block, with MD5 for check
-			md5sum := md5.Sum(buf)
-			putBlockOptions := storage.PutBlockOptions{
-				ContentMD5: base64.StdEncoding.EncodeToString(md5sum[:]),
-			}
 			err = o.fs.pacer.Call(func() (bool, error) {
-				err = blob.PutBlockWithLength(blockID, uint64(len(buf)), wrap(bytes.NewBuffer(buf)), &putBlockOptions)
+				_, err = blockBlobURL.StageBlock(ctx, blockID, bytes.NewReader(buf), ac)
 				return o.fs.shouldRetry(err)
 			})
@@ -1073,9 +1085,8 @@ outer:
 	}
 
 	// Finalise the upload session
-	putBlockListOptions := storage.PutBlockListOptions{}
 	err = o.fs.pacer.Call(func() (bool, error) {
-		err := blob.PutBlockList(blocks, &putBlockListOptions)
+		_, err := blockBlobURL.CommitBlockList(ctx, blocks, *httpHeaders, o.meta, azblob.BlobAccessConditions{})
 		return o.fs.shouldRetry(err)
 	})
 	if err != nil {
@@ -1093,29 +1104,45 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
 		return err
 	}
 	size := src.Size()
-	blob := o.getBlobWithModTime(src.ModTime())
-	blob.Properties.ContentType = fs.MimeType(o)
-	if sourceMD5, _ := src.Hash(hash.MD5); sourceMD5 != "" {
-		sourceMD5bytes, err := hex.DecodeString(sourceMD5)
-		if err == nil {
-			blob.Properties.ContentMD5 = base64.StdEncoding.EncodeToString(sourceMD5bytes)
-		} else {
-			fs.Debugf(o, "Failed to decode %q as MD5: %v", sourceMD5, err)
+	// Update mod time
+	err = o.SetModTime(src.ModTime())
+	if err != nil {
+		return err
+	}
+
+	blob := o.getBlobReference()
+	httpHeaders := azblob.BlobHTTPHeaders{}
+	httpHeaders.ContentType = fs.MimeType(o)
+	// Multipart upload doesn't support MD5 checksums at put block calls, hence calculate
+	// MD5 only for PutBlob requests
+	if size < int64(uploadCutoff) {
+		if sourceMD5, _ := src.Hash(hash.MD5); sourceMD5 != "" {
+			sourceMD5bytes, err := hex.DecodeString(sourceMD5)
+			if err == nil {
+				httpHeaders.ContentMD5 = sourceMD5bytes
+			} else {
+				fs.Debugf(o, "Failed to decode %q as MD5: %v", sourceMD5, err)
+			}
 		}
 	}
-	putBlobOptions := storage.PutBlobOptions{}
+
+	putBlobOptions := azblob.UploadStreamToBlockBlobOptions{
+		BufferSize:      int(chunkSize),
+		MaxBuffers:      4,
+		Metadata:        o.meta,
+		BlobHTTPHeaders: httpHeaders,
+	}
+
+	ctx := context.Background()
 	// Don't retry, return a retry error instead
 	err = o.fs.pacer.CallNoRetry(func() (bool, error) {
 		if size >= int64(uploadCutoff) {
 			// If a large file upload in chunks
-			err = o.uploadMultipart(in, size, blob, &putBlobOptions)
+			err = o.uploadMultipart(in, size, &blob, &httpHeaders)
 		} else {
 			// Write a small blob in one transaction
-			if size == 0 {
-				in = nil
-			}
-			err = blob.CreateBlockBlobFromReader(in, &putBlobOptions)
+			blockBlobURL := blob.ToBlockBlobURL()
+			_, err = azblob.UploadStreamToBlockBlob(ctx, in, blockBlobURL, putBlobOptions)
 		}
 		return o.fs.shouldRetry(err)
 	})
@@ -1129,9 +1156,11 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
 // Remove an object
 func (o *Object) Remove() error {
 	blob := o.getBlobReference()
-	options := storage.DeleteBlobOptions{}
+	snapShotOptions := azblob.DeleteSnapshotsOptionNone
+	ac := azblob.BlobAccessConditions{}
+	ctx := context.Background()
 	return o.fs.pacer.Call(func() (bool, error) {
-		err := blob.Delete(&options)
+		_, err := blob.Delete(ctx, snapShotOptions, ac)
 		return o.fs.shouldRetry(err)
 	})
 }

View File

@@ -1,4 +1,7 @@
 // Test AzureBlob filesystem interface
+
+// +build !freebsd,!netbsd,!openbsd,!plan9,!solaris,go1.8
+
 package azureblob_test
 
 import (

View File

@@ -0,0 +1,6 @@
+// Build for azureblob for unsupported platforms to stop go complaining
+// about "no buildable Go source files"
+
+// +build freebsd netbsd openbsd plan9 solaris !go1.8
+
+package azureblob

View File

@@ -125,21 +125,16 @@ Rclone has 3 ways of authenticating with Azure Blob Storage:
 
 This is the most straight forward and least flexible way.  Just fill in the `account` and `key` lines and leave the rest blank.
 
-#### Connection string
-
-This supports all the possible connection string variants. Leave `account`, `key` and `sas_url` blank and put the connection string into the `connection_string` configuration parameter.
-
-Use this method if using an account level SAS; the Azure Portal shows connection strings you can cut and paste.
-
 #### SAS URL
 
-This only for a container level SAS URL - it does not work with an account level SAS URL.  For account level SAS use the connection string method.
+This can be an account level SAS URL or a container level SAS URL.
 
-To use it leave `account`, `key` and `connection_string` blank and fill in `sas_url`.
+To use it leave `account` and `key` blank and fill in `sas_url`.
+
+An account level SAS URL or container level SAS URL can be obtained from the Azure portal or Azure Storage Explorer.
 To get a container level SAS URL right click on a container in the Azure Blob explorer in the Azure portal.
 
-You will only be able to use the container specified in the SAS URL with rclone, eg
+If you use a container level SAS URL, rclone operations are permitted only on that container, eg
 
     rclone ls azureblob:container
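As a worked example, a hypothetical remote using a container level SAS URL would carry only the `sas_url` key in `rclone.conf` — the account, container and query string below are placeholders:

    [azureblob]
    type = azureblob
    sas_url = https://myaccount.blob.core.windows.net/mycontainer?sv=...&sig=...

    rclone ls azureblob:mycontainer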

View File

@@ -252,6 +252,10 @@ func ShouldRetry(err error) bool {
 		return true
 	}
 
+	// FIXME Handle this correctly, perhaps Cause should not ever return nil?
+	if err == nil {
+		return false
+	}
+
 	// Check if it is a retriable error
 	for _, retriableErr := range retriableErrors {
 		if err == retriableErr {