rclone/lib/kv/bolt.go
Nick Craig-Wood e43b5ce5e5 Remove github.com/pkg/errors and replace with std library version
This is possible now that we no longer support go1.12 and brings
rclone into line with standard practices in the Go world.

This also removes errors.New and errors.Errorf from lib/errors and
prefers the stdlib errors package over lib/errors.
2021-11-07 11:53:30 +00:00

316 lines
6.3 KiB
Go

//go:build !plan9 && !js
// +build !plan9,!js
package kv
import (
"context"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/config"
"github.com/rclone/rclone/lib/encoder"
"go.etcd.io/bbolt"
)
const (
initTime = 24 * time.Hour // something reasonably long
dbFileMode = 0600
dbDirMode = 0700
queueSize = 2
)
// DB represents a key-value database
type DB struct {
name string
path string
facility string
refs int
bolt *bbolt.DB
mu sync.Mutex
canWrite bool
queue chan *request
lockTime time.Duration
idleTime time.Duration
openTime time.Duration
idleTimer *time.Timer
lockTimer *time.Timer
}
var (
dbMap = map[string]*DB{}
dbMut sync.Mutex
atExit bool
)
// Supported returns true on supported OSes
func Supported() bool { return true }
// makeName makes a store name
func makeName(facility string, f fs.Fs) string {
var name string
if f != nil {
name = f.Name()
if idx := strings.Index(name, "{"); idx != -1 {
name = name[:idx]
}
name = encoder.OS.FromStandardPath(name)
name += "~"
}
return name + facility + ".bolt"
}
// Start a new key-value database
func Start(ctx context.Context, facility string, f fs.Fs) (*DB, error) {
dbMut.Lock()
defer dbMut.Unlock()
if db := lockedGet(facility, f); db != nil {
return db, nil
}
dir := filepath.Join(config.GetCacheDir(), "kv")
if err := os.MkdirAll(dir, dbDirMode); err != nil {
return nil, err
}
name := makeName(facility, f)
lockTime := fs.GetConfig(ctx).KvLockTime
db := &DB{
name: name,
path: filepath.Join(dir, name),
facility: facility,
refs: 1,
lockTime: lockTime,
idleTime: lockTime / 4,
openTime: lockTime * 2,
idleTimer: time.NewTimer(initTime),
lockTimer: time.NewTimer(initTime),
queue: make(chan *request, queueSize),
}
fi, err := os.Stat(db.path)
if strings.HasSuffix(os.Args[0], ".test") || (err == nil && fi.Size() == 0) {
_ = os.Remove(db.path)
fs.Infof(db.name, "drop cache remaining after unit test")
}
if err = db.open(ctx, false); err != nil && err != ErrEmpty {
return nil, fmt.Errorf("cannot open db: %s: %w", db.path, err)
}
dbMap[name] = db
go db.loop()
return db, nil
}
// Get returns database record for given filesystem and facility
func Get(facility string, f fs.Fs) *DB {
dbMut.Lock()
defer dbMut.Unlock()
return lockedGet(facility, f)
}
func lockedGet(facility string, f fs.Fs) *DB {
name := makeName(facility, f)
db := dbMap[name]
if db != nil {
db.mu.Lock()
db.refs++
db.mu.Unlock()
}
return db
}
// Path returns database path
func (db *DB) Path() string { return db.path }
var modeNames = map[bool]string{false: "reading", true: "writing"}
func (db *DB) open(ctx context.Context, forWrite bool) (err error) {
if db.bolt != nil && (db.canWrite || !forWrite) {
return nil
}
_ = db.close()
db.canWrite = forWrite
if !forWrite {
// mitigate https://github.com/etcd-io/bbolt/issues/98
_, err = os.Stat(db.path)
if os.IsNotExist(err) {
return ErrEmpty
}
}
opt := &bbolt.Options{
Timeout: db.openTime,
ReadOnly: !forWrite,
}
openMode := modeNames[forWrite]
startTime := time.Now()
var bolt *bbolt.DB
retry := 1
maxRetries := fs.GetConfig(ctx).LowLevelRetries
for {
bolt, err = bbolt.Open(db.path, dbFileMode, opt)
if err == nil || retry >= maxRetries {
break
}
fs.Debugf(db.name, "Retry #%d opening for %s: %v", retry, openMode, err)
retry++
}
if err != nil {
return err
}
fs.Debugf(db.name, "Opened for %s in %v", openMode, time.Since(startTime))
_ = db.lockTimer.Reset(db.lockTime)
_ = db.idleTimer.Reset(db.idleTime)
db.bolt = bolt
return nil
}
func (db *DB) close() (err error) {
if db.bolt != nil {
_ = db.lockTimer.Stop()
_ = db.idleTimer.Stop()
err = db.bolt.Close()
db.bolt = nil
fs.Debugf(db.name, "released")
}
return
}
// loop over database operations sequentially
func (db *DB) loop() {
ctx := context.Background()
var req *request
quit := false
for !quit {
select {
case req = <-db.queue:
if quit = req.handle(ctx, db); !quit {
req.wg.Done()
_ = db.idleTimer.Reset(db.idleTime)
}
case <-db.idleTimer.C:
_ = db.close()
case <-db.lockTimer.C:
_ = db.close()
}
}
db.queue = nil
if !atExit {
dbMut.Lock()
delete(dbMap, db.name)
dbMut.Unlock()
}
req.wg.Done()
}
// Do a key-value operation and return error when done
func (db *DB) Do(write bool, op Op) error {
if db.queue == nil {
return ErrInactive
}
r := &request{
op: op,
wr: write,
}
r.wg.Add(1)
db.queue <- r
r.wg.Wait()
return r.err
}
// request encapsulates a synchronous operation and its results
type request struct {
op Op
wr bool
err error
wg sync.WaitGroup
}
// handle a key-value request with given DB
// returns true as a signal to quit the loop
func (r *request) handle(ctx context.Context, db *DB) bool {
db.mu.Lock()
defer db.mu.Unlock()
if op, stop := r.op.(*opStop); stop {
r.err = db.close()
if op.remove {
if err := os.Remove(db.path); !os.IsNotExist(err) {
r.err = err
}
}
db.refs--
return db.refs <= 0
}
r.err = db.execute(ctx, r.op, r.wr)
return false
}
// execute a key-value DB operation
func (db *DB) execute(ctx context.Context, op Op, write bool) error {
if err := db.open(ctx, write); err != nil {
return err
}
if write {
return db.bolt.Update(func(tx *bbolt.Tx) error {
b, err := tx.CreateBucketIfNotExists([]byte(db.facility))
if err != nil || b == nil {
return ErrEmpty
}
return op.Do(ctx, &bucketAdapter{b})
})
}
return db.bolt.View(func(tx *bbolt.Tx) error {
b := tx.Bucket([]byte(db.facility))
if b == nil {
return ErrEmpty
}
return op.Do(ctx, &bucketAdapter{b})
})
}
// bucketAdapter is a thin wrapper adapting kv.Bucket to bbolt.Bucket
type bucketAdapter struct {
*bbolt.Bucket
}
func (b *bucketAdapter) Cursor() Cursor {
return b.Bucket.Cursor()
}
// Stop a database loop, optionally removing the file
func (db *DB) Stop(remove bool) error {
return db.Do(false, &opStop{remove: remove})
}
// opStop: close database and stop operation loop
type opStop struct {
remove bool
}
func (*opStop) Do(context.Context, Bucket) error {
return nil
}
// Exit immediately stops all databases
func Exit() {
dbMut.Lock()
atExit = true
for _, s := range dbMap {
s.refs = 0
_ = s.Stop(false)
}
dbMap = map[string]*DB{}
atExit = false
dbMut.Unlock()
}