From 32391adf4f3be6b902b9e70a5c0cea24990b097e Mon Sep 17 00:00:00 2001 From: Anton Schirg Date: Thu, 30 Aug 2018 17:40:45 +0200 Subject: [PATCH] build pruner in factory and check prune rules --- daemon/job/push.go | 22 ++++--------- daemon/pruner/pruner.go | 71 ++++++++++++++++++++++++++++++++++++++--- 2 files changed, 74 insertions(+), 19 deletions(-) diff --git a/daemon/job/push.go b/daemon/job/push.go index 97746f7..62fbb24 100644 --- a/daemon/job/push.go +++ b/daemon/job/push.go @@ -10,10 +10,8 @@ import ( "github.com/zrepl/zrepl/daemon/logging" "github.com/zrepl/zrepl/daemon/pruner" "github.com/zrepl/zrepl/endpoint" - "github.com/zrepl/zrepl/pruning" "github.com/zrepl/zrepl/replication" "sync" - "time" ) type Push struct { @@ -21,8 +19,7 @@ type Push struct { connecter streamrpc.Connecter fsfilter endpoint.FSFilter - keepRulesSender []pruning.KeepRule - keepRulesReceiver []pruning.KeepRule + prunerFactory *pruner.PrunerFactory mtx sync.Mutex replication *replication.Replication @@ -39,14 +36,9 @@ func PushFromConfig(g config.Global, in *config.PushJob) (j *Push, err error) { return nil, errors.Wrap(err, "cannnot build filesystem filter") } - j.keepRulesReceiver, err = pruning.RulesFromConfig(in.Pruning.KeepReceiver) + j.prunerFactory, err = pruner.NewPrunerFactory(in.Pruning) if err != nil { - return nil, errors.Wrap(err, "cannot build receiver pruning rules") - } - - j.keepRulesSender, err = pruning.RulesFromConfig(in.Pruning.KeepSender) - if err != nil { - return nil, errors.Wrap(err, "cannot build sender pruning rules") + return nil, err } return j, nil @@ -112,11 +104,11 @@ func (j *Push) do(ctx context.Context) { j.replication.Drive(ctx, sender, receiver) // Prune sender - senderPruner := pruner.NewPruner(10*time.Second, sender, sender, j.keepRulesSender) // FIXME constant - senderPruner.Prune(pruner.WithLogger(ctx, pruner.GetLogger(ctx).WithField("prune_side", "sender"))) + senderPruner := j.prunerFactory.BuildSenderPruner(ctx, sender, sender) + senderPruner.Prune() // Prune receiver - receiverPruner := pruner.NewPruner(10*time.Second, receiver, sender, j.keepRulesReceiver) // FIXME constant - receiverPruner.Prune(pruner.WithLogger(ctx, pruner.GetLogger(ctx).WithField("prune_side", "receiver"))) + receiverPruner := j.prunerFactory.BuildReceiverPruner(ctx, receiver, sender) + receiverPruner.Prune() } diff --git a/daemon/pruner/pruner.go b/daemon/pruner/pruner.go index 8041cc0..f912a90 100644 --- a/daemon/pruner/pruner.go +++ b/daemon/pruner/pruner.go @@ -3,6 +3,8 @@ package pruner import ( "context" "fmt" + "github.com/pkg/errors" + "github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/logger" "github.com/zrepl/zrepl/pruning" "github.com/zrepl/zrepl/replication/pdu" @@ -63,9 +65,71 @@ type Pruner struct { pruneCompleted []fs } -func NewPruner(retryWait time.Duration, target Target, receiver History, rules []pruning.KeepRule) *Pruner { +type PrunerFactory struct { + senderRules []pruning.KeepRule + receiverRules []pruning.KeepRule + retryWait time.Duration +} + +func checkContainsKeep1(rules []pruning.KeepRule) error { + if len(rules) == 0 { + return nil //No keep rules means keep all - ok + } + for _, e := range rules { + switch e.(type) { + case *pruning.KeepLastN: + return nil + } + } + return errors.New("sender keep rules must contain last_n or be empty so that the last snapshot is definitely kept") +} + +func NewPrunerFactory(in config.PruningSenderReceiver) (*PrunerFactory, error) { + keepRulesReceiver, err := pruning.RulesFromConfig(in.KeepReceiver) + if err != nil { + return nil, errors.Wrap(err, "cannot build receiver pruning rules") + } + + keepRulesSender, err := pruning.RulesFromConfig(in.KeepSender) + if err != nil { + return nil, errors.Wrap(err, "cannot build sender pruning rules") + } + + if err := checkContainsKeep1(keepRulesSender); err != nil { + return nil, err + } + + f := &PrunerFactory{ + keepRulesSender, + keepRulesReceiver, + 10 * time.Second, //FIXME constant + } + return f, nil +} + +func (f *PrunerFactory) BuildSenderPruner(ctx context.Context, target Target, receiver History) *Pruner { p := &Pruner{ - args: args{nil, target, receiver, rules, retryWait}, // ctx is filled in Prune() + args: args{ + WithLogger(ctx, GetLogger(ctx).WithField("prune_side", "sender")), + target, + receiver, + f.senderRules, + f.retryWait, + }, + state: Plan, + } + return p +} + +func (f *PrunerFactory) BuildReceiverPruner(ctx context.Context, target Target, receiver History) *Pruner { + p := &Pruner{ + args: args{ + WithLogger(ctx, GetLogger(ctx).WithField("prune_side", "receiver")), + target, + receiver, + f.receiverRules, + f.retryWait, + }, state: Plan, } return p @@ -98,8 +162,7 @@ func (s State) statefunc() state { type updater func(func(*Pruner)) State type state func(args *args, u updater) state -func (p *Pruner) Prune(ctx context.Context) { - p.args.ctx = ctx +func (p *Pruner) Prune() { p.prune(p.args) }