implement pull + sink modes for active and passive side

This commit is contained in:
Christian Schwarz 2018-09-24 12:31:29 +02:00
parent 6889f441b2
commit d04b9713c4
5 changed files with 120 additions and 4 deletions

View File

@ -188,7 +188,7 @@ func (t *tui) draw() {
t.setIndent(1)
t.newline()
if v.Type != job.TypePush {
if v.Type != job.TypePush && v.Type != job.TypePull {
t.printf("No status representation for job type '%s', dumping as YAML", v.Type)
t.newline()
asYaml, err := yaml.Marshal(v.JobSpecific)

View File

@ -11,9 +11,11 @@ import (
"github.com/zrepl/zrepl/daemon/pruner"
"github.com/zrepl/zrepl/endpoint"
"github.com/zrepl/zrepl/replication"
"github.com/zrepl/zrepl/zfs"
"sync"
"github.com/zrepl/zrepl/daemon/logging"
"github.com/zrepl/zrepl/daemon/snapper"
"time"
)
type ActiveSide struct {
@ -71,6 +73,57 @@ func modePushFromConfig(g *config.Global, in *config.PushJob) (*modePush, error)
return m, nil
}
type modePull struct {
rootDataset *zfs.DatasetPath
interval time.Duration
}
func (m *modePull) SenderReceiver(client *streamrpc.Client) (replication.Sender, replication.Receiver, error) {
sender := endpoint.NewRemote(client)
receiver, err := endpoint.NewReceiver(m.rootDataset)
return sender, receiver, err
}
func (*modePull) Type() Type { return TypePull }
func (m *modePull) RunPeriodic(ctx context.Context, wakeUpCommon chan<- struct{}) {
t := time.NewTicker(m.interval)
defer t.Stop()
for {
select {
case <-t.C:
select {
case wakeUpCommon <- struct{}{}:
default:
GetLogger(ctx).
WithField("pull_interval", m.interval).
Warn("pull job took longer than pull interval")
wakeUpCommon <- struct{}{} // block anyways, to queue up the wakeup
}
case <-ctx.Done():
return
}
}
}
func modePullFromConfig(g *config.Global, in *config.PullJob) (m *modePull, err error) {
m = &modePull{}
if in.Interval <= 0 {
return nil, errors.New("interval must be positive")
}
m.interval = in.Interval
m.rootDataset, err = zfs.NewDatasetPath(in.RootDataset)
if err != nil {
return nil, errors.New("root dataset is not a valid zfs filesystem path")
}
if m.rootDataset.Length() <= 0 {
return nil, errors.New("root dataset must not be empty") // duplicates error check of receiver
}
return m, nil
}
func activeSide(g *config.Global, in *config.ActiveJob, mode activeMode) (j *ActiveSide, err error) {
j = &ActiveSide{mode: mode}

View File

@ -22,6 +22,7 @@ func buildJob(c *config.Global, in config.JobEnum) (j Job, err error) {
cannotBuildJob := func(e error, name string) (Job, error) {
return nil, errors.Wrapf(err, "cannot build job %q", name)
}
// FIXME prettify this
switch v := in.Ret.(type) {
case *config.SinkJob:
m, err := modeSinkFromConfig(c, v)
@ -32,6 +33,15 @@ func buildJob(c *config.Global, in config.JobEnum) (j Job, err error) {
if err != nil {
return cannotBuildJob(err, v.Name)
}
case *config.SourceJob:
m, err := modeSourceFromConfig(c, v)
if err != nil {
return cannotBuildJob(err, v.Name)
}
j, err = passiveSideFromConfig(c, &v.PassiveJob, m)
if err != nil {
return cannotBuildJob(err, v.Name)
}
case *config.PushJob:
m, err := modePushFromConfig(c, v)
if err != nil {
@ -41,6 +51,15 @@ func buildJob(c *config.Global, in config.JobEnum) (j Job, err error) {
if err != nil {
return cannotBuildJob(err, v.Name)
}
case *config.PullJob:
m, err := modePullFromConfig(c, v)
if err != nil {
return cannotBuildJob(err, v.Name)
}
j, err = activeSide(c, &v.ActiveJob, m)
if err != nil {
return cannotBuildJob(err, v.Name)
}
default:
panic(fmt.Sprintf("implementation error: unknown job type %T", v))
}

View File

@ -102,10 +102,12 @@ func (s *Status) UnmarshalJSON(in []byte) (err error) {
return fmt.Errorf("field '%s', not found", key)
}
switch s.Type {
case TypePull: fallthrough
case TypePush:
var st ActiveSideStatus
err = json.Unmarshal(jobJSON, &st)
s.JobSpecific = &st
case TypeSource: fallthrough
case TypeSink:
var st PassiveStatus
err = json.Unmarshal(jobJSON, &st)

View File

@ -6,11 +6,13 @@ import (
"github.com/problame/go-streamrpc"
"github.com/prometheus/client_golang/prometheus"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/filters"
"github.com/zrepl/zrepl/daemon/logging"
"github.com/zrepl/zrepl/daemon/serve"
"github.com/zrepl/zrepl/daemon/snapper"
"github.com/zrepl/zrepl/endpoint"
"path"
"github.com/zrepl/zrepl/zfs"
"path"
)
type PassiveSide struct {
@ -22,6 +24,7 @@ type PassiveSide struct {
type passiveMode interface {
ConnHandleFunc(ctx context.Context, conn serve.AuthenticatedConn) streamrpc.HandlerFunc
RunPeriodic(ctx context.Context)
Type() Type
}
@ -53,6 +56,8 @@ func (m *modeSink) ConnHandleFunc(ctx context.Context, conn serve.AuthenticatedC
return h.Handle
}
func (m *modeSink) RunPeriodic(_ context.Context) {}
func modeSinkFromConfig(g *config.Global, in *config.SinkJob) (m *modeSink, err error) {
m = &modeSink{}
m.rootDataset, err = zfs.NewDatasetPath(in.RootDataset)
@ -65,6 +70,39 @@ func modeSinkFromConfig(g *config.Global, in *config.SinkJob) (m *modeSink, err
return m, nil
}
type modeSource struct {
fsfilter zfs.DatasetFilter
snapper *snapper.Snapper
}
func modeSourceFromConfig(g *config.Global, in *config.SourceJob) (m *modeSource, err error) {
// FIXME exact dedup of modePush
m = &modeSource{}
fsf, err := filters.DatasetMapFilterFromConfig(in.Filesystems)
if err != nil {
return nil, errors.Wrap(err, "cannnot build filesystem filter")
}
m.fsfilter = fsf
if m.snapper, err = snapper.FromConfig(g, fsf, &in.Snapshotting); err != nil {
return nil, errors.Wrap(err, "cannot build snapper")
}
return m, nil
}
func (m *modeSource) Type() Type { return TypeSource }
func (m *modeSource) ConnHandleFunc(ctx context.Context, conn serve.AuthenticatedConn) streamrpc.HandlerFunc {
sender := endpoint.NewSender(m.fsfilter)
h := endpoint.NewHandler(sender)
return h.Handle
}
func (m *modeSource) RunPeriodic(ctx context.Context) {
m.snapper.Run(ctx, nil)
}
func passiveSideFromConfig(g *config.Global, in *config.PassiveJob, mode passiveMode) (s *PassiveSide, err error) {
s = &PassiveSide{mode: mode, name: in.Name}
@ -97,10 +135,14 @@ func (j *PassiveSide) Run(ctx context.Context) {
}
defer l.Close()
{
ctx, cancel := context.WithCancel(logging.WithSubsystemLoggers(ctx, log)) // shadowing
defer cancel()
go j.mode.RunPeriodic(ctx)
}
log.WithField("addr", l.Addr()).Debug("accepting connections")
var connId int
outer:
for {