job source: refactor + use Task API

refs #10
This commit is contained in:
Christian Schwarz 2017-12-26 22:29:46 +01:00
parent 7d89d1fb00
commit 63fa7a67e9

View File

@ -93,13 +93,10 @@ func (j *SourceJob) JobStart(ctx context.Context) {
return return
} }
snapContext := context.WithValue(ctx, contextKeyLog, log.WithField(logTaskField, "autosnap"))
prunerContext := context.WithValue(ctx, contextKeyLog, log.WithField(logTaskField, "prune"))
serveContext := context.WithValue(ctx, contextKeyLog, log.WithField(logTaskField, "serve"))
didSnaps := make(chan struct{}) didSnaps := make(chan struct{})
go j.serve(serveContext) go j.serve(ctx, j.serveTask)
go a.Run(snapContext, didSnaps) go a.Run(ctx, didSnaps)
outer: outer:
for { for {
@ -108,11 +105,11 @@ outer:
break outer break outer
case <-didSnaps: case <-didSnaps:
log.Info("starting pruner") log.Info("starting pruner")
p.Run(prunerContext) p.Run(ctx)
log.Info("pruner done") log.Info("pruner done")
} }
} }
log.WithError(prunerContext.Err()).Info("context") log.WithError(ctx.Err()).Info("context")
} }
@ -137,7 +134,7 @@ func (j *SourceJob) Pruner(task *Task, side PrunePolicySide, dryRun bool) (p Pru
return return
} }
func (j *SourceJob) serve(ctx context.Context) { func (j *SourceJob) serve(ctx context.Context, task *Task) {
log := ctx.Value(contextKeyLog).(Logger) log := ctx.Value(contextKeyLog).(Logger)
@ -176,25 +173,7 @@ outer:
break outer break outer
} }
rwc, err := util.NewReadWriteCloserLogger(rwcMsg.rwc, j.Debug.Conn.ReadDump, j.Debug.Conn.WriteDump) j.handleConnection(rwcMsg.rwc, task)
if err != nil {
panic(err)
}
// construct connection handler
handler := NewHandler(log, j.Filesystems, NewPrefixFilter(j.SnapshotPrefix))
// handle connection
rpcServer := rpc.NewServer(rwc)
if j.Debug.RPC.Log {
rpclog := log.WithField("subsystem", "rpc")
rpcServer.SetLogger(rpclog, true)
}
registerEndpoints(rpcServer, handler)
if err = rpcServer.Serve(); err != nil {
log.WithError(err).Error("error serving connection")
}
rwc.Close()
case <-ctx.Done(): case <-ctx.Done():
log.WithError(ctx.Err()).Info("context") log.WithError(ctx.Err()).Info("context")
@ -204,7 +183,8 @@ outer:
} }
log.Info("closing listener") task.Enter("close_listener")
defer task.Finish()
err = listener.Close() err = listener.Close()
if err != nil { if err != nil {
log.WithError(err).Error("error closing listener") log.WithError(err).Error("error closing listener")
@ -213,3 +193,29 @@ outer:
return return
} }
func (j *SourceJob) handleConnection(rwc io.ReadWriteCloser, task *Task) {
task.Enter("handle_connection")
defer task.Finish()
rwc, err := util.NewReadWriteCloserLogger(rwc, j.Debug.Conn.ReadDump, j.Debug.Conn.WriteDump)
if err != nil {
panic(err)
}
// construct connection handler
handler := NewHandler(task.Log(), j.Filesystems, NewPrefixFilter(j.SnapshotPrefix))
// handle connection
rpcServer := rpc.NewServer(rwc)
if j.Debug.RPC.Log {
rpclog := task.Log().WithField("subsystem", "rpc")
rpcServer.SetLogger(rpclog, true)
}
registerEndpoints(rpcServer, handler)
if err = rpcServer.Serve(); err != nil {
task.Log().WithError(err).Error("error serving connection")
}
rwc.Close()
}