mirror of
https://github.com/zrepl/zrepl.git
synced 2025-06-04 17:16:23 +02:00
convert more code to structured logging
This commit is contained in:
parent
83edcb3889
commit
c31ec8c646
@ -142,7 +142,7 @@ func (a *IntervalAutosnap) doSnapshots(didSnaps chan struct{}) {
|
|||||||
select {
|
select {
|
||||||
case didSnaps <- struct{}{}:
|
case didSnaps <- struct{}{}:
|
||||||
default:
|
default:
|
||||||
a.log.Warn("warning: callback channel is full, discarding")
|
a.log.Error("warning: callback channel is full, discarding")
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -82,7 +82,7 @@ func (j *LocalJob) JobName() string {
|
|||||||
func (j *LocalJob) JobStart(ctx context.Context) {
|
func (j *LocalJob) JobStart(ctx context.Context) {
|
||||||
|
|
||||||
log := ctx.Value(contextKeyLog).(Logger)
|
log := ctx.Value(contextKeyLog).(Logger)
|
||||||
defer log.Printf("exiting")
|
defer log.Info("exiting")
|
||||||
|
|
||||||
local := rpc.NewLocalRPC()
|
local := rpc.NewLocalRPC()
|
||||||
// Allow access to any dataset since we control what mapping
|
// Allow access to any dataset since we control what mapping
|
||||||
@ -102,12 +102,12 @@ func (j *LocalJob) JobStart(ctx context.Context) {
|
|||||||
|
|
||||||
plhs, err := j.Pruner(PrunePolicySideLeft, false)
|
plhs, err := j.Pruner(PrunePolicySideLeft, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("error creating lhs pruner: %s", err)
|
log.WithError(err).Error("error creating lhs pruner")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
prhs, err := j.Pruner(PrunePolicySideRight, false)
|
prhs, err := j.Pruner(PrunePolicySideRight, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("error creating rhs pruner: %s", err)
|
log.WithError(err).Error("error creating rhs pruner")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -130,16 +130,16 @@ outer:
|
|||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
break outer
|
break outer
|
||||||
case <-didSnaps:
|
case <-didSnaps:
|
||||||
log.Printf("finished taking snapshots")
|
log.Debug("finished taking snapshots")
|
||||||
log.Printf("starting replication procedure")
|
log.Info("starting replication procedure")
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
log := pullCtx.Value(contextKeyLog).(Logger)
|
log := pullCtx.Value(contextKeyLog).(Logger)
|
||||||
log.Printf("replicating from lhs to rhs")
|
log.Debug("replicating from lhs to rhs")
|
||||||
err := doPull(PullContext{local, log, j.Mapping, j.InitialReplPolicy})
|
err := doPull(PullContext{local, log, j.Mapping, j.InitialReplPolicy})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("error replicating lhs to rhs: %s", err)
|
log.WithError(err).Error("error replicating lhs to rhs")
|
||||||
}
|
}
|
||||||
// use a ctx as soon as doPull gains ctx support
|
// use a ctx as soon as doPull gains ctx support
|
||||||
select {
|
select {
|
||||||
@ -151,14 +151,14 @@ outer:
|
|||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
log.Printf("pruning lhs")
|
log.Info("pruning lhs")
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
plhs.Run(plCtx)
|
plhs.Run(plCtx)
|
||||||
wg.Done()
|
wg.Done()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
log.Printf("pruning rhs")
|
log.Info("pruning rhs")
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
prhs.Run(prCtx)
|
prhs.Run(prCtx)
|
||||||
@ -169,7 +169,7 @@ outer:
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("context: %s", ctx.Err())
|
log.WithError(ctx.Err()).Info("context")
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -94,16 +94,16 @@ func (j *PullJob) JobName() string {
|
|||||||
func (j *PullJob) JobStart(ctx context.Context) {
|
func (j *PullJob) JobStart(ctx context.Context) {
|
||||||
|
|
||||||
log := ctx.Value(contextKeyLog).(Logger)
|
log := ctx.Value(contextKeyLog).(Logger)
|
||||||
defer log.Printf("exiting")
|
defer log.Info("exiting")
|
||||||
|
|
||||||
ticker := time.NewTicker(j.Interval)
|
ticker := time.NewTicker(j.Interval)
|
||||||
|
|
||||||
start:
|
start:
|
||||||
|
|
||||||
log.Printf("connecting")
|
log.Info("connecting")
|
||||||
rwc, err := j.Connect.Connect()
|
rwc, err := j.Connect.Connect()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("error connecting: %s", err)
|
log.WithError(err).Error("error connecting")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -117,31 +117,31 @@ start:
|
|||||||
client.SetLogger(log, true)
|
client.SetLogger(log, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("starting pull")
|
log.Info("starting pull")
|
||||||
|
|
||||||
pullLog := log.WithField(logTaskField, "pull")
|
pullLog := log.WithField(logTaskField, "pull")
|
||||||
err = doPull(PullContext{client, pullLog, j.Mapping, j.InitialReplPolicy})
|
err = doPull(PullContext{client, pullLog, j.Mapping, j.InitialReplPolicy})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("error doing pull: %s", err)
|
log.WithError(err).Error("error doing pull")
|
||||||
}
|
}
|
||||||
|
|
||||||
closeRPCWithTimeout(log, client, time.Second*10, "")
|
closeRPCWithTimeout(log, client, time.Second*10, "")
|
||||||
|
|
||||||
log.Printf("starting prune")
|
log.Info("starting prune")
|
||||||
prunectx := context.WithValue(ctx, contextKeyLog, log.WithField(logTaskField, "prune"))
|
prunectx := context.WithValue(ctx, contextKeyLog, log.WithField(logTaskField, "prune"))
|
||||||
pruner, err := j.Pruner(PrunePolicySideDefault, false)
|
pruner, err := j.Pruner(PrunePolicySideDefault, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("error creating pruner: %s", err)
|
log.WithError(err).Error("error creating pruner")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
pruner.Run(prunectx)
|
pruner.Run(prunectx)
|
||||||
log.Printf("finish prune")
|
log.Info("finish prune")
|
||||||
|
|
||||||
log.Printf("wait for next interval")
|
log.Info("wait for next interval")
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
log.Printf("context: %s", ctx.Err())
|
log.WithError(ctx.Err()).Info("context")
|
||||||
return
|
return
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
goto start
|
goto start
|
||||||
|
@ -75,12 +75,12 @@ func (j *SourceJob) JobName() string {
|
|||||||
func (j *SourceJob) JobStart(ctx context.Context) {
|
func (j *SourceJob) JobStart(ctx context.Context) {
|
||||||
|
|
||||||
log := ctx.Value(contextKeyLog).(Logger)
|
log := ctx.Value(contextKeyLog).(Logger)
|
||||||
defer log.Printf("exiting")
|
defer log.Info("exiting")
|
||||||
|
|
||||||
a := IntervalAutosnap{DatasetFilter: j.Datasets, Prefix: j.SnapshotPrefix, SnapshotInterval: j.Interval}
|
a := IntervalAutosnap{DatasetFilter: j.Datasets, Prefix: j.SnapshotPrefix, SnapshotInterval: j.Interval}
|
||||||
p, err := j.Pruner(PrunePolicySideDefault, false)
|
p, err := j.Pruner(PrunePolicySideDefault, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("error creating pruner: %s", err)
|
log.WithError(err).Error("error creating pruner")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -98,12 +98,12 @@ outer:
|
|||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
break outer
|
break outer
|
||||||
case <-didSnaps:
|
case <-didSnaps:
|
||||||
log.Printf("starting pruner")
|
log.Info("starting pruner")
|
||||||
p.Run(prunerContext)
|
p.Run(prunerContext)
|
||||||
log.Printf("pruner done")
|
log.Info("pruner done")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.Printf("context: %s", prunerContext.Err())
|
log.WithError(prunerContext.Err()).Info("context")
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -124,7 +124,7 @@ func (j *SourceJob) serve(ctx context.Context) {
|
|||||||
|
|
||||||
listener, err := j.Serve.Listen()
|
listener, err := j.Serve.Listen()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("error listening: %s", err)
|
log.WithError(err).Error("error listening")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -137,7 +137,7 @@ outer:
|
|||||||
go func() {
|
go func() {
|
||||||
rwc, err := listener.Accept()
|
rwc, err := listener.Accept()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("error accepting connection: %s", err)
|
log.WithError(err).Error("error accepting connection")
|
||||||
close(rwcChan)
|
close(rwcChan)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -168,22 +168,22 @@ outer:
|
|||||||
}
|
}
|
||||||
registerEndpoints(rpcServer, handler)
|
registerEndpoints(rpcServer, handler)
|
||||||
if err = rpcServer.Serve(); err != nil {
|
if err = rpcServer.Serve(); err != nil {
|
||||||
log.Printf("error serving connection: %s", err)
|
log.WithError(err).Error("error serving connection")
|
||||||
}
|
}
|
||||||
rwc.Close()
|
rwc.Close()
|
||||||
|
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
log.Printf("context: %s", ctx.Err())
|
log.WithError(ctx.Err()).Info("context")
|
||||||
break outer
|
break outer
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("closing listener")
|
log.Info("closing listener")
|
||||||
err = listener.Close()
|
err = listener.Close()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("error closing listener: %s", err)
|
log.WithError(err).Error("error closing listener")
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
|
@ -68,12 +68,11 @@ func (d *Daemon) Loop(ctx context.Context) {
|
|||||||
|
|
||||||
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
||||||
|
|
||||||
log.Printf("starting jobs from config")
|
log.Info("starting jobs from config")
|
||||||
i := 0
|
i := 0
|
||||||
for _, job := range d.conf.Jobs {
|
for _, job := range d.conf.Jobs {
|
||||||
log.Printf("starting job %s", job.JobName())
|
|
||||||
|
|
||||||
logger := log.WithField(logJobField, job.JobName())
|
logger := log.WithField(logJobField, job.JobName())
|
||||||
|
logger.Info("starting")
|
||||||
i++
|
i++
|
||||||
jobCtx := context.WithValue(ctx, contextKeyLog, logger)
|
jobCtx := context.WithValue(ctx, contextKeyLog, logger)
|
||||||
go func(j Job) {
|
go func(j Job) {
|
||||||
@ -86,23 +85,22 @@ func (d *Daemon) Loop(ctx context.Context) {
|
|||||||
outer:
|
outer:
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case j := <-finishs:
|
case <-finishs:
|
||||||
log.Printf("job finished: %s", j.JobName())
|
|
||||||
finishCount++
|
finishCount++
|
||||||
if finishCount == len(d.conf.Jobs) {
|
if finishCount == len(d.conf.Jobs) {
|
||||||
log.Printf("all jobs finished")
|
log.Info("all jobs finished")
|
||||||
break outer
|
break outer
|
||||||
}
|
}
|
||||||
|
|
||||||
case sig := <-sigChan:
|
case sig := <-sigChan:
|
||||||
log.Printf("received signal: %s", sig)
|
log.WithField("signal", sig).Info("received signal")
|
||||||
log.Printf("cancelling all jobs")
|
log.Info("cancelling all jobs")
|
||||||
cancel()
|
cancel()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
signal.Stop(sigChan)
|
signal.Stop(sigChan)
|
||||||
|
|
||||||
log.Printf("exiting")
|
log.Info("exiting")
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -30,7 +30,7 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func closeRPCWithTimeout(log Logger, remote rpc.RPCClient, timeout time.Duration, goodbye string) {
|
func closeRPCWithTimeout(log Logger, remote rpc.RPCClient, timeout time.Duration, goodbye string) {
|
||||||
log.Printf("closing rpc connection")
|
log.Info("closing rpc connection")
|
||||||
|
|
||||||
ch := make(chan error)
|
ch := make(chan error)
|
||||||
go func() {
|
go func() {
|
||||||
@ -47,7 +47,7 @@ func closeRPCWithTimeout(log Logger, remote rpc.RPCClient, timeout time.Duration
|
|||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("error closing connection: %s", err)
|
log.WithError(err).Error("error closing connection")
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -84,14 +84,14 @@ func doPull(pull PullContext) (err error) {
|
|||||||
localFs, err = pull.Mapping.Map(remoteFilesystems[fs])
|
localFs, err = pull.Mapping.Map(remoteFilesystems[fs])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err := fmt.Errorf("error mapping %s: %s", remoteFilesystems[fs], err)
|
err := fmt.Errorf("error mapping %s: %s", remoteFilesystems[fs], err)
|
||||||
log.WithError(err).Error()
|
log.WithError(err).WithField(logMapFromField, remoteFilesystems[fs]).Error("cannot map")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if localFs == nil {
|
if localFs == nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
log.WithField(logMapFromField, remoteFilesystems[fs].ToString()).
|
log.WithField(logMapFromField, remoteFilesystems[fs].ToString()).
|
||||||
WithField(logMapToField, localFs.ToString()).Debug()
|
WithField(logMapToField, localFs.ToString()).Debug("mapping")
|
||||||
m := RemoteLocalMapping{remoteFilesystems[fs], localFs}
|
m := RemoteLocalMapping{remoteFilesystems[fs], localFs}
|
||||||
replMapping[m.Local.ToString()] = m
|
replMapping[m.Local.ToString()] = m
|
||||||
localTraversal.Add(m.Local)
|
localTraversal.Add(m.Local)
|
||||||
@ -194,7 +194,7 @@ func doPull(pull PullContext) (err error) {
|
|||||||
FilesystemVersion: snapsOnly[len(snapsOnly)-1],
|
FilesystemVersion: snapsOnly[len(snapsOnly)-1],
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debug("requesting snapshot stream for %s", r.FilesystemVersion)
|
log.WithField("version", r.FilesystemVersion).Debug("requesting snapshot stream")
|
||||||
|
|
||||||
var stream io.Reader
|
var stream io.Reader
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user