zrepl/cmd/config_job_source.go
Christian Schwarz a0f72b585b remove JobStatus, Task abstraction and 'control status' subcommand
Control status will be replaced by job-specific output at some point.

Task was not useful anymore with state machine, may reintroduce
something similar at a later point, but consider alternatives:

- opentracing.io
- embedding everything in ctx
	- activity stack would work easily
	- log entries via proxy logger.Logger object
- progress reporting should be in status reports of individial jobs
2018-08-26 19:08:30 +02:00

196 lines
4.3 KiB
Go

package cmd
import (
"context"
"time"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/problame/go-streamrpc"
"github.com/zrepl/zrepl/cmd/endpoint"
"net"
)
type SourceJob struct {
Name string
Serve ListenerFactory
Filesystems *DatasetMapFilter
SnapshotPrefix string
Interval time.Duration
Prune PrunePolicy
Debug JobDebugSettings
}
func parseSourceJob(c JobParsingContext, name string, i map[string]interface{}) (j *SourceJob, err error) {
var asMap struct {
Serve map[string]interface{}
Filesystems map[string]string
SnapshotPrefix string `mapstructure:"snapshot_prefix"`
Interval string
Prune map[string]interface{}
Debug map[string]interface{}
}
if err = mapstructure.Decode(i, &asMap); err != nil {
err = errors.Wrap(err, "mapstructure error")
return nil, err
}
j = &SourceJob{Name: name}
if j.Serve, err = parseAuthenticatedChannelListenerFactory(c, asMap.Serve); err != nil {
return
}
if j.Filesystems, err = parseDatasetMapFilter(asMap.Filesystems, true); err != nil {
return
}
if j.SnapshotPrefix, err = parseSnapshotPrefix(asMap.SnapshotPrefix); err != nil {
return
}
if j.Interval, err = parsePostitiveDuration(asMap.Interval); err != nil {
err = errors.Wrap(err, "cannot parse 'interval'")
return
}
if j.Prune, err = parsePrunePolicy(asMap.Prune, true); err != nil {
err = errors.Wrap(err, "cannot parse 'prune'")
return
}
if err = mapstructure.Decode(asMap.Debug, &j.Debug); err != nil {
err = errors.Wrap(err, "cannot parse 'debug'")
return
}
if j.Debug.Conn.ReadDump != "" || j.Debug.Conn.WriteDump != "" {
logServe := logListenerFactory{
ListenerFactory: j.Serve,
ReadDump: j.Debug.Conn.ReadDump,
WriteDump: j.Debug.Conn.WriteDump,
}
j.Serve = logServe
}
return
}
func (j *SourceJob) JobName() string {
return j.Name
}
func (j *SourceJob) JobType() JobType { return JobTypeSource }
func (j *SourceJob) JobStart(ctx context.Context) {
log := getLogger(ctx)
defer log.Info("exiting")
a := IntervalAutosnap{j.Filesystems, j.SnapshotPrefix, j.Interval}
p, err := j.Pruner(PrunePolicySideDefault, false)
if err != nil {
log.WithError(err).Error("error creating pruner")
return
}
didSnaps := make(chan struct{})
go j.serve(ctx) // logSubsysField set by handleConnection
go a.Run(WithLogger(ctx, log.WithField(logSubsysField, "snap")), didSnaps)
outer:
for {
select {
case <-ctx.Done():
break outer
case <-didSnaps:
p.Run(WithLogger(ctx, log.WithField(logSubsysField, "prune")))
}
}
log.WithError(ctx.Err()).Info("context")
}
func (j *SourceJob) Pruner(side PrunePolicySide, dryRun bool) (p Pruner, err error) {
p = Pruner{
time.Now(),
dryRun,
j.Filesystems,
j.SnapshotPrefix,
j.Prune,
}
return
}
func (j *SourceJob) serve(ctx context.Context) {
log := getLogger(ctx)
listener, err := j.Serve.Listen()
if err != nil {
getLogger(ctx).WithError(err).Error("error listening")
return
}
type connChanMsg struct {
conn net.Conn
err error
}
connChan := make(chan connChanMsg)
// Serve connections until interrupted or error
outer:
for {
go func() {
rwc, err := listener.Accept()
connChan <- connChanMsg{rwc, err}
}()
select {
case rwcMsg := <-connChan:
if rwcMsg.err != nil {
log.WithError(rwcMsg.err).Error("error accepting connection")
continue
}
j.handleConnection(ctx, rwcMsg.conn)
case <-ctx.Done():
log.WithError(ctx.Err()).Info("context")
break outer
}
}
log.Info("closing listener")
err = listener.Close()
if err != nil {
log.WithError(err).Error("error closing listener")
}
return
}
func (j *SourceJob) handleConnection(ctx context.Context, conn net.Conn) {
log := getLogger(ctx)
log.Info("handling client connection")
senderEP := endpoint.NewSender(j.Filesystems, NewPrefixFilter(j.SnapshotPrefix))
ctx = endpoint.WithLogger(ctx, log.WithField(logSubsysField, "serve"))
ctx = streamrpc.ContextWithLogger(ctx, streamrpcLogAdaptor{log.WithField(logSubsysField, "rpc")})
handler := endpoint.NewHandler(senderEP)
if err := streamrpc.ServeConn(ctx, conn, STREAMRPC_CONFIG, handler.Handle); err != nil {
log.WithError(err).Error("error serving connection")
} else {
log.Info("client closed connection")
}
}