mirror of
https://github.com/rclone/rclone.git
synced 2024-12-23 15:38:57 +01:00
57c7fde864
After testing concurrent calling of `kv.Start` and `db.Stop` I had to restrict more parts of these under mutex to make results deterministic without Sleep's in the test body. It's more safe but has potential to lock Start for up to 2 seconds due to `db.open`.
316 lines
6.4 KiB
Go
316 lines
6.4 KiB
Go
//go:build !plan9 && !js
|
|
// +build !plan9,!js
|
|
|
|
package kv
|
|
|
|
import (
|
|
"context"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/pkg/errors"
|
|
"github.com/rclone/rclone/fs"
|
|
"github.com/rclone/rclone/fs/config"
|
|
"github.com/rclone/rclone/lib/encoder"
|
|
"go.etcd.io/bbolt"
|
|
)
|
|
|
|
const (
|
|
initTime = 24 * time.Hour // something reasonably long
|
|
dbFileMode = 0600
|
|
dbDirMode = 0700
|
|
queueSize = 2
|
|
)
|
|
|
|
// DB represents a key-value database
|
|
type DB struct {
|
|
name string
|
|
path string
|
|
facility string
|
|
refs int
|
|
bolt *bbolt.DB
|
|
mu sync.Mutex
|
|
canWrite bool
|
|
queue chan *request
|
|
lockTime time.Duration
|
|
idleTime time.Duration
|
|
openTime time.Duration
|
|
idleTimer *time.Timer
|
|
lockTimer *time.Timer
|
|
}
|
|
|
|
var (
|
|
dbMap = map[string]*DB{}
|
|
dbMut sync.Mutex
|
|
atExit bool
|
|
)
|
|
|
|
// Supported returns true on supported OSes
|
|
func Supported() bool { return true }
|
|
|
|
// makeName makes a store name
|
|
func makeName(facility string, f fs.Fs) string {
|
|
var name string
|
|
if f != nil {
|
|
name = f.Name()
|
|
if idx := strings.Index(name, "{"); idx != -1 {
|
|
name = name[:idx]
|
|
}
|
|
name = encoder.OS.FromStandardPath(name)
|
|
name += "~"
|
|
}
|
|
return name + facility + ".bolt"
|
|
}
|
|
|
|
// Start a new key-value database
|
|
func Start(ctx context.Context, facility string, f fs.Fs) (*DB, error) {
|
|
dbMut.Lock()
|
|
defer dbMut.Unlock()
|
|
if db := lockedGet(facility, f); db != nil {
|
|
return db, nil
|
|
}
|
|
|
|
dir := filepath.Join(config.GetCacheDir(), "kv")
|
|
if err := os.MkdirAll(dir, dbDirMode); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
name := makeName(facility, f)
|
|
lockTime := fs.GetConfig(ctx).KvLockTime
|
|
|
|
db := &DB{
|
|
name: name,
|
|
path: filepath.Join(dir, name),
|
|
facility: facility,
|
|
refs: 1,
|
|
lockTime: lockTime,
|
|
idleTime: lockTime / 4,
|
|
openTime: lockTime * 2,
|
|
idleTimer: time.NewTimer(initTime),
|
|
lockTimer: time.NewTimer(initTime),
|
|
queue: make(chan *request, queueSize),
|
|
}
|
|
|
|
fi, err := os.Stat(db.path)
|
|
if strings.HasSuffix(os.Args[0], ".test") || (err == nil && fi.Size() == 0) {
|
|
_ = os.Remove(db.path)
|
|
fs.Infof(db.name, "drop cache remaining after unit test")
|
|
}
|
|
|
|
if err = db.open(ctx, false); err != nil && err != ErrEmpty {
|
|
return nil, errors.Wrapf(err, "cannot open db: %s", db.path)
|
|
}
|
|
|
|
dbMap[name] = db
|
|
go db.loop()
|
|
return db, nil
|
|
}
|
|
|
|
// Get returns database record for given filesystem and facility
|
|
func Get(facility string, f fs.Fs) *DB {
|
|
dbMut.Lock()
|
|
defer dbMut.Unlock()
|
|
return lockedGet(facility, f)
|
|
}
|
|
|
|
func lockedGet(facility string, f fs.Fs) *DB {
|
|
name := makeName(facility, f)
|
|
db := dbMap[name]
|
|
if db != nil {
|
|
db.mu.Lock()
|
|
db.refs++
|
|
db.mu.Unlock()
|
|
}
|
|
return db
|
|
}
|
|
|
|
// Path returns database path
|
|
func (db *DB) Path() string { return db.path }
|
|
|
|
var modeNames = map[bool]string{false: "reading", true: "writing"}
|
|
|
|
func (db *DB) open(ctx context.Context, forWrite bool) (err error) {
|
|
if db.bolt != nil && (db.canWrite || !forWrite) {
|
|
return nil
|
|
}
|
|
_ = db.close()
|
|
|
|
db.canWrite = forWrite
|
|
if !forWrite {
|
|
// mitigate https://github.com/etcd-io/bbolt/issues/98
|
|
_, err = os.Stat(db.path)
|
|
if os.IsNotExist(err) {
|
|
return ErrEmpty
|
|
}
|
|
}
|
|
|
|
opt := &bbolt.Options{
|
|
Timeout: db.openTime,
|
|
ReadOnly: !forWrite,
|
|
}
|
|
openMode := modeNames[forWrite]
|
|
startTime := time.Now()
|
|
var bolt *bbolt.DB
|
|
retry := 1
|
|
maxRetries := fs.GetConfig(ctx).LowLevelRetries
|
|
for {
|
|
bolt, err = bbolt.Open(db.path, dbFileMode, opt)
|
|
if err == nil || retry >= maxRetries {
|
|
break
|
|
}
|
|
fs.Debugf(db.name, "Retry #%d opening for %s: %v", retry, openMode, err)
|
|
retry++
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
fs.Debugf(db.name, "Opened for %s in %v", openMode, time.Since(startTime))
|
|
_ = db.lockTimer.Reset(db.lockTime)
|
|
_ = db.idleTimer.Reset(db.idleTime)
|
|
db.bolt = bolt
|
|
return nil
|
|
}
|
|
|
|
func (db *DB) close() (err error) {
|
|
if db.bolt != nil {
|
|
_ = db.lockTimer.Stop()
|
|
_ = db.idleTimer.Stop()
|
|
err = db.bolt.Close()
|
|
db.bolt = nil
|
|
fs.Debugf(db.name, "released")
|
|
}
|
|
return
|
|
}
|
|
|
|
// loop over database operations sequentially
|
|
func (db *DB) loop() {
|
|
ctx := context.Background()
|
|
var req *request
|
|
quit := false
|
|
for !quit {
|
|
select {
|
|
case req = <-db.queue:
|
|
if quit = req.handle(ctx, db); !quit {
|
|
req.wg.Done()
|
|
_ = db.idleTimer.Reset(db.idleTime)
|
|
}
|
|
case <-db.idleTimer.C:
|
|
_ = db.close()
|
|
case <-db.lockTimer.C:
|
|
_ = db.close()
|
|
}
|
|
}
|
|
db.queue = nil
|
|
if !atExit {
|
|
dbMut.Lock()
|
|
delete(dbMap, db.name)
|
|
dbMut.Unlock()
|
|
}
|
|
req.wg.Done()
|
|
}
|
|
|
|
// Do a key-value operation and return error when done
|
|
func (db *DB) Do(write bool, op Op) error {
|
|
if db.queue == nil {
|
|
return ErrInactive
|
|
}
|
|
r := &request{
|
|
op: op,
|
|
wr: write,
|
|
}
|
|
r.wg.Add(1)
|
|
db.queue <- r
|
|
r.wg.Wait()
|
|
return r.err
|
|
}
|
|
|
|
// request encapsulates a synchronous operation and its results
|
|
type request struct {
|
|
op Op
|
|
wr bool
|
|
err error
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
// handle a key-value request with given DB
|
|
// returns true as a signal to quit the loop
|
|
func (r *request) handle(ctx context.Context, db *DB) bool {
|
|
db.mu.Lock()
|
|
defer db.mu.Unlock()
|
|
if op, stop := r.op.(*opStop); stop {
|
|
r.err = db.close()
|
|
if op.remove {
|
|
if err := os.Remove(db.path); !os.IsNotExist(err) {
|
|
r.err = err
|
|
}
|
|
}
|
|
db.refs--
|
|
return db.refs <= 0
|
|
}
|
|
r.err = db.execute(ctx, r.op, r.wr)
|
|
return false
|
|
}
|
|
|
|
// execute a key-value DB operation
|
|
func (db *DB) execute(ctx context.Context, op Op, write bool) error {
|
|
if err := db.open(ctx, write); err != nil {
|
|
return err
|
|
}
|
|
if write {
|
|
return db.bolt.Update(func(tx *bbolt.Tx) error {
|
|
b, err := tx.CreateBucketIfNotExists([]byte(db.facility))
|
|
if err != nil || b == nil {
|
|
return ErrEmpty
|
|
}
|
|
return op.Do(ctx, &bucketAdapter{b})
|
|
})
|
|
}
|
|
return db.bolt.View(func(tx *bbolt.Tx) error {
|
|
b := tx.Bucket([]byte(db.facility))
|
|
if b == nil {
|
|
return ErrEmpty
|
|
}
|
|
return op.Do(ctx, &bucketAdapter{b})
|
|
})
|
|
}
|
|
|
|
// bucketAdapter is a thin wrapper adapting kv.Bucket to bbolt.Bucket
|
|
type bucketAdapter struct {
|
|
*bbolt.Bucket
|
|
}
|
|
|
|
func (b *bucketAdapter) Cursor() Cursor {
|
|
return b.Bucket.Cursor()
|
|
}
|
|
|
|
// Stop a database loop, optionally removing the file
|
|
func (db *DB) Stop(remove bool) error {
|
|
return db.Do(false, &opStop{remove: remove})
|
|
}
|
|
|
|
// opStop: close database and stop operation loop
|
|
type opStop struct {
|
|
remove bool
|
|
}
|
|
|
|
func (*opStop) Do(context.Context, Bucket) error {
|
|
return nil
|
|
}
|
|
|
|
// Exit immediately stops all databases
|
|
func Exit() {
|
|
dbMut.Lock()
|
|
atExit = true
|
|
for _, s := range dbMap {
|
|
s.refs = 0
|
|
_ = s.Stop(false)
|
|
}
|
|
dbMap = map[string]*DB{}
|
|
atExit = false
|
|
dbMut.Unlock()
|
|
}
|