zrepl/client/migrate.go
2020-04-21 14:10:53 +02:00

272 lines
7.3 KiB
Go

package client
import (
"context"
"fmt"
"github.com/fatih/color"
"github.com/kr/pretty"
"github.com/pkg/errors"
"github.com/spf13/pflag"
"github.com/zrepl/zrepl/daemon/job"
"github.com/zrepl/zrepl/endpoint"
"github.com/zrepl/zrepl/zfs"
"github.com/zrepl/zrepl/cli"
"github.com/zrepl/zrepl/config"
)
var (
MigrateCmd = &cli.Subcommand{
Use: "migrate",
Short: "perform migration of the on-disk / zfs properties",
SetupSubcommands: func() []*cli.Subcommand {
return migrations
},
}
)
var migrations = []*cli.Subcommand{
&cli.Subcommand{
Use: "0.0.X:0.1:placeholder",
Run: doMigratePlaceholder0_1,
SetupFlags: func(f *pflag.FlagSet) {
f.BoolVar(&migratePlaceholder0_1Args.dryRun, "dry-run", false, "dry run")
},
},
&cli.Subcommand{
Use: "replication-cursor:v1-v2",
Run: doMigrateReplicationCursor,
SetupFlags: func(f *pflag.FlagSet) {
f.BoolVar(&migrateReplicationCursorArgs.dryRun, "dry-run", false, "dry run")
},
},
}
var migratePlaceholder0_1Args struct {
dryRun bool
}
func doMigratePlaceholder0_1(sc *cli.Subcommand, args []string) error {
if len(args) != 0 {
return fmt.Errorf("migration does not take arguments, got %v", args)
}
cfg := sc.Config()
ctx := context.Background()
allFSS, err := zfs.ZFSListMapping(ctx, zfs.NoFilter())
if err != nil {
return errors.Wrap(err, "cannot list filesystems")
}
type workItem struct {
jobName string
rootFS *zfs.DatasetPath
fss []*zfs.DatasetPath
}
var wis []workItem
for i, j := range cfg.Jobs {
var rfsS string
switch job := j.Ret.(type) {
case *config.SinkJob:
rfsS = job.RootFS
case *config.PullJob:
rfsS = job.RootFS
default:
fmt.Printf("ignoring job %q (%d/%d, type %T)\n", j.Name(), i, len(cfg.Jobs), j.Ret)
continue
}
rfs, err := zfs.NewDatasetPath(rfsS)
if err != nil {
return errors.Wrapf(err, "root fs for job %q is not a valid dataset path", j.Name())
}
var fss []*zfs.DatasetPath
for _, fs := range allFSS {
if fs.HasPrefix(rfs) {
fss = append(fss, fs)
}
}
wis = append(wis, workItem{j.Name(), rfs, fss})
}
for _, wi := range wis {
fmt.Printf("job %q => migrate filesystems below root_fs %q\n", wi.jobName, wi.rootFS.ToString())
if len(wi.fss) == 0 {
fmt.Printf("\tno filesystems\n")
continue
}
for _, fs := range wi.fss {
fmt.Printf("\t%q ... ", fs.ToString())
r, err := zfs.ZFSMigrateHashBasedPlaceholderToCurrent(ctx, fs, migratePlaceholder0_1Args.dryRun)
if err != nil {
fmt.Printf("error: %s\n", err)
} else if !r.NeedsModification {
fmt.Printf("unchanged (placeholder=%v)\n", r.OriginalState.IsPlaceholder)
} else {
fmt.Printf("migrate (placeholder=%v) (old value = %q)\n",
r.OriginalState.IsPlaceholder, r.OriginalState.RawLocalPropertyValue)
}
}
}
return nil
}
var migrateReplicationCursorArgs struct {
dryRun bool
}
var bold = color.New(color.Bold)
var succ = color.New(color.FgGreen)
var fail = color.New(color.FgRed)
var migrateReplicationCursorSkipSentinel = fmt.Errorf("skipping this filesystem")
func doMigrateReplicationCursor(sc *cli.Subcommand, args []string) error {
if len(args) != 0 {
return fmt.Errorf("migration does not take arguments, got %v", args)
}
cfg := sc.Config()
jobs, err := job.JobsFromConfig(cfg)
if err != nil {
fmt.Printf("cannot parse config:\n%s\n\n", err)
fmt.Printf("NOTE: this migration was released together with a change in job name requirements.\n")
return fmt.Errorf("exiting migration after error")
}
ctx := context.Background()
v1cursorJobs := make([]job.Job, 0, len(cfg.Jobs))
for i, j := range cfg.Jobs {
if jobs[i].Name() != j.Name() {
panic("implementation error")
}
switch j.Ret.(type) {
case *config.PushJob:
v1cursorJobs = append(v1cursorJobs, jobs[i])
case *config.SourceJob:
v1cursorJobs = append(v1cursorJobs, jobs[i])
default:
fmt.Printf("ignoring job %q (%d/%d, type %T), not supposed to create v1 replication cursors\n", j.Name(), i, len(cfg.Jobs), j.Ret)
continue
}
}
// scan all filesystems for v1 replication cursors
fss, err := zfs.ZFSListMapping(ctx, zfs.NoFilter())
if err != nil {
return errors.Wrap(err, "list filesystems")
}
var hadError bool
for _, fs := range fss {
bold.Printf("INSPECT FILESYSTEM %q\n", fs.ToString())
err := doMigrateReplicationCursorFS(ctx, v1cursorJobs, fs)
if err == migrateReplicationCursorSkipSentinel {
bold.Printf("FILESYSTEM SKIPPED\n")
} else if err != nil {
hadError = true
fail.Printf("MIGRATION FAILED: %s\n", err)
} else {
succ.Printf("FILESYSTEM %q COMPLETE\n", fs.ToString())
}
}
if hadError {
fail.Printf("\n\none or more filesystems could not be migrated, please inspect output and or re-run migration")
return errors.Errorf("")
}
return nil
}
func doMigrateReplicationCursorFS(ctx context.Context, v1CursorJobs []job.Job, fs *zfs.DatasetPath) error {
var owningJob job.Job = nil
for _, job := range v1CursorJobs {
conf := job.SenderConfig()
if conf == nil {
continue
}
pass, err := conf.FSF.Filter(fs)
if err != nil {
return errors.Wrapf(err, "filesystem filter error in job %q for fs %q", job.Name(), fs.ToString())
}
if !pass {
continue
}
if owningJob != nil {
return errors.Errorf("jobs %q and %q both match %q\ncannot attribute replication cursor to either one", owningJob.Name(), job.Name(), fs)
}
owningJob = job
}
if owningJob == nil {
fmt.Printf("no job's Filesystems filter matches\n")
return migrateReplicationCursorSkipSentinel
}
fmt.Printf("identified owning job %q\n", owningJob.Name())
bookmarks, err := zfs.ZFSListFilesystemVersions(ctx, fs, zfs.ListFilesystemVersionsOptions{
Types: zfs.Bookmarks,
})
if err != nil {
return errors.Wrapf(err, "list filesystem versions of %q", fs.ToString())
}
var oldCursor *zfs.FilesystemVersion
for i, fsv := range bookmarks {
_, _, err := endpoint.ParseReplicationCursorBookmarkName(fsv.ToAbsPath(fs))
if err != endpoint.ErrV1ReplicationCursor {
continue
}
if oldCursor != nil {
fmt.Printf("unexpected v1 replication cursor candidate: %q", fsv.ToAbsPath(fs))
return errors.Wrap(err, "multiple filesystem versions identified as v1 replication cursors")
}
oldCursor = &bookmarks[i]
}
if oldCursor == nil {
bold.Printf("no v1 replication cursor found for filesystem %q\n", fs.ToString())
return migrateReplicationCursorSkipSentinel
}
fmt.Printf("found v1 replication cursor:\n%s\n", pretty.Sprint(oldCursor))
mostRecentNew, err := endpoint.GetMostRecentReplicationCursorOfJob(ctx, fs.ToString(), owningJob.SenderConfig().JobID)
if err != nil {
return errors.Wrapf(err, "get most recent v2 replication cursor")
}
if mostRecentNew == nil {
return errors.Errorf("no v2 replication cursor found for job %q on filesystem %q", owningJob.SenderConfig().JobID, fs.ToString())
}
fmt.Printf("most recent v2 replication cursor:\n%#v", oldCursor)
if !(mostRecentNew.CreateTXG >= oldCursor.CreateTXG) {
return errors.Errorf("v1 replication cursor createtxg is higher than v2 cursor's, skipping this filesystem")
}
fmt.Printf("determined that v2 cursor is bookmark of same or newer version than v1 cursor\n")
fmt.Printf("destroying v1 cursor %q\n", oldCursor.ToAbsPath(fs))
if migrateReplicationCursorArgs.dryRun {
succ.Printf("DRY RUN\n")
} else {
if err := zfs.ZFSDestroyFilesystemVersion(ctx, fs, oldCursor); err != nil {
return err
}
}
return nil
}