cache: plex integration, refactor chunk storage and worker retries (#1899)

This commit is contained in:
remusb 2017-12-09 23:54:26 +02:00 committed by GitHub
parent b05e472d2e
commit b48b537325
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 781 additions and 652 deletions

326
cache/cache.go vendored
View File

@ -1,4 +1,4 @@
// +build !plan9
// +build !plan9,go1.7
package cache
@ -7,7 +7,6 @@ import (
"io"
"path"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
@ -17,6 +16,7 @@ import (
"os/signal"
"syscall"
"github.com/ncw/rclone/crypt"
"github.com/ncw/rclone/fs"
"github.com/pkg/errors"
"golang.org/x/net/context"
@ -26,22 +26,20 @@ import (
const (
// DefCacheChunkSize is the default value for chunk size
DefCacheChunkSize = "5M"
// DefCacheTotalChunkSize is the default value for the maximum size of stored chunks
DefCacheTotalChunkSize = "10G"
// DefCacheChunkCleanInterval is the interval at which chunks are cleaned
DefCacheChunkCleanInterval = "1m"
// DefCacheInfoAge is the default value for object info age
DefCacheInfoAge = "6h"
// DefCacheChunkAge is the default value for chunk age duration
DefCacheChunkAge = "3h"
// DefCacheMetaAge is the default value for chunk age duration
DefCacheMetaAge = "3h"
// DefCacheReadRetries is the default value for read retries
DefCacheReadRetries = 3
DefCacheReadRetries = 10
// DefCacheTotalWorkers is how many workers run in parallel to download chunks
DefCacheTotalWorkers = 4
// DefCacheChunkNoMemory will enable or disable in-memory storage for chunks
DefCacheChunkNoMemory = false
// DefCacheRps limits the number of requests per second to the source FS
DefCacheRps = -1
// DefWarmUpRatePerSeconds will apply a special config for warming up the cache
DefWarmUpRatePerSeconds = "3/20"
// DefCacheWrites will cache file data on writes through the cache
DefCacheWrites = false
)
@ -49,18 +47,17 @@ const (
// Globals
var (
// Flags
cacheDbPath = fs.StringP("cache-db-path", "", filepath.Join(fs.CacheDir, "cache-backend"), "Directory to cache DB")
cacheDbPurge = fs.BoolP("cache-db-purge", "", false, "Purge the cache DB before")
cacheChunkSize = fs.StringP("cache-chunk-size", "", DefCacheChunkSize, "The size of a chunk")
cacheInfoAge = fs.StringP("cache-info-age", "", DefCacheInfoAge, "How much time should object info be stored in cache")
cacheChunkAge = fs.StringP("cache-chunk-age", "", DefCacheChunkAge, "How much time should a chunk be in cache before cleanup")
cacheMetaAge = fs.StringP("cache-warm-up-age", "", DefCacheMetaAge, "How much time should data be cached during warm up")
cacheReadRetries = fs.IntP("cache-read-retries", "", DefCacheReadRetries, "How many times to retry a read from a cache storage")
cacheTotalWorkers = fs.IntP("cache-workers", "", DefCacheTotalWorkers, "How many workers should run in parallel to download chunks")
cacheChunkNoMemory = fs.BoolP("cache-chunk-no-memory", "", DefCacheChunkNoMemory, "Disable the in-memory cache for storing chunks during streaming")
cacheRps = fs.IntP("cache-rps", "", int(DefCacheRps), "Limits the number of requests per second to the source FS. -1 disables the rate limiter")
cacheWarmUp = fs.StringP("cache-warm-up-rps", "", DefWarmUpRatePerSeconds, "Format is X/Y = how many X opens per Y seconds should trigger the warm up mode. See the docs")
cacheStoreWrites = fs.BoolP("cache-writes", "", DefCacheWrites, "Will cache file data on writes through the FS")
cacheDbPath = fs.StringP("cache-db-path", "", filepath.Join(fs.CacheDir, "cache-backend"), "Directory to cache DB")
cacheDbPurge = fs.BoolP("cache-db-purge", "", false, "Purge the cache DB before")
cacheChunkSize = fs.StringP("cache-chunk-size", "", DefCacheChunkSize, "The size of a chunk")
cacheTotalChunkSize = fs.StringP("cache-total-chunk-size", "", DefCacheTotalChunkSize, "The total size which the chunks can take up from the disk")
cacheChunkCleanInterval = fs.StringP("cache-chunk-clean-interval", "", DefCacheChunkCleanInterval, "Interval at which chunk cleanup runs")
cacheInfoAge = fs.StringP("cache-info-age", "", DefCacheInfoAge, "How much time should object info be stored in cache")
cacheReadRetries = fs.IntP("cache-read-retries", "", DefCacheReadRetries, "How many times to retry a read from a cache storage")
cacheTotalWorkers = fs.IntP("cache-workers", "", DefCacheTotalWorkers, "How many workers should run in parallel to download chunks")
cacheChunkNoMemory = fs.BoolP("cache-chunk-no-memory", "", DefCacheChunkNoMemory, "Disable the in-memory cache for storing chunks during streaming")
cacheRps = fs.IntP("cache-rps", "", int(DefCacheRps), "Limits the number of requests per second to the source FS. -1 disables the rate limiter")
cacheStoreWrites = fs.BoolP("cache-writes", "", DefCacheWrites, "Will cache file data on writes through the FS")
)
// Register with Fs
@ -72,6 +69,19 @@ func init() {
Options: []fs.Option{{
Name: "remote",
Help: "Remote to cache.\nNormally should contain a ':' and a path, eg \"myremote:path/to/dir\",\n\"myremote:bucket\" or maybe \"myremote:\" (not recommended).",
}, {
Name: "plex_url",
Help: "Optional: The URL of the Plex server",
Optional: true,
}, {
Name: "plex_username",
Help: "Optional: The username of the Plex user",
Optional: true,
}, {
Name: "plex_password",
Help: "Optional: The password of the Plex user",
IsPassword: true,
Optional: true,
}, {
Name: "chunk_size",
Help: "The size of a chunk. Lower value good for slow connections but can affect seamless reading. \nDefault: " + DefCacheChunkSize,
@ -105,34 +115,18 @@ func init() {
},
Optional: true,
}, {
Name: "chunk_age",
Help: "How much time should a chunk (file data) be stored in cache. \nAccepted units are: \"s\", \"m\", \"h\".\nDefault: " + DefCacheChunkAge,
Name: "chunk_total_size",
Help: "The maximum size of stored chunks. When the storage grows beyond this size, the oldest chunks will be deleted. \nDefault: " + DefCacheTotalChunkSize,
Examples: []fs.OptionExample{
{
Value: "30s",
Help: "30 seconds",
Value: "500M",
Help: "500 MB",
}, {
Value: "1m",
Help: "1 minute",
Value: "1G",
Help: "1 GB",
}, {
Value: "1h30m",
Help: "1 hour and 30 minutes",
},
},
Optional: true,
}, {
Name: "warmup_age",
Help: "How much time should data be cached during warm up. \nAccepted units are: \"s\", \"m\", \"h\".\nDefault: " + DefCacheMetaAge,
Examples: []fs.OptionExample{
{
Value: "3h",
Help: "3 hours",
}, {
Value: "6h",
Help: "6 hours",
}, {
Value: "24h",
Help: "24 hours",
Value: "10G",
Help: "10 GB",
},
},
Optional: true,
@ -149,10 +143,7 @@ type ChunkStorage interface {
GetChunk(cachedObject *Object, offset int64) ([]byte, error)
// add a new chunk
AddChunk(cachedObject *Object, data []byte, offset int64) error
// AddChunkAhead adds a new chunk before caching an Object for it
AddChunkAhead(fp string, data []byte, offset int64, t time.Duration) error
AddChunk(fp string, data []byte, offset int64) error
// if the storage can cleanup on a cron basis
// otherwise it can do a noop operation
@ -161,6 +152,10 @@ type ChunkStorage interface {
// if the storage can cleanup chunks after we no longer need them
// otherwise it can do a noop operation
CleanChunksByNeed(offset int64)
// if the storage can cleanup chunks after the total size passes a certain point
// otherwise it can do a noop operation
CleanChunksBySize(maxSize int64)
}
// Storage is a storage type (Bolt) which needs to support both chunk and file based operations
@ -213,26 +208,21 @@ type Fs struct {
features *fs.Features // optional features
cache Storage
fileAge time.Duration
chunkSize int64
chunkAge time.Duration
metaAge time.Duration
readRetries int
totalWorkers int
chunkMemory bool
warmUp bool
warmUpRate int
warmUpSec int
cacheWrites bool
originalTotalWorkers int
originalChunkMemory bool
fileAge time.Duration
chunkSize int64
chunkTotalSize int64
chunkCleanInterval time.Duration
readRetries int
totalWorkers int
totalMaxWorkers int
chunkMemory bool
cacheWrites bool
lastChunkCleanup time.Time
lastRootCleanup time.Time
lastOpenedEntries map[string]time.Time
cleanupMu sync.Mutex
warmupMu sync.Mutex
rateLimiter *rate.Limiter
lastChunkCleanup time.Time
lastRootCleanup time.Time
cleanupMu sync.Mutex
rateLimiter *rate.Limiter
plexConnector *plexConnector
}
// NewFs contstructs an Fs from the path, container:path
@ -245,12 +235,13 @@ func NewFs(name, rpath string) (fs.Fs, error) {
// Look for a file first
remotePath := path.Join(remote, rpath)
wrappedFs, wrapErr := fs.NewFs(remotePath)
if wrapErr != fs.ErrorIsFile && wrapErr != nil {
return nil, errors.Wrapf(wrapErr, "failed to make remote %q to wrap", remotePath)
}
fs.Debugf(name, "wrapped %v:%v at root %v", wrappedFs.Name(), wrappedFs.Root(), rpath)
plexURL := fs.ConfigFileGet(name, "plex_url")
plexToken := fs.ConfigFileGet(name, "plex_token")
var chunkSize fs.SizeSuffix
chunkSizeString := fs.ConfigFileGet(name, "chunk_size", DefCacheChunkSize)
if *cacheChunkSize != DefCacheChunkSize {
@ -260,6 +251,20 @@ func NewFs(name, rpath string) (fs.Fs, error) {
if err != nil {
return nil, errors.Wrapf(err, "failed to understand chunk size", chunkSizeString)
}
var chunkTotalSize fs.SizeSuffix
chunkTotalSizeString := fs.ConfigFileGet(name, "chunk_total_size", DefCacheTotalChunkSize)
if *cacheTotalChunkSize != DefCacheTotalChunkSize {
chunkTotalSizeString = *cacheTotalChunkSize
}
err = chunkTotalSize.Set(chunkTotalSizeString)
if err != nil {
return nil, errors.Wrapf(err, "failed to understand chunk total size", chunkTotalSizeString)
}
chunkCleanIntervalStr := *cacheChunkCleanInterval
chunkCleanInterval, err := time.ParseDuration(chunkCleanIntervalStr)
if err != nil {
return nil, errors.Wrapf(err, "failed to understand duration %v", chunkCleanIntervalStr)
}
infoAge := fs.ConfigFileGet(name, "info_age", DefCacheInfoAge)
if *cacheInfoAge != DefCacheInfoAge {
infoAge = *cacheInfoAge
@ -268,58 +273,70 @@ func NewFs(name, rpath string) (fs.Fs, error) {
if err != nil {
return nil, errors.Wrapf(err, "failed to understand duration", infoAge)
}
chunkAge := fs.ConfigFileGet(name, "chunk_age", DefCacheChunkAge)
if *cacheChunkAge != DefCacheChunkAge {
chunkAge = *cacheChunkAge
}
chunkDuration, err := time.ParseDuration(chunkAge)
if err != nil {
return nil, errors.Wrapf(err, "failed to understand duration", chunkAge)
}
metaAge := fs.ConfigFileGet(name, "warmup_age", DefCacheMetaAge)
if *cacheMetaAge != DefCacheMetaAge {
metaAge = *cacheMetaAge
}
metaDuration, err := time.ParseDuration(metaAge)
if err != nil {
return nil, errors.Wrapf(err, "failed to understand duration", metaAge)
}
warmupRps := strings.Split(*cacheWarmUp, "/")
warmupRate, err := strconv.Atoi(warmupRps[0])
if err != nil {
return nil, errors.Wrapf(err, "failed to understand warm up rate", *cacheWarmUp)
}
warmupSec, err := strconv.Atoi(warmupRps[1])
if err != nil {
return nil, errors.Wrapf(err, "failed to understand warm up seconds", *cacheWarmUp)
}
// configure cache backend
if *cacheDbPurge {
fs.Debugf(name, "Purging the DB")
}
f := &Fs{
Fs: wrappedFs,
name: name,
root: rpath,
fileAge: infoDuration,
chunkSize: int64(chunkSize),
chunkAge: chunkDuration,
metaAge: metaDuration,
readRetries: *cacheReadRetries,
totalWorkers: *cacheTotalWorkers,
originalTotalWorkers: *cacheTotalWorkers,
chunkMemory: !*cacheChunkNoMemory,
originalChunkMemory: !*cacheChunkNoMemory,
warmUp: false,
warmUpRate: warmupRate,
warmUpSec: warmupSec,
cacheWrites: *cacheStoreWrites,
lastChunkCleanup: time.Now().Truncate(time.Hour * 24 * 30),
lastRootCleanup: time.Now().Truncate(time.Hour * 24 * 30),
lastOpenedEntries: make(map[string]time.Time),
Fs: wrappedFs,
name: name,
root: rpath,
fileAge: infoDuration,
chunkSize: int64(chunkSize),
chunkTotalSize: int64(chunkTotalSize),
chunkCleanInterval: chunkCleanInterval,
readRetries: *cacheReadRetries,
totalWorkers: *cacheTotalWorkers,
totalMaxWorkers: *cacheTotalWorkers,
chunkMemory: !*cacheChunkNoMemory,
cacheWrites: *cacheStoreWrites,
lastChunkCleanup: time.Now().Truncate(time.Hour * 24 * 30),
lastRootCleanup: time.Now().Truncate(time.Hour * 24 * 30),
}
if f.chunkTotalSize < (f.chunkSize * int64(f.totalWorkers)) {
return nil, errors.Errorf("don't set cache-total-chunk-size(%v) less than cache-chunk-size(%v) * cache-workers(%v)",
f.chunkTotalSize, f.chunkSize, f.totalWorkers)
}
f.rateLimiter = rate.NewLimiter(rate.Limit(float64(*cacheRps)), f.totalWorkers)
f.plexConnector = &plexConnector{}
if plexURL != "" {
usingPlex := false
if plexToken != "" {
f.plexConnector, err = newPlexConnectorWithToken(f, plexURL, plexToken)
if err != nil {
return nil, errors.Wrapf(err, "failed to connect to the Plex API %v", plexURL)
}
usingPlex = true
} else {
plexUsername := fs.ConfigFileGet(name, "plex_username")
plexPassword := fs.ConfigFileGet(name, "plex_password")
if plexPassword != "" && plexUsername != "" {
decPass, err := fs.Reveal(plexPassword)
if err != nil {
decPass = plexPassword
}
f.plexConnector, err = newPlexConnector(f, plexURL, plexUsername, decPass)
if err != nil {
return nil, errors.Wrapf(err, "failed to connect to the Plex API %v", plexURL)
}
if f.plexConnector.token != "" {
fs.ConfigFileSet(name, "plex_token", f.plexConnector.token)
fs.SaveConfig()
}
usingPlex = true
}
}
if usingPlex {
fs.Infof(name, "Connected to Plex server: %v", plexURL)
// when connected to a Plex server we default to 1 worker (Plex scans all the time)
// and leave max workers as a setting to scale out the workers on demand during playback
f.totalWorkers = 1
}
}
dbPath := *cacheDbPath
if filepath.Ext(dbPath) != "" {
dbPath = filepath.Dir(dbPath)
@ -331,7 +348,9 @@ func NewFs(name, rpath string) (fs.Fs, error) {
dbPath = filepath.Join(dbPath, name+".db")
fs.Infof(name, "Storage DB path: %v", dbPath)
f.cache, err = GetPersistent(dbPath, *cacheDbPurge)
f.cache, err = GetPersistent(dbPath, &Features{
PurgeDb: *cacheDbPurge,
})
if err != nil {
return nil, errors.Wrapf(err, "failed to start cache db")
}
@ -348,9 +367,10 @@ func NewFs(name, rpath string) (fs.Fs, error) {
fs.Infof(name, "Chunk Memory: %v", f.chunkMemory)
fs.Infof(name, "Chunk Size: %v", fs.SizeSuffix(f.chunkSize))
fs.Infof(name, "Chunk Total Size: %v", fs.SizeSuffix(f.chunkTotalSize))
fs.Infof(name, "Chunk Clean Interval: %v", f.chunkCleanInterval.String())
fs.Infof(name, "Workers: %v", f.totalWorkers)
fs.Infof(name, "File Age: %v", f.fileAge.String())
fs.Infof(name, "Chunk Age: %v", f.chunkAge.String())
fs.Infof(name, "Cache Writes: %v", f.cacheWrites)
go f.CleanUpCache(false)
@ -412,35 +432,6 @@ func (f *Fs) ChunkSize() int64 {
return f.chunkSize
}
// originalSettingWorkers will return the original value of this config
func (f *Fs) originalSettingWorkers() int {
return f.originalTotalWorkers
}
// originalSettingChunkNoMemory will return the original value of this config
func (f *Fs) originalSettingChunkNoMemory() bool {
return f.originalChunkMemory
}
// InWarmUp says if cache warm up is active
func (f *Fs) InWarmUp() bool {
return f.warmUp
}
// enableWarmUp will enable the warm up state of this cache along with the relevant settings
func (f *Fs) enableWarmUp() {
f.totalWorkers = 1
f.chunkMemory = false
f.warmUp = true
}
// disableWarmUp will disable the warm up state of this cache along with the relevant settings
func (f *Fs) disableWarmUp() {
f.totalWorkers = f.originalSettingWorkers()
f.chunkMemory = !f.originalSettingChunkNoMemory()
f.warmUp = false
}
// NewObject finds the Object at remote.
func (f *Fs) NewObject(remote string) (fs.Object, error) {
co := NewObject(f, remote)
@ -668,7 +659,7 @@ func (f *Fs) cacheReader(u io.Reader, src fs.ObjectInfo, originalRead func(inn i
// if we have some bytes we cache them
if readSize > 0 {
chunk = chunk[:readSize]
err2 := f.cache.AddChunkAhead(cleanPath(path.Join(f.root, src.Remote())), chunk, offset, f.metaAge)
err2 := f.cache.AddChunk(cleanPath(path.Join(f.root, src.Remote())), chunk, offset)
if err2 != nil {
fs.Errorf(src, "error saving new data in cache '%v'", err2)
_ = pr.CloseWithError(err2)
@ -840,10 +831,6 @@ func (f *Fs) Purge() error {
fs.Infof(f, "purging cache")
f.cache.Purge()
f.warmupMu.Lock()
defer f.warmupMu.Unlock()
f.lastOpenedEntries = make(map[string]time.Time)
do := f.Fs.Features().Purge
if do == nil {
return nil
@ -892,45 +879,17 @@ func (f *Fs) OpenRateLimited(fn func() (io.ReadCloser, error)) (io.ReadCloser, e
return fn()
}
// CheckIfWarmupNeeded changes the FS settings during warmups
func (f *Fs) CheckIfWarmupNeeded(remote string) {
f.warmupMu.Lock()
defer f.warmupMu.Unlock()
secondCount := time.Duration(f.warmUpSec)
rate := f.warmUpRate
// clean up entries older than the needed time frame needed
for k, v := range f.lastOpenedEntries {
if time.Now().After(v.Add(time.Second * secondCount)) {
delete(f.lastOpenedEntries, k)
}
}
f.lastOpenedEntries[remote] = time.Now()
// simple check for the current load
if len(f.lastOpenedEntries) >= rate && !f.warmUp {
fs.Infof(f, "turning on cache warmup")
f.enableWarmUp()
} else if len(f.lastOpenedEntries) < rate && f.warmUp {
fs.Infof(f, "turning off cache warmup")
f.disableWarmUp()
}
}
// CleanUpCache will cleanup only the cache data that is expired
func (f *Fs) CleanUpCache(ignoreLastTs bool) {
f.cleanupMu.Lock()
defer f.cleanupMu.Unlock()
if ignoreLastTs || time.Now().After(f.lastChunkCleanup.Add(f.chunkAge/4)) {
fs.Infof("cache", "running chunks cleanup")
f.cache.CleanChunksByAge(f.chunkAge)
if ignoreLastTs || time.Now().After(f.lastChunkCleanup.Add(f.chunkCleanInterval)) {
f.cache.CleanChunksBySize(f.chunkTotalSize)
f.lastChunkCleanup = time.Now()
}
if ignoreLastTs || time.Now().After(f.lastRootCleanup.Add(f.fileAge/4)) {
fs.Infof("cache", "running root cleanup")
f.cache.CleanEntriesByAge(f.fileAge)
f.lastRootCleanup = time.Now()
}
@ -951,6 +910,15 @@ func (f *Fs) SetWrapper(wrapper fs.Fs) {
f.wrapper = wrapper
}
// Wrap returns the Fs that is wrapping this Fs
func (f *Fs) isWrappedByCrypt() (*crypt.Fs, bool) {
if f.wrapper == nil {
return nil, false
}
c, ok := f.wrapper.(*crypt.Fs)
return c, ok
}
// DirCacheFlush flushes the dir cache
func (f *Fs) DirCacheFlush() {
_ = f.cache.RemoveDir("")

View File

@ -1,4 +1,4 @@
// +build !plan9
// +build !plan9,go1.7
package cache_test
@ -24,18 +24,14 @@ import (
)
var (
WrapRemote = flag.String("wrap-remote", "", "Remote to wrap")
RemoteName = flag.String("remote-name", "TestCacheInternal", "Root remote")
SkipTimeouts = flag.Bool("skip-waits", false, "To skip tests that have wait times")
rootFs fs.Fs
boltDb *cache.Persistent
metaAge = time.Second * 30
infoAge = time.Second * 10
chunkAge = time.Second * 10
okDiff = time.Second * 9 // really big diff here but the build machines seem to be slow. need a different way for this
workers = 2
warmupRate = 3
warmupSec = 10
WrapRemote = flag.String("wrap-remote", "", "Remote to wrap")
RemoteName = flag.String("remote-name", "TestCacheInternal", "Root remote")
rootFs fs.Fs
boltDb *cache.Persistent
infoAge = time.Second * 10
chunkClean = time.Second
okDiff = time.Second * 9 // really big diff here but the build machines seem to be slow. need a different way for this
workers = 2
)
// prepare the test server and return a function to tidy it up afterwards
@ -44,7 +40,7 @@ func TestInternalInit(t *testing.T) {
// delete the default path
dbPath := filepath.Join(fs.CacheDir, "cache-backend", *RemoteName+".db")
boltDb, err = cache.GetPersistent(dbPath, true)
boltDb, err = cache.GetPersistent(dbPath, &cache.Features{PurgeDb: true})
require.NoError(t, err)
fstest.Initialise()
@ -65,17 +61,17 @@ func TestInternalInit(t *testing.T) {
fs.ConfigFileSet(*RemoteName, "type", "cache")
fs.ConfigFileSet(*RemoteName, "remote", *WrapRemote)
fs.ConfigFileSet(*RemoteName, "chunk_size", "1024")
fs.ConfigFileSet(*RemoteName, "chunk_age", chunkAge.String())
fs.ConfigFileSet(*RemoteName, "chunk_total_size", "2048")
fs.ConfigFileSet(*RemoteName, "info_age", infoAge.String())
}
_ = flag.Set("cache-warm-up-age", metaAge.String())
_ = flag.Set("cache-warm-up-rps", fmt.Sprintf("%v/%v", warmupRate, warmupSec))
_ = flag.Set("cache-chunk-no-memory", "true")
_ = flag.Set("cache-workers", strconv.Itoa(workers))
_ = flag.Set("cache-chunk-clean-interval", chunkClean.String())
// Instantiate root
rootFs, err = fs.NewFs(*RemoteName + ":")
require.NoError(t, err)
_ = rootFs.Features().Purge()
require.NoError(t, err)
err = rootFs.Mkdir("")
@ -305,143 +301,6 @@ func TestInternalChangeSeenAfterDirCacheFlush(t *testing.T) {
require.Equal(t, o.ModTime(), co.ModTime())
}
func TestInternalWarmUp(t *testing.T) {
if *SkipTimeouts {
t.Skip("--skip-waits set")
}
reset(t)
cfs, err := getCacheFs(rootFs)
require.NoError(t, err)
chunkSize := cfs.ChunkSize()
o1 := writeObjectRandomBytes(t, rootFs, (chunkSize * 3))
o2 := writeObjectRandomBytes(t, rootFs, (chunkSize * 4))
o3 := writeObjectRandomBytes(t, rootFs, (chunkSize * 6))
_ = readDataFromObj(t, o1, 0, chunkSize, false)
_ = readDataFromObj(t, o2, 0, chunkSize, false)
// validate a fresh chunk
expectedExpiry := time.Now().Add(chunkAge)
ts, err := boltDb.GetChunkTs(path.Join(rootFs.Root(), o2.Remote()), 0)
require.NoError(t, err)
require.WithinDuration(t, expectedExpiry, ts, okDiff)
// validate that we entered a warm up state
_ = readDataFromObj(t, o3, 0, chunkSize, false)
require.True(t, cfs.InWarmUp())
expectedExpiry = time.Now().Add(metaAge)
ts, err = boltDb.GetChunkTs(path.Join(rootFs.Root(), o3.Remote()), 0)
require.NoError(t, err)
require.WithinDuration(t, expectedExpiry, ts, okDiff)
// validate that we cooled down and exit warm up
// we wait for the cache to expire
t.Logf("Waiting 10 seconds for warm up to expire\n")
time.Sleep(time.Second * 10)
_ = readDataFromObj(t, o3, chunkSize, chunkSize*2, false)
require.False(t, cfs.InWarmUp())
expectedExpiry = time.Now().Add(chunkAge)
ts, err = boltDb.GetChunkTs(path.Join(rootFs.Root(), o3.Remote()), chunkSize)
require.NoError(t, err)
require.WithinDuration(t, expectedExpiry, ts, okDiff)
}
func TestInternalWarmUpInFlight(t *testing.T) {
if *SkipTimeouts {
t.Skip("--skip-waits set")
}
reset(t)
cfs, err := getCacheFs(rootFs)
require.NoError(t, err)
chunkSize := cfs.ChunkSize()
o1 := writeObjectRandomBytes(t, rootFs, (chunkSize * 3))
o2 := writeObjectRandomBytes(t, rootFs, (chunkSize * 4))
o3 := writeObjectRandomBytes(t, rootFs, (chunkSize * int64(workers) * int64(2)))
_ = readDataFromObj(t, o1, 0, chunkSize, false)
_ = readDataFromObj(t, o2, 0, chunkSize, false)
require.False(t, cfs.InWarmUp())
// validate that we entered a warm up state
_ = readDataFromObj(t, o3, 0, chunkSize, false)
require.True(t, cfs.InWarmUp())
expectedExpiry := time.Now().Add(metaAge)
ts, err := boltDb.GetChunkTs(path.Join(rootFs.Root(), o3.Remote()), 0)
require.NoError(t, err)
require.WithinDuration(t, expectedExpiry, ts, okDiff)
checkSample := make([]byte, chunkSize)
reader, err := o3.Open(&fs.SeekOption{Offset: 0})
require.NoError(t, err)
rs, ok := reader.(*cache.Handle)
require.True(t, ok)
for i := 0; i <= workers; i++ {
_, _ = rs.Seek(int64(i)*chunkSize, 0)
_, err = io.ReadFull(reader, checkSample)
require.NoError(t, err)
if i == workers {
require.False(t, rs.InWarmUp(), "iteration %v", i)
} else {
require.True(t, rs.InWarmUp(), "iteration %v", i)
}
}
_ = reader.Close()
require.True(t, cfs.InWarmUp())
expectedExpiry = time.Now().Add(chunkAge)
ts, err = boltDb.GetChunkTs(path.Join(rootFs.Root(), o3.Remote()), chunkSize*int64(workers+1))
require.NoError(t, err)
require.WithinDuration(t, expectedExpiry, ts, okDiff)
// validate that we cooled down and exit warm up
// we wait for the cache to expire
t.Logf("Waiting 10 seconds for warm up to expire\n")
time.Sleep(time.Second * 10)
_ = readDataFromObj(t, o2, chunkSize, chunkSize*2, false)
require.False(t, cfs.InWarmUp())
expectedExpiry = time.Now().Add(chunkAge)
ts, err = boltDb.GetChunkTs(path.Join(rootFs.Root(), o2.Remote()), chunkSize)
require.NoError(t, err)
require.WithinDuration(t, expectedExpiry, ts, okDiff)
}
// TODO: this is bugged
//func TestInternalRateLimiter(t *testing.T) {
// reset(t)
// _ = flag.Set("cache-rps", "2")
// rootFs, err := fs.NewFs(*RemoteName + ":")
// require.NoError(t, err)
// defer func() {
// _ = flag.Set("cache-rps", "-1")
// rootFs, err = fs.NewFs(*RemoteName + ":")
// require.NoError(t, err)
// }()
// cfs, err := getCacheFs(rootFs)
// require.NoError(t, err)
// chunkSize := cfs.ChunkSize()
//
// // create some rand test data
// co := writeObjectRandomBytes(t, rootFs, (chunkSize*4 + chunkSize/2))
//
// doStuff(t, 5, time.Second, func() {
// r, err := co.Open(&fs.SeekOption{Offset: chunkSize + 1})
// require.NoError(t, err)
//
// buf := make([]byte, chunkSize)
// totalRead, err := io.ReadFull(r, buf)
// require.NoError(t, err)
// require.Equal(t, len(buf), totalRead)
// _ = r.Close()
// })
//}
func TestInternalCacheWrites(t *testing.T) {
reset(t)
_ = flag.Set("cache-writes", "true")
@ -453,10 +312,10 @@ func TestInternalCacheWrites(t *testing.T) {
// create some rand test data
co := writeObjectRandomBytes(t, rootFs, (chunkSize*4 + chunkSize/2))
expectedExpiry := time.Now().Add(metaAge)
expectedTs := time.Now()
ts, err := boltDb.GetChunkTs(path.Join(rootFs.Root(), co.Remote()), 0)
require.NoError(t, err)
require.WithinDuration(t, expectedExpiry, ts, okDiff)
require.WithinDuration(t, expectedTs, ts, okDiff)
// reset fs
_ = flag.Set("cache-writes", "false")
@ -464,43 +323,44 @@ func TestInternalCacheWrites(t *testing.T) {
require.NoError(t, err)
}
func TestInternalExpiredChunkRemoved(t *testing.T) {
t.Skip("FIXME disabled because it is unreliable")
if *SkipTimeouts {
t.Skip("--skip-waits set")
}
func TestInternalMaxChunkSizeRespected(t *testing.T) {
reset(t)
_ = flag.Set("cache-workers", "1")
rootFs, err := fs.NewFs(*RemoteName + ":")
require.NoError(t, err)
cfs, err := getCacheFs(rootFs)
require.NoError(t, err)
chunkSize := cfs.ChunkSize()
totalChunks := 20
// create some rand test data
co := writeObjectRandomBytes(t, cfs, (int64(totalChunks-1)*chunkSize + chunkSize/2))
remote := co.Remote()
// cache all the chunks
_ = readDataFromObj(t, co, 0, co.Size(), false)
// we wait for the cache to expire
t.Logf("Waiting %v for cache to expire\n", chunkAge.String())
time.Sleep(chunkAge)
_, _ = cfs.List("")
time.Sleep(time.Second * 2)
o, err := cfs.NewObject(remote)
require.NoError(t, err)
co2, ok := o.(*cache.Object)
o := writeObjectRandomBytes(t, cfs, (int64(totalChunks-1)*chunkSize + chunkSize/2))
co, ok := o.(*cache.Object)
require.True(t, ok)
require.False(t, boltDb.HasChunk(co2, 0))
for i := 0; i < 4; i++ { // read first 4
_ = readDataFromObj(t, co, chunkSize*int64(i), chunkSize*int64(i+1), false)
}
cfs.CleanUpCache(true)
// the last 2 **must** be in the cache
require.True(t, boltDb.HasChunk(co, chunkSize*2))
require.True(t, boltDb.HasChunk(co, chunkSize*3))
for i := 4; i < 6; i++ { // read next 2
_ = readDataFromObj(t, co, chunkSize*int64(i), chunkSize*int64(i+1), false)
}
cfs.CleanUpCache(true)
// the last 2 **must** be in the cache
require.True(t, boltDb.HasChunk(co, chunkSize*4))
require.True(t, boltDb.HasChunk(co, chunkSize*5))
// reset fs
_ = flag.Set("cache-workers", strconv.Itoa(workers))
rootFs, err = fs.NewFs(*RemoteName + ":")
require.NoError(t, err)
}
func TestInternalExpiredEntriesRemoved(t *testing.T) {
if *SkipTimeouts {
t.Skip("--skip-waits set")
}
reset(t)
cfs, err := getCacheFs(rootFs)
require.NoError(t, err)

2
cache/cache_test.go vendored
View File

@ -3,7 +3,7 @@
// Automatically generated - DO NOT EDIT
// Regenerate with: make gen_tests
// +build !plan9
// +build !plan9,go1.7
package cache_test

View File

@ -1,6 +1,6 @@
// Build for cache for unsupported platforms to stop go complaining
// about "no buildable Go source files "
// +build plan9
// +build plan9 !go1.7
package cache

2
cache/directory.go vendored
View File

@ -1,4 +1,4 @@
// +build !plan9
// +build !plan9,go1.7
package cache

253
cache/handle.go vendored
View File

@ -1,4 +1,4 @@
// +build !plan9
// +build !plan9,go1.7
package cache
@ -15,21 +15,19 @@ import (
// Handle is managing the read/write/seek operations on an open handle
type Handle struct {
cachedObject *Object
memory ChunkStorage
preloadQueue chan int64
preloadOffset int64
offset int64
seenOffsets map[int64]bool
mu sync.Mutex
cachedObject *Object
memory ChunkStorage
preloadQueue chan int64
preloadOffset int64
offset int64
seenOffsets map[int64]bool
mu sync.Mutex
confirmReading chan bool
ReadRetries int
TotalWorkers int
UseMemory bool
workers []*worker
chunkAge time.Duration
warmup bool
closed bool
UseMemory bool
workers []*worker
closed bool
reading bool
}
// NewObjectHandle returns a new Handle for an existing Object
@ -39,20 +37,15 @@ func NewObjectHandle(o *Object) *Handle {
offset: 0,
preloadOffset: -1, // -1 to trigger the first preload
ReadRetries: o.CacheFs.readRetries,
TotalWorkers: o.CacheFs.totalWorkers,
UseMemory: o.CacheFs.chunkMemory,
chunkAge: o.CacheFs.chunkAge,
warmup: o.CacheFs.InWarmUp(),
UseMemory: o.CacheFs.chunkMemory,
reading: false,
}
r.seenOffsets = make(map[int64]bool)
r.memory = NewMemory(o.CacheFs.chunkAge)
if o.CacheFs.InWarmUp() {
r.chunkAge = o.CacheFs.metaAge
}
r.memory = NewMemory(-1)
// create a larger buffer to queue up requests
r.preloadQueue = make(chan int64, r.TotalWorkers*10)
r.preloadQueue = make(chan int64, o.CacheFs.totalWorkers*10)
r.confirmReading = make(chan bool)
r.startReadWorkers()
return r
}
@ -72,34 +65,86 @@ func (r *Handle) String() string {
return r.cachedObject.abs()
}
// InWarmUp says if this handle is in warmup mode
func (r *Handle) InWarmUp() bool {
return r.warmup
}
// startReadWorkers will start the worker pool
func (r *Handle) startReadWorkers() {
if r.hasAtLeastOneWorker() {
return
}
for i := 0; i < r.TotalWorkers; i++ {
w := &worker{
r: r,
ch: r.preloadQueue,
id: i,
}
go w.run()
r.scaleWorkers(r.cacheFs().totalWorkers)
}
r.workers = append(r.workers, w)
// scaleOutWorkers will increase the worker pool count by the provided amount
func (r *Handle) scaleWorkers(desired int) {
current := len(r.workers)
if current == desired {
return
}
if current > desired {
// scale in gracefully
for i := 0; i < current-desired; i++ {
r.preloadQueue <- -1
}
} else {
// scale out
for i := 0; i < desired-current; i++ {
w := &worker{
r: r,
ch: r.preloadQueue,
id: current + i,
}
go w.run()
r.workers = append(r.workers, w)
}
}
// ignore first scale out from 0
if current != 0 {
fs.Infof(r, "scale workers to %v", desired)
}
}
func (r *Handle) requestExternalConfirmation() {
// if there's no external confirmation available
// then we skip this step
if len(r.workers) >= r.cacheFs().totalMaxWorkers ||
!r.cacheFs().plexConnector.isConnected() {
return
}
go r.cacheFs().plexConnector.isPlayingAsync(r.cachedObject, r.confirmReading)
}
func (r *Handle) confirmExternalReading() {
// if we have a max value of workers
// or there's no external confirmation available
// then we skip this step
if len(r.workers) >= r.cacheFs().totalMaxWorkers ||
!r.cacheFs().plexConnector.isConnected() {
return
}
select {
case confirmed := <-r.confirmReading:
if !confirmed {
return
}
default:
return
}
fs.Infof(r, "confirmed reading by external reader")
r.scaleWorkers(r.cacheFs().totalMaxWorkers)
}
// queueOffset will send an offset to the workers if it's different from the last one
func (r *Handle) queueOffset(offset int64) {
if offset != r.preloadOffset {
// clean past in-memory chunks
if r.UseMemory {
go r.memory.CleanChunksByNeed(offset)
}
go r.cacheFs().CleanUpCache(false)
r.confirmExternalReading()
r.preloadOffset = offset
previousChunksCounter := 0
maxOffset := r.cacheFs().chunkSize * int64(r.cacheFs().originalSettingWorkers())
// clear the past seen chunks
// they will remain in our persistent storage but will be removed from transient
@ -107,25 +152,10 @@ func (r *Handle) queueOffset(offset int64) {
for k := range r.seenOffsets {
if k < offset {
r.seenOffsets[k] = false
// we count how many continuous chunks were seen before
if offset >= maxOffset && k >= offset-maxOffset {
previousChunksCounter++
}
}
}
// if we read all the previous chunks that could have been preloaded
// we should then disable warm up setting for this handle
if r.warmup && previousChunksCounter >= r.cacheFs().originalSettingWorkers() {
r.TotalWorkers = r.cacheFs().originalSettingWorkers()
r.UseMemory = !r.cacheFs().originalSettingChunkNoMemory()
r.chunkAge = r.cacheFs().chunkAge
r.warmup = false
fs.Infof(r, "disabling warm up")
}
for i := 0; i < r.TotalWorkers; i++ {
for i := 0; i < len(r.workers); i++ {
o := r.preloadOffset + r.cacheFs().chunkSize*int64(i)
if o < 0 || o >= r.cachedObject.Size() {
continue
@ -137,6 +167,8 @@ func (r *Handle) queueOffset(offset int64) {
r.seenOffsets[o] = true
r.preloadQueue <- o
}
r.requestExternalConfirmation()
}
}
@ -157,12 +189,6 @@ func (r *Handle) getChunk(chunkStart int64) ([]byte, error) {
var data []byte
var err error
// we reached the end of the file
if chunkStart >= r.cachedObject.Size() {
fs.Debugf(r, "reached EOF %v", chunkStart)
return nil, io.EOF
}
// we calculate the modulus of the requested offset with the size of a chunk
offset := chunkStart % r.cacheFs().chunkSize
@ -171,10 +197,7 @@ func (r *Handle) getChunk(chunkStart int64) ([]byte, error) {
r.queueOffset(chunkStart)
found := false
// delete old chunks from memory
if r.UseMemory {
go r.memory.CleanChunksByNeed(chunkStart)
data, err = r.memory.GetChunk(r.cachedObject, chunkStart)
if err == nil {
found = true
@ -184,16 +207,15 @@ func (r *Handle) getChunk(chunkStart int64) ([]byte, error) {
if !found {
// we're gonna give the workers a chance to pickup the chunk
// and retry a couple of times
for i := 0; i < r.ReadRetries; i++ {
for i := 0; i < r.cacheFs().readRetries*2; i++ {
data, err = r.storage().GetChunk(r.cachedObject, chunkStart)
if err == nil {
found = true
break
}
fs.Debugf(r, "%v: chunk retry storage: %v", chunkStart, i)
time.Sleep(time.Second)
time.Sleep(time.Millisecond * 500)
}
}
@ -222,17 +244,24 @@ func (r *Handle) Read(p []byte) (n int, err error) {
defer r.mu.Unlock()
var buf []byte
// first reading
if !r.reading {
r.reading = true
r.requestExternalConfirmation()
}
// reached EOF
if r.offset >= r.cachedObject.Size() {
return 0, io.EOF
}
currentOffset := r.offset
buf, err = r.getChunk(currentOffset)
if err != nil && len(buf) == 0 {
fs.Errorf(r, "(%v/%v) empty and error (%v) response", currentOffset, r.cachedObject.Size(), err)
return 0, io.EOF
}
readSize := copy(p, buf)
newOffset := currentOffset + int64(readSize)
r.offset = newOffset
if r.offset >= r.cachedObject.Size() {
err = io.EOF
}
return readSize, err
}
@ -257,6 +286,7 @@ func (r *Handle) Close() error {
}
}
go r.cacheFs().CleanUpCache(false)
fs.Debugf(r, "cache reader closed %v", r.offset)
return nil
}
@ -313,7 +343,7 @@ func (w *worker) reader(offset, end int64) (io.ReadCloser, error) {
r := w.rc
if w.rc == nil {
r, err = w.r.cacheFs().OpenRateLimited(func() (io.ReadCloser, error) {
return w.r.cachedObject.Object.Open(&fs.RangeOption{Start: offset, End: end}, &fs.SeekOption{Offset: offset})
return w.r.cachedObject.Object.Open(&fs.SeekOption{Offset: offset}, &fs.RangeOption{Start: offset, End: end})
})
if err != nil {
return nil, err
@ -329,7 +359,7 @@ func (w *worker) reader(offset, end int64) (io.ReadCloser, error) {
_ = w.rc.Close()
return w.r.cacheFs().OpenRateLimited(func() (io.ReadCloser, error) {
r, err = w.r.cachedObject.Object.Open(&fs.RangeOption{Start: offset, End: end}, &fs.SeekOption{Offset: offset})
r, err = w.r.cachedObject.Object.Open(&fs.SeekOption{Offset: offset}, &fs.RangeOption{Start: offset, End: end})
if err != nil {
return nil, err
}
@ -377,7 +407,7 @@ func (w *worker) run() {
// add it in ram if it's in the persistent storage
data, err = w.r.storage().GetChunk(w.r.cachedObject, chunkStart)
if err == nil {
err = w.r.memory.AddChunk(w.r.cachedObject, data, chunkStart)
err = w.r.memory.AddChunk(w.r.cachedObject.abs(), data, chunkStart)
if err != nil {
fs.Errorf(w, "failed caching chunk in ram %v: %v", chunkStart, err)
} else {
@ -395,37 +425,56 @@ func (w *worker) run() {
if chunkEnd > w.r.cachedObject.Size() {
chunkEnd = w.r.cachedObject.Size()
}
w.rc, err = w.reader(chunkStart, chunkEnd)
// we seem to be getting only errors so we abort
w.download(chunkStart, chunkEnd, 0)
}
}
func (w *worker) download(chunkStart, chunkEnd int64, retry int) {
var err error
var data []byte
// stop retries
if retry >= w.r.cacheFs().readRetries {
return
}
// back-off between retries
if retry > 0 {
time.Sleep(time.Second * time.Duration(retry))
}
w.rc, err = w.reader(chunkStart, chunkEnd)
// we seem to be getting only errors so we abort
if err != nil {
fs.Errorf(w, "object open failed %v: %v", chunkStart, err)
w.download(chunkStart, chunkEnd, retry+1)
return
}
data = make([]byte, chunkEnd-chunkStart)
sourceRead := 0
sourceRead, err = io.ReadFull(w.rc, data)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
fs.Errorf(w, "failed to read chunk %v: %v", chunkStart, err)
w.download(chunkStart, chunkEnd, retry+1)
return
}
if err == io.ErrUnexpectedEOF {
fs.Debugf(w, "partial read chunk %v: %v", chunkStart, err)
}
data = data[:sourceRead] // reslice to remove extra garbage
fs.Debugf(w, "downloaded chunk %v", fs.SizeSuffix(chunkStart))
if w.r.UseMemory {
err = w.r.memory.AddChunk(w.r.cachedObject.abs(), data, chunkStart)
if err != nil {
fs.Errorf(w, "object open failed %v: %v", chunkStart, err)
return
fs.Errorf(w, "failed caching chunk in ram %v: %v", chunkStart, err)
}
}
data = make([]byte, chunkEnd-chunkStart)
sourceRead := 0
sourceRead, err = io.ReadFull(w.rc, data)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
fs.Errorf(w, "failed to read chunk %v: %v", chunkStart, err)
return
}
if err == io.ErrUnexpectedEOF {
fs.Debugf(w, "partial read chunk %v: %v", chunkStart, err)
}
data = data[:sourceRead] // reslice to remove extra garbage
fs.Debugf(w, "downloaded chunk %v", fs.SizeSuffix(chunkStart))
if w.r.UseMemory {
err = w.r.memory.AddChunk(w.r.cachedObject, data, chunkStart)
if err != nil {
fs.Errorf(w, "failed caching chunk in ram %v: %v", chunkStart, err)
}
}
err = w.r.storage().AddChunkAhead(w.r.cachedObject.abs(), data, chunkStart, w.r.chunkAge)
if err != nil {
fs.Errorf(w, "failed caching chunk in storage %v: %v", chunkStart, err)
}
err = w.r.storage().AddChunk(w.r.cachedObject.abs(), data, chunkStart)
if err != nil {
fs.Errorf(w, "failed caching chunk in storage %v: %v", chunkStart, err)
}
}

14
cache/object.go vendored
View File

@ -1,4 +1,4 @@
// +build !plan9
// +build !plan9,go1.7
package cache
@ -205,16 +205,18 @@ func (o *Object) Open(options ...fs.OpenOption) (io.ReadCloser, error) {
if err := o.refreshFromSource(); err != nil {
return nil, err
}
o.CacheFs.CheckIfWarmupNeeded(o.Remote())
var err error
cacheReader := NewObjectHandle(o)
for _, option := range options {
switch x := option.(type) {
case *fs.SeekOption:
_, err := cacheReader.Seek(x.Offset, os.SEEK_SET)
if err != nil {
return cacheReader, err
}
_, err = cacheReader.Seek(x.Offset, os.SEEK_SET)
case *fs.RangeOption:
_, err = cacheReader.Seek(x.Start, os.SEEK_SET)
}
if err != nil {
return cacheReader, err
}
}

229
cache/plex.go vendored Normal file
View File

@ -0,0 +1,229 @@
// +build !plan9,go1.7
package cache
import (
"encoding/json"
"fmt"
"net/http"
"net/url"
"strings"
"time"
"github.com/ncw/rclone/fs"
)
const (
// defPlexLoginURL is the default URL for Plex login
defPlexLoginURL = "https://plex.tv/users/sign_in.json"
)
// plexConnector is managing the cache integration with Plex
type plexConnector struct {
url *url.URL
token string
f *Fs
}
// newPlexConnector connects to a Plex server and generates a token
func newPlexConnector(f *Fs, plexURL, username, password string) (*plexConnector, error) {
u, err := url.ParseRequestURI(strings.TrimRight(plexURL, "/"))
if err != nil {
return nil, err
}
pc := &plexConnector{
f: f,
url: u,
token: "",
}
err = pc.authenticate(username, password)
if err != nil {
return nil, err
}
return pc, nil
}
// newPlexConnector connects to a Plex server and generates a token
func newPlexConnectorWithToken(f *Fs, plexURL, token string) (*plexConnector, error) {
u, err := url.ParseRequestURI(strings.TrimRight(plexURL, "/"))
if err != nil {
return nil, err
}
pc := &plexConnector{
f: f,
url: u,
token: token,
}
return pc, nil
}
// fillDefaultHeaders will add common headers to requests
func (p *plexConnector) fillDefaultHeaders(req *http.Request) {
req.Header.Add("X-Plex-Client-Identifier", fmt.Sprintf("rclone (%v)", p.f.String()))
req.Header.Add("X-Plex-Product", fmt.Sprintf("rclone (%v)", p.f.Name()))
req.Header.Add("X-Plex-Version", fs.Version)
req.Header.Add("Accept", "application/json")
if p.token != "" {
req.Header.Add("X-Plex-Token", p.token)
}
}
// authenticate will generate a token based on a username/password
func (p *plexConnector) authenticate(username, password string) error {
form := url.Values{}
form.Set("user[login]", username)
form.Add("user[password]", password)
req, err := http.NewRequest("POST", defPlexLoginURL, strings.NewReader(form.Encode()))
if err != nil {
return err
}
p.fillDefaultHeaders(req)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
var data map[string]interface{}
err = json.NewDecoder(resp.Body).Decode(&data)
if err != nil {
return fmt.Errorf("failed to obtain token: %v", err)
}
tokenGen, ok := get(data, "user", "authToken")
if !ok {
return fmt.Errorf("failed to obtain token: %v", data)
}
token, ok := tokenGen.(string)
if !ok {
return fmt.Errorf("failed to obtain token: %v", data)
}
p.token = token
return nil
}
// isConnected checks if this Plex
func (p *plexConnector) isConnected() bool {
return p.token != ""
}
func (p *plexConnector) isPlaying(co *Object) bool {
isPlaying := false
req, err := http.NewRequest("GET", fmt.Sprintf("%s/status/sessions", p.url.String()), nil)
if err != nil {
return false
}
p.fillDefaultHeaders(req)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return false
}
var data map[string]interface{}
err = json.NewDecoder(resp.Body).Decode(&data)
if err != nil {
return false
}
sizeGen, ok := get(data, "MediaContainer", "size")
if !ok {
return false
}
size, ok := sizeGen.(float64)
if !ok || size < float64(1) {
return false
}
videosGen, ok := get(data, "MediaContainer", "Video")
if !ok {
fs.Errorf("plex", "empty videos: %v", data)
return false
}
videos, ok := videosGen.([]interface{})
if !ok || len(videos) < 1 {
fs.Errorf("plex", "empty videos: %v", data)
return false
}
for _, v := range videos {
keyGen, ok := get(v, "key")
if !ok {
fs.Errorf("plex", "failed to find: key")
continue
}
key, ok := keyGen.(string)
if !ok {
fs.Errorf("plex", "failed to understand: key")
continue
}
req, err := http.NewRequest("GET", fmt.Sprintf("%s%s", p.url.String(), key), nil)
if err != nil {
return false
}
p.fillDefaultHeaders(req)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return false
}
var data map[string]interface{}
err = json.NewDecoder(resp.Body).Decode(&data)
if err != nil {
return false
}
remote := co.Remote()
if cr, yes := co.CacheFs.isWrappedByCrypt(); yes {
remote, err = cr.DecryptFileName(co.Remote())
if err != nil {
fs.Errorf("plex", "can not decrypt wrapped file: %v", err)
continue
}
}
fpGen, ok := get(data, "MediaContainer", "Metadata", 0, "Media", 0, "Part", 0, "file")
if !ok {
fs.Errorf("plex", "failed to understand: %v", data)
continue
}
fp, ok := fpGen.(string)
if !ok {
fs.Errorf("plex", "failed to understand: %v", fp)
continue
}
if strings.Contains(fp, remote) {
isPlaying = true
break
}
}
return isPlaying
}
func (p *plexConnector) isPlayingAsync(co *Object, response chan bool) {
time.Sleep(time.Second) // FIXME random guess here
res := p.isPlaying(co)
response <- res
}
// adapted from: https://stackoverflow.com/a/28878037 (credit)
func get(m interface{}, path ...interface{}) (interface{}, bool) {
for _, p := range path {
switch idx := p.(type) {
case string:
if mm, ok := m.(map[string]interface{}); ok {
if val, found := mm[idx]; found {
m = val
continue
}
}
return nil, false
case int:
if mm, ok := m.([]interface{}); ok {
if len(mm) > idx {
m = mm[idx]
continue
}
}
return nil, false
}
}
return m, true
}

View File

@ -1,4 +1,4 @@
// +build !plan9
// +build !plan9,go1.7
package cache
@ -58,8 +58,8 @@ func (m *Memory) GetChunk(cachedObject *Object, offset int64) ([]byte, error) {
}
// AddChunk adds a new chunk of a cached object
func (m *Memory) AddChunk(cachedObject *Object, data []byte, offset int64) error {
return m.AddChunkAhead(cachedObject.abs(), data, offset, time.Second)
func (m *Memory) AddChunk(fp string, data []byte, offset int64) error {
return m.AddChunkAhead(fp, data, offset, time.Second)
}
// AddChunkAhead adds a new chunk of a cached object
@ -93,3 +93,8 @@ func (m *Memory) CleanChunksByNeed(offset int64) {
}
}
}
// CleanChunksBySize will cleanup chunks after the total size passes a certain point
func (m *Memory) CleanChunksBySize(maxSize int64) {
// NOOP
}

View File

@ -1,4 +1,4 @@
// +build !plan9
// +build !plan9,go1.7
package cache
@ -29,11 +29,16 @@ const (
DataTsBucket = "dataTs"
)
// Features flags for this storage type
type Features struct {
PurgeDb bool // purge the db before starting
}
var boltMap = make(map[string]*Persistent)
var boltMapMx sync.Mutex
// GetPersistent returns a single instance for the specific store
func GetPersistent(dbPath string, refreshDb bool) (*Persistent, error) {
func GetPersistent(dbPath string, f *Features) (*Persistent, error) {
// write lock to create one
boltMapMx.Lock()
defer boltMapMx.Unlock()
@ -41,7 +46,7 @@ func GetPersistent(dbPath string, refreshDb bool) (*Persistent, error) {
return b, nil
}
bb, err := newPersistent(dbPath, refreshDb)
bb, err := newPersistent(dbPath, f)
if err != nil {
return nil, err
}
@ -49,6 +54,12 @@ func GetPersistent(dbPath string, refreshDb bool) (*Persistent, error) {
return boltMap[dbPath], nil
}
type chunkInfo struct {
Path string
Offset int64
Size int64
}
// Persistent is a wrapper of persistent storage for a bolt.DB file
type Persistent struct {
Storage
@ -57,18 +68,20 @@ type Persistent struct {
dataPath string
db *bolt.DB
cleanupMux sync.Mutex
features *Features
}
// newPersistent builds a new wrapper and connects to the bolt.DB file
func newPersistent(dbPath string, refreshDb bool) (*Persistent, error) {
func newPersistent(dbPath string, f *Features) (*Persistent, error) {
dataPath := strings.TrimSuffix(dbPath, filepath.Ext(dbPath))
b := &Persistent{
dbPath: dbPath,
dataPath: dataPath,
features: f,
}
err := b.Connect(refreshDb)
err := b.Connect()
if err != nil {
fs.Errorf(dbPath, "Error opening storage cache. Is there another rclone running on the same remote? %v", err)
return nil, err
@ -84,11 +97,11 @@ func (b *Persistent) String() string {
// Connect creates a connection to the configured file
// refreshDb will delete the file before to create an empty DB if it's set to true
func (b *Persistent) Connect(refreshDb bool) error {
func (b *Persistent) Connect() error {
var db *bolt.DB
var err error
if refreshDb {
if b.features.PurgeDb {
err := os.Remove(b.dbPath)
if err != nil {
fs.Errorf(b, "failed to remove cache file: %v", err)
@ -146,37 +159,6 @@ func (b *Persistent) getBucket(dir string, createIfMissing bool, tx *bolt.Tx) *b
return bucket
}
// updateChunkTs is a convenience method to update a chunk timestamp to mark that it was used recently
func (b *Persistent) updateChunkTs(tx *bolt.Tx, path string, offset int64, t time.Duration) {
tsBucket := tx.Bucket([]byte(DataTsBucket))
tsVal := path + "-" + strconv.FormatInt(offset, 10)
ts := time.Now().Add(t)
found := false
// delete previous timestamps for the same object
c := tsBucket.Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() {
if bytes.Equal(v, []byte(tsVal)) {
if tsInCache := time.Unix(0, btoi(k)); tsInCache.After(ts) && !found {
found = true
continue
}
err := c.Delete()
if err != nil {
fs.Debugf(path, "failed to clean chunk: %v", err)
}
}
}
if found {
return
}
err := tsBucket.Put(itob(ts.UnixNano()), []byte(tsVal))
if err != nil {
fs.Debugf(path, "failed to timestamp chunk: %v", err)
}
}
// updateRootTs is a convenience method to update an object timestamp to mark that it was used recently
func (b *Persistent) updateRootTs(tx *bolt.Tx, path string, t time.Duration) {
tsBucket := tx.Bucket([]byte(RootTsBucket))
@ -433,7 +415,6 @@ func (b *Persistent) HasChunk(cachedObject *Object, offset int64) bool {
// GetChunk will retrieve a single chunk which belongs to a cached object or an error if it doesn't find it
func (b *Persistent) GetChunk(cachedObject *Object, offset int64) ([]byte, error) {
p := cachedObject.abs()
var data []byte
fp := path.Join(b.dataPath, cachedObject.abs(), strconv.FormatInt(offset, 10))
@ -442,31 +423,11 @@ func (b *Persistent) GetChunk(cachedObject *Object, offset int64) ([]byte, error
return nil, err
}
d := cachedObject.CacheFs.chunkAge
if cachedObject.CacheFs.InWarmUp() {
d = cachedObject.CacheFs.metaAge
}
err = b.db.Update(func(tx *bolt.Tx) error {
b.updateChunkTs(tx, p, offset, d)
return nil
})
return data, err
}
// AddChunk adds a new chunk of a cached object
func (b *Persistent) AddChunk(cachedObject *Object, data []byte, offset int64) error {
t := cachedObject.CacheFs.chunkAge
if cachedObject.CacheFs.InWarmUp() {
t = cachedObject.CacheFs.metaAge
}
return b.AddChunkAhead(cachedObject.abs(), data, offset, t)
}
// AddChunkAhead adds a new chunk before caching an Object for it
// see fs.cacheWrites
func (b *Persistent) AddChunkAhead(fp string, data []byte, offset int64, t time.Duration) error {
func (b *Persistent) AddChunk(fp string, data []byte, offset int64) error {
_ = os.MkdirAll(path.Join(b.dataPath, fp), os.ModePerm)
filePath := path.Join(b.dataPath, fp, strconv.FormatInt(offset, 10))
@ -476,47 +437,101 @@ func (b *Persistent) AddChunkAhead(fp string, data []byte, offset int64, t time.
}
return b.db.Update(func(tx *bolt.Tx) error {
b.updateChunkTs(tx, fp, offset, t)
tsBucket := tx.Bucket([]byte(DataTsBucket))
ts := time.Now()
found := false
// delete (older) timestamps for the same object
c := tsBucket.Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() {
var ci chunkInfo
err = json.Unmarshal(v, &ci)
if err != nil {
continue
}
if ci.Path == fp && ci.Offset == offset {
if tsInCache := time.Unix(0, btoi(k)); tsInCache.After(ts) && !found {
found = true
continue
}
err := c.Delete()
if err != nil {
fs.Debugf(fp, "failed to clean chunk: %v", err)
}
}
}
// don't overwrite if a newer one is already there
if found {
return nil
}
enc, err := json.Marshal(chunkInfo{Path: fp, Offset: offset, Size: int64(len(data))})
if err != nil {
fs.Debugf(fp, "failed to timestamp chunk: %v", err)
}
err = tsBucket.Put(itob(ts.UnixNano()), enc)
if err != nil {
fs.Debugf(fp, "failed to timestamp chunk: %v", err)
}
return nil
})
}
// CleanChunksByAge will cleanup on a cron basis
func (b *Persistent) CleanChunksByAge(chunkAge time.Duration) {
// NOOP
}
// CleanChunksByNeed is a noop for this implementation
func (b *Persistent) CleanChunksByNeed(offset int64) {
// noop: we want to clean a Bolt DB by time only
}
// CleanChunksBySize will cleanup chunks after the total size passes a certain point
func (b *Persistent) CleanChunksBySize(maxSize int64) {
b.cleanupMux.Lock()
defer b.cleanupMux.Unlock()
var cntChunks int
err := b.db.Update(func(tx *bolt.Tx) error {
min := itob(0)
max := itob(time.Now().UnixNano())
dataTsBucket := tx.Bucket([]byte(DataTsBucket))
if dataTsBucket == nil {
return errors.Errorf("Couldn't open (%v) bucket", DataTsBucket)
}
// iterate through ts
c := dataTsBucket.Cursor()
for k, v := c.Seek(min); k != nil && bytes.Compare(k, max) <= 0; k, v = c.Next() {
if v == nil {
continue
}
// split to get (abs path - offset)
val := string(v[:])
sepIdx := strings.LastIndex(val, "-")
pathCmp := val[:sepIdx]
offsetCmp := val[sepIdx+1:]
// delete this ts entry
err := c.Delete()
totalSize := int64(0)
for k, v := c.First(); k != nil; k, v = c.Next() {
var ci chunkInfo
err := json.Unmarshal(v, &ci)
if err != nil {
fs.Errorf(pathCmp, "failed deleting chunk ts during cleanup (%v): %v", val, err)
continue
}
err = os.Remove(path.Join(b.dataPath, pathCmp, offsetCmp))
if err == nil {
cntChunks = cntChunks + 1
totalSize += ci.Size
}
if totalSize > maxSize {
needToClean := totalSize - maxSize
for k, v := c.First(); k != nil; k, v = c.Next() {
var ci chunkInfo
err := json.Unmarshal(v, &ci)
if err != nil {
continue
}
// delete this ts entry
err = c.Delete()
if err != nil {
fs.Errorf(ci.Path, "failed deleting chunk ts during cleanup (%v): %v", ci.Offset, err)
continue
}
err = os.Remove(path.Join(b.dataPath, ci.Path, strconv.FormatInt(ci.Offset, 10)))
if err == nil {
cntChunks++
needToClean -= ci.Size
if needToClean <= 0 {
break
}
}
}
}
fs.Infof("cache", "deleted (%v) chunks", cntChunks)
@ -576,11 +591,6 @@ func (b *Persistent) CleanEntriesByAge(entryAge time.Duration) {
}
}
// CleanChunksByNeed is a noop for this implementation
func (b *Persistent) CleanChunksByNeed(offset int64) {
// noop: we want to clean a Bolt DB by time only
}
// Stats returns a go map with the stats key values
func (b *Persistent) Stats() (map[string]map[string]interface{}, error) {
r := make(map[string]map[string]interface{})
@ -590,6 +600,7 @@ func (b *Persistent) Stats() (map[string]map[string]interface{}, error) {
r["data"]["newest-ts"] = time.Now()
r["data"]["newest-file"] = ""
r["data"]["total-chunks"] = 0
r["data"]["total-size"] = int64(0)
r["files"] = make(map[string]interface{})
r["files"]["oldest-ts"] = time.Now()
r["files"]["oldest-name"] = ""
@ -612,19 +623,32 @@ func (b *Persistent) Stats() (map[string]map[string]interface{}, error) {
r["files"]["total-files"] = totalFiles
c := dataTsBucket.Cursor()
totalChunks := 0
totalSize := int64(0)
for k, v := c.First(); k != nil; k, v = c.Next() {
var ci chunkInfo
err := json.Unmarshal(v, &ci)
if err != nil {
continue
}
totalChunks++
totalSize += ci.Size
}
r["data"]["total-chunks"] = totalChunks
r["data"]["total-size"] = totalSize
if k, v := c.First(); k != nil {
// split to get (abs path - offset)
val := string(v[:])
p := val[:strings.LastIndex(val, "-")]
var ci chunkInfo
_ = json.Unmarshal(v, &ci)
r["data"]["oldest-ts"] = time.Unix(0, btoi(k))
r["data"]["oldest-file"] = p
r["data"]["oldest-file"] = ci.Path
}
if k, v := c.Last(); k != nil {
// split to get (abs path - offset)
val := string(v[:])
p := val[:strings.LastIndex(val, "-")]
var ci chunkInfo
_ = json.Unmarshal(v, &ci)
r["data"]["newest-ts"] = time.Unix(0, btoi(k))
r["data"]["newest-file"] = p
r["data"]["newest-file"] = ci.Path
}
c = rootTsBucket.Cursor()
@ -671,13 +695,17 @@ func (b *Persistent) Purge() {
// GetChunkTs retrieves the current timestamp of this chunk
func (b *Persistent) GetChunkTs(path string, offset int64) (time.Time, error) {
var t time.Time
tsVal := path + "-" + strconv.FormatInt(offset, 10)
err := b.db.View(func(tx *bolt.Tx) error {
tsBucket := tx.Bucket([]byte(DataTsBucket))
c := tsBucket.Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() {
if bytes.Equal(v, []byte(tsVal)) {
var ci chunkInfo
err := json.Unmarshal(v, &ci)
if err != nil {
continue
}
if ci.Path == path && ci.Offset == offset {
t = time.Unix(0, btoi(k))
return nil
}

View File

@ -1,4 +1,4 @@
// +build !plan9
// +build !plan9,go1.7
package cachestats

View File

@ -1,6 +1,6 @@
// Build for cache for unsupported platforms to stop go complaining
// about "no buildable Go source files "
// +build plan9
// +build plan9 !go1.7
package cachestats

View File

@ -27,7 +27,7 @@ c) Copy remote
s) Set configuration password
q) Quit config
n/r/c/s/q> n
name> remote
name> test-cache
Type of storage to configure.
Choose a number from below, or type in your own value
...
@ -39,6 +39,19 @@ Remote to cache.
Normally should contain a ':' and a path, eg "myremote:path/to/dir",
"myremote:bucket" or maybe "myremote:" (not recommended).
remote> local:/test
Optional: The URL of the Plex server
plex_url> http://127.0.0.1:32400
Optional: The username of the Plex user
plex_username> dummyusername
Optional: The password of the Plex user
y) Yes type in my own password
g) Generate random password
n) No leave this optional password blank
y/g/n> y
Enter the password:
password:
Confirm the password:
password:
The size of a chunk. Lower value good for slow connections but can affect seamless reading.
Default: 5M
Choose a number from below, or type in your own value
@ -60,36 +73,26 @@ Choose a number from below, or type in your own value
3 / 24 hours
\ "48h"
info_age> 2
How much time should a chunk (file data) be stored in cache.
Accepted units are: "s", "m", "h".
Default: 3h
The maximum size of stored chunks. When the storage grows beyond this size, the oldest chunks will be deleted.
Default: 10G
Choose a number from below, or type in your own value
1 / 30 seconds
\ "30s"
2 / 1 minute
\ "1m"
3 / 1 hour and 30 minutes
\ "1h30m"
chunk_age> 3h
How much time should data be cached during warm up.
Accepted units are: "s", "m", "h".
Default: 24h
Choose a number from below, or type in your own value
1 / 3 hours
\ "3h"
2 / 6 hours
\ "6h"
3 / 24 hours
\ "24h"
warmup_age> 3
1 / 500 MB
\ "500M"
2 / 1 GB
\ "1G"
3 / 10 GB
\ "10G"
chunk_total_size> 3
Remote config
--------------------
[test-cache]
remote = local:/test
plex_url = http://127.0.0.1:32400
plex_username = dummyusername
plex_password = *** ENCRYPTED ***
chunk_size = 5M
info_age = 24h
chunk_age = 3h
warmup_age = 24h
info_age = 48h
chunk_total_size = 10G
```
You can then use it like this,
@ -126,30 +129,31 @@ and cloud providers, the cache remote can split multiple requests to the
cloud provider for smaller file chunks and combines them together locally
where they can be available almost immediately before the reader usually
needs them.
This is similar to buffering when media files are played online. Rclone
will stay around the current marker but always try its best to stay ahead
and prepare the data before.
#### Warmup mode ####
#### Plex Integration ####
A negative side of running multiple requests on the cloud provider is
that you can easily reach a limit on how many requests or how much data
you can download from a cloud provider in a window of time.
For this reason, a warmup mode is a state where `cache` changes its settings
to talk less with the cloud provider.
There is a direct integration with Plex which allows cache to detect during reading
if the file is in playback or not. This helps cache to adapt how it queries
the cloud provider depending on what is needed for.
To prevent a ban or a similar action from the cloud provider, `cache` will
keep track of all the open files and during intensive times when it passes
a configured threshold, it will change its settings to a warmup mode.
Scans will have a minimum amount of workers (1) while in a confirmed playback cache
will deploy the configured number of workers.
It can also be disabled during single file streaming if `cache` sees that we're
reading the file in sequence and can go load its parts in advance.
This integration opens the doorway to additional performance improvements
which will be explored in the near future.
**Note:** If Plex options are not configured, `cache` will function with its
configured options without adapting any of its settings.
How to enable? Run `rclone config` and add all the Plex options (endpoint, username
and password) in your remote and it will be automatically enabled.
Affected settings:
- `cache-chunk-no-memory`: _disabled_
- `cache-workers`: _1_
- file chunks will now be cached using `cache-warm-up-age` as a duration instead of the
regular `cache-chunk-age`
- `cache-workers`: _Configured value_ during confirmed playback or _1_ all the other times
### Known issues ###
@ -194,6 +198,22 @@ connections.
**Default**: 5M
#### --cache-total-chunk-size=SIZE ####
The total size that the chunks can take up on the local disk. If `cache`
exceeds this value then it will start to the delete the oldest chunks until
it goes under this value.
**Default**: 10G
#### --cache-chunk-clean-interval=DURATION ####
How often should `cache` perform cleanups of the chunk storage. The default value
should be ok for most people. If you find that `cache` goes over `cache-total-chunk-size`
too often then try to lower this value to force it to perform cleanups more often.
**Default**: 1m
#### --cache-info-age=DURATION ####
How long to keep file structure information (directory listings, file size,
@ -204,25 +224,6 @@ this value very large as the cache store will also be updated in real time.
**Default**: 6h
#### --cache-chunk-age=DURATION ####
How long to keep file chunks (partial data) locally.
Longer durations will result in larger cache stores as data will be cleared
less often.
**Default**: 3h
#### --cache-warm-up-age=DURATION ####
How long to keep file chunks (partial data) locally during warmup times.
If `cache` goes through intensive read times when it is scanned for information
then this setting will allow you to customize higher storage times for that
data. Otherwise, it's safe to keep the same value as `cache-chunk-age`.
**Default**: 3h
#### --cache-read-retries=RETRIES ####
How many times to retry a read from a cache storage.
@ -235,7 +236,7 @@ able to provide file data anymore.
For really slow connections, increase this to a point where the stream is
able to provide data but your experience will be very stuttering.
**Default**: 3
**Default**: 10
#### --cache-workers=WORKERS ####
@ -247,6 +248,9 @@ This impacts several aspects like the cloud provider API limits, more stress
on the hardware that rclone runs on but it also means that streams will
be more fluid and data will be available much more faster to readers.
**Note**: If the optional Plex integration is enabled then this setting
will adapt to the type of reading performed and the value specified here will be used
as a maximum number of workers to use.
**Default**: 4
#### --cache-chunk-no-memory ####
@ -268,9 +272,7 @@ available on the local machine.
#### --cache-rps=NUMBER ####
Some of the rclone remotes that `cache` will wrap have back-off or limits
in place to not reach cloud provider limits. This is similar to that.
It places a hard limit on the number of requests per second that `cache`
This setting places a hard limit on the number of requests per second that `cache`
will be doing to the cloud provider remote and try to respect that value
by setting waits between reads.
@ -278,27 +280,13 @@ If you find that you're getting banned or limited on the cloud provider
through cache and know that a smaller number of requests per second will
allow you to work with it then you can use this setting for that.
A good balance of all the other settings and warmup times should make this
A good balance of all the other settings should make this
setting useless but it is available to set for more special cases.
**NOTE**: This will limit the number of requests during streams but other
API calls to the cloud provider like directory listings will still pass.
**Default**: 4
#### --cache-warm-up-rps=RATE/SECONDS ####
This setting allows `cache` to change its settings for warmup mode or revert
back from it.
`cache` keeps track of all open files and when there are `RATE` files open
during `SECONDS` window of time reached it will activate warmup and change
its settings as explained in the `Warmup mode` section.
When the number of files being open goes under `RATE` in the same amount
of time, `cache` will disable this mode.
**Default**: 3/20
**Default**: disabled
#### --cache-writes ####

View File

@ -166,6 +166,6 @@ func main() {
generateTestProgram(t, fns, "AzureBlob", buildConstraint("go1.7"))
generateTestProgram(t, fns, "Pcloud")
generateTestProgram(t, fns, "Webdav")
generateTestProgram(t, fns, "Cache", buildConstraint("!plan9"))
generateTestProgram(t, fns, "Cache", buildConstraint("!plan9,go1.7"))
log.Printf("Done")
}