job: snap: workaround for replication cursor requirement

This commit is contained in:
Christian Schwarz 2019-03-17 21:06:19 +01:00
parent d8d9e34914
commit 5cd2593f52

View File

@ -2,6 +2,9 @@ package job
import (
"context"
"fmt"
"sort"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/zrepl/zrepl/config"
@ -11,6 +14,7 @@ import (
"github.com/zrepl/zrepl/daemon/pruner"
"github.com/zrepl/zrepl/daemon/snapper"
"github.com/zrepl/zrepl/endpoint"
"github.com/zrepl/zrepl/replication/logic/pdu"
"github.com/zrepl/zrepl/zfs"
)
@ -102,11 +106,57 @@ outer:
}
}
// Adaptor that implements pruner.History around a pruner.Target.
// The ReplicationCursor method is Get-op only and always returns
// the filesystem's most recent version's GUID.
//
// TODO:
// This is a work-around for the current package daemon/pruner
// and package pruning.Snapshot limitation: they require the
// `Replicated` getter method be present, but obviously,
// a local job like SnapJob can't deliver on that.
// But the pruner.Pruner gives up on an FS if no replication
// cursor is present, which is why this pruner returns the
// most recent filesystem version.
type alwaysUpToDateReplicationCursorHistory struct {
// the Target passed as Target to BuildSinglePruner
target pruner.Target
}
var _ pruner.History = (*alwaysUpToDateReplicationCursorHistory)(nil)
func (h alwaysUpToDateReplicationCursorHistory) ReplicationCursor(ctx context.Context, req *pdu.ReplicationCursorReq) (*pdu.ReplicationCursorRes, error) {
if req.GetGet() == nil {
return nil, fmt.Errorf("unsupported ReplicationCursor request: SnapJob only supports GETting a (faked) cursor")
}
fsvReq := &pdu.ListFilesystemVersionsReq{
Filesystem: req.GetFilesystem(),
}
res, err := h.target.ListFilesystemVersions(ctx, fsvReq)
if err != nil {
return nil, err
}
fsvs := res.GetVersions()
if len(fsvs) <= 0 {
return &pdu.ReplicationCursorRes{Result: &pdu.ReplicationCursorRes_Notexist{Notexist: true}}, nil
}
// always return must recent version
sort.Slice(fsvs, func(i, j int) bool {
return fsvs[i].CreateTXG < fsvs[j].CreateTXG
})
mostRecent := fsvs[len(fsvs)-1]
return &pdu.ReplicationCursorRes{Result: &pdu.ReplicationCursorRes_Guid{Guid: mostRecent.GetGuid()}}, nil
}
func (h alwaysUpToDateReplicationCursorHistory) ListFilesystems(ctx context.Context, req *pdu.ListFilesystemReq) (*pdu.ListFilesystemRes, error) {
return h.target.ListFilesystems(ctx, req)
}
func (j *SnapJob) doPrune(ctx context.Context) {
log := GetLogger(ctx)
ctx = logging.WithSubsystemLoggers(ctx, log)
sender := endpoint.NewSender(j.fsfilter)
j.pruner = j.prunerFactory.BuildSinglePruner(ctx, sender, sender)
j.pruner = j.prunerFactory.BuildSinglePruner(ctx, sender, alwaysUpToDateReplicationCursorHistory{sender})
log.Info("start pruning")
j.pruner.Prune()
log.Info("finished pruning")