From 6a05e101cf0bf2c53d5da6611699540969f625fc Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Sat, 16 Sep 2017 21:12:26 +0200 Subject: [PATCH] WIP daemon: Implement * pruning on source side * local job * test subcommand for doing a dry-run of a prune policy * use a non-blocking callback from autosnap to trigger the depending jobs -> avoids races, looks saner in the debug log --- cmd/autosnap.go | 15 ++++-- cmd/config.go | 47 ++++++++++++++++ cmd/config_job_local.go | 114 +++++++++++++++++++++++++++++++++++++-- cmd/config_job_pull.go | 23 ++++++-- cmd/config_job_source.go | 62 ++++++++++++--------- cmd/prune.go | 20 ++++--- cmd/test.go | 99 ++++++++++++++++++++++++++++++++-- 7 files changed, 333 insertions(+), 47 deletions(-) diff --git a/cmd/autosnap.go b/cmd/autosnap.go index 9699b85..eaeef4d 100644 --- a/cmd/autosnap.go +++ b/cmd/autosnap.go @@ -16,7 +16,6 @@ type IntervalAutosnap struct { log Logger snaptimes []snapTime - timer time.Timer } type snapTime struct { @@ -24,7 +23,7 @@ type snapTime struct { time time.Time } -func (a *IntervalAutosnap) Run(ctx context.Context) { +func (a *IntervalAutosnap) Run(ctx context.Context, didSnaps chan struct{}) { a.log = ctx.Value(contextKeyLog).(Logger) @@ -94,7 +93,7 @@ func (a *IntervalAutosnap) Run(ctx context.Context) { case <-time.After(syncPoint.time.Sub(now)): a.log.Printf("snapshotting all filesystems to enable further snaps in lockstep") - a.doSnapshots() + a.doSnapshots(didSnaps) } ticker := time.NewTicker(a.SnapshotInterval) @@ -107,13 +106,13 @@ func (a *IntervalAutosnap) Run(ctx context.Context) { return case <-ticker.C: - a.doSnapshots() + a.doSnapshots(didSnaps) } } } -func (a *IntervalAutosnap) doSnapshots() { +func (a *IntervalAutosnap) doSnapshots(didSnaps chan struct{}) { // fetch new dataset list in case user added new dataset ds, err := zfs.ZFSListMapping(a.DatasetFilter) @@ -134,4 +133,10 @@ func (a *IntervalAutosnap) doSnapshots() { } } + select { + case didSnaps <- struct{}{}: + default: + a.log.Printf("warning: callback channel is full, discarding") + } + } diff --git a/cmd/config.go b/cmd/config.go index eb67a39..ccd7d41 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -3,6 +3,8 @@ package cmd import ( "io" + "fmt" + "github.com/pkg/errors" "github.com/zrepl/zrepl/zfs" ) @@ -11,6 +13,14 @@ type Config struct { Jobs map[string]Job } +func (c *Config) LookupJob(name string) (j Job, err error) { + j, ok := conf.Jobs[name] + if !ok { + return nil, errors.Errorf("job '%s' is not defined", name) + } + return j, nil +} + type Global struct { Serve struct { Stdinserver struct { @@ -47,3 +57,40 @@ type SSHStdinServerConnectDescr struct { type PrunePolicy interface { Prune(fs *zfs.DatasetPath, versions []zfs.FilesystemVersion) (keep, remove []zfs.FilesystemVersion, err error) } + +type PruningJob interface { + Pruner(side PrunePolicySide, dryRun bool) (Pruner, error) +} + +// A type for constants describing different prune policies of a PruningJob +// This is mostly a special-case for LocalJob, which is the only job that has two prune policies +// instead of one. 
+// It implements github.com/spf13/pflag.Value to be used as CLI flag for the test subcommand +type PrunePolicySide string + +const ( + PrunePolicySideDefault PrunePolicySide = "" + PrunePolicySideLeft PrunePolicySide = "left" + PrunePolicySideRight PrunePolicySide = "right" +) + +func (s *PrunePolicySide) String() string { + return string(*s) +} + +func (s *PrunePolicySide) Set(news string) error { + p := PrunePolicySide(news) + switch p { + case PrunePolicySideRight: + fallthrough + case PrunePolicySideLeft: + *s = p + default: + return errors.Errorf("must be either %s or %s", PrunePolicySideLeft, PrunePolicySideRight) + } + return nil +} + +func (s *PrunePolicySide) Type() string { + return fmt.Sprintf("%s | %s", PrunePolicySideLeft, PrunePolicySideRight) +} diff --git a/cmd/config_job_local.go b/cmd/config_job_local.go index 313edf7..ada9c1f 100644 --- a/cmd/config_job_local.go +++ b/cmd/config_job_local.go @@ -7,6 +7,9 @@ import ( "github.com/mitchellh/mapstructure" "github.com/pkg/errors" "github.com/zrepl/zrepl/rpc" + "github.com/zrepl/zrepl/util" + "github.com/zrepl/zrepl/zfs" + "sync" ) type LocalJob struct { @@ -80,6 +83,7 @@ func (j *LocalJob) JobName() string { func (j *LocalJob) JobStart(ctx context.Context) { log := ctx.Value(contextKeyLog).(Logger) + defer log.Printf("exiting") local := rpc.NewLocalRPC() // Allow access to any dataset since we control what mapping @@ -91,8 +95,112 @@ func (j *LocalJob) JobStart(ctx context.Context) { registerEndpoints(local, handler) - err := doPull(PullContext{local, log, j.Mapping, j.InitialReplPolicy}) - if err != nil { - log.Printf("error doing pull: %s", err) + snapper := IntervalAutosnap{ + DatasetFilter: j.Mapping.AsFilter(), + Prefix: j.SnapshotPrefix, + SnapshotInterval: j.Interval, } + + plhs, err := j.Pruner(PrunePolicySideLeft, false) + if err != nil { + log.Printf("error creating lhs pruner: %s", err) + return + } + prhs, err := j.Pruner(PrunePolicySideRight, false) + if err != nil { + log.Printf("error creating rhs pruner: %s", err) + return + } + + makeCtx := func(parent context.Context, logPrefix string) (ctx context.Context) { + return context.WithValue(parent, contextKeyLog, util.NewPrefixLogger(log, logPrefix)) + } + var snapCtx, plCtx, prCtx, pullCtx context.Context + snapCtx = makeCtx(ctx, "autosnap") + plCtx = makeCtx(ctx, "prune_lhs") + prCtx = makeCtx(ctx, "prune_rhs") + pullCtx = makeCtx(ctx, "repl") + + didSnaps := make(chan struct{}) + go snapper.Run(snapCtx, didSnaps) + +outer: + for { + + select { + case <-ctx.Done(): + break outer + case <-didSnaps: + log.Printf("finished taking snapshots") + log.Printf("starting replication procedure") + } + + { + log := pullCtx.Value(contextKeyLog).(Logger) + log.Printf("replicating from lhs to rhs") + err := doPull(PullContext{local, log, j.Mapping, j.InitialReplPolicy}) + if err != nil { + log.Printf("error replicating lhs to rhs: %s", err) + } + // use a ctx as soon as doPull gains ctx support + select { + case <-ctx.Done(): + break outer + default: + } + } + + var wg sync.WaitGroup + + log.Printf("pruning lhs") + wg.Add(1) + go func() { + plhs.Run(plCtx) + wg.Done() + }() + + log.Printf("pruning rhs") + wg.Add(1) + go func() { + prhs.Run(prCtx) + wg.Done() + }() + + wg.Wait() + + } + + log.Printf("context: %s", ctx.Err()) + +} + +func (j *LocalJob) Pruner(side PrunePolicySide, dryRun bool) (p Pruner, err error) { + + var dsfilter zfs.DatasetFilter + var pp PrunePolicy + switch side { + case PrunePolicySideLeft: + pp = j.PruneLHS + dsfilter = j.Mapping.AsFilter() + case 
PrunePolicySideRight: + pp = j.PruneRHS + dsfilter, err = j.Mapping.InvertedFilter() + if err != nil { + err = errors.Wrap(err, "cannot invert mapping for prune_rhs") + return + } + default: + err = errors.Errorf("must be either left or right side") + return + } + + p = Pruner{ + time.Now(), + dryRun, + dsfilter, + j.SnapshotPrefix, + pp, + } + + return } diff --git a/cmd/config_job_pull.go b/cmd/config_job_pull.go index fcd0fc0..37342e6 100644 --- a/cmd/config_job_pull.go +++ b/cmd/config_job_pull.go @@ -93,12 +93,13 @@ func (j *PullJob) JobName() string { func (j *PullJob) JobStart(ctx context.Context) { + log := ctx.Value(contextKeyLog).(Logger) + defer log.Printf("exiting") + ticker := time.NewTicker(j.Interval) start: - log := ctx.Value(contextKeyLog).(Logger) - log.Printf("connecting") rwc, err := j.Connect.Connect() if err != nil { @@ -128,7 +129,12 @@ start: log.Printf("starting prune") prunectx := context.WithValue(ctx, contextKeyLog, util.NewPrefixLogger(log, "prune")) - pruner := Pruner{time.Now(), false, j.pruneFilter, j.SnapshotPrefix, j.Prune} + pruner, err := j.Pruner(PrunePolicySideDefault, false) + if err != nil { + log.Printf("error creating pruner: %s", err) + return + } + pruner.Run(prunectx) log.Printf("finish prune") @@ -143,6 +149,17 @@ start: } +func (j *PullJob) Pruner(side PrunePolicySide, dryRun bool) (p Pruner, err error) { + p = Pruner{ + time.Now(), + dryRun, + j.pruneFilter, + j.SnapshotPrefix, + j.Prune, + } + return +} + func (j *PullJob) doRun(ctx context.Context) { } diff --git a/cmd/config_job_source.go b/cmd/config_job_source.go index 6d68ed7..7d4d8af 100644 --- a/cmd/config_job_source.go +++ b/cmd/config_job_source.go @@ -7,7 +7,6 @@ import ( "github.com/zrepl/zrepl/rpc" "github.com/zrepl/zrepl/util" "io" - "sync" "time" ) @@ -19,9 +18,6 @@ type SourceJob struct { Interval time.Duration Prune PrunePolicy Debug JobDebugSettings - - snapCancel context.CancelFunc - serveCancel context.CancelFunc } func parseSourceJob(name string, i map[string]interface{}) (j *SourceJob, err error) { @@ -78,32 +74,48 @@ func (j *SourceJob) JobName() string { func (j *SourceJob) JobStart(ctx context.Context) { - var wg sync.WaitGroup - log := ctx.Value(contextKeyLog).(Logger) + defer log.Printf("exiting") - log.Printf("starting autosnap") - var snapContext context.Context - snapContext, j.snapCancel = context.WithCancel(ctx) - snapContext = context.WithValue(snapContext, contextKeyLog, util.NewPrefixLogger(log, "autosnap")) a := IntervalAutosnap{DatasetFilter: j.Datasets, Prefix: j.SnapshotPrefix, SnapshotInterval: j.Interval} - wg.Add(1) - go func() { - a.Run(snapContext) - wg.Done() - }() + p, err := j.Pruner(PrunePolicySideDefault, false) + if err != nil { + log.Printf("error creating pruner: %s", err) + return + } - log.Printf("starting serve") - var serveContext context.Context - serveContext, j.serveCancel = context.WithCancel(ctx) - serveContext = context.WithValue(serveContext, contextKeyLog, util.NewPrefixLogger(log, "serve")) - wg.Add(1) - go func() { - j.serve(serveContext) - wg.Done() - }() + snapContext := context.WithValue(ctx, contextKeyLog, util.NewPrefixLogger(log, "autosnap")) + prunerContext := context.WithValue(ctx, contextKeyLog, util.NewPrefixLogger(log, "prune")) + serveContext := context.WithValue(ctx, contextKeyLog, util.NewPrefixLogger(log, "serve")) + didSnaps := make(chan struct{}) - wg.Wait() + go j.serve(serveContext) + go a.Run(snapContext, didSnaps) + +outer: + for { + select { + case <-ctx.Done(): + break outer + case <-didSnaps: + 
log.Printf("starting pruner") + p.Run(prunerContext) + log.Printf("pruner done") + } + } + log.Printf("context: %s", prunerContext.Err()) + +} + +func (j *SourceJob) Pruner(side PrunePolicySide, dryRun bool) (p Pruner, err error) { + p = Pruner{ + time.Now(), + dryRun, + j.Datasets, + j.SnapshotPrefix, + j.Prune, + } + return } func (j *SourceJob) serve(ctx context.Context) { diff --git a/cmd/prune.go b/cmd/prune.go index d4bc389..6300a3d 100644 --- a/cmd/prune.go +++ b/cmd/prune.go @@ -17,7 +17,14 @@ type Pruner struct { PrunePolicy PrunePolicy } -func (p *Pruner) Run(ctx context.Context) { +type PruneResult struct { + Filesystem *zfs.DatasetPath + All []zfs.FilesystemVersion + Keep []zfs.FilesystemVersion + Remove []zfs.FilesystemVersion +} + +func (p *Pruner) Run(ctx context.Context) (r []PruneResult, err error) { log := ctx.Value(contextKeyLog).(Logger) @@ -25,19 +32,18 @@ func (p *Pruner) Run(ctx context.Context) { log.Printf("doing dry run") } - // ZFSListSnapsFiltered --> todo can extend fsfilter or need new? Have already something per fs - // Dedicated snapshot object? Adaptor object to FilesystemVersion? - filesystems, err := zfs.ZFSListMapping(p.DatasetFilter) if err != nil { log.Printf("error applying filesystem filter: %s", err) - return + return nil, err } if len(filesystems) <= 0 { log.Printf("no filesystems matching filter") - return + return nil, err } + r = make([]PruneResult, 0, len(filesystems)) + for _, fs := range filesystems { fsversions, err := zfs.ZFSListFilesystemVersions(fs, &PrefixSnapshotFilter{p.SnapshotPrefix}) @@ -73,6 +79,8 @@ func (p *Pruner) Run(ctx context.Context) { dbgj, err = json.Marshal(remove) l.Printf("DEBUG: REMOVE=%s", dbgj) + r = append(r, PruneResult{fs, fsversions, keep, remove}) + describe := func(v zfs.FilesystemVersion) string { timeSince := v.Creation.Sub(p.Now) const day time.Duration = 24 * time.Hour diff --git a/cmd/test.go b/cmd/test.go index a7f328c..5702eef 100644 --- a/cmd/test.go +++ b/cmd/test.go @@ -3,6 +3,12 @@ package cmd import ( "os" + "bytes" + "context" + "fmt" + "sort" + "strings" + "github.com/kr/pretty" "github.com/spf13/cobra" "github.com/zrepl/zrepl/zfs" @@ -20,16 +26,33 @@ var testConfigSyntaxCmd = &cobra.Command{ } var testDatasetMapFilter = &cobra.Command{ - Use: "pattern jobtype.name test/zfs/dataset/path", + Use: "pattern jobname test/zfs/dataset/path", Short: "test dataset mapping / filter specified in config", Example: ` zrepl test pattern prune.clean_backups tank/backups/legacyscript/foo`, Run: doTestDatasetMapFilter, } +var testPrunePolicyArgs struct { + side PrunePolicySide + showKept bool + showRemoved bool +} + +var testPrunePolicyCmd = &cobra.Command{ + Use: "prune jobname", + Short: "do a dry-run of the pruning part of a job", + Run: doTestPrunePolicy, +} + func init() { RootCmd.AddCommand(testCmd) testCmd.AddCommand(testConfigSyntaxCmd) testCmd.AddCommand(testDatasetMapFilter) + + testPrunePolicyCmd.Flags().VarP(&testPrunePolicyArgs.side, "side", "s", "prune_lhs (left) or prune_rhs (right)") + testPrunePolicyCmd.Flags().BoolVar(&testPrunePolicyArgs.showKept, "kept", false, "show kept snapshots") + testPrunePolicyCmd.Flags().BoolVar(&testPrunePolicyArgs.showRemoved, "removed", true, "show removed snapshots") + testCmd.AddCommand(testPrunePolicyCmd) } func doTestConfig(cmd *cobra.Command, args []string) { @@ -48,9 +71,9 @@ func doTestDatasetMapFilter(cmd *cobra.Command, args []string) { } n, i := args[0], args[1] - jobi, ok := conf.Jobs[n] - if !ok { - log.Printf("no job %s defined in config") + jobi, err 
:= conf.LookupJob(n) + if err != nil { + log.Printf("%s", err) os.Exit(1) } @@ -72,7 +95,7 @@ func doTestDatasetMapFilter(cmd *cobra.Command, args []string) { os.Exit(1) } - if mf.filterMode{ + if mf.filterMode { pass, err := mf.Filter(ip) if err != nil { log.Printf("error evaluating filter: %s", err) @@ -94,3 +117,69 @@ func doTestDatasetMapFilter(cmd *cobra.Command, args []string) { } } + +func doTestPrunePolicy(cmd *cobra.Command, args []string) { + + if cmd.Flags().NArg() != 1 { + log.Printf("specify job name as first positional argument") + log.Printf(cmd.UsageString()) + os.Exit(1) + } + + jobname := cmd.Flags().Arg(0) + jobi, err := conf.LookupJob(jobname) + if err != nil { + log.Printf("%s", err) + os.Exit(1) + } + + jobp, ok := jobi.(PruningJob) + if !ok { + log.Printf("job doesn't do any prunes") + os.Exit(0) + } + + log.Printf("job dump:\n%s", pretty.Sprint(jobp)) + + pruner, err := jobp.Pruner(testPrunePolicyArgs.side, true) + if err != nil { + log.Printf("cannot create test pruner: %s", err) + os.Exit(1) + } + + log.Printf("start pruning") + + ctx := context.WithValue(context.Background(), contextKeyLog, log) + result, err := pruner.Run(ctx) + if err != nil { + log.Printf("error running pruner: %s", err) + os.Exit(1) + } + + sort.Slice(result, func(i, j int) bool { + return strings.Compare(result[i].Filesystem.ToString(), result[j].Filesystem.ToString()) == -1 + }) + + var b bytes.Buffer + for _, r := range result { + fmt.Fprintf(&b, "%s\n", r.Filesystem.ToString()) + + if testPrunePolicyArgs.showKept { + fmt.Fprintf(&b, "\tkept:\n") + for _, v := range r.Keep { + fmt.Fprintf(&b, "\t- %s\n", v.Name) + } + } + + if testPrunePolicyArgs.showRemoved { + fmt.Fprintf(&b, "\tremoved:\n") + for _, v := range r.Remove { + fmt.Fprintf(&b, "\t- %s\n", v.Name) + } + } + + } + + log.Printf("pruning result:\n%s", b.String()) + +}
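
Note on the non-blocking autosnap callback introduced above: the snippet below is a minimal, self-contained sketch of the pattern this patch uses between IntervalAutosnap and the depending jobs (replication and pruning in LocalJob/SourceJob). It is illustrative only; the names takeSnapshots and consume are invented for the example and do not exist in the zrepl codebase, where the real code lives in cmd/autosnap.go and the job loops in cmd/config_job_local.go and cmd/config_job_source.go.

package main

import (
	"context"
	"fmt"
	"time"
)

// takeSnapshots stands in for IntervalAutosnap.doSnapshots: after each run it
// notifies the depending job, but never blocks on the channel. If the consumer
// is still busy, the notification is dropped and only a warning is logged.
func takeSnapshots(ctx context.Context, didSnaps chan struct{}) {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// ... take snapshots here ...
			select {
			case didSnaps <- struct{}{}:
			default:
				fmt.Println("warning: callback channel is full, discarding")
			}
		}
	}
}

// consume stands in for a job's outer loop (replication, then pruning, in the
// real code): it reacts to each notification and exits when ctx is done.
func consume(ctx context.Context, didSnaps chan struct{}) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-didSnaps:
			fmt.Println("finished taking snapshots, starting dependent work")
			// ... replicate, then prune ...
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	didSnaps := make(chan struct{})
	go takeSnapshots(ctx, didSnaps)
	consume(ctx, didSnaps)
}

The non-blocking send is the point of the commit message's last bullet: a slow replication or prune run never stalls the snapshotter, and a dropped notification is merely logged, with the next interval triggering the depending jobs again. For the new dry-run subcommand added in cmd/test.go, the invocation should look roughly like `zrepl test prune --side left <jobname>`; the --side flag only matters for jobs with two prune policies (the local job), while pull and source jobs use their single policy regardless.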