// +build !plan9 package cache import ( "context" "fmt" "io" "math" "os" "os/signal" "path" "path/filepath" "strconv" "strings" "sync" "syscall" "time" "github.com/ncw/rclone/backend/crypt" "github.com/ncw/rclone/fs" "github.com/ncw/rclone/fs/config" "github.com/ncw/rclone/fs/config/configmap" "github.com/ncw/rclone/fs/config/configstruct" "github.com/ncw/rclone/fs/config/obscure" "github.com/ncw/rclone/fs/hash" "github.com/ncw/rclone/fs/rc" "github.com/ncw/rclone/fs/walk" "github.com/ncw/rclone/lib/atexit" "github.com/pkg/errors" "golang.org/x/time/rate" ) const ( // DefCacheChunkSize is the default value for chunk size DefCacheChunkSize = fs.SizeSuffix(5 * 1024 * 1024) // DefCacheTotalChunkSize is the default value for the maximum size of stored chunks DefCacheTotalChunkSize = fs.SizeSuffix(10 * 1024 * 1024 * 1024) // DefCacheChunkCleanInterval is the interval at which chunks are cleaned DefCacheChunkCleanInterval = fs.Duration(time.Minute) // DefCacheInfoAge is the default value for object info age DefCacheInfoAge = fs.Duration(6 * time.Hour) // DefCacheReadRetries is the default value for read retries DefCacheReadRetries = 10 // DefCacheTotalWorkers is how many workers run in parallel to download chunks DefCacheTotalWorkers = 4 // DefCacheChunkNoMemory will enable or disable in-memory storage for chunks DefCacheChunkNoMemory = false // DefCacheRps limits the number of requests per second to the source FS DefCacheRps = -1 // DefCacheWrites will cache file data on writes through the cache DefCacheWrites = false // DefCacheTmpWaitTime says how long should files be stored in local cache before being uploaded DefCacheTmpWaitTime = fs.Duration(15 * time.Second) // DefCacheDbWaitTime defines how long the cache backend should wait for the DB to be available DefCacheDbWaitTime = fs.Duration(1 * time.Second) ) // Register with Fs func init() { fs.Register(&fs.RegInfo{ Name: "cache", Description: "Cache a remote", NewFs: NewFs, Options: []fs.Option{{ Name: "remote", Help: "Remote to cache.\nNormally should contain a ':' and a path, eg \"myremote:path/to/dir\",\n\"myremote:bucket\" or maybe \"myremote:\" (not recommended).", Required: true, }, { Name: "plex_url", Help: "The URL of the Plex server", }, { Name: "plex_username", Help: "The username of the Plex user", }, { Name: "plex_password", Help: "The password of the Plex user", IsPassword: true, }, { Name: "plex_token", Help: "The plex token for authentication - auto set normally", Hide: fs.OptionHideBoth, Advanced: true, }, { Name: "plex_insecure", Help: "Skip all certificate verifications when connecting to the Plex server", Advanced: true, }, { Name: "chunk_size", Help: "The size of a chunk. Lower value good for slow connections but can affect seamless reading.", Default: DefCacheChunkSize, Examples: []fs.OptionExample{{ Value: "1m", Help: "1MB", }, { Value: "5M", Help: "5 MB", }, { Value: "10M", Help: "10 MB", }}, }, { Name: "info_age", Help: "How much time should object info (file size, file hashes etc) be stored in cache.\nUse a very high value if you don't plan on changing the source FS from outside the cache.\nAccepted units are: \"s\", \"m\", \"h\".", Default: DefCacheInfoAge, Examples: []fs.OptionExample{{ Value: "1h", Help: "1 hour", }, { Value: "24h", Help: "24 hours", }, { Value: "48h", Help: "48 hours", }}, }, { Name: "chunk_total_size", Help: "The maximum size of stored chunks. When the storage grows beyond this size, the oldest chunks will be deleted.", Default: DefCacheTotalChunkSize, Examples: []fs.OptionExample{{ Value: "500M", Help: "500 MB", }, { Value: "1G", Help: "1 GB", }, { Value: "10G", Help: "10 GB", }}, }, { Name: "db_path", Default: filepath.Join(config.CacheDir, "cache-backend"), Help: "Directory to cache DB", Advanced: true, }, { Name: "chunk_path", Default: filepath.Join(config.CacheDir, "cache-backend"), Help: "Directory to cache chunk files", Advanced: true, }, { Name: "db_purge", Default: false, Help: "Purge the cache DB before", Hide: fs.OptionHideConfigurator, Advanced: true, }, { Name: "chunk_clean_interval", Default: DefCacheChunkCleanInterval, Help: "Interval at which chunk cleanup runs", Advanced: true, }, { Name: "read_retries", Default: DefCacheReadRetries, Help: "How many times to retry a read from a cache storage", Advanced: true, }, { Name: "workers", Default: DefCacheTotalWorkers, Help: "How many workers should run in parallel to download chunks", Advanced: true, }, { Name: "chunk_no_memory", Default: DefCacheChunkNoMemory, Help: "Disable the in-memory cache for storing chunks during streaming", Advanced: true, }, { Name: "rps", Default: int(DefCacheRps), Help: "Limits the number of requests per second to the source FS. -1 disables the rate limiter", Advanced: true, }, { Name: "writes", Default: DefCacheWrites, Help: "Will cache file data on writes through the FS", Advanced: true, }, { Name: "tmp_upload_path", Default: "", Help: "Directory to keep temporary files until they are uploaded to the cloud storage", Advanced: true, }, { Name: "tmp_wait_time", Default: DefCacheTmpWaitTime, Help: "How long should files be stored in local cache before being uploaded", Advanced: true, }, { Name: "db_wait_time", Default: DefCacheDbWaitTime, Help: "How long to wait for the DB to be available - 0 is unlimited", Advanced: true, }}, }) } // Options defines the configuration for this backend type Options struct { Remote string `config:"remote"` PlexURL string `config:"plex_url"` PlexUsername string `config:"plex_username"` PlexPassword string `config:"plex_password"` PlexToken string `config:"plex_token"` PlexInsecure bool `config:"plex_insecure"` ChunkSize fs.SizeSuffix `config:"chunk_size"` InfoAge fs.Duration `config:"info_age"` ChunkTotalSize fs.SizeSuffix `config:"chunk_total_size"` DbPath string `config:"db_path"` ChunkPath string `config:"chunk_path"` DbPurge bool `config:"db_purge"` ChunkCleanInterval fs.Duration `config:"chunk_clean_interval"` ReadRetries int `config:"read_retries"` TotalWorkers int `config:"workers"` ChunkNoMemory bool `config:"chunk_no_memory"` Rps int `config:"rps"` StoreWrites bool `config:"writes"` TempWritePath string `config:"tmp_upload_path"` TempWaitTime fs.Duration `config:"tmp_wait_time"` DbWaitTime fs.Duration `config:"db_wait_time"` } // Fs represents a wrapped fs.Fs type Fs struct { fs.Fs wrapper fs.Fs name string root string opt Options // parsed options features *fs.Features // optional features cache *Persistent tempFs fs.Fs lastChunkCleanup time.Time cleanupMu sync.Mutex rateLimiter *rate.Limiter plexConnector *plexConnector backgroundRunner *backgroundWriter cleanupChan chan bool parentsForgetFn []func(string, fs.EntryType) notifiedRemotes map[string]bool notifiedMu sync.Mutex parentsForgetMu sync.Mutex } // parseRootPath returns a cleaned root path and a nil error or "" and an error when the path is invalid func parseRootPath(path string) (string, error) { return strings.Trim(path, "/"), nil } // NewFs constructs a Fs from the path, container:path func NewFs(name, rootPath string, m configmap.Mapper) (fs.Fs, error) { // Parse config into Options struct opt := new(Options) err := configstruct.Set(m, opt) if err != nil { return nil, err } if opt.ChunkTotalSize < opt.ChunkSize*fs.SizeSuffix(opt.TotalWorkers) { return nil, errors.Errorf("don't set cache-chunk-total-size(%v) less than cache-chunk-size(%v) * cache-workers(%v)", opt.ChunkTotalSize, opt.ChunkSize, opt.TotalWorkers) } if strings.HasPrefix(opt.Remote, name+":") { return nil, errors.New("can't point cache remote at itself - check the value of the remote setting") } rpath, err := parseRootPath(rootPath) if err != nil { return nil, errors.Wrapf(err, "failed to clean root path %q", rootPath) } wInfo, wName, wPath, wConfig, err := fs.ConfigFs(opt.Remote) if err != nil { return nil, errors.Wrapf(err, "failed to parse remote %q to wrap", opt.Remote) } remotePath := path.Join(wPath, rootPath) wrappedFs, wrapErr := wInfo.NewFs(wName, remotePath, wConfig) if wrapErr != nil && wrapErr != fs.ErrorIsFile { return nil, errors.Wrapf(wrapErr, "failed to make remote %s:%s to wrap", wName, remotePath) } var fsErr error fs.Debugf(name, "wrapped %v:%v at root %v", wrappedFs.Name(), wrappedFs.Root(), rpath) if wrapErr == fs.ErrorIsFile { fsErr = fs.ErrorIsFile rpath = cleanPath(path.Dir(rpath)) } // configure cache backend if opt.DbPurge { fs.Debugf(name, "Purging the DB") } f := &Fs{ Fs: wrappedFs, name: name, root: rpath, opt: *opt, lastChunkCleanup: time.Now().Truncate(time.Hour * 24 * 30), cleanupChan: make(chan bool, 1), notifiedRemotes: make(map[string]bool), } f.rateLimiter = rate.NewLimiter(rate.Limit(float64(opt.Rps)), opt.TotalWorkers) f.plexConnector = &plexConnector{} if opt.PlexURL != "" { if opt.PlexToken != "" { f.plexConnector, err = newPlexConnectorWithToken(f, opt.PlexURL, opt.PlexToken, opt.PlexInsecure) if err != nil { return nil, errors.Wrapf(err, "failed to connect to the Plex API %v", opt.PlexURL) } } else { if opt.PlexPassword != "" && opt.PlexUsername != "" { decPass, err := obscure.Reveal(opt.PlexPassword) if err != nil { decPass = opt.PlexPassword } f.plexConnector, err = newPlexConnector(f, opt.PlexURL, opt.PlexUsername, decPass, opt.PlexInsecure, func(token string) { m.Set("plex_token", token) }) if err != nil { return nil, errors.Wrapf(err, "failed to connect to the Plex API %v", opt.PlexURL) } } } } dbPath := f.opt.DbPath chunkPath := f.opt.ChunkPath // if the dbPath is non default but the chunk path is default, we overwrite the last to follow the same one as dbPath if dbPath != filepath.Join(config.CacheDir, "cache-backend") && chunkPath == filepath.Join(config.CacheDir, "cache-backend") { chunkPath = dbPath } if filepath.Ext(dbPath) != "" { dbPath = filepath.Dir(dbPath) } if filepath.Ext(chunkPath) != "" { chunkPath = filepath.Dir(chunkPath) } err = os.MkdirAll(dbPath, os.ModePerm) if err != nil { return nil, errors.Wrapf(err, "failed to create cache directory %v", dbPath) } err = os.MkdirAll(chunkPath, os.ModePerm) if err != nil { return nil, errors.Wrapf(err, "failed to create cache directory %v", chunkPath) } dbPath = filepath.Join(dbPath, name+".db") chunkPath = filepath.Join(chunkPath, name) fs.Infof(name, "Cache DB path: %v", dbPath) fs.Infof(name, "Cache chunk path: %v", chunkPath) f.cache, err = GetPersistent(dbPath, chunkPath, &Features{ PurgeDb: opt.DbPurge, DbWaitTime: time.Duration(opt.DbWaitTime), }) if err != nil { return nil, errors.Wrapf(err, "failed to start cache db") } // Trap SIGINT and SIGTERM to close the DB handle gracefully c := make(chan os.Signal, 1) signal.Notify(c, syscall.SIGHUP) atexit.Register(func() { if opt.PlexURL != "" { f.plexConnector.closeWebsocket() } f.StopBackgroundRunners() }) go func() { for { s := <-c if s == syscall.SIGHUP { fs.Infof(f, "Clearing cache from signal") f.DirCacheFlush() } } }() fs.Infof(name, "Chunk Memory: %v", !f.opt.ChunkNoMemory) fs.Infof(name, "Chunk Size: %v", f.opt.ChunkSize) fs.Infof(name, "Chunk Total Size: %v", f.opt.ChunkTotalSize) fs.Infof(name, "Chunk Clean Interval: %v", f.opt.ChunkCleanInterval) fs.Infof(name, "Workers: %v", f.opt.TotalWorkers) fs.Infof(name, "File Age: %v", f.opt.InfoAge) if !f.opt.StoreWrites { fs.Infof(name, "Cache Writes: enabled") } if f.opt.TempWritePath != "" { err = os.MkdirAll(f.opt.TempWritePath, os.ModePerm) if err != nil { return nil, errors.Wrapf(err, "failed to create cache directory %v", f.opt.TempWritePath) } f.opt.TempWritePath = filepath.ToSlash(f.opt.TempWritePath) f.tempFs, err = fs.NewFs(f.opt.TempWritePath) if err != nil { return nil, errors.Wrapf(err, "failed to create temp fs: %v", err) } fs.Infof(name, "Upload Temp Rest Time: %v", f.opt.TempWaitTime) fs.Infof(name, "Upload Temp FS: %v", f.opt.TempWritePath) f.backgroundRunner, _ = initBackgroundUploader(f) go f.backgroundRunner.run() } go func() { for { time.Sleep(time.Duration(f.opt.ChunkCleanInterval)) select { case <-f.cleanupChan: fs.Infof(f, "stopping cleanup") return default: fs.Debugf(f, "starting cleanup") f.CleanUpCache(false) } } }() if doChangeNotify := wrappedFs.Features().ChangeNotify; doChangeNotify != nil { pollInterval := make(chan time.Duration, 1) pollInterval <- time.Duration(f.opt.ChunkCleanInterval) doChangeNotify(f.receiveChangeNotify, pollInterval) } f.features = (&fs.Features{ CanHaveEmptyDirectories: true, DuplicateFiles: false, // storage doesn't permit this }).Fill(f).Mask(wrappedFs).WrapsFs(f, wrappedFs) // override only those features that use a temp fs and it doesn't support them //f.features.ChangeNotify = f.ChangeNotify if f.opt.TempWritePath != "" { if f.tempFs.Features().Copy == nil { f.features.Copy = nil } if f.tempFs.Features().Move == nil { f.features.Move = nil } if f.tempFs.Features().Move == nil { f.features.Move = nil } if f.tempFs.Features().DirMove == nil { f.features.DirMove = nil } if f.tempFs.Features().MergeDirs == nil { f.features.MergeDirs = nil } } // even if the wrapped fs doesn't support it, we still want it f.features.DirCacheFlush = f.DirCacheFlush rc.Add(rc.Call{ Path: "cache/expire", Fn: f.httpExpireRemote, Title: "Purge a remote from cache", Help: ` Purge a remote from the cache backend. Supports either a directory or a file. Params: - remote = path to remote (required) - withData = true/false to delete cached data (chunks) as well (optional) Eg rclone rc cache/expire remote=path/to/sub/folder/ rclone rc cache/expire remote=/ withData=true `, }) rc.Add(rc.Call{ Path: "cache/stats", Fn: f.httpStats, Title: "Get cache stats", Help: ` Show statistics for the cache remote. `, }) rc.Add(rc.Call{ Path: "cache/fetch", Fn: f.rcFetch, Title: "Fetch file chunks", Help: ` Ensure the specified file chunks are cached on disk. The chunks= parameter specifies the file chunks to check. It takes a comma separated list of array slice indices. The slice indices are similar to Python slices: start[:end] start is the 0 based chunk number from the beginning of the file to fetch inclusive. end is 0 based chunk number from the beginning of the file to fetch exclisive. Both values can be negative, in which case they count from the back of the file. The value "-5:" represents the last 5 chunks of a file. Some valid examples are: ":5,-5:" -> the first and last five chunks "0,-2" -> the first and the second last chunk "0:10" -> the first ten chunks Any parameter with a key that starts with "file" can be used to specify files to fetch, eg rclone rc cache/fetch chunks=0 file=hello file2=home/goodbye File names will automatically be encrypted when the a crypt remote is used on top of the cache. `, }) return f, fsErr } func (f *Fs) httpStats(in rc.Params) (out rc.Params, err error) { out = make(rc.Params) m, err := f.Stats() if err != nil { return out, errors.Errorf("error while getting cache stats") } out["status"] = "ok" out["stats"] = m return out, nil } func (f *Fs) unwrapRemote(remote string) string { remote = cleanPath(remote) if remote != "" { // if it's wrapped by crypt we need to check what format we got if cryptFs, yes := f.isWrappedByCrypt(); yes { _, err := cryptFs.DecryptFileName(remote) // if it failed to decrypt then it is a decrypted format and we need to encrypt it if err != nil { return cryptFs.EncryptFileName(remote) } // else it's an encrypted format and we can use it as it is } } return remote } func (f *Fs) httpExpireRemote(in rc.Params) (out rc.Params, err error) { out = make(rc.Params) remoteInt, ok := in["remote"] if !ok { return out, errors.Errorf("remote is needed") } remote := remoteInt.(string) withData := false _, ok = in["withData"] if ok { withData = true } remote = f.unwrapRemote(remote) if !f.cache.HasEntry(path.Join(f.Root(), remote)) { return out, errors.Errorf("%s doesn't exist in cache", remote) } co := NewObject(f, remote) err = f.cache.GetObject(co) if err != nil { // it could be a dir cd := NewDirectory(f, remote) err := f.cache.ExpireDir(cd) if err != nil { return out, errors.WithMessage(err, "error expiring directory") } // notify vfs too f.notifyChangeUpstream(cd.Remote(), fs.EntryDirectory) out["status"] = "ok" out["message"] = fmt.Sprintf("cached directory cleared: %v", remote) return out, nil } // expire the entry err = f.cache.ExpireObject(co, withData) if err != nil { return out, errors.WithMessage(err, "error expiring file") } // notify vfs too f.notifyChangeUpstream(co.Remote(), fs.EntryObject) out["status"] = "ok" out["message"] = fmt.Sprintf("cached file cleared: %v", remote) return out, nil } func (f *Fs) rcFetch(in rc.Params) (rc.Params, error) { type chunkRange struct { start, end int64 } parseChunks := func(ranges string) (crs []chunkRange, err error) { for _, part := range strings.Split(ranges, ",") { var start, end int64 = 0, math.MaxInt64 switch ints := strings.Split(part, ":"); len(ints) { case 1: start, err = strconv.ParseInt(ints[0], 10, 64) if err != nil { return nil, errors.Errorf("invalid range: %q", part) } end = start + 1 case 2: if ints[0] != "" { start, err = strconv.ParseInt(ints[0], 10, 64) if err != nil { return nil, errors.Errorf("invalid range: %q", part) } } if ints[1] != "" { end, err = strconv.ParseInt(ints[1], 10, 64) if err != nil { return nil, errors.Errorf("invalid range: %q", part) } } default: return nil, errors.Errorf("invalid range: %q", part) } crs = append(crs, chunkRange{start: start, end: end}) } return } walkChunkRange := func(cr chunkRange, size int64, cb func(chunk int64)) { if size <= 0 { return } chunks := (size-1)/f.ChunkSize() + 1 start, end := cr.start, cr.end if start < 0 { start += chunks } if end <= 0 { end += chunks } if end <= start { return } switch { case start < 0: start = 0 case start >= chunks: return } switch { case end <= start: end = start + 1 case end >= chunks: end = chunks } for i := start; i < end; i++ { cb(i) } } walkChunkRanges := func(crs []chunkRange, size int64, cb func(chunk int64)) { for _, cr := range crs { walkChunkRange(cr, size, cb) } } v, ok := in["chunks"] if !ok { return nil, errors.New("missing chunks parameter") } s, ok := v.(string) if !ok { return nil, errors.New("invalid chunks parameter") } delete(in, "chunks") crs, err := parseChunks(s) if err != nil { return nil, errors.Wrap(err, "invalid chunks parameter") } var files [][2]string for k, v := range in { if !strings.HasPrefix(k, "file") { return nil, errors.Errorf("invalid parameter %s=%s", k, v) } switch v := v.(type) { case string: files = append(files, [2]string{v, f.unwrapRemote(v)}) default: return nil, errors.Errorf("invalid parameter %s=%s", k, v) } } type fileStatus struct { Error string FetchedChunks int } fetchedChunks := make(map[string]fileStatus, len(files)) for _, pair := range files { file, remote := pair[0], pair[1] var status fileStatus o, err := f.NewObject(remote) if err != nil { fetchedChunks[file] = fileStatus{Error: err.Error()} continue } co := o.(*Object) err = co.refreshFromSource(true) if err != nil { fetchedChunks[file] = fileStatus{Error: err.Error()} continue } handle := NewObjectHandle(co, f) handle.UseMemory = false handle.scaleWorkers(1) walkChunkRanges(crs, co.Size(), func(chunk int64) { _, err := handle.getChunk(chunk * f.ChunkSize()) if err != nil { if status.Error == "" { status.Error = err.Error() } } else { status.FetchedChunks++ } }) fetchedChunks[file] = status } return rc.Params{"status": fetchedChunks}, nil } // receiveChangeNotify is a wrapper to notifications sent from the wrapped FS about changed files func (f *Fs) receiveChangeNotify(forgetPath string, entryType fs.EntryType) { if crypt, yes := f.isWrappedByCrypt(); yes { decryptedPath, err := crypt.DecryptFileName(forgetPath) if err == nil { fs.Infof(decryptedPath, "received cache expiry notification") } else { fs.Infof(forgetPath, "received cache expiry notification") } } else { fs.Infof(forgetPath, "received cache expiry notification") } // notify upstreams too (vfs) f.notifyChangeUpstream(forgetPath, entryType) var cd *Directory if entryType == fs.EntryObject { co := NewObject(f, forgetPath) err := f.cache.GetObject(co) if err != nil { fs.Debugf(f, "got change notification for non cached entry %v", co) } err = f.cache.ExpireObject(co, true) if err != nil { fs.Debugf(forgetPath, "notify: error expiring '%v': %v", co, err) } cd = NewDirectory(f, cleanPath(path.Dir(co.Remote()))) } else { cd = NewDirectory(f, forgetPath) } // we expire the dir err := f.cache.ExpireDir(cd) if err != nil { fs.Debugf(forgetPath, "notify: error expiring '%v': %v", cd, err) } else { fs.Debugf(forgetPath, "notify: expired '%v'", cd) } f.notifiedMu.Lock() defer f.notifiedMu.Unlock() f.notifiedRemotes[forgetPath] = true f.notifiedRemotes[cd.Remote()] = true } // notifyChangeUpstreamIfNeeded will check if the wrapped remote doesn't notify on changes // or if we use a temp fs func (f *Fs) notifyChangeUpstreamIfNeeded(remote string, entryType fs.EntryType) { if f.Fs.Features().ChangeNotify == nil || f.opt.TempWritePath != "" { f.notifyChangeUpstream(remote, entryType) } } // notifyChangeUpstream will loop through all the upstreams and notify // of the provided remote (should be only a dir) func (f *Fs) notifyChangeUpstream(remote string, entryType fs.EntryType) { f.parentsForgetMu.Lock() defer f.parentsForgetMu.Unlock() if len(f.parentsForgetFn) > 0 { for _, fn := range f.parentsForgetFn { fn(remote, entryType) } } } // ChangeNotify can subsribe multiple callers // this is coupled with the wrapped fs ChangeNotify (if it supports it) // and also notifies other caches (i.e VFS) to clear out whenever something changes func (f *Fs) ChangeNotify(notifyFunc func(string, fs.EntryType), pollInterval <-chan time.Duration) { f.parentsForgetMu.Lock() defer f.parentsForgetMu.Unlock() fs.Debugf(f, "subscribing to ChangeNotify") f.parentsForgetFn = append(f.parentsForgetFn, notifyFunc) go func() { for range pollInterval { } }() } // Name of the remote (as passed into NewFs) func (f *Fs) Name() string { return f.name } // Root of the remote (as passed into NewFs) func (f *Fs) Root() string { return f.root } // Features returns the optional features of this Fs func (f *Fs) Features() *fs.Features { return f.features } // String returns a description of the FS func (f *Fs) String() string { return fmt.Sprintf("Cache remote %s:%s", f.name, f.root) } // ChunkSize returns the configured chunk size func (f *Fs) ChunkSize() int64 { return int64(f.opt.ChunkSize) } // InfoAge returns the configured file age func (f *Fs) InfoAge() time.Duration { return time.Duration(f.opt.InfoAge) } // TempUploadWaitTime returns the configured temp file upload wait time func (f *Fs) TempUploadWaitTime() time.Duration { return time.Duration(f.opt.TempWaitTime) } // NewObject finds the Object at remote. func (f *Fs) NewObject(remote string) (fs.Object, error) { var err error fs.Debugf(f, "new object '%s'", remote) co := NewObject(f, remote) // search for entry in cache and validate it err = f.cache.GetObject(co) if err != nil { fs.Debugf(remote, "find: error: %v", err) } else if time.Now().After(co.CacheTs.Add(time.Duration(f.opt.InfoAge))) { fs.Debugf(co, "find: cold object: %+v", co) } else { fs.Debugf(co, "find: warm object: %v, expiring on: %v", co, co.CacheTs.Add(time.Duration(f.opt.InfoAge))) return co, nil } // search for entry in source or temp fs var obj fs.Object if f.opt.TempWritePath != "" { obj, err = f.tempFs.NewObject(remote) // not found in temp fs if err != nil { fs.Debugf(remote, "find: not found in local cache fs") obj, err = f.Fs.NewObject(remote) } else { fs.Debugf(obj, "find: found in local cache fs") } } else { obj, err = f.Fs.NewObject(remote) } // not found in either fs if err != nil { fs.Debugf(obj, "find failed: not found in either local or remote fs") return nil, err } // cache the new entry co = ObjectFromOriginal(f, obj).persist() fs.Debugf(co, "find: cached object") return co, nil } // List the objects and directories in dir into entries func (f *Fs) List(dir string) (entries fs.DirEntries, err error) { fs.Debugf(f, "list '%s'", dir) cd := ShallowDirectory(f, dir) // search for cached dir entries and validate them entries, err = f.cache.GetDirEntries(cd) if err != nil { fs.Debugf(dir, "list: error: %v", err) } else if time.Now().After(cd.CacheTs.Add(time.Duration(f.opt.InfoAge))) { fs.Debugf(dir, "list: cold listing: %v", cd.CacheTs) } else if len(entries) == 0 { // TODO: read empty dirs from source? fs.Debugf(dir, "list: empty listing") } else { fs.Debugf(dir, "list: warm %v from cache for: %v, expiring on: %v", len(entries), cd.abs(), cd.CacheTs.Add(time.Duration(f.opt.InfoAge))) fs.Debugf(dir, "list: cached entries: %v", entries) return entries, nil } // FIXME need to clean existing cached listing // we first search any temporary files stored locally var cachedEntries fs.DirEntries if f.opt.TempWritePath != "" { queuedEntries, err := f.cache.searchPendingUploadFromDir(cd.abs()) if err != nil { fs.Errorf(dir, "list: error getting pending uploads: %v", err) } else { fs.Debugf(dir, "list: read %v from temp fs", len(queuedEntries)) fs.Debugf(dir, "list: temp fs entries: %v", queuedEntries) for _, queuedRemote := range queuedEntries { queuedEntry, err := f.tempFs.NewObject(f.cleanRootFromPath(queuedRemote)) if err != nil { fs.Debugf(dir, "list: temp file not found in local fs: %v", err) continue } co := ObjectFromOriginal(f, queuedEntry).persist() fs.Debugf(co, "list: cached temp object") cachedEntries = append(cachedEntries, co) } } } // search from the source entries, err = f.Fs.List(dir) if err != nil { return nil, err } fs.Debugf(dir, "list: read %v from source", len(entries)) fs.Debugf(dir, "list: source entries: %v", entries) // and then iterate over the ones from source (temp Objects will override source ones) var batchDirectories []*Directory for _, entry := range entries { switch o := entry.(type) { case fs.Object: // skip over temporary objects (might be uploading) found := false for _, t := range cachedEntries { if t.Remote() == o.Remote() { found = true break } } if found { continue } co := ObjectFromOriginal(f, o).persist() cachedEntries = append(cachedEntries, co) fs.Debugf(dir, "list: cached object: %v", co) case fs.Directory: cdd := DirectoryFromOriginal(f, o) // check if the dir isn't expired and add it in cache if it isn't if cdd2, err := f.cache.GetDir(cdd.abs()); err != nil || time.Now().Before(cdd2.CacheTs.Add(time.Duration(f.opt.InfoAge))) { batchDirectories = append(batchDirectories, cdd) } cachedEntries = append(cachedEntries, cdd) default: fs.Debugf(entry, "list: Unknown object type %T", entry) } } err = f.cache.AddBatchDir(batchDirectories) if err != nil { fs.Errorf(dir, "list: error caching directories from listing %v", dir) } else { fs.Debugf(dir, "list: cached directories: %v", len(batchDirectories)) } // cache dir meta t := time.Now() cd.CacheTs = &t err = f.cache.AddDir(cd) if err != nil { fs.Errorf(cd, "list: save error: '%v'", err) } else { fs.Debugf(dir, "list: cached dir: '%v', cache ts: %v", cd.abs(), cd.CacheTs) } return cachedEntries, nil } func (f *Fs) recurse(dir string, list *walk.ListRHelper) error { entries, err := f.List(dir) if err != nil { return err } for i := 0; i < len(entries); i++ { innerDir, ok := entries[i].(fs.Directory) if ok { err := f.recurse(innerDir.Remote(), list) if err != nil { return err } } err := list.Add(entries[i]) if err != nil { return err } } return nil } // ListR lists the objects and directories of the Fs starting // from dir recursively into out. func (f *Fs) ListR(dir string, callback fs.ListRCallback) (err error) { fs.Debugf(f, "list recursively from '%s'", dir) // we check if the source FS supports ListR // if it does, we'll use that to get all the entries, cache them and return do := f.Fs.Features().ListR if do != nil { return do(dir, func(entries fs.DirEntries) error { // we got called back with a set of entries so let's cache them and call the original callback for _, entry := range entries { switch o := entry.(type) { case fs.Object: _ = f.cache.AddObject(ObjectFromOriginal(f, o)) case fs.Directory: _ = f.cache.AddDir(DirectoryFromOriginal(f, o)) default: return errors.Errorf("Unknown object type %T", entry) } } // call the original callback return callback(entries) }) } // if we're here, we're gonna do a standard recursive traversal and cache everything list := walk.NewListRHelper(callback) err = f.recurse(dir, list) if err != nil { return err } return list.Flush() } // Mkdir makes the directory (container, bucket) func (f *Fs) Mkdir(dir string) error { fs.Debugf(f, "mkdir '%s'", dir) err := f.Fs.Mkdir(dir) if err != nil { return err } fs.Debugf(dir, "mkdir: created dir in source fs") cd := NewDirectory(f, cleanPath(dir)) err = f.cache.AddDir(cd) if err != nil { fs.Errorf(dir, "mkdir: add error: %v", err) } else { fs.Debugf(cd, "mkdir: added to cache") } // expire parent of new dir parentCd := NewDirectory(f, cleanPath(path.Dir(dir))) err = f.cache.ExpireDir(parentCd) if err != nil { fs.Errorf(parentCd, "mkdir: cache expire error: %v", err) } else { fs.Infof(parentCd, "mkdir: cache expired") } // advertise to ChangeNotify if wrapped doesn't do that f.notifyChangeUpstreamIfNeeded(parentCd.Remote(), fs.EntryDirectory) return nil } // Rmdir removes the directory (container, bucket) if empty func (f *Fs) Rmdir(dir string) error { fs.Debugf(f, "rmdir '%s'", dir) if f.opt.TempWritePath != "" { // pause background uploads f.backgroundRunner.pause() defer f.backgroundRunner.play() // we check if the source exists on the remote and make the same move on it too if it does // otherwise, we skip this step _, err := f.UnWrap().List(dir) if err == nil { err := f.Fs.Rmdir(dir) if err != nil { return err } fs.Debugf(dir, "rmdir: removed dir in source fs") } var queuedEntries []*Object err = walk.Walk(f.tempFs, dir, true, -1, func(path string, entries fs.DirEntries, err error) error { for _, o := range entries { if oo, ok := o.(fs.Object); ok { co := ObjectFromOriginal(f, oo) queuedEntries = append(queuedEntries, co) } } return nil }) if err != nil { fs.Errorf(dir, "rmdir: error getting pending uploads: %v", err) } else { fs.Debugf(dir, "rmdir: read %v from temp fs", len(queuedEntries)) fs.Debugf(dir, "rmdir: temp fs entries: %v", queuedEntries) if len(queuedEntries) > 0 { fs.Errorf(dir, "rmdir: temporary dir not empty: %v", queuedEntries) return fs.ErrorDirectoryNotEmpty } } } else { err := f.Fs.Rmdir(dir) if err != nil { return err } fs.Debugf(dir, "rmdir: removed dir in source fs") } // remove dir data d := NewDirectory(f, dir) err := f.cache.RemoveDir(d.abs()) if err != nil { fs.Errorf(dir, "rmdir: remove error: %v", err) } else { fs.Debugf(d, "rmdir: removed from cache") } // expire parent parentCd := NewDirectory(f, cleanPath(path.Dir(dir))) err = f.cache.ExpireDir(parentCd) if err != nil { fs.Errorf(dir, "rmdir: cache expire error: %v", err) } else { fs.Infof(parentCd, "rmdir: cache expired") } // advertise to ChangeNotify if wrapped doesn't do that f.notifyChangeUpstreamIfNeeded(parentCd.Remote(), fs.EntryDirectory) return nil } // DirMove moves src, srcRemote to this remote at dstRemote // using server side move operations. func (f *Fs) DirMove(src fs.Fs, srcRemote, dstRemote string) error { fs.Debugf(f, "move dir '%s'/'%s' -> '%s'/'%s'", src.Root(), srcRemote, f.Root(), dstRemote) do := f.Fs.Features().DirMove if do == nil { return fs.ErrorCantDirMove } srcFs, ok := src.(*Fs) if !ok { fs.Errorf(srcFs, "can't move directory - not same remote type") return fs.ErrorCantDirMove } if srcFs.Fs.Name() != f.Fs.Name() { fs.Errorf(srcFs, "can't move directory - not wrapping same remotes") return fs.ErrorCantDirMove } if f.opt.TempWritePath != "" { // pause background uploads f.backgroundRunner.pause() defer f.backgroundRunner.play() _, errInWrap := srcFs.UnWrap().List(srcRemote) _, errInTemp := f.tempFs.List(srcRemote) // not found in either fs if errInWrap != nil && errInTemp != nil { return fs.ErrorDirNotFound } // we check if the source exists on the remote and make the same move on it too if it does // otherwise, we skip this step if errInWrap == nil { err := do(srcFs.UnWrap(), srcRemote, dstRemote) if err != nil { return err } fs.Debugf(srcRemote, "movedir: dir moved in the source fs") } // we need to check if the directory exists in the temp fs // and skip the move if it doesn't if errInTemp != nil { goto cleanup } var queuedEntries []*Object err := walk.Walk(f.tempFs, srcRemote, true, -1, func(path string, entries fs.DirEntries, err error) error { for _, o := range entries { if oo, ok := o.(fs.Object); ok { co := ObjectFromOriginal(f, oo) queuedEntries = append(queuedEntries, co) if co.tempFileStartedUpload() { fs.Errorf(co, "can't move - upload has already started. need to finish that") return fs.ErrorCantDirMove } } } return nil }) if err != nil { return err } fs.Debugf(srcRemote, "dirmove: read %v from temp fs", len(queuedEntries)) fs.Debugf(srcRemote, "dirmove: temp fs entries: %v", queuedEntries) do := f.tempFs.Features().DirMove if do == nil { fs.Errorf(srcRemote, "dirmove: can't move dir in temp fs") return fs.ErrorCantDirMove } err = do(f.tempFs, srcRemote, dstRemote) if err != nil { return err } err = f.cache.ReconcileTempUploads(f) if err != nil { return err } } else { err := do(srcFs.UnWrap(), srcRemote, dstRemote) if err != nil { return err } fs.Debugf(srcRemote, "movedir: dir moved in the source fs") } cleanup: // delete src dir from cache along with all chunks srcDir := NewDirectory(srcFs, srcRemote) err := f.cache.RemoveDir(srcDir.abs()) if err != nil { fs.Errorf(srcDir, "dirmove: remove error: %v", err) } else { fs.Debugf(srcDir, "dirmove: removed cached dir") } // expire src parent srcParent := NewDirectory(f, cleanPath(path.Dir(srcRemote))) err = f.cache.ExpireDir(srcParent) if err != nil { fs.Errorf(srcParent, "dirmove: cache expire error: %v", err) } else { fs.Debugf(srcParent, "dirmove: cache expired") } // advertise to ChangeNotify if wrapped doesn't do that f.notifyChangeUpstreamIfNeeded(srcParent.Remote(), fs.EntryDirectory) // expire parent dir at the destination path dstParent := NewDirectory(f, cleanPath(path.Dir(dstRemote))) err = f.cache.ExpireDir(dstParent) if err != nil { fs.Errorf(dstParent, "dirmove: cache expire error: %v", err) } else { fs.Debugf(dstParent, "dirmove: cache expired") } // advertise to ChangeNotify if wrapped doesn't do that f.notifyChangeUpstreamIfNeeded(dstParent.Remote(), fs.EntryDirectory) // TODO: precache dst dir and save the chunks return nil } // cacheReader will split the stream of a reader to be cached at the same time it is read by the original source func (f *Fs) cacheReader(u io.Reader, src fs.ObjectInfo, originalRead func(inn io.Reader)) { // create the pipe and tee reader pr, pw := io.Pipe() tr := io.TeeReader(u, pw) // create channel to synchronize done := make(chan bool) defer close(done) go func() { // notify the cache reader that we're complete after the source FS finishes defer func() { _ = pw.Close() }() // process original reading originalRead(tr) // signal complete done <- true }() go func() { var offset int64 for { chunk := make([]byte, f.opt.ChunkSize) readSize, err := io.ReadFull(pr, chunk) // we ignore 3 failures which are ok: // 1. EOF - original reading finished and we got a full buffer too // 2. ErrUnexpectedEOF - original reading finished and partial buffer // 3. ErrClosedPipe - source remote reader was closed (usually means it reached the end) and we need to stop too // if we have a different error: we're going to error out the original reading too and stop this if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF && err != io.ErrClosedPipe { fs.Errorf(src, "error saving new data in cache. offset: %v, err: %v", offset, err) _ = pr.CloseWithError(err) break } // if we have some bytes we cache them if readSize > 0 { chunk = chunk[:readSize] err2 := f.cache.AddChunk(cleanPath(path.Join(f.root, src.Remote())), chunk, offset) if err2 != nil { fs.Errorf(src, "error saving new data in cache '%v'", err2) _ = pr.CloseWithError(err2) break } offset += int64(readSize) } // stuff should be closed but let's be sure if err == io.EOF || err == io.ErrUnexpectedEOF || err == io.ErrClosedPipe { _ = pr.Close() break } } // signal complete done <- true }() // wait until both are done for c := 0; c < 2; c++ { <-done } } type putFn func(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) // put in to the remote path func (f *Fs) put(in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, put putFn) (fs.Object, error) { var err error var obj fs.Object // queue for upload and store in temp fs if configured if f.opt.TempWritePath != "" { // we need to clear the caches before a put through temp fs parentCd := NewDirectory(f, cleanPath(path.Dir(src.Remote()))) _ = f.cache.ExpireDir(parentCd) f.notifyChangeUpstreamIfNeeded(parentCd.Remote(), fs.EntryDirectory) obj, err = f.tempFs.Put(in, src, options...) if err != nil { fs.Errorf(obj, "put: failed to upload in temp fs: %v", err) return nil, err } fs.Infof(obj, "put: uploaded in temp fs") err = f.cache.addPendingUpload(path.Join(f.Root(), src.Remote()), false) if err != nil { fs.Errorf(obj, "put: failed to queue for upload: %v", err) return nil, err } fs.Infof(obj, "put: queued for upload") // if cache writes is enabled write it first through cache } else if f.opt.StoreWrites { f.cacheReader(in, src, func(inn io.Reader) { obj, err = put(inn, src, options...) }) if err == nil { fs.Debugf(obj, "put: uploaded to remote fs and saved in cache") } // last option: save it directly in remote fs } else { obj, err = put(in, src, options...) if err == nil { fs.Debugf(obj, "put: uploaded to remote fs") } } // validate and stop if errors are found if err != nil { fs.Errorf(src, "put: error uploading: %v", err) return nil, err } // cache the new file cachedObj := ObjectFromOriginal(f, obj) // deleting cached chunks and info to be replaced with new ones _ = f.cache.RemoveObject(cachedObj.abs()) cachedObj.persist() fs.Debugf(cachedObj, "put: added to cache") // expire parent parentCd := NewDirectory(f, cleanPath(path.Dir(cachedObj.Remote()))) err = f.cache.ExpireDir(parentCd) if err != nil { fs.Errorf(cachedObj, "put: cache expire error: %v", err) } else { fs.Infof(parentCd, "put: cache expired") } // advertise to ChangeNotify f.notifyChangeUpstreamIfNeeded(parentCd.Remote(), fs.EntryDirectory) return cachedObj, nil } // Put in to the remote path with the modTime given of the given size func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { fs.Debugf(f, "put data at '%s'", src.Remote()) return f.put(in, src, options, f.Fs.Put) } // PutUnchecked uploads the object func (f *Fs) PutUnchecked(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { do := f.Fs.Features().PutUnchecked if do == nil { return nil, errors.New("can't PutUnchecked") } fs.Debugf(f, "put data unchecked in '%s'", src.Remote()) return f.put(in, src, options, do) } // PutStream uploads the object func (f *Fs) PutStream(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { do := f.Fs.Features().PutStream if do == nil { return nil, errors.New("can't PutStream") } fs.Debugf(f, "put data streaming in '%s'", src.Remote()) return f.put(in, src, options, do) } // Copy src to this remote using server side copy operations. func (f *Fs) Copy(src fs.Object, remote string) (fs.Object, error) { fs.Debugf(f, "copy obj '%s' -> '%s'", src, remote) do := f.Fs.Features().Copy if do == nil { fs.Errorf(src, "source remote (%v) doesn't support Copy", src.Fs()) return nil, fs.ErrorCantCopy } // the source must be a cached object or we abort srcObj, ok := src.(*Object) if !ok { fs.Errorf(srcObj, "can't copy - not same remote type") return nil, fs.ErrorCantCopy } // both the source cache fs and this cache fs need to wrap the same remote if srcObj.CacheFs.Fs.Name() != f.Fs.Name() { fs.Errorf(srcObj, "can't copy - not wrapping same remotes") return nil, fs.ErrorCantCopy } // refresh from source or abort if err := srcObj.refreshFromSource(false); err != nil { fs.Errorf(f, "can't copy %v - %v", src, err) return nil, fs.ErrorCantCopy } if srcObj.isTempFile() { // we check if the feature is stil active if f.opt.TempWritePath == "" { fs.Errorf(srcObj, "can't copy - this is a local cached file but this feature is turned off this run") return nil, fs.ErrorCantCopy } do = srcObj.ParentFs.Features().Copy if do == nil { fs.Errorf(src, "parent remote (%v) doesn't support Copy", srcObj.ParentFs) return nil, fs.ErrorCantCopy } } obj, err := do(srcObj.Object, remote) if err != nil { fs.Errorf(srcObj, "error moving in cache: %v", err) return nil, err } fs.Debugf(obj, "copy: file copied") // persist new co := ObjectFromOriginal(f, obj).persist() fs.Debugf(co, "copy: added to cache") // expire the destination path parentCd := NewDirectory(f, cleanPath(path.Dir(co.Remote()))) err = f.cache.ExpireDir(parentCd) if err != nil { fs.Errorf(parentCd, "copy: cache expire error: %v", err) } else { fs.Infof(parentCd, "copy: cache expired") } // advertise to ChangeNotify if wrapped doesn't do that f.notifyChangeUpstreamIfNeeded(parentCd.Remote(), fs.EntryDirectory) // expire src parent srcParent := NewDirectory(f, cleanPath(path.Dir(src.Remote()))) err = f.cache.ExpireDir(srcParent) if err != nil { fs.Errorf(srcParent, "copy: cache expire error: %v", err) } else { fs.Infof(srcParent, "copy: cache expired") } // advertise to ChangeNotify if wrapped doesn't do that f.notifyChangeUpstreamIfNeeded(srcParent.Remote(), fs.EntryDirectory) return co, nil } // Move src to this remote using server side move operations. func (f *Fs) Move(src fs.Object, remote string) (fs.Object, error) { fs.Debugf(f, "moving obj '%s' -> %s", src, remote) // if source fs doesn't support move abort do := f.Fs.Features().Move if do == nil { fs.Errorf(src, "source remote (%v) doesn't support Move", src.Fs()) return nil, fs.ErrorCantMove } // the source must be a cached object or we abort srcObj, ok := src.(*Object) if !ok { fs.Errorf(srcObj, "can't move - not same remote type") return nil, fs.ErrorCantMove } // both the source cache fs and this cache fs need to wrap the same remote if srcObj.CacheFs.Fs.Name() != f.Fs.Name() { fs.Errorf(srcObj, "can't move - not wrapping same remote types") return nil, fs.ErrorCantMove } // refresh from source or abort if err := srcObj.refreshFromSource(false); err != nil { fs.Errorf(f, "can't move %v - %v", src, err) return nil, fs.ErrorCantMove } // if this is a temp object then we perform the changes locally if srcObj.isTempFile() { // we check if the feature is stil active if f.opt.TempWritePath == "" { fs.Errorf(srcObj, "can't move - this is a local cached file but this feature is turned off this run") return nil, fs.ErrorCantMove } // pause background uploads f.backgroundRunner.pause() defer f.backgroundRunner.play() // started uploads can't be moved until they complete if srcObj.tempFileStartedUpload() { fs.Errorf(srcObj, "can't move - upload has already started. need to finish that") return nil, fs.ErrorCantMove } do = f.tempFs.Features().Move // we must also update the pending queue err := f.cache.updatePendingUpload(srcObj.abs(), func(item *tempUploadInfo) error { item.DestPath = path.Join(f.Root(), remote) item.AddedOn = time.Now() return nil }) if err != nil { fs.Errorf(srcObj, "failed to rename queued file for upload: %v", err) return nil, fs.ErrorCantMove } fs.Debugf(srcObj, "move: queued file moved to %v", remote) } obj, err := do(srcObj.Object, remote) if err != nil { fs.Errorf(srcObj, "error moving: %v", err) return nil, err } fs.Debugf(obj, "move: file moved") // remove old err = f.cache.RemoveObject(srcObj.abs()) if err != nil { fs.Errorf(srcObj, "move: remove error: %v", err) } else { fs.Debugf(srcObj, "move: removed from cache") } // expire old parent parentCd := NewDirectory(f, cleanPath(path.Dir(srcObj.Remote()))) err = f.cache.ExpireDir(parentCd) if err != nil { fs.Errorf(parentCd, "move: parent cache expire error: %v", err) } else { fs.Infof(parentCd, "move: cache expired") } // advertise to ChangeNotify if wrapped doesn't do that f.notifyChangeUpstreamIfNeeded(parentCd.Remote(), fs.EntryDirectory) // persist new cachedObj := ObjectFromOriginal(f, obj).persist() fs.Debugf(cachedObj, "move: added to cache") // expire new parent parentCd = NewDirectory(f, cleanPath(path.Dir(cachedObj.Remote()))) err = f.cache.ExpireDir(parentCd) if err != nil { fs.Errorf(parentCd, "move: expire error: %v", err) } else { fs.Infof(parentCd, "move: cache expired") } // advertise to ChangeNotify if wrapped doesn't do that f.notifyChangeUpstreamIfNeeded(parentCd.Remote(), fs.EntryDirectory) return cachedObj, nil } // Hashes returns the supported hash sets. func (f *Fs) Hashes() hash.Set { return f.Fs.Hashes() } // Purge all files in the root and the root directory func (f *Fs) Purge() error { fs.Infof(f, "purging cache") f.cache.Purge() do := f.Fs.Features().Purge if do == nil { return nil } err := do() if err != nil { return err } return nil } // CleanUp the trash in the Fs func (f *Fs) CleanUp() error { f.CleanUpCache(false) do := f.Fs.Features().CleanUp if do == nil { return nil } return do() } // About gets quota information from the Fs func (f *Fs) About() (*fs.Usage, error) { do := f.Fs.Features().About if do == nil { return nil, errors.New("About not supported") } return do() } // Stats returns stats about the cache storage func (f *Fs) Stats() (map[string]map[string]interface{}, error) { return f.cache.Stats() } // openRateLimited will execute a closure under a rate limiter watch func (f *Fs) openRateLimited(fn func() (io.ReadCloser, error)) (io.ReadCloser, error) { var err error ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() start := time.Now() if err = f.rateLimiter.Wait(ctx); err != nil { return nil, err } elapsed := time.Since(start) if elapsed > time.Second*2 { fs.Debugf(f, "rate limited: %s", elapsed) } return fn() } // CleanUpCache will cleanup only the cache data that is expired func (f *Fs) CleanUpCache(ignoreLastTs bool) { f.cleanupMu.Lock() defer f.cleanupMu.Unlock() if ignoreLastTs || time.Now().After(f.lastChunkCleanup.Add(time.Duration(f.opt.ChunkCleanInterval))) { f.cache.CleanChunksBySize(int64(f.opt.ChunkTotalSize)) f.lastChunkCleanup = time.Now() } } // StopBackgroundRunners will signall all the runners to stop their work // can be triggered from a terminate signal or from testing between runs func (f *Fs) StopBackgroundRunners() { f.cleanupChan <- false if f.opt.TempWritePath != "" && f.backgroundRunner != nil && f.backgroundRunner.isRunning() { f.backgroundRunner.close() } f.cache.Close() fs.Debugf(f, "Services stopped") } // UnWrap returns the Fs that this Fs is wrapping func (f *Fs) UnWrap() fs.Fs { return f.Fs } // WrapFs returns the Fs that is wrapping this Fs func (f *Fs) WrapFs() fs.Fs { return f.wrapper } // SetWrapper sets the Fs that is wrapping this Fs func (f *Fs) SetWrapper(wrapper fs.Fs) { f.wrapper = wrapper } // isWrappedByCrypt checks if this is wrapped by a crypt remote func (f *Fs) isWrappedByCrypt() (*crypt.Fs, bool) { if f.wrapper == nil { return nil, false } c, ok := f.wrapper.(*crypt.Fs) return c, ok } // cleanRootFromPath trims the root of the current fs from a path func (f *Fs) cleanRootFromPath(p string) string { if f.Root() != "" { p = p[len(f.Root()):] // trim out root if len(p) > 0 { // remove first separator p = p[1:] } } return p } func (f *Fs) isRootInPath(p string) bool { if f.Root() == "" { return true } return strings.HasPrefix(p, f.Root()+"/") } // DirCacheFlush flushes the dir cache func (f *Fs) DirCacheFlush() { _ = f.cache.RemoveDir("") } // GetBackgroundUploadChannel returns a channel that can be listened to for remote activities that happen // in the background func (f *Fs) GetBackgroundUploadChannel() chan BackgroundUploadState { if f.opt.TempWritePath != "" { return f.backgroundRunner.notifyCh } return nil } func (f *Fs) isNotifiedRemote(remote string) bool { f.notifiedMu.Lock() defer f.notifiedMu.Unlock() n, ok := f.notifiedRemotes[remote] if !ok || !n { return false } delete(f.notifiedRemotes, remote) return n } func cleanPath(p string) string { p = path.Clean(p) if p == "." || p == "/" { p = "" } return p } // Check the interfaces are satisfied var ( _ fs.Fs = (*Fs)(nil) _ fs.Purger = (*Fs)(nil) _ fs.Copier = (*Fs)(nil) _ fs.Mover = (*Fs)(nil) _ fs.DirMover = (*Fs)(nil) _ fs.PutUncheckeder = (*Fs)(nil) _ fs.PutStreamer = (*Fs)(nil) _ fs.CleanUpper = (*Fs)(nil) _ fs.UnWrapper = (*Fs)(nil) _ fs.Wrapper = (*Fs)(nil) _ fs.ListRer = (*Fs)(nil) _ fs.ChangeNotifier = (*Fs)(nil) _ fs.Abouter = (*Fs)(nil) )