From 5cd2593f521fd5ee475a3ac6aff0c44f8690825f Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Sun, 17 Mar 2019 21:06:19 +0100 Subject: [PATCH] job: snap: workaround for replication cursor requirement --- daemon/job/snapjob.go | 52 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 51 insertions(+), 1 deletion(-) diff --git a/daemon/job/snapjob.go b/daemon/job/snapjob.go index 15feb42..0f47d5e 100644 --- a/daemon/job/snapjob.go +++ b/daemon/job/snapjob.go @@ -2,6 +2,9 @@ package job import ( "context" + "fmt" + "sort" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/zrepl/zrepl/config" @@ -11,6 +14,7 @@ import ( "github.com/zrepl/zrepl/daemon/pruner" "github.com/zrepl/zrepl/daemon/snapper" "github.com/zrepl/zrepl/endpoint" + "github.com/zrepl/zrepl/replication/logic/pdu" "github.com/zrepl/zrepl/zfs" ) @@ -102,11 +106,57 @@ outer: } } +// Adaptor that implements pruner.History around a pruner.Target. +// The ReplicationCursor method is Get-op only and always returns +// the filesystem's most recent version's GUID. +// +// TODO: +// This is a work-around for the current package daemon/pruner +// and package pruning.Snapshot limitation: they require the +// `Replicated` getter method be present, but obviously, +// a local job like SnapJob can't deliver on that. +// But the pruner.Pruner gives up on an FS if no replication +// cursor is present, which is why this pruner returns the +// most recent filesystem version. +type alwaysUpToDateReplicationCursorHistory struct { + // the Target passed as Target to BuildSinglePruner + target pruner.Target +} + +var _ pruner.History = (*alwaysUpToDateReplicationCursorHistory)(nil) + +func (h alwaysUpToDateReplicationCursorHistory) ReplicationCursor(ctx context.Context, req *pdu.ReplicationCursorReq) (*pdu.ReplicationCursorRes, error) { + if req.GetGet() == nil { + return nil, fmt.Errorf("unsupported ReplicationCursor request: SnapJob only supports GETting a (faked) cursor") + } + fsvReq := &pdu.ListFilesystemVersionsReq{ + Filesystem: req.GetFilesystem(), + } + res, err := h.target.ListFilesystemVersions(ctx, fsvReq) + if err != nil { + return nil, err + } + fsvs := res.GetVersions() + if len(fsvs) <= 0 { + return &pdu.ReplicationCursorRes{Result: &pdu.ReplicationCursorRes_Notexist{Notexist: true}}, nil + } + // always return must recent version + sort.Slice(fsvs, func(i, j int) bool { + return fsvs[i].CreateTXG < fsvs[j].CreateTXG + }) + mostRecent := fsvs[len(fsvs)-1] + return &pdu.ReplicationCursorRes{Result: &pdu.ReplicationCursorRes_Guid{Guid: mostRecent.GetGuid()}}, nil +} + +func (h alwaysUpToDateReplicationCursorHistory) ListFilesystems(ctx context.Context, req *pdu.ListFilesystemReq) (*pdu.ListFilesystemRes, error) { + return h.target.ListFilesystems(ctx, req) +} + func (j *SnapJob) doPrune(ctx context.Context) { log := GetLogger(ctx) ctx = logging.WithSubsystemLoggers(ctx, log) sender := endpoint.NewSender(j.fsfilter) - j.pruner = j.prunerFactory.BuildSinglePruner(ctx, sender, sender) + j.pruner = j.prunerFactory.BuildSinglePruner(ctx, sender, alwaysUpToDateReplicationCursorHistory{sender}) log.Info("start pruning") j.pruner.Prune() log.Info("finished pruning")