mirror of
https://github.com/rclone/rclone.git
synced 2025-01-22 14:19:56 +01:00
e43b5ce5e5
This is possible now that we no longer support go1.12 and brings rclone into line with standard practices in the Go world. This also removes errors.New and errors.Errorf from lib/errors and prefers the stdlib errors package over lib/errors.
1052 lines
28 KiB
Go
1052 lines
28 KiB
Go
//go:build !plan9 && !js
|
|
// +build !plan9,!js
|
|
|
|
package cache
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/binary"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"os"
|
|
"path"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/rclone/rclone/fs"
|
|
"github.com/rclone/rclone/fs/walk"
|
|
bolt "go.etcd.io/bbolt"
|
|
)
|
|
|
|
// Constants
|
|
const (
|
|
RootBucket = "root"
|
|
RootTsBucket = "rootTs"
|
|
DataTsBucket = "dataTs"
|
|
tempBucket = "pending"
|
|
)
|
|
|
|
// Features flags for this storage type
|
|
type Features struct {
|
|
PurgeDb bool // purge the db before starting
|
|
DbWaitTime time.Duration // time to wait for DB to be available
|
|
}
|
|
|
|
var boltMap = make(map[string]*Persistent)
|
|
var boltMapMx sync.Mutex
|
|
|
|
// GetPersistent returns a single instance for the specific store
|
|
func GetPersistent(dbPath, chunkPath string, f *Features) (*Persistent, error) {
|
|
// write lock to create one
|
|
boltMapMx.Lock()
|
|
defer boltMapMx.Unlock()
|
|
if b, ok := boltMap[dbPath]; ok {
|
|
if !b.open {
|
|
err := b.connect()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
return b, nil
|
|
}
|
|
|
|
bb, err := newPersistent(dbPath, chunkPath, f)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
boltMap[dbPath] = bb
|
|
return boltMap[dbPath], nil
|
|
}
|
|
|
|
type chunkInfo struct {
|
|
Path string
|
|
Offset int64
|
|
Size int64
|
|
}
|
|
|
|
type tempUploadInfo struct {
|
|
DestPath string
|
|
AddedOn time.Time
|
|
Started bool
|
|
}
|
|
|
|
// String representation of a tempUploadInfo
|
|
func (t *tempUploadInfo) String() string {
|
|
return fmt.Sprintf("%v - %v (%v)", t.DestPath, t.Started, t.AddedOn)
|
|
}
|
|
|
|
// Persistent is a wrapper of persistent storage for a bolt.DB file
|
|
type Persistent struct {
|
|
dbPath string
|
|
dataPath string
|
|
open bool
|
|
db *bolt.DB
|
|
cleanupMux sync.Mutex
|
|
tempQueueMux sync.Mutex
|
|
features *Features
|
|
}
|
|
|
|
// newPersistent builds a new wrapper and connects to the bolt.DB file
|
|
func newPersistent(dbPath, chunkPath string, f *Features) (*Persistent, error) {
|
|
b := &Persistent{
|
|
dbPath: dbPath,
|
|
dataPath: chunkPath,
|
|
features: f,
|
|
}
|
|
|
|
err := b.connect()
|
|
if err != nil {
|
|
fs.Errorf(dbPath, "Error opening storage cache. Is there another rclone running on the same remote? %v", err)
|
|
return nil, err
|
|
}
|
|
|
|
return b, nil
|
|
}
|
|
|
|
// String will return a human friendly string for this DB (currently the dbPath)
|
|
func (b *Persistent) String() string {
|
|
return "<Cache DB> " + b.dbPath
|
|
}
|
|
|
|
// connect creates a connection to the configured file
|
|
// refreshDb will delete the file before to create an empty DB if it's set to true
|
|
func (b *Persistent) connect() error {
|
|
var err error
|
|
|
|
err = os.MkdirAll(b.dataPath, os.ModePerm)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create a data directory %q: %w", b.dataPath, err)
|
|
}
|
|
b.db, err = bolt.Open(b.dbPath, 0644, &bolt.Options{Timeout: b.features.DbWaitTime})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to open a cache connection to %q: %w", b.dbPath, err)
|
|
}
|
|
if b.features.PurgeDb {
|
|
b.Purge()
|
|
}
|
|
_ = b.db.Update(func(tx *bolt.Tx) error {
|
|
_, _ = tx.CreateBucketIfNotExists([]byte(RootBucket))
|
|
_, _ = tx.CreateBucketIfNotExists([]byte(RootTsBucket))
|
|
_, _ = tx.CreateBucketIfNotExists([]byte(DataTsBucket))
|
|
_, _ = tx.CreateBucketIfNotExists([]byte(tempBucket))
|
|
|
|
return nil
|
|
})
|
|
|
|
b.open = true
|
|
return nil
|
|
}
|
|
|
|
// getBucket prepares and cleans a specific path of the form: /var/tmp and will iterate through each path component
|
|
// to get to the nested bucket of the final part (in this example: tmp)
|
|
func (b *Persistent) getBucket(dir string, createIfMissing bool, tx *bolt.Tx) *bolt.Bucket {
|
|
cleanPath(dir)
|
|
|
|
entries := strings.FieldsFunc(dir, func(c rune) bool {
|
|
// cover Windows where rclone still uses '/' as path separator
|
|
// this should be safe as '/' is not a valid Windows character
|
|
return (os.PathSeparator == c || c == rune('/'))
|
|
})
|
|
bucket := tx.Bucket([]byte(RootBucket))
|
|
|
|
for _, entry := range entries {
|
|
if createIfMissing {
|
|
bucket, _ = bucket.CreateBucketIfNotExists([]byte(entry))
|
|
} else {
|
|
bucket = bucket.Bucket([]byte(entry))
|
|
}
|
|
|
|
if bucket == nil {
|
|
return nil
|
|
}
|
|
}
|
|
|
|
return bucket
|
|
}
|
|
|
|
// GetDir will retrieve data of a cached directory
|
|
func (b *Persistent) GetDir(remote string) (*Directory, error) {
|
|
cd := &Directory{}
|
|
|
|
err := b.db.View(func(tx *bolt.Tx) error {
|
|
bucket := b.getBucket(remote, false, tx)
|
|
if bucket == nil {
|
|
return fmt.Errorf("couldn't open bucket (%v)", remote)
|
|
}
|
|
|
|
data := bucket.Get([]byte("."))
|
|
if data != nil {
|
|
return json.Unmarshal(data, cd)
|
|
}
|
|
|
|
return fmt.Errorf("%v not found", remote)
|
|
})
|
|
|
|
return cd, err
|
|
}
|
|
|
|
// AddDir will update a CachedDirectory metadata and all its entries
|
|
func (b *Persistent) AddDir(cachedDir *Directory) error {
|
|
return b.AddBatchDir([]*Directory{cachedDir})
|
|
}
|
|
|
|
// AddBatchDir will update a list of CachedDirectory metadata and all their entries
|
|
func (b *Persistent) AddBatchDir(cachedDirs []*Directory) error {
|
|
if len(cachedDirs) == 0 {
|
|
return nil
|
|
}
|
|
|
|
return b.db.Update(func(tx *bolt.Tx) error {
|
|
var bucket *bolt.Bucket
|
|
if cachedDirs[0].Dir == "" {
|
|
bucket = tx.Bucket([]byte(RootBucket))
|
|
} else {
|
|
bucket = b.getBucket(cachedDirs[0].Dir, true, tx)
|
|
}
|
|
if bucket == nil {
|
|
return fmt.Errorf("couldn't open bucket (%v)", cachedDirs[0].Dir)
|
|
}
|
|
|
|
for _, cachedDir := range cachedDirs {
|
|
var b *bolt.Bucket
|
|
var err error
|
|
if cachedDir.Name == "" {
|
|
b = bucket
|
|
} else {
|
|
b, err = bucket.CreateBucketIfNotExists([]byte(cachedDir.Name))
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
encoded, err := json.Marshal(cachedDir)
|
|
if err != nil {
|
|
return fmt.Errorf("couldn't marshal object (%v): %v", cachedDir, err)
|
|
}
|
|
err = b.Put([]byte("."), encoded)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// GetDirEntries will return a CachedDirectory, its list of dir entries and/or an error if it encountered issues
|
|
func (b *Persistent) GetDirEntries(cachedDir *Directory) (fs.DirEntries, error) {
|
|
var dirEntries fs.DirEntries
|
|
|
|
err := b.db.View(func(tx *bolt.Tx) error {
|
|
bucket := b.getBucket(cachedDir.abs(), false, tx)
|
|
if bucket == nil {
|
|
return fmt.Errorf("couldn't open bucket (%v)", cachedDir.abs())
|
|
}
|
|
|
|
val := bucket.Get([]byte("."))
|
|
if val != nil {
|
|
err := json.Unmarshal(val, cachedDir)
|
|
if err != nil {
|
|
return fmt.Errorf("error during unmarshalling obj: %v", err)
|
|
}
|
|
} else {
|
|
return fmt.Errorf("missing cached dir: %v", cachedDir)
|
|
}
|
|
|
|
c := bucket.Cursor()
|
|
for k, v := c.First(); k != nil; k, v = c.Next() {
|
|
// ignore metadata key: .
|
|
if bytes.Equal(k, []byte(".")) {
|
|
continue
|
|
}
|
|
entryPath := path.Join(cachedDir.Remote(), string(k))
|
|
|
|
if v == nil { // directory
|
|
// we try to find a cached meta for the dir
|
|
currentBucket := c.Bucket().Bucket(k)
|
|
if currentBucket == nil {
|
|
return fmt.Errorf("couldn't open bucket (%v)", string(k))
|
|
}
|
|
|
|
metaKey := currentBucket.Get([]byte("."))
|
|
d := NewDirectory(cachedDir.CacheFs, entryPath)
|
|
if metaKey != nil { //if we don't find it, we create an empty dir
|
|
err := json.Unmarshal(metaKey, d)
|
|
if err != nil { // if even this fails, we fallback to an empty dir
|
|
fs.Debugf(string(k), "error during unmarshalling obj: %v", err)
|
|
}
|
|
}
|
|
|
|
dirEntries = append(dirEntries, d)
|
|
} else { // object
|
|
o := NewObject(cachedDir.CacheFs, entryPath)
|
|
err := json.Unmarshal(v, o)
|
|
if err != nil {
|
|
fs.Debugf(string(k), "error during unmarshalling obj: %v", err)
|
|
continue
|
|
}
|
|
|
|
dirEntries = append(dirEntries, o)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
return dirEntries, err
|
|
}
|
|
|
|
// RemoveDir will delete a CachedDirectory, all its objects and all the chunks stored for it
|
|
func (b *Persistent) RemoveDir(fp string) error {
|
|
var err error
|
|
parentDir, dirName := path.Split(fp)
|
|
if fp == "" {
|
|
err = b.db.Update(func(tx *bolt.Tx) error {
|
|
err := tx.DeleteBucket([]byte(RootBucket))
|
|
if err != nil {
|
|
fs.Debugf(fp, "couldn't delete from cache: %v", err)
|
|
return err
|
|
}
|
|
_, _ = tx.CreateBucketIfNotExists([]byte(RootBucket))
|
|
return nil
|
|
})
|
|
} else {
|
|
err = b.db.Update(func(tx *bolt.Tx) error {
|
|
bucket := b.getBucket(cleanPath(parentDir), false, tx)
|
|
if bucket == nil {
|
|
return fmt.Errorf("couldn't open bucket (%v)", fp)
|
|
}
|
|
// delete the cached dir
|
|
err := bucket.DeleteBucket([]byte(cleanPath(dirName)))
|
|
if err != nil {
|
|
fs.Debugf(fp, "couldn't delete from cache: %v", err)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// delete chunks on disk
|
|
// safe to ignore as the files might not have been open
|
|
if err == nil {
|
|
_ = os.RemoveAll(path.Join(b.dataPath, fp))
|
|
_ = os.MkdirAll(b.dataPath, os.ModePerm)
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// ExpireDir will flush a CachedDirectory and all its objects from the objects
|
|
// chunks will remain as they are
|
|
func (b *Persistent) ExpireDir(cd *Directory) error {
|
|
t := time.Now().Add(time.Duration(-cd.CacheFs.opt.InfoAge))
|
|
cd.CacheTs = &t
|
|
|
|
// expire all parents
|
|
return b.db.Update(func(tx *bolt.Tx) error {
|
|
// expire all the parents
|
|
currentDir := cd.abs()
|
|
for { // until we get to the root
|
|
bucket := b.getBucket(currentDir, false, tx)
|
|
if bucket != nil {
|
|
val := bucket.Get([]byte("."))
|
|
if val != nil {
|
|
cd2 := &Directory{CacheFs: cd.CacheFs}
|
|
err := json.Unmarshal(val, cd2)
|
|
if err == nil {
|
|
fs.Debugf(cd, "cache: expired %v", currentDir)
|
|
cd2.CacheTs = &t
|
|
enc2, _ := json.Marshal(cd2)
|
|
_ = bucket.Put([]byte("."), enc2)
|
|
}
|
|
}
|
|
}
|
|
if currentDir == "" {
|
|
break
|
|
}
|
|
currentDir = cleanPath(path.Dir(currentDir))
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// GetObject will return a CachedObject from its parent directory or an error if it doesn't find it
|
|
func (b *Persistent) GetObject(cachedObject *Object) (err error) {
|
|
return b.db.View(func(tx *bolt.Tx) error {
|
|
bucket := b.getBucket(cachedObject.Dir, false, tx)
|
|
if bucket == nil {
|
|
return fmt.Errorf("couldn't open parent bucket for %v", cachedObject.Dir)
|
|
}
|
|
val := bucket.Get([]byte(cachedObject.Name))
|
|
if val != nil {
|
|
return json.Unmarshal(val, cachedObject)
|
|
}
|
|
return fmt.Errorf("couldn't find object (%v)", cachedObject.Name)
|
|
})
|
|
}
|
|
|
|
// AddObject will create a cached object in its parent directory
|
|
func (b *Persistent) AddObject(cachedObject *Object) error {
|
|
return b.db.Update(func(tx *bolt.Tx) error {
|
|
bucket := b.getBucket(cachedObject.Dir, true, tx)
|
|
if bucket == nil {
|
|
return fmt.Errorf("couldn't open parent bucket for %v", cachedObject)
|
|
}
|
|
// cache Object Info
|
|
encoded, err := json.Marshal(cachedObject)
|
|
if err != nil {
|
|
return fmt.Errorf("couldn't marshal object (%v) info: %v", cachedObject, err)
|
|
}
|
|
err = bucket.Put([]byte(cachedObject.Name), encoded)
|
|
if err != nil {
|
|
return fmt.Errorf("couldn't cache object (%v) info: %v", cachedObject, err)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// RemoveObject will delete a single cached object and all the chunks which belong to it
|
|
func (b *Persistent) RemoveObject(fp string) error {
|
|
parentDir, objName := path.Split(fp)
|
|
return b.db.Update(func(tx *bolt.Tx) error {
|
|
bucket := b.getBucket(cleanPath(parentDir), false, tx)
|
|
if bucket == nil {
|
|
return fmt.Errorf("couldn't open parent bucket for %v", cleanPath(parentDir))
|
|
}
|
|
err := bucket.Delete([]byte(cleanPath(objName)))
|
|
if err != nil {
|
|
fs.Debugf(fp, "couldn't delete obj from storage: %v", err)
|
|
}
|
|
// delete chunks on disk
|
|
// safe to ignore as the file might not have been open
|
|
_ = os.RemoveAll(path.Join(b.dataPath, fp))
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// ExpireObject will flush an Object and all its data if desired
|
|
func (b *Persistent) ExpireObject(co *Object, withData bool) error {
|
|
co.CacheTs = time.Now().Add(time.Duration(-co.CacheFs.opt.InfoAge))
|
|
err := b.AddObject(co)
|
|
if withData {
|
|
_ = os.RemoveAll(path.Join(b.dataPath, co.abs()))
|
|
}
|
|
return err
|
|
}
|
|
|
|
// HasEntry confirms the existence of a single entry (dir or object)
|
|
func (b *Persistent) HasEntry(remote string) bool {
|
|
dir, name := path.Split(remote)
|
|
dir = cleanPath(dir)
|
|
name = cleanPath(name)
|
|
|
|
err := b.db.View(func(tx *bolt.Tx) error {
|
|
bucket := b.getBucket(dir, false, tx)
|
|
if bucket == nil {
|
|
return fmt.Errorf("couldn't open parent bucket for %v", remote)
|
|
}
|
|
if f := bucket.Bucket([]byte(name)); f != nil {
|
|
return nil
|
|
}
|
|
if f := bucket.Get([]byte(name)); f != nil {
|
|
return nil
|
|
}
|
|
|
|
return fmt.Errorf("couldn't find object (%v)", remote)
|
|
})
|
|
if err == nil {
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
// HasChunk confirms the existence of a single chunk of an object
|
|
func (b *Persistent) HasChunk(cachedObject *Object, offset int64) bool {
|
|
fp := path.Join(b.dataPath, cachedObject.abs(), strconv.FormatInt(offset, 10))
|
|
if _, err := os.Stat(fp); !os.IsNotExist(err) {
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
// GetChunk will retrieve a single chunk which belongs to a cached object or an error if it doesn't find it
|
|
func (b *Persistent) GetChunk(cachedObject *Object, offset int64) ([]byte, error) {
|
|
var data []byte
|
|
|
|
fp := path.Join(b.dataPath, cachedObject.abs(), strconv.FormatInt(offset, 10))
|
|
data, err := ioutil.ReadFile(fp)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return data, err
|
|
}
|
|
|
|
// AddChunk adds a new chunk of a cached object
|
|
func (b *Persistent) AddChunk(fp string, data []byte, offset int64) error {
|
|
_ = os.MkdirAll(path.Join(b.dataPath, fp), os.ModePerm)
|
|
|
|
filePath := path.Join(b.dataPath, fp, strconv.FormatInt(offset, 10))
|
|
err := ioutil.WriteFile(filePath, data, os.ModePerm)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return b.db.Update(func(tx *bolt.Tx) error {
|
|
tsBucket := tx.Bucket([]byte(DataTsBucket))
|
|
ts := time.Now()
|
|
found := false
|
|
|
|
// delete (older) timestamps for the same object
|
|
c := tsBucket.Cursor()
|
|
for k, v := c.First(); k != nil; k, v = c.Next() {
|
|
var ci chunkInfo
|
|
err = json.Unmarshal(v, &ci)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
if ci.Path == fp && ci.Offset == offset {
|
|
if tsInCache := time.Unix(0, btoi(k)); tsInCache.After(ts) && !found {
|
|
found = true
|
|
continue
|
|
}
|
|
err := c.Delete()
|
|
if err != nil {
|
|
fs.Debugf(fp, "failed to clean chunk: %v", err)
|
|
}
|
|
}
|
|
}
|
|
// don't overwrite if a newer one is already there
|
|
if found {
|
|
return nil
|
|
}
|
|
enc, err := json.Marshal(chunkInfo{Path: fp, Offset: offset, Size: int64(len(data))})
|
|
if err != nil {
|
|
fs.Debugf(fp, "failed to timestamp chunk: %v", err)
|
|
}
|
|
err = tsBucket.Put(itob(ts.UnixNano()), enc)
|
|
if err != nil {
|
|
fs.Debugf(fp, "failed to timestamp chunk: %v", err)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// CleanChunksByAge will cleanup on a cron basis
|
|
func (b *Persistent) CleanChunksByAge(chunkAge time.Duration) {
|
|
// NOOP
|
|
}
|
|
|
|
// CleanChunksByNeed is a noop for this implementation
|
|
func (b *Persistent) CleanChunksByNeed(offset int64) {
|
|
// noop: we want to clean a Bolt DB by time only
|
|
}
|
|
|
|
// CleanChunksBySize will cleanup chunks after the total size passes a certain point
|
|
func (b *Persistent) CleanChunksBySize(maxSize int64) {
|
|
b.cleanupMux.Lock()
|
|
defer b.cleanupMux.Unlock()
|
|
var cntChunks int
|
|
var roughlyCleaned fs.SizeSuffix
|
|
|
|
err := b.db.Update(func(tx *bolt.Tx) error {
|
|
dataTsBucket := tx.Bucket([]byte(DataTsBucket))
|
|
if dataTsBucket == nil {
|
|
return fmt.Errorf("Couldn't open (%v) bucket", DataTsBucket)
|
|
}
|
|
// iterate through ts
|
|
c := dataTsBucket.Cursor()
|
|
totalSize := int64(0)
|
|
for k, v := c.First(); k != nil; k, v = c.Next() {
|
|
var ci chunkInfo
|
|
err := json.Unmarshal(v, &ci)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
totalSize += ci.Size
|
|
}
|
|
|
|
if totalSize > maxSize {
|
|
needToClean := totalSize - maxSize
|
|
roughlyCleaned = fs.SizeSuffix(needToClean)
|
|
for k, v := c.First(); k != nil; k, v = c.Next() {
|
|
var ci chunkInfo
|
|
err := json.Unmarshal(v, &ci)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
// delete this ts entry
|
|
err = c.Delete()
|
|
if err != nil {
|
|
fs.Errorf(ci.Path, "failed deleting chunk ts during cleanup (%v): %v", ci.Offset, err)
|
|
continue
|
|
}
|
|
err = os.Remove(path.Join(b.dataPath, ci.Path, strconv.FormatInt(ci.Offset, 10)))
|
|
if err == nil {
|
|
cntChunks++
|
|
needToClean -= ci.Size
|
|
if needToClean <= 0 {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if cntChunks > 0 {
|
|
fs.Infof("cache-cleanup", "chunks %v, est. size: %v", cntChunks, roughlyCleaned.String())
|
|
|
|
}
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
if err == bolt.ErrDatabaseNotOpen {
|
|
// we're likely a late janitor and we need to end quietly as there's no guarantee of what exists anymore
|
|
return
|
|
}
|
|
fs.Errorf("cache", "cleanup failed: %v", err)
|
|
}
|
|
}
|
|
|
|
// Stats returns a go map with the stats key values
|
|
func (b *Persistent) Stats() (map[string]map[string]interface{}, error) {
|
|
r := make(map[string]map[string]interface{})
|
|
r["data"] = make(map[string]interface{})
|
|
r["data"]["oldest-ts"] = time.Now()
|
|
r["data"]["oldest-file"] = ""
|
|
r["data"]["newest-ts"] = time.Now()
|
|
r["data"]["newest-file"] = ""
|
|
r["data"]["total-chunks"] = 0
|
|
r["data"]["total-size"] = int64(0)
|
|
r["files"] = make(map[string]interface{})
|
|
r["files"]["oldest-ts"] = time.Now()
|
|
r["files"]["oldest-name"] = ""
|
|
r["files"]["newest-ts"] = time.Now()
|
|
r["files"]["newest-name"] = ""
|
|
r["files"]["total-files"] = 0
|
|
|
|
_ = b.db.View(func(tx *bolt.Tx) error {
|
|
dataTsBucket := tx.Bucket([]byte(DataTsBucket))
|
|
rootTsBucket := tx.Bucket([]byte(RootTsBucket))
|
|
|
|
var totalDirs int
|
|
var totalFiles int
|
|
_ = b.iterateBuckets(tx.Bucket([]byte(RootBucket)), func(name string) {
|
|
totalDirs++
|
|
}, func(key string, val []byte) {
|
|
totalFiles++
|
|
})
|
|
r["files"]["total-dir"] = totalDirs
|
|
r["files"]["total-files"] = totalFiles
|
|
|
|
c := dataTsBucket.Cursor()
|
|
|
|
totalChunks := 0
|
|
totalSize := int64(0)
|
|
for k, v := c.First(); k != nil; k, v = c.Next() {
|
|
var ci chunkInfo
|
|
err := json.Unmarshal(v, &ci)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
totalChunks++
|
|
totalSize += ci.Size
|
|
}
|
|
r["data"]["total-chunks"] = totalChunks
|
|
r["data"]["total-size"] = totalSize
|
|
|
|
if k, v := c.First(); k != nil {
|
|
var ci chunkInfo
|
|
_ = json.Unmarshal(v, &ci)
|
|
r["data"]["oldest-ts"] = time.Unix(0, btoi(k))
|
|
r["data"]["oldest-file"] = ci.Path
|
|
}
|
|
if k, v := c.Last(); k != nil {
|
|
var ci chunkInfo
|
|
_ = json.Unmarshal(v, &ci)
|
|
r["data"]["newest-ts"] = time.Unix(0, btoi(k))
|
|
r["data"]["newest-file"] = ci.Path
|
|
}
|
|
|
|
c = rootTsBucket.Cursor()
|
|
if k, v := c.First(); k != nil {
|
|
// split to get (abs path - offset)
|
|
r["files"]["oldest-ts"] = time.Unix(0, btoi(k))
|
|
r["files"]["oldest-name"] = string(v)
|
|
}
|
|
if k, v := c.Last(); k != nil {
|
|
r["files"]["newest-ts"] = time.Unix(0, btoi(k))
|
|
r["files"]["newest-name"] = string(v)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
return r, nil
|
|
}
|
|
|
|
// Purge will flush the entire cache
|
|
func (b *Persistent) Purge() {
|
|
b.cleanupMux.Lock()
|
|
defer b.cleanupMux.Unlock()
|
|
|
|
_ = b.db.Update(func(tx *bolt.Tx) error {
|
|
_ = tx.DeleteBucket([]byte(RootBucket))
|
|
_ = tx.DeleteBucket([]byte(RootTsBucket))
|
|
_ = tx.DeleteBucket([]byte(DataTsBucket))
|
|
|
|
_, _ = tx.CreateBucketIfNotExists([]byte(RootBucket))
|
|
_, _ = tx.CreateBucketIfNotExists([]byte(RootTsBucket))
|
|
_, _ = tx.CreateBucketIfNotExists([]byte(DataTsBucket))
|
|
|
|
return nil
|
|
})
|
|
|
|
err := os.RemoveAll(b.dataPath)
|
|
if err != nil {
|
|
fs.Errorf(b, "issue removing data folder: %v", err)
|
|
}
|
|
err = os.MkdirAll(b.dataPath, os.ModePerm)
|
|
if err != nil {
|
|
fs.Errorf(b, "issue removing data folder: %v", err)
|
|
}
|
|
}
|
|
|
|
// GetChunkTs retrieves the current timestamp of this chunk
|
|
func (b *Persistent) GetChunkTs(path string, offset int64) (time.Time, error) {
|
|
var t time.Time
|
|
|
|
err := b.db.View(func(tx *bolt.Tx) error {
|
|
tsBucket := tx.Bucket([]byte(DataTsBucket))
|
|
c := tsBucket.Cursor()
|
|
for k, v := c.First(); k != nil; k, v = c.Next() {
|
|
var ci chunkInfo
|
|
err := json.Unmarshal(v, &ci)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
if ci.Path == path && ci.Offset == offset {
|
|
t = time.Unix(0, btoi(k))
|
|
return nil
|
|
}
|
|
}
|
|
return fmt.Errorf("not found %v-%v", path, offset)
|
|
})
|
|
|
|
return t, err
|
|
}
|
|
|
|
func (b *Persistent) iterateBuckets(buk *bolt.Bucket, bucketFn func(name string), kvFn func(key string, val []byte)) error {
|
|
err := b.db.View(func(tx *bolt.Tx) error {
|
|
var c *bolt.Cursor
|
|
if buk == nil {
|
|
c = tx.Cursor()
|
|
} else {
|
|
c = buk.Cursor()
|
|
}
|
|
for k, v := c.First(); k != nil; k, v = c.Next() {
|
|
if v == nil {
|
|
var buk2 *bolt.Bucket
|
|
if buk == nil {
|
|
buk2 = tx.Bucket(k)
|
|
} else {
|
|
buk2 = buk.Bucket(k)
|
|
}
|
|
|
|
bucketFn(string(k))
|
|
_ = b.iterateBuckets(buk2, bucketFn, kvFn)
|
|
} else {
|
|
kvFn(string(k), v)
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
|
|
return err
|
|
}
|
|
|
|
// addPendingUpload adds a new file to the pending queue of uploads
|
|
func (b *Persistent) addPendingUpload(destPath string, started bool) error {
|
|
return b.db.Update(func(tx *bolt.Tx) error {
|
|
bucket, err := tx.CreateBucketIfNotExists([]byte(tempBucket))
|
|
if err != nil {
|
|
return fmt.Errorf("couldn't bucket for %v", tempBucket)
|
|
}
|
|
tempObj := &tempUploadInfo{
|
|
DestPath: destPath,
|
|
AddedOn: time.Now(),
|
|
Started: started,
|
|
}
|
|
|
|
// cache Object Info
|
|
encoded, err := json.Marshal(tempObj)
|
|
if err != nil {
|
|
return fmt.Errorf("couldn't marshal object (%v) info: %v", destPath, err)
|
|
}
|
|
err = bucket.Put([]byte(destPath), encoded)
|
|
if err != nil {
|
|
return fmt.Errorf("couldn't cache object (%v) info: %v", destPath, err)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// getPendingUpload returns the next file from the pending queue of uploads
|
|
func (b *Persistent) getPendingUpload(inRoot string, waitTime time.Duration) (destPath string, err error) {
|
|
b.tempQueueMux.Lock()
|
|
defer b.tempQueueMux.Unlock()
|
|
|
|
err = b.db.Update(func(tx *bolt.Tx) error {
|
|
bucket, err := tx.CreateBucketIfNotExists([]byte(tempBucket))
|
|
if err != nil {
|
|
return fmt.Errorf("couldn't bucket for %v", tempBucket)
|
|
}
|
|
|
|
c := bucket.Cursor()
|
|
for k, v := c.Seek([]byte(inRoot)); k != nil && bytes.HasPrefix(k, []byte(inRoot)); k, v = c.Next() {
|
|
//for k, v := c.First(); k != nil; k, v = c.Next() {
|
|
var tempObj = &tempUploadInfo{}
|
|
err = json.Unmarshal(v, tempObj)
|
|
if err != nil {
|
|
fs.Errorf(b, "failed to read pending upload: %v", err)
|
|
continue
|
|
}
|
|
// skip over started uploads
|
|
if tempObj.Started || time.Now().Before(tempObj.AddedOn.Add(waitTime)) {
|
|
continue
|
|
}
|
|
|
|
tempObj.Started = true
|
|
v2, err := json.Marshal(tempObj)
|
|
if err != nil {
|
|
fs.Errorf(b, "failed to update pending upload: %v", err)
|
|
continue
|
|
}
|
|
err = bucket.Put(k, v2)
|
|
if err != nil {
|
|
fs.Errorf(b, "failed to update pending upload: %v", err)
|
|
continue
|
|
}
|
|
|
|
destPath = tempObj.DestPath
|
|
return nil
|
|
}
|
|
|
|
return fmt.Errorf("no pending upload found")
|
|
})
|
|
|
|
return destPath, err
|
|
}
|
|
|
|
// SearchPendingUpload returns the file info from the pending queue of uploads
|
|
func (b *Persistent) SearchPendingUpload(remote string) (started bool, err error) {
|
|
err = b.db.View(func(tx *bolt.Tx) error {
|
|
bucket := tx.Bucket([]byte(tempBucket))
|
|
if bucket == nil {
|
|
return fmt.Errorf("couldn't bucket for %v", tempBucket)
|
|
}
|
|
|
|
var tempObj = &tempUploadInfo{}
|
|
v := bucket.Get([]byte(remote))
|
|
err = json.Unmarshal(v, tempObj)
|
|
if err != nil {
|
|
return fmt.Errorf("pending upload (%v) not found %v", remote, err)
|
|
}
|
|
|
|
started = tempObj.Started
|
|
return nil
|
|
})
|
|
|
|
return started, err
|
|
}
|
|
|
|
// searchPendingUploadFromDir files currently pending upload from a single dir
|
|
func (b *Persistent) searchPendingUploadFromDir(dir string) (remotes []string, err error) {
|
|
err = b.db.View(func(tx *bolt.Tx) error {
|
|
bucket := tx.Bucket([]byte(tempBucket))
|
|
if bucket == nil {
|
|
return fmt.Errorf("couldn't bucket for %v", tempBucket)
|
|
}
|
|
|
|
c := bucket.Cursor()
|
|
for k, v := c.First(); k != nil; k, v = c.Next() {
|
|
var tempObj = &tempUploadInfo{}
|
|
err = json.Unmarshal(v, tempObj)
|
|
if err != nil {
|
|
fs.Errorf(b, "failed to read pending upload: %v", err)
|
|
continue
|
|
}
|
|
parentDir := cleanPath(path.Dir(tempObj.DestPath))
|
|
if dir == parentDir {
|
|
remotes = append(remotes, tempObj.DestPath)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
return remotes, err
|
|
}
|
|
|
|
func (b *Persistent) rollbackPendingUpload(remote string) error {
|
|
b.tempQueueMux.Lock()
|
|
defer b.tempQueueMux.Unlock()
|
|
|
|
return b.db.Update(func(tx *bolt.Tx) error {
|
|
bucket, err := tx.CreateBucketIfNotExists([]byte(tempBucket))
|
|
if err != nil {
|
|
return fmt.Errorf("couldn't bucket for %v", tempBucket)
|
|
}
|
|
var tempObj = &tempUploadInfo{}
|
|
v := bucket.Get([]byte(remote))
|
|
err = json.Unmarshal(v, tempObj)
|
|
if err != nil {
|
|
return fmt.Errorf("pending upload (%v) not found %v", remote, err)
|
|
}
|
|
tempObj.Started = false
|
|
v2, err := json.Marshal(tempObj)
|
|
if err != nil {
|
|
return fmt.Errorf("pending upload not updated %v", err)
|
|
}
|
|
err = bucket.Put([]byte(tempObj.DestPath), v2)
|
|
if err != nil {
|
|
return fmt.Errorf("pending upload not updated %v", err)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func (b *Persistent) removePendingUpload(remote string) error {
|
|
b.tempQueueMux.Lock()
|
|
defer b.tempQueueMux.Unlock()
|
|
|
|
return b.db.Update(func(tx *bolt.Tx) error {
|
|
bucket, err := tx.CreateBucketIfNotExists([]byte(tempBucket))
|
|
if err != nil {
|
|
return fmt.Errorf("couldn't bucket for %v", tempBucket)
|
|
}
|
|
return bucket.Delete([]byte(remote))
|
|
})
|
|
}
|
|
|
|
// updatePendingUpload allows to update an existing item in the queue while checking if it's not started in the same
|
|
// transaction. If it is started, it will not allow the update
|
|
func (b *Persistent) updatePendingUpload(remote string, fn func(item *tempUploadInfo) error) error {
|
|
b.tempQueueMux.Lock()
|
|
defer b.tempQueueMux.Unlock()
|
|
|
|
return b.db.Update(func(tx *bolt.Tx) error {
|
|
bucket, err := tx.CreateBucketIfNotExists([]byte(tempBucket))
|
|
if err != nil {
|
|
return fmt.Errorf("couldn't bucket for %v", tempBucket)
|
|
}
|
|
|
|
var tempObj = &tempUploadInfo{}
|
|
v := bucket.Get([]byte(remote))
|
|
err = json.Unmarshal(v, tempObj)
|
|
if err != nil {
|
|
return fmt.Errorf("pending upload (%v) not found %v", remote, err)
|
|
}
|
|
if tempObj.Started {
|
|
return fmt.Errorf("pending upload already started %v", remote)
|
|
}
|
|
err = fn(tempObj)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if remote != tempObj.DestPath {
|
|
err := bucket.Delete([]byte(remote))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// if this is removed then the entry can be removed too
|
|
if tempObj.DestPath == "" {
|
|
return nil
|
|
}
|
|
}
|
|
v2, err := json.Marshal(tempObj)
|
|
if err != nil {
|
|
return fmt.Errorf("pending upload not updated %v", err)
|
|
}
|
|
err = bucket.Put([]byte(tempObj.DestPath), v2)
|
|
if err != nil {
|
|
return fmt.Errorf("pending upload not updated %v", err)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// ReconcileTempUploads will recursively look for all the files in the temp directory and add them to the queue
|
|
func (b *Persistent) ReconcileTempUploads(ctx context.Context, cacheFs *Fs) error {
|
|
return b.db.Update(func(tx *bolt.Tx) error {
|
|
_ = tx.DeleteBucket([]byte(tempBucket))
|
|
bucket, err := tx.CreateBucketIfNotExists([]byte(tempBucket))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var queuedEntries []fs.Object
|
|
err = walk.ListR(ctx, cacheFs.tempFs, "", true, -1, walk.ListObjects, func(entries fs.DirEntries) error {
|
|
for _, o := range entries {
|
|
if oo, ok := o.(fs.Object); ok {
|
|
queuedEntries = append(queuedEntries, oo)
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
fs.Debugf(cacheFs, "reconciling temporary uploads")
|
|
for _, queuedEntry := range queuedEntries {
|
|
destPath := path.Join(cacheFs.Root(), queuedEntry.Remote())
|
|
tempObj := &tempUploadInfo{
|
|
DestPath: destPath,
|
|
AddedOn: time.Now(),
|
|
Started: false,
|
|
}
|
|
|
|
// cache Object Info
|
|
encoded, err := json.Marshal(tempObj)
|
|
if err != nil {
|
|
return fmt.Errorf("couldn't marshal object (%v) info: %v", queuedEntry, err)
|
|
}
|
|
err = bucket.Put([]byte(destPath), encoded)
|
|
if err != nil {
|
|
return fmt.Errorf("couldn't cache object (%v) info: %v", destPath, err)
|
|
}
|
|
fs.Debugf(cacheFs, "reconciled temporary upload: %v", destPath)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// Close should be called when the program ends gracefully
|
|
func (b *Persistent) Close() {
|
|
b.cleanupMux.Lock()
|
|
defer b.cleanupMux.Unlock()
|
|
|
|
err := b.db.Close()
|
|
if err != nil {
|
|
fs.Errorf(b, "closing handle: %v", err)
|
|
}
|
|
b.open = false
|
|
}
|
|
|
|
// itob returns an 8-byte big endian representation of v.
|
|
func itob(v int64) []byte {
|
|
b := make([]byte, 8)
|
|
binary.BigEndian.PutUint64(b, uint64(v))
|
|
return b
|
|
}
|
|
|
|
func btoi(d []byte) int64 {
|
|
return int64(binary.BigEndian.Uint64(d))
|
|
}
|