diff --git a/cmd/config_job_source.go b/cmd/config_job_source.go index 4717155..ed44fca 100644 --- a/cmd/config_job_source.go +++ b/cmd/config_job_source.go @@ -93,13 +93,10 @@ func (j *SourceJob) JobStart(ctx context.Context) { return } - snapContext := context.WithValue(ctx, contextKeyLog, log.WithField(logTaskField, "autosnap")) - prunerContext := context.WithValue(ctx, contextKeyLog, log.WithField(logTaskField, "prune")) - serveContext := context.WithValue(ctx, contextKeyLog, log.WithField(logTaskField, "serve")) didSnaps := make(chan struct{}) - go j.serve(serveContext) - go a.Run(snapContext, didSnaps) + go j.serve(ctx, j.serveTask) + go a.Run(ctx, didSnaps) outer: for { @@ -108,11 +105,11 @@ outer: break outer case <-didSnaps: log.Info("starting pruner") - p.Run(prunerContext) + p.Run(ctx) log.Info("pruner done") } } - log.WithError(prunerContext.Err()).Info("context") + log.WithError(ctx.Err()).Info("context") } @@ -137,7 +134,7 @@ func (j *SourceJob) Pruner(task *Task, side PrunePolicySide, dryRun bool) (p Pru return } -func (j *SourceJob) serve(ctx context.Context) { +func (j *SourceJob) serve(ctx context.Context, task *Task) { log := ctx.Value(contextKeyLog).(Logger) @@ -176,25 +173,7 @@ outer: break outer } - rwc, err := util.NewReadWriteCloserLogger(rwcMsg.rwc, j.Debug.Conn.ReadDump, j.Debug.Conn.WriteDump) - if err != nil { - panic(err) - } - - // construct connection handler - handler := NewHandler(log, j.Filesystems, NewPrefixFilter(j.SnapshotPrefix)) - - // handle connection - rpcServer := rpc.NewServer(rwc) - if j.Debug.RPC.Log { - rpclog := log.WithField("subsystem", "rpc") - rpcServer.SetLogger(rpclog, true) - } - registerEndpoints(rpcServer, handler) - if err = rpcServer.Serve(); err != nil { - log.WithError(err).Error("error serving connection") - } - rwc.Close() + j.handleConnection(rwcMsg.rwc, task) case <-ctx.Done(): log.WithError(ctx.Err()).Info("context") @@ -204,7 +183,8 @@ outer: } - log.Info("closing listener") + task.Enter("close_listener") + defer task.Finish() err = listener.Close() if err != nil { log.WithError(err).Error("error closing listener") @@ -213,3 +193,29 @@ outer: return } + +func (j *SourceJob) handleConnection(rwc io.ReadWriteCloser, task *Task) { + + task.Enter("handle_connection") + defer task.Finish() + + rwc, err := util.NewReadWriteCloserLogger(rwc, j.Debug.Conn.ReadDump, j.Debug.Conn.WriteDump) + if err != nil { + panic(err) + } + + // construct connection handler + handler := NewHandler(task.Log(), j.Filesystems, NewPrefixFilter(j.SnapshotPrefix)) + + // handle connection + rpcServer := rpc.NewServer(rwc) + if j.Debug.RPC.Log { + rpclog := task.Log().WithField("subsystem", "rpc") + rpcServer.SetLogger(rpclog, true) + } + registerEndpoints(rpcServer, handler) + if err = rpcServer.Serve(); err != nil { + task.Log().WithError(err).Error("error serving connection") + } + rwc.Close() +}