diff --git a/backend/azureblob/azureblob.go b/backend/azureblob/azureblob.go
index a6946724e..dd856e203 100644
--- a/backend/azureblob/azureblob.go
+++ b/backend/azureblob/azureblob.go
@@ -1,9 +1,12 @@
 // Package azureblob provides an interface to the Microsoft Azure blob object storage system
+
+// +build !freebsd,!netbsd,!openbsd,!plan9,!solaris,go1.8
+
 package azureblob
 
 import (
 	"bytes"
-	"crypto/md5"
+	"context"
 	"encoding/base64"
 	"encoding/binary"
 	"encoding/hex"
@@ -18,13 +21,12 @@ import (
 	"sync"
 	"time"
 
-	"github.com/Azure/azure-sdk-for-go/storage"
+	"github.com/Azure/azure-storage-blob-go/2018-03-28/azblob"
 	"github.com/ncw/rclone/fs"
 	"github.com/ncw/rclone/fs/accounting"
 	"github.com/ncw/rclone/fs/config"
 	"github.com/ncw/rclone/fs/config/flags"
 	"github.com/ncw/rclone/fs/fserrors"
-	"github.com/ncw/rclone/fs/fshttp"
 	"github.com/ncw/rclone/fs/hash"
 	"github.com/ncw/rclone/fs/walk"
 	"github.com/ncw/rclone/lib/pacer"
@@ -32,15 +34,15 @@ import (
 )
 
 const (
-	apiVersion    = "2017-04-17"
-	minSleep      = 10 * time.Millisecond
-	maxSleep      = 10 * time.Second
-	decayConstant = 1    // bigger for slower decay, exponential
-	listChunkSize = 5000 // number of items to read at once
-	modTimeKey    = "mtime"
-	timeFormatIn  = time.RFC3339
-	timeFormatOut = "2006-01-02T15:04:05.000000000Z07:00"
-	maxTotalParts = 50000 // in multipart upload
+	minSleep              = 10 * time.Millisecond
+	maxSleep              = 10 * time.Second
+	decayConstant         = 1    // bigger for slower decay, exponential
+	listChunkSize         = 5000 // number of items to read at once
+	modTimeKey            = "mtime"
+	timeFormatIn          = time.RFC3339
+	timeFormatOut         = "2006-01-02T15:04:05.000000000Z07:00"
+	maxTotalParts         = 50000 // in multipart upload
+	storageDefaultBaseURL = "blob.core.windows.net"
 	// maxUncommittedSize = 9 << 30 // can't upload bigger than this
 )
 
@@ -64,9 +66,6 @@ func init() {
 		}, {
 			Name: "key",
 			Help: "Storage Account Key (leave blank to use connection string or SAS URL)",
-		}, {
-			Name: "connection_string",
-			Help: "Connection string (leave blank if using account/key or SAS URL)",
 		}, {
 			Name: "sas_url",
 			Help: "SAS URL for container level access only\n(leave blank if using account/key or connection string)",
@@ -82,13 +81,13 @@ func init() {
 
 // Fs represents a remote azure server
 type Fs struct {
-	name          string       // name of this remote
-	root          string       // the path we are working on if any
-	features      *fs.Features // optional features
-	account       string       // account name
-	endpoint      string       // name of the starting api endpoint
-	bc            *storage.BlobStorageClient
-	cc            *storage.Container
+	name          string               // name of this remote
+	root          string               // the path we are working on if any
+	features      *fs.Features         // optional features
+	account       string               // account name
+	endpoint      string               // name of the starting api endpoint
+	svcURL        *azblob.ServiceURL   // reference to serviceURL
+	cntURL        *azblob.ContainerURL // reference to containerURL
 	container     string       // the container we are working on
 	containerOKMu sync.Mutex   // mutex to protect container OK
 	containerOK   bool         // true if we have created the container
@@ -99,13 +98,14 @@ type Fs struct {
 
 // Object describes a azure object
 type Object struct {
-	fs       *Fs               // what this object is part of
-	remote   string            // The remote path
-	modTime  time.Time         // The modified time of the object if known
-	md5      string            // MD5 hash if known
-	size     int64             // Size of the object
-	mimeType string            // Content-Type of the object
-	meta     map[string]string // blob metadata
+	fs         *Fs                   // what this object is part of
+	remote     string                // The remote path
+	modTime    time.Time             // The modified time of the object if known
+	md5        string                // MD5 hash if known
+	size       int64                 // Size of the object
+	mimeType   string                // Content-Type of the object
+	accessTier azblob.AccessTierType // Blob Access Tier
+	meta       map[string]string     // blob metadata
 }
 
 // ------------------------------------------------------------
@@ -165,8 +165,8 @@ var retryErrorCodes = []int{
 // deserve to be retried. It returns the err as a convenience
 func (f *Fs) shouldRetry(err error) (bool, error) {
 	// FIXME interpret special errors - more to do here
-	if storageErr, ok := err.(storage.AzureStorageServiceError); ok {
-		statusCode := storageErr.StatusCode
+	if storageErr, ok := err.(azblob.StorageError); ok {
+		statusCode := storageErr.Response().StatusCode
 		for _, e := range retryErrorCodes {
 			if statusCode == e {
 				return true, err
@@ -190,44 +190,47 @@ func NewFs(name, root string) (fs.Fs, error) {
 	}
 	account := config.FileGet(name, "account")
 	key := config.FileGet(name, "key")
-	connectionString := config.FileGet(name, "connection_string")
 	sasURL := config.FileGet(name, "sas_url")
-	endpoint := config.FileGet(name, "endpoint", storage.DefaultBaseURL)
+	endpoint := config.FileGet(name, "endpoint", storageDefaultBaseURL)
 	var (
-		oclient storage.Client
-		client  = &oclient
-		cc      *storage.Container
+		u            *url.URL
+		serviceURL   azblob.ServiceURL
+		containerURL azblob.ContainerURL
 	)
 	switch {
 	case account != "" && key != "":
-		oclient, err = storage.NewClient(account, key, endpoint, apiVersion, true)
+		credential := azblob.NewSharedKeyCredential(account, key)
+		u, err = url.Parse(fmt.Sprintf("https://%s.%s", account, endpoint))
 		if err != nil {
-			return nil, errors.Wrap(err, "failed to make azure storage client from account/key")
-		}
-	case connectionString != "":
-		oclient, err = storage.NewClientFromConnectionString(connectionString)
-		if err != nil {
-			return nil, errors.Wrap(err, "failed to make azure storage client from connection string")
+			return nil, errors.Wrap(err, "failed to make azure storage url from account and endpoint")
 		}
+		pipeline := azblob.NewPipeline(credential, azblob.PipelineOptions{})
+		serviceURL = azblob.NewServiceURL(*u, pipeline)
+		containerURL = serviceURL.NewContainerURL(container)
 	case sasURL != "":
-		URL, err := url.Parse(sasURL)
+		u, err = url.Parse(sasURL)
 		if err != nil {
 			return nil, errors.Wrapf(err, "failed to parse SAS URL")
 		}
-		cc, err = storage.GetContainerReferenceFromSASURI(*URL)
-		if err != nil {
-			return nil, errors.Wrapf(err, "failed to make azure storage client from SAS URL")
+		// use anonymous credentials in case of SAS URL
+		pipeline := azblob.NewPipeline(azblob.NewAnonymousCredential(), azblob.PipelineOptions{})
+		// Check whether we have a container level SAS or an account level SAS
+		parts := azblob.NewBlobURLParts(*u)
+		if parts.ContainerName != "" {
+			if container != "" && parts.ContainerName != container {
+				return nil, errors.New("Container name in SAS URL and container provided in command do not match")
+			}
+
+			container = parts.ContainerName
+			containerURL = azblob.NewContainerURL(*u, pipeline)
+		} else {
+			serviceURL = azblob.NewServiceURL(*u, pipeline)
+			containerURL = serviceURL.NewContainerURL(container)
 		}
-		client = cc.Client()
 	default:
 		return nil, errors.New("Need account+key or connectionString or sasURL")
 	}
-	client.HTTPClient = fshttp.NewClient(fs.Config)
-	bc := client.GetBlobService()
-	if cc == nil {
-		cc = bc.GetContainerReference(container)
-	}
 
 	f := &Fs{
 		name:        name,
@@ -235,8 +238,8 @@ func NewFs(name, root string) (fs.Fs, error) {
 		root:        directory,
 		account:     account,
 		endpoint:    endpoint,
-		bc:          &bc,
-		cc:          cc,
+		svcURL:      &serviceURL,
+		cntURL:      &containerURL,
 		pacer:       pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
 		uploadToken: pacer.NewTokenDispenser(fs.Config.Transfers),
 	}
@@ -274,13 +277,13 @@ func NewFs(name, root string) (fs.Fs, error) {
 // Return an Object from a path
 //
 // If it can't be found it returns the error fs.ErrorObjectNotFound.
-func (f *Fs) newObjectWithInfo(remote string, info *storage.Blob) (fs.Object, error) {
+func (f *Fs) newObjectWithInfo(remote string, info *azblob.BlobItem) (fs.Object, error) {
 	o := &Object{
 		fs:     f,
 		remote: remote,
 	}
 	if info != nil {
-		err := o.decodeMetaData(info)
+		err := o.decodeMetaDataFromBlob(info)
 		if err != nil {
 			return nil, err
 		}
@@ -300,13 +303,12 @@ func (f *Fs) NewObject(remote string) (fs.Object, error) {
 }
 
 // getBlobReference creates an empty blob reference with no metadata
-func (f *Fs) getBlobReference(remote string) *storage.Blob {
-	return f.cc.GetBlobReference(f.root + remote)
+func (f *Fs) getBlobReference(remote string) azblob.BlobURL {
+	return f.cntURL.NewBlobURL(f.root + remote)
 }
 
-// getBlobWithModTime adds the modTime passed in to o.meta and creates
-// a Blob from it.
-func (o *Object) getBlobWithModTime(modTime time.Time) *storage.Blob {
+// updateMetadataWithModTime adds the modTime passed in to o.meta.
+func (o *Object) updateMetadataWithModTime(modTime time.Time) {
 	// Make sure o.meta is not nil
 	if o.meta == nil {
 		o.meta = make(map[string]string, 1)
@@ -314,14 +316,10 @@ func (o *Object) getBlobWithModTime(modTime time.Time) *storage.Blob {
 	// Set modTimeKey in it
 	o.meta[modTimeKey] = modTime.Format(timeFormatOut)
-
-	blob := o.getBlobReference()
-	blob.Metadata = o.meta
-	return blob
 }
 
 // listFn is called from list to handle an object
-type listFn func(remote string, object *storage.Blob, isDirectory bool) error
+type listFn func(remote string, object *azblob.BlobItem, isDirectory bool) error
 
 // list lists the objects into the function supplied from
 // the container and root supplied
@@ -342,32 +340,39 @@ func (f *Fs) list(dir string, recurse bool, maxResults uint, fn listFn) error {
 	if !recurse {
 		delimiter = "/"
 	}
-	params := storage.ListBlobsParameters{
-		MaxResults: maxResults,
-		Prefix:     root,
-		Delimiter:  delimiter,
-		Include: &storage.IncludeBlobDataset{
-			Snapshots:        false,
-			Metadata:         true,
-			UncommittedBlobs: false,
+
+	options := azblob.ListBlobsSegmentOptions{
+		Details: azblob.BlobListingDetails{
+			Copy:             false,
+			Metadata:         true,
+			Snapshots:        false,
+			UncommittedBlobs: false,
+			Deleted:          false,
 		},
+		Prefix:     root,
+		MaxResults: int32(maxResults),
 	}
-	for {
-		var response storage.BlobListResponse
+	ctx := context.Background()
+	for marker := (azblob.Marker{}); marker.NotDone(); {
+		var response *azblob.ListBlobsHierarchySegmentResponse
 		err := f.pacer.Call(func() (bool, error) {
 			var err error
-			response, err = f.cc.ListBlobs(params)
+			response, err = f.cntURL.ListBlobsHierarchySegment(ctx, marker, delimiter, options)
 			return f.shouldRetry(err)
 		})
+
 		if err != nil {
-			if storageErr, ok := err.(storage.AzureStorageServiceError); ok && storageErr.StatusCode == http.StatusNotFound {
+			// Check the http error code along with the service code, as the current SDK sometimes doesn't populate the service code correctly
+			if storageErr, ok := err.(azblob.StorageError); ok && (storageErr.ServiceCode() == azblob.ServiceCodeContainerNotFound || storageErr.Response().StatusCode == http.StatusNotFound) {
 				return fs.ErrorDirNotFound
 			}
 			return err
 		}
-		for i := range response.Blobs {
-			file := &response.Blobs[i]
+		// Advance the marker to the next segment
+		marker = response.NextMarker
+
+		for i := range response.Segment.BlobItems {
+			file := &response.Segment.BlobItems[i]
 			// Finish if file name no longer has prefix
 			// if prefix != "" && !strings.HasPrefix(file.Name, prefix) {
 			// 	return nil
 			// }
@@ -389,8 +394,8 @@ func (f *Fs) list(dir string, recurse bool, maxResults uint, fn listFn) error {
 			}
 		}
 		// Send the subdirectories
-		for _, remote := range response.BlobPrefixes {
-			remote := strings.TrimRight(remote, "/")
+		for _, remote := range response.Segment.BlobPrefixes {
+			remote := strings.TrimRight(remote.Name, "/")
 			if !strings.HasPrefix(remote, f.root) {
 				fs.Debugf(f, "Odd directory name received %q", remote)
 				continue
@@ -402,17 +407,12 @@ func (f *Fs) list(dir string, recurse bool, maxResults uint, fn listFn) error {
 				return err
 			}
 		}
-		// end if no NextFileName
-		if response.NextMarker == "" {
-			break
-		}
-		params.Marker = response.NextMarker
 	}
 	return nil
 }
 
 // Convert a list item into a DirEntry
-func (f *Fs) itemToDirEntry(remote string, object *storage.Blob, isDirectory bool) (fs.DirEntry, error) {
+func (f *Fs) itemToDirEntry(remote string, object *azblob.BlobItem, isDirectory bool) (fs.DirEntry, error) {
 	if isDirectory {
 		d := fs.NewDir(remote, time.Time{})
 		return d, nil
@@ -436,7 +436,7 @@ func (f *Fs) markContainerOK() {
 
 // listDir lists a single directory
 func (f *Fs) listDir(dir string) (entries fs.DirEntries, err error) {
-	err = f.list(dir, false, listChunkSize, func(remote string, object *storage.Blob, isDirectory bool) error {
+	err = f.list(dir, false, listChunkSize, func(remote string, object *azblob.BlobItem, isDirectory bool) error {
 		entry, err := f.itemToDirEntry(remote, object, isDirectory)
 		if err != nil {
 			return err
@@ -459,13 +459,8 @@ func (f *Fs) listContainers(dir string) (entries fs.DirEntries, err error) {
 	if dir != "" {
 		return nil, fs.ErrorListBucketRequired
 	}
-	err = f.listContainersToFn(func(container *storage.Container) error {
-		t, err := time.Parse(time.RFC1123, container.Properties.LastModified)
-		if err != nil {
-			fs.Debugf(f, "Failed to parse LastModified %q: %v", container.Properties.LastModified, err)
-			t = time.Time{}
-		}
-		d := fs.NewDir(container.Name, t)
+	err = f.listContainersToFn(func(container *azblob.ContainerItem) error {
+		d := fs.NewDir(container.Name, container.Properties.LastModified)
 		entries = append(entries, d)
 		return nil
 	})
@@ -512,7 +507,7 @@ func (f *Fs) ListR(dir string, callback fs.ListRCallback) (err error) {
 		return fs.ErrorListBucketRequired
 	}
 	list := walk.NewListRHelper(callback)
-	err = f.list(dir, true, listChunkSize, func(remote string, object *storage.Blob, isDirectory bool) error {
+	err = f.list(dir, true, listChunkSize, func(remote string, object *azblob.BlobItem, isDirectory bool) error {
 		entry, err := f.itemToDirEntry(remote, object, isDirectory)
 		if err != nil {
 			return err
@@ -528,27 +523,34 @@ func (f *Fs) ListR(dir string, callback fs.ListRCallback) (err error) {
 }
 
 // listContainerFn is called from listContainersToFn to handle a container
-type listContainerFn func(*storage.Container) error
+type listContainerFn func(*azblob.ContainerItem) error
 
 // listContainersToFn lists the containers to the function supplied
 func (f *Fs) listContainersToFn(fn listContainerFn) error {
-	// FIXME page the containers if necessary?
-	params := storage.ListContainersParameters{}
-	var response *storage.ContainerListResponse
-	err := f.pacer.Call(func() (bool, error) {
-		var err error
-		response, err = f.bc.ListContainers(params)
-		return f.shouldRetry(err)
-	})
-	if err != nil {
-		return err
+	params := azblob.ListContainersSegmentOptions{
+		MaxResults: int32(listChunkSize),
 	}
-	for i := range response.Containers {
-		err = fn(&response.Containers[i])
+	ctx := context.Background()
+	for marker := (azblob.Marker{}); marker.NotDone(); {
+		var response *azblob.ListContainersResponse
+		err := f.pacer.Call(func() (bool, error) {
+			var err error
+			response, err = f.svcURL.ListContainersSegment(ctx, marker, params)
+			return f.shouldRetry(err)
+		})
 		if err != nil {
 			return err
 		}
+
+		for i := range response.ContainerItems {
+			err = fn(&response.ContainerItems[i])
+			if err != nil {
+				return err
+			}
+		}
+		marker = response.NextMarker
 	}
+
 	return nil
 }
 
@@ -573,32 +575,20 @@ func (f *Fs) Mkdir(dir string) error {
 	if f.containerOK {
 		return nil
 	}
-	// List the container to see if it exists
-	err := f.list("", false, 1, func(remote string, object *storage.Blob, isDirectory bool) error {
-		return nil
-	})
-	if err == nil {
-		f.markContainerOK()
-		return nil
-	}
+
 	// now try to create the container
-	options := storage.CreateContainerOptions{
-		Access: storage.ContainerAccessTypePrivate,
-	}
-	err = f.pacer.Call(func() (bool, error) {
-		err := f.cc.Create(&options)
+	err := f.pacer.Call(func() (bool, error) {
+		ctx := context.Background()
+		_, err := f.cntURL.Create(ctx, azblob.Metadata{}, azblob.PublicAccessNone)
 		if err != nil {
-			if storageErr, ok := err.(storage.AzureStorageServiceError); ok {
-				switch storageErr.StatusCode {
-				case http.StatusConflict:
-					switch storageErr.Code {
-					case "ContainerAlreadyExists":
-						f.containerOK = true
-						return false, nil
-					case "ContainerBeingDeleted":
-						f.containerDeleted = true
-						return true, err
-					}
+			if storageErr, ok := err.(azblob.StorageError); ok {
+				switch storageErr.ServiceCode() {
+				case azblob.ServiceCodeContainerAlreadyExists:
+					f.containerOK = true
+					return false, nil
+				case azblob.ServiceCodeContainerBeingDeleted:
+					f.containerDeleted = true
+					return true, err
 				}
 			}
 		}
@@ -614,7 +604,7 @@ func (f *Fs) Mkdir(dir string) error {
 // isEmpty checks to see if a given directory is empty and returns an error if not
 func (f *Fs) isEmpty(dir string) (err error) {
 	empty := true
-	err = f.list("", true, 1, func(remote string, object *storage.Blob, isDirectory bool) error {
+	err = f.list("", true, 1, func(remote string, object *azblob.BlobItem, isDirectory bool) error {
 		empty = false
 		return nil
 	})
@@ -632,16 +622,23 @@ func (f *Fs) deleteContainer() error {
 	f.containerOKMu.Lock()
 	defer f.containerOKMu.Unlock()
-	options := storage.DeleteContainerOptions{}
+	options := azblob.ContainerAccessConditions{}
+	ctx := context.Background()
 	err := f.pacer.Call(func() (bool, error) {
-		exists, err := f.cc.Exists()
+		_, err := f.cntURL.GetProperties(ctx, azblob.LeaseAccessConditions{})
+		if err == nil {
+			_, err = f.cntURL.Delete(ctx, options)
+		}
+
+		if err != nil {
+			// Check the http error code along with the service code, as the current SDK sometimes doesn't populate the service code correctly
+			if storageErr, ok := err.(azblob.StorageError); ok && (storageErr.ServiceCode() == azblob.ServiceCodeContainerNotFound || storageErr.Response().StatusCode == http.StatusNotFound) {
+				return false, fs.ErrorDirNotFound
+			}
+			return f.shouldRetry(err)
 		}
-		if !exists {
-			return false, fs.ErrorDirNotFound
-		}
-		err = f.cc.Delete(&options)
+
 		return f.shouldRetry(err)
 	})
 	if err == nil {
@@ -704,17 +701,36 @@ func (f *Fs) Copy(src fs.Object, remote string) (fs.Object, error) {
 		fs.Debugf(src, "Can't copy - not same remote type")
 		return nil, fs.ErrorCantCopy
 	}
-	dstBlob := f.getBlobReference(remote)
-	srcBlob := srcObj.getBlobReference()
-	options := storage.CopyOptions{}
-	sourceBlobURL := srcBlob.GetURL()
+	dstBlobURL := f.getBlobReference(remote)
+	srcBlobURL := srcObj.getBlobReference()
+
+	source, err := url.Parse(srcBlobURL.String())
+	if err != nil {
+		return nil, err
+	}
+
+	options := azblob.BlobAccessConditions{}
+	ctx := context.Background()
+	var startCopy *azblob.BlobStartCopyFromURLResponse
+
 	err = f.pacer.Call(func() (bool, error) {
-		err = dstBlob.Copy(sourceBlobURL, &options)
+		startCopy, err = dstBlobURL.StartCopyFromURL(ctx, *source, nil, options, options)
 		return f.shouldRetry(err)
 	})
 	if err != nil {
 		return nil, err
 	}
+
+	copyStatus := startCopy.CopyStatus()
+	for copyStatus == azblob.CopyStatusPending {
+		time.Sleep(1 * time.Second)
+		getMetadata, err := dstBlobURL.GetProperties(ctx, options)
+		if err != nil {
+			return nil, err
+		}
+		copyStatus = getMetadata.CopyStatus()
+	}
+
 	return f.NewObject(remote)
 }
 
@@ -759,22 +775,10 @@ func (o *Object) Size() int64 {
 	return o.size
 }
 
-// decodeMetaData sets the metadata from the data passed in
-//
-// Sets
-//  o.id
-//  o.modTime
-//  o.size
-//  o.md5
-//  o.meta
-func (o *Object) decodeMetaData(info *storage.Blob) (err error) {
-	o.md5 = info.Properties.ContentMD5
-	o.mimeType = info.Properties.ContentType
-	o.size = info.Properties.ContentLength
-	o.modTime = time.Time(info.Properties.LastModified)
-	if len(info.Metadata) > 0 {
-		o.meta = info.Metadata
-		if modTime, ok := info.Metadata[modTimeKey]; ok {
+func (o *Object) setMetadata(metadata azblob.Metadata) {
+	if len(metadata) > 0 {
+		o.meta = metadata
+		if modTime, ok := metadata[modTimeKey]; ok {
 			when, err := time.Parse(timeFormatIn, modTime)
 			if err != nil {
 				fs.Debugf(o, "Couldn't parse %v = %q: %v", modTimeKey, modTime, err)
@@ -784,11 +788,42 @@
 	} else {
 		o.meta = nil
 	}
+}
+
+// decodeMetaDataFromPropertiesResponse sets the metadata from the data passed in
+//
+// Sets
+//  o.id
+//  o.modTime
+//  o.size
+//  o.md5
+//  o.meta
+func (o *Object) decodeMetaDataFromPropertiesResponse(info *azblob.BlobGetPropertiesResponse) (err error) {
+	// NOTE - In BlobGetPropertiesResponse the client library returns the MD5 as raw (base64 decoded) bytes,
+	// unlike BlobProperties in BlobItem (used in decodeMetaDataFromBlob) which carries base64
+	// encoded bytes. Object needs to maintain this as a base64 encoded string.
+	o.md5 = base64.StdEncoding.EncodeToString(info.ContentMD5())
+	o.mimeType = info.ContentType()
+	o.size = info.ContentLength()
+	o.modTime = time.Time(info.LastModified())
+	o.accessTier = azblob.AccessTierType(info.AccessTier())
+	o.setMetadata(info.NewMetadata())
+
+	return nil
+}
+
+func (o *Object) decodeMetaDataFromBlob(info *azblob.BlobItem) (err error) {
+	o.md5 = string(info.Properties.ContentMD5[:])
+	o.mimeType = *info.Properties.ContentType
+	o.size = *info.Properties.ContentLength
+	o.modTime = info.Properties.LastModified
+	o.accessTier = info.Properties.AccessTier
+	o.setMetadata(info.Metadata)
 	return nil
 }
 
 // getBlobReference creates an empty blob reference with no metadata
-func (o *Object) getBlobReference() *storage.Blob {
+func (o *Object) getBlobReference() azblob.BlobURL {
 	return o.fs.getBlobReference(o.remote)
 }
 
@@ -811,19 +846,22 @@ func (o *Object) readMetaData() (err error) {
 	blob := o.getBlobReference()
 
 	// Read metadata (this includes metadata)
-	getPropertiesOptions := storage.GetBlobPropertiesOptions{}
+	options := azblob.BlobAccessConditions{}
+	ctx := context.Background()
+	var blobProperties *azblob.BlobGetPropertiesResponse
 	err = o.fs.pacer.Call(func() (bool, error) {
-		err = blob.GetProperties(&getPropertiesOptions)
+		blobProperties, err = blob.GetProperties(ctx, options)
 		return o.fs.shouldRetry(err)
 	})
 	if err != nil {
-		if storageErr, ok := err.(storage.AzureStorageServiceError); ok && storageErr.StatusCode == http.StatusNotFound {
+		// On directories GetProperties does not work, and the current SDK does not always populate the service code, hence check the regular http response as well
+		if storageErr, ok := err.(azblob.StorageError); ok && (storageErr.ServiceCode() == azblob.ServiceCodeBlobNotFound || storageErr.Response().StatusCode == http.StatusNotFound) {
 			return fs.ErrorObjectNotFound
 		}
 		return err
 	}
-	return o.decodeMetaData(blob)
+	return o.decodeMetaDataFromPropertiesResponse(blobProperties)
 }
 
 // timeString returns modTime as the number of milliseconds
@@ -860,10 +898,17 @@ func (o *Object) ModTime() (result time.Time) {
 
 // SetModTime sets the modification time of the local fs object
 func (o *Object) SetModTime(modTime time.Time) error {
-	blob := o.getBlobWithModTime(modTime)
-	options := storage.SetBlobMetadataOptions{}
+	// Make sure o.meta is not nil
+	if o.meta == nil {
+		o.meta = make(map[string]string, 1)
+	}
+	// Set modTimeKey in it
+	o.meta[modTimeKey] = modTime.Format(timeFormatOut)
+
+	blob := o.getBlobReference()
+	ctx := context.Background()
 	err := o.fs.pacer.Call(func() (bool, error) {
-		err := blob.SetMetadata(&options)
+		_, err := blob.SetMetadata(ctx, o.meta, azblob.BlobAccessConditions{})
 		return o.fs.shouldRetry(err)
 	})
 	if err != nil {
@@ -880,29 +925,18 @@ func (o *Object) Storable() bool {
 
 // Open an object for read
 func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
-	getBlobOptions := storage.GetBlobOptions{}
-	getBlobRangeOptions := storage.GetBlobRangeOptions{
-		GetBlobOptions: &getBlobOptions,
-	}
+	// Offset and Count for range download
+	var offset int64
+	var count int64
 	for _, option := range options {
 		switch x := option.(type) {
 		case *fs.RangeOption:
-			start, end := x.Start, x.End
-			if end < 0 {
-				end = 0
-			}
-			if start < 0 {
-				start = o.size - end
-				end = 0
-			}
-			getBlobRangeOptions.Range = &storage.BlobRange{
-				Start: uint64(start),
-				End:   uint64(end),
+			offset, count = x.Decode(o.size)
+			if count < 0 {
+				count = o.size - offset
 			}
 		case *fs.SeekOption:
-			getBlobRangeOptions.Range = &storage.BlobRange{
-				Start: uint64(x.Offset),
-			}
+			offset = x.Offset
 		default:
 			if option.Mandatory() {
 				fs.Logf(o, "Unsupported mandatory option: %v", option)
@@ -910,17 +944,17 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
 		}
 	}
 	blob := o.getBlobReference()
+	ctx := context.Background()
+	ac := azblob.BlobAccessConditions{}
+	var downloadResponse *azblob.DownloadResponse
 	err = o.fs.pacer.Call(func() (bool, error) {
-		if getBlobRangeOptions.Range == nil {
-			in, err = blob.Get(&getBlobOptions)
-		} else {
-			in, err = blob.GetRange(&getBlobRangeOptions)
-		}
+		downloadResponse, err = blob.Download(ctx, offset, count, ac, false)
 		return o.fs.shouldRetry(err)
 	})
 	if err != nil {
 		return nil, errors.Wrap(err, "failed to open for download")
 	}
+	in = downloadResponse.Body(azblob.RetryReaderOptions{})
 	return in, nil
 }
 
@@ -945,10 +979,16 @@ func init() {
 	}
 }
 
+// readSeeker joins an io.Reader and an io.Seeker
+type readSeeker struct {
+	io.Reader
+	io.Seeker
+}
+
 // uploadMultipart uploads a file using multipart upload
 //
 // Write a larger blob, using CreateBlockBlob, PutBlock, and PutBlockList.
-func (o *Object) uploadMultipart(in io.Reader, size int64, blob *storage.Blob, putBlobOptions *storage.PutBlobOptions) (err error) {
+func (o *Object) uploadMultipart(in io.Reader, size int64, blob *azblob.BlobURL, httpHeaders *azblob.BlobHTTPHeaders) (err error) {
 	// Calculate correct chunkSize
 	chunkSize := int64(chunkSize)
 	var totalParts int64
 	}
 	fs.Debugf(o, "Multipart upload session started for %d parts of size %v", totalParts, fs.SizeSuffix(chunkSize))
 
-	// Create an empty blob
-	err = o.fs.pacer.Call(func() (bool, error) {
-		err := blob.CreateBlockBlob(putBlobOptions)
-		return o.fs.shouldRetry(err)
-	})
+	// https://godoc.org/github.com/Azure/azure-storage-blob-go/2017-07-29/azblob#example-BlockBlobURL
+	// Utilities are cloned from the above example
+	// This helper function converts a binary block ID to a base-64 string
+	// NOTE: The blockID must be <= 64 bytes and ALL blockIDs for the blob must be the same length
	blockIDBinaryToBase64 := func(blockID []byte) string { return base64.StdEncoding.EncodeToString(blockID) }
+	// This helper function converts an int block ID to a base-64 string
+	blockIDIntToBase64 := func(blockID uint64) string {
+		binaryBlockID := (&[8]byte{})[:] // All block IDs are 8 bytes long
+		binary.LittleEndian.PutUint64(binaryBlockID, blockID)
+		return blockIDBinaryToBase64(binaryBlockID)
+	}
 
 	// block ID variables
 	var (
 		rawID   uint64
-		bytesID = make([]byte, 8)
 		blockID = "" // id in base64 encoded form
-		blocks  = make([]storage.Block, 0, totalParts)
+		blocks  = make([]string, 0, totalParts)
 	)
 
 	// increment the blockID
 	nextID := func() {
 		rawID++
-		binary.LittleEndian.PutUint64(bytesID, rawID)
-		blockID = base64.StdEncoding.EncodeToString(bytesID)
-		blocks = append(blocks, storage.Block{
-			ID:     blockID,
-			Status: storage.BlockStatusLatest,
-		})
+		blockID = blockIDIntToBase64(rawID)
+		blocks = append(blocks, blockID)
 	}
 
+	// Get BlockBlobURL, we will use the default pipeline here
+	blockBlobURL := blob.ToBlockBlobURL()
+	ctx := context.Background()
+	ac := azblob.LeaseAccessConditions{} // Use default lease access conditions
+
 	// unwrap the accounting from the input, we use wrap to put it
 	// back on after the buffering
 	in, wrap := accounting.UnWrap(in)
@@ -1037,13 +1083,11 @@ outer:
 			defer o.fs.uploadToken.Put()
 			fs.Debugf(o, "Uploading part %d/%d offset %v/%v part size %v", part+1, totalParts, fs.SizeSuffix(position), fs.SizeSuffix(size), fs.SizeSuffix(chunkSize))
 
-			// Upload the block, with MD5 for check
-			md5sum := md5.Sum(buf)
-			putBlockOptions := storage.PutBlockOptions{
-				ContentMD5: base64.StdEncoding.EncodeToString(md5sum[:]),
-			}
 			err = o.fs.pacer.Call(func() (bool, error) {
-				err = blob.PutBlockWithLength(blockID, uint64(len(buf)), wrap(bytes.NewBuffer(buf)), &putBlockOptions)
+				bufferReader := bytes.NewReader(buf)
+				wrappedReader := wrap(bufferReader)
+				rs := readSeeker{wrappedReader, bufferReader}
+				_, err = blockBlobURL.StageBlock(ctx, blockID, rs, ac)
 				return o.fs.shouldRetry(err)
 			})
 
@@ -1073,9 +1117,8 @@ outer:
 	}
 
 	// Finalise the upload session
-	putBlockListOptions := storage.PutBlockListOptions{}
 	err = o.fs.pacer.Call(func() (bool, error) {
-		err := blob.PutBlockList(blocks, &putBlockListOptions)
+		_, err := blockBlobURL.CommitBlockList(ctx, blocks, *httpHeaders, o.meta, azblob.BlobAccessConditions{})
 		return o.fs.shouldRetry(err)
 	})
 	if err != nil {
@@ -1093,29 +1136,42 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio
 		return err
 	}
 	size := src.Size()
-	blob := o.getBlobWithModTime(src.ModTime())
-	blob.Properties.ContentType = fs.MimeType(o)
-	if sourceMD5, _ := src.Hash(hash.MD5); sourceMD5 != "" {
-		sourceMD5bytes, err := hex.DecodeString(sourceMD5)
-		if err == nil {
-			blob.Properties.ContentMD5 = base64.StdEncoding.EncodeToString(sourceMD5bytes)
-		} else {
-			fs.Debugf(o, "Failed to decode %q as MD5: %v", sourceMD5, err)
+	// Update the modification time in the metadata
+	o.updateMetadataWithModTime(src.ModTime())
+
+	blob := o.getBlobReference()
+	httpHeaders := azblob.BlobHTTPHeaders{}
+	httpHeaders.ContentType = fs.MimeType(o)
+	// Multipart upload doesn't support MD5 checksums at put block calls, hence calculate
+	// MD5 only for PutBlob requests
+	if size < int64(uploadCutoff) {
+		if sourceMD5, _ := src.Hash(hash.MD5); sourceMD5 != "" {
+			sourceMD5bytes, err := hex.DecodeString(sourceMD5)
+			if err == nil {
+				httpHeaders.ContentMD5 = sourceMD5bytes
+			} else {
+				fs.Debugf(o, "Failed to decode %q as MD5: %v", sourceMD5, err)
+			}
 		}
 	}
-	putBlobOptions := storage.PutBlobOptions{}
+	putBlobOptions := azblob.UploadStreamToBlockBlobOptions{
+		BufferSize:      int(chunkSize),
+		MaxBuffers:      4,
+		Metadata:        o.meta,
+		BlobHTTPHeaders: httpHeaders,
+	}
+
+	ctx := context.Background()
 	// Don't retry, return a retry error instead
 	err = o.fs.pacer.CallNoRetry(func() (bool, error) {
 		if size >= int64(uploadCutoff) {
 			// If a large file upload in chunks
-			err = o.uploadMultipart(in, size, blob, &putBlobOptions)
+			err = o.uploadMultipart(in, size, &blob, &httpHeaders)
 		} else {
 			// Write a small blob in one transaction
-			if size == 0 {
-				in = nil
-			}
-			err = blob.CreateBlockBlobFromReader(in, &putBlobOptions)
+			blockBlobURL := blob.ToBlockBlobURL()
+			_, err = azblob.UploadStreamToBlockBlob(ctx, in, blockBlobURL, putBlobOptions)
 		}
 		return o.fs.shouldRetry(err)
 	})
@@ -1129,9 +1185,11 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio
 
 // Remove an object
 func (o *Object) Remove() error {
 	blob := o.getBlobReference()
-	options := storage.DeleteBlobOptions{}
+	snapShotOptions := azblob.DeleteSnapshotsOptionNone
+	ac := azblob.BlobAccessConditions{}
+	ctx := context.Background()
 	return o.fs.pacer.Call(func() (bool, error) {
-		err := blob.Delete(&options)
+		_, err := blob.Delete(ctx, snapShotOptions, ac)
 		return o.fs.shouldRetry(err)
 	})
 }
diff --git a/backend/azureblob/azureblob_test.go b/backend/azureblob/azureblob_test.go
index 07a2638c3..3a36d71de 100644
--- a/backend/azureblob/azureblob_test.go
+++ b/backend/azureblob/azureblob_test.go
@@ -1,4 +1,7 @@
 // Test AzureBlob filesystem interface
+
+// +build !freebsd,!netbsd,!openbsd,!plan9,!solaris,go1.8
+
 package azureblob_test
 
 import (
diff --git a/backend/azureblob/azureblob_unsupported.go b/backend/azureblob/azureblob_unsupported.go
new file mode 100644
index 000000000..54947980c
--- /dev/null
+++ b/backend/azureblob/azureblob_unsupported.go
@@ -0,0 +1,6 @@
+// Build for azureblob for unsupported platforms to stop go complaining
+// about "no buildable Go source files"
+
+// +build freebsd netbsd openbsd plan9 solaris !go1.8
+
+package azureblob
diff --git a/docs/content/azureblob.md b/docs/content/azureblob.md
index a142a8250..c4da72e79 100644
--- a/docs/content/azureblob.md
+++ b/docs/content/azureblob.md
@@ -125,23 +125,21 @@ Rclone has 3 ways of authenticating with Azure Blob Storage:
 
 This is the most straight forward and least flexible way.  Just fill in the `account` and `key` lines and leave the rest blank.
 
-#### Connection string
-
-This supports all the possible connection string variants. Leave `account`, `key` and `sas_url` blank and put the connection string into the `connection_string` configuration parameter.
-
-Use this method if using an account level SAS; the Azure Portal shows connection strings you can cut and paste.
-
 #### SAS URL
 
-This only for a container level SAS URL - it does not work with an account level SAS URL. For account level SAS use the connection string method.
+This can be an account level SAS URL or a container level SAS URL.
 
-To use it leave `account`, `key` and `connection_string` blank and fill in `sas_url`.
+To use it leave `account` and `key` blank and fill in `sas_url`.
+An account level or container level SAS URL can be obtained from the Azure portal or from Azure Storage Explorer. To get a container level SAS URL, right click on a container in the Azure Blob explorer in the Azure portal.
 
-You will only be able to use the container specified in the SAS URL with rclone, eg
+If you use a container level SAS URL, rclone operations are permitted only on that particular container, eg
 
-    rclone ls azureblob:container
+    rclone ls azureblob:container
+    rclone ls azureblob:
+
+Since the container name is already in the SAS URL, you can leave it empty as well. However these will not work
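
For reference, the account/key path of NewFs above boils down to: build a shared key credential, wrap it in a pipeline, then derive service and container URLs. A minimal sketch against the 2018-03-28 azblob package, with placeholder account, key and container values (not part of the patch):

    package main

    import (
    	"fmt"
    	"log"
    	"net/url"

    	"github.com/Azure/azure-storage-blob-go/2018-03-28/azblob"
    )

    func main() {
    	// Placeholder values - substitute a real account, base64 key and container.
    	account, key, container := "myaccount", "bXlrZXk=", "mycontainer"

    	// Shared key credential wrapped in a default pipeline, as in NewFs.
    	credential := azblob.NewSharedKeyCredential(account, key)
    	pipeline := azblob.NewPipeline(credential, azblob.PipelineOptions{})

    	// The service URL is https://<account>.<endpoint>, with the endpoint
    	// defaulting to blob.core.windows.net (storageDefaultBaseURL above).
    	u, err := url.Parse(fmt.Sprintf("https://%s.%s", account, "blob.core.windows.net"))
    	if err != nil {
    		log.Fatal(err)
    	}
    	serviceURL := azblob.NewServiceURL(*u, pipeline)
    	containerURL := serviceURL.NewContainerURL(container)
    	fmt.Println(containerURL.String())
    }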
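The listing changes replace the old NextMarker/break loop with the SDK's marker pattern: a zero azblob.Marker starts the scan and NotDone() reports whether more segments remain. A sketch of the loop shape shared by list and listContainersToFn above; the listAll helper and its package are illustrative only:

    // Package example sketches the segment/marker paging pattern; it is not
    // part of the patch.
    package example

    import (
    	"context"
    	"fmt"

    	"github.com/Azure/azure-storage-blob-go/2018-03-28/azblob"
    )

    // listAll pages through every blob in a container, mirroring list() above.
    func listAll(ctx context.Context, containerURL azblob.ContainerURL) error {
    	options := azblob.ListBlobsSegmentOptions{
    		Details:    azblob.BlobListingDetails{Metadata: true},
    		MaxResults: 5000,
    	}
    	// A zero Marker means "start at the beginning"; NotDone() stays true
    	// until a response comes back with an empty NextMarker.
    	for marker := (azblob.Marker{}); marker.NotDone(); {
    		response, err := containerURL.ListBlobsHierarchySegment(ctx, marker, "/", options)
    		if err != nil {
    			return err
    		}
    		for i := range response.Segment.BlobItems {
    			fmt.Println(response.Segment.BlobItems[i].Name)
    		}
    		// Advance to the next segment
    		marker = response.NextMarker
    	}
    	return nil
    }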
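uploadMultipart's block IDs look arbitrary but follow a hard rule: StageBlock/CommitBlockList require base64 block IDs, and all IDs for a blob must have the same length, which is why the patch encodes an 8-byte little-endian counter. A self-contained worked example using only the standard library:

    package main

    import (
    	"encoding/base64"
    	"encoding/binary"
    	"fmt"
    )

    // blockIDIntToBase64 mirrors the helper in uploadMultipart: an 8-byte
    // little-endian counter, base64 encoded so all IDs share one length.
    func blockIDIntToBase64(blockID uint64) string {
    	binaryBlockID := make([]byte, 8)
    	binary.LittleEndian.PutUint64(binaryBlockID, blockID)
    	return base64.StdEncoding.EncodeToString(binaryBlockID)
    }

    func main() {
    	// Every 8-byte ID encodes to exactly 12 base64 characters, satisfying
    	// the equal-length rule for CommitBlockList.
    	for id := uint64(1); id <= 3; id++ {
    		fmt.Printf("%d -> %s\n", id, blockIDIntToBase64(id)) // 1 -> AQAAAAAAAAA=
    	}
    }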