diff --git a/cmd/autosnap.go b/cmd/autosnap.go index 116d210..0c3d1d9 100644 --- a/cmd/autosnap.go +++ b/cmd/autosnap.go @@ -142,7 +142,7 @@ func (a *IntervalAutosnap) doSnapshots(didSnaps chan struct{}) { select { case didSnaps <- struct{}{}: default: - a.log.Warn("warning: callback channel is full, discarding") + a.log.Error("warning: callback channel is full, discarding") } } diff --git a/cmd/config_job_local.go b/cmd/config_job_local.go index 99abd21..1a70d81 100644 --- a/cmd/config_job_local.go +++ b/cmd/config_job_local.go @@ -82,7 +82,7 @@ func (j *LocalJob) JobName() string { func (j *LocalJob) JobStart(ctx context.Context) { log := ctx.Value(contextKeyLog).(Logger) - defer log.Printf("exiting") + defer log.Info("exiting") local := rpc.NewLocalRPC() // Allow access to any dataset since we control what mapping @@ -102,12 +102,12 @@ func (j *LocalJob) JobStart(ctx context.Context) { plhs, err := j.Pruner(PrunePolicySideLeft, false) if err != nil { - log.Printf("error creating lhs pruner: %s", err) + log.WithError(err).Error("error creating lhs pruner") return } prhs, err := j.Pruner(PrunePolicySideRight, false) if err != nil { - log.Printf("error creating rhs pruner: %s", err) + log.WithError(err).Error("error creating rhs pruner") return } @@ -130,16 +130,16 @@ outer: case <-ctx.Done(): break outer case <-didSnaps: - log.Printf("finished taking snapshots") - log.Printf("starting replication procedure") + log.Debug("finished taking snapshots") + log.Info("starting replication procedure") } { log := pullCtx.Value(contextKeyLog).(Logger) - log.Printf("replicating from lhs to rhs") + log.Debug("replicating from lhs to rhs") err := doPull(PullContext{local, log, j.Mapping, j.InitialReplPolicy}) if err != nil { - log.Printf("error replicating lhs to rhs: %s", err) + log.WithError(err).Error("error replicating lhs to rhs") } // use a ctx as soon as doPull gains ctx support select { @@ -151,14 +151,14 @@ outer: var wg sync.WaitGroup - log.Printf("pruning lhs") + log.Info("pruning lhs") wg.Add(1) go func() { plhs.Run(plCtx) wg.Done() }() - log.Printf("pruning rhs") + log.Info("pruning rhs") wg.Add(1) go func() { prhs.Run(prCtx) @@ -169,7 +169,7 @@ outer: } - log.Printf("context: %s", ctx.Err()) + log.WithError(ctx.Err()).Info("context") } diff --git a/cmd/config_job_pull.go b/cmd/config_job_pull.go index c305f99..2afe7ae 100644 --- a/cmd/config_job_pull.go +++ b/cmd/config_job_pull.go @@ -94,16 +94,16 @@ func (j *PullJob) JobName() string { func (j *PullJob) JobStart(ctx context.Context) { log := ctx.Value(contextKeyLog).(Logger) - defer log.Printf("exiting") + defer log.Info("exiting") ticker := time.NewTicker(j.Interval) start: - log.Printf("connecting") + log.Info("connecting") rwc, err := j.Connect.Connect() if err != nil { - log.Printf("error connecting: %s", err) + log.WithError(err).Error("error connecting") return } @@ -117,31 +117,31 @@ start: client.SetLogger(log, true) } - log.Printf("starting pull") + log.Info("starting pull") pullLog := log.WithField(logTaskField, "pull") err = doPull(PullContext{client, pullLog, j.Mapping, j.InitialReplPolicy}) if err != nil { - log.Printf("error doing pull: %s", err) + log.WithError(err).Error("error doing pull") } closeRPCWithTimeout(log, client, time.Second*10, "") - log.Printf("starting prune") + log.Info("starting prune") prunectx := context.WithValue(ctx, contextKeyLog, log.WithField(logTaskField, "prune")) pruner, err := j.Pruner(PrunePolicySideDefault, false) if err != nil { - log.Printf("error creating pruner: %s", err) + log.WithError(err).Error("error creating pruner") return } pruner.Run(prunectx) - log.Printf("finish prune") + log.Info("finish prune") - log.Printf("wait for next interval") + log.Info("wait for next interval") select { case <-ctx.Done(): - log.Printf("context: %s", ctx.Err()) + log.WithError(ctx.Err()).Info("context") return case <-ticker.C: goto start diff --git a/cmd/config_job_source.go b/cmd/config_job_source.go index 238b875..7d67522 100644 --- a/cmd/config_job_source.go +++ b/cmd/config_job_source.go @@ -75,12 +75,12 @@ func (j *SourceJob) JobName() string { func (j *SourceJob) JobStart(ctx context.Context) { log := ctx.Value(contextKeyLog).(Logger) - defer log.Printf("exiting") + defer log.Info("exiting") a := IntervalAutosnap{DatasetFilter: j.Datasets, Prefix: j.SnapshotPrefix, SnapshotInterval: j.Interval} p, err := j.Pruner(PrunePolicySideDefault, false) if err != nil { - log.Printf("error creating pruner: %s", err) + log.WithError(err).Error("error creating pruner") return } @@ -98,12 +98,12 @@ outer: case <-ctx.Done(): break outer case <-didSnaps: - log.Printf("starting pruner") + log.Info("starting pruner") p.Run(prunerContext) - log.Printf("pruner done") + log.Info("pruner done") } } - log.Printf("context: %s", prunerContext.Err()) + log.WithError(prunerContext.Err()).Info("context") } @@ -124,7 +124,7 @@ func (j *SourceJob) serve(ctx context.Context) { listener, err := j.Serve.Listen() if err != nil { - log.Printf("error listening: %s", err) + log.WithError(err).Error("error listening") return } @@ -137,7 +137,7 @@ outer: go func() { rwc, err := listener.Accept() if err != nil { - log.Printf("error accepting connection: %s", err) + log.WithError(err).Error("error accepting connection") close(rwcChan) return } @@ -168,22 +168,22 @@ outer: } registerEndpoints(rpcServer, handler) if err = rpcServer.Serve(); err != nil { - log.Printf("error serving connection: %s", err) + log.WithError(err).Error("error serving connection") } rwc.Close() case <-ctx.Done(): - log.Printf("context: %s", ctx.Err()) + log.WithError(ctx.Err()).Info("context") break outer } } - log.Printf("closing listener") + log.Info("closing listener") err = listener.Close() if err != nil { - log.Printf("error closing listener: %s", err) + log.WithError(err).Error("error closing listener") } return diff --git a/cmd/daemon.go b/cmd/daemon.go index 31e4b4c..de17adf 100644 --- a/cmd/daemon.go +++ b/cmd/daemon.go @@ -68,12 +68,11 @@ func (d *Daemon) Loop(ctx context.Context) { signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) - log.Printf("starting jobs from config") + log.Info("starting jobs from config") i := 0 for _, job := range d.conf.Jobs { - log.Printf("starting job %s", job.JobName()) - logger := log.WithField(logJobField, job.JobName()) + logger.Info("starting") i++ jobCtx := context.WithValue(ctx, contextKeyLog, logger) go func(j Job) { @@ -86,23 +85,22 @@ func (d *Daemon) Loop(ctx context.Context) { outer: for { select { - case j := <-finishs: - log.Printf("job finished: %s", j.JobName()) + case <-finishs: finishCount++ if finishCount == len(d.conf.Jobs) { - log.Printf("all jobs finished") + log.Info("all jobs finished") break outer } case sig := <-sigChan: - log.Printf("received signal: %s", sig) - log.Printf("cancelling all jobs") + log.WithField("signal", sig).Info("received signal") + log.Info("cancelling all jobs") cancel() } } signal.Stop(sigChan) - log.Printf("exiting") + log.Info("exiting") } diff --git a/cmd/replication.go b/cmd/replication.go index 77a764a..4ecc6e9 100644 --- a/cmd/replication.go +++ b/cmd/replication.go @@ -30,7 +30,7 @@ const ( ) func closeRPCWithTimeout(log Logger, remote rpc.RPCClient, timeout time.Duration, goodbye string) { - log.Printf("closing rpc connection") + log.Info("closing rpc connection") ch := make(chan error) go func() { @@ -47,7 +47,7 @@ func closeRPCWithTimeout(log Logger, remote rpc.RPCClient, timeout time.Duration } if err != nil { - log.Printf("error closing connection: %s", err) + log.WithError(err).Error("error closing connection") } return } @@ -84,14 +84,14 @@ func doPull(pull PullContext) (err error) { localFs, err = pull.Mapping.Map(remoteFilesystems[fs]) if err != nil { err := fmt.Errorf("error mapping %s: %s", remoteFilesystems[fs], err) - log.WithError(err).Error() + log.WithError(err).WithField(logMapFromField, remoteFilesystems[fs]).Error("cannot map") return err } if localFs == nil { continue } log.WithField(logMapFromField, remoteFilesystems[fs].ToString()). - WithField(logMapToField, localFs.ToString()).Debug() + WithField(logMapToField, localFs.ToString()).Debug("mapping") m := RemoteLocalMapping{remoteFilesystems[fs], localFs} replMapping[m.Local.ToString()] = m localTraversal.Add(m.Local) @@ -194,7 +194,7 @@ func doPull(pull PullContext) (err error) { FilesystemVersion: snapsOnly[len(snapsOnly)-1], } - log.Debug("requesting snapshot stream for %s", r.FilesystemVersion) + log.WithField("version", r.FilesystemVersion).Debug("requesting snapshot stream") var stream io.Reader