diff --git a/vfs/vfscache/downloader.go b/vfs/vfscache/downloader.go index b47b3f86b..7861f9e34 100644 --- a/vfs/vfscache/downloader.go +++ b/vfs/vfscache/downloader.go @@ -23,6 +23,8 @@ const ( maxSkipBytes = 1024 * 1024 // time between background kicks of waiters to pick up errors backgroundKickerInterval = 5 * time.Second + // maximum number of errors before declaring dead + maxErrorCount = 10 ) // downloaders is a number of downloader~s and a queue of waiters @@ -39,9 +41,11 @@ type downloaders struct { wg sync.WaitGroup // Read write - mu sync.Mutex - dls []*downloader - waiters []waiter + mu sync.Mutex + dls []*downloader + waiters []waiter + errorCount int // number of consecutive errors + lastErr error // last error received } // waiter is a range we are waiting for and a channel to signal when @@ -104,6 +108,36 @@ func newDownloaders(item *Item, fcache fs.Fs, remote string, src fs.Object) (dls return dls } +// Accumulate errors for this downloader +// +// It should be called with +// +// n bytes downloaded +// err is error from download +// +// call with lock held +func (dls *downloaders) _countErrors(n int64, err error) { + if err == nil && n != 0 { + if dls.errorCount != 0 { + fs.Infof(dls.src, "Resetting error count to 0") + dls.errorCount = 0 + dls.lastErr = nil + } + return + } + if err != nil { + dls.errorCount++ + dls.lastErr = err + fs.Infof(dls.src, "Error count now %d: %v", dls.errorCount, err) + } +} + +func (dls *downloaders) countErrors(n int64, err error) { + dls.mu.Lock() + dls._countErrors(n, err) + dls.mu.Unlock() +} + // Make a new downloader, starting it to download r // // call with lock held @@ -130,8 +164,9 @@ func (dls *downloaders) _newDownloader(r ranges.Range) (dl *downloader, err erro dl.wg.Add(1) go func() { defer dl.wg.Done() - err := dl.download() + n, err := dl.download() _ = dl.close(err) + dl.dls.countErrors(n, err) if err != nil { fs.Errorf(dl.dls.src, "Failed to download: %v", err) } @@ -271,6 +306,7 @@ func (dls *downloaders) _ensureDownloader(r ranges.Range) (err error) { // Downloader not found so start a new one dl, err = dls._newDownloader(r) if err != nil { + dls._countErrors(0, err) return errors.Wrap(err, "failed to start downloader") } return err @@ -328,8 +364,10 @@ func (dls *downloaders) kickWaiters() (err error) { } } - if true { - + if dls.errorCount > maxErrorCount { + fs.Errorf(dls.src, "Too many errors %d/%d: last error: %v", dls.errorCount, maxErrorCount, dls.lastErr) + dls._closeWaiters(dls.lastErr) + return dls.lastErr } return nil @@ -504,13 +542,13 @@ func (dl *downloader) stopAndClose(inErr error) (err error) { } // Start downloading to the local file starting at offset until maxOffset. -func (dl *downloader) download() (err error) { +func (dl *downloader) download() (n int64, err error) { defer log.Trace(dl.dls.src, "")("err=%v", &err) - _, err = dl.in.WriteTo(dl) + n, err = dl.in.WriteTo(dl) if err != nil && errors.Cause(err) != asyncreader.ErrorStreamAbandoned { - return errors.Wrap(err, "vfs reader: failed to write to cache file") + return n, errors.Wrap(err, "vfs reader: failed to write to cache file") } - return nil + return n, nil } // setRange makes sure the downloader is downloading the range passed in