mirror of
https://github.com/zrepl/zrepl.git
synced 2025-01-20 13:18:47 +01:00
10a14a8c50
package trace: - introduce the concept of tasks and spans, tracked as linked list within ctx - see package-level docs for an overview of the concepts - **main feature 1**: unique stack of task and span IDs - makes it easy to follow a series of log entries in concurrent code - **main feature 2**: ability to produce a chrome://tracing-compatible trace file - either via an env variable or a `zrepl pprof` subcommand - this is not a CPU profile, we already have go pprof for that - but it is very useful to visually inspect where the replication / snapshotter / pruner spends its time ( fixes #307 ) usage in package daemon/logging: - goal: every log entry should have a trace field with the ID stack from package trace - make `logging.GetLogger(ctx, Subsys)` the authoritative `logger.Logger` factory function - the context carries a linked list of injected fields which `logging.GetLogger` adds to the logger it returns - `logging.GetLogger` also uses package `trace` to get the task-and-span-stack and injects it into the returned logger's fields
269 lines
7.3 KiB
Go
269 lines
7.3 KiB
Go
package client
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
|
|
"github.com/fatih/color"
|
|
"github.com/kr/pretty"
|
|
"github.com/pkg/errors"
|
|
"github.com/spf13/pflag"
|
|
|
|
"github.com/zrepl/zrepl/daemon/job"
|
|
"github.com/zrepl/zrepl/endpoint"
|
|
"github.com/zrepl/zrepl/zfs"
|
|
|
|
"github.com/zrepl/zrepl/cli"
|
|
"github.com/zrepl/zrepl/config"
|
|
)
|
|
|
|
var (
|
|
MigrateCmd = &cli.Subcommand{
|
|
Use: "migrate",
|
|
Short: "perform migration of the on-disk / zfs properties",
|
|
SetupSubcommands: func() []*cli.Subcommand {
|
|
return migrations
|
|
},
|
|
}
|
|
)
|
|
|
|
var migrations = []*cli.Subcommand{
|
|
&cli.Subcommand{
|
|
Use: "0.0.X:0.1:placeholder",
|
|
Run: doMigratePlaceholder0_1,
|
|
SetupFlags: func(f *pflag.FlagSet) {
|
|
f.BoolVar(&migratePlaceholder0_1Args.dryRun, "dry-run", false, "dry run")
|
|
},
|
|
},
|
|
&cli.Subcommand{
|
|
Use: "replication-cursor:v1-v2",
|
|
Run: doMigrateReplicationCursor,
|
|
SetupFlags: func(f *pflag.FlagSet) {
|
|
f.BoolVar(&migrateReplicationCursorArgs.dryRun, "dry-run", false, "dry run")
|
|
},
|
|
},
|
|
}
|
|
|
|
var migratePlaceholder0_1Args struct {
|
|
dryRun bool
|
|
}
|
|
|
|
func doMigratePlaceholder0_1(ctx context.Context, sc *cli.Subcommand, args []string) error {
|
|
if len(args) != 0 {
|
|
return fmt.Errorf("migration does not take arguments, got %v", args)
|
|
}
|
|
|
|
cfg := sc.Config()
|
|
|
|
allFSS, err := zfs.ZFSListMapping(ctx, zfs.NoFilter())
|
|
if err != nil {
|
|
return errors.Wrap(err, "cannot list filesystems")
|
|
}
|
|
|
|
type workItem struct {
|
|
jobName string
|
|
rootFS *zfs.DatasetPath
|
|
fss []*zfs.DatasetPath
|
|
}
|
|
var wis []workItem
|
|
for i, j := range cfg.Jobs {
|
|
var rfsS string
|
|
switch job := j.Ret.(type) {
|
|
case *config.SinkJob:
|
|
rfsS = job.RootFS
|
|
case *config.PullJob:
|
|
rfsS = job.RootFS
|
|
default:
|
|
fmt.Printf("ignoring job %q (%d/%d, type %T)\n", j.Name(), i, len(cfg.Jobs), j.Ret)
|
|
continue
|
|
}
|
|
rfs, err := zfs.NewDatasetPath(rfsS)
|
|
if err != nil {
|
|
return errors.Wrapf(err, "root fs for job %q is not a valid dataset path", j.Name())
|
|
}
|
|
var fss []*zfs.DatasetPath
|
|
for _, fs := range allFSS {
|
|
if fs.HasPrefix(rfs) {
|
|
fss = append(fss, fs)
|
|
}
|
|
}
|
|
wis = append(wis, workItem{j.Name(), rfs, fss})
|
|
}
|
|
|
|
for _, wi := range wis {
|
|
fmt.Printf("job %q => migrate filesystems below root_fs %q\n", wi.jobName, wi.rootFS.ToString())
|
|
if len(wi.fss) == 0 {
|
|
fmt.Printf("\tno filesystems\n")
|
|
continue
|
|
}
|
|
for _, fs := range wi.fss {
|
|
fmt.Printf("\t%q ... ", fs.ToString())
|
|
r, err := zfs.ZFSMigrateHashBasedPlaceholderToCurrent(ctx, fs, migratePlaceholder0_1Args.dryRun)
|
|
if err != nil {
|
|
fmt.Printf("error: %s\n", err)
|
|
} else if !r.NeedsModification {
|
|
fmt.Printf("unchanged (placeholder=%v)\n", r.OriginalState.IsPlaceholder)
|
|
} else {
|
|
fmt.Printf("migrate (placeholder=%v) (old value = %q)\n",
|
|
r.OriginalState.IsPlaceholder, r.OriginalState.RawLocalPropertyValue)
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
var migrateReplicationCursorArgs struct {
|
|
dryRun bool
|
|
}
|
|
|
|
var bold = color.New(color.Bold)
|
|
var succ = color.New(color.FgGreen)
|
|
var fail = color.New(color.FgRed)
|
|
|
|
var migrateReplicationCursorSkipSentinel = fmt.Errorf("skipping this filesystem")
|
|
|
|
func doMigrateReplicationCursor(ctx context.Context, sc *cli.Subcommand, args []string) error {
|
|
if len(args) != 0 {
|
|
return fmt.Errorf("migration does not take arguments, got %v", args)
|
|
}
|
|
|
|
cfg := sc.Config()
|
|
jobs, err := job.JobsFromConfig(cfg)
|
|
if err != nil {
|
|
fmt.Printf("cannot parse config:\n%s\n\n", err)
|
|
fmt.Printf("NOTE: this migration was released together with a change in job name requirements.\n")
|
|
return fmt.Errorf("exiting migration after error")
|
|
}
|
|
|
|
v1cursorJobs := make([]job.Job, 0, len(cfg.Jobs))
|
|
for i, j := range cfg.Jobs {
|
|
if jobs[i].Name() != j.Name() {
|
|
panic("implementation error")
|
|
}
|
|
switch j.Ret.(type) {
|
|
case *config.PushJob:
|
|
v1cursorJobs = append(v1cursorJobs, jobs[i])
|
|
case *config.SourceJob:
|
|
v1cursorJobs = append(v1cursorJobs, jobs[i])
|
|
default:
|
|
fmt.Printf("ignoring job %q (%d/%d, type %T), not supposed to create v1 replication cursors\n", j.Name(), i, len(cfg.Jobs), j.Ret)
|
|
continue
|
|
}
|
|
}
|
|
|
|
// scan all filesystems for v1 replication cursors
|
|
|
|
fss, err := zfs.ZFSListMapping(ctx, zfs.NoFilter())
|
|
if err != nil {
|
|
return errors.Wrap(err, "list filesystems")
|
|
}
|
|
|
|
var hadError bool
|
|
for _, fs := range fss {
|
|
|
|
bold.Printf("INSPECT FILESYSTEM %q\n", fs.ToString())
|
|
|
|
err := doMigrateReplicationCursorFS(ctx, v1cursorJobs, fs)
|
|
if err == migrateReplicationCursorSkipSentinel {
|
|
bold.Printf("FILESYSTEM SKIPPED\n")
|
|
} else if err != nil {
|
|
hadError = true
|
|
fail.Printf("MIGRATION FAILED: %s\n", err)
|
|
} else {
|
|
succ.Printf("FILESYSTEM %q COMPLETE\n", fs.ToString())
|
|
}
|
|
}
|
|
|
|
if hadError {
|
|
fail.Printf("\n\none or more filesystems could not be migrated, please inspect output and or re-run migration")
|
|
return errors.Errorf("")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func doMigrateReplicationCursorFS(ctx context.Context, v1CursorJobs []job.Job, fs *zfs.DatasetPath) error {
|
|
|
|
var owningJob job.Job = nil
|
|
for _, job := range v1CursorJobs {
|
|
conf := job.SenderConfig()
|
|
if conf == nil {
|
|
continue
|
|
}
|
|
pass, err := conf.FSF.Filter(fs)
|
|
if err != nil {
|
|
return errors.Wrapf(err, "filesystem filter error in job %q for fs %q", job.Name(), fs.ToString())
|
|
}
|
|
if !pass {
|
|
continue
|
|
}
|
|
if owningJob != nil {
|
|
return errors.Errorf("jobs %q and %q both match %q\ncannot attribute replication cursor to either one", owningJob.Name(), job.Name(), fs)
|
|
}
|
|
owningJob = job
|
|
}
|
|
if owningJob == nil {
|
|
fmt.Printf("no job's Filesystems filter matches\n")
|
|
return migrateReplicationCursorSkipSentinel
|
|
}
|
|
fmt.Printf("identified owning job %q\n", owningJob.Name())
|
|
|
|
bookmarks, err := zfs.ZFSListFilesystemVersions(ctx, fs, zfs.ListFilesystemVersionsOptions{
|
|
Types: zfs.Bookmarks,
|
|
})
|
|
if err != nil {
|
|
return errors.Wrapf(err, "list filesystem versions of %q", fs.ToString())
|
|
}
|
|
|
|
var oldCursor *zfs.FilesystemVersion
|
|
for i, fsv := range bookmarks {
|
|
_, _, err := endpoint.ParseReplicationCursorBookmarkName(fsv.ToAbsPath(fs))
|
|
if err != endpoint.ErrV1ReplicationCursor {
|
|
continue
|
|
}
|
|
|
|
if oldCursor != nil {
|
|
fmt.Printf("unexpected v1 replication cursor candidate: %q", fsv.ToAbsPath(fs))
|
|
return errors.Wrap(err, "multiple filesystem versions identified as v1 replication cursors")
|
|
}
|
|
|
|
oldCursor = &bookmarks[i]
|
|
|
|
}
|
|
|
|
if oldCursor == nil {
|
|
bold.Printf("no v1 replication cursor found for filesystem %q\n", fs.ToString())
|
|
return migrateReplicationCursorSkipSentinel
|
|
}
|
|
|
|
fmt.Printf("found v1 replication cursor:\n%s\n", pretty.Sprint(oldCursor))
|
|
|
|
mostRecentNew, err := endpoint.GetMostRecentReplicationCursorOfJob(ctx, fs.ToString(), owningJob.SenderConfig().JobID)
|
|
if err != nil {
|
|
return errors.Wrapf(err, "get most recent v2 replication cursor")
|
|
}
|
|
|
|
if mostRecentNew == nil {
|
|
return errors.Errorf("no v2 replication cursor found for job %q on filesystem %q", owningJob.SenderConfig().JobID, fs.ToString())
|
|
}
|
|
|
|
fmt.Printf("most recent v2 replication cursor:\n%#v", oldCursor)
|
|
|
|
if !(mostRecentNew.CreateTXG >= oldCursor.CreateTXG) {
|
|
return errors.Errorf("v1 replication cursor createtxg is higher than v2 cursor's, skipping this filesystem")
|
|
}
|
|
|
|
fmt.Printf("determined that v2 cursor is bookmark of same or newer version than v1 cursor\n")
|
|
fmt.Printf("destroying v1 cursor %q\n", oldCursor.ToAbsPath(fs))
|
|
|
|
if migrateReplicationCursorArgs.dryRun {
|
|
succ.Printf("DRY RUN\n")
|
|
} else {
|
|
if err := zfs.ZFSDestroyFilesystemVersion(ctx, fs, oldCursor); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|