finish pruning implementation in push job

This commit is contained in:
Christian Schwarz 2018-08-30 11:52:05 +02:00
parent 22ca80eb7e
commit 7dd49b835a

View File

@ -8,11 +8,12 @@ import (
"github.com/zrepl/zrepl/daemon/connecter"
"github.com/zrepl/zrepl/daemon/filters"
"github.com/zrepl/zrepl/daemon/logging"
"github.com/zrepl/zrepl/daemon/pruner"
"github.com/zrepl/zrepl/endpoint"
"github.com/zrepl/zrepl/pruning"
"github.com/zrepl/zrepl/replication"
"sync"
"github.com/zrepl/zrepl/daemon/pruner"
"github.com/zrepl/zrepl/pruning"
"time"
)
type Push struct {
@ -20,7 +21,7 @@ type Push struct {
connecter streamrpc.Connecter
fsfilter endpoint.FSFilter
keepRulesSender []pruning.KeepRule
keepRulesSender []pruning.KeepRule
keepRulesReceiver []pruning.KeepRule
mtx sync.Mutex
@ -38,6 +39,16 @@ func PushFromConfig(g config.Global, in *config.PushJob) (j *Push, err error) {
return nil, errors.Wrap(err, "cannnot build filesystem filter")
}
j.keepRulesReceiver, err = pruning.RulesFromConfig(in.Pruning.KeepReceiver)
if err != nil {
return nil, errors.Wrap(err, "cannot build receiver pruning rules")
}
j.keepRulesSender, err = pruning.RulesFromConfig(in.Pruning.KeepSender)
if err != nil {
return nil, errors.Wrap(err, "cannot build sender pruning rules")
}
return j, nil
}
@ -90,12 +101,11 @@ func (j *Push) do(ctx context.Context) {
rep.Drive(ctx, sender, receiver)
// Prune sender
senderPruner := pruner.NewPruner(sender, receiver, j.keepRulesSender)
senderPruner.Prune(ctx)
senderPruner := pruner.NewPruner(10*time.Second, sender, sender, j.keepRulesSender) // FIXME constant
senderPruner.Prune(pruner.WithLogger(ctx, pruner.GetLogger(ctx).WithField("prune_side", "sender")))
// Prune receiver
receiverPruner := pruner.NewPruner(receiver, receiver, j.keepRulesReceiver)
receiverPruner.Prune(ctx)
receiverPruner := pruner.NewPruner(10*time.Second, receiver, sender, j.keepRulesReceiver) // FIXME constant
receiverPruner.Prune(pruner.WithLogger(ctx, pruner.GetLogger(ctx).WithField("prune_side", "receiver")))
}