From cdbe3691b7454be5919c1487d12a8d76fc73b6e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabian=20M=C3=B6ller?= Date: Thu, 30 Aug 2018 11:09:16 +0200 Subject: [PATCH] cache: add cache/fetch rc function --- backend/cache/cache.go | 203 ++++++++++++++++++++++++++++++++++++++--- 1 file changed, 189 insertions(+), 14 deletions(-) diff --git a/backend/cache/cache.go b/backend/cache/cache.go index d7541c080..88c98a894 100644 --- a/backend/cache/cache.go +++ b/backend/cache/cache.go @@ -6,10 +6,12 @@ import ( "context" "fmt" "io" + "math" "os" "os/signal" "path" "path/filepath" + "strconv" "strings" "sync" "syscall" @@ -455,6 +457,39 @@ Eg Title: "Get cache stats", Help: ` Show statistics for the cache remote. +`, + }) + + rc.Add(rc.Call{ + Path: "cache/fetch", + Fn: f.rcFetch, + Title: "Fetch file chunks", + Help: ` +Ensure the specified file chunks are cached on disk. + +The chunks= parameter specifies the file chunks to check. +It takes a comma separated list of array slice indices. +The slice indices are similar to Python slices: start[:end] + +start is the 0 based chunk number from the beginning of the file +to fetch inclusive. end is 0 based chunk number from the beginning +of the file to fetch exclisive. +Both values can be negative, in which case they count from the back +of the file. The value "-5:" represents the last 5 chunks of a file. + +Some valid examples are: +":5,-5:" -> the first and last five chunks +"0,-2" -> the first and the second last chunk +"0:10" -> the first ten chunks + +Any parameter with a key that starts with "file" can be used to +specify files to fetch, eg + + rclone rc cache/fetch chunks=0 file=hello file2=home/goodbye + +File names will automatically be encrypted when the a crypt remote +is used on top of the cache. + `, }) @@ -472,6 +507,22 @@ func (f *Fs) httpStats(in rc.Params) (out rc.Params, err error) { return out, nil } +func (f *Fs) unwrapRemote(remote string) string { + remote = cleanPath(remote) + if remote != "" { + // if it's wrapped by crypt we need to check what format we got + if cryptFs, yes := f.isWrappedByCrypt(); yes { + _, err := cryptFs.DecryptFileName(remote) + // if it failed to decrypt then it is a decrypted format and we need to encrypt it + if err != nil { + return cryptFs.EncryptFileName(remote) + } + // else it's an encrypted format and we can use it as it is + } + } + return remote +} + func (f *Fs) httpExpireRemote(in rc.Params) (out rc.Params, err error) { out = make(rc.Params) remoteInt, ok := in["remote"] @@ -485,20 +536,9 @@ func (f *Fs) httpExpireRemote(in rc.Params) (out rc.Params, err error) { withData = true } - if cleanPath(remote) != "" { - // if it's wrapped by crypt we need to check what format we got - if cryptFs, yes := f.isWrappedByCrypt(); yes { - _, err := cryptFs.DecryptFileName(remote) - // if it failed to decrypt then it is a decrypted format and we need to encrypt it - if err != nil { - remote = cryptFs.EncryptFileName(remote) - } - // else it's an encrypted format and we can use it as it is - } - - if !f.cache.HasEntry(path.Join(f.Root(), remote)) { - return out, errors.Errorf("%s doesn't exist in cache", remote) - } + remote = f.unwrapRemote(remote) + if !f.cache.HasEntry(path.Join(f.Root(), remote)) { + return out, errors.Errorf("%s doesn't exist in cache", remote) } co := NewObject(f, remote) @@ -528,6 +568,141 @@ func (f *Fs) httpExpireRemote(in rc.Params) (out rc.Params, err error) { return out, nil } +func (f *Fs) rcFetch(in rc.Params) (rc.Params, error) { + type chunkRange struct { + start, end int64 + } + parseChunks := func(ranges string) (crs []chunkRange, err error) { + for _, part := range strings.Split(ranges, ",") { + var start, end int64 = 0, math.MaxInt64 + switch ints := strings.Split(part, ":"); len(ints) { + case 1: + start, err = strconv.ParseInt(ints[0], 10, 64) + if err != nil { + return nil, errors.Errorf("invalid range: %q", part) + } + end = start + 1 + case 2: + if ints[0] != "" { + start, err = strconv.ParseInt(ints[0], 10, 64) + if err != nil { + return nil, errors.Errorf("invalid range: %q", part) + } + } + if ints[1] != "" { + end, err = strconv.ParseInt(ints[1], 10, 64) + if err != nil { + return nil, errors.Errorf("invalid range: %q", part) + } + } + default: + return nil, errors.Errorf("invalid range: %q", part) + } + crs = append(crs, chunkRange{start: start, end: end}) + } + return + } + walkChunkRange := func(cr chunkRange, size int64, cb func(chunk int64)) { + if size <= 0 { + return + } + chunks := (size-1)/f.ChunkSize() + 1 + + start, end := cr.start, cr.end + if start < 0 { + start += chunks + } + if end <= 0 { + end += chunks + } + if end <= start { + return + } + switch { + case start < 0: + start = 0 + case start >= chunks: + return + } + switch { + case end <= start: + end = start + 1 + case end >= chunks: + end = chunks + } + for i := start; i < end; i++ { + cb(i) + } + } + walkChunkRanges := func(crs []chunkRange, size int64, cb func(chunk int64)) { + for _, cr := range crs { + walkChunkRange(cr, size, cb) + } + } + + v, ok := in["chunks"] + if !ok { + return nil, errors.New("missing chunks parameter") + } + s, ok := v.(string) + if !ok { + return nil, errors.New("invalid chunks parameter") + } + delete(in, "chunks") + crs, err := parseChunks(s) + if err != nil { + return nil, errors.Wrap(err, "invalid chunks parameter") + } + var files [][2]string + for k, v := range in { + if !strings.HasPrefix(k, "file") { + return nil, errors.Errorf("invalid parameter %s=%s", k, v) + } + switch v := v.(type) { + case string: + files = append(files, [2]string{v, f.unwrapRemote(v)}) + default: + return nil, errors.Errorf("invalid parameter %s=%s", k, v) + } + } + type fileStatus struct { + Error error + FetchedChunks int + } + fetchedChunks := make(map[string]fileStatus, len(files)) + for _, pair := range files { + file, remote := pair[0], pair[1] + var status fileStatus + o, err := f.NewObject(remote) + if err != nil { + status.Error = err + continue + } + co := o.(*Object) + err = co.refreshFromSource(true) + if err != nil { + status.Error = err + continue + } + handle := NewObjectHandle(co, f) + handle.UseMemory = false + handle.scaleWorkers(1) + walkChunkRanges(crs, co.Size(), func(chunk int64) { + _, err := handle.getChunk(chunk * f.ChunkSize()) + if err != nil { + if status.Error == nil { + status.Error = err + } + } else { + status.FetchedChunks++ + } + }) + fetchedChunks[file] = status + } + + return rc.Params{"status": fetchedChunks}, nil +} + // receiveChangeNotify is a wrapper to notifications sent from the wrapped FS about changed files func (f *Fs) receiveChangeNotify(forgetPath string, entryType fs.EntryType) { if crypt, yes := f.isWrappedByCrypt(); yes {