diff --git a/cmd/config_job_pull.go b/cmd/config_job_pull.go index 09bc59b..1fcb90e 100644 --- a/cmd/config_job_pull.go +++ b/cmd/config_job_pull.go @@ -4,6 +4,7 @@ import ( "time" "context" + "fmt" "github.com/mitchellh/mapstructure" "github.com/pkg/errors" "github.com/zrepl/zrepl/rpc" @@ -98,16 +99,30 @@ func (j *PullJob) JobStart(ctx context.Context) { log := ctx.Value(contextKeyLog).(Logger) defer log.Info("exiting") j.task = NewTask("main", log) - log = j.task.Log() + + // j.task is idle here idle here ticker := time.NewTicker(j.Interval) + for { + j.doRun(ctx) + select { + case <-ctx.Done(): + j.task.Log().WithError(ctx.Err()).Info("context") + return + case <-ticker.C: + } + } +} -start: +func (j *PullJob) doRun(ctx context.Context) { - log.Info("connecting") + j.task.Enter("run") + defer j.task.Finish() + + j.task.Log().Info("connecting") rwc, err := j.Connect.Connect() if err != nil { - log.WithError(err).Error("error connecting") + j.task.Log().WithError(err).Error("error connecting") return } @@ -118,37 +133,24 @@ start: client := rpc.NewClient(rwc) if j.Debug.RPC.Log { - client.SetLogger(log, true) + client.SetLogger(j.task.Log(), true) } - log.Info("starting pull") - j.task.Enter("pull") puller := Puller{j.task, client, j.Mapping, j.InitialReplPolicy} puller.Pull() j.task.Finish() - closeRPCWithTimeout(log, client, time.Second*10, "") + closeRPCWithTimeout(j.task, client, time.Second*10, "") - log.Info("starting prune") - prunectx := context.WithValue(ctx, contextKeyLog, log.WithField(logTaskField, "prune")) + j.task.Enter("prune") pruner, err := j.Pruner(j.task, PrunePolicySideDefault, false) if err != nil { - log.WithError(err).Error("error creating pruner") - return - } - - pruner.Run(prunectx) - log.Info("finish prune") - - log.Info("wait for next interval") - select { - case <-ctx.Done(): - log.WithError(ctx.Err()).Info("context") - return - case <-ticker.C: - goto start + j.task.Log().WithError(err).Error("error creating pruner") + } else { + pruner.Run(ctx) } + j.task.Finish() } @@ -168,6 +170,26 @@ func (j *PullJob) Pruner(task *Task, side PrunePolicySide, dryRun bool) (p Prune return } -func (j *PullJob) doRun(ctx context.Context) { +func closeRPCWithTimeout(task *Task, remote rpc.RPCClient, timeout time.Duration, goodbye string) { + task.Log().Info("closing rpc connection") + + ch := make(chan error) + go func() { + ch <- remote.Close() + close(ch) + }() + + var err error + select { + case <-time.After(timeout): + err = fmt.Errorf("timeout exceeded (%s)", timeout) + case closeRequestErr := <-ch: + err = closeRequestErr + } + + if err != nil { + task.Log().WithError(err).Error("error closing connection") + } + return } diff --git a/cmd/replication.go b/cmd/replication.go index d4cb5bc..cd9ef9a 100644 --- a/cmd/replication.go +++ b/cmd/replication.go @@ -3,7 +3,6 @@ package cmd import ( "fmt" "io" - "time" "bytes" "encoding/json" @@ -26,29 +25,6 @@ const ( InitialReplPolicyAll InitialReplPolicy = "all" ) -func closeRPCWithTimeout(log Logger, remote rpc.RPCClient, timeout time.Duration, goodbye string) { - log.Info("closing rpc connection") - - ch := make(chan error) - go func() { - ch <- remote.Close() - close(ch) - }() - - var err error - select { - case <-time.After(timeout): - err = fmt.Errorf("timeout exceeded (%s)", timeout) - case closeRequestErr := <-ch: - err = closeRequestErr - } - - if err != nil { - log.WithError(err).Error("error closing connection") - } - return -} - type Puller struct { task *Task Remote rpc.RPCClient