refactor push + source into active + passive 'sides' with push and source 'modes'

Christian Schwarz 2018-09-23 23:04:31 +02:00
parent 9446b51a1f
commit e3be120d88
6 changed files with 174 additions and 114 deletions
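The commit message above captures the shape of the refactor: the job-type-specific parts of the former Push and Sink jobs move behind small "mode" interfaces, while the shared machinery (metrics, pruning, the run loop, connection handling) lives in generic ActiveSide and PassiveSide structs. As a reading aid, here is a minimal, self-contained sketch of that composition pattern; the types and fields below are illustrative stand-ins, not the committed code, which follows in the diff.

package main

import "fmt"

// Simplified stand-ins illustrating the refactor: a generic "side" owns the
// common machinery and delegates job-type-specific behavior to a small
// "mode" interface. Names and fields here are illustrative, not zrepl's.

type activeMode interface {
	Type() string
	// zrepl's interface additionally has SenderReceiver(...) and RunPeriodic(...)
}

type modePush struct{}

func (modePush) Type() string { return "push" }

type activeSide struct {
	name string
	mode activeMode // push today; a pull mode can be added without touching activeSide
}

func (a *activeSide) status() string {
	return fmt.Sprintf("job %q: %s mode", a.name, a.mode.Type())
}

func main() {
	j := &activeSide{name: "prod_to_backups", mode: modePush{}}
	fmt.Println(j.status()) // job "prod_to_backups": push mode
}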

View File

@@ -202,9 +202,9 @@ func (t *tui) draw() {
continue
}
pushStatus, ok := v.JobSpecific.(*job.PushStatus)
pushStatus, ok := v.JobSpecific.(*job.ActiveSideStatus)
if !ok || pushStatus == nil {
t.printf("PushStatus is null")
t.printf("ActiveSideStatus is null")
t.newline()
continue
}

View File

@@ -21,42 +21,42 @@ type JobEnum struct {
Ret interface{}
}
type PushJob struct {
type ActiveJob struct {
Type string `yaml:"type"`
Name string `yaml:"name"`
Connect ConnectEnum `yaml:"connect"`
Filesystems FilesystemsFilter `yaml:"filesystems"`
Snapshotting Snapshotting `yaml:"snapshotting"`
Pruning PruningSenderReceiver `yaml:"pruning"`
Debug JobDebugSettings `yaml:"debug,optional"`
}
type SinkJob struct {
type PassiveJob struct {
Type string `yaml:"type"`
Name string `yaml:"name"`
RootDataset string `yaml:"root_dataset"`
Serve ServeEnum `yaml:"serve"`
Debug JobDebugSettings `yaml:"debug,optional"`
}
type PushJob struct {
ActiveJob `yaml:",inline"`
Snapshotting Snapshotting `yaml:"snapshotting"`
Filesystems FilesystemsFilter `yaml:"filesystems"`
}
type PullJob struct {
Type string `yaml:"type"`
Name string `yaml:"name"`
Connect ConnectEnum `yaml:"connect"`
ActiveJob `yaml:",inline"`
RootDataset string `yaml:"root_dataset"`
Interval time.Duration `yaml:"interval,positive"`
Pruning PruningSenderReceiver `yaml:"pruning"`
Debug JobDebugSettings `yaml:"debug,optional"`
}
type SinkJob struct {
PassiveJob `yaml:",inline"`
RootDataset string `yaml:"root_dataset"`
}
type SourceJob struct {
Type string `yaml:"type"`
Name string `yaml:"name"`
Serve ServeEnum `yaml:"serve"`
Filesystems FilesystemsFilter `yaml:"filesystems"`
PassiveJob `yaml:",inline"`
Snapshotting Snapshotting `yaml:"snapshotting"`
Pruning PruningLocal `yaml:"pruning"`
Debug JobDebugSettings `yaml:"debug,optional"`
Filesystems FilesystemsFilter `yaml:"filesystems"`
}
type LocalJob struct {
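The config types above lean on yaml ",inline" struct embedding: PushJob, PullJob, SinkJob, and SourceJob share the fields of ActiveJob and PassiveJob without repeating them. A minimal sketch of how inline embedding decodes, using reduced stand-in structs and gopkg.in/yaml.v2 rather than zrepl's own config machinery:

package main

import (
	"fmt"

	yaml "gopkg.in/yaml.v2"
)

// Reduced stand-ins for the config types: with `yaml:",inline"`, the embedded
// struct's fields are decoded as if they were declared on the outer struct.
type ActiveJob struct {
	Type string `yaml:"type"`
	Name string `yaml:"name"`
}

type PushJob struct {
	ActiveJob `yaml:",inline"` // shared fields appear at the top level of the YAML document
	Interval  string `yaml:"interval"`
}

func main() {
	doc := []byte("type: push\nname: prod_to_backups\ninterval: 10m\n")
	var j PushJob
	if err := yaml.Unmarshal(doc, &j); err != nil {
		panic(err)
	}
	fmt.Println(j.Type, j.Name, j.Interval) // push prod_to_backups 10m
}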

View File

@@ -3,6 +3,7 @@ package job
import (
"context"
"github.com/pkg/errors"
"github.com/problame/go-streamrpc"
"github.com/prometheus/client_golang/prometheus"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/connecter"
@@ -15,14 +16,13 @@ import (
"github.com/zrepl/zrepl/daemon/snapper"
)
type Push struct {
type ActiveSide struct {
mode activeMode
name string
clientFactory *connecter.ClientFactory
fsfilter endpoint.FSFilter
prunerFactory *pruner.PrunerFactory
snapper *snapper.Snapper
promRepStateSecs *prometheus.HistogramVec // labels: state
promPruneSecs *prometheus.HistogramVec // labels: prune_side
@@ -32,9 +32,48 @@ type Push struct {
replication *replication.Replication
}
func PushFromConfig(g *config.Global, in *config.PushJob) (j *Push, err error) {
type activeMode interface {
SenderReceiver(client *streamrpc.Client) (replication.Sender, replication.Receiver, error)
Type() Type
RunPeriodic(ctx context.Context, wakeUpCommon chan<- struct{})
}
j = &Push{}
type modePush struct {
fsfilter endpoint.FSFilter
snapper *snapper.Snapper
}
func (m *modePush) SenderReceiver(client *streamrpc.Client) (replication.Sender, replication.Receiver, error) {
sender := endpoint.NewSender(m.fsfilter)
receiver := endpoint.NewRemote(client)
return sender, receiver, nil
}
func (m *modePush) Type() Type { return TypePush }
func (m *modePush) RunPeriodic(ctx context.Context, wakeUpCommon chan<- struct{}) {
m.snapper.Run(ctx, wakeUpCommon)
}
func modePushFromConfig(g *config.Global, in *config.PushJob) (*modePush, error) {
m := &modePush{}
fsf, err := filters.DatasetMapFilterFromConfig(in.Filesystems)
if err != nil {
return nil, errors.Wrap(err, "cannot build filesystem filter")
}
m.fsfilter = fsf
if m.snapper, err = snapper.FromConfig(g, fsf, &in.Snapshotting); err != nil {
return nil, errors.Wrap(err, "cannot build snapper")
}
return m, nil
}
func activeSide(g *config.Global, in *config.ActiveJob, mode activeMode) (j *ActiveSide, err error) {
j = &ActiveSide{mode: mode}
j.name = in.Name
j.promRepStateSecs = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "zrepl",
@@ -56,12 +95,6 @@ func PushFromConfig(g *config.Global, in *config.PushJob) (j *Push, err error) {
return nil, errors.Wrap(err, "cannot build client")
}
fsf, err := filters.DatasetMapFilterFromConfig(in.Filesystems)
if err != nil {
return nil, errors.Wrap(err, "cannot build filesystem filter")
}
j.fsfilter = fsf
j.promPruneSecs = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "zrepl",
Subsystem: "pruning",
@@ -74,26 +107,22 @@ func PushFromConfig(g *config.Global, in *config.PushJob) (j *Push, err error) {
return nil, err
}
if j.snapper, err = snapper.FromConfig(g, fsf, &in.Snapshotting); err != nil {
return nil, errors.Wrap(err, "cannot build snapper")
}
return j, nil
}
func (j *Push) RegisterMetrics(registerer prometheus.Registerer) {
func (j *ActiveSide) RegisterMetrics(registerer prometheus.Registerer) {
registerer.MustRegister(j.promRepStateSecs)
registerer.MustRegister(j.promPruneSecs)
registerer.MustRegister(j.promBytesReplicated)
}
func (j *Push) Name() string { return j.name }
func (j *ActiveSide) Name() string { return j.name }
type PushStatus struct {
type ActiveSideStatus struct {
Replication *replication.Report
}
func (j *Push) Status() *Status {
func (j *ActiveSide) Status() *Status {
rep := func() *replication.Replication {
j.mtx.Lock()
defer j.mtx.Unlock()
@@ -102,26 +131,25 @@ func (j *Push) Status() *Status {
}
return j.replication
}()
s := &PushStatus{}
s := &ActiveSideStatus{}
t := j.mode.Type()
if rep == nil {
return &Status{Type: TypePush, JobSpecific: s}
return &Status{Type: t, JobSpecific: s}
}
s.Replication = rep.Report()
return &Status{Type: TypePush, JobSpecific: s}
return &Status{Type: t, JobSpecific: s}
}
func (j *Push) Run(ctx context.Context) {
func (j *ActiveSide) Run(ctx context.Context) {
log := GetLogger(ctx)
ctx = logging.WithSubsystemLoggers(ctx, log)
defer log.Info("job exiting")
snapshotsTaken := make(chan struct{})
{
periodicDone := make(chan struct{})
ctx, cancel := context.WithCancel(ctx)
defer cancel()
ctx = logging.WithSubsystemLoggers(ctx, log)
go j.snapper.Run(ctx, snapshotsTaken)
}
go j.mode.RunPeriodic(ctx, periodicDone)
invocationCount := 0
outer:
@@ -133,7 +161,7 @@ outer:
break outer
case <-WaitWakeup(ctx):
case <-snapshotsTaken:
case <-periodicDone:
}
invocationCount++
invLog := log.WithField("invocation", invocationCount)
@@ -141,7 +169,7 @@ outer:
}
}
func (j *Push) do(ctx context.Context) {
func (j *ActiveSide) do(ctx context.Context) {
log := GetLogger(ctx)
ctx = logging.WithSubsystemLoggers(ctx, log)
@@ -152,8 +180,7 @@ func (j *Push) do(ctx context.Context) {
}
defer client.Close(ctx)
sender := endpoint.NewSender(j.fsfilter)
receiver := endpoint.NewRemote(client)
sender, receiver, err := j.mode.SenderReceiver(client)
if err != nil {
log.WithError(err).Error("cannot build sender/receiver from mode")
return
}
j.mtx.Lock()
j.replication = replication.NewReplication(j.promRepStateSecs, j.promBytesReplicated)
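The activeMode interface is the extension point this refactor introduces: ActiveSide itself stays untouched when another active job type is added. As a hedged sketch of where that leads, a pull mode (the natural counterpart to modePush, not part of this commit) might look roughly as follows; it assumes the same imports as this file plus time and zfs, and that endpoint.NewRemote also satisfies replication.Sender:

// Hypothetical sketch, not in this commit: a pull mode swaps the roles
// relative to modePush (the remote side sends, the local side receives) and
// wakes up replication on a timer instead of via a snapper.
type modePull struct {
	rootDataset *zfs.DatasetPath
	interval    time.Duration
}

func (m *modePull) SenderReceiver(client *streamrpc.Client) (replication.Sender, replication.Receiver, error) {
	sender := endpoint.NewRemote(client) // assumption: the remote endpoint implements Sender as well as Receiver
	receiver, err := endpoint.NewReceiver(m.rootDataset)
	return sender, receiver, err
}

func (m *modePull) Type() Type { return TypePull }

func (m *modePull) RunPeriodic(ctx context.Context, wakeUpCommon chan<- struct{}) {
	t := time.NewTicker(m.interval)
	defer t.Stop()
	for {
		select {
		case <-t.C:
			select {
			case wakeUpCommon <- struct{}{}:
			case <-ctx.Done():
				return
			}
		case <-ctx.Done():
			return
		}
	}
}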

View File

@@ -2,8 +2,8 @@ package job
import (
"fmt"
"github.com/zrepl/zrepl/config"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/config"
)
func JobsFromConfig(c *config.Config) ([]Job, error) {
@@ -19,19 +19,31 @@ func JobsFromConfig(c *config.Config) ([]Job, error) {
}
func buildJob(c *config.Global, in config.JobEnum) (j Job, err error) {
cannotBuildJob := func(e error, name string) (Job, error) {
return nil, errors.Wrapf(e, "cannot build job %q", name)
}
switch v := in.Ret.(type) {
case *config.SinkJob:
j, err = SinkFromConfig(c, v)
m, err := modeSinkFromConfig(c, v)
if err != nil {
return nil, errors.Wrapf(err, "cannot build job %q", v.Name)
return cannotBuildJob(err, v.Name)
}
j, err = passiveSideFromConfig(c, &v.PassiveJob, m)
if err != nil {
return cannotBuildJob(err, v.Name)
}
case *config.PushJob:
j, err = PushFromConfig(c, v)
m, err := modePushFromConfig(c, v)
if err != nil {
return nil, errors.Wrapf(err, "cannot build job %q", v.Name)
return cannotBuildJob(err, v.Name)
}
j, err = activeSide(c, &v.ActiveJob, m)
if err != nil {
return cannotBuildJob(err, v.Name)
}
default:
panic(fmt.Sprintf("implementation error: unknown job type %T", v))
}
return j, err
return j, nil
}
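buildJob now follows a two-step pattern: construct the job-type-specific mode, then wrap it in the generic side. Adding a job type means adding one more case of the same shape. For illustration, a source job (named in the commit message but not wired up in this diff) would plausibly slot in like the sink case; modeSourceFromConfig is hypothetical here:

	// Hypothetical sketch, not in this commit: the source case would reuse
	// passiveSideFromConfig once a modeSource and its constructor exist.
	case *config.SourceJob:
		m, err := modeSourceFromConfig(c, v) // hypothetical constructor
		if err != nil {
			return cannotBuildJob(err, v.Name)
		}
		j, err = passiveSideFromConfig(c, &v.PassiveJob, m)
		if err != nil {
			return cannotBuildJob(err, v.Name)
		}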

View File

@@ -59,6 +59,8 @@ const (
TypeInternal Type = "internal"
TypePush Type = "push"
TypeSink Type = "sink"
TypePull Type = "pull"
TypeSource Type = "source"
)
type Status struct {
@@ -101,11 +103,11 @@ func (s *Status) UnmarshalJSON(in []byte) (err error) {
}
switch s.Type {
case TypePush:
var st PushStatus
var st ActiveSideStatus
err = json.Unmarshal(jobJSON, &st)
s.JobSpecific = &st
case TypeSink:
var st SinkStatus
var st PassiveStatus
err = json.Unmarshal(jobJSON, &st)
s.JobSpecific = &st
case TypeInternal:
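Consumers such as the status TUI in the first hunk dispatch on the concrete type behind Status.JobSpecific, which after this commit is shared per side rather than per job type. A small usage sketch; the daemon/job import path is assumed from the repository layout:

package main

import (
	"fmt"

	"github.com/zrepl/zrepl/daemon/job" // assumed import path for the package shown in this diff
)

// describe mirrors the type switch a status client performs: push (and later
// pull) jobs report as *ActiveSideStatus, sink (and later source) jobs as
// *PassiveStatus.
func describe(s *job.Status) string {
	switch st := s.JobSpecific.(type) {
	case *job.ActiveSideStatus:
		if st == nil || st.Replication == nil {
			return fmt.Sprintf("%s: no replication report yet", s.Type)
		}
		return fmt.Sprintf("%s: replication report available", s.Type)
	case *job.PassiveStatus:
		return fmt.Sprintf("%s: no job-specific status", s.Type)
	default:
		return fmt.Sprintf("%s", s.Type)
	}
}

func main() {
	fmt.Println(describe(&job.Status{Type: job.TypeSink, JobSpecific: &job.PassiveStatus{}}))
}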

View File

@@ -13,43 +13,79 @@ import (
"github.com/zrepl/zrepl/zfs"
)
type Sink struct {
type PassiveSide struct {
mode passiveMode
name string
l serve.ListenerFactory
rpcConf *streamrpc.ConnConfig
}
type passiveMode interface {
ConnHandleFunc(ctx context.Context, conn serve.AuthenticatedConn) streamrpc.HandlerFunc
Type() Type
}
type modeSink struct {
rootDataset *zfs.DatasetPath
}
func SinkFromConfig(g *config.Global, in *config.SinkJob) (s *Sink, err error) {
func (m *modeSink) Type() Type { return TypeSink }
s = &Sink{name: in.Name}
func (m *modeSink) ConnHandleFunc(ctx context.Context, conn serve.AuthenticatedConn) streamrpc.HandlerFunc {
log := GetLogger(ctx)
clientRootStr := path.Join(m.rootDataset.ToString(), conn.ClientIdentity())
clientRoot, err := zfs.NewDatasetPath(clientRootStr)
if err != nil {
log.WithError(err).
WithField("client_identity", conn.ClientIdentity()).
Error("cannot build client filesystem map (client identity must be a valid ZFS FS name)")
return nil
}
log.WithField("client_root", clientRoot).Debug("client root")
local, err := endpoint.NewReceiver(clientRoot)
if err != nil {
log.WithError(err).Error("unexpected error: cannot convert mapping to filter")
return nil
}
h := endpoint.NewHandler(local)
return h.Handle
}
func modeSinkFromConfig(g *config.Global, in *config.SinkJob) (m *modeSink, err error) {
m = &modeSink{}
m.rootDataset, err = zfs.NewDatasetPath(in.RootDataset)
if err != nil {
return nil, errors.New("root dataset is not a valid zfs filesystem path")
}
if m.rootDataset.Length() <= 0 {
return nil, errors.New("root dataset must not be empty") // duplicates error check of receiver
}
return m, nil
}
func passiveSideFromConfig(g *config.Global, in *config.PassiveJob, mode passiveMode) (s *PassiveSide, err error) {
s = &PassiveSide{mode: mode, name: in.Name}
if s.l, s.rpcConf, err = serve.FromConfig(g, in.Serve); err != nil {
return nil, errors.Wrap(err, "cannot build server")
}
s.rootDataset, err = zfs.NewDatasetPath(in.RootDataset)
if err != nil {
return nil, errors.New("root dataset is not a valid zfs filesystem path")
}
if s.rootDataset.Length() <= 0 {
return nil, errors.New("root dataset must not be empty") // duplicates error check of receiver
}
return s, nil
}
func (j *Sink) Name() string { return j.name }
func (j *PassiveSide) Name() string { return j.name }
type SinkStatus struct {}
type PassiveStatus struct {}
func (*Sink) Status() *Status {
return &Status{Type: TypeSink} // FIXME SinkStatus
func (s *PassiveSide) Status() *Status {
return &Status{Type: s.mode.Type()} // FIXME PassiveStatus
}
func (*Sink) RegisterMetrics(registerer prometheus.Registerer) {}
func (*PassiveSide) RegisterMetrics(registerer prometheus.Registerer) {}
func (j *Sink) Run(ctx context.Context) {
func (j *PassiveSide) Run(ctx context.Context) {
log := GetLogger(ctx)
defer log.Info("job exiting")
@@ -74,10 +110,26 @@ outer:
log.WithError(res.err).Info("accept error")
continue
}
conn := res.conn
connId++
connLog := log.
WithField("connID", connId)
go j.handleConnection(WithLogger(ctx, connLog), res.conn)
connLog.
WithField("addr", conn.RemoteAddr()).
WithField("client_identity", conn.ClientIdentity()).
Info("handling connection")
go func() {
defer connLog.Info("finished handling connection")
defer conn.Close()
ctx := logging.WithSubsystemLoggers(ctx, connLog)
handleFunc := j.mode.ConnHandleFunc(ctx, conn)
if handleFunc == nil {
return
}
if err := streamrpc.ServeConn(ctx, conn, j.rpcConf, handleFunc); err != nil {
log.WithError(err).Error("error serving client")
}
}()
case <-ctx.Done():
break outer
@@ -87,39 +139,6 @@ outer:
}
func (j *Sink) handleConnection(ctx context.Context, conn serve.AuthenticatedConn) {
defer conn.Close()
log := GetLogger(ctx)
log.
WithField("addr", conn.RemoteAddr()).
WithField("client_identity", conn.ClientIdentity()).
Info("handling connection")
defer log.Info("finished handling connection")
clientRootStr := path.Join(j.rootDataset.ToString(), conn.ClientIdentity())
clientRoot, err := zfs.NewDatasetPath(clientRootStr)
if err != nil {
log.WithError(err).
WithField("client_identity", conn.ClientIdentity()).
Error("cannot build client filesystem map (client identity must be a valid ZFS FS name)")
}
log.WithField("client_root", clientRoot).Debug("client root")
ctx = logging.WithSubsystemLoggers(ctx, log)
local, err := endpoint.NewReceiver(clientRoot)
if err != nil {
log.WithError(err).Error("unexpected error: cannot convert mapping to filter")
return
}
handler := endpoint.NewHandler(local)
if err := streamrpc.ServeConn(ctx, conn, j.rpcConf, handler.Handle); err != nil {
log.WithError(err).Error("error serving client")
}
}
type acceptResult struct {
conn serve.AuthenticatedConn
err error
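The passive side mirrors the active side: PassiveSide is generic, and all per-connection behavior comes from passiveMode. A hedged sketch of the source mode announced in the commit message (not part of this diff); it assumes endpoint.NewHandler can wrap a sender endpoint the same way it wraps the sink's receiver:

// Hypothetical sketch, not in this commit: a source mode serves a sender
// endpoint for the configured filesystems, analogous to how modeSink serves
// a receiver rooted at its root dataset.
type modeSource struct {
	fsfilter endpoint.FSFilter
}

func (m *modeSource) Type() Type { return TypeSource }

func (m *modeSource) ConnHandleFunc(ctx context.Context, conn serve.AuthenticatedConn) streamrpc.HandlerFunc {
	sender := endpoint.NewSender(m.fsfilter)
	h := endpoint.NewHandler(sender) // assumption: NewHandler accepts a sender endpoint
	return h.Handle
}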