mirror of
https://github.com/rclone/rclone.git
synced 2024-12-22 15:11:56 +01:00
vfs: chunked files which can be read and written at will
This introduces the vfs/chunked library which can open a file like object which is stored in parts on the remote. This can be read and written to anywhere and at any time.
This commit is contained in:
parent
2f9c2cf75e
commit
9fb0afad88
439
vfs/chunked/chunked.go
Normal file
439
vfs/chunked/chunked.go
Normal file
@ -0,0 +1,439 @@
|
||||
// Package chunked provides an infinite chunked file abstraction from
|
||||
// the VFS.
|
||||
//
|
||||
// This can be used in the vfs layer to make chunked files, and in
|
||||
// something like rclone serve nbd.
|
||||
package chunked
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
stdfs "io/fs"
|
||||
"os"
|
||||
"path"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/rclone/rclone/fs"
|
||||
"github.com/rclone/rclone/fs/log"
|
||||
"github.com/rclone/rclone/fs/operations"
|
||||
"github.com/rclone/rclone/vfs"
|
||||
"github.com/rclone/rclone/vfs/vfscommon"
|
||||
)
|
||||
|
||||
const (
|
||||
infoName = "info.json" // name of chunk info file
|
||||
minChunkBits = 4 // min size of chunk is 16 bytes
|
||||
maxChunkBits = 30 // max size of chunk is 1 GB
|
||||
defaultChunkBits = 16 // 64k chunks by default
|
||||
maxBufferChunks = 1024 // default number of chunks in read buffer
|
||||
maxDirtyChunks = 128 // default number of chuns in write buffer
|
||||
currentInfoVersion = 1 // version of the info file
|
||||
)
|
||||
|
||||
// Info is serialized to the directory
|
||||
type Info struct {
|
||||
Version int // version of chunk file
|
||||
Comment string // note about this file
|
||||
Size int64 // current size of the file
|
||||
ChunkBits uint // number of bits in the chunk
|
||||
ChunkSize int // must be power of two (1 << ChunkBits)
|
||||
}
|
||||
|
||||
// File stores info about the file
|
||||
type File struct {
|
||||
// these are read-only after creation so no locking required
|
||||
|
||||
vfs *vfs.VFS // underlying VFS
|
||||
dir string // path to directory
|
||||
chunkSize int // size of a chunk 1 << info.ChunkBits
|
||||
mask int64 // mask an offset onto a chunk boundary ^(chunkSize-1)
|
||||
chunkMask int64 // mask an offset into an intra chunk index (chunkSize-1)
|
||||
|
||||
mu sync.Mutex // lock for info
|
||||
opens int // number of file handles open on this File
|
||||
accessed time.Time // time file was last opened or closed
|
||||
valid bool // true if the info is valid
|
||||
info Info // info about the file
|
||||
infoRemote string // path to info object
|
||||
sizeChanged time.Time // when the size was changed
|
||||
}
|
||||
|
||||
// New creates a new chunked file at dir.
|
||||
func New(vfs *vfs.VFS, dir string) (cf *File) {
|
||||
cf = &File{
|
||||
vfs: vfs,
|
||||
dir: dir,
|
||||
infoRemote: path.Join(dir, infoName),
|
||||
}
|
||||
return cf
|
||||
}
|
||||
|
||||
// Open - open an existing file or create a new one with bits chunksize
|
||||
//
|
||||
// if create is not set then it will error if the file does not exist
|
||||
//
|
||||
// if bits is 0 then it uses the default value.
|
||||
//
|
||||
// Call Close() to show that you are no longer using this file.
|
||||
//
|
||||
// Open and Close can be called multiple times on one *File
|
||||
func (cf *File) Open(create bool, bits uint) (err error) {
|
||||
cf.mu.Lock()
|
||||
defer cf.mu.Unlock()
|
||||
|
||||
if bits == 0 {
|
||||
bits = defaultChunkBits
|
||||
}
|
||||
if bits < minChunkBits {
|
||||
return fmt.Errorf("chunk bits %d too small, must be >= %d", bits, minChunkBits)
|
||||
}
|
||||
if bits > maxChunkBits {
|
||||
return fmt.Errorf("chunk bits %d too large, must be <= %d", bits, maxChunkBits)
|
||||
}
|
||||
|
||||
if !cf.valid {
|
||||
err = cf._readInfo()
|
||||
if err != nil && (!create || !errors.Is(err, stdfs.ErrNotExist)) {
|
||||
return fmt.Errorf("failed to open chunked file: read info failed: %w", err)
|
||||
}
|
||||
if err != nil {
|
||||
cf.info = Info{
|
||||
Size: 0,
|
||||
ChunkBits: bits,
|
||||
ChunkSize: 1 << bits,
|
||||
Version: currentInfoVersion,
|
||||
Comment: "rclone chunked file",
|
||||
}
|
||||
err = cf._writeInfo()
|
||||
if err != nil && err != fs.ErrorObjectNotFound {
|
||||
return fmt.Errorf("failed to open chunked file: write info failed: %w", err)
|
||||
}
|
||||
}
|
||||
cf.valid = true
|
||||
cf._updateChunkBits()
|
||||
}
|
||||
|
||||
// Show another open
|
||||
cf.accessed = time.Now()
|
||||
cf.opens++
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close this *File
|
||||
//
|
||||
// It also writes the size out if it has changed and flushes the
|
||||
// buffers.
|
||||
//
|
||||
// Open and Close can be called multiple times on one *File
|
||||
func (cf *File) Close() error {
|
||||
cf.mu.Lock()
|
||||
defer cf.mu.Unlock()
|
||||
cf.accessed = time.Now()
|
||||
if cf.opens <= 0 {
|
||||
return errors.New("unbalanced open/close on File")
|
||||
}
|
||||
cf.opens--
|
||||
return cf._sync()
|
||||
}
|
||||
|
||||
// sets all the constants which depend on cf.info.ChunkBits
|
||||
//
|
||||
// call with mu held
|
||||
func (cf *File) _updateChunkBits() {
|
||||
cf.chunkSize = 1 << cf.info.ChunkBits
|
||||
cf.chunkMask = int64(cf.chunkSize - 1)
|
||||
cf.mask = ^cf.chunkMask
|
||||
cf.info.ChunkSize = cf.chunkSize
|
||||
}
|
||||
|
||||
// makeChunkFileName makes a remote name for the chunk
|
||||
func (cf *File) makeChunkFileName(off int64) string {
|
||||
if off&cf.chunkMask != 0 {
|
||||
panic("makeChunkFileName: non chunk aligned offset")
|
||||
}
|
||||
cf.mu.Lock()
|
||||
off >>= cf.info.ChunkBits
|
||||
Bits := 64 - cf.info.ChunkBits
|
||||
cf.mu.Unlock()
|
||||
Bytes := Bits >> 3
|
||||
// round up
|
||||
if Bits&7 != 0 {
|
||||
Bytes += 1
|
||||
}
|
||||
|
||||
// Format to correct number of bytes
|
||||
// offS = "01234567"
|
||||
offS := fmt.Sprintf("%0*X", 2*Bytes, off)
|
||||
|
||||
// Now interpolated / except for the last
|
||||
var out bytes.Buffer
|
||||
if cf.dir != "" {
|
||||
out.WriteString(cf.dir)
|
||||
out.WriteRune('/')
|
||||
}
|
||||
// out = "path/to/file/"
|
||||
for i := uint(0); i < Bytes-1; i++ {
|
||||
out.WriteString(offS[i*2 : i*2+2])
|
||||
out.WriteRune('/')
|
||||
}
|
||||
// out = "path/to/file/01/23/45/"
|
||||
// now add full string
|
||||
out.WriteString(offS)
|
||||
// out = "path/to/file/01/23/45/01234567"
|
||||
out.WriteString(".bin")
|
||||
// out = "path/to/file/01/23/45/01234567.bin"
|
||||
return out.String()
|
||||
}
|
||||
|
||||
// readInfo writes the ChunkInfo to the object
|
||||
//
|
||||
// if it wasn't found then it returns fs.ErrorObjectNotFound
|
||||
//
|
||||
// Call with mu held
|
||||
func (cf *File) _readInfo() (err error) {
|
||||
content, err := cf.vfs.ReadFile(cf.infoRemote)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to find chunk info file %q: %w", cf.infoRemote, err)
|
||||
}
|
||||
err = json.Unmarshal(content, &cf.info)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to decode chunk info file %q: %w", cf.infoRemote, err)
|
||||
}
|
||||
if cf.info.Version > currentInfoVersion {
|
||||
return fmt.Errorf("don't understand version %d info files (current version in %d)", cf.info.Version, currentInfoVersion)
|
||||
}
|
||||
if cf.info.ChunkBits < minChunkBits {
|
||||
return fmt.Errorf("chunk bits %d too small, must be >= %d", cf.info.ChunkBits, minChunkBits)
|
||||
}
|
||||
if cf.info.ChunkBits > maxChunkBits {
|
||||
return fmt.Errorf("chunk bits %d too large, must be <= %d", cf.info.ChunkBits, maxChunkBits)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// _writeInfo writes the ChunkInfo to the object
|
||||
//
|
||||
// call with mu held
|
||||
func (cf *File) _writeInfo() (err error) {
|
||||
content, err := json.Marshal(&cf.info)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to encode chunk info file %q: %w", cf.infoRemote, err)
|
||||
}
|
||||
err = cf.vfs.WriteFile(cf.infoRemote, content, 0600)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to write chunk info file %q: %w", cf.infoRemote, err)
|
||||
}
|
||||
// show size is now unchanged
|
||||
cf.sizeChanged = time.Time{}
|
||||
return nil
|
||||
}
|
||||
|
||||
// _writeSize writes the ChunkInfo if the size has changed
|
||||
//
|
||||
// call with mu held
|
||||
func (cf *File) _writeSize() (err error) {
|
||||
if cf.sizeChanged.IsZero() {
|
||||
return nil
|
||||
}
|
||||
return cf._writeInfo()
|
||||
}
|
||||
|
||||
// zeroBytes zeroes n bytes at the start of buf, or until the end of
|
||||
// buf, whichever comes first. It returns the number of bytes it
|
||||
// wrote.
|
||||
func zeroBytes(buf []byte, n int) int {
|
||||
if n > len(buf) {
|
||||
n = len(buf)
|
||||
}
|
||||
for i := 0; i < n; i++ {
|
||||
buf[i] = 0
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
// Read bytes from the chunk at chunkStart from offset off in the
|
||||
// chunk.
|
||||
//
|
||||
// Return number of bytes read
|
||||
func (cf *File) chunkReadAt(b []byte, chunkStart int64, off int64) (n int, err error) {
|
||||
defer log.Trace(nil, "size=%d, chunkStart=%016x, off=%d", len(b), chunkStart, off)("n=%d, err=%v", &n, &err)
|
||||
fileName := cf.makeChunkFileName(chunkStart)
|
||||
if endPos := int64(cf.chunkSize) - off; endPos < int64(len(b)) {
|
||||
b = b[:endPos]
|
||||
}
|
||||
file, err := cf.vfs.Open(fileName)
|
||||
// If file doesn't exist, it is zero
|
||||
if errors.Is(err, stdfs.ErrNotExist) {
|
||||
return zeroBytes(b, len(b)), nil
|
||||
} else if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer fs.CheckClose(file, &err)
|
||||
n, err = file.ReadAt(b, off)
|
||||
if err == io.EOF && off+int64(n) >= int64(cf.chunkSize) {
|
||||
err = nil
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// ReadAt reads len(b) bytes from the File starting at byte offset off. It
|
||||
// returns the number of bytes read and the error, if any. ReadAt always
|
||||
// returns a non-nil error when n < len(b). At end of file, that error is
|
||||
// io.EOF.
|
||||
func (cf *File) ReadAt(b []byte, off int64) (n int, err error) {
|
||||
cf.mu.Lock()
|
||||
size := cf.info.Size
|
||||
cf.mu.Unlock()
|
||||
if off >= size {
|
||||
return 0, io.EOF
|
||||
}
|
||||
isEOF := false
|
||||
if bytesToEnd := size - off; bytesToEnd < int64(len(b)) {
|
||||
b = b[:bytesToEnd]
|
||||
isEOF = true
|
||||
}
|
||||
for n < len(b) {
|
||||
chunkStart := off & cf.mask
|
||||
end := n + cf.chunkSize
|
||||
if end > len(b) {
|
||||
end = len(b)
|
||||
}
|
||||
var nn int
|
||||
nn, err = cf.chunkReadAt(b[n:end], chunkStart, off-chunkStart)
|
||||
n += nn
|
||||
off += int64(nn)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
if err == nil && isEOF {
|
||||
err = io.EOF
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Write b to the chunk at chunkStart at offset off
|
||||
//
|
||||
// Return number of bytes written
|
||||
func (cf *File) chunkWriteAt(b []byte, chunkStart int64, off int64) (n int, err error) {
|
||||
defer log.Trace(nil, "size=%d, chunkStart=%016x, off=%d", len(b), chunkStart, off)("n=%d, err=%v", &n, &err)
|
||||
fileName := cf.makeChunkFileName(chunkStart)
|
||||
err = cf.vfs.MkdirAll(path.Dir(fileName), 0700)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
file, err := cf.vfs.OpenFile(fileName, os.O_WRONLY|os.O_CREATE, 0600)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer fs.CheckClose(file, &err)
|
||||
// Make the file full size if we can
|
||||
if cf.vfs.Opt.CacheMode >= vfscommon.CacheModeWrites {
|
||||
err = file.Truncate(int64(cf.chunkSize))
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
if endPos := int64(cf.chunkSize) - off; endPos < int64(len(b)) {
|
||||
b = b[:endPos]
|
||||
}
|
||||
return file.WriteAt(b, off)
|
||||
}
|
||||
|
||||
// WriteAt writes len(b) bytes to the File starting at byte offset off. It
|
||||
// returns the number of bytes written and an error, if any. WriteAt returns a
|
||||
// non-nil error when n != len(b).
|
||||
func (cf *File) WriteAt(b []byte, off int64) (n int, err error) {
|
||||
for n < len(b) {
|
||||
chunkStart := off & cf.mask
|
||||
var nn int
|
||||
end := n + cf.chunkSize
|
||||
if end > len(b) {
|
||||
end = len(b)
|
||||
}
|
||||
nn, err = cf.chunkWriteAt(b[n:end], chunkStart, off-chunkStart)
|
||||
n += nn
|
||||
off += int64(nn)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
// Write new size if needed
|
||||
cf.mu.Lock()
|
||||
size := cf.info.Size
|
||||
if off > size {
|
||||
cf.info.Size = off // extend the file if necessary
|
||||
cf.sizeChanged = time.Now()
|
||||
}
|
||||
cf.mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
// Size reads the current size of the file
|
||||
func (cf *File) Size() int64 {
|
||||
cf.mu.Lock()
|
||||
if !cf.valid {
|
||||
err := cf._readInfo()
|
||||
if err != nil {
|
||||
fs.Errorf(cf.dir, "Failed to read size: %v", err)
|
||||
}
|
||||
}
|
||||
size := cf.info.Size
|
||||
cf.mu.Unlock()
|
||||
return size
|
||||
}
|
||||
|
||||
// Truncate sets the current size of the file
|
||||
//
|
||||
// FIXME it doesn't delete any data...
|
||||
func (cf *File) Truncate(size int64) error {
|
||||
cf.mu.Lock()
|
||||
if cf.info.Size != size {
|
||||
cf.info.Size = size
|
||||
cf.sizeChanged = time.Now()
|
||||
}
|
||||
cf.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// _sync writes any pending data to disk by flushing the write queue
|
||||
//
|
||||
// call with the lock held
|
||||
func (cf *File) _sync() error {
|
||||
err := cf._writeSize()
|
||||
// FIXME need a VFS function to flush everything to disk
|
||||
return err
|
||||
}
|
||||
|
||||
// Sync writes any pending data to disk by flushing the write queue
|
||||
func (cf *File) Sync() error {
|
||||
cf.mu.Lock()
|
||||
defer cf.mu.Unlock()
|
||||
return cf._sync()
|
||||
}
|
||||
|
||||
// Remove removes all the data in the file
|
||||
func (cf *File) Remove() error {
|
||||
cf.mu.Lock()
|
||||
defer cf.mu.Unlock()
|
||||
if !cf.valid {
|
||||
return nil
|
||||
}
|
||||
if cf.opens > 0 {
|
||||
return errors.New("can't delete chunked file when it is open")
|
||||
}
|
||||
cf.valid = false
|
||||
_ = cf._sync()
|
||||
|
||||
// Purge all the files
|
||||
// FIXME should get this into the VFS as RemoveAll
|
||||
err := operations.Purge(context.TODO(), cf.vfs.Fs(), cf.dir)
|
||||
cf.vfs.FlushDirCache()
|
||||
|
||||
return err
|
||||
}
|
452
vfs/chunked/chunked_test.go
Normal file
452
vfs/chunked/chunked_test.go
Normal file
@ -0,0 +1,452 @@
|
||||
package chunked
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
stdfs "io/fs"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
_ "github.com/rclone/rclone/backend/all" // import all the file systems
|
||||
"github.com/rclone/rclone/fs"
|
||||
"github.com/rclone/rclone/fs/object"
|
||||
"github.com/rclone/rclone/fs/operations"
|
||||
"github.com/rclone/rclone/fstest"
|
||||
"github.com/rclone/rclone/vfs"
|
||||
"github.com/rclone/rclone/vfs/vfscommon"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// TestMain drives the tests
|
||||
func TestMain(m *testing.M) {
|
||||
fstest.TestMain(m)
|
||||
}
|
||||
|
||||
func TestChunkFileName(t *testing.T) {
|
||||
cf := &File{
|
||||
dir: "path/to/dir",
|
||||
}
|
||||
|
||||
for _, test := range []struct {
|
||||
bits uint
|
||||
off int64
|
||||
want string
|
||||
panic bool
|
||||
}{
|
||||
{
|
||||
8,
|
||||
0,
|
||||
"path/to/dir/00/00/00/00/00/00/00000000000000.bin",
|
||||
false,
|
||||
},
|
||||
{
|
||||
8,
|
||||
0x123456789ABCDE00,
|
||||
"path/to/dir/12/34/56/78/9A/BC/123456789ABCDE.bin",
|
||||
false,
|
||||
},
|
||||
{
|
||||
8,
|
||||
0x123456789ABCDE80,
|
||||
"",
|
||||
true,
|
||||
},
|
||||
{
|
||||
12,
|
||||
0,
|
||||
"path/to/dir/00/00/00/00/00/00/00000000000000.bin",
|
||||
false,
|
||||
},
|
||||
{
|
||||
12,
|
||||
0x123456789ABCD000,
|
||||
"path/to/dir/01/23/45/67/89/AB/0123456789ABCD.bin",
|
||||
false,
|
||||
},
|
||||
{
|
||||
15,
|
||||
0,
|
||||
"path/to/dir/00/00/00/00/00/00/00000000000000.bin",
|
||||
false,
|
||||
},
|
||||
{
|
||||
15,
|
||||
0x123456789ABCC000,
|
||||
"",
|
||||
true,
|
||||
},
|
||||
{
|
||||
15,
|
||||
0x123456789ABC0000,
|
||||
"path/to/dir/00/24/68/AC/F1/35/002468ACF13578.bin",
|
||||
false,
|
||||
},
|
||||
{
|
||||
16,
|
||||
0,
|
||||
"path/to/dir/00/00/00/00/00/000000000000.bin",
|
||||
false,
|
||||
},
|
||||
{
|
||||
16,
|
||||
0x123456789ABC8000,
|
||||
"",
|
||||
true,
|
||||
},
|
||||
{
|
||||
16,
|
||||
0x123456789ABC0000,
|
||||
"path/to/dir/12/34/56/78/9A/123456789ABC.bin",
|
||||
false,
|
||||
},
|
||||
{
|
||||
20,
|
||||
0,
|
||||
"path/to/dir/00/00/00/00/00/000000000000.bin",
|
||||
false,
|
||||
},
|
||||
{
|
||||
23,
|
||||
0,
|
||||
"path/to/dir/00/00/00/00/00/000000000000.bin",
|
||||
false,
|
||||
},
|
||||
{
|
||||
24,
|
||||
0,
|
||||
"path/to/dir/00/00/00/00/0000000000.bin",
|
||||
false,
|
||||
},
|
||||
{
|
||||
24,
|
||||
0x7EFDFCFBFA000000,
|
||||
"path/to/dir/7E/FD/FC/FB/7EFDFCFBFA.bin",
|
||||
false,
|
||||
},
|
||||
{
|
||||
28,
|
||||
0x7EFDFCFBF0000000,
|
||||
"path/to/dir/07/EF/DF/CF/07EFDFCFBF.bin",
|
||||
false,
|
||||
},
|
||||
} {
|
||||
cf.info.ChunkBits = test.bits
|
||||
cf._updateChunkBits()
|
||||
what := fmt.Sprintf("bits=%d, off=0x%X, panic=%v", test.bits, test.off, test.panic)
|
||||
if !test.panic {
|
||||
got := cf.makeChunkFileName(test.off)
|
||||
assert.Equal(t, test.want, got, what)
|
||||
} else {
|
||||
assert.Panics(t, func() {
|
||||
cf.makeChunkFileName(test.off)
|
||||
}, what)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// check that the object exists and has the contents
|
||||
func checkObject(t *testing.T, f fs.Fs, remote string, want string) {
|
||||
o, err := f.NewObject(context.TODO(), remote)
|
||||
require.NoError(t, err)
|
||||
dst := object.NewMemoryObject(remote, time.Now(), nil)
|
||||
_, err = operations.Copy(context.TODO(), object.MemoryFs, dst, "", o)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, want, string(dst.Content()))
|
||||
}
|
||||
|
||||
// Constants uses in the tests
|
||||
const (
|
||||
writeBackDelay = 100 * time.Millisecond // A short writeback delay for testing
|
||||
waitForWritersDelay = 30 * time.Second // time to wait for existing writers
|
||||
)
|
||||
|
||||
// Clean up a test VFS
|
||||
func cleanupVFS(t *testing.T, vfs *vfs.VFS) {
|
||||
vfs.WaitForWriters(waitForWritersDelay)
|
||||
err := vfs.CleanUp()
|
||||
require.NoError(t, err)
|
||||
vfs.Shutdown()
|
||||
}
|
||||
|
||||
// Create a new VFS
|
||||
func newTestVFSOpt(t *testing.T, opt *vfscommon.Options) (r *fstest.Run, VFS *vfs.VFS) {
|
||||
r = fstest.NewRun(t)
|
||||
VFS = vfs.New(r.Fremote, opt)
|
||||
t.Cleanup(func() {
|
||||
cleanupVFS(t, VFS)
|
||||
})
|
||||
return r, VFS
|
||||
}
|
||||
|
||||
func TestNew(t *testing.T) {
|
||||
_, VFS := newTestVFSOpt(t, nil)
|
||||
|
||||
// check default open
|
||||
cf := New(VFS, "")
|
||||
assert.Equal(t, 0, cf.opens)
|
||||
err := cf.Open(true, defaultChunkBits)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, int64(0), cf.info.Size)
|
||||
assert.Equal(t, uint(defaultChunkBits), cf.info.ChunkBits)
|
||||
assert.Equal(t, 0x10000, cf.chunkSize)
|
||||
assert.Equal(t, int64(0xFFFF), cf.chunkMask)
|
||||
assert.Equal(t, ^int64(0xFFFF), cf.mask)
|
||||
assert.Equal(t, 1, cf.opens)
|
||||
|
||||
// check the close
|
||||
err = cf.Close()
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 0, cf.opens)
|
||||
|
||||
// check the double close
|
||||
err = cf.Close()
|
||||
assert.Error(t, err)
|
||||
assert.Equal(t, 0, cf.opens)
|
||||
|
||||
// check that the info got written
|
||||
checkObject(t, VFS.Fs(), cf.infoRemote, `{"Version":1,"Comment":"rclone chunked file","Size":0,"ChunkBits":16,"ChunkSize":65536}`)
|
||||
|
||||
// change the info
|
||||
cf.info.Size = 100
|
||||
cf.info.ChunkBits = 20
|
||||
cf._updateChunkBits()
|
||||
err = cf._writeInfo()
|
||||
assert.NoError(t, err)
|
||||
|
||||
// read it back in
|
||||
cf = New(VFS, "")
|
||||
err = cf.Open(false, 0)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, int64(100), cf.info.Size)
|
||||
assert.Equal(t, uint(20), cf.info.ChunkBits)
|
||||
assert.Equal(t, 0x100000, cf.chunkSize)
|
||||
assert.Equal(t, int64(0xFFFFF), cf.chunkMask)
|
||||
assert.Equal(t, ^int64(0xFFFFF), cf.mask)
|
||||
|
||||
// check opens
|
||||
|
||||
// test limits for readInfo
|
||||
for _, test := range []struct {
|
||||
info string
|
||||
error string
|
||||
}{
|
||||
{
|
||||
`{"Version":1,"Comment":"rclone chunked file","Size":0,"ChunkBits":16,"ChunkSize":65536`,
|
||||
"failed to decode chunk info file",
|
||||
},
|
||||
{
|
||||
`{"Version":99,"Comment":"rclone chunked file","Size":0,"ChunkBits":16,"ChunkSize":65536}`,
|
||||
"don't understand version 99 info files",
|
||||
},
|
||||
{
|
||||
`{"Version":1,"Comment":"rclone chunked file","Size":0,"ChunkBits":1,"ChunkSize":65536}`,
|
||||
"chunk bits 1 too small",
|
||||
},
|
||||
{
|
||||
`{"Version":1,"Comment":"rclone chunked file","Size":0,"ChunkBits":99,"ChunkSize":65536}`,
|
||||
"chunk bits 99 too large",
|
||||
},
|
||||
} {
|
||||
require.NoError(t, VFS.WriteFile(cf.infoRemote, []byte(test.info), 0600))
|
||||
err = cf._readInfo()
|
||||
require.Error(t, err)
|
||||
assert.Contains(t, err.Error(), test.error)
|
||||
}
|
||||
}
|
||||
|
||||
func newTestFile(t *testing.T) (*vfs.VFS, *File) {
|
||||
opt := vfscommon.Opt
|
||||
opt.CacheMode = vfscommon.CacheModeFull
|
||||
opt.WriteBack = 0 // make writeback synchronous
|
||||
_, VFS := newTestVFSOpt(t, &opt)
|
||||
|
||||
cf := New(VFS, "")
|
||||
err := cf.Open(true, 4)
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Cleanup(func() {
|
||||
require.NoError(t, cf.Close())
|
||||
})
|
||||
|
||||
return VFS, cf
|
||||
}
|
||||
|
||||
func TestReadWriteChunk(t *testing.T) {
|
||||
VFS, cf := newTestFile(t)
|
||||
|
||||
const (
|
||||
off = 0x123456789ABCDEF0
|
||||
wantRemote = "01/23/45/67/89/AB/CD/0123456789ABCDEF.bin"
|
||||
)
|
||||
|
||||
// pretend the file is big
|
||||
require.NoError(t, cf.Truncate(2*off))
|
||||
|
||||
// check reading non existent chunk gives 0
|
||||
var zero = make([]byte, 16)
|
||||
var b = make([]byte, 16)
|
||||
n, err := cf.ReadAt(b, off)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 16, n)
|
||||
assert.Equal(t, zero, b)
|
||||
|
||||
// create a new chunk and write some data
|
||||
n, err = cf.WriteAt([]byte("0123456789abcdef"), off)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 16, n)
|
||||
|
||||
// check the chunk on disk
|
||||
checkObject(t, VFS.Fs(), wantRemote, "0123456789abcdef")
|
||||
|
||||
// read the chunk off disk and check it
|
||||
n, err = cf.ReadAt(b, off)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 16, n)
|
||||
assert.Equal(t, "0123456789abcdef", string(b))
|
||||
}
|
||||
|
||||
func TestZeroBytes(t *testing.T) {
|
||||
b := []byte{1, 2, 3, 4}
|
||||
zeroBytes(b, 2)
|
||||
assert.Equal(t, []byte{0, 0, 3, 4}, b)
|
||||
|
||||
b = []byte{1, 2, 3, 4}
|
||||
zeroBytes(b, 17)
|
||||
assert.Equal(t, []byte{0, 0, 0, 0}, b)
|
||||
}
|
||||
|
||||
func TestReadAt(t *testing.T) {
|
||||
_, cf := newTestFile(t)
|
||||
|
||||
// make a new chunk and write it to disk as chunk 1
|
||||
zero := make([]byte, 16)
|
||||
middle := []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}
|
||||
n, err := cf.WriteAt(middle, 16)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 16, n)
|
||||
|
||||
// set the size to 0
|
||||
cf.info.Size = 0
|
||||
|
||||
// check reading
|
||||
b := make([]byte, 40)
|
||||
n, err = cf.ReadAt(b, 0)
|
||||
assert.Equal(t, 0, n)
|
||||
assert.Equal(t, io.EOF, err)
|
||||
|
||||
// set the size to 38
|
||||
cf.info.Size = 38
|
||||
|
||||
// read to end
|
||||
n, err = cf.ReadAt(b, 0)
|
||||
assert.Equal(t, 38, n)
|
||||
assert.Equal(t, io.EOF, err)
|
||||
expected := append([]byte(nil), zero...)
|
||||
expected = append(expected, middle...)
|
||||
expected = append(expected, zero[:6]...)
|
||||
assert.Equal(t, expected, b[:n])
|
||||
|
||||
// read not to end
|
||||
b = make([]byte, 16)
|
||||
n, err = cf.ReadAt(b, 10)
|
||||
assert.Equal(t, 16, n)
|
||||
assert.NoError(t, err)
|
||||
expected = append([]byte(nil), zero[10:]...)
|
||||
expected = append(expected, middle[:10]...)
|
||||
assert.Equal(t, expected, b[:n])
|
||||
|
||||
// read at end
|
||||
n, err = cf.ReadAt(b, 38)
|
||||
assert.Equal(t, 0, n)
|
||||
assert.Equal(t, io.EOF, err)
|
||||
|
||||
// read past end
|
||||
n, err = cf.ReadAt(b, 99)
|
||||
assert.Equal(t, 0, n)
|
||||
assert.Equal(t, io.EOF, err)
|
||||
}
|
||||
|
||||
func TestWriteAt(t *testing.T) {
|
||||
VFS, cf := newTestFile(t)
|
||||
f := VFS.Fs()
|
||||
|
||||
// Make test buffer
|
||||
b := []byte{}
|
||||
for i := byte(0); i < 30; i++ {
|
||||
b = append(b, '0'+i)
|
||||
}
|
||||
|
||||
t.Run("SizeZero", func(t *testing.T) {
|
||||
assert.Equal(t, int64(0), cf.Size())
|
||||
})
|
||||
|
||||
const (
|
||||
wantRemote1 = "00/00/00/00/00/00/00/0000000000000000.bin"
|
||||
wantRemote2 = "00/00/00/00/00/00/00/0000000000000001.bin"
|
||||
wantRemote3 = "00/00/00/00/00/00/00/0000000000000002.bin"
|
||||
)
|
||||
|
||||
t.Run("Extended", func(t *testing.T) {
|
||||
// write it and check file is extended
|
||||
n, err := cf.WriteAt(b, 8)
|
||||
assert.Equal(t, 30, n)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, int64(38), cf.info.Size)
|
||||
|
||||
// flush the parts to disk
|
||||
require.NoError(t, cf.Sync())
|
||||
|
||||
// check the parts on disk
|
||||
checkObject(t, f, wantRemote1, "\x00\x00\x00\x00\x00\x00\x00\x0001234567")
|
||||
checkObject(t, f, wantRemote2, "89:;<=>?@ABCDEFG")
|
||||
checkObject(t, f, wantRemote3, "HIJKLM\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00")
|
||||
})
|
||||
|
||||
t.Run("Size", func(t *testing.T) {
|
||||
assert.Equal(t, int64(38), cf.Size())
|
||||
})
|
||||
|
||||
t.Run("Overwrite", func(t *testing.T) {
|
||||
// overwrite a part
|
||||
n, err := cf.WriteAt([]byte("abcdefgh"), 12)
|
||||
assert.Equal(t, 8, n)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, int64(38), cf.info.Size)
|
||||
|
||||
// flush the parts to disk
|
||||
require.NoError(t, cf.Sync())
|
||||
|
||||
// check the parts on disk
|
||||
checkObject(t, f, wantRemote1, "\x00\x00\x00\x00\x00\x00\x00\x000123abcd")
|
||||
checkObject(t, f, wantRemote2, "efgh<=>?@ABCDEFG")
|
||||
checkObject(t, f, wantRemote3, "HIJKLM\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00")
|
||||
})
|
||||
|
||||
t.Run("Remove", func(t *testing.T) {
|
||||
require.Error(t, cf.Remove())
|
||||
require.NoError(t, cf.Close())
|
||||
|
||||
// Check files are there
|
||||
fis, err := VFS.ReadDir(cf.dir)
|
||||
require.NoError(t, err)
|
||||
assert.True(t, len(fis) > 0)
|
||||
|
||||
// Remove the file
|
||||
require.NoError(t, cf.Remove())
|
||||
|
||||
// Check files have gone
|
||||
fis, err = VFS.ReadDir(cf.dir)
|
||||
what := fmt.Sprintf("err=%v, fis=%v", err, fis)
|
||||
if err == nil {
|
||||
assert.Equal(t, 0, len(fis), what)
|
||||
} else {
|
||||
require.True(t, errors.Is(err, stdfs.ErrNotExist), what)
|
||||
}
|
||||
|
||||
// Reopen for cleanup
|
||||
require.NoError(t, cf.Open(true, 0))
|
||||
})
|
||||
}
|
Loading…
Reference in New Issue
Block a user