diff --git a/fs/operations/check.go b/fs/operations/check.go index fd007819f..ba8c08986 100644 --- a/fs/operations/check.go +++ b/fs/operations/check.go @@ -335,7 +335,7 @@ func CheckIdenticalDownload(ctx context.Context, dst, src fs.Object) (differ boo // Does the work for CheckIdenticalDownload func checkIdenticalDownload(ctx context.Context, dst, src fs.Object) (differ bool, err error) { - in1, err := dst.Open(ctx) + in1, err := Open(ctx, dst) if err != nil { return true, fmt.Errorf("failed to open %q: %w", dst, err) } @@ -345,7 +345,7 @@ func checkIdenticalDownload(ctx context.Context, dst, src fs.Object) (differ boo }() in1 = tr1.Account(ctx, in1).WithBuffer() // account and buffer the transfer - in2, err := src.Open(ctx) + in2, err := Open(ctx, src) if err != nil { return true, fmt.Errorf("failed to open %q: %w", src, err) } @@ -483,7 +483,7 @@ func (c *checkMarch) checkSum(ctx context.Context, obj fs.Object, download bool, <-c.tokens // get the token back to free up a slot c.wg.Done() }() - if in, err = obj.Open(ctx); err != nil { + if in, err = Open(ctx, obj); err != nil { return } tr := accounting.Stats(ctx).NewTransfer(obj) @@ -538,7 +538,7 @@ type HashSums map[string]string // ParseSumFile parses a hash SUM file and returns hashes as a map func ParseSumFile(ctx context.Context, sumFile fs.Object) (HashSums, error) { - rd, err := sumFile.Open(ctx) + rd, err := Open(ctx, sumFile) if err != nil { return nil, err } diff --git a/fs/operations/multithread.go b/fs/operations/multithread.go index 62812dc00..b4b79a5a6 100644 --- a/fs/operations/multithread.go +++ b/fs/operations/multithread.go @@ -62,7 +62,6 @@ type multiThreadCopyState struct { // Copy a single stream into place func (mc *multiThreadCopyState) copyStream(ctx context.Context, stream int) (err error) { - ci := fs.GetConfig(ctx) defer func() { if err != nil { fs.Debugf(mc.src, "multi-thread copy: stream %d/%d failed: %v", stream+1, mc.streams, err) @@ -79,7 +78,7 @@ func (mc *multiThreadCopyState) copyStream(ctx context.Context, stream int) (err fs.Debugf(mc.src, "multi-thread copy: stream %d/%d (%d-%d) size %v starting", stream+1, mc.streams, start, end, fs.SizeSuffix(end-start)) - rc, err := NewReOpen(ctx, mc.src, ci.LowLevelRetries, &fs.RangeOption{Start: start, End: end - 1}) + rc, err := Open(ctx, mc.src, &fs.RangeOption{Start: start, End: end - 1}) if err != nil { return fmt.Errorf("multipart copy: failed to open source: %w", err) } diff --git a/fs/operations/operations.go b/fs/operations/operations.go index 85aad8fe6..06ab1be4d 100644 --- a/fs/operations/operations.go +++ b/fs/operations/operations.go @@ -421,7 +421,7 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj for _, option := range ci.DownloadHeaders { options = append(options, option) } - in0, err = NewReOpen(ctx, src, ci.LowLevelRetries, options...) + in0, err = Open(ctx, src, options...) if err != nil { err = fmt.Errorf("failed to open source object: %w", err) } else { @@ -1026,7 +1026,7 @@ func hashSum(ctx context.Context, ht hash.Type, base64Encoded bool, downloadFlag for _, option := range fs.GetConfig(ctx).DownloadHeaders { options = append(options, option) } - in, err := NewReOpen(ctx, o, fs.GetConfig(ctx).LowLevelRetries, options...) + in, err := Open(ctx, o, options...) if err != nil { return "ERROR", fmt.Errorf("failed to open file %v: %w", o, err) } @@ -1326,7 +1326,7 @@ func Cat(ctx context.Context, f fs.Fs, w io.Writer, offset, count int64, sep []b for _, option := range ci.DownloadHeaders { options = append(options, option) } - in, err := o.Open(ctx, options...) + in, err := Open(ctx, o, options...) if err != nil { err = fs.CountError(err) fs.Errorf(o, "Failed to open: %v", err) diff --git a/fs/operations/reopen.go b/fs/operations/reopen.go index 9bfb1508f..2e2a114ed 100644 --- a/fs/operations/reopen.go +++ b/fs/operations/reopen.go @@ -29,12 +29,14 @@ var ( errorTooManyTries = errors.New("failed to reopen: too many retries") ) -// NewReOpen makes a handle which will reopen itself and seek to where it was on errors +// NewReOpen makes a handle which will reopen itself and seek to where +// it was on errors up to maxTries times. // -// If hashOption is set this will be applied when reading from the start. +// If an fs.HashesOption is set this will be applied when reading from +// the start. // -// If rangeOption is set then this will applied when reading from the -// start, and updated on retries. +// If an fs.RangeOption is set then this will applied when reading from +// the start, and updated on retries. func NewReOpen(ctx context.Context, src fs.Object, maxTries int, options ...fs.OpenOption) (rc io.ReadCloser, err error) { h := &ReOpen{ ctx: ctx, @@ -51,6 +53,24 @@ func NewReOpen(ctx context.Context, src fs.Object, maxTries int, options ...fs.O return h, nil } +// Open makes a handle which will reopen itself and seek to where it +// was on errors. +// +// If an fs.HashesOption is set this will be applied when reading from +// the start. +// +// If an fs.RangeOption is set then this will applied when reading from +// the start, and updated on retries. +// +// It will obey LowLevelRetries in the ctx as the maximum number of +// tries. +// +// Use this instead of calling the Open method on fs.Objects +func Open(ctx context.Context, src fs.Object, options ...fs.OpenOption) (rc io.ReadCloser, err error) { + maxTries := fs.GetConfig(ctx).LowLevelRetries + return NewReOpen(ctx, src, maxTries, options...) +} + // open the underlying handle - call with lock held // // we don't retry here as the Open() call will itself have low level retries