diff --git a/client/status.go b/client/status.go index 5d7a95a..c72ca10 100644 --- a/client/status.go +++ b/client/status.go @@ -202,9 +202,9 @@ func (t *tui) draw() { continue } - pushStatus, ok := v.JobSpecific.(*job.PushStatus) + pushStatus, ok := v.JobSpecific.(*job.ActiveSideStatus) if !ok || pushStatus == nil { - t.printf("PushStatus is null") + t.printf("ActiveSideStatus is null") t.newline() continue } diff --git a/config/config.go b/config/config.go index d661d4a..081e335 100644 --- a/config/config.go +++ b/config/config.go @@ -21,42 +21,42 @@ type JobEnum struct { Ret interface{} } -type PushJob struct { +type ActiveJob struct { Type string `yaml:"type"` Name string `yaml:"name"` Connect ConnectEnum `yaml:"connect"` - Filesystems FilesystemsFilter `yaml:"filesystems"` - Snapshotting Snapshotting `yaml:"snapshotting"` Pruning PruningSenderReceiver `yaml:"pruning"` Debug JobDebugSettings `yaml:"debug,optional"` } -type SinkJob struct { +type PassiveJob struct { Type string `yaml:"type"` Name string `yaml:"name"` - RootDataset string `yaml:"root_dataset"` Serve ServeEnum `yaml:"serve"` Debug JobDebugSettings `yaml:"debug,optional"` } +type PushJob struct { + ActiveJob `yaml:",inline"` + Snapshotting Snapshotting `yaml:"snapshotting"` + Filesystems FilesystemsFilter `yaml:"filesystems"` +} + type PullJob struct { - Type string `yaml:"type"` - Name string `yaml:"name"` - Connect ConnectEnum `yaml:"connect"` + ActiveJob `yaml:",inline"` RootDataset string `yaml:"root_dataset"` Interval time.Duration `yaml:"interval,positive"` - Pruning PruningSenderReceiver `yaml:"pruning"` - Debug JobDebugSettings `yaml:"debug,optional"` +} + +type SinkJob struct { + PassiveJob `yaml:",inline"` + RootDataset string `yaml:"root_dataset"` } type SourceJob struct { - Type string `yaml:"type"` - Name string `yaml:"name"` - Serve ServeEnum `yaml:"serve"` - Filesystems FilesystemsFilter `yaml:"filesystems"` + PassiveJob `yaml:",inline"` Snapshotting Snapshotting `yaml:"snapshotting"` - Pruning PruningLocal `yaml:"pruning"` - Debug JobDebugSettings `yaml:"debug,optional"` + Filesystems FilesystemsFilter `yaml:"filesystems"` } type LocalJob struct { diff --git a/daemon/job/push.go b/daemon/job/active.go similarity index 67% rename from daemon/job/push.go rename to daemon/job/active.go index c77e4d3..5d20f65 100644 --- a/daemon/job/push.go +++ b/daemon/job/active.go @@ -3,6 +3,7 @@ package job import ( "context" "github.com/pkg/errors" + "github.com/problame/go-streamrpc" "github.com/prometheus/client_golang/prometheus" "github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/daemon/connecter" @@ -15,14 +16,13 @@ import ( "github.com/zrepl/zrepl/daemon/snapper" ) -type Push struct { - name string - clientFactory *connecter.ClientFactory - fsfilter endpoint.FSFilter +type ActiveSide struct { + mode activeMode + name string + clientFactory *connecter.ClientFactory prunerFactory *pruner.PrunerFactory - snapper *snapper.Snapper promRepStateSecs *prometheus.HistogramVec // labels: state promPruneSecs *prometheus.HistogramVec // labels: prune_side @@ -32,9 +32,48 @@ type Push struct { replication *replication.Replication } -func PushFromConfig(g *config.Global, in *config.PushJob) (j *Push, err error) { +type activeMode interface { + SenderReceiver(client *streamrpc.Client) (replication.Sender, replication.Receiver, error) + Type() Type + RunPeriodic(ctx context.Context, wakeUpCommon chan<- struct{}) +} - j = &Push{} +type modePush struct { + fsfilter endpoint.FSFilter + snapper *snapper.Snapper +} + +func (m *modePush) SenderReceiver(client *streamrpc.Client) (replication.Sender, replication.Receiver, error) { + sender := endpoint.NewSender(m.fsfilter) + receiver := endpoint.NewRemote(client) + return sender, receiver, nil +} + +func (m *modePush) Type() Type { return TypePush } + +func (m *modePush) RunPeriodic(ctx context.Context, wakeUpCommon chan <- struct{}) { + m.snapper.Run(ctx, wakeUpCommon) +} + + +func modePushFromConfig(g *config.Global, in *config.PushJob) (*modePush, error) { + m := &modePush{} + fsf, err := filters.DatasetMapFilterFromConfig(in.Filesystems) + if err != nil { + return nil, errors.Wrap(err, "cannnot build filesystem filter") + } + m.fsfilter = fsf + + if m.snapper, err = snapper.FromConfig(g, fsf, &in.Snapshotting); err != nil { + return nil, errors.Wrap(err, "cannot build snapper") + } + + return m, nil +} + +func activeSide(g *config.Global, in *config.ActiveJob, mode activeMode) (j *ActiveSide, err error) { + + j = &ActiveSide{mode: mode} j.name = in.Name j.promRepStateSecs = prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: "zrepl", @@ -56,12 +95,6 @@ func PushFromConfig(g *config.Global, in *config.PushJob) (j *Push, err error) { return nil, errors.Wrap(err, "cannot build client") } - fsf, err := filters.DatasetMapFilterFromConfig(in.Filesystems) - if err != nil { - return nil, errors.Wrap(err, "cannnot build filesystem filter") - } - j.fsfilter = fsf - j.promPruneSecs = prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: "zrepl", Subsystem: "pruning", @@ -74,26 +107,22 @@ func PushFromConfig(g *config.Global, in *config.PushJob) (j *Push, err error) { return nil, err } - if j.snapper, err = snapper.FromConfig(g, fsf, &in.Snapshotting); err != nil { - return nil, errors.Wrap(err, "cannot build snapper") - } - return j, nil } -func (j *Push) RegisterMetrics(registerer prometheus.Registerer) { +func (j *ActiveSide) RegisterMetrics(registerer prometheus.Registerer) { registerer.MustRegister(j.promRepStateSecs) registerer.MustRegister(j.promPruneSecs) registerer.MustRegister(j.promBytesReplicated) } -func (j *Push) Name() string { return j.name } +func (j *ActiveSide) Name() string { return j.name } -type PushStatus struct { +type ActiveSideStatus struct { Replication *replication.Report } -func (j *Push) Status() *Status { +func (j *ActiveSide) Status() *Status { rep := func() *replication.Replication { j.mtx.Lock() defer j.mtx.Unlock() @@ -102,26 +131,25 @@ func (j *Push) Status() *Status { } return j.replication }() - s := &PushStatus{} + s := &ActiveSideStatus{} + t := j.mode.Type() if rep == nil { - return &Status{Type: TypePush, JobSpecific: s} + return &Status{Type: t, JobSpecific: s} } s.Replication = rep.Report() - return &Status{Type: TypePush, JobSpecific: s} + return &Status{Type: t, JobSpecific: s} } -func (j *Push) Run(ctx context.Context) { +func (j *ActiveSide) Run(ctx context.Context) { log := GetLogger(ctx) + ctx = logging.WithSubsystemLoggers(ctx, log) defer log.Info("job exiting") - snapshotsTaken := make(chan struct{}) - { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - ctx = logging.WithSubsystemLoggers(ctx, log) - go j.snapper.Run(ctx, snapshotsTaken) - } + periodicDone := make(chan struct{}) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + go j.mode.RunPeriodic(ctx, periodicDone) invocationCount := 0 outer: @@ -133,7 +161,7 @@ outer: break outer case <-WaitWakeup(ctx): - case <-snapshotsTaken: + case <-periodicDone: } invocationCount++ invLog := log.WithField("invocation", invocationCount) @@ -141,7 +169,7 @@ outer: } } -func (j *Push) do(ctx context.Context) { +func (j *ActiveSide) do(ctx context.Context) { log := GetLogger(ctx) ctx = logging.WithSubsystemLoggers(ctx, log) @@ -152,8 +180,7 @@ func (j *Push) do(ctx context.Context) { } defer client.Close(ctx) - sender := endpoint.NewSender(j.fsfilter) - receiver := endpoint.NewRemote(client) + sender, receiver, err := j.mode.SenderReceiver(client) j.mtx.Lock() j.replication = replication.NewReplication(j.promRepStateSecs, j.promBytesReplicated) diff --git a/daemon/job/build_jobs.go b/daemon/job/build_jobs.go index b438cac..368d9bf 100644 --- a/daemon/job/build_jobs.go +++ b/daemon/job/build_jobs.go @@ -2,8 +2,8 @@ package job import ( "fmt" - "github.com/zrepl/zrepl/config" "github.com/pkg/errors" + "github.com/zrepl/zrepl/config" ) func JobsFromConfig(c *config.Config) ([]Job, error) { @@ -19,19 +19,31 @@ func JobsFromConfig(c *config.Config) ([]Job, error) { } func buildJob(c *config.Global, in config.JobEnum) (j Job, err error) { + cannotBuildJob := func(e error, name string) (Job, error) { + return nil, errors.Wrapf(err, "cannot build job %q", name) + } switch v := in.Ret.(type) { case *config.SinkJob: - j, err = SinkFromConfig(c, v) + m, err := modeSinkFromConfig(c, v) if err != nil { - return nil, errors.Wrapf(err, "cannot build job %q", v.Name) + return cannotBuildJob(err, v.Name) + } + j, err = passiveSideFromConfig(c, &v.PassiveJob, m) + if err != nil { + return cannotBuildJob(err, v.Name) } case *config.PushJob: - j, err = PushFromConfig(c, v) + m, err := modePushFromConfig(c, v) if err != nil { - return nil, errors.Wrapf(err, "cannot build job %q", v.Name) + return cannotBuildJob(err, v.Name) + } + j, err = activeSide(c, &v.ActiveJob, m) + if err != nil { + return cannotBuildJob(err, v.Name) } default: panic(fmt.Sprintf("implementation error: unknown job type %T", v)) } - return j, err + return j, nil + } diff --git a/daemon/job/job.go b/daemon/job/job.go index 23f580d..1cac343 100644 --- a/daemon/job/job.go +++ b/daemon/job/job.go @@ -59,6 +59,8 @@ const ( TypeInternal Type = "internal" TypePush Type = "push" TypeSink Type = "sink" + TypePull Type = "pull" + TypeSource Type = "source" ) type Status struct { @@ -101,11 +103,11 @@ func (s *Status) UnmarshalJSON(in []byte) (err error) { } switch s.Type { case TypePush: - var st PushStatus + var st ActiveSideStatus err = json.Unmarshal(jobJSON, &st) s.JobSpecific = &st case TypeSink: - var st SinkStatus + var st PassiveStatus err = json.Unmarshal(jobJSON, &st) s.JobSpecific = &st case TypeInternal: diff --git a/daemon/job/sink.go b/daemon/job/passive.go similarity index 55% rename from daemon/job/sink.go rename to daemon/job/passive.go index 0c1c114..4326562 100644 --- a/daemon/job/sink.go +++ b/daemon/job/passive.go @@ -13,43 +13,79 @@ import ( "github.com/zrepl/zrepl/zfs" ) -type Sink struct { +type PassiveSide struct { + mode passiveMode name string l serve.ListenerFactory rpcConf *streamrpc.ConnConfig +} + +type passiveMode interface { + ConnHandleFunc(ctx context.Context, conn serve.AuthenticatedConn) streamrpc.HandlerFunc + Type() Type +} + +type modeSink struct { rootDataset *zfs.DatasetPath } -func SinkFromConfig(g *config.Global, in *config.SinkJob) (s *Sink, err error) { +func (m *modeSink) Type() Type { return TypeSink } - s = &Sink{name: in.Name} +func (m *modeSink) ConnHandleFunc(ctx context.Context, conn serve.AuthenticatedConn) streamrpc.HandlerFunc { + log := GetLogger(ctx) + + clientRootStr := path.Join(m.rootDataset.ToString(), conn.ClientIdentity()) + clientRoot, err := zfs.NewDatasetPath(clientRootStr) + if err != nil { + log.WithError(err). + WithField("client_identity", conn.ClientIdentity()). + Error("cannot build client filesystem map (client identity must be a valid ZFS FS name") + } + log.WithField("client_root", clientRoot).Debug("client root") + + local, err := endpoint.NewReceiver(clientRoot) + if err != nil { + log.WithError(err).Error("unexpected error: cannot convert mapping to filter") + return nil + } + + h := endpoint.NewHandler(local) + return h.Handle +} + +func modeSinkFromConfig(g *config.Global, in *config.SinkJob) (m *modeSink, err error) { + m = &modeSink{} + m.rootDataset, err = zfs.NewDatasetPath(in.RootDataset) + if err != nil { + return nil, errors.New("root dataset is not a valid zfs filesystem path") + } + if m.rootDataset.Length() <= 0 { + return nil, errors.New("root dataset must not be empty") // duplicates error check of receiver + } + return m, nil +} + +func passiveSideFromConfig(g *config.Global, in *config.PassiveJob, mode passiveMode) (s *PassiveSide, err error) { + + s = &PassiveSide{mode: mode, name: in.Name} if s.l, s.rpcConf, err = serve.FromConfig(g, in.Serve); err != nil { return nil, errors.Wrap(err, "cannot build server") } - s.rootDataset, err = zfs.NewDatasetPath(in.RootDataset) - if err != nil { - return nil, errors.New("root dataset is not a valid zfs filesystem path") - } - if s.rootDataset.Length() <= 0 { - return nil, errors.New("root dataset must not be empty") // duplicates error check of receiver - } - - return s, nil } -func (j *Sink) Name() string { return j.name } +func (j *PassiveSide) Name() string { return j.name } -type SinkStatus struct {} +type PassiveStatus struct {} -func (*Sink) Status() *Status { - return &Status{Type: TypeSink} // FIXME SinkStatus +func (s *PassiveSide) Status() *Status { + return &Status{Type: s.mode.Type()} // FIXME PassiveStatus } -func (*Sink) RegisterMetrics(registerer prometheus.Registerer) {} +func (*PassiveSide) RegisterMetrics(registerer prometheus.Registerer) {} -func (j *Sink) Run(ctx context.Context) { +func (j *PassiveSide) Run(ctx context.Context) { log := GetLogger(ctx) defer log.Info("job exiting") @@ -74,10 +110,26 @@ outer: log.WithError(res.err).Info("accept error") continue } + conn := res.conn connId++ connLog := log. WithField("connID", connId) - go j.handleConnection(WithLogger(ctx, connLog), res.conn) + connLog. + WithField("addr", conn.RemoteAddr()). + WithField("client_identity", conn.ClientIdentity()). + Info("handling connection") + go func() { + defer connLog.Info("finished handling connection") + defer conn.Close() + ctx := logging.WithSubsystemLoggers(ctx, connLog) + handleFunc := j.mode.ConnHandleFunc(ctx, conn) + if handleFunc == nil { + return + } + if err := streamrpc.ServeConn(ctx, conn, j.rpcConf, handleFunc); err != nil { + log.WithError(err).Error("error serving client") + } + }() case <-ctx.Done(): break outer @@ -87,39 +139,6 @@ outer: } -func (j *Sink) handleConnection(ctx context.Context, conn serve.AuthenticatedConn) { - defer conn.Close() - - log := GetLogger(ctx) - log. - WithField("addr", conn.RemoteAddr()). - WithField("client_identity", conn.ClientIdentity()). - Info("handling connection") - defer log.Info("finished handling connection") - - clientRootStr := path.Join(j.rootDataset.ToString(), conn.ClientIdentity()) - clientRoot, err := zfs.NewDatasetPath(clientRootStr) - if err != nil { - log.WithError(err). - WithField("client_identity", conn.ClientIdentity()). - Error("cannot build client filesystem map (client identity must be a valid ZFS FS name") - } - log.WithField("client_root", clientRoot).Debug("client root") - - ctx = logging.WithSubsystemLoggers(ctx, log) - - local, err := endpoint.NewReceiver(clientRoot) - if err != nil { - log.WithError(err).Error("unexpected error: cannot convert mapping to filter") - return - } - - handler := endpoint.NewHandler(local) - if err := streamrpc.ServeConn(ctx, conn, j.rpcConf, handler.Handle); err != nil { - log.WithError(err).Error("error serving client") - } -} - type acceptResult struct { conn serve.AuthenticatedConn err error