// Package writeback keeps track of the files which need to be written // back to storage package writeback import ( "container/heap" "context" "errors" "sync" "sync/atomic" "time" "github.com/rclone/rclone/fs" "github.com/rclone/rclone/vfs/vfscommon" ) const ( maxUploadDelay = 5 * time.Minute // max delay between upload attempts ) // PutFn is the interface that item provides to store the data type PutFn func(context.Context) error // Handle is returned for callers to keep track of writeback items type Handle uint64 // WriteBack keeps track of the items which need to be written back to the disk at some point type WriteBack struct { ctx context.Context mu sync.Mutex items writeBackItems // priority queue of *writeBackItem - writeBackItems are in here while awaiting transfer only lookup map[Handle]*writeBackItem // for getting a *writeBackItem from a Handle - writeBackItems are in here until cancelled opt *vfscommon.Options // VFS options timer *time.Timer // next scheduled time for the uploader expiry time.Time // time the next item expires or IsZero uploads int // number of uploads in progress // read and written with atomic id Handle // id of the last writeBackItem created } // New make a new WriteBack // // cancel the context to stop the background processing func New(ctx context.Context, opt *vfscommon.Options) *WriteBack { wb := &WriteBack{ ctx: ctx, items: writeBackItems{}, lookup: make(map[Handle]*writeBackItem), opt: opt, } heap.Init(&wb.items) return wb } // writeBackItem stores an Item awaiting writeback // // These are stored on the items heap when awaiting transfer but // removed from the items heap when transferring. They remain in the // lookup map until cancelled. // // writeBack.mu must be held to manipulate this type writeBackItem struct { name string // name of the item so we don't have to read it from item id Handle // id of the item index int // index into the priority queue for update expiry time.Time // When this expires we will write it back uploading bool // True if item is being processed by upload() method onHeap bool // true if this item is on the items heap cancel context.CancelFunc // To cancel the upload with done chan struct{} // closed when the cancellation completes putFn PutFn // To write the object data tries int // number of times we have tried to upload delay time.Duration // delay between upload attempts } // A writeBackItems implements a priority queue by implementing // heap.Interface and holds writeBackItems. type writeBackItems []*writeBackItem func (ws writeBackItems) Len() int { return len(ws) } func (ws writeBackItems) Less(i, j int) bool { a, b := ws[i], ws[j] // If times are equal then use ID to disambiguate if a.expiry.Equal(b.expiry) { return a.id < b.id } return a.expiry.Before(b.expiry) } func (ws writeBackItems) Swap(i, j int) { ws[i], ws[j] = ws[j], ws[i] ws[i].index = i ws[j].index = j } func (ws *writeBackItems) Push(x interface{}) { n := len(*ws) item := x.(*writeBackItem) item.index = n *ws = append(*ws, item) } func (ws *writeBackItems) Pop() interface{} { old := *ws n := len(old) item := old[n-1] old[n-1] = nil // avoid memory leak item.index = -1 // for safety *ws = old[0 : n-1] return item } // update modifies the expiry of an Item in the queue. // // call with lock held func (ws *writeBackItems) _update(item *writeBackItem, expiry time.Time) { item.expiry = expiry heap.Fix(ws, item.index) } // return a new expiry time based from now until the WriteBack timeout // // call with lock held func (wb *WriteBack) _newExpiry() time.Time { expiry := time.Now() if wb.opt.WriteBack > 0 { expiry = expiry.Add(wb.opt.WriteBack) } // expiry = expiry.Round(time.Millisecond) return expiry } // make a new writeBackItem // // call with the lock held func (wb *WriteBack) _newItem(id Handle, name string) *writeBackItem { wb.SetID(&id) wbItem := &writeBackItem{ name: name, expiry: wb._newExpiry(), delay: wb.opt.WriteBack, id: id, } wb._addItem(wbItem) wb._pushItem(wbItem) return wbItem } // add a writeBackItem to the lookup map // // call with the lock held func (wb *WriteBack) _addItem(wbItem *writeBackItem) { wb.lookup[wbItem.id] = wbItem } // delete a writeBackItem from the lookup map // // call with the lock held func (wb *WriteBack) _delItem(wbItem *writeBackItem) { delete(wb.lookup, wbItem.id) } // pop a writeBackItem from the items heap // // call with the lock held func (wb *WriteBack) _popItem() (wbItem *writeBackItem) { wbItem = heap.Pop(&wb.items).(*writeBackItem) wbItem.onHeap = false return wbItem } // push a writeBackItem onto the items heap // // call with the lock held func (wb *WriteBack) _pushItem(wbItem *writeBackItem) { if !wbItem.onHeap { heap.Push(&wb.items, wbItem) wbItem.onHeap = true } } // remove a writeBackItem from the items heap // // call with the lock held func (wb *WriteBack) _removeItem(wbItem *writeBackItem) { if wbItem.onHeap { heap.Remove(&wb.items, wbItem.index) wbItem.onHeap = false } } // peek the oldest writeBackItem - may be nil // // call with the lock held func (wb *WriteBack) _peekItem() (wbItem *writeBackItem) { if len(wb.items) == 0 { return nil } return wb.items[0] } // stop the timer which runs the expiries func (wb *WriteBack) _stopTimer() { if wb.expiry.IsZero() { return } wb.expiry = time.Time{} // fs.Debugf(nil, "resetTimer STOP") if wb.timer != nil { wb.timer.Stop() wb.timer = nil } } // reset the timer which runs the expiries func (wb *WriteBack) _resetTimer() { wbItem := wb._peekItem() if wbItem == nil { wb._stopTimer() } else { if wb.expiry.Equal(wbItem.expiry) { return } wb.expiry = wbItem.expiry dt := time.Until(wbItem.expiry) if dt < 0 { dt = 0 } // fs.Debugf(nil, "resetTimer dt=%v", dt) if wb.timer != nil { wb.timer.Stop() } wb.timer = time.AfterFunc(dt, func() { wb.processItems(wb.ctx) }) } } // SetID sets the Handle pointed to if it is non zero to the next // handle. func (wb *WriteBack) SetID(pid *Handle) { if *pid == 0 { *pid = Handle(atomic.AddUint64((*uint64)(&wb.id), 1)) } } // Add adds an item to the writeback queue or resets its timer if it // is already there. // // If id is 0 then a new item will always be created and the new // Handle will be returned. // // Use SetID to create Handles in advance of calling Add. // // If modified is false then it it doesn't cancel a pending upload if // there is one as there is no need. func (wb *WriteBack) Add(id Handle, name string, modified bool, putFn PutFn) Handle { wb.mu.Lock() defer wb.mu.Unlock() wbItem, ok := wb.lookup[id] if !ok { wbItem = wb._newItem(id, name) } else { if wbItem.uploading && modified { // We are uploading already so cancel the upload wb._cancelUpload(wbItem) } // Kick the timer on wb.items._update(wbItem, wb._newExpiry()) } wbItem.putFn = putFn wb._resetTimer() return wbItem.id } // _remove should be called when a file should be removed from the // writeback queue. This cancels a writeback if there is one and // doesn't return the item to the queue. // // This should be called with the lock held func (wb *WriteBack) _remove(id Handle) (found bool) { wbItem, found := wb.lookup[id] if found { fs.Debugf(wbItem.name, "vfs cache: cancelling writeback (uploading %v) %p item %d", wbItem.uploading, wbItem, wbItem.id) if wbItem.uploading { // We are uploading already so cancel the upload wb._cancelUpload(wbItem) } // Remove the item from the heap wb._removeItem(wbItem) // Remove the item from the lookup map wb._delItem(wbItem) } wb._resetTimer() return found } // Remove should be called when a file should be removed from the // writeback queue. This cancels a writeback if there is one and // doesn't return the item to the queue. func (wb *WriteBack) Remove(id Handle) (found bool) { wb.mu.Lock() defer wb.mu.Unlock() return wb._remove(id) } // Rename should be called when a file might be uploading and it gains // a new name. This will cancel the upload and put it back in the // queue. func (wb *WriteBack) Rename(id Handle, name string) { wb.mu.Lock() defer wb.mu.Unlock() wbItem, ok := wb.lookup[id] if !ok { return } if wbItem.uploading { // We are uploading already so cancel the upload wb._cancelUpload(wbItem) } // Check to see if there are any uploads with the existing // name and remove them for existingID, existingItem := range wb.lookup { if existingID != id && existingItem.name == name { wb._remove(existingID) } } wbItem.name = name // Kick the timer on wb.items._update(wbItem, wb._newExpiry()) wb._resetTimer() } // upload the item - called as a goroutine // // uploading will have been incremented here already func (wb *WriteBack) upload(ctx context.Context, wbItem *writeBackItem) { wb.mu.Lock() defer wb.mu.Unlock() putFn := wbItem.putFn wbItem.tries++ fs.Debugf(wbItem.name, "vfs cache: starting upload") wb.mu.Unlock() err := putFn(ctx) wb.mu.Lock() wbItem.cancel() // cancel context to release resources since store done wbItem.uploading = false wb.uploads-- if err != nil { // FIXME should this have a max number of transfer attempts? wbItem.delay *= 2 if wbItem.delay > maxUploadDelay { wbItem.delay = maxUploadDelay } if errors.Is(err, context.Canceled) { fs.Infof(wbItem.name, "vfs cache: upload canceled") // Upload was cancelled so reset timer wbItem.delay = wb.opt.WriteBack } else { fs.Errorf(wbItem.name, "vfs cache: failed to upload try #%d, will retry in %v: %v", wbItem.tries, wbItem.delay, err) } // push the item back on the queue for retry wb._pushItem(wbItem) wb.items._update(wbItem, time.Now().Add(wbItem.delay)) } else { fs.Infof(wbItem.name, "vfs cache: upload succeeded try #%d", wbItem.tries) // show that we are done with the item wb._delItem(wbItem) } wb._resetTimer() close(wbItem.done) } // cancel the upload - the item should be on the heap after this returns // // call with lock held func (wb *WriteBack) _cancelUpload(wbItem *writeBackItem) { if !wbItem.uploading { return } fs.Debugf(wbItem.name, "vfs cache: cancelling upload") if wbItem.cancel != nil { // Cancel the upload - this may or may not be effective wbItem.cancel() // wait for the uploader to finish // // we need to wait without the lock otherwise the // background part will never run. wb.mu.Unlock() <-wbItem.done wb.mu.Lock() } // uploading items are not on the heap so add them back wb._pushItem(wbItem) fs.Debugf(wbItem.name, "vfs cache: cancelled upload") } // cancelUpload cancels the upload of the item if there is one in progress // // it returns true if there was an upload in progress func (wb *WriteBack) cancelUpload(id Handle) bool { wb.mu.Lock() defer wb.mu.Unlock() wbItem, ok := wb.lookup[id] if !ok || !wbItem.uploading { return false } wb._cancelUpload(wbItem) return true } // this uploads as many items as possible func (wb *WriteBack) processItems(ctx context.Context) { wb.mu.Lock() defer wb.mu.Unlock() if wb.ctx.Err() != nil { return } resetTimer := true for wbItem := wb._peekItem(); wbItem != nil && time.Until(wbItem.expiry) <= 0; wbItem = wb._peekItem() { // If reached transfer limit don't restart the timer if wb.uploads >= fs.GetConfig(context.TODO()).Transfers { fs.Debugf(wbItem.name, "vfs cache: delaying writeback as --transfers exceeded") resetTimer = false break } // Pop the item, mark as uploading and start the uploader wbItem = wb._popItem() //fs.Debugf(wbItem.name, "uploading = true %p item %p", wbItem, wbItem.item) wbItem.uploading = true wb.uploads++ newCtx, cancel := context.WithCancel(ctx) wbItem.cancel = cancel wbItem.done = make(chan struct{}) go wb.upload(newCtx, wbItem) } if resetTimer { wb._resetTimer() } else { wb._stopTimer() } } // Stats return the number of uploads in progress and queued func (wb *WriteBack) Stats() (uploadsInProgress, uploadsQueued int) { wb.mu.Lock() defer wb.mu.Unlock() return wb.uploads, len(wb.items) }