From 5b27702b6147100bb747b4ddad0d4d360476a15c Mon Sep 17 00:00:00 2001 From: sandeepkru Date: Thu, 6 Sep 2018 21:43:40 -0700 Subject: [PATCH] AzureBlob new sdk changes --- backend/azureblob/azureblob.go | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/backend/azureblob/azureblob.go b/backend/azureblob/azureblob.go index cc17b40a9..77b69c722 100644 --- a/backend/azureblob/azureblob.go +++ b/backend/azureblob/azureblob.go @@ -7,6 +7,7 @@ package azureblob import ( "bytes" "context" + "crypto/md5" "encoding/base64" "encoding/binary" "encoding/hex" @@ -248,7 +249,11 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { ) switch { case opt.Account != "" && opt.Key != "": - credential := azblob.NewSharedKeyCredential(opt.Account, opt.Key) + credential, err := azblob.NewSharedKeyCredential(opt.Account, opt.Key) + if err != nil { + return nil, errors.Wrapf(err, "Failed to parse credentials") + } + u, err = url.Parse(fmt.Sprintf("https://%s.%s", opt.Account, opt.Endpoint)) if err != nil { return nil, errors.Wrap(err, "failed to make azure storage url from account and endpoint") @@ -579,7 +584,7 @@ func (f *Fs) listContainersToFn(fn listContainerFn) error { } ctx := context.Background() for marker := (azblob.Marker{}); marker.NotDone(); { - var response *azblob.ListContainersResponse + var response *azblob.ListContainersSegmentResponse err := f.pacer.Call(func() (bool, error) { var err error response, err = f.svcURL.ListContainersSegment(ctx, marker, params) @@ -761,7 +766,7 @@ func (f *Fs) Copy(src fs.Object, remote string) (fs.Object, error) { var startCopy *azblob.BlobStartCopyFromURLResponse err = f.pacer.Call(func() (bool, error) { - startCopy, err = dstBlobURL.StartCopyFromURL(ctx, *source, nil, options, options) + startCopy, err = dstBlobURL.StartCopyFromURL(ctx, *source, nil, azblob.ModifiedAccessConditions{}, options) return f.shouldRetry(err) }) if err != nil { @@ -846,9 +851,8 @@ func (o *Object) setMetadata(metadata azblob.Metadata) { // o.md5 // o.meta func (o *Object) decodeMetaDataFromPropertiesResponse(info *azblob.BlobGetPropertiesResponse) (err error) { - // NOTE - In BlobGetPropertiesResponse, Client library returns MD5 as base64 decoded string - // unlike BlobProperties in BlobItem (used in decodeMetadataFromBlob) which returns base64 - // encoded bytes. Object needs to maintain this as base64 encoded string. + // NOTE - Client library always returns MD5 as base64 decoded string, Object needs to maintain + // this as base64 encoded string. o.md5 = base64.StdEncoding.EncodeToString(info.ContentMD5()) o.mimeType = info.ContentType() o.size = info.ContentLength() @@ -860,7 +864,9 @@ func (o *Object) decodeMetaDataFromPropertiesResponse(info *azblob.BlobGetProper } func (o *Object) decodeMetaDataFromBlob(info *azblob.BlobItem) (err error) { - o.md5 = string(info.Properties.ContentMD5) + // NOTE - Client library always returns MD5 as base64 decoded string, Object needs to maintain + // this as base64 encoded string. + o.md5 = base64.StdEncoding.EncodeToString(info.Properties.ContentMD5) o.mimeType = *info.Properties.ContentType o.size = *info.Properties.ContentLength o.modTime = info.Properties.LastModified @@ -1134,11 +1140,14 @@ outer: defer o.fs.uploadToken.Put() fs.Debugf(o, "Uploading part %d/%d offset %v/%v part size %v", part+1, totalParts, fs.SizeSuffix(position), fs.SizeSuffix(size), fs.SizeSuffix(chunkSize)) + // Upload the block, with MD5 for check + md5sum := md5.Sum(buf) + transactionalMD5 := md5sum[:] err = o.fs.pacer.Call(func() (bool, error) { bufferReader := bytes.NewReader(buf) wrappedReader := wrap(bufferReader) rs := readSeeker{wrappedReader, bufferReader} - _, err = blockBlobURL.StageBlock(ctx, blockID, &rs, ac) + _, err = blockBlobURL.StageBlock(ctx, blockID, &rs, ac, transactionalMD5) return o.fs.shouldRetry(err) })