mirror of
https://github.com/zrepl/zrepl.git
synced 2025-02-02 03:29:13 +01:00
build pruner in factory and check prune rules
This commit is contained in:
parent
c0a3e1f121
commit
32391adf4f
@ -10,10 +10,8 @@ import (
|
|||||||
"github.com/zrepl/zrepl/daemon/logging"
|
"github.com/zrepl/zrepl/daemon/logging"
|
||||||
"github.com/zrepl/zrepl/daemon/pruner"
|
"github.com/zrepl/zrepl/daemon/pruner"
|
||||||
"github.com/zrepl/zrepl/endpoint"
|
"github.com/zrepl/zrepl/endpoint"
|
||||||
"github.com/zrepl/zrepl/pruning"
|
|
||||||
"github.com/zrepl/zrepl/replication"
|
"github.com/zrepl/zrepl/replication"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type Push struct {
|
type Push struct {
|
||||||
@ -21,8 +19,7 @@ type Push struct {
|
|||||||
connecter streamrpc.Connecter
|
connecter streamrpc.Connecter
|
||||||
fsfilter endpoint.FSFilter
|
fsfilter endpoint.FSFilter
|
||||||
|
|
||||||
keepRulesSender []pruning.KeepRule
|
prunerFactory *pruner.PrunerFactory
|
||||||
keepRulesReceiver []pruning.KeepRule
|
|
||||||
|
|
||||||
mtx sync.Mutex
|
mtx sync.Mutex
|
||||||
replication *replication.Replication
|
replication *replication.Replication
|
||||||
@ -39,14 +36,9 @@ func PushFromConfig(g config.Global, in *config.PushJob) (j *Push, err error) {
|
|||||||
return nil, errors.Wrap(err, "cannnot build filesystem filter")
|
return nil, errors.Wrap(err, "cannnot build filesystem filter")
|
||||||
}
|
}
|
||||||
|
|
||||||
j.keepRulesReceiver, err = pruning.RulesFromConfig(in.Pruning.KeepReceiver)
|
j.prunerFactory, err = pruner.NewPrunerFactory(in.Pruning)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "cannot build receiver pruning rules")
|
return nil, err
|
||||||
}
|
|
||||||
|
|
||||||
j.keepRulesSender, err = pruning.RulesFromConfig(in.Pruning.KeepSender)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrap(err, "cannot build sender pruning rules")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return j, nil
|
return j, nil
|
||||||
@ -112,11 +104,11 @@ func (j *Push) do(ctx context.Context) {
|
|||||||
j.replication.Drive(ctx, sender, receiver)
|
j.replication.Drive(ctx, sender, receiver)
|
||||||
|
|
||||||
// Prune sender
|
// Prune sender
|
||||||
senderPruner := pruner.NewPruner(10*time.Second, sender, sender, j.keepRulesSender) // FIXME constant
|
senderPruner := j.prunerFactory.BuildSenderPruner(ctx, sender, sender)
|
||||||
senderPruner.Prune(pruner.WithLogger(ctx, pruner.GetLogger(ctx).WithField("prune_side", "sender")))
|
senderPruner.Prune()
|
||||||
|
|
||||||
// Prune receiver
|
// Prune receiver
|
||||||
receiverPruner := pruner.NewPruner(10*time.Second, receiver, sender, j.keepRulesReceiver) // FIXME constant
|
receiverPruner := j.prunerFactory.BuildReceiverPruner(ctx, receiver, sender)
|
||||||
receiverPruner.Prune(pruner.WithLogger(ctx, pruner.GetLogger(ctx).WithField("prune_side", "receiver")))
|
receiverPruner.Prune()
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -3,6 +3,8 @@ package pruner
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"github.com/zrepl/zrepl/config"
|
||||||
"github.com/zrepl/zrepl/logger"
|
"github.com/zrepl/zrepl/logger"
|
||||||
"github.com/zrepl/zrepl/pruning"
|
"github.com/zrepl/zrepl/pruning"
|
||||||
"github.com/zrepl/zrepl/replication/pdu"
|
"github.com/zrepl/zrepl/replication/pdu"
|
||||||
@ -63,9 +65,71 @@ type Pruner struct {
|
|||||||
pruneCompleted []fs
|
pruneCompleted []fs
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPruner(retryWait time.Duration, target Target, receiver History, rules []pruning.KeepRule) *Pruner {
|
type PrunerFactory struct {
|
||||||
|
senderRules []pruning.KeepRule
|
||||||
|
receiverRules []pruning.KeepRule
|
||||||
|
retryWait time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
func checkContainsKeep1(rules []pruning.KeepRule) error {
|
||||||
|
if len(rules) == 0 {
|
||||||
|
return nil //No keep rules means keep all - ok
|
||||||
|
}
|
||||||
|
for _, e := range rules {
|
||||||
|
switch e.(type) {
|
||||||
|
case *pruning.KeepLastN:
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return errors.New("sender keep rules must contain last_n or be empty so that the last snapshot is definitely kept")
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewPrunerFactory(in config.PruningSenderReceiver) (*PrunerFactory, error) {
|
||||||
|
keepRulesReceiver, err := pruning.RulesFromConfig(in.KeepReceiver)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "cannot build receiver pruning rules")
|
||||||
|
}
|
||||||
|
|
||||||
|
keepRulesSender, err := pruning.RulesFromConfig(in.KeepSender)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "cannot build sender pruning rules")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := checkContainsKeep1(keepRulesSender); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
f := &PrunerFactory{
|
||||||
|
keepRulesSender,
|
||||||
|
keepRulesReceiver,
|
||||||
|
10 * time.Second, //FIXME constant
|
||||||
|
}
|
||||||
|
return f, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *PrunerFactory) BuildSenderPruner(ctx context.Context, target Target, receiver History) *Pruner {
|
||||||
p := &Pruner{
|
p := &Pruner{
|
||||||
args: args{nil, target, receiver, rules, retryWait}, // ctx is filled in Prune()
|
args: args{
|
||||||
|
WithLogger(ctx, GetLogger(ctx).WithField("prune_side", "sender")),
|
||||||
|
target,
|
||||||
|
receiver,
|
||||||
|
f.senderRules,
|
||||||
|
f.retryWait,
|
||||||
|
},
|
||||||
|
state: Plan,
|
||||||
|
}
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *PrunerFactory) BuildReceiverPruner(ctx context.Context, target Target, receiver History) *Pruner {
|
||||||
|
p := &Pruner{
|
||||||
|
args: args{
|
||||||
|
WithLogger(ctx, GetLogger(ctx).WithField("prune_side", "receiver")),
|
||||||
|
target,
|
||||||
|
receiver,
|
||||||
|
f.receiverRules,
|
||||||
|
f.retryWait,
|
||||||
|
},
|
||||||
state: Plan,
|
state: Plan,
|
||||||
}
|
}
|
||||||
return p
|
return p
|
||||||
@ -98,8 +162,7 @@ func (s State) statefunc() state {
|
|||||||
type updater func(func(*Pruner)) State
|
type updater func(func(*Pruner)) State
|
||||||
type state func(args *args, u updater) state
|
type state func(args *args, u updater) state
|
||||||
|
|
||||||
func (p *Pruner) Prune(ctx context.Context) {
|
func (p *Pruner) Prune() {
|
||||||
p.args.ctx = ctx
|
|
||||||
p.prune(p.args)
|
p.prune(p.args)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user