rclone/fs/operations/reopen.go

145 lines
3.7 KiB
Go
Raw Permalink Normal View History

package operations
import (
"context"
"io"
"sync"
"github.com/pkg/errors"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/fserrors"
)
// ReOpen is a wrapper for an object reader which reopens the stream on error
type ReOpen struct {
ctx context.Context
mu sync.Mutex // mutex to protect the below
src fs.Object // object to open
options []fs.OpenOption // option to pass to initial open
rc io.ReadCloser // underlying stream
read int64 // number of bytes read from this stream
maxTries int // maximum number of retries
tries int // number of retries we've had so far in this stream
err error // if this is set then Read/Close calls will return it
opened bool // if set then rc is valid and needs closing
}
var (
errorFileClosed = errors.New("file already closed")
errorTooManyTries = errors.New("failed to reopen: too many retries")
)
// NewReOpen makes a handle which will reopen itself and seek to where it was on errors
//
// If hashOption is set this will be applied when reading from the start
//
// If rangeOption is set then this will applied when reading from the
// start, and updated on retries.
func NewReOpen(ctx context.Context, src fs.Object, maxTries int, options ...fs.OpenOption) (rc io.ReadCloser, err error) {
h := &ReOpen{
ctx: ctx,
src: src,
maxTries: maxTries,
options: options,
}
h.mu.Lock()
defer h.mu.Unlock()
err = h.open()
if err != nil {
return nil, err
}
return h, nil
}
// open the underlying handle - call with lock held
//
// we don't retry here as the Open() call will itself have low level retries
func (h *ReOpen) open() error {
opts := []fs.OpenOption{}
var hashOption *fs.HashesOption
var rangeOption *fs.RangeOption
for _, option := range h.options {
switch option.(type) {
case *fs.HashesOption:
hashOption = option.(*fs.HashesOption)
case *fs.RangeOption:
rangeOption = option.(*fs.RangeOption)
case *fs.HTTPOption:
opts = append(opts, option)
default:
if option.Mandatory() {
fs.Logf(h.src, "Unsupported mandatory option: %v", option)
}
}
}
if h.read == 0 {
if rangeOption != nil {
opts = append(opts, rangeOption)
}
if hashOption != nil {
// put hashOption on if reading from the start, ditch otherwise
opts = append(opts, hashOption)
}
} else {
if rangeOption != nil {
// range to the read point
opts = append(opts, &fs.RangeOption{Start: rangeOption.Start + h.read, End: rangeOption.End})
} else {
// seek to the read point
opts = append(opts, &fs.SeekOption{Offset: h.read})
}
}
h.tries++
if h.tries > h.maxTries {
h.err = errorTooManyTries
} else {
h.rc, h.err = h.src.Open(h.ctx, opts...)
}
if h.err != nil {
if h.tries > 1 {
fs.Debugf(h.src, "Reopen failed after %d bytes read: %v", h.read, h.err)
}
return h.err
}
h.opened = true
return nil
}
// Read bytes retrying as necessary
func (h *ReOpen) Read(p []byte) (n int, err error) {
h.mu.Lock()
defer h.mu.Unlock()
if h.err != nil {
// return a previous error if there is one
return n, h.err
}
n, err = h.rc.Read(p)
if err != nil {
h.err = err
}
h.read += int64(n)
if err != nil && err != io.EOF && !fserrors.IsNoLowLevelRetryError(err) {
// close underlying stream
h.opened = false
_ = h.rc.Close()
// reopen stream, clearing error if successful
fs.Debugf(h.src, "Reopening on read failure after %d bytes: retry %d/%d: %v", h.read, h.tries, h.maxTries, err)
if h.open() == nil {
err = nil
}
}
return n, err
}
// Close the stream
func (h *ReOpen) Close() error {
h.mu.Lock()
defer h.mu.Unlock()
if !h.opened {
return errorFileClosed
}
h.opened = false
h.err = errorFileClosed
return h.rc.Close()
}