WIP daemon:

Implement
* pruning on source side
* local job
* test subcommand for doing a dry-run of a prune policy

* use a non-blocking callback from autosnap to trigger dependent jobs
  -> avoids races, looks saner in the debug log
Christian Schwarz 2017-09-16 21:12:26 +02:00
parent b168274048
commit 6a05e101cf
7 changed files with 333 additions and 47 deletions
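The non-blocking trigger described in the commit message reduces to Go's select-with-default send. A minimal standalone sketch of the pattern (the commit itself uses an unbuffered didSnaps channel; the buffer of 1 here only makes the demo deterministic):

package main

import "fmt"

// trigger performs a non-blocking send: if the consumer is not ready,
// the event is dropped instead of stalling the snapshotter.
func trigger(didSnaps chan struct{}) {
	select {
	case didSnaps <- struct{}{}:
	default:
		fmt.Println("warning: callback channel is full, discarding")
	}
}

func main() {
	didSnaps := make(chan struct{}, 1)
	trigger(didSnaps) // queued
	trigger(didSnaps) // buffer full: dropped, but trigger never blocks
	<-didSnaps
	fmt.Println("got snapshot event, running dependent jobs")
}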

View File

@ -16,7 +16,6 @@ type IntervalAutosnap struct {
log Logger
snaptimes []snapTime
timer time.Timer
}
type snapTime struct {
@ -24,7 +23,7 @@ type snapTime struct {
time time.Time
}
func (a *IntervalAutosnap) Run(ctx context.Context) {
func (a *IntervalAutosnap) Run(ctx context.Context, didSnaps chan struct{}) {
a.log = ctx.Value(contextKeyLog).(Logger)
@ -94,7 +93,7 @@ func (a *IntervalAutosnap) Run(ctx context.Context) {
case <-time.After(syncPoint.time.Sub(now)):
a.log.Printf("snapshotting all filesystems to enable further snaps in lockstep")
a.doSnapshots()
a.doSnapshots(didSnaps)
}
ticker := time.NewTicker(a.SnapshotInterval)
@ -107,13 +106,13 @@ func (a *IntervalAutosnap) Run(ctx context.Context) {
return
case <-ticker.C:
a.doSnapshots()
a.doSnapshots(didSnaps)
}
}
}
func (a *IntervalAutosnap) doSnapshots() {
func (a *IntervalAutosnap) doSnapshots(didSnaps chan struct{}) {
// fetch new dataset list in case user added new dataset
ds, err := zfs.ZFSListMapping(a.DatasetFilter)
@ -134,4 +133,10 @@ func (a *IntervalAutosnap) doSnapshots() {
}
}
select {
case didSnaps <- struct{}{}:
default:
a.log.Printf("warning: callback channel is full, discarding")
}
}

View File

@ -3,6 +3,8 @@ package cmd
import (
"io"
"fmt"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/zfs"
)
@ -11,6 +13,14 @@ type Config struct {
Jobs map[string]Job
}
func (c *Config) LookupJob(name string) (j Job, err error) {
j, ok := c.Jobs[name]
if !ok {
return nil, errors.Errorf("job '%s' is not defined", name)
}
return j, nil
}
type Global struct {
Serve struct {
Stdinserver struct {
@ -47,3 +57,40 @@ type SSHStdinServerConnectDescr struct {
type PrunePolicy interface {
Prune(fs *zfs.DatasetPath, versions []zfs.FilesystemVersion) (keep, remove []zfs.FilesystemVersion, err error)
}
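A PrunePolicy only has to partition a version list into keep and remove. A hypothetical keep-the-newest-N policy sketches the shape of an implementation (KeepNewest and the stand-in types are illustrative, not part of this commit):

package main

import (
	"fmt"
	"time"
)

// Local stand-ins for zfs.DatasetPath and zfs.FilesystemVersion,
// just to keep this sketch self-contained.
type DatasetPath string
type FilesystemVersion struct {
	Name     string
	Creation time.Time
}

// KeepNewest keeps the N most recent versions and removes the rest.
type KeepNewest struct{ N int }

// Prune assumes vs is sorted oldest-first.
func (p KeepNewest) Prune(fs DatasetPath, vs []FilesystemVersion) (keep, remove []FilesystemVersion, err error) {
	if len(vs) <= p.N {
		return vs, nil, nil
	}
	cut := len(vs) - p.N
	return vs[cut:], vs[:cut], nil
}

func main() {
	now := time.Now()
	vs := []FilesystemVersion{
		{Name: "zrepl_old", Creation: now.Add(-2 * time.Hour)},
		{Name: "zrepl_mid", Creation: now.Add(-time.Hour)},
		{Name: "zrepl_new", Creation: now},
	}
	keep, remove, _ := KeepNewest{N: 2}.Prune("tank/data", vs)
	fmt.Println(len(keep), len(remove)) // 2 1
}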
type PruningJob interface {
Pruner(side PrunePolicySide, dryRun bool) (Pruner, error)
}
// A type for constants describing the different prune policies of a PruningJob.
// This is mostly a special case for LocalJob, which is the only job type with two
// prune policies instead of one.
// It implements github.com/spf13/pflag.Value so it can be used as a CLI flag for
// the test subcommand.
type PrunePolicySide string
const (
PrunePolicySideDefault PrunePolicySide = ""
PrunePolicySideLeft PrunePolicySide = "left"
PrunePolicySideRight PrunePolicySide = "right"
)
func (s *PrunePolicySide) String() string {
return string(*s)
}
func (s *PrunePolicySide) Set(news string) error {
p := PrunePolicySide(news)
switch p {
case PrunePolicySideRight:
fallthrough
case PrunePolicySideLeft:
*s = p
default:
return errors.Errorf("must be either %s or %s", PrunePolicySideLeft, PrunePolicySideRight)
}
return nil
}
func (s *PrunePolicySide) Type() string {
return fmt.Sprintf("%s | %s", PrunePolicySideLeft, PrunePolicySideRight)
}
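Because PrunePolicySide implements String, Set, and Type, it satisfies github.com/spf13/pflag.Value and can be registered with VarP, as the test subcommand does further down. A minimal standalone sketch of how pflag drives Set during parsing (the switch here is the condensed equivalent of the fallthrough above):

package main

import (
	"fmt"

	"github.com/pkg/errors"
	"github.com/spf13/pflag"
)

type PrunePolicySide string

const (
	PrunePolicySideLeft  PrunePolicySide = "left"
	PrunePolicySideRight PrunePolicySide = "right"
)

func (s *PrunePolicySide) String() string { return string(*s) }

func (s *PrunePolicySide) Set(news string) error {
	switch p := PrunePolicySide(news); p {
	case PrunePolicySideLeft, PrunePolicySideRight:
		*s = p
		return nil
	default:
		return errors.Errorf("must be either %s or %s", PrunePolicySideLeft, PrunePolicySideRight)
	}
}

func (s *PrunePolicySide) Type() string {
	return fmt.Sprintf("%s | %s", PrunePolicySideLeft, PrunePolicySideRight)
}

func main() {
	fs := pflag.NewFlagSet("demo", pflag.ContinueOnError)
	var side PrunePolicySide
	fs.VarP(&side, "side", "s", "left or right")
	_ = fs.Parse([]string{"--side", "left"}) // pflag calls side.Set("left")
	fmt.Println(side)                        // left
}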

View File

@ -7,6 +7,9 @@ import (
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/rpc"
"github.com/zrepl/zrepl/util"
"github.com/zrepl/zrepl/zfs"
"sync"
)
type LocalJob struct {
@ -80,6 +83,7 @@ func (j *LocalJob) JobName() string {
func (j *LocalJob) JobStart(ctx context.Context) {
log := ctx.Value(contextKeyLog).(Logger)
defer log.Printf("exiting")
local := rpc.NewLocalRPC()
// Allow access to any dataset since we control what mapping
@ -91,8 +95,112 @@ func (j *LocalJob) JobStart(ctx context.Context) {
registerEndpoints(local, handler)
err := doPull(PullContext{local, log, j.Mapping, j.InitialReplPolicy})
if err != nil {
log.Printf("error doing pull: %s", err)
snapper := IntervalAutosnap{
DatasetFilter: j.Mapping.AsFilter(),
Prefix: j.SnapshotPrefix,
SnapshotInterval: j.Interval,
}
plhs, err := j.Pruner(PrunePolicySideLeft, false)
if err != nil {
log.Printf("error creating lhs pruner: %s", err)
return
}
prhs, err := j.Pruner(PrunePolicySideRight, false)
if err != nil {
log.Printf("error creating rhs pruner: %s", err)
return
}
makeCtx := func(parent context.Context, logPrefix string) (ctx context.Context) {
return context.WithValue(parent, contextKeyLog, util.NewPrefixLogger(log, logPrefix))
}
var snapCtx, plCtx, prCtx, pullCtx context.Context
snapCtx = makeCtx(ctx, "autosnap")
plCtx = makeCtx(ctx, "prune_lhs")
prCtx = makeCtx(ctx, "prune_rhs")
pullCtx = makeCtx(ctx, "repl")
didSnaps := make(chan struct{})
go snapper.Run(snapCtx, didSnaps)
outer:
for {
select {
case <-ctx.Done():
break outer
case <-didSnaps:
log.Printf("finished taking snapshots")
log.Printf("starting replication procedure")
}
{
log := pullCtx.Value(contextKeyLog).(Logger)
log.Printf("replicating from lhs to rhs")
err := doPull(PullContext{local, log, j.Mapping, j.InitialReplPolicy})
if err != nil {
log.Printf("error replicating lhs to rhs: %s", err)
}
// use a ctx as soon as doPull gains ctx support
select {
case <-ctx.Done():
break outer
default:
}
}
var wg sync.WaitGroup
log.Printf("pruning lhs")
wg.Add(1)
go func() {
plhs.Run(plCtx)
wg.Done()
}()
log.Printf("pruning rhs")
wg.Add(1)
go func() {
prhs.Run(prCtx)
wg.Done()
}()
wg.Wait()
}
log.Printf("context: %s", ctx.Err())
}
func (j *LocalJob) Pruner(side PrunePolicySide, dryRun bool) (p Pruner, err error) {
var dsfilter zfs.DatasetFilter
var pp PrunePolicy
switch side {
case PrunePolicySideLeft:
pp = j.PruneLHS
dsfilter = j.Mapping.AsFilter()
case PrunePolicySideRight:
pp = j.PruneRHS
dsfilter, err = j.Mapping.InvertedFilter()
if err != nil {
err = errors.Wrap(err, "cannot invert mapping for prune_rhs")
return
}
default:
err = errors.Errorf("must be either left or right side")
return
}
p = Pruner{
time.Now(),
dryRun,
dsfilter,
j.SnapshotPrefix,
pp,
}
return
}
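The makeCtx helper above derives one child context per subsystem, each carrying its own prefixed logger under contextKeyLog, so the interleaved output of autosnap, replication, and the two pruners stays readable in a single debug log. A sketch of the pattern, with the standard library standing in for zrepl's util.NewPrefixLogger:

package main

import (
	"context"
	"log"
	"os"
)

type contextKey int

const contextKeyLog contextKey = 0

// makeCtx returns a child context whose logger prefixes every line
// with the subsystem name.
func makeCtx(parent context.Context, prefix string) context.Context {
	l := log.New(os.Stdout, prefix+": ", 0)
	return context.WithValue(parent, contextKeyLog, l)
}

func main() {
	root := context.Background()
	snapCtx := makeCtx(root, "autosnap")
	plCtx := makeCtx(root, "prune_lhs")
	snapCtx.Value(contextKeyLog).(*log.Logger).Printf("tick")  // autosnap: tick
	plCtx.Value(contextKeyLog).(*log.Logger).Printf("running") // prune_lhs: running
}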

View File

@ -93,12 +93,13 @@ func (j *PullJob) JobName() string {
func (j *PullJob) JobStart(ctx context.Context) {
log := ctx.Value(contextKeyLog).(Logger)
defer log.Printf("exiting")
ticker := time.NewTicker(j.Interval)
start:
log := ctx.Value(contextKeyLog).(Logger)
log.Printf("connecting")
rwc, err := j.Connect.Connect()
if err != nil {
@ -128,7 +129,12 @@ start:
log.Printf("starting prune")
prunectx := context.WithValue(ctx, contextKeyLog, util.NewPrefixLogger(log, "prune"))
pruner := Pruner{time.Now(), false, j.pruneFilter, j.SnapshotPrefix, j.Prune}
pruner, err := j.Pruner(PrunePolicySideDefault, false)
if err != nil {
log.Printf("error creating pruner: %s", err)
return
}
pruner.Run(prunectx)
log.Printf("finish prune")
@ -143,6 +149,17 @@ start:
}
func (j *PullJob) Pruner(side PrunePolicySide, dryRun bool) (p Pruner, err error) {
p = Pruner{
time.Now(),
dryRun,
j.pruneFilter,
j.SnapshotPrefix,
j.Prune,
}
return
}
func (j *PullJob) doRun(ctx context.Context) {
}

View File

@ -7,7 +7,6 @@ import (
"github.com/zrepl/zrepl/rpc"
"github.com/zrepl/zrepl/util"
"io"
"sync"
"time"
)
@ -19,9 +18,6 @@ type SourceJob struct {
Interval time.Duration
Prune PrunePolicy
Debug JobDebugSettings
snapCancel context.CancelFunc
serveCancel context.CancelFunc
}
func parseSourceJob(name string, i map[string]interface{}) (j *SourceJob, err error) {
@ -78,32 +74,48 @@ func (j *SourceJob) JobName() string {
func (j *SourceJob) JobStart(ctx context.Context) {
var wg sync.WaitGroup
log := ctx.Value(contextKeyLog).(Logger)
defer log.Printf("exiting")
log.Printf("starting autosnap")
var snapContext context.Context
snapContext, j.snapCancel = context.WithCancel(ctx)
snapContext = context.WithValue(snapContext, contextKeyLog, util.NewPrefixLogger(log, "autosnap"))
a := IntervalAutosnap{DatasetFilter: j.Datasets, Prefix: j.SnapshotPrefix, SnapshotInterval: j.Interval}
wg.Add(1)
go func() {
a.Run(snapContext)
wg.Done()
}()
p, err := j.Pruner(PrunePolicySideDefault, false)
if err != nil {
log.Printf("error creating pruner: %s", err)
return
}
log.Printf("starting serve")
var serveContext context.Context
serveContext, j.serveCancel = context.WithCancel(ctx)
serveContext = context.WithValue(serveContext, contextKeyLog, util.NewPrefixLogger(log, "serve"))
wg.Add(1)
go func() {
j.serve(serveContext)
wg.Done()
}()
snapContext := context.WithValue(ctx, contextKeyLog, util.NewPrefixLogger(log, "autosnap"))
prunerContext := context.WithValue(ctx, contextKeyLog, util.NewPrefixLogger(log, "prune"))
serveContext := context.WithValue(ctx, contextKeyLog, util.NewPrefixLogger(log, "serve"))
didSnaps := make(chan struct{})
wg.Wait()
go j.serve(serveContext)
go a.Run(snapContext, didSnaps)
outer:
for {
select {
case <-ctx.Done():
break outer
case <-didSnaps:
log.Printf("starting pruner")
p.Run(prunerContext)
log.Printf("pruner done")
}
}
log.Printf("context: %s", prunerContext.Err())
}
func (j *SourceJob) Pruner(side PrunePolicySide, dryRun bool) (p Pruner, err error) {
p = Pruner{
time.Now(),
dryRun,
j.Datasets,
j.SnapshotPrefix,
j.Prune,
}
return
}
func (j *SourceJob) serve(ctx context.Context) {

View File

@ -17,7 +17,14 @@ type Pruner struct {
PrunePolicy PrunePolicy
}
func (p *Pruner) Run(ctx context.Context) {
type PruneResult struct {
Filesystem *zfs.DatasetPath
All []zfs.FilesystemVersion
Keep []zfs.FilesystemVersion
Remove []zfs.FilesystemVersion
}
func (p *Pruner) Run(ctx context.Context) (r []PruneResult, err error) {
log := ctx.Value(contextKeyLog).(Logger)
@ -25,19 +32,18 @@ func (p *Pruner) Run(ctx context.Context) {
log.Printf("doing dry run")
}
// TODO ZFSListSnapsFiltered --> can we extend fsfilter, or do we need something new?
// We already have something per fs.
// Dedicated snapshot object? Adaptor object to FilesystemVersion?
filesystems, err := zfs.ZFSListMapping(p.DatasetFilter)
if err != nil {
log.Printf("error applying filesystem filter: %s", err)
return
return nil, err
}
if len(filesystems) <= 0 {
log.Printf("no filesystems matching filter")
return
return nil, err
}
r = make([]PruneResult, 0, len(filesystems))
for _, fs := range filesystems {
fsversions, err := zfs.ZFSListFilesystemVersions(fs, &PrefixSnapshotFilter{p.SnapshotPrefix})
@ -73,6 +79,8 @@ func (p *Pruner) Run(ctx context.Context) {
dbgj, err = json.Marshal(remove)
l.Printf("DEBUG: REMOVE=%s", dbgj)
r = append(r, PruneResult{fs, fsversions, keep, remove})
describe := func(v zfs.FilesystemVersion) string {
timeSince := v.Creation.Sub(p.Now)
const day time.Duration = 24 * time.Hour

View File

@ -3,6 +3,12 @@ package cmd
import (
"os"
"bytes"
"context"
"fmt"
"sort"
"strings"
"github.com/kr/pretty"
"github.com/spf13/cobra"
"github.com/zrepl/zrepl/zfs"
@ -20,16 +26,33 @@ var testConfigSyntaxCmd = &cobra.Command{
}
var testDatasetMapFilter = &cobra.Command{
Use: "pattern jobtype.name test/zfs/dataset/path",
Use: "pattern jobname test/zfs/dataset/path",
Short: "test dataset mapping / filter specified in config",
Example: ` zrepl test pattern prune.clean_backups tank/backups/legacyscript/foo`,
Run: doTestDatasetMapFilter,
}
var testPrunePolicyArgs struct {
side PrunePolicySide
showKept bool
showRemoved bool
}
var testPrunePolicyCmd = &cobra.Command{
Use: "prune jobname",
Short: "do a dry-run of the pruning part of a job",
Run: doTestPrunePolicy,
}
func init() {
RootCmd.AddCommand(testCmd)
testCmd.AddCommand(testConfigSyntaxCmd)
testCmd.AddCommand(testDatasetMapFilter)
testPrunePolicyCmd.Flags().VarP(&testPrunePolicyArgs.side, "side", "s", "prune_lhs (left) or prune_rhs (right)")
testPrunePolicyCmd.Flags().BoolVar(&testPrunePolicyArgs.showKept, "kept", false, "show kept snapshots")
testPrunePolicyCmd.Flags().BoolVar(&testPrunePolicyArgs.showRemoved, "removed", true, "show removed snapshots")
testCmd.AddCommand(testPrunePolicyCmd)
}
func doTestConfig(cmd *cobra.Command, args []string) {
@ -48,9 +71,9 @@ func doTestDatasetMapFilter(cmd *cobra.Command, args []string) {
}
n, i := args[0], args[1]
jobi, ok := conf.Jobs[n]
if !ok {
log.Printf("no job %s defined in config")
jobi, err := conf.LookupJob(n)
if err != nil {
log.Printf("%s", err)
os.Exit(1)
}
@ -72,7 +95,7 @@ func doTestDatasetMapFilter(cmd *cobra.Command, args []string) {
os.Exit(1)
}
if mf.filterMode{
if mf.filterMode {
pass, err := mf.Filter(ip)
if err != nil {
log.Printf("error evaluating filter: %s", err)
@ -94,3 +117,69 @@ func doTestDatasetMapFilter(cmd *cobra.Command, args []string) {
}
}
func doTestPrunePolicy(cmd *cobra.Command, args []string) {
if cmd.Flags().NArg() != 1 {
log.Printf("specify job name as first positional argument")
log.Printf(cmd.UsageString())
os.Exit(1)
}
jobname := cmd.Flags().Arg(0)
jobi, err := conf.LookupJob(jobname)
if err != nil {
log.Printf("%s", err)
os.Exit(1)
}
jobp, ok := jobi.(PruningJob)
if !ok {
log.Printf("job doesn't do any prunes")
os.Exit(0)
}
log.Printf("job dump:\n%s", pretty.Sprint(jobp))
pruner, err := jobp.Pruner(testPrunePolicyArgs.side, true)
if err != nil {
log.Printf("cannot create test pruner: %s", err)
os.Exit(1)
}
log.Printf("start pruning")
ctx := context.WithValue(context.Background(), contextKeyLog, log)
result, err := pruner.Run(ctx)
if err != nil {
log.Printf("error running pruner: %s", err)
os.Exit(1)
}
sort.Slice(result, func(i, j int) bool {
return strings.Compare(result[i].Filesystem.ToString(), result[j].Filesystem.ToString()) == -1
})
var b bytes.Buffer
for _, r := range result {
fmt.Fprintf(&b, "%s\n", r.Filesystem.ToString())
if testPrunePolicyArgs.showKept {
fmt.Fprintf(&b, "\tkept:\n")
for _, v := range r.Keep {
fmt.Fprintf(&b, "\t- %s\n", v.Name)
}
}
if testPrunePolicyArgs.showRemoved {
fmt.Fprintf(&b, "\tremoved:\n")
for _, v := range r.Remove {
fmt.Fprintf(&b, "\t- %s\n", v.Name)
}
}
}
log.Printf("pruning result:\n%s", b.String())
}
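With the flags registered in init above, a dry run of a local job's left-hand-side prune policy would be invoked roughly like this (the job name backups is hypothetical, and any global flags such as the config location are omitted):

zrepl test prune backups --side left --kept

Since the pruner is constructed with dryRun = true, this only prints which snapshots the policy would keep and remove; nothing is destroyed.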