From b48b537325459e5f2bf8eb5f6e9a560a755e013d Mon Sep 17 00:00:00 2001 From: remusb Date: Sat, 9 Dec 2017 23:54:26 +0200 Subject: [PATCH] cache: plex integration, refactor chunk storage and worker retries (#1899) --- cache/cache.go | 326 ++++++++++------------- cache/cache_internal_test.go | 224 +++------------- cache/cache_test.go | 2 +- cache/cache_unsupported.go | 2 +- cache/directory.go | 2 +- cache/handle.go | 253 +++++++++++------- cache/object.go | 14 +- cache/plex.go | 229 ++++++++++++++++ cache/storage_memory.go | 11 +- cache/storage_persistent.go | 218 ++++++++------- cmd/cachestats/cachestats.go | 2 +- cmd/cachestats/cachestats_unsupported.go | 2 +- docs/content/cache.md | 146 +++++----- fstest/fstests/gen_tests.go | 2 +- 14 files changed, 781 insertions(+), 652 deletions(-) create mode 100644 cache/plex.go diff --git a/cache/cache.go b/cache/cache.go index 9d4abf7a5..b7471dc21 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -1,4 +1,4 @@ -// +build !plan9 +// +build !plan9,go1.7 package cache @@ -7,7 +7,6 @@ import ( "io" "path" "path/filepath" - "strconv" "strings" "sync" "time" @@ -17,6 +16,7 @@ import ( "os/signal" "syscall" + "github.com/ncw/rclone/crypt" "github.com/ncw/rclone/fs" "github.com/pkg/errors" "golang.org/x/net/context" @@ -26,22 +26,20 @@ import ( const ( // DefCacheChunkSize is the default value for chunk size DefCacheChunkSize = "5M" + // DefCacheTotalChunkSize is the default value for the maximum size of stored chunks + DefCacheTotalChunkSize = "10G" + // DefCacheChunkCleanInterval is the interval at which chunks are cleaned + DefCacheChunkCleanInterval = "1m" // DefCacheInfoAge is the default value for object info age DefCacheInfoAge = "6h" - // DefCacheChunkAge is the default value for chunk age duration - DefCacheChunkAge = "3h" - // DefCacheMetaAge is the default value for chunk age duration - DefCacheMetaAge = "3h" // DefCacheReadRetries is the default value for read retries - DefCacheReadRetries = 3 + DefCacheReadRetries = 10 // DefCacheTotalWorkers is how many workers run in parallel to download chunks DefCacheTotalWorkers = 4 // DefCacheChunkNoMemory will enable or disable in-memory storage for chunks DefCacheChunkNoMemory = false // DefCacheRps limits the number of requests per second to the source FS DefCacheRps = -1 - // DefWarmUpRatePerSeconds will apply a special config for warming up the cache - DefWarmUpRatePerSeconds = "3/20" // DefCacheWrites will cache file data on writes through the cache DefCacheWrites = false ) @@ -49,18 +47,17 @@ const ( // Globals var ( // Flags - cacheDbPath = fs.StringP("cache-db-path", "", filepath.Join(fs.CacheDir, "cache-backend"), "Directory to cache DB") - cacheDbPurge = fs.BoolP("cache-db-purge", "", false, "Purge the cache DB before") - cacheChunkSize = fs.StringP("cache-chunk-size", "", DefCacheChunkSize, "The size of a chunk") - cacheInfoAge = fs.StringP("cache-info-age", "", DefCacheInfoAge, "How much time should object info be stored in cache") - cacheChunkAge = fs.StringP("cache-chunk-age", "", DefCacheChunkAge, "How much time should a chunk be in cache before cleanup") - cacheMetaAge = fs.StringP("cache-warm-up-age", "", DefCacheMetaAge, "How much time should data be cached during warm up") - cacheReadRetries = fs.IntP("cache-read-retries", "", DefCacheReadRetries, "How many times to retry a read from a cache storage") - cacheTotalWorkers = fs.IntP("cache-workers", "", DefCacheTotalWorkers, "How many workers should run in parallel to download chunks") - cacheChunkNoMemory = fs.BoolP("cache-chunk-no-memory", "", DefCacheChunkNoMemory, "Disable the in-memory cache for storing chunks during streaming") - cacheRps = fs.IntP("cache-rps", "", int(DefCacheRps), "Limits the number of requests per second to the source FS. -1 disables the rate limiter") - cacheWarmUp = fs.StringP("cache-warm-up-rps", "", DefWarmUpRatePerSeconds, "Format is X/Y = how many X opens per Y seconds should trigger the warm up mode. See the docs") - cacheStoreWrites = fs.BoolP("cache-writes", "", DefCacheWrites, "Will cache file data on writes through the FS") + cacheDbPath = fs.StringP("cache-db-path", "", filepath.Join(fs.CacheDir, "cache-backend"), "Directory to cache DB") + cacheDbPurge = fs.BoolP("cache-db-purge", "", false, "Purge the cache DB before") + cacheChunkSize = fs.StringP("cache-chunk-size", "", DefCacheChunkSize, "The size of a chunk") + cacheTotalChunkSize = fs.StringP("cache-total-chunk-size", "", DefCacheTotalChunkSize, "The total size which the chunks can take up from the disk") + cacheChunkCleanInterval = fs.StringP("cache-chunk-clean-interval", "", DefCacheChunkCleanInterval, "Interval at which chunk cleanup runs") + cacheInfoAge = fs.StringP("cache-info-age", "", DefCacheInfoAge, "How much time should object info be stored in cache") + cacheReadRetries = fs.IntP("cache-read-retries", "", DefCacheReadRetries, "How many times to retry a read from a cache storage") + cacheTotalWorkers = fs.IntP("cache-workers", "", DefCacheTotalWorkers, "How many workers should run in parallel to download chunks") + cacheChunkNoMemory = fs.BoolP("cache-chunk-no-memory", "", DefCacheChunkNoMemory, "Disable the in-memory cache for storing chunks during streaming") + cacheRps = fs.IntP("cache-rps", "", int(DefCacheRps), "Limits the number of requests per second to the source FS. -1 disables the rate limiter") + cacheStoreWrites = fs.BoolP("cache-writes", "", DefCacheWrites, "Will cache file data on writes through the FS") ) // Register with Fs @@ -72,6 +69,19 @@ func init() { Options: []fs.Option{{ Name: "remote", Help: "Remote to cache.\nNormally should contain a ':' and a path, eg \"myremote:path/to/dir\",\n\"myremote:bucket\" or maybe \"myremote:\" (not recommended).", + }, { + Name: "plex_url", + Help: "Optional: The URL of the Plex server", + Optional: true, + }, { + Name: "plex_username", + Help: "Optional: The username of the Plex user", + Optional: true, + }, { + Name: "plex_password", + Help: "Optional: The password of the Plex user", + IsPassword: true, + Optional: true, }, { Name: "chunk_size", Help: "The size of a chunk. Lower value good for slow connections but can affect seamless reading. \nDefault: " + DefCacheChunkSize, @@ -105,34 +115,18 @@ func init() { }, Optional: true, }, { - Name: "chunk_age", - Help: "How much time should a chunk (file data) be stored in cache. \nAccepted units are: \"s\", \"m\", \"h\".\nDefault: " + DefCacheChunkAge, + Name: "chunk_total_size", + Help: "The maximum size of stored chunks. When the storage grows beyond this size, the oldest chunks will be deleted. \nDefault: " + DefCacheTotalChunkSize, Examples: []fs.OptionExample{ { - Value: "30s", - Help: "30 seconds", + Value: "500M", + Help: "500 MB", }, { - Value: "1m", - Help: "1 minute", + Value: "1G", + Help: "1 GB", }, { - Value: "1h30m", - Help: "1 hour and 30 minutes", - }, - }, - Optional: true, - }, { - Name: "warmup_age", - Help: "How much time should data be cached during warm up. \nAccepted units are: \"s\", \"m\", \"h\".\nDefault: " + DefCacheMetaAge, - Examples: []fs.OptionExample{ - { - Value: "3h", - Help: "3 hours", - }, { - Value: "6h", - Help: "6 hours", - }, { - Value: "24h", - Help: "24 hours", + Value: "10G", + Help: "10 GB", }, }, Optional: true, @@ -149,10 +143,7 @@ type ChunkStorage interface { GetChunk(cachedObject *Object, offset int64) ([]byte, error) // add a new chunk - AddChunk(cachedObject *Object, data []byte, offset int64) error - - // AddChunkAhead adds a new chunk before caching an Object for it - AddChunkAhead(fp string, data []byte, offset int64, t time.Duration) error + AddChunk(fp string, data []byte, offset int64) error // if the storage can cleanup on a cron basis // otherwise it can do a noop operation @@ -161,6 +152,10 @@ type ChunkStorage interface { // if the storage can cleanup chunks after we no longer need them // otherwise it can do a noop operation CleanChunksByNeed(offset int64) + + // if the storage can cleanup chunks after the total size passes a certain point + // otherwise it can do a noop operation + CleanChunksBySize(maxSize int64) } // Storage is a storage type (Bolt) which needs to support both chunk and file based operations @@ -213,26 +208,21 @@ type Fs struct { features *fs.Features // optional features cache Storage - fileAge time.Duration - chunkSize int64 - chunkAge time.Duration - metaAge time.Duration - readRetries int - totalWorkers int - chunkMemory bool - warmUp bool - warmUpRate int - warmUpSec int - cacheWrites bool - originalTotalWorkers int - originalChunkMemory bool + fileAge time.Duration + chunkSize int64 + chunkTotalSize int64 + chunkCleanInterval time.Duration + readRetries int + totalWorkers int + totalMaxWorkers int + chunkMemory bool + cacheWrites bool - lastChunkCleanup time.Time - lastRootCleanup time.Time - lastOpenedEntries map[string]time.Time - cleanupMu sync.Mutex - warmupMu sync.Mutex - rateLimiter *rate.Limiter + lastChunkCleanup time.Time + lastRootCleanup time.Time + cleanupMu sync.Mutex + rateLimiter *rate.Limiter + plexConnector *plexConnector } // NewFs contstructs an Fs from the path, container:path @@ -245,12 +235,13 @@ func NewFs(name, rpath string) (fs.Fs, error) { // Look for a file first remotePath := path.Join(remote, rpath) wrappedFs, wrapErr := fs.NewFs(remotePath) - if wrapErr != fs.ErrorIsFile && wrapErr != nil { return nil, errors.Wrapf(wrapErr, "failed to make remote %q to wrap", remotePath) } fs.Debugf(name, "wrapped %v:%v at root %v", wrappedFs.Name(), wrappedFs.Root(), rpath) + plexURL := fs.ConfigFileGet(name, "plex_url") + plexToken := fs.ConfigFileGet(name, "plex_token") var chunkSize fs.SizeSuffix chunkSizeString := fs.ConfigFileGet(name, "chunk_size", DefCacheChunkSize) if *cacheChunkSize != DefCacheChunkSize { @@ -260,6 +251,20 @@ func NewFs(name, rpath string) (fs.Fs, error) { if err != nil { return nil, errors.Wrapf(err, "failed to understand chunk size", chunkSizeString) } + var chunkTotalSize fs.SizeSuffix + chunkTotalSizeString := fs.ConfigFileGet(name, "chunk_total_size", DefCacheTotalChunkSize) + if *cacheTotalChunkSize != DefCacheTotalChunkSize { + chunkTotalSizeString = *cacheTotalChunkSize + } + err = chunkTotalSize.Set(chunkTotalSizeString) + if err != nil { + return nil, errors.Wrapf(err, "failed to understand chunk total size", chunkTotalSizeString) + } + chunkCleanIntervalStr := *cacheChunkCleanInterval + chunkCleanInterval, err := time.ParseDuration(chunkCleanIntervalStr) + if err != nil { + return nil, errors.Wrapf(err, "failed to understand duration %v", chunkCleanIntervalStr) + } infoAge := fs.ConfigFileGet(name, "info_age", DefCacheInfoAge) if *cacheInfoAge != DefCacheInfoAge { infoAge = *cacheInfoAge @@ -268,58 +273,70 @@ func NewFs(name, rpath string) (fs.Fs, error) { if err != nil { return nil, errors.Wrapf(err, "failed to understand duration", infoAge) } - chunkAge := fs.ConfigFileGet(name, "chunk_age", DefCacheChunkAge) - if *cacheChunkAge != DefCacheChunkAge { - chunkAge = *cacheChunkAge - } - chunkDuration, err := time.ParseDuration(chunkAge) - if err != nil { - return nil, errors.Wrapf(err, "failed to understand duration", chunkAge) - } - metaAge := fs.ConfigFileGet(name, "warmup_age", DefCacheMetaAge) - if *cacheMetaAge != DefCacheMetaAge { - metaAge = *cacheMetaAge - } - metaDuration, err := time.ParseDuration(metaAge) - if err != nil { - return nil, errors.Wrapf(err, "failed to understand duration", metaAge) - } - warmupRps := strings.Split(*cacheWarmUp, "/") - warmupRate, err := strconv.Atoi(warmupRps[0]) - if err != nil { - return nil, errors.Wrapf(err, "failed to understand warm up rate", *cacheWarmUp) - } - warmupSec, err := strconv.Atoi(warmupRps[1]) - if err != nil { - return nil, errors.Wrapf(err, "failed to understand warm up seconds", *cacheWarmUp) - } // configure cache backend if *cacheDbPurge { fs.Debugf(name, "Purging the DB") } f := &Fs{ - Fs: wrappedFs, - name: name, - root: rpath, - fileAge: infoDuration, - chunkSize: int64(chunkSize), - chunkAge: chunkDuration, - metaAge: metaDuration, - readRetries: *cacheReadRetries, - totalWorkers: *cacheTotalWorkers, - originalTotalWorkers: *cacheTotalWorkers, - chunkMemory: !*cacheChunkNoMemory, - originalChunkMemory: !*cacheChunkNoMemory, - warmUp: false, - warmUpRate: warmupRate, - warmUpSec: warmupSec, - cacheWrites: *cacheStoreWrites, - lastChunkCleanup: time.Now().Truncate(time.Hour * 24 * 30), - lastRootCleanup: time.Now().Truncate(time.Hour * 24 * 30), - lastOpenedEntries: make(map[string]time.Time), + Fs: wrappedFs, + name: name, + root: rpath, + fileAge: infoDuration, + chunkSize: int64(chunkSize), + chunkTotalSize: int64(chunkTotalSize), + chunkCleanInterval: chunkCleanInterval, + readRetries: *cacheReadRetries, + totalWorkers: *cacheTotalWorkers, + totalMaxWorkers: *cacheTotalWorkers, + chunkMemory: !*cacheChunkNoMemory, + cacheWrites: *cacheStoreWrites, + lastChunkCleanup: time.Now().Truncate(time.Hour * 24 * 30), + lastRootCleanup: time.Now().Truncate(time.Hour * 24 * 30), + } + if f.chunkTotalSize < (f.chunkSize * int64(f.totalWorkers)) { + return nil, errors.Errorf("don't set cache-total-chunk-size(%v) less than cache-chunk-size(%v) * cache-workers(%v)", + f.chunkTotalSize, f.chunkSize, f.totalWorkers) } f.rateLimiter = rate.NewLimiter(rate.Limit(float64(*cacheRps)), f.totalWorkers) + f.plexConnector = &plexConnector{} + if plexURL != "" { + usingPlex := false + + if plexToken != "" { + f.plexConnector, err = newPlexConnectorWithToken(f, plexURL, plexToken) + if err != nil { + return nil, errors.Wrapf(err, "failed to connect to the Plex API %v", plexURL) + } + usingPlex = true + } else { + plexUsername := fs.ConfigFileGet(name, "plex_username") + plexPassword := fs.ConfigFileGet(name, "plex_password") + if plexPassword != "" && plexUsername != "" { + decPass, err := fs.Reveal(plexPassword) + if err != nil { + decPass = plexPassword + } + f.plexConnector, err = newPlexConnector(f, plexURL, plexUsername, decPass) + if err != nil { + return nil, errors.Wrapf(err, "failed to connect to the Plex API %v", plexURL) + } + if f.plexConnector.token != "" { + fs.ConfigFileSet(name, "plex_token", f.plexConnector.token) + fs.SaveConfig() + } + usingPlex = true + } + } + + if usingPlex { + fs.Infof(name, "Connected to Plex server: %v", plexURL) + // when connected to a Plex server we default to 1 worker (Plex scans all the time) + // and leave max workers as a setting to scale out the workers on demand during playback + f.totalWorkers = 1 + } + } + dbPath := *cacheDbPath if filepath.Ext(dbPath) != "" { dbPath = filepath.Dir(dbPath) @@ -331,7 +348,9 @@ func NewFs(name, rpath string) (fs.Fs, error) { dbPath = filepath.Join(dbPath, name+".db") fs.Infof(name, "Storage DB path: %v", dbPath) - f.cache, err = GetPersistent(dbPath, *cacheDbPurge) + f.cache, err = GetPersistent(dbPath, &Features{ + PurgeDb: *cacheDbPurge, + }) if err != nil { return nil, errors.Wrapf(err, "failed to start cache db") } @@ -348,9 +367,10 @@ func NewFs(name, rpath string) (fs.Fs, error) { fs.Infof(name, "Chunk Memory: %v", f.chunkMemory) fs.Infof(name, "Chunk Size: %v", fs.SizeSuffix(f.chunkSize)) + fs.Infof(name, "Chunk Total Size: %v", fs.SizeSuffix(f.chunkTotalSize)) + fs.Infof(name, "Chunk Clean Interval: %v", f.chunkCleanInterval.String()) fs.Infof(name, "Workers: %v", f.totalWorkers) fs.Infof(name, "File Age: %v", f.fileAge.String()) - fs.Infof(name, "Chunk Age: %v", f.chunkAge.String()) fs.Infof(name, "Cache Writes: %v", f.cacheWrites) go f.CleanUpCache(false) @@ -412,35 +432,6 @@ func (f *Fs) ChunkSize() int64 { return f.chunkSize } -// originalSettingWorkers will return the original value of this config -func (f *Fs) originalSettingWorkers() int { - return f.originalTotalWorkers -} - -// originalSettingChunkNoMemory will return the original value of this config -func (f *Fs) originalSettingChunkNoMemory() bool { - return f.originalChunkMemory -} - -// InWarmUp says if cache warm up is active -func (f *Fs) InWarmUp() bool { - return f.warmUp -} - -// enableWarmUp will enable the warm up state of this cache along with the relevant settings -func (f *Fs) enableWarmUp() { - f.totalWorkers = 1 - f.chunkMemory = false - f.warmUp = true -} - -// disableWarmUp will disable the warm up state of this cache along with the relevant settings -func (f *Fs) disableWarmUp() { - f.totalWorkers = f.originalSettingWorkers() - f.chunkMemory = !f.originalSettingChunkNoMemory() - f.warmUp = false -} - // NewObject finds the Object at remote. func (f *Fs) NewObject(remote string) (fs.Object, error) { co := NewObject(f, remote) @@ -668,7 +659,7 @@ func (f *Fs) cacheReader(u io.Reader, src fs.ObjectInfo, originalRead func(inn i // if we have some bytes we cache them if readSize > 0 { chunk = chunk[:readSize] - err2 := f.cache.AddChunkAhead(cleanPath(path.Join(f.root, src.Remote())), chunk, offset, f.metaAge) + err2 := f.cache.AddChunk(cleanPath(path.Join(f.root, src.Remote())), chunk, offset) if err2 != nil { fs.Errorf(src, "error saving new data in cache '%v'", err2) _ = pr.CloseWithError(err2) @@ -840,10 +831,6 @@ func (f *Fs) Purge() error { fs.Infof(f, "purging cache") f.cache.Purge() - f.warmupMu.Lock() - defer f.warmupMu.Unlock() - f.lastOpenedEntries = make(map[string]time.Time) - do := f.Fs.Features().Purge if do == nil { return nil @@ -892,45 +879,17 @@ func (f *Fs) OpenRateLimited(fn func() (io.ReadCloser, error)) (io.ReadCloser, e return fn() } -// CheckIfWarmupNeeded changes the FS settings during warmups -func (f *Fs) CheckIfWarmupNeeded(remote string) { - f.warmupMu.Lock() - defer f.warmupMu.Unlock() - - secondCount := time.Duration(f.warmUpSec) - rate := f.warmUpRate - - // clean up entries older than the needed time frame needed - for k, v := range f.lastOpenedEntries { - if time.Now().After(v.Add(time.Second * secondCount)) { - delete(f.lastOpenedEntries, k) - } - } - f.lastOpenedEntries[remote] = time.Now() - - // simple check for the current load - if len(f.lastOpenedEntries) >= rate && !f.warmUp { - fs.Infof(f, "turning on cache warmup") - f.enableWarmUp() - } else if len(f.lastOpenedEntries) < rate && f.warmUp { - fs.Infof(f, "turning off cache warmup") - f.disableWarmUp() - } -} - // CleanUpCache will cleanup only the cache data that is expired func (f *Fs) CleanUpCache(ignoreLastTs bool) { f.cleanupMu.Lock() defer f.cleanupMu.Unlock() - if ignoreLastTs || time.Now().After(f.lastChunkCleanup.Add(f.chunkAge/4)) { - fs.Infof("cache", "running chunks cleanup") - f.cache.CleanChunksByAge(f.chunkAge) + if ignoreLastTs || time.Now().After(f.lastChunkCleanup.Add(f.chunkCleanInterval)) { + f.cache.CleanChunksBySize(f.chunkTotalSize) f.lastChunkCleanup = time.Now() } if ignoreLastTs || time.Now().After(f.lastRootCleanup.Add(f.fileAge/4)) { - fs.Infof("cache", "running root cleanup") f.cache.CleanEntriesByAge(f.fileAge) f.lastRootCleanup = time.Now() } @@ -951,6 +910,15 @@ func (f *Fs) SetWrapper(wrapper fs.Fs) { f.wrapper = wrapper } +// Wrap returns the Fs that is wrapping this Fs +func (f *Fs) isWrappedByCrypt() (*crypt.Fs, bool) { + if f.wrapper == nil { + return nil, false + } + c, ok := f.wrapper.(*crypt.Fs) + return c, ok +} + // DirCacheFlush flushes the dir cache func (f *Fs) DirCacheFlush() { _ = f.cache.RemoveDir("") diff --git a/cache/cache_internal_test.go b/cache/cache_internal_test.go index 04b2a5070..19a33e65a 100644 --- a/cache/cache_internal_test.go +++ b/cache/cache_internal_test.go @@ -1,4 +1,4 @@ -// +build !plan9 +// +build !plan9,go1.7 package cache_test @@ -24,18 +24,14 @@ import ( ) var ( - WrapRemote = flag.String("wrap-remote", "", "Remote to wrap") - RemoteName = flag.String("remote-name", "TestCacheInternal", "Root remote") - SkipTimeouts = flag.Bool("skip-waits", false, "To skip tests that have wait times") - rootFs fs.Fs - boltDb *cache.Persistent - metaAge = time.Second * 30 - infoAge = time.Second * 10 - chunkAge = time.Second * 10 - okDiff = time.Second * 9 // really big diff here but the build machines seem to be slow. need a different way for this - workers = 2 - warmupRate = 3 - warmupSec = 10 + WrapRemote = flag.String("wrap-remote", "", "Remote to wrap") + RemoteName = flag.String("remote-name", "TestCacheInternal", "Root remote") + rootFs fs.Fs + boltDb *cache.Persistent + infoAge = time.Second * 10 + chunkClean = time.Second + okDiff = time.Second * 9 // really big diff here but the build machines seem to be slow. need a different way for this + workers = 2 ) // prepare the test server and return a function to tidy it up afterwards @@ -44,7 +40,7 @@ func TestInternalInit(t *testing.T) { // delete the default path dbPath := filepath.Join(fs.CacheDir, "cache-backend", *RemoteName+".db") - boltDb, err = cache.GetPersistent(dbPath, true) + boltDb, err = cache.GetPersistent(dbPath, &cache.Features{PurgeDb: true}) require.NoError(t, err) fstest.Initialise() @@ -65,17 +61,17 @@ func TestInternalInit(t *testing.T) { fs.ConfigFileSet(*RemoteName, "type", "cache") fs.ConfigFileSet(*RemoteName, "remote", *WrapRemote) fs.ConfigFileSet(*RemoteName, "chunk_size", "1024") - fs.ConfigFileSet(*RemoteName, "chunk_age", chunkAge.String()) + fs.ConfigFileSet(*RemoteName, "chunk_total_size", "2048") fs.ConfigFileSet(*RemoteName, "info_age", infoAge.String()) } - _ = flag.Set("cache-warm-up-age", metaAge.String()) - _ = flag.Set("cache-warm-up-rps", fmt.Sprintf("%v/%v", warmupRate, warmupSec)) _ = flag.Set("cache-chunk-no-memory", "true") _ = flag.Set("cache-workers", strconv.Itoa(workers)) + _ = flag.Set("cache-chunk-clean-interval", chunkClean.String()) // Instantiate root rootFs, err = fs.NewFs(*RemoteName + ":") + require.NoError(t, err) _ = rootFs.Features().Purge() require.NoError(t, err) err = rootFs.Mkdir("") @@ -305,143 +301,6 @@ func TestInternalChangeSeenAfterDirCacheFlush(t *testing.T) { require.Equal(t, o.ModTime(), co.ModTime()) } -func TestInternalWarmUp(t *testing.T) { - if *SkipTimeouts { - t.Skip("--skip-waits set") - } - - reset(t) - cfs, err := getCacheFs(rootFs) - require.NoError(t, err) - chunkSize := cfs.ChunkSize() - - o1 := writeObjectRandomBytes(t, rootFs, (chunkSize * 3)) - o2 := writeObjectRandomBytes(t, rootFs, (chunkSize * 4)) - o3 := writeObjectRandomBytes(t, rootFs, (chunkSize * 6)) - - _ = readDataFromObj(t, o1, 0, chunkSize, false) - _ = readDataFromObj(t, o2, 0, chunkSize, false) - - // validate a fresh chunk - expectedExpiry := time.Now().Add(chunkAge) - ts, err := boltDb.GetChunkTs(path.Join(rootFs.Root(), o2.Remote()), 0) - require.NoError(t, err) - require.WithinDuration(t, expectedExpiry, ts, okDiff) - - // validate that we entered a warm up state - _ = readDataFromObj(t, o3, 0, chunkSize, false) - require.True(t, cfs.InWarmUp()) - expectedExpiry = time.Now().Add(metaAge) - ts, err = boltDb.GetChunkTs(path.Join(rootFs.Root(), o3.Remote()), 0) - require.NoError(t, err) - require.WithinDuration(t, expectedExpiry, ts, okDiff) - - // validate that we cooled down and exit warm up - // we wait for the cache to expire - t.Logf("Waiting 10 seconds for warm up to expire\n") - time.Sleep(time.Second * 10) - - _ = readDataFromObj(t, o3, chunkSize, chunkSize*2, false) - require.False(t, cfs.InWarmUp()) - expectedExpiry = time.Now().Add(chunkAge) - ts, err = boltDb.GetChunkTs(path.Join(rootFs.Root(), o3.Remote()), chunkSize) - require.NoError(t, err) - require.WithinDuration(t, expectedExpiry, ts, okDiff) -} - -func TestInternalWarmUpInFlight(t *testing.T) { - if *SkipTimeouts { - t.Skip("--skip-waits set") - } - - reset(t) - cfs, err := getCacheFs(rootFs) - require.NoError(t, err) - chunkSize := cfs.ChunkSize() - - o1 := writeObjectRandomBytes(t, rootFs, (chunkSize * 3)) - o2 := writeObjectRandomBytes(t, rootFs, (chunkSize * 4)) - o3 := writeObjectRandomBytes(t, rootFs, (chunkSize * int64(workers) * int64(2))) - - _ = readDataFromObj(t, o1, 0, chunkSize, false) - _ = readDataFromObj(t, o2, 0, chunkSize, false) - require.False(t, cfs.InWarmUp()) - - // validate that we entered a warm up state - _ = readDataFromObj(t, o3, 0, chunkSize, false) - require.True(t, cfs.InWarmUp()) - expectedExpiry := time.Now().Add(metaAge) - ts, err := boltDb.GetChunkTs(path.Join(rootFs.Root(), o3.Remote()), 0) - require.NoError(t, err) - require.WithinDuration(t, expectedExpiry, ts, okDiff) - - checkSample := make([]byte, chunkSize) - reader, err := o3.Open(&fs.SeekOption{Offset: 0}) - require.NoError(t, err) - rs, ok := reader.(*cache.Handle) - require.True(t, ok) - - for i := 0; i <= workers; i++ { - _, _ = rs.Seek(int64(i)*chunkSize, 0) - _, err = io.ReadFull(reader, checkSample) - require.NoError(t, err) - - if i == workers { - require.False(t, rs.InWarmUp(), "iteration %v", i) - } else { - require.True(t, rs.InWarmUp(), "iteration %v", i) - } - } - _ = reader.Close() - require.True(t, cfs.InWarmUp()) - expectedExpiry = time.Now().Add(chunkAge) - ts, err = boltDb.GetChunkTs(path.Join(rootFs.Root(), o3.Remote()), chunkSize*int64(workers+1)) - require.NoError(t, err) - require.WithinDuration(t, expectedExpiry, ts, okDiff) - - // validate that we cooled down and exit warm up - // we wait for the cache to expire - t.Logf("Waiting 10 seconds for warm up to expire\n") - time.Sleep(time.Second * 10) - - _ = readDataFromObj(t, o2, chunkSize, chunkSize*2, false) - require.False(t, cfs.InWarmUp()) - expectedExpiry = time.Now().Add(chunkAge) - ts, err = boltDb.GetChunkTs(path.Join(rootFs.Root(), o2.Remote()), chunkSize) - require.NoError(t, err) - require.WithinDuration(t, expectedExpiry, ts, okDiff) -} - -// TODO: this is bugged -//func TestInternalRateLimiter(t *testing.T) { -// reset(t) -// _ = flag.Set("cache-rps", "2") -// rootFs, err := fs.NewFs(*RemoteName + ":") -// require.NoError(t, err) -// defer func() { -// _ = flag.Set("cache-rps", "-1") -// rootFs, err = fs.NewFs(*RemoteName + ":") -// require.NoError(t, err) -// }() -// cfs, err := getCacheFs(rootFs) -// require.NoError(t, err) -// chunkSize := cfs.ChunkSize() -// -// // create some rand test data -// co := writeObjectRandomBytes(t, rootFs, (chunkSize*4 + chunkSize/2)) -// -// doStuff(t, 5, time.Second, func() { -// r, err := co.Open(&fs.SeekOption{Offset: chunkSize + 1}) -// require.NoError(t, err) -// -// buf := make([]byte, chunkSize) -// totalRead, err := io.ReadFull(r, buf) -// require.NoError(t, err) -// require.Equal(t, len(buf), totalRead) -// _ = r.Close() -// }) -//} - func TestInternalCacheWrites(t *testing.T) { reset(t) _ = flag.Set("cache-writes", "true") @@ -453,10 +312,10 @@ func TestInternalCacheWrites(t *testing.T) { // create some rand test data co := writeObjectRandomBytes(t, rootFs, (chunkSize*4 + chunkSize/2)) - expectedExpiry := time.Now().Add(metaAge) + expectedTs := time.Now() ts, err := boltDb.GetChunkTs(path.Join(rootFs.Root(), co.Remote()), 0) require.NoError(t, err) - require.WithinDuration(t, expectedExpiry, ts, okDiff) + require.WithinDuration(t, expectedTs, ts, okDiff) // reset fs _ = flag.Set("cache-writes", "false") @@ -464,43 +323,44 @@ func TestInternalCacheWrites(t *testing.T) { require.NoError(t, err) } -func TestInternalExpiredChunkRemoved(t *testing.T) { - t.Skip("FIXME disabled because it is unreliable") - - if *SkipTimeouts { - t.Skip("--skip-waits set") - } - +func TestInternalMaxChunkSizeRespected(t *testing.T) { reset(t) + _ = flag.Set("cache-workers", "1") + rootFs, err := fs.NewFs(*RemoteName + ":") + require.NoError(t, err) cfs, err := getCacheFs(rootFs) require.NoError(t, err) chunkSize := cfs.ChunkSize() totalChunks := 20 // create some rand test data - co := writeObjectRandomBytes(t, cfs, (int64(totalChunks-1)*chunkSize + chunkSize/2)) - remote := co.Remote() - // cache all the chunks - _ = readDataFromObj(t, co, 0, co.Size(), false) - - // we wait for the cache to expire - t.Logf("Waiting %v for cache to expire\n", chunkAge.String()) - time.Sleep(chunkAge) - _, _ = cfs.List("") - time.Sleep(time.Second * 2) - - o, err := cfs.NewObject(remote) - require.NoError(t, err) - co2, ok := o.(*cache.Object) + o := writeObjectRandomBytes(t, cfs, (int64(totalChunks-1)*chunkSize + chunkSize/2)) + co, ok := o.(*cache.Object) require.True(t, ok) - require.False(t, boltDb.HasChunk(co2, 0)) + + for i := 0; i < 4; i++ { // read first 4 + _ = readDataFromObj(t, co, chunkSize*int64(i), chunkSize*int64(i+1), false) + } + cfs.CleanUpCache(true) + // the last 2 **must** be in the cache + require.True(t, boltDb.HasChunk(co, chunkSize*2)) + require.True(t, boltDb.HasChunk(co, chunkSize*3)) + + for i := 4; i < 6; i++ { // read next 2 + _ = readDataFromObj(t, co, chunkSize*int64(i), chunkSize*int64(i+1), false) + } + cfs.CleanUpCache(true) + // the last 2 **must** be in the cache + require.True(t, boltDb.HasChunk(co, chunkSize*4)) + require.True(t, boltDb.HasChunk(co, chunkSize*5)) + + // reset fs + _ = flag.Set("cache-workers", strconv.Itoa(workers)) + rootFs, err = fs.NewFs(*RemoteName + ":") + require.NoError(t, err) } func TestInternalExpiredEntriesRemoved(t *testing.T) { - if *SkipTimeouts { - t.Skip("--skip-waits set") - } - reset(t) cfs, err := getCacheFs(rootFs) require.NoError(t, err) diff --git a/cache/cache_test.go b/cache/cache_test.go index 24df0d0d2..5ea6be7e3 100644 --- a/cache/cache_test.go +++ b/cache/cache_test.go @@ -3,7 +3,7 @@ // Automatically generated - DO NOT EDIT // Regenerate with: make gen_tests -// +build !plan9 +// +build !plan9,go1.7 package cache_test diff --git a/cache/cache_unsupported.go b/cache/cache_unsupported.go index 05a39fa8b..05dcf992b 100644 --- a/cache/cache_unsupported.go +++ b/cache/cache_unsupported.go @@ -1,6 +1,6 @@ // Build for cache for unsupported platforms to stop go complaining // about "no buildable Go source files " -// +build plan9 +// +build plan9 !go1.7 package cache diff --git a/cache/directory.go b/cache/directory.go index e558f37bd..84ae21e50 100644 --- a/cache/directory.go +++ b/cache/directory.go @@ -1,4 +1,4 @@ -// +build !plan9 +// +build !plan9,go1.7 package cache diff --git a/cache/handle.go b/cache/handle.go index 0cc059a20..03b104078 100644 --- a/cache/handle.go +++ b/cache/handle.go @@ -1,4 +1,4 @@ -// +build !plan9 +// +build !plan9,go1.7 package cache @@ -15,21 +15,19 @@ import ( // Handle is managing the read/write/seek operations on an open handle type Handle struct { - cachedObject *Object - memory ChunkStorage - preloadQueue chan int64 - preloadOffset int64 - offset int64 - seenOffsets map[int64]bool - mu sync.Mutex + cachedObject *Object + memory ChunkStorage + preloadQueue chan int64 + preloadOffset int64 + offset int64 + seenOffsets map[int64]bool + mu sync.Mutex + confirmReading chan bool - ReadRetries int - TotalWorkers int - UseMemory bool - workers []*worker - chunkAge time.Duration - warmup bool - closed bool + UseMemory bool + workers []*worker + closed bool + reading bool } // NewObjectHandle returns a new Handle for an existing Object @@ -39,20 +37,15 @@ func NewObjectHandle(o *Object) *Handle { offset: 0, preloadOffset: -1, // -1 to trigger the first preload - ReadRetries: o.CacheFs.readRetries, - TotalWorkers: o.CacheFs.totalWorkers, - UseMemory: o.CacheFs.chunkMemory, - chunkAge: o.CacheFs.chunkAge, - warmup: o.CacheFs.InWarmUp(), + UseMemory: o.CacheFs.chunkMemory, + reading: false, } r.seenOffsets = make(map[int64]bool) - r.memory = NewMemory(o.CacheFs.chunkAge) - if o.CacheFs.InWarmUp() { - r.chunkAge = o.CacheFs.metaAge - } + r.memory = NewMemory(-1) // create a larger buffer to queue up requests - r.preloadQueue = make(chan int64, r.TotalWorkers*10) + r.preloadQueue = make(chan int64, o.CacheFs.totalWorkers*10) + r.confirmReading = make(chan bool) r.startReadWorkers() return r } @@ -72,34 +65,86 @@ func (r *Handle) String() string { return r.cachedObject.abs() } -// InWarmUp says if this handle is in warmup mode -func (r *Handle) InWarmUp() bool { - return r.warmup -} - // startReadWorkers will start the worker pool func (r *Handle) startReadWorkers() { if r.hasAtLeastOneWorker() { return } - for i := 0; i < r.TotalWorkers; i++ { - w := &worker{ - r: r, - ch: r.preloadQueue, - id: i, - } - go w.run() + r.scaleWorkers(r.cacheFs().totalWorkers) +} - r.workers = append(r.workers, w) +// scaleOutWorkers will increase the worker pool count by the provided amount +func (r *Handle) scaleWorkers(desired int) { + current := len(r.workers) + if current == desired { + return } + if current > desired { + // scale in gracefully + for i := 0; i < current-desired; i++ { + r.preloadQueue <- -1 + } + } else { + // scale out + for i := 0; i < desired-current; i++ { + w := &worker{ + r: r, + ch: r.preloadQueue, + id: current + i, + } + go w.run() + + r.workers = append(r.workers, w) + } + } + // ignore first scale out from 0 + if current != 0 { + fs.Infof(r, "scale workers to %v", desired) + } +} + +func (r *Handle) requestExternalConfirmation() { + // if there's no external confirmation available + // then we skip this step + if len(r.workers) >= r.cacheFs().totalMaxWorkers || + !r.cacheFs().plexConnector.isConnected() { + return + } + go r.cacheFs().plexConnector.isPlayingAsync(r.cachedObject, r.confirmReading) +} + +func (r *Handle) confirmExternalReading() { + // if we have a max value of workers + // or there's no external confirmation available + // then we skip this step + if len(r.workers) >= r.cacheFs().totalMaxWorkers || + !r.cacheFs().plexConnector.isConnected() { + return + } + + select { + case confirmed := <-r.confirmReading: + if !confirmed { + return + } + default: + return + } + + fs.Infof(r, "confirmed reading by external reader") + r.scaleWorkers(r.cacheFs().totalMaxWorkers) } // queueOffset will send an offset to the workers if it's different from the last one func (r *Handle) queueOffset(offset int64) { if offset != r.preloadOffset { + // clean past in-memory chunks + if r.UseMemory { + go r.memory.CleanChunksByNeed(offset) + } + go r.cacheFs().CleanUpCache(false) + r.confirmExternalReading() r.preloadOffset = offset - previousChunksCounter := 0 - maxOffset := r.cacheFs().chunkSize * int64(r.cacheFs().originalSettingWorkers()) // clear the past seen chunks // they will remain in our persistent storage but will be removed from transient @@ -107,25 +152,10 @@ func (r *Handle) queueOffset(offset int64) { for k := range r.seenOffsets { if k < offset { r.seenOffsets[k] = false - - // we count how many continuous chunks were seen before - if offset >= maxOffset && k >= offset-maxOffset { - previousChunksCounter++ - } } } - // if we read all the previous chunks that could have been preloaded - // we should then disable warm up setting for this handle - if r.warmup && previousChunksCounter >= r.cacheFs().originalSettingWorkers() { - r.TotalWorkers = r.cacheFs().originalSettingWorkers() - r.UseMemory = !r.cacheFs().originalSettingChunkNoMemory() - r.chunkAge = r.cacheFs().chunkAge - r.warmup = false - fs.Infof(r, "disabling warm up") - } - - for i := 0; i < r.TotalWorkers; i++ { + for i := 0; i < len(r.workers); i++ { o := r.preloadOffset + r.cacheFs().chunkSize*int64(i) if o < 0 || o >= r.cachedObject.Size() { continue @@ -137,6 +167,8 @@ func (r *Handle) queueOffset(offset int64) { r.seenOffsets[o] = true r.preloadQueue <- o } + + r.requestExternalConfirmation() } } @@ -157,12 +189,6 @@ func (r *Handle) getChunk(chunkStart int64) ([]byte, error) { var data []byte var err error - // we reached the end of the file - if chunkStart >= r.cachedObject.Size() { - fs.Debugf(r, "reached EOF %v", chunkStart) - return nil, io.EOF - } - // we calculate the modulus of the requested offset with the size of a chunk offset := chunkStart % r.cacheFs().chunkSize @@ -171,10 +197,7 @@ func (r *Handle) getChunk(chunkStart int64) ([]byte, error) { r.queueOffset(chunkStart) found := false - // delete old chunks from memory if r.UseMemory { - go r.memory.CleanChunksByNeed(chunkStart) - data, err = r.memory.GetChunk(r.cachedObject, chunkStart) if err == nil { found = true @@ -184,16 +207,15 @@ func (r *Handle) getChunk(chunkStart int64) ([]byte, error) { if !found { // we're gonna give the workers a chance to pickup the chunk // and retry a couple of times - for i := 0; i < r.ReadRetries; i++ { + for i := 0; i < r.cacheFs().readRetries*2; i++ { data, err = r.storage().GetChunk(r.cachedObject, chunkStart) - if err == nil { found = true break } fs.Debugf(r, "%v: chunk retry storage: %v", chunkStart, i) - time.Sleep(time.Second) + time.Sleep(time.Millisecond * 500) } } @@ -222,17 +244,24 @@ func (r *Handle) Read(p []byte) (n int, err error) { defer r.mu.Unlock() var buf []byte + // first reading + if !r.reading { + r.reading = true + r.requestExternalConfirmation() + } + // reached EOF + if r.offset >= r.cachedObject.Size() { + return 0, io.EOF + } currentOffset := r.offset buf, err = r.getChunk(currentOffset) if err != nil && len(buf) == 0 { + fs.Errorf(r, "(%v/%v) empty and error (%v) response", currentOffset, r.cachedObject.Size(), err) return 0, io.EOF } readSize := copy(p, buf) newOffset := currentOffset + int64(readSize) r.offset = newOffset - if r.offset >= r.cachedObject.Size() { - err = io.EOF - } return readSize, err } @@ -257,6 +286,7 @@ func (r *Handle) Close() error { } } + go r.cacheFs().CleanUpCache(false) fs.Debugf(r, "cache reader closed %v", r.offset) return nil } @@ -313,7 +343,7 @@ func (w *worker) reader(offset, end int64) (io.ReadCloser, error) { r := w.rc if w.rc == nil { r, err = w.r.cacheFs().OpenRateLimited(func() (io.ReadCloser, error) { - return w.r.cachedObject.Object.Open(&fs.RangeOption{Start: offset, End: end}, &fs.SeekOption{Offset: offset}) + return w.r.cachedObject.Object.Open(&fs.SeekOption{Offset: offset}, &fs.RangeOption{Start: offset, End: end}) }) if err != nil { return nil, err @@ -329,7 +359,7 @@ func (w *worker) reader(offset, end int64) (io.ReadCloser, error) { _ = w.rc.Close() return w.r.cacheFs().OpenRateLimited(func() (io.ReadCloser, error) { - r, err = w.r.cachedObject.Object.Open(&fs.RangeOption{Start: offset, End: end}, &fs.SeekOption{Offset: offset}) + r, err = w.r.cachedObject.Object.Open(&fs.SeekOption{Offset: offset}, &fs.RangeOption{Start: offset, End: end}) if err != nil { return nil, err } @@ -377,7 +407,7 @@ func (w *worker) run() { // add it in ram if it's in the persistent storage data, err = w.r.storage().GetChunk(w.r.cachedObject, chunkStart) if err == nil { - err = w.r.memory.AddChunk(w.r.cachedObject, data, chunkStart) + err = w.r.memory.AddChunk(w.r.cachedObject.abs(), data, chunkStart) if err != nil { fs.Errorf(w, "failed caching chunk in ram %v: %v", chunkStart, err) } else { @@ -395,37 +425,56 @@ func (w *worker) run() { if chunkEnd > w.r.cachedObject.Size() { chunkEnd = w.r.cachedObject.Size() } - w.rc, err = w.reader(chunkStart, chunkEnd) - // we seem to be getting only errors so we abort + + w.download(chunkStart, chunkEnd, 0) + } +} + +func (w *worker) download(chunkStart, chunkEnd int64, retry int) { + var err error + var data []byte + + // stop retries + if retry >= w.r.cacheFs().readRetries { + return + } + // back-off between retries + if retry > 0 { + time.Sleep(time.Second * time.Duration(retry)) + } + + w.rc, err = w.reader(chunkStart, chunkEnd) + // we seem to be getting only errors so we abort + if err != nil { + fs.Errorf(w, "object open failed %v: %v", chunkStart, err) + w.download(chunkStart, chunkEnd, retry+1) + return + } + + data = make([]byte, chunkEnd-chunkStart) + sourceRead := 0 + sourceRead, err = io.ReadFull(w.rc, data) + if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { + fs.Errorf(w, "failed to read chunk %v: %v", chunkStart, err) + w.download(chunkStart, chunkEnd, retry+1) + return + } + if err == io.ErrUnexpectedEOF { + fs.Debugf(w, "partial read chunk %v: %v", chunkStart, err) + } + data = data[:sourceRead] // reslice to remove extra garbage + fs.Debugf(w, "downloaded chunk %v", fs.SizeSuffix(chunkStart)) + + if w.r.UseMemory { + err = w.r.memory.AddChunk(w.r.cachedObject.abs(), data, chunkStart) if err != nil { - fs.Errorf(w, "object open failed %v: %v", chunkStart, err) - return + fs.Errorf(w, "failed caching chunk in ram %v: %v", chunkStart, err) } + } - data = make([]byte, chunkEnd-chunkStart) - sourceRead := 0 - sourceRead, err = io.ReadFull(w.rc, data) - if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { - fs.Errorf(w, "failed to read chunk %v: %v", chunkStart, err) - return - } - if err == io.ErrUnexpectedEOF { - fs.Debugf(w, "partial read chunk %v: %v", chunkStart, err) - } - data = data[:sourceRead] // reslice to remove extra garbage - fs.Debugf(w, "downloaded chunk %v", fs.SizeSuffix(chunkStart)) - - if w.r.UseMemory { - err = w.r.memory.AddChunk(w.r.cachedObject, data, chunkStart) - if err != nil { - fs.Errorf(w, "failed caching chunk in ram %v: %v", chunkStart, err) - } - } - - err = w.r.storage().AddChunkAhead(w.r.cachedObject.abs(), data, chunkStart, w.r.chunkAge) - if err != nil { - fs.Errorf(w, "failed caching chunk in storage %v: %v", chunkStart, err) - } + err = w.r.storage().AddChunk(w.r.cachedObject.abs(), data, chunkStart) + if err != nil { + fs.Errorf(w, "failed caching chunk in storage %v: %v", chunkStart, err) } } diff --git a/cache/object.go b/cache/object.go index 7a3621a63..028f72864 100644 --- a/cache/object.go +++ b/cache/object.go @@ -1,4 +1,4 @@ -// +build !plan9 +// +build !plan9,go1.7 package cache @@ -205,16 +205,18 @@ func (o *Object) Open(options ...fs.OpenOption) (io.ReadCloser, error) { if err := o.refreshFromSource(); err != nil { return nil, err } - o.CacheFs.CheckIfWarmupNeeded(o.Remote()) + var err error cacheReader := NewObjectHandle(o) for _, option := range options { switch x := option.(type) { case *fs.SeekOption: - _, err := cacheReader.Seek(x.Offset, os.SEEK_SET) - if err != nil { - return cacheReader, err - } + _, err = cacheReader.Seek(x.Offset, os.SEEK_SET) + case *fs.RangeOption: + _, err = cacheReader.Seek(x.Start, os.SEEK_SET) + } + if err != nil { + return cacheReader, err } } diff --git a/cache/plex.go b/cache/plex.go new file mode 100644 index 000000000..233a85503 --- /dev/null +++ b/cache/plex.go @@ -0,0 +1,229 @@ +// +build !plan9,go1.7 + +package cache + +import ( + "encoding/json" + "fmt" + "net/http" + "net/url" + "strings" + "time" + + "github.com/ncw/rclone/fs" +) + +const ( + // defPlexLoginURL is the default URL for Plex login + defPlexLoginURL = "https://plex.tv/users/sign_in.json" +) + +// plexConnector is managing the cache integration with Plex +type plexConnector struct { + url *url.URL + token string + f *Fs +} + +// newPlexConnector connects to a Plex server and generates a token +func newPlexConnector(f *Fs, plexURL, username, password string) (*plexConnector, error) { + u, err := url.ParseRequestURI(strings.TrimRight(plexURL, "/")) + if err != nil { + return nil, err + } + + pc := &plexConnector{ + f: f, + url: u, + token: "", + } + + err = pc.authenticate(username, password) + if err != nil { + return nil, err + } + + return pc, nil +} + +// newPlexConnector connects to a Plex server and generates a token +func newPlexConnectorWithToken(f *Fs, plexURL, token string) (*plexConnector, error) { + u, err := url.ParseRequestURI(strings.TrimRight(plexURL, "/")) + if err != nil { + return nil, err + } + + pc := &plexConnector{ + f: f, + url: u, + token: token, + } + + return pc, nil +} + +// fillDefaultHeaders will add common headers to requests +func (p *plexConnector) fillDefaultHeaders(req *http.Request) { + req.Header.Add("X-Plex-Client-Identifier", fmt.Sprintf("rclone (%v)", p.f.String())) + req.Header.Add("X-Plex-Product", fmt.Sprintf("rclone (%v)", p.f.Name())) + req.Header.Add("X-Plex-Version", fs.Version) + req.Header.Add("Accept", "application/json") + if p.token != "" { + req.Header.Add("X-Plex-Token", p.token) + } +} + +// authenticate will generate a token based on a username/password +func (p *plexConnector) authenticate(username, password string) error { + form := url.Values{} + form.Set("user[login]", username) + form.Add("user[password]", password) + req, err := http.NewRequest("POST", defPlexLoginURL, strings.NewReader(form.Encode())) + if err != nil { + return err + } + p.fillDefaultHeaders(req) + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + var data map[string]interface{} + err = json.NewDecoder(resp.Body).Decode(&data) + if err != nil { + return fmt.Errorf("failed to obtain token: %v", err) + } + tokenGen, ok := get(data, "user", "authToken") + if !ok { + return fmt.Errorf("failed to obtain token: %v", data) + } + token, ok := tokenGen.(string) + if !ok { + return fmt.Errorf("failed to obtain token: %v", data) + } + p.token = token + + return nil +} + +// isConnected checks if this Plex +func (p *plexConnector) isConnected() bool { + return p.token != "" +} + +func (p *plexConnector) isPlaying(co *Object) bool { + isPlaying := false + req, err := http.NewRequest("GET", fmt.Sprintf("%s/status/sessions", p.url.String()), nil) + if err != nil { + return false + } + p.fillDefaultHeaders(req) + resp, err := http.DefaultClient.Do(req) + if err != nil { + return false + } + var data map[string]interface{} + err = json.NewDecoder(resp.Body).Decode(&data) + if err != nil { + return false + } + sizeGen, ok := get(data, "MediaContainer", "size") + if !ok { + return false + } + size, ok := sizeGen.(float64) + if !ok || size < float64(1) { + return false + } + videosGen, ok := get(data, "MediaContainer", "Video") + if !ok { + fs.Errorf("plex", "empty videos: %v", data) + return false + } + videos, ok := videosGen.([]interface{}) + if !ok || len(videos) < 1 { + fs.Errorf("plex", "empty videos: %v", data) + return false + } + for _, v := range videos { + keyGen, ok := get(v, "key") + if !ok { + fs.Errorf("plex", "failed to find: key") + continue + } + key, ok := keyGen.(string) + if !ok { + fs.Errorf("plex", "failed to understand: key") + continue + } + req, err := http.NewRequest("GET", fmt.Sprintf("%s%s", p.url.String(), key), nil) + if err != nil { + return false + } + p.fillDefaultHeaders(req) + resp, err := http.DefaultClient.Do(req) + if err != nil { + return false + } + var data map[string]interface{} + err = json.NewDecoder(resp.Body).Decode(&data) + if err != nil { + return false + } + + remote := co.Remote() + if cr, yes := co.CacheFs.isWrappedByCrypt(); yes { + remote, err = cr.DecryptFileName(co.Remote()) + if err != nil { + fs.Errorf("plex", "can not decrypt wrapped file: %v", err) + continue + } + } + fpGen, ok := get(data, "MediaContainer", "Metadata", 0, "Media", 0, "Part", 0, "file") + if !ok { + fs.Errorf("plex", "failed to understand: %v", data) + continue + } + fp, ok := fpGen.(string) + if !ok { + fs.Errorf("plex", "failed to understand: %v", fp) + continue + } + if strings.Contains(fp, remote) { + isPlaying = true + break + } + } + + return isPlaying +} + +func (p *plexConnector) isPlayingAsync(co *Object, response chan bool) { + time.Sleep(time.Second) // FIXME random guess here + res := p.isPlaying(co) + response <- res +} + +// adapted from: https://stackoverflow.com/a/28878037 (credit) +func get(m interface{}, path ...interface{}) (interface{}, bool) { + for _, p := range path { + switch idx := p.(type) { + case string: + if mm, ok := m.(map[string]interface{}); ok { + if val, found := mm[idx]; found { + m = val + continue + } + } + return nil, false + case int: + if mm, ok := m.([]interface{}); ok { + if len(mm) > idx { + m = mm[idx] + continue + } + } + return nil, false + } + } + return m, true +} diff --git a/cache/storage_memory.go b/cache/storage_memory.go index e752c3914..ddb7825f3 100644 --- a/cache/storage_memory.go +++ b/cache/storage_memory.go @@ -1,4 +1,4 @@ -// +build !plan9 +// +build !plan9,go1.7 package cache @@ -58,8 +58,8 @@ func (m *Memory) GetChunk(cachedObject *Object, offset int64) ([]byte, error) { } // AddChunk adds a new chunk of a cached object -func (m *Memory) AddChunk(cachedObject *Object, data []byte, offset int64) error { - return m.AddChunkAhead(cachedObject.abs(), data, offset, time.Second) +func (m *Memory) AddChunk(fp string, data []byte, offset int64) error { + return m.AddChunkAhead(fp, data, offset, time.Second) } // AddChunkAhead adds a new chunk of a cached object @@ -93,3 +93,8 @@ func (m *Memory) CleanChunksByNeed(offset int64) { } } } + +// CleanChunksBySize will cleanup chunks after the total size passes a certain point +func (m *Memory) CleanChunksBySize(maxSize int64) { + // NOOP +} diff --git a/cache/storage_persistent.go b/cache/storage_persistent.go index 7e2c7cfe5..87678c438 100644 --- a/cache/storage_persistent.go +++ b/cache/storage_persistent.go @@ -1,4 +1,4 @@ -// +build !plan9 +// +build !plan9,go1.7 package cache @@ -29,11 +29,16 @@ const ( DataTsBucket = "dataTs" ) +// Features flags for this storage type +type Features struct { + PurgeDb bool // purge the db before starting +} + var boltMap = make(map[string]*Persistent) var boltMapMx sync.Mutex // GetPersistent returns a single instance for the specific store -func GetPersistent(dbPath string, refreshDb bool) (*Persistent, error) { +func GetPersistent(dbPath string, f *Features) (*Persistent, error) { // write lock to create one boltMapMx.Lock() defer boltMapMx.Unlock() @@ -41,7 +46,7 @@ func GetPersistent(dbPath string, refreshDb bool) (*Persistent, error) { return b, nil } - bb, err := newPersistent(dbPath, refreshDb) + bb, err := newPersistent(dbPath, f) if err != nil { return nil, err } @@ -49,6 +54,12 @@ func GetPersistent(dbPath string, refreshDb bool) (*Persistent, error) { return boltMap[dbPath], nil } +type chunkInfo struct { + Path string + Offset int64 + Size int64 +} + // Persistent is a wrapper of persistent storage for a bolt.DB file type Persistent struct { Storage @@ -57,18 +68,20 @@ type Persistent struct { dataPath string db *bolt.DB cleanupMux sync.Mutex + features *Features } // newPersistent builds a new wrapper and connects to the bolt.DB file -func newPersistent(dbPath string, refreshDb bool) (*Persistent, error) { +func newPersistent(dbPath string, f *Features) (*Persistent, error) { dataPath := strings.TrimSuffix(dbPath, filepath.Ext(dbPath)) b := &Persistent{ dbPath: dbPath, dataPath: dataPath, + features: f, } - err := b.Connect(refreshDb) + err := b.Connect() if err != nil { fs.Errorf(dbPath, "Error opening storage cache. Is there another rclone running on the same remote? %v", err) return nil, err @@ -84,11 +97,11 @@ func (b *Persistent) String() string { // Connect creates a connection to the configured file // refreshDb will delete the file before to create an empty DB if it's set to true -func (b *Persistent) Connect(refreshDb bool) error { +func (b *Persistent) Connect() error { var db *bolt.DB var err error - if refreshDb { + if b.features.PurgeDb { err := os.Remove(b.dbPath) if err != nil { fs.Errorf(b, "failed to remove cache file: %v", err) @@ -146,37 +159,6 @@ func (b *Persistent) getBucket(dir string, createIfMissing bool, tx *bolt.Tx) *b return bucket } -// updateChunkTs is a convenience method to update a chunk timestamp to mark that it was used recently -func (b *Persistent) updateChunkTs(tx *bolt.Tx, path string, offset int64, t time.Duration) { - tsBucket := tx.Bucket([]byte(DataTsBucket)) - tsVal := path + "-" + strconv.FormatInt(offset, 10) - ts := time.Now().Add(t) - found := false - - // delete previous timestamps for the same object - c := tsBucket.Cursor() - for k, v := c.First(); k != nil; k, v = c.Next() { - if bytes.Equal(v, []byte(tsVal)) { - if tsInCache := time.Unix(0, btoi(k)); tsInCache.After(ts) && !found { - found = true - continue - } - err := c.Delete() - if err != nil { - fs.Debugf(path, "failed to clean chunk: %v", err) - } - } - } - if found { - return - } - - err := tsBucket.Put(itob(ts.UnixNano()), []byte(tsVal)) - if err != nil { - fs.Debugf(path, "failed to timestamp chunk: %v", err) - } -} - // updateRootTs is a convenience method to update an object timestamp to mark that it was used recently func (b *Persistent) updateRootTs(tx *bolt.Tx, path string, t time.Duration) { tsBucket := tx.Bucket([]byte(RootTsBucket)) @@ -433,7 +415,6 @@ func (b *Persistent) HasChunk(cachedObject *Object, offset int64) bool { // GetChunk will retrieve a single chunk which belongs to a cached object or an error if it doesn't find it func (b *Persistent) GetChunk(cachedObject *Object, offset int64) ([]byte, error) { - p := cachedObject.abs() var data []byte fp := path.Join(b.dataPath, cachedObject.abs(), strconv.FormatInt(offset, 10)) @@ -442,31 +423,11 @@ func (b *Persistent) GetChunk(cachedObject *Object, offset int64) ([]byte, error return nil, err } - d := cachedObject.CacheFs.chunkAge - if cachedObject.CacheFs.InWarmUp() { - d = cachedObject.CacheFs.metaAge - } - - err = b.db.Update(func(tx *bolt.Tx) error { - b.updateChunkTs(tx, p, offset, d) - return nil - }) - return data, err } // AddChunk adds a new chunk of a cached object -func (b *Persistent) AddChunk(cachedObject *Object, data []byte, offset int64) error { - t := cachedObject.CacheFs.chunkAge - if cachedObject.CacheFs.InWarmUp() { - t = cachedObject.CacheFs.metaAge - } - return b.AddChunkAhead(cachedObject.abs(), data, offset, t) -} - -// AddChunkAhead adds a new chunk before caching an Object for it -// see fs.cacheWrites -func (b *Persistent) AddChunkAhead(fp string, data []byte, offset int64, t time.Duration) error { +func (b *Persistent) AddChunk(fp string, data []byte, offset int64) error { _ = os.MkdirAll(path.Join(b.dataPath, fp), os.ModePerm) filePath := path.Join(b.dataPath, fp, strconv.FormatInt(offset, 10)) @@ -476,47 +437,101 @@ func (b *Persistent) AddChunkAhead(fp string, data []byte, offset int64, t time. } return b.db.Update(func(tx *bolt.Tx) error { - b.updateChunkTs(tx, fp, offset, t) + tsBucket := tx.Bucket([]byte(DataTsBucket)) + ts := time.Now() + found := false + + // delete (older) timestamps for the same object + c := tsBucket.Cursor() + for k, v := c.First(); k != nil; k, v = c.Next() { + var ci chunkInfo + err = json.Unmarshal(v, &ci) + if err != nil { + continue + } + if ci.Path == fp && ci.Offset == offset { + if tsInCache := time.Unix(0, btoi(k)); tsInCache.After(ts) && !found { + found = true + continue + } + err := c.Delete() + if err != nil { + fs.Debugf(fp, "failed to clean chunk: %v", err) + } + } + } + // don't overwrite if a newer one is already there + if found { + return nil + } + enc, err := json.Marshal(chunkInfo{Path: fp, Offset: offset, Size: int64(len(data))}) + if err != nil { + fs.Debugf(fp, "failed to timestamp chunk: %v", err) + } + err = tsBucket.Put(itob(ts.UnixNano()), enc) + if err != nil { + fs.Debugf(fp, "failed to timestamp chunk: %v", err) + } return nil }) } // CleanChunksByAge will cleanup on a cron basis func (b *Persistent) CleanChunksByAge(chunkAge time.Duration) { + // NOOP +} + +// CleanChunksByNeed is a noop for this implementation +func (b *Persistent) CleanChunksByNeed(offset int64) { + // noop: we want to clean a Bolt DB by time only +} + +// CleanChunksBySize will cleanup chunks after the total size passes a certain point +func (b *Persistent) CleanChunksBySize(maxSize int64) { b.cleanupMux.Lock() defer b.cleanupMux.Unlock() var cntChunks int err := b.db.Update(func(tx *bolt.Tx) error { - min := itob(0) - max := itob(time.Now().UnixNano()) - dataTsBucket := tx.Bucket([]byte(DataTsBucket)) if dataTsBucket == nil { return errors.Errorf("Couldn't open (%v) bucket", DataTsBucket) } // iterate through ts c := dataTsBucket.Cursor() - for k, v := c.Seek(min); k != nil && bytes.Compare(k, max) <= 0; k, v = c.Next() { - if v == nil { - continue - } - // split to get (abs path - offset) - val := string(v[:]) - sepIdx := strings.LastIndex(val, "-") - pathCmp := val[:sepIdx] - offsetCmp := val[sepIdx+1:] - - // delete this ts entry - err := c.Delete() + totalSize := int64(0) + for k, v := c.First(); k != nil; k, v = c.Next() { + var ci chunkInfo + err := json.Unmarshal(v, &ci) if err != nil { - fs.Errorf(pathCmp, "failed deleting chunk ts during cleanup (%v): %v", val, err) continue } - err = os.Remove(path.Join(b.dataPath, pathCmp, offsetCmp)) - if err == nil { - cntChunks = cntChunks + 1 + totalSize += ci.Size + } + + if totalSize > maxSize { + needToClean := totalSize - maxSize + for k, v := c.First(); k != nil; k, v = c.Next() { + var ci chunkInfo + err := json.Unmarshal(v, &ci) + if err != nil { + continue + } + // delete this ts entry + err = c.Delete() + if err != nil { + fs.Errorf(ci.Path, "failed deleting chunk ts during cleanup (%v): %v", ci.Offset, err) + continue + } + err = os.Remove(path.Join(b.dataPath, ci.Path, strconv.FormatInt(ci.Offset, 10))) + if err == nil { + cntChunks++ + needToClean -= ci.Size + if needToClean <= 0 { + break + } + } } } fs.Infof("cache", "deleted (%v) chunks", cntChunks) @@ -576,11 +591,6 @@ func (b *Persistent) CleanEntriesByAge(entryAge time.Duration) { } } -// CleanChunksByNeed is a noop for this implementation -func (b *Persistent) CleanChunksByNeed(offset int64) { - // noop: we want to clean a Bolt DB by time only -} - // Stats returns a go map with the stats key values func (b *Persistent) Stats() (map[string]map[string]interface{}, error) { r := make(map[string]map[string]interface{}) @@ -590,6 +600,7 @@ func (b *Persistent) Stats() (map[string]map[string]interface{}, error) { r["data"]["newest-ts"] = time.Now() r["data"]["newest-file"] = "" r["data"]["total-chunks"] = 0 + r["data"]["total-size"] = int64(0) r["files"] = make(map[string]interface{}) r["files"]["oldest-ts"] = time.Now() r["files"]["oldest-name"] = "" @@ -612,19 +623,32 @@ func (b *Persistent) Stats() (map[string]map[string]interface{}, error) { r["files"]["total-files"] = totalFiles c := dataTsBucket.Cursor() + + totalChunks := 0 + totalSize := int64(0) + for k, v := c.First(); k != nil; k, v = c.Next() { + var ci chunkInfo + err := json.Unmarshal(v, &ci) + if err != nil { + continue + } + totalChunks++ + totalSize += ci.Size + } + r["data"]["total-chunks"] = totalChunks + r["data"]["total-size"] = totalSize + if k, v := c.First(); k != nil { - // split to get (abs path - offset) - val := string(v[:]) - p := val[:strings.LastIndex(val, "-")] + var ci chunkInfo + _ = json.Unmarshal(v, &ci) r["data"]["oldest-ts"] = time.Unix(0, btoi(k)) - r["data"]["oldest-file"] = p + r["data"]["oldest-file"] = ci.Path } if k, v := c.Last(); k != nil { - // split to get (abs path - offset) - val := string(v[:]) - p := val[:strings.LastIndex(val, "-")] + var ci chunkInfo + _ = json.Unmarshal(v, &ci) r["data"]["newest-ts"] = time.Unix(0, btoi(k)) - r["data"]["newest-file"] = p + r["data"]["newest-file"] = ci.Path } c = rootTsBucket.Cursor() @@ -671,13 +695,17 @@ func (b *Persistent) Purge() { // GetChunkTs retrieves the current timestamp of this chunk func (b *Persistent) GetChunkTs(path string, offset int64) (time.Time, error) { var t time.Time - tsVal := path + "-" + strconv.FormatInt(offset, 10) err := b.db.View(func(tx *bolt.Tx) error { tsBucket := tx.Bucket([]byte(DataTsBucket)) c := tsBucket.Cursor() for k, v := c.First(); k != nil; k, v = c.Next() { - if bytes.Equal(v, []byte(tsVal)) { + var ci chunkInfo + err := json.Unmarshal(v, &ci) + if err != nil { + continue + } + if ci.Path == path && ci.Offset == offset { t = time.Unix(0, btoi(k)) return nil } diff --git a/cmd/cachestats/cachestats.go b/cmd/cachestats/cachestats.go index 58039ec64..4fa2cc2d6 100644 --- a/cmd/cachestats/cachestats.go +++ b/cmd/cachestats/cachestats.go @@ -1,4 +1,4 @@ -// +build !plan9 +// +build !plan9,go1.7 package cachestats diff --git a/cmd/cachestats/cachestats_unsupported.go b/cmd/cachestats/cachestats_unsupported.go index aded32895..92345203d 100644 --- a/cmd/cachestats/cachestats_unsupported.go +++ b/cmd/cachestats/cachestats_unsupported.go @@ -1,6 +1,6 @@ // Build for cache for unsupported platforms to stop go complaining // about "no buildable Go source files " -// +build plan9 +// +build plan9 !go1.7 package cachestats diff --git a/docs/content/cache.md b/docs/content/cache.md index 1c7d56fdd..2e215cd95 100644 --- a/docs/content/cache.md +++ b/docs/content/cache.md @@ -27,7 +27,7 @@ c) Copy remote s) Set configuration password q) Quit config n/r/c/s/q> n -name> remote +name> test-cache Type of storage to configure. Choose a number from below, or type in your own value ... @@ -39,6 +39,19 @@ Remote to cache. Normally should contain a ':' and a path, eg "myremote:path/to/dir", "myremote:bucket" or maybe "myremote:" (not recommended). remote> local:/test +Optional: The URL of the Plex server +plex_url> http://127.0.0.1:32400 +Optional: The username of the Plex user +plex_username> dummyusername +Optional: The password of the Plex user +y) Yes type in my own password +g) Generate random password +n) No leave this optional password blank +y/g/n> y +Enter the password: +password: +Confirm the password: +password: The size of a chunk. Lower value good for slow connections but can affect seamless reading. Default: 5M Choose a number from below, or type in your own value @@ -60,36 +73,26 @@ Choose a number from below, or type in your own value 3 / 24 hours \ "48h" info_age> 2 -How much time should a chunk (file data) be stored in cache. -Accepted units are: "s", "m", "h". -Default: 3h +The maximum size of stored chunks. When the storage grows beyond this size, the oldest chunks will be deleted. +Default: 10G Choose a number from below, or type in your own value - 1 / 30 seconds - \ "30s" - 2 / 1 minute - \ "1m" - 3 / 1 hour and 30 minutes - \ "1h30m" -chunk_age> 3h -How much time should data be cached during warm up. -Accepted units are: "s", "m", "h". -Default: 24h -Choose a number from below, or type in your own value - 1 / 3 hours - \ "3h" - 2 / 6 hours - \ "6h" - 3 / 24 hours - \ "24h" -warmup_age> 3 + 1 / 500 MB + \ "500M" + 2 / 1 GB + \ "1G" + 3 / 10 GB + \ "10G" +chunk_total_size> 3 Remote config -------------------- [test-cache] remote = local:/test +plex_url = http://127.0.0.1:32400 +plex_username = dummyusername +plex_password = *** ENCRYPTED *** chunk_size = 5M -info_age = 24h -chunk_age = 3h -warmup_age = 24h +info_age = 48h +chunk_total_size = 10G ``` You can then use it like this, @@ -126,30 +129,31 @@ and cloud providers, the cache remote can split multiple requests to the cloud provider for smaller file chunks and combines them together locally where they can be available almost immediately before the reader usually needs them. + This is similar to buffering when media files are played online. Rclone will stay around the current marker but always try its best to stay ahead and prepare the data before. -#### Warmup mode #### +#### Plex Integration #### -A negative side of running multiple requests on the cloud provider is -that you can easily reach a limit on how many requests or how much data -you can download from a cloud provider in a window of time. -For this reason, a warmup mode is a state where `cache` changes its settings -to talk less with the cloud provider. +There is a direct integration with Plex which allows cache to detect during reading +if the file is in playback or not. This helps cache to adapt how it queries +the cloud provider depending on what is needed for. -To prevent a ban or a similar action from the cloud provider, `cache` will -keep track of all the open files and during intensive times when it passes -a configured threshold, it will change its settings to a warmup mode. +Scans will have a minimum amount of workers (1) while in a confirmed playback cache +will deploy the configured number of workers. -It can also be disabled during single file streaming if `cache` sees that we're -reading the file in sequence and can go load its parts in advance. +This integration opens the doorway to additional performance improvements +which will be explored in the near future. + +**Note:** If Plex options are not configured, `cache` will function with its +configured options without adapting any of its settings. + +How to enable? Run `rclone config` and add all the Plex options (endpoint, username +and password) in your remote and it will be automatically enabled. Affected settings: -- `cache-chunk-no-memory`: _disabled_ -- `cache-workers`: _1_ -- file chunks will now be cached using `cache-warm-up-age` as a duration instead of the -regular `cache-chunk-age` +- `cache-workers`: _Configured value_ during confirmed playback or _1_ all the other times ### Known issues ### @@ -194,6 +198,22 @@ connections. **Default**: 5M +#### --cache-total-chunk-size=SIZE #### + +The total size that the chunks can take up on the local disk. If `cache` +exceeds this value then it will start to the delete the oldest chunks until +it goes under this value. + +**Default**: 10G + +#### --cache-chunk-clean-interval=DURATION #### + +How often should `cache` perform cleanups of the chunk storage. The default value +should be ok for most people. If you find that `cache` goes over `cache-total-chunk-size` +too often then try to lower this value to force it to perform cleanups more often. + +**Default**: 1m + #### --cache-info-age=DURATION #### How long to keep file structure information (directory listings, file size, @@ -204,25 +224,6 @@ this value very large as the cache store will also be updated in real time. **Default**: 6h -#### --cache-chunk-age=DURATION #### - -How long to keep file chunks (partial data) locally. - -Longer durations will result in larger cache stores as data will be cleared -less often. - -**Default**: 3h - -#### --cache-warm-up-age=DURATION #### - -How long to keep file chunks (partial data) locally during warmup times. - -If `cache` goes through intensive read times when it is scanned for information -then this setting will allow you to customize higher storage times for that -data. Otherwise, it's safe to keep the same value as `cache-chunk-age`. - -**Default**: 3h - #### --cache-read-retries=RETRIES #### How many times to retry a read from a cache storage. @@ -235,7 +236,7 @@ able to provide file data anymore. For really slow connections, increase this to a point where the stream is able to provide data but your experience will be very stuttering. -**Default**: 3 +**Default**: 10 #### --cache-workers=WORKERS #### @@ -247,6 +248,9 @@ This impacts several aspects like the cloud provider API limits, more stress on the hardware that rclone runs on but it also means that streams will be more fluid and data will be available much more faster to readers. +**Note**: If the optional Plex integration is enabled then this setting +will adapt to the type of reading performed and the value specified here will be used +as a maximum number of workers to use. **Default**: 4 #### --cache-chunk-no-memory #### @@ -268,9 +272,7 @@ available on the local machine. #### --cache-rps=NUMBER #### -Some of the rclone remotes that `cache` will wrap have back-off or limits -in place to not reach cloud provider limits. This is similar to that. -It places a hard limit on the number of requests per second that `cache` +This setting places a hard limit on the number of requests per second that `cache` will be doing to the cloud provider remote and try to respect that value by setting waits between reads. @@ -278,27 +280,13 @@ If you find that you're getting banned or limited on the cloud provider through cache and know that a smaller number of requests per second will allow you to work with it then you can use this setting for that. -A good balance of all the other settings and warmup times should make this +A good balance of all the other settings should make this setting useless but it is available to set for more special cases. **NOTE**: This will limit the number of requests during streams but other API calls to the cloud provider like directory listings will still pass. -**Default**: 4 - -#### --cache-warm-up-rps=RATE/SECONDS #### - -This setting allows `cache` to change its settings for warmup mode or revert -back from it. - -`cache` keeps track of all open files and when there are `RATE` files open -during `SECONDS` window of time reached it will activate warmup and change -its settings as explained in the `Warmup mode` section. - -When the number of files being open goes under `RATE` in the same amount -of time, `cache` will disable this mode. - -**Default**: 3/20 +**Default**: disabled #### --cache-writes #### diff --git a/fstest/fstests/gen_tests.go b/fstest/fstests/gen_tests.go index 61482dec8..04e2fc051 100644 --- a/fstest/fstests/gen_tests.go +++ b/fstest/fstests/gen_tests.go @@ -166,6 +166,6 @@ func main() { generateTestProgram(t, fns, "AzureBlob", buildConstraint("go1.7")) generateTestProgram(t, fns, "Pcloud") generateTestProgram(t, fns, "Webdav") - generateTestProgram(t, fns, "Cache", buildConstraint("!plan9")) + generateTestProgram(t, fns, "Cache", buildConstraint("!plan9,go1.7")) log.Printf("Done") }