job source with new config

This commit is contained in:
Anton Schirg 2018-08-27 15:19:56 +02:00
parent 16e1396261
commit b0d17803f0

View File

@ -4,9 +4,9 @@ import (
"context" "context"
"time" "time"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/problame/go-streamrpc" "github.com/problame/go-streamrpc"
"github.com/zrepl/zrepl/cmd/config"
"github.com/zrepl/zrepl/cmd/endpoint" "github.com/zrepl/zrepl/cmd/endpoint"
"net" "net"
) )
@ -18,59 +18,36 @@ type SourceJob struct {
SnapshotPrefix string SnapshotPrefix string
Interval time.Duration Interval time.Duration
Prune PrunePolicy Prune PrunePolicy
Debug JobDebugSettings
} }
func parseSourceJob(c JobParsingContext, name string, i map[string]interface{}) (j *SourceJob, err error) { func parseSourceJob(c config.Global, in config.SourceJob) (j *SourceJob, err error) {
j = &SourceJob{
var asMap struct { Name: in.Name,
Serve map[string]interface{} Interval: in.Snapshotting.Interval,
Filesystems map[string]string
SnapshotPrefix string `mapstructure:"snapshot_prefix"`
Interval string
Prune map[string]interface{}
Debug map[string]interface{}
} }
if err = mapstructure.Decode(i, &asMap); err != nil { if j.Serve, err = parseAuthenticatedChannelListenerFactory(c, in.Replication.Serve); err != nil {
err = errors.Wrap(err, "mapstructure error")
return nil, err
}
j = &SourceJob{Name: name}
if j.Serve, err = parseAuthenticatedChannelListenerFactory(c, asMap.Serve); err != nil {
return return
} }
if j.Filesystems, err = parseDatasetMapFilter(asMap.Filesystems, true); err != nil { if j.Filesystems, err = parseDatasetMapFilter(in.Replication.Filesystems, true); err != nil {
return return
} }
if j.SnapshotPrefix, err = parseSnapshotPrefix(asMap.SnapshotPrefix); err != nil { if j.SnapshotPrefix, err = parseSnapshotPrefix(in.Snapshotting.SnapshotPrefix); err != nil {
return return
} }
if j.Interval, err = parsePostitiveDuration(asMap.Interval); err != nil { if j.Prune, err = parsePrunePolicy(in.Pruning, true); err != nil {
err = errors.Wrap(err, "cannot parse 'interval'")
return
}
if j.Prune, err = parsePrunePolicy(asMap.Prune, true); err != nil {
err = errors.Wrap(err, "cannot parse 'prune'") err = errors.Wrap(err, "cannot parse 'prune'")
return return
} }
if err = mapstructure.Decode(asMap.Debug, &j.Debug); err != nil { if in.Debug.Conn.ReadDump != "" || in.Debug.Conn.WriteDump != "" {
err = errors.Wrap(err, "cannot parse 'debug'")
return
}
if j.Debug.Conn.ReadDump != "" || j.Debug.Conn.WriteDump != "" {
logServe := logListenerFactory{ logServe := logListenerFactory{
ListenerFactory: j.Serve, ListenerFactory: j.Serve,
ReadDump: j.Debug.Conn.ReadDump, ReadDump: in.Debug.Conn.ReadDump,
WriteDump: j.Debug.Conn.WriteDump, WriteDump: in.Debug.Conn.WriteDump,
} }
j.Serve = logServe j.Serve = logServe
} }