mirror of
https://github.com/rclone/rclone.git
synced 2024-11-07 09:04:52 +01:00
serve nfs: implement --nfs-cache-type symlink
`--nfs-cache-type symlink` is similar to `--nfs-cache-type disk` in that it uses an on disk cache, but the cache entries are held as symlinks. Rclone will use the handle of the underlying file as the NFS handle which improves performance.
This commit is contained in:
parent
1ca3f12672
commit
de20f9fc83
@ -3,6 +3,9 @@
|
||||
package nfsmount
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"os"
|
||||
"os/exec"
|
||||
"runtime"
|
||||
"testing"
|
||||
@ -30,7 +33,24 @@ func TestMount(t *testing.T) {
|
||||
}
|
||||
sudo = true
|
||||
}
|
||||
nfs.Opt.HandleCacheDir = t.TempDir()
|
||||
require.NoError(t, nfs.Opt.HandleCache.Set("disk"))
|
||||
vfstest.RunTests(t, false, vfscommon.CacheModeWrites, false, mount)
|
||||
for _, cacheType := range []string{"memory", "disk", "symlink"} {
|
||||
t.Run(cacheType, func(t *testing.T) {
|
||||
nfs.Opt.HandleCacheDir = t.TempDir()
|
||||
require.NoError(t, nfs.Opt.HandleCache.Set(cacheType))
|
||||
// Check we can create a handler
|
||||
_, err := nfs.NewHandler(context.Background(), nil, &nfs.Opt)
|
||||
if errors.Is(err, nfs.ErrorSymlinkCacheNotSupported) || errors.Is(err, nfs.ErrorSymlinkCacheNoPermission) {
|
||||
t.Skip(err.Error() + ": run with: go test -c && sudo setcap cap_dac_read_search+ep ./nfsmount.test && ./nfsmount.test -test.v")
|
||||
}
|
||||
require.NoError(t, err)
|
||||
// Configure rclone via environment var since the mount gets run in a subprocess
|
||||
_ = os.Setenv("RCLONE_NFS_CACHE_DIR", nfs.Opt.HandleCacheDir)
|
||||
_ = os.Setenv("RCLONE_NFS_CACHE_TYPE", cacheType)
|
||||
t.Cleanup(func() {
|
||||
_ = os.Unsetenv("RCLONE_NFS_CACHE_DIR")
|
||||
_ = os.Unsetenv("RCLONE_NFS_CACHE_TYPE")
|
||||
})
|
||||
vfstest.RunTests(t, false, vfscommon.CacheModeWrites, false, mount)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -24,6 +24,12 @@ import (
|
||||
nfshelper "github.com/willscott/go-nfs/helpers"
|
||||
)
|
||||
|
||||
// Errors on cache initialisation
|
||||
var (
|
||||
ErrorSymlinkCacheNotSupported = errors.New("symlink cache not supported on " + runtime.GOOS)
|
||||
ErrorSymlinkCacheNoPermission = errors.New("symlink cache must be run as root or with CAP_DAC_READ_SEARCH")
|
||||
)
|
||||
|
||||
// Cache controls the file handle cache implementation
|
||||
type Cache interface {
|
||||
// ToHandle takes a file and represents it with an opaque handle to reference it.
|
||||
@ -43,25 +49,35 @@ type Cache interface {
|
||||
|
||||
// Set the cache of the handler to the type required by the user
|
||||
func (h *Handler) getCache() (c Cache, err error) {
|
||||
fs.Debugf("nfs", "Starting %v handle cache", h.opt.HandleCache)
|
||||
switch h.opt.HandleCache {
|
||||
case cacheMemory:
|
||||
return nfshelper.NewCachingHandler(h, h.opt.HandleLimit), nil
|
||||
case cacheDisk:
|
||||
return newDiskHandler(h)
|
||||
case cacheSymlink:
|
||||
if runtime.GOOS != "linux" {
|
||||
return nil, errors.New("can only use symlink cache on Linux")
|
||||
dh, err := newDiskHandler(h)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return nil, errors.New("FIXME not implemented yet")
|
||||
err = dh.makeSymlinkCache()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return dh, nil
|
||||
}
|
||||
return nil, errors.New("unknown handle cache type")
|
||||
}
|
||||
|
||||
// diskHandler implements an on disk NFS file handle cache
|
||||
type diskHandler struct {
|
||||
mu sync.RWMutex
|
||||
cacheDir string
|
||||
billyFS billy.Filesystem
|
||||
mu sync.RWMutex
|
||||
cacheDir string
|
||||
billyFS billy.Filesystem
|
||||
write func(fh []byte, cachePath string, fullPath string) ([]byte, error)
|
||||
read func(fh []byte, cachePath string) ([]byte, error)
|
||||
remove func(fh []byte, cachePath string) error
|
||||
handleType int32 //nolint:unused // used by the symlink cache
|
||||
}
|
||||
|
||||
// Create a new disk handler
|
||||
@ -83,6 +99,9 @@ func newDiskHandler(h *Handler) (dh *diskHandler, err error) {
|
||||
dh = &diskHandler{
|
||||
cacheDir: cacheDir,
|
||||
billyFS: h.billyFS,
|
||||
write: dh.diskCacheWrite,
|
||||
read: dh.diskCacheRead,
|
||||
remove: dh.diskCacheRemove,
|
||||
}
|
||||
fs.Infof("nfs", "Storing handle cache in %q", dh.cacheDir)
|
||||
return dh, nil
|
||||
@ -120,7 +139,7 @@ func (dh *diskHandler) ToHandle(f billy.Filesystem, splitPath []string) (fh []by
|
||||
fs.Errorf("nfs", "Couldn't create cache file handle directory: %v", err)
|
||||
return fh
|
||||
}
|
||||
err = os.WriteFile(cachePath, []byte(fullPath), 0600)
|
||||
fh, err = dh.write(fh, cachePath, fullPath)
|
||||
if err != nil {
|
||||
fs.Errorf("nfs", "Couldn't create cache file handle: %v", err)
|
||||
return fh
|
||||
@ -128,6 +147,11 @@ func (dh *diskHandler) ToHandle(f billy.Filesystem, splitPath []string) (fh []by
|
||||
return fh
|
||||
}
|
||||
|
||||
// Write the fullPath into cachePath returning the possibly updated fh
|
||||
func (dh *diskHandler) diskCacheWrite(fh []byte, cachePath string, fullPath string) ([]byte, error) {
|
||||
return fh, os.WriteFile(cachePath, []byte(fullPath), 0600)
|
||||
}
|
||||
|
||||
var errStaleHandle = &nfs.NFSStatusError{NFSStatus: nfs.NFSStatusStale}
|
||||
|
||||
// FromHandle converts from an opaque handle to the file it represents
|
||||
@ -135,7 +159,7 @@ func (dh *diskHandler) FromHandle(fh []byte) (f billy.Filesystem, splitPath []st
|
||||
dh.mu.RLock()
|
||||
defer dh.mu.RUnlock()
|
||||
cachePath := dh.handleToPath(fh)
|
||||
fullPathBytes, err := os.ReadFile(cachePath)
|
||||
fullPathBytes, err := dh.read(fh, cachePath)
|
||||
if err != nil {
|
||||
fs.Errorf("nfs", "Stale handle %q: %v", cachePath, err)
|
||||
return nil, nil, errStaleHandle
|
||||
@ -144,18 +168,28 @@ func (dh *diskHandler) FromHandle(fh []byte) (f billy.Filesystem, splitPath []st
|
||||
return dh.billyFS, splitPath, nil
|
||||
}
|
||||
|
||||
// Read the contents of (fh, cachePath)
|
||||
func (dh *diskHandler) diskCacheRead(fh []byte, cachePath string) ([]byte, error) {
|
||||
return os.ReadFile(cachePath)
|
||||
}
|
||||
|
||||
// Invalidate the handle passed - used on rename and delete
|
||||
func (dh *diskHandler) InvalidateHandle(f billy.Filesystem, fh []byte) error {
|
||||
dh.mu.Lock()
|
||||
defer dh.mu.Unlock()
|
||||
cachePath := dh.handleToPath(fh)
|
||||
err := os.Remove(cachePath)
|
||||
err := dh.remove(fh, cachePath)
|
||||
if err != nil {
|
||||
fs.Errorf("nfs", "Failed to remove handle %q: %v", cachePath, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Remove the (fh, cachePath) file
|
||||
func (dh *diskHandler) diskCacheRemove(fh []byte, cachePath string) error {
|
||||
return os.Remove(cachePath)
|
||||
}
|
||||
|
||||
// HandleLimit exports how many file handles can be safely stored by this cache.
|
||||
func (dh *diskHandler) HandleLimit() int {
|
||||
return math.MaxInt
|
||||
|
@ -13,6 +13,9 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// NB to test the symlink cache, running with elevated permissions is needed
|
||||
const testSymlinkCache = "go test -c && sudo setcap cap_dac_read_search+ep ./nfs.test && ./nfs.test -test.v -test.run TestCache/symlink"
|
||||
|
||||
// Check basic CRUD operations
|
||||
func testCacheCRUD(t *testing.T, h *Handler, c Cache, fileName string) {
|
||||
// Check reading a non existent handle returns an error
|
||||
@ -101,11 +104,12 @@ func TestCache(t *testing.T) {
|
||||
ci := fs.GetConfig(context.Background())
|
||||
oldLogLevel := ci.LogLevel
|
||||
ci.LogLevel = fs.LogLevelEmergency
|
||||
//ci.LogLevel = fs.LogLevelDebug
|
||||
defer func() {
|
||||
ci.LogLevel = oldLogLevel
|
||||
}()
|
||||
billyFS := &FS{nil} // place holder billyFS
|
||||
for _, cacheType := range []handleCache{cacheMemory, cacheDisk} {
|
||||
for _, cacheType := range []handleCache{cacheMemory, cacheDisk, cacheSymlink} {
|
||||
cacheType := cacheType
|
||||
t.Run(cacheType.String(), func(t *testing.T) {
|
||||
h := &Handler{
|
||||
@ -115,8 +119,27 @@ func TestCache(t *testing.T) {
|
||||
h.opt.HandleCache = cacheType
|
||||
h.opt.HandleCacheDir = t.TempDir()
|
||||
c, err := h.getCache()
|
||||
if err == ErrorSymlinkCacheNotSupported {
|
||||
t.Skip(err.Error())
|
||||
}
|
||||
if err == ErrorSymlinkCacheNoPermission {
|
||||
t.Skip("Need more permissions to run symlink cache tests: " + testSymlinkCache)
|
||||
}
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Run("Empty", func(t *testing.T) {
|
||||
// Write a handle
|
||||
splitPath := []string{""}
|
||||
fh := c.ToHandle(h.billyFS, splitPath)
|
||||
assert.True(t, len(fh) > 0)
|
||||
|
||||
// Read the handle back
|
||||
newFs, newSplitPath, err := c.FromHandle(fh)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, h.billyFS, newFs)
|
||||
assert.Equal(t, splitPath, newSplitPath)
|
||||
testCacheCRUD(t, h, c, "file")
|
||||
})
|
||||
t.Run("CRUD", func(t *testing.T) {
|
||||
testCacheCRUD(t, h, c, "file")
|
||||
})
|
||||
|
@ -145,7 +145,9 @@ that it uses an on disk cache, but the cache entries are held as
|
||||
symlinks. Rclone will use the handle of the underlying file as the NFS
|
||||
handle which improves performance. This sort of cache can't be backed
|
||||
up and restored as the underlying handles will change. This is Linux
|
||||
only.
|
||||
only. It requres running rclone as root or with |CAP_DAC_READ_SEARCH|.
|
||||
You can run rclone with this extra permission by doing this to the
|
||||
rclone binary |sudo setcap cap_dac_read_search+ep /path/to/rclone|.
|
||||
|
||||
|--nfs-cache-handle-limit| controls the maximum number of cached NFS
|
||||
handles stored by the caching handler. This should not be set too low
|
||||
|
177
cmd/serve/nfs/symlink_cache_linux.go
Normal file
177
cmd/serve/nfs/symlink_cache_linux.go
Normal file
@ -0,0 +1,177 @@
|
||||
//go:build unix && linux
|
||||
|
||||
/*
|
||||
This implements an efficient disk cache for the NFS file handles for
|
||||
Linux only.
|
||||
|
||||
1. The destination paths are stored as symlink destinations. These
|
||||
can be stored in the directory for maximum efficiency.
|
||||
|
||||
2. The on disk handle of the cache file is returned to NFS with
|
||||
name_to_handle_at(). This means that if the cache is deleted and
|
||||
restored, the file handle mapping will be lost.
|
||||
|
||||
3. These handles are looked up with open_by_handle_at() so no
|
||||
searching through directory trees is needed.
|
||||
|
||||
Note that open_by_handle_at requires CAP_DAC_READ_SEARCH so rclone
|
||||
will need to be run as root or with elevated permissions.
|
||||
|
||||
Test with
|
||||
|
||||
go test -c && sudo setcap cap_dac_read_search+ep ./nfs.test && ./nfs.test -test.v -test.run TestCache/symlink
|
||||
|
||||
*/
|
||||
|
||||
package nfs
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"syscall"
|
||||
|
||||
"github.com/rclone/rclone/fs"
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
// emptyPath is written instead of "" as symlinks can't be empty
|
||||
var (
|
||||
emptyPath = "\x01"
|
||||
emptyPathBytes = []byte(emptyPath)
|
||||
)
|
||||
|
||||
// Turn the diskHandler into a symlink cache
|
||||
//
|
||||
// This also tests the cache works as it may not have enough
|
||||
// permissions or have be the correct Linux version.
|
||||
func (dh *diskHandler) makeSymlinkCache() error {
|
||||
path := filepath.Join(dh.cacheDir, "test")
|
||||
fullPath := "testpath"
|
||||
fh := []byte{1, 2, 3, 4, 5}
|
||||
|
||||
// Create a symlink
|
||||
newFh, err := dh.symlinkCacheWrite(fh, path, fullPath)
|
||||
fs.Debugf(nil, "newFh = %q", newFh)
|
||||
if err != nil {
|
||||
return fmt.Errorf("symlink cache write test failed: %w", err)
|
||||
}
|
||||
defer func() {
|
||||
_ = os.Remove(path)
|
||||
}()
|
||||
|
||||
// Read it back
|
||||
newFullPath, err := dh.symlinkCacheRead(newFh, path)
|
||||
fs.Debugf(nil, "newFullPath = %q", newFullPath)
|
||||
if err != nil {
|
||||
if errors.Is(err, syscall.EPERM) {
|
||||
return ErrorSymlinkCacheNoPermission
|
||||
}
|
||||
return fmt.Errorf("symlink cache read test failed: %w", err)
|
||||
}
|
||||
|
||||
// Check result all OK
|
||||
if string(newFullPath) != fullPath {
|
||||
return fmt.Errorf("symlink cache read test failed: expecting %q read %q", string(newFullPath), fullPath)
|
||||
}
|
||||
|
||||
// If OK install symlink cache
|
||||
dh.read = dh.symlinkCacheRead
|
||||
dh.write = dh.symlinkCacheWrite
|
||||
dh.remove = dh.symlinkCacheRemove
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Write the fullPath into cachePath returning the possibly updated fh
|
||||
//
|
||||
// This writes the fullPath into the file with the cachePath given and
|
||||
// returns the handle for that file so we can look it up later.
|
||||
func (dh *diskHandler) symlinkCacheWrite(fh []byte, cachePath string, fullPath string) (newFh []byte, err error) {
|
||||
//defer log.Trace(nil, "fh=%x, cachePath=%q, fullPath=%q", fh, cachePath)("newFh=%x, err=%v", &newFh, &err)
|
||||
|
||||
// Can't write an empty symlink so write a substitution
|
||||
if fullPath == "" {
|
||||
fullPath = emptyPath
|
||||
}
|
||||
|
||||
// Write the symlink
|
||||
err = os.Symlink(fullPath, cachePath)
|
||||
if err != nil && !errors.Is(err, syscall.EEXIST) {
|
||||
return nil, fmt.Errorf("symlink cache create symlink: %w", err)
|
||||
}
|
||||
|
||||
// Read the newly created symlinks handle
|
||||
handle, _, err := unix.NameToHandleAt(unix.AT_FDCWD, cachePath, 0)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("symlink cache name to handle at: %w", err)
|
||||
}
|
||||
|
||||
// Store the handle type if it hasn't changed
|
||||
// This should run once only when called by makeSymlinkCache
|
||||
if dh.handleType != handle.Type() {
|
||||
dh.handleType = handle.Type()
|
||||
}
|
||||
|
||||
return handle.Bytes(), nil
|
||||
}
|
||||
|
||||
// Read the contents of (fh, cachePath)
|
||||
//
|
||||
// This reads the symlink with the corresponding file handle and
|
||||
// returns the contents. It ignores the cachePath which will be
|
||||
// pointing in the wrong place.
|
||||
//
|
||||
// Note that the caller needs CAP_DAC_READ_SEARCH to use this.
|
||||
func (dh *diskHandler) symlinkCacheRead(fh []byte, cachePath string) (fullPath []byte, err error) {
|
||||
//defer log.Trace(nil, "fh=%x, cachePath=%q", fh, cachePath)("fullPath=%q, err=%v", &fullPath, &err)
|
||||
|
||||
// Find the file with the handle passed in
|
||||
handle := unix.NewFileHandle(dh.handleType, fh)
|
||||
fd, err := unix.OpenByHandleAt(unix.AT_FDCWD, handle, unix.O_RDONLY|unix.O_PATH|unix.O_NOFOLLOW) // needs O_PATH for symlinks
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("symlink cache open by handle at: %w", err)
|
||||
}
|
||||
|
||||
// Close it on exit
|
||||
defer func() {
|
||||
newErr := unix.Close(fd)
|
||||
if err != nil {
|
||||
err = newErr
|
||||
}
|
||||
}()
|
||||
|
||||
// Read the symlink which is the path required
|
||||
buf := make([]byte, 1024) // Max path length
|
||||
n, err := unix.Readlinkat(fd, "", buf) // It will (silently) truncate the contents, in case the buffer is too small to hold all of the contents.
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("symlink cache read: %w", err)
|
||||
}
|
||||
fullPath = buf[:n:n]
|
||||
|
||||
// Undo empty symlink substitution
|
||||
if bytes.Equal(fullPath, emptyPathBytes) {
|
||||
fullPath = buf[:0:0]
|
||||
}
|
||||
|
||||
return fullPath, nil
|
||||
}
|
||||
|
||||
// Remove the (fh, cachePath) file
|
||||
func (dh *diskHandler) symlinkCacheRemove(fh []byte, cachePath string) error {
|
||||
// First read the path
|
||||
fullPath, err := dh.symlinkCacheRead(fh, cachePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// fh for the actual cache file
|
||||
fh = hashPath(string(fullPath))
|
||||
|
||||
// cachePath for the actual cache file
|
||||
cachePath = dh.handleToPath(fh)
|
||||
|
||||
return os.Remove(cachePath)
|
||||
}
|
8
cmd/serve/nfs/symlink_cache_other.go
Normal file
8
cmd/serve/nfs/symlink_cache_other.go
Normal file
@ -0,0 +1,8 @@
|
||||
//go:build unix && !linux
|
||||
|
||||
package nfs
|
||||
|
||||
// Turn the diskHandler into a symlink cache
|
||||
func (dh *diskHandler) makeSymlinkCache() error {
|
||||
return ErrorSymlinkCacheNotSupported
|
||||
}
|
Loading…
Reference in New Issue
Block a user