mirror of
https://github.com/zrepl/zrepl.git
synced 2025-01-13 09:49:32 +01:00
simplify mapping & filtering in endpoints (re-rooting only)
This commit is contained in:
parent
1323a30a0c
commit
2c25f28972
@ -114,7 +114,7 @@ func (j *Push) do(ctx context.Context) {
|
||||
}
|
||||
defer client.Close(ctx)
|
||||
|
||||
sender := endpoint.NewSender(j.fsfilter, filters.NewAnyFSVFilter())
|
||||
sender := endpoint.NewSender(j.fsfilter)
|
||||
receiver := endpoint.NewRemote(client)
|
||||
|
||||
j.mtx.Lock()
|
||||
|
@ -5,18 +5,18 @@ import (
|
||||
"github.com/pkg/errors"
|
||||
"github.com/problame/go-streamrpc"
|
||||
"github.com/zrepl/zrepl/config"
|
||||
"github.com/zrepl/zrepl/daemon/filters"
|
||||
"github.com/zrepl/zrepl/daemon/logging"
|
||||
"github.com/zrepl/zrepl/daemon/serve"
|
||||
"github.com/zrepl/zrepl/endpoint"
|
||||
"path"
|
||||
"github.com/zrepl/zrepl/zfs"
|
||||
)
|
||||
|
||||
type Sink struct {
|
||||
name string
|
||||
l serve.ListenerFactory
|
||||
rpcConf *streamrpc.ConnConfig
|
||||
rootDataset string
|
||||
rootDataset *zfs.DatasetPath
|
||||
}
|
||||
|
||||
func SinkFromConfig(g *config.Global, in *config.SinkJob) (s *Sink, err error) {
|
||||
@ -26,10 +26,14 @@ func SinkFromConfig(g *config.Global, in *config.SinkJob) (s *Sink, err error) {
|
||||
return nil, errors.Wrap(err, "cannot build server")
|
||||
}
|
||||
|
||||
if in.RootDataset == "" {
|
||||
return nil, errors.Wrap(err, "must specify root dataset")
|
||||
s.rootDataset, err = zfs.NewDatasetPath(in.RootDataset)
|
||||
if err != nil {
|
||||
return nil, errors.New("root dataset is not a valid zfs filesystem path")
|
||||
}
|
||||
s.rootDataset = in.RootDataset
|
||||
if s.rootDataset.Length() <= 0 {
|
||||
return nil, errors.New("root dataset must not be empty") // duplicates error check of receiver
|
||||
}
|
||||
|
||||
|
||||
return s, nil
|
||||
}
|
||||
@ -89,18 +93,18 @@ func (j *Sink) handleConnection(ctx context.Context, conn serve.AuthenticatedCon
|
||||
Info("handling connection")
|
||||
defer log.Info("finished handling connection")
|
||||
|
||||
clientRoot := path.Join(j.rootDataset, conn.ClientIdentity())
|
||||
log.WithField("client_root", clientRoot).Debug("client root")
|
||||
fsmap := filters.NewDatasetMapFilter(1, false)
|
||||
if err := fsmap.Add("<", clientRoot); err != nil {
|
||||
clientRootStr := path.Join(j.rootDataset.ToString(), conn.ClientIdentity())
|
||||
clientRoot, err := zfs.NewDatasetPath(clientRootStr)
|
||||
if err != nil {
|
||||
log.WithError(err).
|
||||
WithField("client_identity", conn.ClientIdentity()).
|
||||
Error("cannot build client filesystem map (client identity must be a valid ZFS FS name")
|
||||
}
|
||||
log.WithField("client_root", clientRoot).Debug("client root")
|
||||
|
||||
ctx = logging.WithSubsystemLoggers(ctx, log)
|
||||
|
||||
local, err := endpoint.NewReceiver(fsmap, filters.NewAnyFSVFilter())
|
||||
local, err := endpoint.NewReceiver(clientRoot)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("unexpected error: cannot convert mapping to filter")
|
||||
return
|
||||
|
@ -17,11 +17,28 @@ import (
|
||||
// Sender implements replication.ReplicationEndpoint for a sending side
|
||||
type Sender struct {
|
||||
FSFilter zfs.DatasetFilter
|
||||
FilesystemVersionFilter zfs.FilesystemVersionFilter
|
||||
}
|
||||
|
||||
func NewSender(fsf zfs.DatasetFilter, fsvf zfs.FilesystemVersionFilter) *Sender {
|
||||
return &Sender{fsf, fsvf}
|
||||
func NewSender(fsf zfs.DatasetFilter) *Sender {
|
||||
return &Sender{FSFilter: fsf}
|
||||
}
|
||||
|
||||
func (s *Sender) filterCheckFS(fs string) (*zfs.DatasetPath, error) {
|
||||
dp, err := zfs.NewDatasetPath(fs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if dp.Length() == 0 {
|
||||
return nil, errors.New("empty filesystem not allowed")
|
||||
}
|
||||
pass, err := s.FSFilter.Filter(dp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !pass {
|
||||
return nil, replication.NewFilteredError(fs)
|
||||
}
|
||||
return dp, nil
|
||||
}
|
||||
|
||||
func (p *Sender) ListFilesystems(ctx context.Context) ([]*pdu.Filesystem, error) {
|
||||
@ -40,18 +57,11 @@ func (p *Sender) ListFilesystems(ctx context.Context) ([]*pdu.Filesystem, error)
|
||||
}
|
||||
|
||||
func (p *Sender) ListFilesystemVersions(ctx context.Context, fs string) ([]*pdu.FilesystemVersion, error) {
|
||||
dp, err := zfs.NewDatasetPath(fs)
|
||||
lp, err := p.filterCheckFS(fs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
pass, err := p.FSFilter.Filter(dp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !pass {
|
||||
return nil, replication.NewFilteredError(fs)
|
||||
}
|
||||
fsvs, err := zfs.ZFSListFilesystemVersions(dp, p.FilesystemVersionFilter)
|
||||
fsvs, err := zfs.ZFSListFilesystemVersions(lp, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -63,17 +73,10 @@ func (p *Sender) ListFilesystemVersions(ctx context.Context, fs string) ([]*pdu.
|
||||
}
|
||||
|
||||
func (p *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.ReadCloser, error) {
|
||||
dp, err := zfs.NewDatasetPath(r.Filesystem)
|
||||
_, err := p.filterCheckFS(r.Filesystem)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
pass, err := p.FSFilter.Filter(dp)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
if !pass {
|
||||
return nil, nil, replication.NewFilteredError(r.Filesystem)
|
||||
}
|
||||
|
||||
if r.DryRun {
|
||||
size, err := zfs.ZFSSendDry(r.Filesystem, r.From, r.To)
|
||||
@ -94,33 +97,18 @@ func (p *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea
|
||||
}
|
||||
|
||||
func (p *Sender) DestroySnapshots(ctx context.Context, req *pdu.DestroySnapshotsReq) (*pdu.DestroySnapshotsRes, error) {
|
||||
dp, err := zfs.NewDatasetPath(req.Filesystem)
|
||||
dp, err := p.filterCheckFS(req.Filesystem)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
pass, err := p.FSFilter.Filter(dp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !pass {
|
||||
return nil, replication.NewFilteredError(req.Filesystem)
|
||||
}
|
||||
|
||||
return doDestroySnapshots(ctx, dp, req.Snapshots)
|
||||
}
|
||||
|
||||
func (p *Sender) ReplicationCursor(ctx context.Context, req *pdu.ReplicationCursorReq) (*pdu.ReplicationCursorRes, error) {
|
||||
dp, err := zfs.NewDatasetPath(req.Filesystem)
|
||||
dp, err := p.filterCheckFS(req.Filesystem)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
pass, err := p.FSFilter.Filter(dp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !pass {
|
||||
return nil, replication.NewFilteredError(req.Filesystem)
|
||||
}
|
||||
|
||||
switch op := req.Op.(type) {
|
||||
case *pdu.ReplicationCursorReq_Get:
|
||||
@ -143,12 +131,12 @@ func (p *Sender) ReplicationCursor(ctx context.Context, req *pdu.ReplicationCurs
|
||||
}
|
||||
}
|
||||
|
||||
type FSFilter interface {
|
||||
type FSFilter interface { // FIXME unused
|
||||
Filter(path *zfs.DatasetPath) (pass bool, err error)
|
||||
}
|
||||
|
||||
// FIXME: can we get away without error types here?
|
||||
type FSMap interface {
|
||||
type FSMap interface { // FIXME unused
|
||||
FSFilter
|
||||
Map(path *zfs.DatasetPath) (*zfs.DatasetPath, error)
|
||||
Invert() (FSMap, error)
|
||||
@ -157,49 +145,72 @@ type FSMap interface {
|
||||
|
||||
// Receiver implements replication.ReplicationEndpoint for a receiving side
|
||||
type Receiver struct {
|
||||
fsmapInv FSMap
|
||||
fsmap FSMap
|
||||
fsvf zfs.FilesystemVersionFilter
|
||||
root *zfs.DatasetPath
|
||||
}
|
||||
|
||||
func NewReceiver(fsmap FSMap, fsvf zfs.FilesystemVersionFilter) (*Receiver, error) {
|
||||
fsmapInv, err := fsmap.Invert()
|
||||
func NewReceiver(rootDataset *zfs.DatasetPath) (*Receiver, error) {
|
||||
if rootDataset.Length() <= 0 {
|
||||
return nil, errors.New("root dataset must not be an empty path")
|
||||
}
|
||||
return &Receiver{root: rootDataset.Copy()}, nil
|
||||
}
|
||||
|
||||
type subroot struct {
|
||||
localRoot *zfs.DatasetPath
|
||||
}
|
||||
|
||||
var _ zfs.DatasetFilter = subroot{}
|
||||
|
||||
// Filters local p
|
||||
func (f subroot) Filter(p *zfs.DatasetPath) (pass bool, err error) {
|
||||
return p.HasPrefix(f.localRoot) && !p.Equal(f.localRoot), nil
|
||||
}
|
||||
|
||||
func (f subroot) MapToLocal(fs string) (*zfs.DatasetPath, error) {
|
||||
p, err := zfs.NewDatasetPath(fs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Receiver{fsmapInv, fsmap, fsvf}, nil
|
||||
if p.Length() == 0 {
|
||||
return nil, errors.Errorf("cannot map empty filesystem")
|
||||
}
|
||||
c := f.localRoot.Copy()
|
||||
c.Extend(p)
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (e *Receiver) ListFilesystems(ctx context.Context) ([]*pdu.Filesystem, error) {
|
||||
filtered, err := zfs.ZFSListMapping(e.fsmapInv.AsFilter())
|
||||
filtered, err := zfs.ZFSListMapping(subroot{e.root})
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error checking client permission")
|
||||
return nil, err
|
||||
}
|
||||
fss := make([]*pdu.Filesystem, len(filtered))
|
||||
for i, a := range filtered {
|
||||
mapped, err := e.fsmapInv.Map(a)
|
||||
// present without prefix, and only those that are not placeholders
|
||||
fss := make([]*pdu.Filesystem, 0, len(filtered))
|
||||
for _, a := range filtered {
|
||||
ph, err := zfs.ZFSIsPlaceholderFilesystem(a)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
getLogger(ctx).
|
||||
WithError(err).
|
||||
WithField("fs", a).
|
||||
Error("inconsistent placeholder property")
|
||||
return nil, errors.New("server error, see logs") // don't leak path
|
||||
}
|
||||
fss[i] = &pdu.Filesystem{Path: mapped.ToString()}
|
||||
if ph {
|
||||
continue
|
||||
}
|
||||
a.TrimPrefix(e.root)
|
||||
fss = append(fss, &pdu.Filesystem{Path: a.ToString()})
|
||||
}
|
||||
return fss, nil
|
||||
}
|
||||
|
||||
func (e *Receiver) ListFilesystemVersions(ctx context.Context, fs string) ([]*pdu.FilesystemVersion, error) {
|
||||
p, err := zfs.NewDatasetPath(fs)
|
||||
lp, err := subroot{e.root}.MapToLocal(fs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
lp, err := e.fsmap.Map(p)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if lp == nil {
|
||||
return nil, errors.New("access to filesystem denied")
|
||||
}
|
||||
|
||||
fsvs, err := zfs.ZFSListFilesystemVersions(lp, e.fsvf)
|
||||
fsvs, err := zfs.ZFSListFilesystemVersions(lp, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -215,17 +226,10 @@ func (e *Receiver) ListFilesystemVersions(ctx context.Context, fs string) ([]*pd
|
||||
func (e *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, sendStream io.ReadCloser) error {
|
||||
defer sendStream.Close()
|
||||
|
||||
p, err := zfs.NewDatasetPath(req.Filesystem)
|
||||
lp, err := subroot{e.root}.MapToLocal(req.Filesystem)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
lp, err := e.fsmap.Map(p)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if lp == nil {
|
||||
return errors.New("receive to filesystem denied")
|
||||
}
|
||||
|
||||
getLogger(ctx).Debug("incoming Receive")
|
||||
|
||||
@ -286,17 +290,10 @@ func (e *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, sendStream
|
||||
}
|
||||
|
||||
func (e *Receiver) DestroySnapshots(ctx context.Context, req *pdu.DestroySnapshotsReq) (*pdu.DestroySnapshotsRes, error) {
|
||||
dp, err := zfs.NewDatasetPath(req.Filesystem)
|
||||
lp, err := subroot{e.root}.MapToLocal(req.Filesystem)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
lp, err := e.fsmap.Map(dp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if lp == nil {
|
||||
return nil, errors.New("access to filesystem denied")
|
||||
}
|
||||
return doDestroySnapshots(ctx, lp, req.Snapshots)
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user