mirror of
https://github.com/rclone/rclone.git
synced 2024-11-23 00:43:49 +01:00
gcs: add context to SDK calls #3257
This commit is contained in:
parent
bd863f8868
commit
29b4f211ab
@ -374,6 +374,7 @@ func (f *Fs) setRoot(root string) {
|
|||||||
|
|
||||||
// NewFs constructs an Fs from the path, bucket:path
|
// NewFs constructs an Fs from the path, bucket:path
|
||||||
func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
||||||
|
ctx := context.TODO()
|
||||||
var oAuthClient *http.Client
|
var oAuthClient *http.Client
|
||||||
|
|
||||||
// Parse config into Options struct
|
// Parse config into Options struct
|
||||||
@ -438,7 +439,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
|||||||
if f.rootBucket != "" && f.rootDirectory != "" {
|
if f.rootBucket != "" && f.rootDirectory != "" {
|
||||||
// Check to see if the object exists
|
// Check to see if the object exists
|
||||||
err = f.pacer.Call(func() (bool, error) {
|
err = f.pacer.Call(func() (bool, error) {
|
||||||
_, err = f.svc.Objects.Get(f.rootBucket, f.rootDirectory).Do()
|
_, err = f.svc.Objects.Get(f.rootBucket, f.rootDirectory).Context(ctx).Do()
|
||||||
return shouldRetry(err)
|
return shouldRetry(err)
|
||||||
})
|
})
|
||||||
if err == nil {
|
if err == nil {
|
||||||
@ -457,7 +458,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
|||||||
// Return an Object from a path
|
// Return an Object from a path
|
||||||
//
|
//
|
||||||
// If it can't be found it returns the error fs.ErrorObjectNotFound.
|
// If it can't be found it returns the error fs.ErrorObjectNotFound.
|
||||||
func (f *Fs) newObjectWithInfo(remote string, info *storage.Object) (fs.Object, error) {
|
func (f *Fs) newObjectWithInfo(ctx context.Context, remote string, info *storage.Object) (fs.Object, error) {
|
||||||
o := &Object{
|
o := &Object{
|
||||||
fs: f,
|
fs: f,
|
||||||
remote: remote,
|
remote: remote,
|
||||||
@ -465,7 +466,7 @@ func (f *Fs) newObjectWithInfo(remote string, info *storage.Object) (fs.Object,
|
|||||||
if info != nil {
|
if info != nil {
|
||||||
o.setMetaData(info)
|
o.setMetaData(info)
|
||||||
} else {
|
} else {
|
||||||
err := o.readMetaData() // reads info and meta, returning an error
|
err := o.readMetaData(ctx) // reads info and meta, returning an error
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -476,7 +477,7 @@ func (f *Fs) newObjectWithInfo(remote string, info *storage.Object) (fs.Object,
|
|||||||
// NewObject finds the Object at remote. If it can't be found
|
// NewObject finds the Object at remote. If it can't be found
|
||||||
// it returns the error fs.ErrorObjectNotFound.
|
// it returns the error fs.ErrorObjectNotFound.
|
||||||
func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
|
func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
|
||||||
return f.newObjectWithInfo(remote, nil)
|
return f.newObjectWithInfo(ctx, remote, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// listFn is called from list to handle an object.
|
// listFn is called from list to handle an object.
|
||||||
@ -504,7 +505,7 @@ func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBuck
|
|||||||
for {
|
for {
|
||||||
var objects *storage.Objects
|
var objects *storage.Objects
|
||||||
err = f.pacer.Call(func() (bool, error) {
|
err = f.pacer.Call(func() (bool, error) {
|
||||||
objects, err = list.Do()
|
objects, err = list.Context(ctx).Do()
|
||||||
return shouldRetry(err)
|
return shouldRetry(err)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -563,12 +564,12 @@ func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBuck
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Convert a list item into a DirEntry
|
// Convert a list item into a DirEntry
|
||||||
func (f *Fs) itemToDirEntry(remote string, object *storage.Object, isDirectory bool) (fs.DirEntry, error) {
|
func (f *Fs) itemToDirEntry(ctx context.Context, remote string, object *storage.Object, isDirectory bool) (fs.DirEntry, error) {
|
||||||
if isDirectory {
|
if isDirectory {
|
||||||
d := fs.NewDir(remote, time.Time{}).SetSize(int64(object.Size))
|
d := fs.NewDir(remote, time.Time{}).SetSize(int64(object.Size))
|
||||||
return d, nil
|
return d, nil
|
||||||
}
|
}
|
||||||
o, err := f.newObjectWithInfo(remote, object)
|
o, err := f.newObjectWithInfo(ctx, remote, object)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -579,7 +580,7 @@ func (f *Fs) itemToDirEntry(remote string, object *storage.Object, isDirectory b
|
|||||||
func (f *Fs) listDir(ctx context.Context, bucket, directory, prefix string, addBucket bool) (entries fs.DirEntries, err error) {
|
func (f *Fs) listDir(ctx context.Context, bucket, directory, prefix string, addBucket bool) (entries fs.DirEntries, err error) {
|
||||||
// List the objects
|
// List the objects
|
||||||
err = f.list(ctx, bucket, directory, prefix, addBucket, false, func(remote string, object *storage.Object, isDirectory bool) error {
|
err = f.list(ctx, bucket, directory, prefix, addBucket, false, func(remote string, object *storage.Object, isDirectory bool) error {
|
||||||
entry, err := f.itemToDirEntry(remote, object, isDirectory)
|
entry, err := f.itemToDirEntry(ctx, remote, object, isDirectory)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -605,7 +606,7 @@ func (f *Fs) listBuckets(ctx context.Context) (entries fs.DirEntries, err error)
|
|||||||
for {
|
for {
|
||||||
var buckets *storage.Buckets
|
var buckets *storage.Buckets
|
||||||
err = f.pacer.Call(func() (bool, error) {
|
err = f.pacer.Call(func() (bool, error) {
|
||||||
buckets, err = listBuckets.Do()
|
buckets, err = listBuckets.Context(ctx).Do()
|
||||||
return shouldRetry(err)
|
return shouldRetry(err)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -664,7 +665,7 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (
|
|||||||
list := walk.NewListRHelper(callback)
|
list := walk.NewListRHelper(callback)
|
||||||
listR := func(bucket, directory, prefix string, addBucket bool) error {
|
listR := func(bucket, directory, prefix string, addBucket bool) error {
|
||||||
return f.list(ctx, bucket, directory, prefix, addBucket, true, func(remote string, object *storage.Object, isDirectory bool) error {
|
return f.list(ctx, bucket, directory, prefix, addBucket, true, func(remote string, object *storage.Object, isDirectory bool) error {
|
||||||
entry, err := f.itemToDirEntry(remote, object, isDirectory)
|
entry, err := f.itemToDirEntry(ctx, remote, object, isDirectory)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -731,7 +732,7 @@ func (f *Fs) makeBucket(ctx context.Context, bucket string) (err error) {
|
|||||||
// List something from the bucket to see if it exists. Doing it like this enables the use of a
|
// List something from the bucket to see if it exists. Doing it like this enables the use of a
|
||||||
// service account that only has the "Storage Object Admin" role. See #2193 for details.
|
// service account that only has the "Storage Object Admin" role. See #2193 for details.
|
||||||
err = f.pacer.Call(func() (bool, error) {
|
err = f.pacer.Call(func() (bool, error) {
|
||||||
_, err = f.svc.Objects.List(bucket).MaxResults(1).Do()
|
_, err = f.svc.Objects.List(bucket).MaxResults(1).Context(ctx).Do()
|
||||||
return shouldRetry(err)
|
return shouldRetry(err)
|
||||||
})
|
})
|
||||||
if err == nil {
|
if err == nil {
|
||||||
@ -766,7 +767,7 @@ func (f *Fs) makeBucket(ctx context.Context, bucket string) (err error) {
|
|||||||
if !f.opt.BucketPolicyOnly {
|
if !f.opt.BucketPolicyOnly {
|
||||||
insertBucket.PredefinedAcl(f.opt.BucketACL)
|
insertBucket.PredefinedAcl(f.opt.BucketACL)
|
||||||
}
|
}
|
||||||
_, err = insertBucket.Do()
|
_, err = insertBucket.Context(ctx).Do()
|
||||||
return shouldRetry(err)
|
return shouldRetry(err)
|
||||||
})
|
})
|
||||||
}, nil)
|
}, nil)
|
||||||
@ -783,7 +784,7 @@ func (f *Fs) Rmdir(ctx context.Context, dir string) (err error) {
|
|||||||
}
|
}
|
||||||
return f.cache.Remove(bucket, func() error {
|
return f.cache.Remove(bucket, func() error {
|
||||||
return f.pacer.Call(func() (bool, error) {
|
return f.pacer.Call(func() (bool, error) {
|
||||||
err = f.svc.Buckets.Delete(bucket).Do()
|
err = f.svc.Buckets.Delete(bucket).Context(ctx).Do()
|
||||||
return shouldRetry(err)
|
return shouldRetry(err)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
@ -828,7 +829,7 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object,
|
|||||||
if !f.opt.BucketPolicyOnly {
|
if !f.opt.BucketPolicyOnly {
|
||||||
copyObject.DestinationPredefinedAcl(f.opt.ObjectACL)
|
copyObject.DestinationPredefinedAcl(f.opt.ObjectACL)
|
||||||
}
|
}
|
||||||
newObject, err = copyObject.Do()
|
newObject, err = copyObject.Context(ctx).Do()
|
||||||
return shouldRetry(err)
|
return shouldRetry(err)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -912,10 +913,10 @@ func (o *Object) setMetaData(info *storage.Object) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// readObjectInfo reads the definition for an object
|
// readObjectInfo reads the definition for an object
|
||||||
func (o *Object) readObjectInfo() (object *storage.Object, err error) {
|
func (o *Object) readObjectInfo(ctx context.Context) (object *storage.Object, err error) {
|
||||||
bucket, bucketPath := o.split()
|
bucket, bucketPath := o.split()
|
||||||
err = o.fs.pacer.Call(func() (bool, error) {
|
err = o.fs.pacer.Call(func() (bool, error) {
|
||||||
object, err = o.fs.svc.Objects.Get(bucket, bucketPath).Do()
|
object, err = o.fs.svc.Objects.Get(bucket, bucketPath).Context(ctx).Do()
|
||||||
return shouldRetry(err)
|
return shouldRetry(err)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -932,11 +933,11 @@ func (o *Object) readObjectInfo() (object *storage.Object, err error) {
|
|||||||
// readMetaData gets the metadata if it hasn't already been fetched
|
// readMetaData gets the metadata if it hasn't already been fetched
|
||||||
//
|
//
|
||||||
// it also sets the info
|
// it also sets the info
|
||||||
func (o *Object) readMetaData() (err error) {
|
func (o *Object) readMetaData(ctx context.Context) (err error) {
|
||||||
if !o.modTime.IsZero() {
|
if !o.modTime.IsZero() {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
object, err := o.readObjectInfo()
|
object, err := o.readObjectInfo(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -949,7 +950,7 @@ func (o *Object) readMetaData() (err error) {
|
|||||||
// It attempts to read the objects mtime and if that isn't present the
|
// It attempts to read the objects mtime and if that isn't present the
|
||||||
// LastModified returned in the http headers
|
// LastModified returned in the http headers
|
||||||
func (o *Object) ModTime(ctx context.Context) time.Time {
|
func (o *Object) ModTime(ctx context.Context) time.Time {
|
||||||
err := o.readMetaData()
|
err := o.readMetaData(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// fs.Logf(o, "Failed to read metadata: %v", err)
|
// fs.Logf(o, "Failed to read metadata: %v", err)
|
||||||
return time.Now()
|
return time.Now()
|
||||||
@ -967,7 +968,7 @@ func metadataFromModTime(modTime time.Time) map[string]string {
|
|||||||
// SetModTime sets the modification time of the local fs object
|
// SetModTime sets the modification time of the local fs object
|
||||||
func (o *Object) SetModTime(ctx context.Context, modTime time.Time) (err error) {
|
func (o *Object) SetModTime(ctx context.Context, modTime time.Time) (err error) {
|
||||||
// read the complete existing object first
|
// read the complete existing object first
|
||||||
object, err := o.readObjectInfo()
|
object, err := o.readObjectInfo(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -986,7 +987,7 @@ func (o *Object) SetModTime(ctx context.Context, modTime time.Time) (err error)
|
|||||||
if !o.fs.opt.BucketPolicyOnly {
|
if !o.fs.opt.BucketPolicyOnly {
|
||||||
copyObject.DestinationPredefinedAcl(o.fs.opt.ObjectACL)
|
copyObject.DestinationPredefinedAcl(o.fs.opt.ObjectACL)
|
||||||
}
|
}
|
||||||
newObject, err = copyObject.Do()
|
newObject, err = copyObject.Context(ctx).Do()
|
||||||
return shouldRetry(err)
|
return shouldRetry(err)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -1055,7 +1056,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
|||||||
if !o.fs.opt.BucketPolicyOnly {
|
if !o.fs.opt.BucketPolicyOnly {
|
||||||
insertObject.PredefinedAcl(o.fs.opt.ObjectACL)
|
insertObject.PredefinedAcl(o.fs.opt.ObjectACL)
|
||||||
}
|
}
|
||||||
newObject, err = insertObject.Do()
|
newObject, err = insertObject.Context(ctx).Do()
|
||||||
return shouldRetry(err)
|
return shouldRetry(err)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -1070,7 +1071,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
|||||||
func (o *Object) Remove(ctx context.Context) (err error) {
|
func (o *Object) Remove(ctx context.Context) (err error) {
|
||||||
bucket, bucketPath := o.split()
|
bucket, bucketPath := o.split()
|
||||||
err = o.fs.pacer.Call(func() (bool, error) {
|
err = o.fs.pacer.Call(func() (bool, error) {
|
||||||
err = o.fs.svc.Objects.Delete(bucket, bucketPath).Do()
|
err = o.fs.svc.Objects.Delete(bucket, bucketPath).Context(ctx).Do()
|
||||||
return shouldRetry(err)
|
return shouldRetry(err)
|
||||||
})
|
})
|
||||||
return err
|
return err
|
||||||
|
Loading…
Reference in New Issue
Block a user