From 2c25f28972bf81adfcb2cadbd33b84e905df5d25 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 5 Sep 2018 19:49:26 -0700 Subject: [PATCH] simplify mapping & filtering in endpoints (re-rooting only) --- daemon/job/push.go | 2 +- daemon/job/sink.go | 24 ++++--- endpoint/endpoint.go | 151 +++++++++++++++++++++---------------------- 3 files changed, 89 insertions(+), 88 deletions(-) diff --git a/daemon/job/push.go b/daemon/job/push.go index ee9d00d..36159b4 100644 --- a/daemon/job/push.go +++ b/daemon/job/push.go @@ -114,7 +114,7 @@ func (j *Push) do(ctx context.Context) { } defer client.Close(ctx) - sender := endpoint.NewSender(j.fsfilter, filters.NewAnyFSVFilter()) + sender := endpoint.NewSender(j.fsfilter) receiver := endpoint.NewRemote(client) j.mtx.Lock() diff --git a/daemon/job/sink.go b/daemon/job/sink.go index e69a7fb..6e308ff 100644 --- a/daemon/job/sink.go +++ b/daemon/job/sink.go @@ -5,18 +5,18 @@ import ( "github.com/pkg/errors" "github.com/problame/go-streamrpc" "github.com/zrepl/zrepl/config" - "github.com/zrepl/zrepl/daemon/filters" "github.com/zrepl/zrepl/daemon/logging" "github.com/zrepl/zrepl/daemon/serve" "github.com/zrepl/zrepl/endpoint" "path" + "github.com/zrepl/zrepl/zfs" ) type Sink struct { name string l serve.ListenerFactory rpcConf *streamrpc.ConnConfig - rootDataset string + rootDataset *zfs.DatasetPath } func SinkFromConfig(g *config.Global, in *config.SinkJob) (s *Sink, err error) { @@ -26,10 +26,14 @@ func SinkFromConfig(g *config.Global, in *config.SinkJob) (s *Sink, err error) { return nil, errors.Wrap(err, "cannot build server") } - if in.RootDataset == "" { - return nil, errors.Wrap(err, "must specify root dataset") + s.rootDataset, err = zfs.NewDatasetPath(in.RootDataset) + if err != nil { + return nil, errors.New("root dataset is not a valid zfs filesystem path") } - s.rootDataset = in.RootDataset + if s.rootDataset.Length() <= 0 { + return nil, errors.New("root dataset must not be empty") // duplicates error check of receiver + } + return s, nil } @@ -89,18 +93,18 @@ func (j *Sink) handleConnection(ctx context.Context, conn serve.AuthenticatedCon Info("handling connection") defer log.Info("finished handling connection") - clientRoot := path.Join(j.rootDataset, conn.ClientIdentity()) - log.WithField("client_root", clientRoot).Debug("client root") - fsmap := filters.NewDatasetMapFilter(1, false) - if err := fsmap.Add("<", clientRoot); err != nil { + clientRootStr := path.Join(j.rootDataset.ToString(), conn.ClientIdentity()) + clientRoot, err := zfs.NewDatasetPath(clientRootStr) + if err != nil { log.WithError(err). WithField("client_identity", conn.ClientIdentity()). Error("cannot build client filesystem map (client identity must be a valid ZFS FS name") } + log.WithField("client_root", clientRoot).Debug("client root") ctx = logging.WithSubsystemLoggers(ctx, log) - local, err := endpoint.NewReceiver(fsmap, filters.NewAnyFSVFilter()) + local, err := endpoint.NewReceiver(clientRoot) if err != nil { log.WithError(err).Error("unexpected error: cannot convert mapping to filter") return diff --git a/endpoint/endpoint.go b/endpoint/endpoint.go index bd3e2f7..bb1a051 100644 --- a/endpoint/endpoint.go +++ b/endpoint/endpoint.go @@ -17,11 +17,28 @@ import ( // Sender implements replication.ReplicationEndpoint for a sending side type Sender struct { FSFilter zfs.DatasetFilter - FilesystemVersionFilter zfs.FilesystemVersionFilter } -func NewSender(fsf zfs.DatasetFilter, fsvf zfs.FilesystemVersionFilter) *Sender { - return &Sender{fsf, fsvf} +func NewSender(fsf zfs.DatasetFilter) *Sender { + return &Sender{FSFilter: fsf} +} + +func (s *Sender) filterCheckFS(fs string) (*zfs.DatasetPath, error) { + dp, err := zfs.NewDatasetPath(fs) + if err != nil { + return nil, err + } + if dp.Length() == 0 { + return nil, errors.New("empty filesystem not allowed") + } + pass, err := s.FSFilter.Filter(dp) + if err != nil { + return nil, err + } + if !pass { + return nil, replication.NewFilteredError(fs) + } + return dp, nil } func (p *Sender) ListFilesystems(ctx context.Context) ([]*pdu.Filesystem, error) { @@ -40,18 +57,11 @@ func (p *Sender) ListFilesystems(ctx context.Context) ([]*pdu.Filesystem, error) } func (p *Sender) ListFilesystemVersions(ctx context.Context, fs string) ([]*pdu.FilesystemVersion, error) { - dp, err := zfs.NewDatasetPath(fs) + lp, err := p.filterCheckFS(fs) if err != nil { return nil, err } - pass, err := p.FSFilter.Filter(dp) - if err != nil { - return nil, err - } - if !pass { - return nil, replication.NewFilteredError(fs) - } - fsvs, err := zfs.ZFSListFilesystemVersions(dp, p.FilesystemVersionFilter) + fsvs, err := zfs.ZFSListFilesystemVersions(lp, nil) if err != nil { return nil, err } @@ -63,17 +73,10 @@ func (p *Sender) ListFilesystemVersions(ctx context.Context, fs string) ([]*pdu. } func (p *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.ReadCloser, error) { - dp, err := zfs.NewDatasetPath(r.Filesystem) + _, err := p.filterCheckFS(r.Filesystem) if err != nil { return nil, nil, err } - pass, err := p.FSFilter.Filter(dp) - if err != nil { - return nil, nil, err - } - if !pass { - return nil, nil, replication.NewFilteredError(r.Filesystem) - } if r.DryRun { size, err := zfs.ZFSSendDry(r.Filesystem, r.From, r.To) @@ -94,33 +97,18 @@ func (p *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea } func (p *Sender) DestroySnapshots(ctx context.Context, req *pdu.DestroySnapshotsReq) (*pdu.DestroySnapshotsRes, error) { - dp, err := zfs.NewDatasetPath(req.Filesystem) + dp, err := p.filterCheckFS(req.Filesystem) if err != nil { return nil, err } - pass, err := p.FSFilter.Filter(dp) - if err != nil { - return nil, err - } - if !pass { - return nil, replication.NewFilteredError(req.Filesystem) - } - return doDestroySnapshots(ctx, dp, req.Snapshots) } func (p *Sender) ReplicationCursor(ctx context.Context, req *pdu.ReplicationCursorReq) (*pdu.ReplicationCursorRes, error) { - dp, err := zfs.NewDatasetPath(req.Filesystem) + dp, err := p.filterCheckFS(req.Filesystem) if err != nil { return nil, err } - pass, err := p.FSFilter.Filter(dp) - if err != nil { - return nil, err - } - if !pass { - return nil, replication.NewFilteredError(req.Filesystem) - } switch op := req.Op.(type) { case *pdu.ReplicationCursorReq_Get: @@ -143,12 +131,12 @@ func (p *Sender) ReplicationCursor(ctx context.Context, req *pdu.ReplicationCurs } } -type FSFilter interface { +type FSFilter interface { // FIXME unused Filter(path *zfs.DatasetPath) (pass bool, err error) } // FIXME: can we get away without error types here? -type FSMap interface { +type FSMap interface { // FIXME unused FSFilter Map(path *zfs.DatasetPath) (*zfs.DatasetPath, error) Invert() (FSMap, error) @@ -157,49 +145,72 @@ type FSMap interface { // Receiver implements replication.ReplicationEndpoint for a receiving side type Receiver struct { - fsmapInv FSMap - fsmap FSMap - fsvf zfs.FilesystemVersionFilter + root *zfs.DatasetPath } -func NewReceiver(fsmap FSMap, fsvf zfs.FilesystemVersionFilter) (*Receiver, error) { - fsmapInv, err := fsmap.Invert() +func NewReceiver(rootDataset *zfs.DatasetPath) (*Receiver, error) { + if rootDataset.Length() <= 0 { + return nil, errors.New("root dataset must not be an empty path") + } + return &Receiver{root: rootDataset.Copy()}, nil +} + +type subroot struct { + localRoot *zfs.DatasetPath +} + +var _ zfs.DatasetFilter = subroot{} + +// Filters local p +func (f subroot) Filter(p *zfs.DatasetPath) (pass bool, err error) { + return p.HasPrefix(f.localRoot) && !p.Equal(f.localRoot), nil +} + +func (f subroot) MapToLocal(fs string) (*zfs.DatasetPath, error) { + p, err := zfs.NewDatasetPath(fs) if err != nil { return nil, err } - return &Receiver{fsmapInv, fsmap, fsvf}, nil + if p.Length() == 0 { + return nil, errors.Errorf("cannot map empty filesystem") + } + c := f.localRoot.Copy() + c.Extend(p) + return c, nil } func (e *Receiver) ListFilesystems(ctx context.Context) ([]*pdu.Filesystem, error) { - filtered, err := zfs.ZFSListMapping(e.fsmapInv.AsFilter()) + filtered, err := zfs.ZFSListMapping(subroot{e.root}) if err != nil { - return nil, errors.Wrap(err, "error checking client permission") + return nil, err } - fss := make([]*pdu.Filesystem, len(filtered)) - for i, a := range filtered { - mapped, err := e.fsmapInv.Map(a) + // present without prefix, and only those that are not placeholders + fss := make([]*pdu.Filesystem, 0, len(filtered)) + for _, a := range filtered { + ph, err := zfs.ZFSIsPlaceholderFilesystem(a) if err != nil { - return nil, err + getLogger(ctx). + WithError(err). + WithField("fs", a). + Error("inconsistent placeholder property") + return nil, errors.New("server error, see logs") // don't leak path } - fss[i] = &pdu.Filesystem{Path: mapped.ToString()} + if ph { + continue + } + a.TrimPrefix(e.root) + fss = append(fss, &pdu.Filesystem{Path: a.ToString()}) } return fss, nil } func (e *Receiver) ListFilesystemVersions(ctx context.Context, fs string) ([]*pdu.FilesystemVersion, error) { - p, err := zfs.NewDatasetPath(fs) + lp, err := subroot{e.root}.MapToLocal(fs) if err != nil { return nil, err } - lp, err := e.fsmap.Map(p) - if err != nil { - return nil, err - } - if lp == nil { - return nil, errors.New("access to filesystem denied") - } - fsvs, err := zfs.ZFSListFilesystemVersions(lp, e.fsvf) + fsvs, err := zfs.ZFSListFilesystemVersions(lp, nil) if err != nil { return nil, err } @@ -215,17 +226,10 @@ func (e *Receiver) ListFilesystemVersions(ctx context.Context, fs string) ([]*pd func (e *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, sendStream io.ReadCloser) error { defer sendStream.Close() - p, err := zfs.NewDatasetPath(req.Filesystem) + lp, err := subroot{e.root}.MapToLocal(req.Filesystem) if err != nil { return err } - lp, err := e.fsmap.Map(p) - if err != nil { - return err - } - if lp == nil { - return errors.New("receive to filesystem denied") - } getLogger(ctx).Debug("incoming Receive") @@ -286,17 +290,10 @@ func (e *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, sendStream } func (e *Receiver) DestroySnapshots(ctx context.Context, req *pdu.DestroySnapshotsReq) (*pdu.DestroySnapshotsRes, error) { - dp, err := zfs.NewDatasetPath(req.Filesystem) + lp, err := subroot{e.root}.MapToLocal(req.Filesystem) if err != nil { return nil, err } - lp, err := e.fsmap.Map(dp) - if err != nil { - return nil, err - } - if lp == nil { - return nil, errors.New("access to filesystem denied") - } return doDestroySnapshots(ctx, lp, req.Snapshots) }