mirror of
https://github.com/rclone/rclone.git
synced 2025-02-20 12:32:21 +01:00
gcs: make all operations work from the root #3421
This commit is contained in:
parent
e0e0e0c7bd
commit
d8e9b1a67c
@ -23,9 +23,7 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"regexp"
|
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
@ -38,6 +36,7 @@ import (
|
|||||||
"github.com/rclone/rclone/fs/fshttp"
|
"github.com/rclone/rclone/fs/fshttp"
|
||||||
"github.com/rclone/rclone/fs/hash"
|
"github.com/rclone/rclone/fs/hash"
|
||||||
"github.com/rclone/rclone/fs/walk"
|
"github.com/rclone/rclone/fs/walk"
|
||||||
|
"github.com/rclone/rclone/lib/bucket"
|
||||||
"github.com/rclone/rclone/lib/oauthutil"
|
"github.com/rclone/rclone/lib/oauthutil"
|
||||||
"github.com/rclone/rclone/lib/pacer"
|
"github.com/rclone/rclone/lib/pacer"
|
||||||
"golang.org/x/oauth2"
|
"golang.org/x/oauth2"
|
||||||
@ -264,16 +263,16 @@ type Options struct {
|
|||||||
|
|
||||||
// Fs represents a remote storage server
|
// Fs represents a remote storage server
|
||||||
type Fs struct {
|
type Fs struct {
|
||||||
name string // name of this remote
|
name string // name of this remote
|
||||||
root string // the path we are working on if any
|
root string // the path we are working on if any
|
||||||
opt Options // parsed options
|
opt Options // parsed options
|
||||||
features *fs.Features // optional features
|
features *fs.Features // optional features
|
||||||
svc *storage.Service // the connection to the storage server
|
svc *storage.Service // the connection to the storage server
|
||||||
client *http.Client // authorized client
|
client *http.Client // authorized client
|
||||||
bucket string // the bucket we are working on
|
rootBucket string // bucket part of root (if any)
|
||||||
bucketOKMu sync.Mutex // mutex to protect bucket OK
|
rootDirectory string // directory part of root (if any)
|
||||||
bucketOK bool // true if we have created the bucket
|
cache *bucket.Cache // cache of bucket status
|
||||||
pacer *fs.Pacer // To pace the API calls
|
pacer *fs.Pacer // To pace the API calls
|
||||||
}
|
}
|
||||||
|
|
||||||
// Object describes a storage object
|
// Object describes a storage object
|
||||||
@ -298,18 +297,18 @@ func (f *Fs) Name() string {
|
|||||||
|
|
||||||
// Root of the remote (as passed into NewFs)
|
// Root of the remote (as passed into NewFs)
|
||||||
func (f *Fs) Root() string {
|
func (f *Fs) Root() string {
|
||||||
if f.root == "" {
|
return f.root
|
||||||
return f.bucket
|
|
||||||
}
|
|
||||||
return f.bucket + "/" + f.root
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// String converts this Fs to a string
|
// String converts this Fs to a string
|
||||||
func (f *Fs) String() string {
|
func (f *Fs) String() string {
|
||||||
if f.root == "" {
|
if f.rootBucket == "" {
|
||||||
return fmt.Sprintf("Storage bucket %s", f.bucket)
|
return fmt.Sprintf("GCS root")
|
||||||
}
|
}
|
||||||
return fmt.Sprintf("Storage bucket %s path %s", f.bucket, f.root)
|
if f.rootDirectory == "" {
|
||||||
|
return fmt.Sprintf("GCS bucket %s", f.rootBucket)
|
||||||
|
}
|
||||||
|
return fmt.Sprintf("GCS bucket %s path %s", f.rootBucket, f.rootDirectory)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Features returns the optional features of this Fs
|
// Features returns the optional features of this Fs
|
||||||
@ -341,21 +340,23 @@ func shouldRetry(err error) (again bool, errOut error) {
|
|||||||
return again, err
|
return again, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pattern to match a storage path
|
// parsePath parses a remote 'url'
|
||||||
var matcher = regexp.MustCompile(`^([^/]*)(.*)$`)
|
func parsePath(path string) (root string) {
|
||||||
|
root = strings.Trim(path, "/")
|
||||||
// parseParse parses a storage 'url'
|
|
||||||
func parsePath(path string) (bucket, directory string, err error) {
|
|
||||||
parts := matcher.FindStringSubmatch(path)
|
|
||||||
if parts == nil {
|
|
||||||
err = errors.Errorf("couldn't find bucket in storage path %q", path)
|
|
||||||
} else {
|
|
||||||
bucket, directory = parts[1], parts[2]
|
|
||||||
directory = strings.Trim(directory, "/")
|
|
||||||
}
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// split returns bucket and bucketPath from the rootRelativePath
|
||||||
|
// relative to f.root
|
||||||
|
func (f *Fs) split(rootRelativePath string) (bucketName, bucketPath string) {
|
||||||
|
return bucket.Split(path.Join(f.root, rootRelativePath))
|
||||||
|
}
|
||||||
|
|
||||||
|
// split returns bucket and bucketPath from the object
|
||||||
|
func (o *Object) split() (bucket, bucketPath string) {
|
||||||
|
return o.fs.split(o.remote)
|
||||||
|
}
|
||||||
|
|
||||||
func getServiceAccountClient(credentialsData []byte) (*http.Client, error) {
|
func getServiceAccountClient(credentialsData []byte) (*http.Client, error) {
|
||||||
conf, err := google.JWTConfigFromJSON(credentialsData, storageConfig.Scopes...)
|
conf, err := google.JWTConfigFromJSON(credentialsData, storageConfig.Scopes...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -365,6 +366,12 @@ func getServiceAccountClient(credentialsData []byte) (*http.Client, error) {
|
|||||||
return oauth2.NewClient(ctxWithSpecialClient, conf.TokenSource(ctxWithSpecialClient)), nil
|
return oauth2.NewClient(ctxWithSpecialClient, conf.TokenSource(ctxWithSpecialClient)), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// setRoot changes the root of the Fs
|
||||||
|
func (f *Fs) setRoot(root string) {
|
||||||
|
f.root = parsePath(root)
|
||||||
|
f.rootBucket, f.rootDirectory = bucket.Split(f.root)
|
||||||
|
}
|
||||||
|
|
||||||
// NewFs constructs an Fs from the path, bucket:path
|
// NewFs constructs an Fs from the path, bucket:path
|
||||||
func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
||||||
var oAuthClient *http.Client
|
var oAuthClient *http.Client
|
||||||
@ -406,22 +413,19 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bucket, directory, err := parsePath(root)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
f := &Fs{
|
f := &Fs{
|
||||||
name: name,
|
name: name,
|
||||||
bucket: bucket,
|
root: root,
|
||||||
root: directory,
|
opt: *opt,
|
||||||
opt: *opt,
|
pacer: fs.NewPacer(pacer.NewGoogleDrive(pacer.MinSleep(minSleep))),
|
||||||
pacer: fs.NewPacer(pacer.NewGoogleDrive(pacer.MinSleep(minSleep))),
|
cache: bucket.NewCache(),
|
||||||
}
|
}
|
||||||
|
f.setRoot(root)
|
||||||
f.features = (&fs.Features{
|
f.features = (&fs.Features{
|
||||||
ReadMimeType: true,
|
ReadMimeType: true,
|
||||||
WriteMimeType: true,
|
WriteMimeType: true,
|
||||||
BucketBased: true,
|
BucketBased: true,
|
||||||
|
BucketBasedRootOK: true,
|
||||||
}).Fill(f)
|
}).Fill(f)
|
||||||
|
|
||||||
// Create a new authorized Drive client.
|
// Create a new authorized Drive client.
|
||||||
@ -431,20 +435,18 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
|||||||
return nil, errors.Wrap(err, "couldn't create Google Cloud Storage client")
|
return nil, errors.Wrap(err, "couldn't create Google Cloud Storage client")
|
||||||
}
|
}
|
||||||
|
|
||||||
if f.root != "" {
|
if f.rootBucket != "" && f.rootDirectory != "" {
|
||||||
f.root += "/"
|
|
||||||
// Check to see if the object exists
|
// Check to see if the object exists
|
||||||
err = f.pacer.Call(func() (bool, error) {
|
err = f.pacer.Call(func() (bool, error) {
|
||||||
_, err = f.svc.Objects.Get(bucket, directory).Do()
|
_, err = f.svc.Objects.Get(f.rootBucket, f.rootDirectory).Do()
|
||||||
return shouldRetry(err)
|
return shouldRetry(err)
|
||||||
})
|
})
|
||||||
if err == nil {
|
if err == nil {
|
||||||
f.root = path.Dir(directory)
|
newRoot := path.Dir(f.root)
|
||||||
if f.root == "." {
|
if newRoot == "." {
|
||||||
f.root = ""
|
newRoot = ""
|
||||||
} else {
|
|
||||||
f.root += "/"
|
|
||||||
}
|
}
|
||||||
|
f.setRoot(newRoot)
|
||||||
// return an error with an fs which points to the parent
|
// return an error with an fs which points to the parent
|
||||||
return f, fs.ErrorIsFile
|
return f, fs.ErrorIsFile
|
||||||
}
|
}
|
||||||
@ -485,13 +487,17 @@ type listFn func(remote string, object *storage.Object, isDirectory bool) error
|
|||||||
// dir is the starting directory, "" for root
|
// dir is the starting directory, "" for root
|
||||||
//
|
//
|
||||||
// Set recurse to read sub directories
|
// Set recurse to read sub directories
|
||||||
func (f *Fs) list(ctx context.Context, dir string, recurse bool, fn listFn) (err error) {
|
//
|
||||||
root := f.root
|
// The remote has prefix removed from it and if addBucket is set
|
||||||
rootLength := len(root)
|
// then it adds the bucket to the start.
|
||||||
if dir != "" {
|
func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBucket bool, recurse bool, fn listFn) (err error) {
|
||||||
root += dir + "/"
|
if prefix != "" {
|
||||||
|
prefix += "/"
|
||||||
}
|
}
|
||||||
list := f.svc.Objects.List(f.bucket).Prefix(root).MaxResults(listChunks)
|
if directory != "" {
|
||||||
|
directory += "/"
|
||||||
|
}
|
||||||
|
list := f.svc.Objects.List(bucket).Prefix(directory).MaxResults(listChunks)
|
||||||
if !recurse {
|
if !recurse {
|
||||||
list = list.Delimiter("/")
|
list = list.Delimiter("/")
|
||||||
}
|
}
|
||||||
@ -511,31 +517,36 @@ func (f *Fs) list(ctx context.Context, dir string, recurse bool, fn listFn) (err
|
|||||||
}
|
}
|
||||||
if !recurse {
|
if !recurse {
|
||||||
var object storage.Object
|
var object storage.Object
|
||||||
for _, prefix := range objects.Prefixes {
|
for _, remote := range objects.Prefixes {
|
||||||
if !strings.HasSuffix(prefix, "/") {
|
if !strings.HasSuffix(remote, "/") {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
err = fn(prefix[rootLength:len(prefix)-1], &object, true)
|
if !strings.HasPrefix(remote, prefix) {
|
||||||
|
fs.Logf(f, "Odd name received %q", remote)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
remote = remote[len(prefix) : len(remote)-1]
|
||||||
|
if addBucket {
|
||||||
|
remote = path.Join(bucket, remote)
|
||||||
|
}
|
||||||
|
err = fn(remote, &object, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for _, object := range objects.Items {
|
for _, object := range objects.Items {
|
||||||
if !strings.HasPrefix(object.Name, root) {
|
if !strings.HasPrefix(object.Name, prefix) {
|
||||||
fs.Logf(f, "Odd name received %q", object.Name)
|
fs.Logf(f, "Odd name received %q", object.Name)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
remote := object.Name[rootLength:]
|
remote := object.Name[len(prefix):]
|
||||||
|
isDirectory := strings.HasSuffix(remote, "/")
|
||||||
|
if addBucket {
|
||||||
|
remote = path.Join(bucket, remote)
|
||||||
|
}
|
||||||
// is this a directory marker?
|
// is this a directory marker?
|
||||||
if (strings.HasSuffix(remote, "/") || remote == "") && object.Size == 0 {
|
if isDirectory && object.Size == 0 {
|
||||||
if recurse && remote != "" {
|
|
||||||
// add a directory in if --fast-list since will have no prefixes
|
|
||||||
err = fn(remote[:len(remote)-1], object, true)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
continue // skip directory marker
|
continue // skip directory marker
|
||||||
}
|
}
|
||||||
err = fn(remote, object, false)
|
err = fn(remote, object, false)
|
||||||
@ -564,19 +575,10 @@ func (f *Fs) itemToDirEntry(remote string, object *storage.Object, isDirectory b
|
|||||||
return o, nil
|
return o, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// mark the bucket as being OK
|
|
||||||
func (f *Fs) markBucketOK() {
|
|
||||||
if f.bucket != "" {
|
|
||||||
f.bucketOKMu.Lock()
|
|
||||||
f.bucketOK = true
|
|
||||||
f.bucketOKMu.Unlock()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// listDir lists a single directory
|
// listDir lists a single directory
|
||||||
func (f *Fs) listDir(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
|
func (f *Fs) listDir(ctx context.Context, bucket, directory, prefix string, addBucket bool) (entries fs.DirEntries, err error) {
|
||||||
// List the objects
|
// List the objects
|
||||||
err = f.list(ctx, dir, false, func(remote string, object *storage.Object, isDirectory bool) error {
|
err = f.list(ctx, bucket, directory, prefix, addBucket, false, func(remote string, object *storage.Object, isDirectory bool) error {
|
||||||
entry, err := f.itemToDirEntry(remote, object, isDirectory)
|
entry, err := f.itemToDirEntry(remote, object, isDirectory)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -590,7 +592,7 @@ func (f *Fs) listDir(ctx context.Context, dir string) (entries fs.DirEntries, er
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
// bucket must be present if listing succeeded
|
// bucket must be present if listing succeeded
|
||||||
f.markBucketOK()
|
f.cache.MarkOK(bucket)
|
||||||
return entries, err
|
return entries, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -634,10 +636,11 @@ func (f *Fs) listBuckets(dir string) (entries fs.DirEntries, err error) {
|
|||||||
// This should return ErrDirNotFound if the directory isn't
|
// This should return ErrDirNotFound if the directory isn't
|
||||||
// found.
|
// found.
|
||||||
func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
|
func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
|
||||||
if f.bucket == "" {
|
bucket, directory := f.split(dir)
|
||||||
|
if bucket == "" {
|
||||||
return f.listBuckets(dir)
|
return f.listBuckets(dir)
|
||||||
}
|
}
|
||||||
return f.listDir(ctx, dir)
|
return f.listDir(ctx, bucket, directory, f.rootDirectory, f.rootBucket == "")
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListR lists the objects and directories of the Fs starting
|
// ListR lists the objects and directories of the Fs starting
|
||||||
@ -657,22 +660,41 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e
|
|||||||
// Don't implement this unless you have a more efficient way
|
// Don't implement this unless you have a more efficient way
|
||||||
// of listing recursively that doing a directory traversal.
|
// of listing recursively that doing a directory traversal.
|
||||||
func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) {
|
func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) {
|
||||||
if f.bucket == "" {
|
bucket, directory := f.split(dir)
|
||||||
return fs.ErrorListBucketRequired
|
|
||||||
}
|
|
||||||
list := walk.NewListRHelper(callback)
|
list := walk.NewListRHelper(callback)
|
||||||
err = f.list(ctx, dir, true, func(remote string, object *storage.Object, isDirectory bool) error {
|
listR := func(bucket, directory, prefix string, addBucket bool) error {
|
||||||
entry, err := f.itemToDirEntry(remote, object, isDirectory)
|
return f.list(ctx, bucket, directory, prefix, addBucket, true, func(remote string, object *storage.Object, isDirectory bool) error {
|
||||||
|
entry, err := f.itemToDirEntry(remote, object, isDirectory)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return list.Add(entry)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
if bucket == "" {
|
||||||
|
entries, err := f.listBuckets("")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, entry := range entries {
|
||||||
|
err = list.Add(entry)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
bucket := entry.Remote()
|
||||||
|
err = listR(bucket, "", f.rootDirectory, true)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
err = listR(bucket, directory, f.rootDirectory, f.rootBucket == "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return list.Add(entry)
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
// bucket must be present if listing succeeded
|
// bucket must be present if listing succeeded
|
||||||
f.markBucketOK()
|
f.cache.MarkOK(bucket)
|
||||||
return list.Flush()
|
return list.Flush()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -697,58 +719,50 @@ func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, opt
|
|||||||
|
|
||||||
// Mkdir creates the bucket if it doesn't exist
|
// Mkdir creates the bucket if it doesn't exist
|
||||||
func (f *Fs) Mkdir(ctx context.Context, dir string) (err error) {
|
func (f *Fs) Mkdir(ctx context.Context, dir string) (err error) {
|
||||||
f.bucketOKMu.Lock()
|
bucket, _ := f.split(dir)
|
||||||
defer f.bucketOKMu.Unlock()
|
return f.cache.Create(bucket, func() error {
|
||||||
if f.bucketOK {
|
// List something from the bucket to see if it exists. Doing it like this enables the use of a
|
||||||
return nil
|
// service account that only has the "Storage Object Admin" role. See #2193 for details.
|
||||||
}
|
err = f.pacer.Call(func() (bool, error) {
|
||||||
// List something from the bucket to see if it exists. Doing it like this enables the use of a
|
_, err = f.svc.Objects.List(bucket).MaxResults(1).Do()
|
||||||
// service account that only has the "Storage Object Admin" role. See #2193 for details.
|
return shouldRetry(err)
|
||||||
|
})
|
||||||
err = f.pacer.Call(func() (bool, error) {
|
if err == nil {
|
||||||
_, err = f.svc.Objects.List(f.bucket).MaxResults(1).Do()
|
// Bucket already exists
|
||||||
return shouldRetry(err)
|
return nil
|
||||||
})
|
} else if gErr, ok := err.(*googleapi.Error); ok {
|
||||||
if err == nil {
|
if gErr.Code != http.StatusNotFound {
|
||||||
// Bucket already exists
|
return errors.Wrap(err, "failed to get bucket")
|
||||||
f.bucketOK = true
|
}
|
||||||
return nil
|
} else {
|
||||||
} else if gErr, ok := err.(*googleapi.Error); ok {
|
|
||||||
if gErr.Code != http.StatusNotFound {
|
|
||||||
return errors.Wrap(err, "failed to get bucket")
|
return errors.Wrap(err, "failed to get bucket")
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
return errors.Wrap(err, "failed to get bucket")
|
|
||||||
}
|
|
||||||
|
|
||||||
if f.opt.ProjectNumber == "" {
|
if f.opt.ProjectNumber == "" {
|
||||||
return errors.New("can't make bucket without project number")
|
return errors.New("can't make bucket without project number")
|
||||||
}
|
}
|
||||||
|
|
||||||
bucket := storage.Bucket{
|
bucket := storage.Bucket{
|
||||||
Name: f.bucket,
|
Name: bucket,
|
||||||
Location: f.opt.Location,
|
Location: f.opt.Location,
|
||||||
StorageClass: f.opt.StorageClass,
|
StorageClass: f.opt.StorageClass,
|
||||||
}
|
|
||||||
if f.opt.BucketPolicyOnly {
|
|
||||||
bucket.IamConfiguration = &storage.BucketIamConfiguration{
|
|
||||||
BucketPolicyOnly: &storage.BucketIamConfigurationBucketPolicyOnly{
|
|
||||||
Enabled: true,
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
}
|
if f.opt.BucketPolicyOnly {
|
||||||
err = f.pacer.Call(func() (bool, error) {
|
bucket.IamConfiguration = &storage.BucketIamConfiguration{
|
||||||
insertBucket := f.svc.Buckets.Insert(f.opt.ProjectNumber, &bucket)
|
BucketPolicyOnly: &storage.BucketIamConfigurationBucketPolicyOnly{
|
||||||
if !f.opt.BucketPolicyOnly {
|
Enabled: true,
|
||||||
insertBucket.PredefinedAcl(f.opt.BucketACL)
|
},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
_, err = insertBucket.Do()
|
return f.pacer.Call(func() (bool, error) {
|
||||||
return shouldRetry(err)
|
insertBucket := f.svc.Buckets.Insert(f.opt.ProjectNumber, &bucket)
|
||||||
})
|
if !f.opt.BucketPolicyOnly {
|
||||||
if err == nil {
|
insertBucket.PredefinedAcl(f.opt.BucketACL)
|
||||||
f.bucketOK = true
|
}
|
||||||
}
|
_, err = insertBucket.Do()
|
||||||
return err
|
return shouldRetry(err)
|
||||||
|
})
|
||||||
|
}, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Rmdir deletes the bucket if the fs is at the root
|
// Rmdir deletes the bucket if the fs is at the root
|
||||||
@ -756,19 +770,16 @@ func (f *Fs) Mkdir(ctx context.Context, dir string) (err error) {
|
|||||||
// Returns an error if it isn't empty: Error 409: The bucket you tried
|
// Returns an error if it isn't empty: Error 409: The bucket you tried
|
||||||
// to delete was not empty.
|
// to delete was not empty.
|
||||||
func (f *Fs) Rmdir(ctx context.Context, dir string) (err error) {
|
func (f *Fs) Rmdir(ctx context.Context, dir string) (err error) {
|
||||||
f.bucketOKMu.Lock()
|
bucket, directory := f.split(dir)
|
||||||
defer f.bucketOKMu.Unlock()
|
if bucket == "" || directory != "" {
|
||||||
if f.root != "" || dir != "" {
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
err = f.pacer.Call(func() (bool, error) {
|
return f.cache.Remove(bucket, func() error {
|
||||||
err = f.svc.Buckets.Delete(f.bucket).Do()
|
return f.pacer.Call(func() (bool, error) {
|
||||||
return shouldRetry(err)
|
err = f.svc.Buckets.Delete(bucket).Do()
|
||||||
|
return shouldRetry(err)
|
||||||
|
})
|
||||||
})
|
})
|
||||||
if err == nil {
|
|
||||||
f.bucketOK = false
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Precision returns the precision
|
// Precision returns the precision
|
||||||
@ -786,6 +797,7 @@ func (f *Fs) Precision() time.Duration {
|
|||||||
//
|
//
|
||||||
// If it isn't possible then return fs.ErrorCantCopy
|
// If it isn't possible then return fs.ErrorCantCopy
|
||||||
func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
|
func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
|
||||||
|
dstBucket, dstPath := f.split(remote)
|
||||||
err := f.Mkdir(ctx, "")
|
err := f.Mkdir(ctx, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -795,6 +807,7 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object,
|
|||||||
fs.Debugf(src, "Can't copy - not same remote type")
|
fs.Debugf(src, "Can't copy - not same remote type")
|
||||||
return nil, fs.ErrorCantCopy
|
return nil, fs.ErrorCantCopy
|
||||||
}
|
}
|
||||||
|
srcBucket, srcPath := srcObj.split()
|
||||||
|
|
||||||
// Temporary Object under construction
|
// Temporary Object under construction
|
||||||
dstObj := &Object{
|
dstObj := &Object{
|
||||||
@ -802,13 +815,9 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object,
|
|||||||
remote: remote,
|
remote: remote,
|
||||||
}
|
}
|
||||||
|
|
||||||
srcBucket := srcObj.fs.bucket
|
|
||||||
srcObject := srcObj.fs.root + srcObj.remote
|
|
||||||
dstBucket := f.bucket
|
|
||||||
dstObject := f.root + remote
|
|
||||||
var newObject *storage.Object
|
var newObject *storage.Object
|
||||||
err = f.pacer.Call(func() (bool, error) {
|
err = f.pacer.Call(func() (bool, error) {
|
||||||
newObject, err = f.svc.Objects.Copy(srcBucket, srcObject, dstBucket, dstObject, nil).Do()
|
newObject, err = f.svc.Objects.Copy(srcBucket, srcPath, dstBucket, dstPath, nil).Do()
|
||||||
return shouldRetry(err)
|
return shouldRetry(err)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -898,9 +907,10 @@ func (o *Object) readMetaData() (err error) {
|
|||||||
if !o.modTime.IsZero() {
|
if !o.modTime.IsZero() {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
bucket, bucketPath := o.split()
|
||||||
var object *storage.Object
|
var object *storage.Object
|
||||||
err = o.fs.pacer.Call(func() (bool, error) {
|
err = o.fs.pacer.Call(func() (bool, error) {
|
||||||
object, err = o.fs.svc.Objects.Get(o.fs.bucket, o.fs.root+o.remote).Do()
|
object, err = o.fs.svc.Objects.Get(bucket, bucketPath).Do()
|
||||||
return shouldRetry(err)
|
return shouldRetry(err)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -938,14 +948,15 @@ func metadataFromModTime(modTime time.Time) map[string]string {
|
|||||||
// SetModTime sets the modification time of the local fs object
|
// SetModTime sets the modification time of the local fs object
|
||||||
func (o *Object) SetModTime(ctx context.Context, modTime time.Time) (err error) {
|
func (o *Object) SetModTime(ctx context.Context, modTime time.Time) (err error) {
|
||||||
// This only adds metadata so will perserve other metadata
|
// This only adds metadata so will perserve other metadata
|
||||||
|
bucket, bucketPath := o.split()
|
||||||
object := storage.Object{
|
object := storage.Object{
|
||||||
Bucket: o.fs.bucket,
|
Bucket: bucket,
|
||||||
Name: o.fs.root + o.remote,
|
Name: bucketPath,
|
||||||
Metadata: metadataFromModTime(modTime),
|
Metadata: metadataFromModTime(modTime),
|
||||||
}
|
}
|
||||||
var newObject *storage.Object
|
var newObject *storage.Object
|
||||||
err = o.fs.pacer.Call(func() (bool, error) {
|
err = o.fs.pacer.Call(func() (bool, error) {
|
||||||
newObject, err = o.fs.svc.Objects.Patch(o.fs.bucket, o.fs.root+o.remote, &object).Do()
|
newObject, err = o.fs.svc.Objects.Patch(bucket, bucketPath, &object).Do()
|
||||||
return shouldRetry(err)
|
return shouldRetry(err)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -994,6 +1005,7 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read
|
|||||||
//
|
//
|
||||||
// The new object may have been created if an error is returned
|
// The new object may have been created if an error is returned
|
||||||
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
|
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
|
||||||
|
bucket, bucketPath := o.split()
|
||||||
err := o.fs.Mkdir(ctx, "")
|
err := o.fs.Mkdir(ctx, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -1001,14 +1013,14 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
|||||||
modTime := src.ModTime(ctx)
|
modTime := src.ModTime(ctx)
|
||||||
|
|
||||||
object := storage.Object{
|
object := storage.Object{
|
||||||
Bucket: o.fs.bucket,
|
Bucket: bucket,
|
||||||
Name: o.fs.root + o.remote,
|
Name: bucketPath,
|
||||||
ContentType: fs.MimeType(ctx, src),
|
ContentType: fs.MimeType(ctx, src),
|
||||||
Metadata: metadataFromModTime(modTime),
|
Metadata: metadataFromModTime(modTime),
|
||||||
}
|
}
|
||||||
var newObject *storage.Object
|
var newObject *storage.Object
|
||||||
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
|
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
|
||||||
insertObject := o.fs.svc.Objects.Insert(o.fs.bucket, &object).Media(in, googleapi.ContentType("")).Name(object.Name)
|
insertObject := o.fs.svc.Objects.Insert(bucket, &object).Media(in, googleapi.ContentType("")).Name(object.Name)
|
||||||
if !o.fs.opt.BucketPolicyOnly {
|
if !o.fs.opt.BucketPolicyOnly {
|
||||||
insertObject.PredefinedAcl(o.fs.opt.ObjectACL)
|
insertObject.PredefinedAcl(o.fs.opt.ObjectACL)
|
||||||
}
|
}
|
||||||
@ -1025,8 +1037,9 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
|||||||
|
|
||||||
// Remove an object
|
// Remove an object
|
||||||
func (o *Object) Remove(ctx context.Context) (err error) {
|
func (o *Object) Remove(ctx context.Context) (err error) {
|
||||||
|
bucket, bucketPath := o.split()
|
||||||
err = o.fs.pacer.Call(func() (bool, error) {
|
err = o.fs.pacer.Call(func() (bool, error) {
|
||||||
err = o.fs.svc.Objects.Delete(o.fs.bucket, o.fs.root+o.remote).Do()
|
err = o.fs.svc.Objects.Delete(bucket, bucketPath).Do()
|
||||||
return shouldRetry(err)
|
return shouldRetry(err)
|
||||||
})
|
})
|
||||||
return err
|
return err
|
||||||
|
Loading…
Reference in New Issue
Block a user