mirror of
https://github.com/rclone/rclone.git
synced 2024-11-23 00:43:49 +01:00
8301a72453
This was caused by the signal to stop buffering being ignored when there was no buffer! This is fixed by explicitly checking for no buffering and stopping.
607 lines
16 KiB
Go
607 lines
16 KiB
Go
package downloaders
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/pkg/errors"
|
|
"github.com/rclone/rclone/fs"
|
|
"github.com/rclone/rclone/fs/accounting"
|
|
"github.com/rclone/rclone/fs/asyncreader"
|
|
"github.com/rclone/rclone/fs/chunkedreader"
|
|
"github.com/rclone/rclone/fs/log"
|
|
"github.com/rclone/rclone/lib/ranges"
|
|
"github.com/rclone/rclone/vfs/vfscommon"
|
|
)
|
|
|
|
// FIXME implement max downloaders
|
|
|
|
const (
|
|
// max time a downloader can be idle before closing itself
|
|
maxDownloaderIdleTime = 5 * time.Second
|
|
// max number of bytes a reader should skip over before closing it
|
|
maxSkipBytes = 1024 * 1024
|
|
// time between background kicks of waiters to pick up errors
|
|
backgroundKickerInterval = 5 * time.Second
|
|
// maximum number of errors before declaring dead
|
|
maxErrorCount = 10
|
|
)
|
|
|
|
// Item is the interface that an item to download must obey
|
|
type Item interface {
|
|
// FindMissing adjusts r returning a new ranges.Range which only
|
|
// contains the range which needs to be downloaded. This could be
|
|
// empty - check with IsEmpty. It also adjust this to make sure it is
|
|
// not larger than the file.
|
|
FindMissing(r ranges.Range) (outr ranges.Range)
|
|
|
|
// HasRange returns true if the current ranges entirely include range
|
|
HasRange(r ranges.Range) bool
|
|
|
|
// WriteAtNoOverwrite writes b to the file, but will not overwrite
|
|
// already present ranges.
|
|
//
|
|
// This is used by the downloader to write bytes to the file
|
|
//
|
|
// It returns n the total bytes processed and skipped the number of
|
|
// bytes which were processed but not actually written to the file.
|
|
WriteAtNoOverwrite(b []byte, off int64) (n int, skipped int, err error)
|
|
}
|
|
|
|
// Downloaders is a number of downloader~s and a queue of waiters
|
|
// waiting for segments to be downloaded to a file.
|
|
type Downloaders struct {
|
|
// Write once - no locking required
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
item Item
|
|
opt *vfscommon.Options
|
|
src fs.Object // source object
|
|
remote string
|
|
wg sync.WaitGroup
|
|
|
|
// Read write
|
|
mu sync.Mutex
|
|
dls []*downloader
|
|
waiters []waiter
|
|
errorCount int // number of consecutive errors
|
|
lastErr error // last error received
|
|
}
|
|
|
|
// waiter is a range we are waiting for and a channel to signal when
|
|
// the range is found
|
|
type waiter struct {
|
|
r ranges.Range
|
|
errChan chan<- error
|
|
}
|
|
|
|
// downloader represents a running download for part of a file.
|
|
type downloader struct {
|
|
// Write once
|
|
dls *Downloaders // parent structure
|
|
quit chan struct{} // close to quit the downloader
|
|
wg sync.WaitGroup // to keep track of downloader goroutine
|
|
kick chan struct{} // kick the downloader when needed
|
|
|
|
// Read write
|
|
mu sync.Mutex
|
|
start int64 // start offset
|
|
offset int64 // current offset
|
|
maxOffset int64 // maximum offset we are reading to
|
|
tr *accounting.Transfer
|
|
in *accounting.Account // input we are reading from
|
|
skipped int64 // number of bytes we have skipped sequentially
|
|
_closed bool // set to true if downloader is closed
|
|
stop bool // set to true if we have called _stop()
|
|
}
|
|
|
|
// New makes a downloader for item
|
|
func New(item Item, opt *vfscommon.Options, remote string, src fs.Object) (dls *Downloaders) {
|
|
if src == nil {
|
|
panic("internal error: newDownloaders called with nil src object")
|
|
}
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
dls = &Downloaders{
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
item: item,
|
|
opt: opt,
|
|
src: src,
|
|
remote: remote,
|
|
}
|
|
dls.wg.Add(1)
|
|
go func() {
|
|
defer dls.wg.Done()
|
|
ticker := time.NewTicker(backgroundKickerInterval)
|
|
select {
|
|
case <-ticker.C:
|
|
err := dls.kickWaiters()
|
|
if err != nil {
|
|
fs.Errorf(dls.src, "Failed to kick waiters: %v", err)
|
|
}
|
|
case <-ctx.Done():
|
|
break
|
|
}
|
|
ticker.Stop()
|
|
}()
|
|
|
|
return dls
|
|
}
|
|
|
|
// Accumulate errors for this downloader
|
|
//
|
|
// It should be called with
|
|
//
|
|
// n bytes downloaded
|
|
// err is error from download
|
|
//
|
|
// call with lock held
|
|
func (dls *Downloaders) _countErrors(n int64, err error) {
|
|
if err == nil && n != 0 {
|
|
if dls.errorCount != 0 {
|
|
fs.Infof(dls.src, "Resetting error count to 0")
|
|
dls.errorCount = 0
|
|
dls.lastErr = nil
|
|
}
|
|
return
|
|
}
|
|
if err != nil {
|
|
dls.errorCount++
|
|
dls.lastErr = err
|
|
fs.Infof(dls.src, "Error count now %d: %v", dls.errorCount, err)
|
|
}
|
|
}
|
|
|
|
func (dls *Downloaders) countErrors(n int64, err error) {
|
|
dls.mu.Lock()
|
|
dls._countErrors(n, err)
|
|
dls.mu.Unlock()
|
|
}
|
|
|
|
// Make a new downloader, starting it to download r
|
|
//
|
|
// call with lock held
|
|
func (dls *Downloaders) _newDownloader(r ranges.Range) (dl *downloader, err error) {
|
|
defer log.Trace(dls.src, "r=%v", r)("err=%v", &err)
|
|
|
|
dl = &downloader{
|
|
kick: make(chan struct{}, 1),
|
|
quit: make(chan struct{}),
|
|
dls: dls,
|
|
start: r.Pos,
|
|
offset: r.Pos,
|
|
maxOffset: r.End(),
|
|
}
|
|
|
|
err = dl.open(dl.offset)
|
|
if err != nil {
|
|
_ = dl.close(err)
|
|
return nil, errors.Wrap(err, "failed to open downloader")
|
|
}
|
|
|
|
dls.dls = append(dls.dls, dl)
|
|
|
|
dl.wg.Add(1)
|
|
go func() {
|
|
defer dl.wg.Done()
|
|
n, err := dl.download()
|
|
_ = dl.close(err)
|
|
dl.dls.countErrors(n, err)
|
|
if err != nil {
|
|
fs.Errorf(dl.dls.src, "Failed to download: %v", err)
|
|
}
|
|
err = dl.dls.kickWaiters()
|
|
if err != nil {
|
|
fs.Errorf(dl.dls.src, "Failed to kick waiters: %v", err)
|
|
}
|
|
}()
|
|
|
|
return dl, nil
|
|
}
|
|
|
|
// _removeClosed() removes any downloaders which are closed.
|
|
//
|
|
// Call with the mutex held
|
|
func (dls *Downloaders) _removeClosed() {
|
|
newDownloaders := dls.dls[:0]
|
|
for _, dl := range dls.dls {
|
|
if !dl.closed() {
|
|
newDownloaders = append(newDownloaders, dl)
|
|
}
|
|
}
|
|
dls.dls = newDownloaders
|
|
}
|
|
|
|
// Close all running downloaders and return any unfulfilled waiters
|
|
// with inErr
|
|
func (dls *Downloaders) Close(inErr error) (err error) {
|
|
dls.mu.Lock()
|
|
defer dls.mu.Unlock()
|
|
dls._removeClosed()
|
|
for _, dl := range dls.dls {
|
|
dls.mu.Unlock()
|
|
closeErr := dl.stopAndClose(inErr)
|
|
dls.mu.Lock()
|
|
if closeErr != nil && err != nil {
|
|
err = closeErr
|
|
}
|
|
}
|
|
dls.cancel()
|
|
dls.wg.Wait()
|
|
dls.dls = nil
|
|
dls._dispatchWaiters()
|
|
dls._closeWaiters(inErr)
|
|
return err
|
|
}
|
|
|
|
// Download the range passed in returning when it has been downloaded
|
|
// with an error from the downloading go routine.
|
|
func (dls *Downloaders) Download(r ranges.Range) (err error) {
|
|
defer log.Trace(dls.src, "r=%+v", r)("err=%v", &err)
|
|
|
|
dls.mu.Lock()
|
|
|
|
errChan := make(chan error)
|
|
waiter := waiter{
|
|
r: r,
|
|
errChan: errChan,
|
|
}
|
|
|
|
err = dls._ensureDownloader(r)
|
|
if err != nil {
|
|
dls.mu.Unlock()
|
|
return err
|
|
}
|
|
|
|
dls.waiters = append(dls.waiters, waiter)
|
|
dls.mu.Unlock()
|
|
return <-errChan
|
|
}
|
|
|
|
// close any waiters with the error passed in
|
|
//
|
|
// call with lock held
|
|
func (dls *Downloaders) _closeWaiters(err error) {
|
|
for _, waiter := range dls.waiters {
|
|
waiter.errChan <- err
|
|
}
|
|
dls.waiters = nil
|
|
}
|
|
|
|
// ensure a downloader is running for the range if required. If one isn't found
|
|
// then it starts it.
|
|
//
|
|
// call with lock held
|
|
func (dls *Downloaders) _ensureDownloader(r ranges.Range) (err error) {
|
|
// FIXME this window could be a different config var?
|
|
window := int64(fs.Config.BufferSize)
|
|
|
|
// We may be reopening a downloader after a failure here or
|
|
// doing a tentative prefetch so check to see that we haven't
|
|
// read some stuff already.
|
|
//
|
|
// Clip r to stuff which needs downloading
|
|
r = dls.item.FindMissing(r)
|
|
|
|
// If the range is entirely present then we only need to start a
|
|
// dowloader if the window isn't full.
|
|
if r.IsEmpty() {
|
|
// Make a new range which includes the window
|
|
rWindow := r
|
|
if rWindow.Size < window {
|
|
rWindow.Size = window
|
|
}
|
|
// Clip rWindow to stuff which needs downloading
|
|
rWindow = dls.item.FindMissing(rWindow)
|
|
// If rWindow is empty then just return without starting a
|
|
// downloader as there is no data within the window which needs
|
|
// downloading.
|
|
if rWindow.IsEmpty() {
|
|
return nil
|
|
}
|
|
// Start downloading at the start of the unread window
|
|
r.Pos = rWindow.Pos
|
|
// But don't write anything for the moment
|
|
r.Size = 0
|
|
}
|
|
|
|
var dl *downloader
|
|
// Look through downloaders to find one in range
|
|
// If there isn't one then start a new one
|
|
dls._removeClosed()
|
|
for _, dl = range dls.dls {
|
|
start, maxOffset := dl.getRange()
|
|
|
|
// The downloader's offset to offset+window is the gap
|
|
// in which we would like to re-use this
|
|
// downloader. The downloader will never reach before
|
|
// start and maxOffset+windows is too far away - we'd
|
|
// rather start another downloader.
|
|
// fs.Debugf(nil, "r=%v start=%d, maxOffset=%d, found=%v", r, start, maxOffset, r.Pos >= start && r.Pos < maxOffset+window)
|
|
if r.Pos >= start && r.Pos < maxOffset+window {
|
|
// Found downloader which will soon have our data
|
|
dl.setRange(r)
|
|
return nil
|
|
}
|
|
}
|
|
// Downloader not found so start a new one
|
|
dl, err = dls._newDownloader(r)
|
|
if err != nil {
|
|
dls._countErrors(0, err)
|
|
return errors.Wrap(err, "failed to start downloader")
|
|
}
|
|
return err
|
|
}
|
|
|
|
// EnsureDownloader makes sure a downloader is running for the range
|
|
// passed in. If one isn't found then it starts it.
|
|
//
|
|
// It does not wait for the range to be downloaded
|
|
func (dls *Downloaders) EnsureDownloader(r ranges.Range) (err error) {
|
|
dls.mu.Lock()
|
|
defer dls.mu.Unlock()
|
|
return dls._ensureDownloader(r)
|
|
}
|
|
|
|
// _dispatchWaiters() sends any waiters which have completed back to
|
|
// their callers.
|
|
//
|
|
// Call with the mutex held
|
|
func (dls *Downloaders) _dispatchWaiters() {
|
|
if len(dls.waiters) == 0 {
|
|
return
|
|
}
|
|
|
|
newWaiters := dls.waiters[:0]
|
|
for _, waiter := range dls.waiters {
|
|
if dls.item.HasRange(waiter.r) {
|
|
waiter.errChan <- nil
|
|
} else {
|
|
newWaiters = append(newWaiters, waiter)
|
|
}
|
|
}
|
|
dls.waiters = newWaiters
|
|
}
|
|
|
|
// Send any waiters which have completed back to their callers and make sure
|
|
// there is a downloader appropriate for each waiter
|
|
func (dls *Downloaders) kickWaiters() (err error) {
|
|
dls.mu.Lock()
|
|
defer dls.mu.Unlock()
|
|
|
|
dls._dispatchWaiters()
|
|
|
|
if len(dls.waiters) == 0 {
|
|
return nil
|
|
}
|
|
|
|
// Make sure each waiter has a downloader
|
|
// This is an O(waiters*Downloaders) algorithm
|
|
// However the number of waiters and the number of downloaders
|
|
// are both expected to be small.
|
|
for _, waiter := range dls.waiters {
|
|
err = dls._ensureDownloader(waiter.r)
|
|
if err != nil {
|
|
// Failures here will be retried by background kicker
|
|
fs.Errorf(dls.src, "Restart download failed: %v", err)
|
|
}
|
|
}
|
|
|
|
if dls.errorCount > maxErrorCount {
|
|
fs.Errorf(dls.src, "Too many errors %d/%d: last error: %v", dls.errorCount, maxErrorCount, dls.lastErr)
|
|
dls._closeWaiters(dls.lastErr)
|
|
return dls.lastErr
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Write writes len(p) bytes from p to the underlying data stream. It
|
|
// returns the number of bytes written from p (0 <= n <= len(p)) and
|
|
// any error encountered that caused the write to stop early. Write
|
|
// must return a non-nil error if it returns n < len(p). Write must
|
|
// not modify the slice data, even temporarily.
|
|
//
|
|
// Implementations must not retain p.
|
|
func (dl *downloader) Write(p []byte) (n int, err error) {
|
|
defer log.Trace(dl.dls.src, "p_len=%d", len(p))("n=%d, err=%v", &n, &err)
|
|
|
|
// Kick the waiters on exit if some characters received
|
|
defer func() {
|
|
if n <= 0 {
|
|
return
|
|
}
|
|
if waitErr := dl.dls.kickWaiters(); waitErr != nil {
|
|
fs.Errorf(dl.dls.src, "vfs cache: download write: failed to kick waiters: %v", waitErr)
|
|
if err == nil {
|
|
err = waitErr
|
|
}
|
|
}
|
|
}()
|
|
|
|
dl.mu.Lock()
|
|
defer dl.mu.Unlock()
|
|
|
|
// Wait here if we have reached maxOffset until
|
|
// - we are quitting
|
|
// - we get kicked
|
|
// - timeout happens
|
|
if dl.offset >= dl.maxOffset {
|
|
var timeout = time.NewTimer(maxDownloaderIdleTime)
|
|
dl.mu.Unlock()
|
|
select {
|
|
case <-dl.quit:
|
|
dl.mu.Lock()
|
|
timeout.Stop()
|
|
case <-dl.kick:
|
|
dl.mu.Lock()
|
|
timeout.Stop()
|
|
case <-timeout.C:
|
|
// stop any future reading
|
|
dl.mu.Lock()
|
|
if !dl.stop {
|
|
fs.Debugf(dl.dls.src, "stopping download thread as it timed out")
|
|
dl._stop()
|
|
}
|
|
}
|
|
}
|
|
|
|
n, skipped, err := dl.dls.item.WriteAtNoOverwrite(p, dl.offset)
|
|
if skipped == n {
|
|
dl.skipped += int64(skipped)
|
|
} else {
|
|
dl.skipped = 0
|
|
}
|
|
dl.offset += int64(n)
|
|
|
|
// Kill this downloader if skipped too many bytes
|
|
if !dl.stop && dl.skipped > maxSkipBytes {
|
|
fs.Debugf(dl.dls.src, "stopping download thread as it has skipped %d bytes", dl.skipped)
|
|
dl._stop()
|
|
}
|
|
|
|
// If running without a async buffer then stop now as
|
|
// StopBuffering has no effect if the Account wasn't buffered
|
|
// so we need to stop manually now rather than wait for the
|
|
// AsyncReader to stop.
|
|
if dl.stop && !dl.in.HasBuffer() {
|
|
err = asyncreader.ErrorStreamAbandoned
|
|
}
|
|
return n, err
|
|
}
|
|
|
|
// open the file from offset
|
|
//
|
|
// should be called on a fresh downloader
|
|
func (dl *downloader) open(offset int64) (err error) {
|
|
defer log.Trace(dl.dls.src, "offset=%d", offset)("err=%v", &err)
|
|
dl.tr = accounting.Stats(dl.dls.ctx).NewTransfer(dl.dls.src)
|
|
|
|
size := dl.dls.src.Size()
|
|
if size < 0 {
|
|
// FIXME should just completely download these
|
|
return errors.New("can't open unknown sized file")
|
|
}
|
|
|
|
// FIXME hashType needs to ignore when --no-checksum is set too? Which is a VFS flag.
|
|
// var rangeOption *fs.RangeOption
|
|
// if offset > 0 {
|
|
// rangeOption = &fs.RangeOption{Start: offset, End: size - 1}
|
|
// }
|
|
// in0, err := operations.NewReOpen(dl.dls.ctx, dl.dls.src, fs.Config.LowLevelRetries, dl.dls.item.c.hashOption, rangeOption)
|
|
|
|
in0 := chunkedreader.New(context.TODO(), dl.dls.src, int64(dl.dls.opt.ChunkSize), int64(dl.dls.opt.ChunkSizeLimit))
|
|
_, err = in0.Seek(offset, 0)
|
|
if err != nil {
|
|
return errors.Wrap(err, "vfs reader: failed to open source file")
|
|
}
|
|
dl.in = dl.tr.Account(in0).WithBuffer() // account and buffer the transfer
|
|
|
|
dl.offset = offset
|
|
|
|
// FIXME set mod time
|
|
// FIXME check checksums
|
|
|
|
return nil
|
|
}
|
|
|
|
// close the downloader
|
|
func (dl *downloader) close(inErr error) (err error) {
|
|
defer log.Trace(dl.dls.src, "inErr=%v", err)("err=%v", &err)
|
|
checkErr := func(e error) {
|
|
if e == nil || errors.Cause(err) == asyncreader.ErrorStreamAbandoned {
|
|
return
|
|
}
|
|
err = e
|
|
}
|
|
dl.mu.Lock()
|
|
if dl.in != nil {
|
|
checkErr(dl.in.Close())
|
|
dl.in = nil
|
|
}
|
|
if dl.tr != nil {
|
|
dl.tr.Done(inErr)
|
|
dl.tr = nil
|
|
}
|
|
dl._closed = true
|
|
dl.mu.Unlock()
|
|
return nil
|
|
}
|
|
|
|
// closed returns true if the downloader has been closed alread
|
|
func (dl *downloader) closed() bool {
|
|
dl.mu.Lock()
|
|
defer dl.mu.Unlock()
|
|
return dl._closed
|
|
}
|
|
|
|
// stop the downloader if running
|
|
//
|
|
// Call with the mutex held
|
|
func (dl *downloader) _stop() {
|
|
defer log.Trace(dl.dls.src, "")("")
|
|
|
|
// exit if have already called _stop
|
|
if dl.stop {
|
|
return
|
|
}
|
|
dl.stop = true
|
|
|
|
// Signal quit now to unblock the downloader
|
|
close(dl.quit)
|
|
|
|
// stop the downloader by stopping the async reader buffering
|
|
// any more input. This causes all the stuff in the async
|
|
// buffer (which can be many MB) to be written to the disk
|
|
// before exiting.
|
|
if dl.in != nil {
|
|
dl.in.StopBuffering()
|
|
}
|
|
}
|
|
|
|
// stop the downloader if running then close it with the error passed in
|
|
func (dl *downloader) stopAndClose(inErr error) (err error) {
|
|
// Stop the downloader by closing its input
|
|
dl.mu.Lock()
|
|
dl._stop()
|
|
dl.mu.Unlock()
|
|
// wait for downloader to finish...
|
|
// do this without mutex as asyncreader
|
|
// calls back into Write() which needs the lock
|
|
dl.wg.Wait()
|
|
return dl.close(inErr)
|
|
}
|
|
|
|
// Start downloading to the local file starting at offset until maxOffset.
|
|
func (dl *downloader) download() (n int64, err error) {
|
|
defer log.Trace(dl.dls.src, "")("err=%v", &err)
|
|
n, err = dl.in.WriteTo(dl)
|
|
if err != nil && errors.Cause(err) != asyncreader.ErrorStreamAbandoned {
|
|
return n, errors.Wrap(err, "vfs reader: failed to write to cache file")
|
|
}
|
|
return n, nil
|
|
}
|
|
|
|
// setRange makes sure the downloader is downloading the range passed in
|
|
func (dl *downloader) setRange(r ranges.Range) {
|
|
dl.mu.Lock()
|
|
maxOffset := r.End()
|
|
if maxOffset > dl.maxOffset {
|
|
dl.maxOffset = maxOffset
|
|
// fs.Debugf(dl.dls.src, "kicking downloader with maxOffset %d", maxOffset)
|
|
select {
|
|
case dl.kick <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
dl.mu.Unlock()
|
|
}
|
|
|
|
// get the current range this downloader is working on
|
|
func (dl *downloader) getRange() (start, maxOffset int64) {
|
|
dl.mu.Lock()
|
|
defer dl.mu.Unlock()
|
|
return dl.start, dl.maxOffset
|
|
}
|