mirror of
https://github.com/rclone/rclone.git
synced 2025-01-10 16:28:30 +01:00
e43b5ce5e5
This is possible now that we no longer support go1.12 and brings rclone into line with standard practices in the Go world. This also removes errors.New and errors.Errorf from lib/errors and prefers the stdlib errors package over lib/errors.
145 lines
3.7 KiB
Go
145 lines
3.7 KiB
Go
package operations
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"io"
|
|
"sync"
|
|
|
|
"github.com/rclone/rclone/fs"
|
|
"github.com/rclone/rclone/fs/fserrors"
|
|
)
|
|
|
|
// ReOpen is a wrapper for an object reader which reopens the stream on error
|
|
type ReOpen struct {
|
|
ctx context.Context
|
|
mu sync.Mutex // mutex to protect the below
|
|
src fs.Object // object to open
|
|
options []fs.OpenOption // option to pass to initial open
|
|
rc io.ReadCloser // underlying stream
|
|
read int64 // number of bytes read from this stream
|
|
maxTries int // maximum number of retries
|
|
tries int // number of retries we've had so far in this stream
|
|
err error // if this is set then Read/Close calls will return it
|
|
opened bool // if set then rc is valid and needs closing
|
|
}
|
|
|
|
// errorFileClosed is reported when a ReOpen is used after it has been closed.
var errorFileClosed = errors.New("file already closed")

// errorTooManyTries is reported when the stream has been opened more
// times than the configured maximum allows.
var errorTooManyTries = errors.New("failed to reopen: too many retries")
|
|
|
|
// NewReOpen makes a handle which will reopen itself and seek to where it was on errors
|
|
//
|
|
// If hashOption is set this will be applied when reading from the start
|
|
//
|
|
// If rangeOption is set then this will applied when reading from the
|
|
// start, and updated on retries.
|
|
func NewReOpen(ctx context.Context, src fs.Object, maxTries int, options ...fs.OpenOption) (rc io.ReadCloser, err error) {
|
|
h := &ReOpen{
|
|
ctx: ctx,
|
|
src: src,
|
|
maxTries: maxTries,
|
|
options: options,
|
|
}
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
err = h.open()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return h, nil
|
|
}
|
|
|
|
// open the underlying handle - call with lock held
|
|
//
|
|
// we don't retry here as the Open() call will itself have low level retries
|
|
func (h *ReOpen) open() error {
|
|
opts := []fs.OpenOption{}
|
|
var hashOption *fs.HashesOption
|
|
var rangeOption *fs.RangeOption
|
|
for _, option := range h.options {
|
|
switch option.(type) {
|
|
case *fs.HashesOption:
|
|
hashOption = option.(*fs.HashesOption)
|
|
case *fs.RangeOption:
|
|
rangeOption = option.(*fs.RangeOption)
|
|
case *fs.HTTPOption:
|
|
opts = append(opts, option)
|
|
default:
|
|
if option.Mandatory() {
|
|
fs.Logf(h.src, "Unsupported mandatory option: %v", option)
|
|
}
|
|
}
|
|
}
|
|
if h.read == 0 {
|
|
if rangeOption != nil {
|
|
opts = append(opts, rangeOption)
|
|
}
|
|
if hashOption != nil {
|
|
// put hashOption on if reading from the start, ditch otherwise
|
|
opts = append(opts, hashOption)
|
|
}
|
|
} else {
|
|
if rangeOption != nil {
|
|
// range to the read point
|
|
opts = append(opts, &fs.RangeOption{Start: rangeOption.Start + h.read, End: rangeOption.End})
|
|
} else {
|
|
// seek to the read point
|
|
opts = append(opts, &fs.SeekOption{Offset: h.read})
|
|
}
|
|
}
|
|
h.tries++
|
|
if h.tries > h.maxTries {
|
|
h.err = errorTooManyTries
|
|
} else {
|
|
h.rc, h.err = h.src.Open(h.ctx, opts...)
|
|
}
|
|
if h.err != nil {
|
|
if h.tries > 1 {
|
|
fs.Debugf(h.src, "Reopen failed after %d bytes read: %v", h.read, h.err)
|
|
}
|
|
return h.err
|
|
}
|
|
h.opened = true
|
|
return nil
|
|
}
|
|
|
|
// Read bytes retrying as necessary
|
|
func (h *ReOpen) Read(p []byte) (n int, err error) {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
if h.err != nil {
|
|
// return a previous error if there is one
|
|
return n, h.err
|
|
}
|
|
n, err = h.rc.Read(p)
|
|
if err != nil {
|
|
h.err = err
|
|
}
|
|
h.read += int64(n)
|
|
if err != nil && err != io.EOF && !fserrors.IsNoLowLevelRetryError(err) {
|
|
// close underlying stream
|
|
h.opened = false
|
|
_ = h.rc.Close()
|
|
// reopen stream, clearing error if successful
|
|
fs.Debugf(h.src, "Reopening on read failure after %d bytes: retry %d/%d: %v", h.read, h.tries, h.maxTries, err)
|
|
if h.open() == nil {
|
|
err = nil
|
|
}
|
|
}
|
|
return n, err
|
|
}
|
|
|
|
// Close the stream
|
|
func (h *ReOpen) Close() error {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
if !h.opened {
|
|
return errorFileClosed
|
|
}
|
|
h.opened = false
|
|
h.err = errorFileClosed
|
|
return h.rc.Close()
|
|
}
|