// Package downloaders provides utilities for the VFS layer package downloaders import ( "context" "errors" "fmt" "sync" "time" "github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs/accounting" "github.com/rclone/rclone/fs/asyncreader" "github.com/rclone/rclone/fs/chunkedreader" "github.com/rclone/rclone/fs/fserrors" "github.com/rclone/rclone/lib/ranges" "github.com/rclone/rclone/vfs/vfscommon" ) // FIXME implement max downloaders const ( // max time a downloader can be idle before closing itself maxDownloaderIdleTime = 5 * time.Second // max number of bytes a reader should skip over before closing it maxSkipBytes = 1024 * 1024 // time between background kicks of waiters to pick up errors backgroundKickerInterval = 5 * time.Second // maximum number of errors before declaring dead maxErrorCount = 10 // If a downloader is within this range or --buffer-size // whichever is the larger, we will reuse the downloader minWindow = 1024 * 1024 ) // Item is the interface that an item to download must obey type Item interface { // FindMissing adjusts r returning a new ranges.Range which only // contains the range which needs to be downloaded. This could be // empty - check with IsEmpty. It also adjust this to make sure it is // not larger than the file. FindMissing(r ranges.Range) (outr ranges.Range) // HasRange returns true if the current ranges entirely include range HasRange(r ranges.Range) bool // WriteAtNoOverwrite writes b to the file, but will not overwrite // already present ranges. // // This is used by the downloader to write bytes to the file // // It returns n the total bytes processed and skipped the number of // bytes which were processed but not actually written to the file. WriteAtNoOverwrite(b []byte, off int64) (n int, skipped int, err error) } // Downloaders is a number of downloader~s and a queue of waiters // waiting for segments to be downloaded to a file. type Downloaders struct { // Write once - no locking required ctx context.Context cancel context.CancelFunc item Item opt *vfscommon.Options src fs.Object // source object remote string wg sync.WaitGroup // Read write mu sync.Mutex dls []*downloader waiters []waiter errorCount int // number of consecutive errors lastErr error // last error received } // waiter is a range we are waiting for and a channel to signal when // the range is found type waiter struct { r ranges.Range errChan chan<- error } // downloader represents a running download for part of a file. type downloader struct { // Write once dls *Downloaders // parent structure quit chan struct{} // close to quit the downloader wg sync.WaitGroup // to keep track of downloader goroutine kick chan struct{} // kick the downloader when needed // Read write mu sync.Mutex start int64 // start offset offset int64 // current offset maxOffset int64 // maximum offset we are reading to tr *accounting.Transfer in *accounting.Account // input we are reading from skipped int64 // number of bytes we have skipped sequentially _closed bool // set to true if downloader is closed stop bool // set to true if we have called _stop() } // New makes a downloader for item func New(item Item, opt *vfscommon.Options, remote string, src fs.Object) (dls *Downloaders) { if src == nil { panic("internal error: newDownloaders called with nil src object") } ctx, cancel := context.WithCancel(context.Background()) dls = &Downloaders{ ctx: ctx, cancel: cancel, item: item, opt: opt, src: src, remote: remote, } dls.wg.Add(1) go func() { defer dls.wg.Done() ticker := time.NewTicker(backgroundKickerInterval) select { case <-ticker.C: err := dls.kickWaiters() if err != nil { fs.Errorf(dls.src, "vfs cache: failed to kick waiters: %v", err) } case <-ctx.Done(): break } ticker.Stop() }() return dls } // Accumulate errors for this downloader // // It should be called with // // n bytes downloaded // err is error from download // // call with lock held func (dls *Downloaders) _countErrors(n int64, err error) { if err == nil && n != 0 { if dls.errorCount != 0 { fs.Infof(dls.src, "vfs cache: downloader: resetting error count to 0") dls.errorCount = 0 dls.lastErr = nil } return } if err != nil { //if err != syscall.ENOSPC { dls.errorCount++ //} dls.lastErr = err fs.Infof(dls.src, "vfs cache: downloader: error count now %d: %v", dls.errorCount, err) } } func (dls *Downloaders) countErrors(n int64, err error) { dls.mu.Lock() dls._countErrors(n, err) dls.mu.Unlock() } // Make a new downloader, starting it to download r // // call with lock held func (dls *Downloaders) _newDownloader(r ranges.Range) (dl *downloader, err error) { // defer log.Trace(dls.src, "r=%v", r)("err=%v", &err) dl = &downloader{ kick: make(chan struct{}, 1), quit: make(chan struct{}), dls: dls, start: r.Pos, offset: r.Pos, maxOffset: r.End(), } err = dl.open(dl.offset) if err != nil { _ = dl.close(err) return nil, fmt.Errorf("failed to open downloader: %w", err) } dls.dls = append(dls.dls, dl) dl.wg.Add(1) go func() { defer dl.wg.Done() n, err := dl.download() _ = dl.close(err) dl.dls.countErrors(n, err) if err != nil { fs.Errorf(dl.dls.src, "vfs cache: failed to download: %v", err) } err = dl.dls.kickWaiters() if err != nil { fs.Errorf(dl.dls.src, "vfs cache: failed to kick waiters: %v", err) } }() return dl, nil } // _removeClosed() removes any downloaders which are closed. // // Call with the mutex held func (dls *Downloaders) _removeClosed() { newDownloaders := dls.dls[:0] for _, dl := range dls.dls { if !dl.closed() { newDownloaders = append(newDownloaders, dl) } } dls.dls = newDownloaders } // Close all running downloaders and return any unfulfilled waiters // with inErr func (dls *Downloaders) Close(inErr error) (err error) { dls.mu.Lock() defer dls.mu.Unlock() dls._removeClosed() for _, dl := range dls.dls { dls.mu.Unlock() closeErr := dl.stopAndClose(inErr) dls.mu.Lock() if closeErr != nil && err != nil { err = closeErr } } dls.cancel() // dls may have entered the periodical (every 5 seconds) kickWaiters() call // unlock the mutex to allow it to finish so that we can get its dls.wg.Done() dls.mu.Unlock() dls.wg.Wait() dls.mu.Lock() dls.dls = nil dls._dispatchWaiters() dls._closeWaiters(inErr) return err } // Download the range passed in returning when it has been downloaded // with an error from the downloading go routine. func (dls *Downloaders) Download(r ranges.Range) (err error) { // defer log.Trace(dls.src, "r=%+v", r)("err=%v", &err) dls.mu.Lock() errChan := make(chan error) waiter := waiter{ r: r, errChan: errChan, } err = dls._ensureDownloader(r) if err != nil { dls.mu.Unlock() return err } dls.waiters = append(dls.waiters, waiter) dls.mu.Unlock() return <-errChan } // close any waiters with the error passed in // // call with lock held func (dls *Downloaders) _closeWaiters(err error) { for _, waiter := range dls.waiters { waiter.errChan <- err } dls.waiters = nil } // ensure a downloader is running for the range if required. If one isn't found // then it starts it. // // call with lock held func (dls *Downloaders) _ensureDownloader(r ranges.Range) (err error) { // defer log.Trace(dls.src, "r=%v", r)("err=%v", &err) // The window includes potentially unread data in the buffer window := int64(fs.GetConfig(context.TODO()).BufferSize) // Increase the read range by the read ahead if set if dls.opt.ReadAhead > 0 { r.Size += int64(dls.opt.ReadAhead) } // We may be reopening a downloader after a failure here or // doing a tentative prefetch so check to see that we haven't // read some stuff already. // // Clip r to stuff which needs downloading r = dls.item.FindMissing(r) // If the range is entirely present then we only need to start a // downloader if the window isn't full. startNew := true if r.IsEmpty() { // Make a new range which includes the window rWindow := r rWindow.Size += window // Clip rWindow to stuff which needs downloading rWindowClipped := dls.item.FindMissing(rWindow) // If rWindowClipped is empty then don't start a new downloader // if there isn't an existing one as there is no data within the // window which needs downloading. We do want to kick an // existing one though to stop it timing out. if rWindowClipped.IsEmpty() { // Don't start any more downloaders startNew = false // Start downloading at the start of the unread window // This likely has been downloaded already but it will // kick the downloader r.Pos = rWindow.End() } else { // Start downloading at the start of the unread window r.Pos = rWindowClipped.Pos } // But don't write anything for the moment r.Size = 0 } // If buffer size is less than minWindow then make it that if window < minWindow { window = minWindow } var dl *downloader // Look through downloaders to find one in range // If there isn't one then start a new one dls._removeClosed() for _, dl = range dls.dls { start, offset := dl.getRange() // The downloader's offset to offset+window is the gap // in which we would like to reuse this // downloader. The downloader will never reach before // start and offset+windows is too far away - we'd // rather start another downloader. // fs.Debugf(nil, "r=%v start=%d, offset=%d, found=%v", r, start, offset, r.Pos >= start && r.Pos < offset+window) if r.Pos >= start && r.Pos < offset+window { // Found downloader which will soon have our data dl.setRange(r) return nil } } if !startNew { return nil } // Size can be 0 here if file shrinks - no need to download if r.Size == 0 { return nil } // Downloader not found so start a new one _, err = dls._newDownloader(r) if err != nil { dls._countErrors(0, err) return fmt.Errorf("failed to start downloader: %w", err) } return err } // EnsureDownloader makes sure a downloader is running for the range // passed in. If one isn't found then it starts it. // // It does not wait for the range to be downloaded func (dls *Downloaders) EnsureDownloader(r ranges.Range) (err error) { dls.mu.Lock() defer dls.mu.Unlock() return dls._ensureDownloader(r) } // _dispatchWaiters() sends any waiters which have completed back to // their callers. // // Call with the mutex held func (dls *Downloaders) _dispatchWaiters() { if len(dls.waiters) == 0 { return } newWaiters := dls.waiters[:0] for _, waiter := range dls.waiters { // Clip the size against the actual size in case it has shrunk r := waiter.r r.Clip(dls.src.Size()) if dls.item.HasRange(r) { waiter.errChan <- nil } else { newWaiters = append(newWaiters, waiter) } } dls.waiters = newWaiters } // Send any waiters which have completed back to their callers and make sure // there is a downloader appropriate for each waiter func (dls *Downloaders) kickWaiters() (err error) { dls.mu.Lock() defer dls.mu.Unlock() dls._dispatchWaiters() if len(dls.waiters) == 0 { return nil } // Make sure each waiter has a downloader // This is an O(waiters*Downloaders) algorithm // However the number of waiters and the number of downloaders // are both expected to be small. for _, waiter := range dls.waiters { err = dls._ensureDownloader(waiter.r) if err != nil { // Failures here will be retried by background kicker fs.Errorf(dls.src, "vfs cache: restart download failed: %v", err) } } if fserrors.IsErrNoSpace(dls.lastErr) { fs.Errorf(dls.src, "vfs cache: cache is out of space %d/%d: last error: %v", dls.errorCount, maxErrorCount, dls.lastErr) dls._closeWaiters(dls.lastErr) return dls.lastErr } if dls.errorCount > maxErrorCount { fs.Errorf(dls.src, "vfs cache: too many errors %d/%d: last error: %v", dls.errorCount, maxErrorCount, dls.lastErr) dls._closeWaiters(dls.lastErr) return dls.lastErr } return nil } // Write writes len(p) bytes from p to the underlying data stream. It // returns the number of bytes written from p (0 <= n <= len(p)) and // any error encountered that caused the write to stop early. Write // must return a non-nil error if it returns n < len(p). Write must // not modify the slice data, even temporarily. // // Implementations must not retain p. func (dl *downloader) Write(p []byte) (n int, err error) { // defer log.Trace(dl.dls.src, "p_len=%d", len(p))("n=%d, err=%v", &n, &err) // Kick the waiters on exit if some characters received defer func() { if n <= 0 { return } if waitErr := dl.dls.kickWaiters(); waitErr != nil { fs.Errorf(dl.dls.src, "vfs cache: download write: failed to kick waiters: %v", waitErr) if err == nil { err = waitErr } } }() dl.mu.Lock() defer dl.mu.Unlock() // Wait here if we have reached maxOffset until // - we are quitting // - we get kicked // - timeout happens loop: for dl.offset >= dl.maxOffset { var timeout = time.NewTimer(maxDownloaderIdleTime) dl.mu.Unlock() select { case <-dl.quit: dl.mu.Lock() timeout.Stop() break loop case <-dl.kick: dl.mu.Lock() timeout.Stop() case <-timeout.C: // stop any future reading dl.mu.Lock() if !dl.stop { fs.Debugf(dl.dls.src, "vfs cache: stopping download thread as it timed out") dl._stop() } break loop } } n, skipped, err := dl.dls.item.WriteAtNoOverwrite(p, dl.offset) if skipped == n { dl.skipped += int64(skipped) } else { dl.skipped = 0 } dl.offset += int64(n) // Kill this downloader if skipped too many bytes if !dl.stop && dl.skipped > maxSkipBytes { fs.Debugf(dl.dls.src, "vfs cache: stopping download thread as it has skipped %d bytes", dl.skipped) dl._stop() } // If running without a async buffer then stop now as // StopBuffering has no effect if the Account wasn't buffered // so we need to stop manually now rather than wait for the // AsyncReader to stop. if dl.stop && !dl.in.HasBuffer() { err = asyncreader.ErrorStreamAbandoned } return n, err } // open the file from offset // // should be called on a fresh downloader func (dl *downloader) open(offset int64) (err error) { // defer log.Trace(dl.dls.src, "offset=%d", offset)("err=%v", &err) dl.tr = accounting.Stats(dl.dls.ctx).NewTransfer(dl.dls.src, nil) size := dl.dls.src.Size() if size < 0 { // FIXME should just completely download these return errors.New("can't open unknown sized file") } // FIXME hashType needs to ignore when --no-checksum is set too? Which is a VFS flag. // var rangeOption *fs.RangeOption // if offset > 0 { // rangeOption = &fs.RangeOption{Start: offset, End: size - 1} // } // in0, err := operations.NewReOpen(dl.dls.ctx, dl.dls.src, ci.LowLevelRetries, dl.dls.item.c.hashOption, rangeOption) in0 := chunkedreader.New(context.TODO(), dl.dls.src, int64(dl.dls.opt.ChunkSize), int64(dl.dls.opt.ChunkSizeLimit)) _, err = in0.Seek(offset, 0) if err != nil { return fmt.Errorf("vfs reader: failed to open source file: %w", err) } dl.in = dl.tr.Account(dl.dls.ctx, in0).WithBuffer() // account and buffer the transfer dl.offset = offset // FIXME set mod time // FIXME check checksums return nil } // close the downloader func (dl *downloader) close(inErr error) (err error) { // defer log.Trace(dl.dls.src, "inErr=%v", err)("err=%v", &err) checkErr := func(e error) { if e == nil || errors.Is(err, asyncreader.ErrorStreamAbandoned) { return } err = e } dl.mu.Lock() if dl.in != nil { checkErr(dl.in.Close()) dl.in = nil } if dl.tr != nil { dl.tr.Done(dl.dls.ctx, inErr) dl.tr = nil } dl._closed = true dl.mu.Unlock() return nil } // closed returns true if the downloader has been closed already func (dl *downloader) closed() bool { dl.mu.Lock() defer dl.mu.Unlock() return dl._closed } // stop the downloader if running // // Call with the mutex held func (dl *downloader) _stop() { // defer log.Trace(dl.dls.src, "")("") // exit if have already called _stop if dl.stop { return } dl.stop = true // Signal quit now to unblock the downloader close(dl.quit) // stop the downloader by stopping the async reader buffering // any more input. This causes all the stuff in the async // buffer (which can be many MiB) to be written to the disk // before exiting. if dl.in != nil { dl.in.StopBuffering() } } // stop the downloader if running then close it with the error passed in func (dl *downloader) stopAndClose(inErr error) (err error) { // Stop the downloader by closing its input dl.mu.Lock() dl._stop() dl.mu.Unlock() // wait for downloader to finish... // do this without mutex as asyncreader // calls back into Write() which needs the lock dl.wg.Wait() return dl.close(inErr) } // Start downloading to the local file starting at offset until maxOffset. func (dl *downloader) download() (n int64, err error) { // defer log.Trace(dl.dls.src, "")("err=%v", &err) n, err = dl.in.WriteTo(dl) if err != nil && !errors.Is(err, asyncreader.ErrorStreamAbandoned) { return n, fmt.Errorf("vfs reader: failed to write to cache file: %w", err) } return n, nil } // setRange makes sure the downloader is downloading the range passed in func (dl *downloader) setRange(r ranges.Range) { // defer log.Trace(dl.dls.src, "r=%v", r)("") dl.mu.Lock() maxOffset := r.End() if maxOffset > dl.maxOffset { dl.maxOffset = maxOffset } dl.mu.Unlock() // fs.Debugf(dl.dls.src, "kicking downloader with maxOffset %d", maxOffset) select { case dl.kick <- struct{}{}: default: } } // get the current range this downloader is working on func (dl *downloader) getRange() (start, offset int64) { dl.mu.Lock() defer dl.mu.Unlock() return dl.start, dl.offset }