diff --git a/config/config.go b/config/config.go index 2cff88e..afe75d0 100644 --- a/config/config.go +++ b/config/config.go @@ -139,6 +139,7 @@ type SnapshottingPeriodic struct { Type string `yaml:"type"` Prefix string `yaml:"prefix"` Interval time.Duration `yaml:"interval,positive"` + Hooks HookList `yaml:"hooks,optional"` } type SnapshottingManual struct { @@ -361,6 +362,38 @@ type JobDebugSettings struct { RPCLog bool `yaml:"rpc_log,optional,default=false"` } +type HookList []HookEnum + +type HookEnum struct { + Ret interface{} +} + +type HookCommand struct { + Path string `yaml:"path"` + Timeout time.Duration `yaml:"timeout,optional,positive,default=30s"` + Filesystems FilesystemsFilter `yaml:"filesystems,optional,default={'<': true}"` + HookSettingsCommon `yaml:",inline"` +} + +type HookPostgresCheckpoint struct { + HookSettingsCommon `yaml:",inline"` + DSN string `yaml:"dsn"` + Timeout time.Duration `yaml:"timeout,optional,positive,default=30s"` + Filesystems FilesystemsFilter `yaml:"filesystems"` // required, user should not CHECKPOINT for every FS +} + +type HookMySQLLockTables struct { + HookSettingsCommon `yaml:",inline"` + DSN string `yaml:"dsn"` + Timeout time.Duration `yaml:"timeout,optional,positive,default=30s"` + Filesystems FilesystemsFilter `yaml:"filesystems"` +} + +type HookSettingsCommon struct { + Type string `yaml:"type"` + ErrIsFatal bool `yaml:"err_is_fatal,optional,default=false"` +} + func enumUnmarshal(u func(interface{}, bool) error, types map[string]interface{}) (interface{}, error) { var in struct { Type string @@ -501,6 +534,15 @@ func (t *SyslogFacility) UnmarshalYAML(u func(interface{}, bool) error) (err err return nil } +func (t *HookEnum) UnmarshalYAML(u func(interface{}, bool) error) (err error) { + t.Ret, err = enumUnmarshal(u, map[string]interface{}{ + "command": &HookCommand{}, + "postgres-checkpoint": &HookPostgresCheckpoint{}, + "mysql-lock-tables": &HookMySQLLockTables{}, + }) + return +} + var ConfigFileDefaultLocations = []string{ "/etc/zrepl/zrepl.yml", "/usr/local/etc/zrepl/zrepl.yml", diff --git a/config/config_snapshotting_test.go b/config/config_snapshotting_test.go index 1bfb7e2..5bccf00 100644 --- a/config/config_snapshotting_test.go +++ b/config/config_snapshotting_test.go @@ -38,6 +38,29 @@ jobs: interval: 10m ` + hooks := ` + snapshotting: + type: periodic + prefix: zrepl_ + interval: 10m + hooks: + - type: command + path: /tmp/path/to/command + - type: command + path: /tmp/path/to/command + filesystems: { "zroot<": true, "<": false } + - type: postgres-checkpoint + dsn: "host=localhost port=5432 user=postgres sslmode=disable" + filesystems: { + "tank/postgres/data11": true + } + - type: mysql-lock-tables + dsn: "root@tcp(localhost)/" + filesystems: { + "tank/mysql": true + } +` + fillSnapshotting := func(s string) string { return fmt.Sprintf(tmpl, s) } var c *Config @@ -55,4 +78,13 @@ jobs: assert.Equal(t, "zrepl_", snp.Prefix) }) + t.Run("hooks", func(t *testing.T) { + c = testValidConfig(t, fillSnapshotting(hooks)) + hs := c.Jobs[0].Ret.(*PushJob).Snapshotting.Ret.(*SnapshottingPeriodic).Hooks + assert.Equal(t, hs[0].Ret.(*HookCommand).Filesystems["<"], true) + assert.Equal(t, hs[1].Ret.(*HookCommand).Filesystems["zroot<"], true) + assert.Equal(t, hs[2].Ret.(*HookPostgresCheckpoint).Filesystems["tank/postgres/data11"], true) + assert.Equal(t, hs[3].Ret.(*HookMySQLLockTables).Filesystems["tank/mysql"], true) + }) + } diff --git a/config/samples/hooks/template.sh b/config/samples/hooks/template.sh new file mode 100644 index 
0000000..53a21ef
--- /dev/null
+++ b/config/samples/hooks/template.sh
@@ -0,0 +1,22 @@
+#!/usr/bin/env bash
+set -e
+
+[ "$ZREPL_DRYRUN" = "true" ] && DRYRUN="echo DRYRUN: "
+
+pre_snapshot() {
+	$DRYRUN date
+}
+
+post_snapshot() {
+	$DRYRUN date
+}
+
+case "$ZREPL_HOOKTYPE" in
+	pre_snapshot|post_snapshot)
+		"$ZREPL_HOOKTYPE"
+		;;
+	*)
+		printf 'Unrecognized hook type: %s\n' "$ZREPL_HOOKTYPE"
+		exit 255
+		;;
+esac
diff --git a/daemon/hooks/edge_string.go b/daemon/hooks/edge_string.go
new file mode 100644
index 0000000..f8ea545
--- /dev/null
+++ b/daemon/hooks/edge_string.go
@@ -0,0 +1,35 @@
+// Code generated by "stringer -type=Edge"; DO NOT EDIT.
+
+package hooks
+
+import "strconv"
+
+func _() {
+	// An "invalid array index" compiler error signifies that the constant values have changed.
+	// Re-run the stringer command to generate them again.
+	var x [1]struct{}
+	_ = x[Pre-1]
+	_ = x[Callback-2]
+	_ = x[Post-4]
+}
+
+const (
+	_Edge_name_0 = "PreCallback"
+	_Edge_name_1 = "Post"
+)
+
+var (
+	_Edge_index_0 = [...]uint8{0, 3, 11}
+)
+
+func (i Edge) String() string {
+	switch {
+	case 1 <= i && i <= 2:
+		i -= 1
+		return _Edge_name_0[_Edge_index_0[i]:_Edge_index_0[i+1]]
+	case i == 4:
+		return _Edge_name_1
+	default:
+		return "Edge(" + strconv.FormatInt(int64(i), 10) + ")"
+	}
+}
diff --git a/daemon/hooks/hook_config.go b/daemon/hooks/hook_config.go
new file mode 100644
index 0000000..6ff193b
--- /dev/null
+++ b/daemon/hooks/hook_config.go
@@ -0,0 +1,52 @@
+package hooks
+
+import (
+	"fmt"
+
+	"github.com/zrepl/zrepl/config"
+	"github.com/zrepl/zrepl/zfs"
+)
+
+type List []Hook
+
+func HookFromConfig(in config.HookEnum) (Hook, error) {
+	switch v := in.Ret.(type) {
+	case *config.HookCommand:
+		return NewCommandHook(v)
+	case *config.HookPostgresCheckpoint:
+		return PgChkptHookFromConfig(v)
+	case *config.HookMySQLLockTables:
+		return MyLockTablesFromConfig(v)
+	default:
+		return nil, fmt.Errorf("unknown hook type %T", v)
+	}
+}
+
+func ListFromConfig(in *config.HookList) (r *List, err error) {
+	hl := make(List, len(*in))
+
+	for i, h := range *in {
+		hl[i], err = HookFromConfig(h)
+		if err != nil {
+			return nil, fmt.Errorf("create hook #%d: %s", i+1, err)
+		}
+	}
+
+	return &hl, nil
+}
+
+func (l List) CopyFilteredForFilesystem(fs *zfs.DatasetPath) (ret List, err error) {
+	ret = make(List, 0, len(l))
+
+	for _, h := range l {
+		var passFilesystem bool
+		if passFilesystem, err = h.Filesystems().Filter(fs); err != nil {
+			return nil, err
+		}
+		if passFilesystem {
+			ret = append(ret, h)
+		}
+	}
+
+	return ret, nil
+}
diff --git a/daemon/hooks/hook_docs.go b/daemon/hooks/hook_docs.go
new file mode 100644
index 0000000..ec1a37b
--- /dev/null
+++ b/daemon/hooks/hook_docs.go
@@ -0,0 +1,34 @@
+// Package hooks implements pre- and post-snapshot hooks.
+//
+// Plan is a generic executor for hooks before and after an activity specified in a callback.
+// It provides a reporting facility that can be polled while the plan is executing to gather progress information.
+//
+// This package also provides all supported hook type implementations and abstractions around them.
+//
+// Use For Other Kinds Of Activities
+//
+// This package REQUIRES REFACTORING before it can be used for other activities than snapshots, e.g. pre- and post-replication:
+//
+// The Hook interface requires a hook to provide a Filesystems() filter, which doesn't make sense for
+// all kinds of activities.
+//
+// The hook implementations should move out of this package.
+// However, there is a lot of tight coupling which to untangle isn't worth it ATM.
+//
+// How This Package Is Used By Package Snapper
+//
+// Deserialize a config.HookList using ListFromConfig().
+// The caller MUST then filter the list to only contain hooks for a particular filesystem using
+// hooksList.CopyFilteredForFilesystem(fs).
+//
+// Then create a CallbackHook using NewCallbackHookForFilesystem().
+//
+// Pass all of the above to NewPlan(), which provides a Report() and a Run() method:
+//
+// Plan.Run(ctx context.Context, dryRun bool) executes the plan and takes a context as argument that should contain a logger added using hooks.WithLogger().
+// The value of dryRun is passed through to the hooks' Run() method.
+// Command hooks make it available in the environment variable ZREPL_DRYRUN.
+//
+// Plan.Report() can be called while Plan.Run() is executing to give an overview of plan execution progress (future use in "zrepl status").
+//
+package hooks
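To make the usage sequence described in hook_docs.go concrete before the executor's implementation follows, here is a minimal sketch against the APIs this diff introduces. It mirrors the wiring that package snapper performs further down in this diff; the snapshotWithHooks function, its signature, and the bare-bones error handling are illustrative assumptions, not part of the change:

```go
package example

import (
	"context"
	"errors"

	"github.com/zrepl/zrepl/daemon/hooks"
	"github.com/zrepl/zrepl/zfs"
)

// snapshotWithHooks runs all hooks that match fs around the takeSnapshot callback.
// Sketch only: names and error handling are simplified for illustration.
func snapshotWithHooks(ctx context.Context, log hooks.Logger, hookList *hooks.List,
	fs *zfs.DatasetPath, snapname string, takeSnapshot hooks.HookJobCallback) error {

	// keep only the hooks whose filesystems filter matches fs
	filteredHooks, err := hookList.CopyFilteredForFilesystem(fs)
	if err != nil {
		return err
	}
	// the callback sits between the Pre and Post edges of the plan
	cb := hooks.NewCallbackHookForFilesystem("snapshot", fs, takeSnapshot)
	plan, err := hooks.NewPlan(&filteredHooks, hooks.PhaseSnapshot, cb, hooks.Env{
		hooks.EnvFS:       fs.ToString(),
		hooks.EnvSnapshot: snapname,
	})
	if err != nil {
		return err
	}
	plan.Run(hooks.WithLogger(ctx, log), false) // false = not a dry run
	if report := plan.Report(); report.HadError() {
		return errors.New(report.String())
	}
	return nil
}
```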
diff --git a/daemon/hooks/hook_exec.go b/daemon/hooks/hook_exec.go
new file mode 100644
index 0000000..b579f24
--- /dev/null
+++ b/daemon/hooks/hook_exec.go
@@ -0,0 +1,292 @@
+package hooks
+
+import (
+	"context"
+	"fmt"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/zrepl/zrepl/zfs"
+)
+
+// Re-export the type here so that
+// every file in package hooks doesn't
+// have to import github.com/zrepl/zrepl/zfs
+type Filter zfs.DatasetFilter
+
+type Hook interface {
+	Filesystems() Filter
+
+	// If true and the Pre edge invocation of Run fails, the Post edge will not run and other Pre edges will not run.
+	ErrIsFatal() bool
+
+	// Run is invoked by Plan for a Pre edge.
+	// If HookReport.HadError() == false, the Post edge will be invoked, too.
+	Run(ctx context.Context, edge Edge, phase Phase, dryRun bool, extra Env, state map[interface{}]interface{}) HookReport
+
+	String() string
+}
+
+type Phase string
+
+const (
+	PhaseSnapshot = Phase("snapshot")
+	PhaseTesting  = Phase("testing")
+)
+
+func (p Phase) String() string {
+	return string(p)
+}
+
+//go:generate stringer -type=Edge
+type Edge uint
+
+const (
+	Pre = Edge(1 << iota)
+	Callback
+	Post
+)
+
+func (e Edge) StringForPhase(phase Phase) string {
+	return fmt.Sprintf("%s_%s", e.String(), phase.String())
+}
+
+//go:generate enumer -type=StepStatus -trimprefix=Step
+type StepStatus int
+
+const (
+	StepPending StepStatus = 1 << iota
+	StepExec
+	StepOk
+	StepErr
+	StepSkippedDueToFatalErr
+	StepSkippedDueToPreErr
+)
+
+type HookReport interface {
+	String() string
+	HadError() bool
+	Error() string
+}
+
+type Step struct {
+	Hook       Hook
+	Edge       Edge
+	Status     StepStatus
+	Begin, End time.Time
+	// Report may be nil
+	// FIXME cannot serialize this for client status, but contains interesting info (like what error happened)
+	Report HookReport
+	state  map[interface{}]interface{}
+}
+
+func (s Step) String() (out string) {
+	fatal := "~"
+	if s.Hook.ErrIsFatal() && s.Edge == Pre {
+		fatal = "!"
+	}
+	runTime := "..."
+	if s.Status != StepPending {
+		t := s.End.Sub(s.Begin)
+		runTime = t.Round(time.Millisecond).String()
+	}
+	return fmt.Sprintf("[%s] [%5s] %s [%s] %s", s.Status, runTime, fatal, s.Edge, s.Hook)
+}
+
+type Plan struct {
+	mtx sync.RWMutex
+
+	steps []*Step
+	pre   []*Step // protected by mtx
+	cb    *Step
+	post  []*Step // not reversed, i.e. entry at index i corresponds to pre-edge in pre[i]
+
+	phase Phase
+	env   Env
+}
+
+func NewPlan(hooks *List, phase Phase, cb *CallbackHook, extra Env) (*Plan, error) {
+
+	var pre, post []*Step
+	// TODO sanity check unique name of hook?
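+	// Each hook contributes one Pre step and one Post step that share a single
+	// state map, so a hook's Post edge (e.g. UNLOCK TABLES) can reuse resources
+	// that its Pre edge (e.g. FLUSH TABLES WITH READ LOCK) stored in it.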
+ for _, hook := range *hooks { + state := make(map[interface{}]interface{}) + preE := &Step{ + Hook: hook, + Edge: Pre, + Status: StepPending, + state: state, + } + pre = append(pre, preE) + postE := &Step{ + Hook: hook, + Edge: Post, + Status: StepPending, + state: state, + } + post = append(post, postE) + } + + cbE := &Step{ + Hook: cb, + Edge: Callback, + Status: StepPending, + } + + steps := make([]*Step, 0, len(pre)+len(post)+1) + steps = append(steps, pre...) + steps = append(steps, cbE) + for i := len(post) - 1; i >= 0; i-- { + steps = append(steps, post[i]) + } + + plan := &Plan{ + phase: phase, + env: extra, + steps: steps, + pre: pre, + post: post, + cb: cbE, + } + + return plan, nil +} + +type PlanReport []Step + +func (p *Plan) Report() PlanReport { + p.mtx.RLock() + defer p.mtx.RUnlock() + rep := make([]Step, len(p.steps)) + for i := range rep { + rep[i] = *p.steps[i] + } + return rep +} + +func (r PlanReport) HadError() bool { + for _, e := range r { + if e.Status == StepErr { + return true + } + } + return false +} + +func (r PlanReport) HadFatalError() bool { + for _, e := range r { + if e.Status == StepSkippedDueToFatalErr { + return true + } + } + return false +} + +func (r PlanReport) String() string { + stepStrings := make([]string, len(r)) + for i, e := range r { + stepStrings[i] = fmt.Sprintf("%02d %s", i+1, e) + } + return strings.Join(stepStrings, "\n") +} + +func (p *Plan) Run(ctx context.Context, dryRun bool) { + p.mtx.RLock() + defer p.mtx.RUnlock() + w := func(f func()) { + p.mtx.RUnlock() + defer p.mtx.RLock() + p.mtx.Lock() + defer p.mtx.Unlock() + f() + } + runHook := func(s *Step, ctx context.Context, edge Edge) HookReport { + w(func() { s.Status = StepExec }) + begin := time.Now() + r := s.Hook.Run(ctx, edge, p.phase, dryRun, p.env, s.state) + end := time.Now() + w(func() { + s.Report = r + s.Status = StepOk + if r.HadError() { + s.Status = StepErr + } + s.Begin, s.End = begin, end + }) + return r + } + + l := getLogger(ctx) + + // it's a stack, execute until we reach the end of the list (last item in) + // or fail inbetween + l.Info("run pre-edges in configuration order") + next := 0 + for ; next < len(p.pre); next++ { + e := p.pre[next] + l := l.WithField("hook", e.Hook) + r := runHook(e, ctx, Pre) + if r.HadError() { + l.WithError(r).Error("hook invocation failed for pre-edge") + if e.Hook.ErrIsFatal() { + l.Error("the hook run was aborted due to a fatal error in this hook") + break + } + } + } + + hadFatalErr := next != len(p.pre) + if hadFatalErr { + l.Error("fatal error in a pre-snapshot hook invocation") + l.Error("no snapshot will be taken") + l.Error("only running post-edges for successful pre-edges") + w(func() { + p.post[next].Status = StepSkippedDueToFatalErr + for i := next + 1; i < len(p.pre); i++ { + p.pre[i].Status = StepSkippedDueToFatalErr + p.post[i].Status = StepSkippedDueToFatalErr + } + p.cb.Status = StepSkippedDueToFatalErr + }) + return + } + + l.Info("running callback") + cbR := runHook(p.cb, ctx, Callback) + if cbR.HadError() { + l.WithError(cbR).Error("callback failed") + } + + l.Info("run post-edges for successful pre-edges in reverse configuration order") + + // the constructor produces pre and post entries + // post is NOT reversed + next-- // now at index of last executed pre-edge + for ; next >= 0; next-- { + e := p.post[next] + l := l.WithField("hook", e.Hook) + + if p.pre[next].Status != StepOk { + if p.pre[next].Status != StepErr { + panic(fmt.Sprintf("expecting a pre-edge hook report to be either Ok or Err, got %s", 
p.pre[next].Status)) + } + l.Info("skip post-edge because pre-edge failed") + w(func() { + e.Status = StepSkippedDueToPreErr + }) + continue + } + + report := runHook(e, ctx, Post) + + if report.HadError() { + l.WithError(report).Error("hook invocation failed for post-edge") + l.Error("subsequent post-edges run regardless of this post-edge failure") + } + + // ErrIsFatal is only relevant for Pre + + } + +} diff --git a/daemon/hooks/hook_logging.go b/daemon/hooks/hook_logging.go new file mode 100644 index 0000000..2f0b03c --- /dev/null +++ b/daemon/hooks/hook_logging.go @@ -0,0 +1,103 @@ +package hooks + +import ( + "bufio" + "bytes" + "context" + "sync" + + "github.com/zrepl/zrepl/logger" + "github.com/zrepl/zrepl/util/envconst" +) + +type contextKey int + +const ( + contextKeyLog contextKey = 0 +) + +type Logger = logger.Logger + +func WithLogger(ctx context.Context, log Logger) context.Context { + return context.WithValue(ctx, contextKeyLog, log) +} + +func GetLogger(ctx context.Context) Logger { return getLogger(ctx) } + +func getLogger(ctx context.Context) Logger { + if log, ok := ctx.Value(contextKeyLog).(Logger); ok { + return log + } + return logger.NewNullLogger() +} + +const MAX_HOOK_LOG_SIZE_DEFAULT int = 1 << 20 + +type logWriter struct { + /* + Mutex prevents: + concurrent writes to buf, scanner in Write([]byte) + data race on scanner vs Write([]byte) + and concurrent write to buf (call to buf.Reset()) + in Close() + + (Also, Close() should generally block until any Write() call completes.) + */ + mtx *sync.Mutex + buf bytes.Buffer + scanner *bufio.Scanner + logger Logger + level logger.Level + field string +} + +func NewLogWriter(mtx *sync.Mutex, logger Logger, level logger.Level, field string) *logWriter { + w := new(logWriter) + w.mtx = mtx + w.scanner = bufio.NewScanner(&w.buf) + w.logger = logger + w.level = level + w.field = field + return w +} + +func (w *logWriter) log(line string) { + w.logger.WithField(w.field, line).Log(w.level, "hook output") +} + +func (w *logWriter) logUnreadBytes() error { + for w.scanner.Scan() { + w.log(w.scanner.Text()) + } + if w.buf.Cap() > envconst.Int("ZREPL_MAX_HOOK_LOG_SIZE", MAX_HOOK_LOG_SIZE_DEFAULT) { + w.buf.Reset() + } + + return nil +} + +func (w *logWriter) Write(in []byte) (int, error) { + w.mtx.Lock() + defer w.mtx.Unlock() + + n, bufErr := w.buf.Write(in) + if bufErr != nil { + return n, bufErr + } + + err := w.logUnreadBytes() + if err != nil { + return n, err + } + // Always reset the scanner for the next Write + w.scanner = bufio.NewScanner(&w.buf) + + return n, nil +} + +func (w *logWriter) Close() (err error) { + w.mtx.Lock() + defer w.mtx.Unlock() + + return w.logUnreadBytes() +} diff --git a/daemon/hooks/hook_type_callback.go b/daemon/hooks/hook_type_callback.go new file mode 100644 index 0000000..63fa5d2 --- /dev/null +++ b/daemon/hooks/hook_type_callback.go @@ -0,0 +1,65 @@ +package hooks + +import ( + "context" + "fmt" + + "github.com/zrepl/zrepl/daemon/filters" + "github.com/zrepl/zrepl/zfs" +) + +type HookJobCallback func(ctx context.Context) error + +type CallbackHook struct { + cb HookJobCallback + filter Filter + displayString string +} + +func NewCallbackHookForFilesystem(displayString string, fs *zfs.DatasetPath, cb HookJobCallback) *CallbackHook { + filter, _ := filters.DatasetMapFilterFromConfig(map[string]bool{fs.ToString(): true}) + return NewCallbackHook(displayString, cb, filter) +} + +func NewCallbackHook(displayString string, cb HookJobCallback, filter Filter) *CallbackHook { + return &CallbackHook{ + 
cb: cb, + filter: filter, + displayString: displayString, + } +} + +func (h *CallbackHook) Filesystems() Filter { + return h.filter +} + +func (h *CallbackHook) ErrIsFatal() bool { + return false // callback is by definition +} + +func (h *CallbackHook) String() string { + return h.displayString +} + +type CallbackHookReport struct { + Name string + Err error +} + +func (r *CallbackHookReport) String() string { + if r.HadError() { + return r.Error() + } + return r.Name +} + +func (r *CallbackHookReport) HadError() bool { return r.Err != nil } + +func (r *CallbackHookReport) Error() string { + return fmt.Sprintf("%s error: %s", r.Name, r.Err) +} + +func (h *CallbackHook) Run(ctx context.Context, edge Edge, phase Phase, dryRun bool, extra Env, state map[interface{}]interface{}) HookReport { + err := h.cb(ctx) + return &CallbackHookReport{h.displayString, err} +} diff --git a/daemon/hooks/hook_type_command.go b/daemon/hooks/hook_type_command.go new file mode 100644 index 0000000..d238dd9 --- /dev/null +++ b/daemon/hooks/hook_type_command.go @@ -0,0 +1,192 @@ +package hooks + +import ( + "context" + "fmt" + "io" + "math" + "os" + "os/exec" + "sort" + "strings" + "sync" + "time" + + "github.com/zrepl/zrepl/config" + "github.com/zrepl/zrepl/daemon/filters" + "github.com/zrepl/zrepl/logger" + "github.com/zrepl/zrepl/util/circlog" + "github.com/zrepl/zrepl/util/envconst" +) + +type HookEnvVar string + +const ( + EnvType HookEnvVar = "ZREPL_HOOKTYPE" + EnvDryRun HookEnvVar = "ZREPL_DRYRUN" + EnvFS HookEnvVar = "ZREPL_FS" + EnvSnapshot HookEnvVar = "ZREPL_SNAPNAME" + EnvTimeout HookEnvVar = "ZREPL_TIMEOUT" +) + +type Env map[HookEnvVar]string + +func NewHookEnv(edge Edge, phase Phase, dryRun bool, timeout time.Duration, extra Env) Env { + r := Env{ + EnvTimeout: fmt.Sprintf("%.f", math.Floor(timeout.Seconds())), + } + + edgeString := edge.StringForPhase(phase) + r[EnvType] = strings.ToLower(edgeString) + + var dryRunString string + if dryRun { + dryRunString = "true" + } else { + dryRunString = "" + } + r[EnvDryRun] = dryRunString + + for k, v := range extra { + r[k] = v + } + + return r +} + +type CommandHook struct { + edge Edge + filter Filter + errIsFatal bool + command string + timeout time.Duration +} + +type CommandHookReport struct { + Command string + Args []string // currently always empty + Env Env + Err error + CapturedStdoutStderrCombined []byte +} + +func (r *CommandHookReport) String() string { + // Reproduces a POSIX shell-compatible command line + var cmdLine strings.Builder + sep := "" + + // Make sure environment variables are always + // printed in the same order + var hookEnvKeys []HookEnvVar + for k := range r.Env { + hookEnvKeys = append(hookEnvKeys, k) + } + sort.Slice(hookEnvKeys, func(i, j int) bool { return string(hookEnvKeys[i]) < string(hookEnvKeys[j]) }) + + for _, k := range hookEnvKeys { + cmdLine.WriteString(fmt.Sprintf("%s%s='%s'", sep, k, r.Env[k])) + sep = " " + } + + cmdLine.WriteString(fmt.Sprintf("%s%s", sep, r.Command)) + for _, a := range r.Args { + cmdLine.WriteString(fmt.Sprintf("%s'%s'", sep, a)) + } + + return fmt.Sprintf("command hook invocation: \"%s\"", cmdLine.String()) // no %q to make copy-pastable +} +func (r *CommandHookReport) Error() string { + if r.Err == nil { + return "" + } + return fmt.Sprintf("%s FAILED with error: %s", r.String(), r.Err) +} + +func (r *CommandHookReport) HadError() bool { + return r.Err != nil +} + +func NewCommandHook(in *config.HookCommand) (r *CommandHook, err error) { + r = &CommandHook{ + errIsFatal: in.ErrIsFatal, + 
command: in.Path, + timeout: in.Timeout, + } + + r.filter, err = filters.DatasetMapFilterFromConfig(in.Filesystems) + if err != nil { + return nil, fmt.Errorf("cannot parse filesystem filter: %s", err) + } + + r.edge = Pre | Post + + return r, nil +} + +func (h *CommandHook) Filesystems() Filter { + return h.filter +} + +func (h *CommandHook) ErrIsFatal() bool { + return h.errIsFatal +} + +func (h *CommandHook) String() string { + return h.command +} + +func (h *CommandHook) Run(ctx context.Context, edge Edge, phase Phase, dryRun bool, extra Env, state map[interface{}]interface{}) HookReport { + l := getLogger(ctx).WithField("command", h.command) + + cmdCtx, cancel := context.WithTimeout(ctx, h.timeout) + defer cancel() + + cmdExec := exec.CommandContext(cmdCtx, h.command) + + hookEnv := NewHookEnv(edge, phase, dryRun, h.timeout, extra) + cmdEnv := os.Environ() + for k, v := range hookEnv { + cmdEnv = append(cmdEnv, fmt.Sprintf("%s=%s", k, v)) + } + cmdExec.Env = cmdEnv + + var scanMutex sync.Mutex + combinedOutput, err := circlog.NewCircularLog(envconst.Int("ZREPL_MAX_HOOK_LOG_SIZE", MAX_HOOK_LOG_SIZE_DEFAULT)) + if err != nil { + return &CommandHookReport{Err: err} + } + logErrWriter := NewLogWriter(&scanMutex, l, logger.Warn, "stderr") + logOutWriter := NewLogWriter(&scanMutex, l, logger.Info, "stdout") + defer logErrWriter.Close() + defer logOutWriter.Close() + + cmdExec.Stderr = io.MultiWriter(logErrWriter, combinedOutput) + cmdExec.Stdout = io.MultiWriter(logOutWriter, combinedOutput) + + report := &CommandHookReport{ + Command: h.command, + Env: hookEnv, + // no report.Args + } + + err = cmdExec.Start() + if err != nil { + report.Err = err + return report + } + + err = cmdExec.Wait() + combinedOutputBytes := combinedOutput.Bytes() + report.CapturedStdoutStderrCombined = make([]byte, len(combinedOutputBytes)) + copy(report.CapturedStdoutStderrCombined, combinedOutputBytes) + if err != nil { + if cmdCtx.Err() == context.DeadlineExceeded { + report.Err = fmt.Errorf("timed out after %s: %s", h.timeout, err) + return report + } + report.Err = err + return report + } + + return report +} diff --git a/daemon/hooks/hook_type_mysql_lock_tables.go b/daemon/hooks/hook_type_mysql_lock_tables.go new file mode 100644 index 0000000..a3a72e0 --- /dev/null +++ b/daemon/hooks/hook_type_mysql_lock_tables.go @@ -0,0 +1,145 @@ +package hooks + +import ( + "context" + "database/sql" + "fmt" + "strings" + + sqldriver "database/sql/driver" + + "github.com/go-sql-driver/mysql" + + "github.com/pkg/errors" + + "github.com/zrepl/zrepl/config" + "github.com/zrepl/zrepl/daemon/filters" + "github.com/zrepl/zrepl/zfs" +) + +// https://dev.mysql.com/doc/mysql-backup-excerpt/5.7/en/backup-methods.html +// +// Making Backups Using a File System Snapshot: +// +// If you are using a Veritas file system, you can make a backup like this: +// +// From a client program, execute FLUSH TABLES WITH READ LOCK. +// From another shell, execute mount vxfs snapshot. +// From the first client, execute UNLOCK TABLES. +// Copy files from the snapshot. +// Unmount the snapshot. +// +// Similar snapshot capabilities may be available in other file systems, such as LVM or ZFS. 
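+//
+// MySQLLockTables implements that lock/snapshot/unlock sequence via the Pre
+// and Post edges: the Pre edge executes FLUSH TABLES WITH READ LOCK and
+// stashes the open *sql.DB handle in the shared per-hook state map; the Post
+// edge fetches it from there, executes UNLOCK TABLES, and closes the handle.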
+type MySQLLockTables struct { + errIsFatal bool + connector sqldriver.Connector + filesystems Filter +} + +type myLockTablesStateKey int + +const ( + myLockTablesConnection myLockTablesStateKey = 1 + iota +) + +func MyLockTablesFromConfig(in *config.HookMySQLLockTables) (*MySQLLockTables, error) { + conf, err := mysql.ParseDSN(in.DSN) + if err != nil { + return nil, errors.Wrap(err, "`dsn` invalid") + } + cn, err := mysql.NewConnector(conf) + if err != nil { + return nil, errors.Wrap(err, "`connect` invalid") + } + + filesystems, err := filters.DatasetMapFilterFromConfig(in.Filesystems) + if err != nil { + return nil, errors.Wrap(err, "`filesystems` invalid") + } + + return &MySQLLockTables{ + in.ErrIsFatal, + cn, + filesystems, + }, nil +} + +func (h *MySQLLockTables) ErrIsFatal() bool { return h.errIsFatal } +func (h *MySQLLockTables) Filesystems() Filter { return h.filesystems } +func (h *MySQLLockTables) String() string { return "MySQL FLUSH TABLES WITH READ LOCK" } + +type MyLockTablesReport struct { + What string + Err error +} + +func (r *MyLockTablesReport) HadError() bool { return r.Err != nil } +func (r *MyLockTablesReport) Error() string { return r.String() } +func (r *MyLockTablesReport) String() string { + var s strings.Builder + s.WriteString(r.What) + if r.Err != nil { + fmt.Fprintf(&s, ": %s", r.Err) + } + return s.String() +} + +func (h *MySQLLockTables) Run(ctx context.Context, edge Edge, phase Phase, dryRun bool, extra Env, state map[interface{}]interface{}) HookReport { + fs, ok := extra[EnvFS] + if !ok { + panic(extra) + } + dp, err := zfs.NewDatasetPath(fs) + if err != nil { + panic(err) + } + if pass, err := h.filesystems.Filter(dp); err != nil { + return &MyLockTablesReport{What: "filesystem filter", Err: err} + } else if !pass { + getLogger(ctx).Debug("filesystem does not match filter, skipping") + return &MyLockTablesReport{What: "filesystem filter skipped this filesystem", Err: nil} + } + + switch edge { + case Pre: + err := h.doRunPre(ctx, dp, dryRun, state) + return &MyLockTablesReport{"FLUSH TABLES WITH READ LOCK", err} + case Post: + err := h.doRunPost(ctx, dp, dryRun, state) + return &MyLockTablesReport{"UNLOCK TABLES", err} + } + return &MyLockTablesReport{What: "skipped this edge", Err: nil} +} + +func (h *MySQLLockTables) doRunPre(ctx context.Context, fs *zfs.DatasetPath, dry bool, state map[interface{}]interface{}) (err error) { + db := sql.OpenDB(h.connector) + defer func(err *error) { + if *err != nil { + db.Close() + } + }(&err) + + getLogger(ctx).Debug("do FLUSH TABLES WITH READ LOCK") + _, err = db.ExecContext(ctx, "FLUSH TABLES WITH READ LOCK") + if err != nil { + return + } + + state[myLockTablesConnection] = db + + return nil +} + +func (h *MySQLLockTables) doRunPost(ctx context.Context, fs *zfs.DatasetPath, dry bool, state map[interface{}]interface{}) error { + + db := state[myLockTablesConnection].(*sql.DB) + defer db.Close() + + getLogger(ctx).Debug("do UNLOCK TABLES") + _, err := db.ExecContext(ctx, "UNLOCK TABLES") + if err != nil { + return err + } + + return nil +} diff --git a/daemon/hooks/hook_type_postgres_checkpoint.go b/daemon/hooks/hook_type_postgres_checkpoint.go new file mode 100644 index 0000000..60af3c3 --- /dev/null +++ b/daemon/hooks/hook_type_postgres_checkpoint.go @@ -0,0 +1,98 @@ +package hooks + +import ( + "context" + "database/sql" + "fmt" + "math" + "time" + + "github.com/lib/pq" + "github.com/pkg/errors" + + "github.com/zrepl/zrepl/config" + "github.com/zrepl/zrepl/daemon/filters" + "github.com/zrepl/zrepl/zfs" +) + 
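+// PgChkptHook runs CHECKPOINT on the Pre edge only: a checkpoint writes all
+// dirty buffers to disk, so the snapshot taken right afterwards requires less
+// WAL replay when it is restored and recovered. There is nothing to undo
+// after the snapshot, so the Post edge is deliberately a no-op.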
+type PgChkptHook struct {
+	errIsFatal  bool
+	connector   *pq.Connector
+	filesystems Filter
+}
+
+func PgChkptHookFromConfig(in *config.HookPostgresCheckpoint) (*PgChkptHook, error) {
+	filesystems, err := filters.DatasetMapFilterFromConfig(in.Filesystems)
+	if err != nil {
+		return nil, errors.Wrap(err, "`filesystems` invalid")
+	}
+	cn, err := pq.NewConnector(in.DSN)
+	if err != nil {
+		return nil, errors.Wrap(err, "`dsn` invalid")
+	}
+
+	return &PgChkptHook{
+		in.ErrIsFatal,
+		cn,
+		filesystems,
+	}, nil
+}
+
+func (h *PgChkptHook) ErrIsFatal() bool    { return h.errIsFatal }
+func (h *PgChkptHook) Filesystems() Filter { return h.filesystems }
+func (h *PgChkptHook) String() string      { return "postgres checkpoint" }
+
+type PgChkptHookReport struct{ Err error }
+
+func (r *PgChkptHookReport) HadError() bool { return r.Err != nil }
+func (r *PgChkptHookReport) Error() string  { return r.Err.Error() }
+func (r *PgChkptHookReport) String() string {
+	if r.Err != nil {
+		return fmt.Sprintf("postgres CHECKPOINT failed: %s", r.Err)
+	} else {
+		return "postgres CHECKPOINT completed"
+	}
+}
+
+func (h *PgChkptHook) Run(ctx context.Context, edge Edge, phase Phase, dryRun bool, extra Env, state map[interface{}]interface{}) HookReport {
+	if edge != Pre {
+		return &PgChkptHookReport{nil}
+	}
+	fs, ok := extra[EnvFS]
+	if !ok {
+		panic(extra)
+	}
+	dp, err := zfs.NewDatasetPath(fs)
+	if err != nil {
+		panic(err)
+	}
+	err = h.doRunPre(ctx, dp, dryRun)
+	return &PgChkptHookReport{err}
+}
+
+func (h *PgChkptHook) doRunPre(ctx context.Context, fs *zfs.DatasetPath, dry bool) error {
+
+	if pass, err := h.filesystems.Filter(fs); err != nil || !pass {
+		getLogger(ctx).Debug("filesystem does not match filter, skipping")
+		return err
+	}
+
+	db := sql.OpenDB(h.connector)
+	defer db.Close()
+	dl, ok := ctx.Deadline()
+	if ok {
+		timeout := uint64(math.Floor(time.Until(dl).Seconds() * 1000)) // TODO go1.13 milliseconds
+		getLogger(ctx).WithField("statement_timeout", timeout).Debug("setting statement timeout for CHECKPOINT")
+		// SET cannot take bind parameters (and lib/pq only supports $N placeholders);
+		// timeout is numeric, so formatting it into the statement is safe
+		_, err := db.ExecContext(ctx, fmt.Sprintf("SET statement_timeout TO %d", timeout))
+		if err != nil {
+			return err
+		}
+	}
+	if dry {
+		getLogger(ctx).Debug("dry-run - use ping instead of CHECKPOINT")
+		return db.PingContext(ctx)
+	}
+	getLogger(ctx).Info("execute CHECKPOINT command")
+	_, err := db.ExecContext(ctx, "CHECKPOINT")
+	return err
+}
diff --git a/daemon/hooks/hooks_test.go b/daemon/hooks/hooks_test.go
new file mode 100644
index 0000000..e6f57b7
--- /dev/null
+++ b/daemon/hooks/hooks_test.go
@@ -0,0 +1,484 @@
+package hooks_test
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"os"
+	"regexp"
+	"testing"
+	"text/template"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/zrepl/zrepl/config"
+	"github.com/zrepl/zrepl/daemon/hooks"
+	"github.com/zrepl/zrepl/logger"
+	"github.com/zrepl/zrepl/zfs"
+)
+
+type comparisonAssertionFunc func(require.TestingT, interface{}, interface{}, ...interface{})
+type valueAssertionFunc func(require.TestingT, interface{}, ...interface{})
+
+type expectStep struct {
+	ExpectedEdge hooks.Edge
+	ExpectStatus hooks.StepStatus
+	OutputTest   valueAssertionFunc
+	ErrorTest    valueAssertionFunc
+}
+
+type testCase struct {
+	Name           string
+	Config         []string
+	IsSlow         bool
+	SuppressOutput bool
+
+	ExpectCallbackSkipped bool
+	ExpectHadFatalErr     bool
+	ExpectHadError        bool
+	ExpectStepReports     []expectStep
+}
+
+func curryLeft(f comparisonAssertionFunc, expected interface{}) valueAssertionFunc {
+	return curry(f, expected, false)
+}
+
+func curryRight(f comparisonAssertionFunc,
expected interface{}) valueAssertionFunc { + return curry(f, expected, true) +} + +func curry(f comparisonAssertionFunc, expected interface{}, right bool) (ret valueAssertionFunc) { + ret = func(t require.TestingT, s interface{}, v ...interface{}) { + var x interface{} + var y interface{} + if right { + x = s + y = expected + } else { + x = expected + y = s + } + + if len(v) > 0 { + f(t, x, y, v) + } else { + f(t, x, y) + } + } + return +} + +func TestHooks(t *testing.T) { + testFSName := "testpool/testdataset" + testSnapshotName := "testsnap" + + tmpl, err := template.New("TestHooks").Parse(` +jobs: +- name: TestHooks + type: snap + filesystems: {"<": true} + snapshotting: + type: periodic + interval: 1m + prefix: zrepl_snapjob_ + hooks: + {{- template "List" . }} + pruning: + keep: + - type: last_n + count: 10 +`) + if err != nil { + panic(err) + } + + regexpTest := func(s string) valueAssertionFunc { + return curryLeft(require.Regexp, regexp.MustCompile(s)) + } + + containsTest := func(s string) valueAssertionFunc { + return curryRight(require.Contains, s) + } + + testTable := []testCase{ + testCase{ + Name: "no_hooks", + ExpectStepReports: []expectStep{ + {ExpectedEdge: hooks.Callback, ExpectStatus: hooks.StepOk}, + }, + }, + testCase{ + Name: "timeout", + IsSlow: true, + ExpectHadError: true, + Config: []string{`{type: command, path: {{.WorkDir}}/test/test-timeout.sh, timeout: 2s}`}, + ExpectStepReports: []expectStep{ + expectStep{ + ExpectedEdge: hooks.Pre, + ExpectStatus: hooks.StepErr, + OutputTest: containsTest(fmt.Sprintf("TEST pre_testing %s@%s ZREPL_TIMEOUT=2", testFSName, testSnapshotName)), + ErrorTest: regexpTest(`timed out after 2(.\d+)?s`), + }, + expectStep{ExpectedEdge: hooks.Callback, ExpectStatus: hooks.StepOk}, + expectStep{ + ExpectedEdge: hooks.Post, + ExpectStatus: hooks.StepSkippedDueToPreErr, + }, + }, + }, + testCase{ + Name: "check_env", + Config: []string{`{type: command, path: {{.WorkDir}}/test/test-report-env.sh}`}, + ExpectHadError: false, + ExpectStepReports: []expectStep{ + expectStep{ + ExpectedEdge: hooks.Pre, + ExpectStatus: hooks.StepOk, + OutputTest: containsTest(fmt.Sprintf("TEST pre_testing %s@%s", testFSName, testSnapshotName)), + }, + expectStep{ExpectedEdge: hooks.Callback, ExpectStatus: hooks.StepOk}, + expectStep{ + ExpectedEdge: hooks.Post, + ExpectStatus: hooks.StepOk, + OutputTest: containsTest(fmt.Sprintf("TEST post_testing %s@%s", testFSName, testSnapshotName)), + }, + }, + }, + + testCase{ + Name: "nonfatal_pre_error_continues", + ExpectCallbackSkipped: false, + ExpectHadError: true, + Config: []string{`{type: command, path: {{.WorkDir}}/test/test-error.sh}`}, + ExpectStepReports: []expectStep{ + expectStep{ + ExpectedEdge: hooks.Pre, + ExpectStatus: hooks.StepErr, + OutputTest: containsTest(fmt.Sprintf("TEST ERROR pre_testing %s@%s", testFSName, testSnapshotName)), + ErrorTest: regexpTest("^command hook invocation.*exit status 1$"), + }, + expectStep{ExpectedEdge: hooks.Callback, ExpectStatus: hooks.StepOk}, + expectStep{ + ExpectedEdge: hooks.Post, + ExpectStatus: hooks.StepSkippedDueToPreErr, // post-edge is not executed for failing pre-edge + }, + }, + }, + + testCase{ + Name: "pre_error_fatal_skips_subsequent_pre_edges_and_callback_and_its_post_edge_and_post_edges", + ExpectCallbackSkipped: true, + ExpectHadFatalErr: true, + ExpectHadError: true, + Config: []string{ + `{type: command, path: {{.WorkDir}}/test/test-error.sh, err_is_fatal: true}`, + `{type: command, path: {{.WorkDir}}/test/test-report-env.sh}`, + }, + 
ExpectStepReports: []expectStep{ + expectStep{ + ExpectedEdge: hooks.Pre, + ExpectStatus: hooks.StepErr, + OutputTest: containsTest(fmt.Sprintf("TEST ERROR pre_testing %s@%s", testFSName, testSnapshotName)), + ErrorTest: regexpTest("^command hook invocation.*exit status 1$"), + }, + expectStep{ + ExpectedEdge: hooks.Pre, + ExpectStatus: hooks.StepSkippedDueToFatalErr, + }, + expectStep{ExpectedEdge: hooks.Callback, ExpectStatus: hooks.StepSkippedDueToFatalErr}, + expectStep{ + ExpectedEdge: hooks.Post, + ExpectStatus: hooks.StepSkippedDueToFatalErr, + }, + expectStep{ + ExpectedEdge: hooks.Post, + ExpectStatus: hooks.StepSkippedDueToFatalErr, + }, + }, + }, + + testCase{ + Name: "post_error_fails_are_ignored_even_if_fatal", + ExpectHadFatalErr: false, // only occurs during Post, so it's not a fatal error + ExpectHadError: true, + Config: []string{ + `{type: command, path: {{.WorkDir}}/test/test-post-error.sh, err_is_fatal: true}`, + `{type: command, path: {{.WorkDir}}/test/test-report-env.sh}`, + }, + ExpectStepReports: []expectStep{ + expectStep{ + // No-action run of test-post-error.sh + ExpectedEdge: hooks.Pre, + ExpectStatus: hooks.StepOk, + OutputTest: require.Empty, + }, + expectStep{ + // Pre run of test-report-env.sh + ExpectedEdge: hooks.Pre, + ExpectStatus: hooks.StepOk, + OutputTest: containsTest(fmt.Sprintf("TEST pre_testing %s@%s", testFSName, testSnapshotName)), + }, + expectStep{ExpectedEdge: hooks.Callback, ExpectStatus: hooks.StepOk}, + expectStep{ + ExpectedEdge: hooks.Post, + ExpectStatus: hooks.StepOk, + OutputTest: containsTest(fmt.Sprintf("TEST post_testing %s@%s", testFSName, testSnapshotName)), + }, + expectStep{ + ExpectedEdge: hooks.Post, + ExpectStatus: hooks.StepErr, + OutputTest: containsTest(fmt.Sprintf("TEST ERROR post_testing %s@%s", testFSName, testSnapshotName)), + ErrorTest: regexpTest("^command hook invocation.*exit status 1$"), + }, + }, + }, + + testCase{ + Name: "cleanup_check_env", + Config: []string{`{type: command, path: {{.WorkDir}}/test/test-report-env.sh}`}, + ExpectStepReports: []expectStep{ + expectStep{ + ExpectedEdge: hooks.Pre, + ExpectStatus: hooks.StepOk, + OutputTest: containsTest(fmt.Sprintf("TEST pre_testing %s@%s", testFSName, testSnapshotName)), + }, + expectStep{ExpectedEdge: hooks.Callback, ExpectStatus: hooks.StepOk}, + expectStep{ + ExpectedEdge: hooks.Post, + ExpectStatus: hooks.StepOk, + OutputTest: containsTest(fmt.Sprintf("TEST post_testing %s@%s", testFSName, testSnapshotName)), + }, + }, + }, + + testCase{ + Name: "pre_error_cancels_post", + Config: []string{`{type: command, path: {{.WorkDir}}/test/test-pre-error-post-ok.sh}`}, + ExpectHadError: true, + ExpectHadFatalErr: false, + ExpectStepReports: []expectStep{ + expectStep{ + ExpectedEdge: hooks.Pre, + ExpectStatus: hooks.StepErr, + OutputTest: containsTest(fmt.Sprintf("TEST ERROR pre_testing %s@%s", testFSName, testSnapshotName)), + ErrorTest: regexpTest("^command hook invocation.*exit status 1$"), + }, + expectStep{ExpectedEdge: hooks.Callback, ExpectStatus: hooks.StepOk}, + expectStep{ + ExpectedEdge: hooks.Post, + ExpectStatus: hooks.StepSkippedDueToPreErr, + }, + }, + }, + + testCase{ + Name: "pre_error_does_not_cancel_other_posts_but_itself", + Config: []string{ + `{type: command, path: {{.WorkDir}}/test/test-report-env.sh}`, + `{type: command, path: {{.WorkDir}}/test/test-pre-error-post-ok.sh}`, + }, + ExpectHadError: true, + ExpectHadFatalErr: false, + ExpectStepReports: []expectStep{ + expectStep{ + ExpectedEdge: hooks.Pre, + ExpectStatus: hooks.StepOk, + 
OutputTest: containsTest(fmt.Sprintf("TEST pre_testing %s@%s", testFSName, testSnapshotName)), + }, + expectStep{ + ExpectedEdge: hooks.Pre, + ExpectStatus: hooks.StepErr, + OutputTest: containsTest(fmt.Sprintf("TEST ERROR pre_testing %s@%s", testFSName, testSnapshotName)), + ErrorTest: regexpTest("^command hook invocation.*exit status 1$"), + }, + expectStep{ExpectedEdge: hooks.Callback, ExpectStatus: hooks.StepOk}, + expectStep{ + ExpectedEdge: hooks.Post, + ExpectStatus: hooks.StepSkippedDueToPreErr, + }, + expectStep{ + ExpectedEdge: hooks.Post, + ExpectStatus: hooks.StepOk, + OutputTest: containsTest(fmt.Sprintf("TEST post_testing %s@%s", testFSName, testSnapshotName)), + }, + }, + }, + + testCase{ + Name: "exceed_buffer_limit", + SuppressOutput: true, + Config: []string{`{type: command, path: {{.WorkDir}}/test/test-large-stdout.sh}`}, + ExpectHadError: false, + ExpectHadFatalErr: false, + ExpectStepReports: []expectStep{ + expectStep{ + ExpectedEdge: hooks.Pre, + ExpectStatus: hooks.StepOk, + OutputTest: func(t require.TestingT, s interface{}, v ...interface{}) { + require.Len(t, s, 1<<20) + }, + }, + expectStep{ExpectedEdge: hooks.Callback, ExpectStatus: hooks.StepOk}, + expectStep{ + // No-action run of above hook + ExpectedEdge: hooks.Post, + ExpectStatus: hooks.StepOk, + OutputTest: require.Empty, + }, + }, + }, + + /* + Following not intended to test functionality of + filter package. Only to demonstrate that hook + filters are being applied. The following should + result in NO hooks running. If it does run, a + fatal hooks.RunReport will be returned. + */ + testCase{ + Name: "exclude_all_filesystems", + Config: []string{ + `{type: command, path: {{.WorkDir}}/test/test-error.sh, err_is_fatal: true, filesystems: {"<": false}}`, + }, + ExpectStepReports: []expectStep{ + expectStep{ + ExpectedEdge: hooks.Callback, + ExpectStatus: hooks.StepOk, + }, + }, + }, + } + + parseHookConfig := func(t *testing.T, in string) *config.Config { + t.Helper() + conf, err := config.ParseConfigBytes([]byte(in)) + require.NoError(t, err) + require.NotNil(t, conf) + return conf + } + + fillHooks := func(tt *testCase) string { + // make hook path absolute + cwd, err := os.Getwd() + if err != nil { + panic("os.Getwd() failed") + } + var hooksTmpl string = "\n" + for _, l := range tt.Config { + hooksTmpl += fmt.Sprintf(" - %s\n", l) + } + tmpl.New("List").Parse(hooksTmpl) + + var outBytes bytes.Buffer + data := struct { + WorkDir string + }{ + WorkDir: cwd, + } + if err := tmpl.Execute(&outBytes, data); err != nil { + panic(err) + } + + return outBytes.String() + } + + var c *config.Config + fs, err := zfs.NewDatasetPath(testFSName) + require.NoError(t, err) + + log := logger.NewTestLogger(t) + + var cbReached bool + cb := hooks.NewCallbackHookForFilesystem("testcallback", fs, func(_ context.Context) error { + cbReached = true + return nil + }) + + hookEnvExtra := hooks.Env{ + hooks.EnvFS: fs.ToString(), + hooks.EnvSnapshot: testSnapshotName, + } + + for _, tt := range testTable { + if testing.Short() && tt.IsSlow { + continue + } + + t.Run(tt.Name, func(t *testing.T) { + c = parseHookConfig(t, fillHooks(&tt)) + snp := c.Jobs[0].Ret.(*config.SnapJob).Snapshotting.Ret.(*config.SnapshottingPeriodic) + hookList, err := hooks.ListFromConfig(&snp.Hooks) + require.NoError(t, err) + + filteredHooks, err := hookList.CopyFilteredForFilesystem(fs) + require.NoError(t, err) + plan, err := hooks.NewPlan(&filteredHooks, hooks.PhaseTesting, cb, hookEnvExtra) + require.NoError(t, err) + t.Logf("REPORT PRE 
EXECUTION:\n%s", plan.Report()) + + cbReached = false + + ctx := context.Background() + if testing.Verbose() && !tt.SuppressOutput { + ctx = hooks.WithLogger(ctx, log) + } + plan.Run(ctx, false) + report := plan.Report() + + t.Logf("REPORT POST EXECUTION:\n%s", report) + + /* + * TEST ASSERTIONS + */ + + t.Logf("len(runReports)=%v", len(report)) + t.Logf("len(tt.ExpectStepReports)=%v", len(tt.ExpectStepReports)) + require.Equal(t, len(tt.ExpectStepReports), len(report), "ExpectStepReports must be same length as expected number of hook runs, excluding possible Callback") + + // Check if callback ran, when required + if tt.ExpectCallbackSkipped { + require.False(t, cbReached, "callback ran but should not have run") + } else { + require.True(t, cbReached, "callback should have run but did not") + } + + // Check if a fatal run error occurred and was expected + require.Equal(t, tt.ExpectHadFatalErr, report.HadFatalError(), "non-matching HadFatalError") + require.Equal(t, tt.ExpectHadError, report.HadError(), "non-matching HadError") + + if tt.ExpectHadFatalErr { + require.True(t, tt.ExpectHadError, "ExpectHadFatalErr implies ExpectHadError") + } + if !tt.ExpectHadError { + require.False(t, tt.ExpectHadFatalErr, "!ExpectHadError implies !ExpectHadFatalErr") + } + + // Iterate through each expected hook run + for i, hook := range tt.ExpectStepReports { + t.Logf("expecting report conforming to %v", hook) + + exp, act := hook.ExpectStatus, report[i].Status + require.Equal(t, exp, act, "%s != %s", exp, act) + + // Check for required ExpectedEdge + require.NotZero(t, hook.ExpectedEdge, "each hook must have an ExpectedEdge") + require.Equal(t, hook.ExpectedEdge, report[i].Edge, + "incorrect edge: expected %q, actual %q", hook.ExpectedEdge.String(), report[i].Edge.String(), + ) + + // Check for expected output + if hook.OutputTest != nil { + require.IsType(t, (*hooks.CommandHookReport)(nil), report[i].Report) + chr := report[i].Report.(*hooks.CommandHookReport) + hook.OutputTest(t, string(chr.CapturedStdoutStderrCombined)) + } + + // Check for expected errors + if hook.ErrorTest != nil { + hook.ErrorTest(t, string(report[i].Report.Error())) + } + } + + }) + } +} diff --git a/daemon/hooks/stepstatus_enumer.go b/daemon/hooks/stepstatus_enumer.go new file mode 100644 index 0000000..f6d6b57 --- /dev/null +++ b/daemon/hooks/stepstatus_enumer.go @@ -0,0 +1,77 @@ +// Code generated by "enumer -type=StepStatus -trimprefix=Step"; DO NOT EDIT. 
+ +// +package hooks + +import ( + "fmt" +) + +const ( + _StepStatusName_0 = "PendingExec" + _StepStatusName_1 = "Ok" + _StepStatusName_2 = "Err" + _StepStatusName_3 = "SkippedDueToFatalErr" + _StepStatusName_4 = "SkippedDueToPreErr" +) + +var ( + _StepStatusIndex_0 = [...]uint8{0, 7, 11} + _StepStatusIndex_1 = [...]uint8{0, 2} + _StepStatusIndex_2 = [...]uint8{0, 3} + _StepStatusIndex_3 = [...]uint8{0, 20} + _StepStatusIndex_4 = [...]uint8{0, 18} +) + +func (i StepStatus) String() string { + switch { + case 1 <= i && i <= 2: + i -= 1 + return _StepStatusName_0[_StepStatusIndex_0[i]:_StepStatusIndex_0[i+1]] + case i == 4: + return _StepStatusName_1 + case i == 8: + return _StepStatusName_2 + case i == 16: + return _StepStatusName_3 + case i == 32: + return _StepStatusName_4 + default: + return fmt.Sprintf("StepStatus(%d)", i) + } +} + +var _StepStatusValues = []StepStatus{1, 2, 4, 8, 16, 32} + +var _StepStatusNameToValueMap = map[string]StepStatus{ + _StepStatusName_0[0:7]: 1, + _StepStatusName_0[7:11]: 2, + _StepStatusName_1[0:2]: 4, + _StepStatusName_2[0:3]: 8, + _StepStatusName_3[0:20]: 16, + _StepStatusName_4[0:18]: 32, +} + +// StepStatusString retrieves an enum value from the enum constants string name. +// Throws an error if the param is not part of the enum. +func StepStatusString(s string) (StepStatus, error) { + if val, ok := _StepStatusNameToValueMap[s]; ok { + return val, nil + } + return 0, fmt.Errorf("%s does not belong to StepStatus values", s) +} + +// StepStatusValues returns all values of the enum +func StepStatusValues() []StepStatus { + return _StepStatusValues +} + +// IsAStepStatus returns "true" if the value is listed in the enum definition. "false" otherwise +func (i StepStatus) IsAStepStatus() bool { + for _, v := range _StepStatusValues { + if i == v { + return true + } + } + return false +} diff --git a/daemon/hooks/test/test-error.sh b/daemon/hooks/test/test-error.sh new file mode 100755 index 0000000..e8a212a --- /dev/null +++ b/daemon/hooks/test/test-error.sh @@ -0,0 +1,5 @@ +#!/bin/sh -eu + +>&2 echo "TEST ERROR $ZREPL_HOOKTYPE $ZREPL_FS@$ZREPL_SNAPNAME" + +exit 1 diff --git a/daemon/hooks/test/test-large-stdout.sh b/daemon/hooks/test/test-large-stdout.sh new file mode 100755 index 0000000..fa95b39 --- /dev/null +++ b/daemon/hooks/test/test-large-stdout.sh @@ -0,0 +1,18 @@ +#!/bin/sh -eu + +# Exceed default hook log size of 1<<20 bytes by double +# The Go test should fail if buffer size exceeds 1MB + +pre_testing() { + ln=0 + while :; do printf '%06d: 012345678901234567890123456789012345678901234567890123456789012345678901\n' "$ln"; ln="$(($ln + 1))"; done \ + | head -c$(( 2 << 20 )) +} + +case "$ZREPL_HOOKTYPE" in + pre_testing) + "$ZREPL_HOOKTYPE";; + *) + # Not handled by this script + exit 0;; +esac diff --git a/daemon/hooks/test/test-post-error.sh b/daemon/hooks/test/test-post-error.sh new file mode 100755 index 0000000..c0ffbbf --- /dev/null +++ b/daemon/hooks/test/test-post-error.sh @@ -0,0 +1,15 @@ +#!/bin/sh -eu + +post_testing() { + >&2 echo "TEST ERROR $ZREPL_HOOKTYPE $ZREPL_FS@$ZREPL_SNAPNAME" + + exit 1 +} + +case "$ZREPL_HOOKTYPE" in + post_testing) + "$ZREPL_HOOKTYPE";; + *) + # Not handled by this script + exit 0;; +esac diff --git a/daemon/hooks/test/test-pre-error-post-ok.sh b/daemon/hooks/test/test-pre-error-post-ok.sh new file mode 100755 index 0000000..7a505ac --- /dev/null +++ b/daemon/hooks/test/test-pre-error-post-ok.sh @@ -0,0 +1,11 @@ +#!/bin/sh -eu + +if [ "$ZREPL_HOOKTYPE" = "pre_testing" ]; then + >&2 echo "TEST ERROR 
$ZREPL_HOOKTYPE $ZREPL_FS@$ZREPL_SNAPNAME" + exit 1 +elif [ "$ZREPL_HOOKTYPE" = "post_testing" ]; then + echo "TEST $ZREPL_HOOKTYPE $ZREPL_FS@$ZREPL_SNAPNAME" +else + printf "Unknown hook type: %s" "$ZREPL_HOOKTYPE" + exit 255 +fi diff --git a/daemon/hooks/test/test-report-env.sh b/daemon/hooks/test/test-report-env.sh new file mode 100755 index 0000000..33f1d30 --- /dev/null +++ b/daemon/hooks/test/test-report-env.sh @@ -0,0 +1,3 @@ +#!/bin/sh -eu + +echo "TEST $ZREPL_HOOKTYPE $ZREPL_FS@$ZREPL_SNAPNAME ${ZREPL_DRYRUN:+(dry run)}" diff --git a/daemon/hooks/test/test-timeout.sh b/daemon/hooks/test/test-timeout.sh new file mode 100755 index 0000000..de51a29 --- /dev/null +++ b/daemon/hooks/test/test-timeout.sh @@ -0,0 +1,5 @@ +#!/bin/sh -eu + +echo "TEST $ZREPL_HOOKTYPE $ZREPL_FS@$ZREPL_SNAPNAME ZREPL_TIMEOUT=$ZREPL_TIMEOUT" + +exec sleep $(($ZREPL_TIMEOUT + 1)) diff --git a/daemon/logging/build_logging.go b/daemon/logging/build_logging.go index 13e8da4..5021bed 100644 --- a/daemon/logging/build_logging.go +++ b/daemon/logging/build_logging.go @@ -11,6 +11,7 @@ import ( "github.com/pkg/errors" "github.com/zrepl/zrepl/config" + "github.com/zrepl/zrepl/daemon/hooks" "github.com/zrepl/zrepl/daemon/pruner" "github.com/zrepl/zrepl/daemon/snapper" "github.com/zrepl/zrepl/endpoint" @@ -72,6 +73,7 @@ const ( SubsyEndpoint Subsystem = "endpoint" SubsysPruning Subsystem = "pruning" SubsysSnapshot Subsystem = "snapshot" + SubsysHooks Subsystem = "hook" SubsysTransport Subsystem = "transport" SubsysTransportMux Subsystem = "transportmux" SubsysRPC Subsystem = "rpc" @@ -85,6 +87,7 @@ func WithSubsystemLoggers(ctx context.Context, log logger.Logger) context.Contex ctx = endpoint.WithLogger(ctx, log.WithField(SubsysField, SubsyEndpoint)) ctx = pruner.WithLogger(ctx, log.WithField(SubsysField, SubsysPruning)) ctx = snapper.WithLogger(ctx, log.WithField(SubsysField, SubsysSnapshot)) + ctx = hooks.WithLogger(ctx, log.WithField(SubsysField, SubsysHooks)) ctx = transport.WithLogger(ctx, log.WithField(SubsysField, SubsysTransport)) ctx = transportmux.WithLogger(ctx, log.WithField(SubsysField, SubsysTransportMux)) ctx = rpc.WithLoggers(ctx, diff --git a/daemon/snapper/snapper.go b/daemon/snapper/snapper.go index a6dc268..b7fffa4 100644 --- a/daemon/snapper/snapper.go +++ b/daemon/snapper/snapper.go @@ -11,6 +11,7 @@ import ( "github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/daemon/filters" + "github.com/zrepl/zrepl/daemon/hooks" "github.com/zrepl/zrepl/logger" "github.com/zrepl/zrepl/zfs" ) @@ -35,8 +36,8 @@ type snapProgress struct { // SnapDone doneAt time.Time - // SnapErr - err error + // SnapErr TODO disambiguate state + runResults hooks.PlanReport } type args struct { @@ -46,6 +47,8 @@ type args struct { interval time.Duration fsf *filters.DatasetMapFilter snapshotsTaken chan<- struct{} + hooks *hooks.List + dryRun bool } type Snapper struct { @@ -123,10 +126,16 @@ func PeriodicFromConfig(g *config.Global, fsf *filters.DatasetMapFilter, in *con return nil, errors.New("interval must be positive") } + hookList, err := hooks.ListFromConfig(&in.Hooks) + if err != nil { + return nil, errors.Wrap(err, "hook config error") + } + args := args{ prefix: in.Prefix, interval: in.Interval, fsf: fsf, + hooks: hookList, // ctx and log is set in Run() } @@ -141,6 +150,7 @@ func (s *Snapper) Run(ctx context.Context, snapshotsTaken chan<- struct{}) { s.args.snapshotsTaken = snapshotsTaken s.args.ctx = ctx s.args.log = getLogger(ctx) + s.args.dryRun = false // for future expansion u := func(u func(*Snapper)) State { 
 	s.mtx.Lock()
@@ -241,7 +251,12 @@ func snapshot(a args, u updater) state {
 		plan = snapper.plan
 	})
 
-	hadErr := false
+	hookMatchCount := make(map[hooks.Hook]int, len(*a.hooks))
+	for _, h := range *a.hooks {
+		hookMatchCount[h] = 0
+	}
+
+	anyFsHadErr := false
 	// TODO channel programs -> allow a little jitter?
 	for fs, progress := range plan {
 		suffix := time.Now().In(time.UTC).Format("20060102_150405_000")
@@ -257,21 +272,68 @@ func snapshot(a args, u updater) state {
 			progress.state = SnapStarted
 		})
 
-		l.Debug("create snapshot")
-		err := zfs.ZFSSnapshot(fs, snapname, false)
-		if err != nil {
-			hadErr = true
-			l.WithError(err).Error("cannot create snapshot")
-		}
-		doneAt := time.Now()
+		var doneAt time.Time
+		hookEnvExtra := hooks.Env{
+			hooks.EnvFS:       fs.ToString(),
+			hooks.EnvSnapshot: snapname,
+		}
+
+		jobCallback := hooks.NewCallbackHookForFilesystem("snapshot", fs, func(_ context.Context) (err error) {
+			l.Debug("create snapshot")
+			err = zfs.ZFSSnapshot(fs, snapname, false) // TODO propagate context to ZFSSnapshot
+			if err != nil {
+				l.WithError(err).Error("cannot create snapshot")
+			}
+			doneAt = time.Now()
+			return
+		})
+
+		fsHadErr := false
+		var planReport hooks.PlanReport
+		var plan *hooks.Plan
+		{
+			filteredHooks, err := a.hooks.CopyFilteredForFilesystem(fs)
+			if err != nil {
+				l.WithError(err).Error("unexpected filter error")
+				fsHadErr = true
+				goto updateFSState
+			}
+			// account for running hooks
+			for _, h := range filteredHooks {
+				hookMatchCount[h] = hookMatchCount[h] + 1
+			}
+
+			var planErr error
+			plan, planErr = hooks.NewPlan(&filteredHooks, hooks.PhaseSnapshot, jobCallback, hookEnvExtra)
+			if planErr != nil {
+				fsHadErr = true
+				l.WithError(planErr).Error("cannot create job hook plan")
+				goto updateFSState
+			}
+		}
+		{
+			l := hooks.GetLogger(a.ctx).WithField("fs", fs.ToString()).WithField("snap", snapname)
+			l.WithField("report", plan.Report().String()).Debug("begin run job plan")
+			plan.Run(hooks.WithLogger(a.ctx, l), a.dryRun)
+			planReport = plan.Report()
+			fsHadErr = planReport.HadError() // not just fatal errors
+			if fsHadErr {
+				l.WithField("report", planReport.String()).Error("end run job plan with error")
+			} else {
+				l.WithField("report", planReport.String()).Info("end run job plan successful")
+			}
+		}
+
+	updateFSState:
+		anyFsHadErr = anyFsHadErr || fsHadErr
 		u(func(snapper *Snapper) {
 			progress.doneAt = doneAt
 			progress.state = SnapDone
-			if err != nil {
+			if fsHadErr {
 				progress.state = SnapError
-				progress.err = err
 			}
+			progress.runResults = planReport
 		})
 	}
@@ -283,8 +345,21 @@ func snapshot(a args, u updater) state {
 		}
 	}
 
+	for h, mc := range hookMatchCount {
+		if mc == 0 {
+			hookIdx := -1
+			for idx, ah := range *a.hooks {
+				if ah == h {
+					hookIdx = idx
+					break
+				}
+			}
+			a.log.WithField("hook", h.String()).WithField("hook_number", hookIdx+1).Warn("hook did not match any snapshotted filesystems")
+		}
+	}
+
 	return u(func(snapper *Snapper) {
-		if hadErr {
+		if anyFsHadErr {
 			snapper.state = ErrorWait
 			snapper.err = errors.New("one or more snapshots could not be created, check logs for details")
 		} else {
diff --git a/docs/changelog.rst b/docs/changelog.rst
index 53d7e86..546707c 100644
--- a/docs/changelog.rst
+++ b/docs/changelog.rst
@@ -32,6 +32,9 @@ We use the following annotations for classifying changes:
 * Go modules for dependency management both inside and outside of GOPATH
   (``lazy.sh`` and ``Makefile`` force ``GO111MODULE=on``)
 * |feature| Use ``zfs destroy pool/fs@snap1,snap2,...`` CLI feature if available
+* |feature| :ref:`Pre- and Post-Snapshot Hooks ` with
built-in support for MySQL and Postgres checkpointing + as well as custom scripts 0.1.1 ----- diff --git a/docs/configuration.rst b/docs/configuration.rst index ffba719..a3c4d0c 100644 --- a/docs/configuration.rst +++ b/docs/configuration.rst @@ -7,10 +7,11 @@ Configuration .. toctree:: - configuration/preface + configuration/overview configuration/jobs configuration/transports configuration/filter_syntax + configuration/snapshotting configuration/prune configuration/logging configuration/monitoring diff --git a/docs/configuration/jobs.rst b/docs/configuration/jobs.rst index 7f5207b..1aefee7 100644 --- a/docs/configuration/jobs.rst +++ b/docs/configuration/jobs.rst @@ -1,222 +1,9 @@ .. include:: ../global.rst.inc -.. |serve-transport| replace:: :ref:`serve specification` -.. |connect-transport| replace:: :ref:`connect specification` -.. |snapshotting-spec| replace:: :ref:`snapshotting specification ` -.. |pruning-spec| replace:: :ref:`pruning specification ` -.. |filter-spec| replace:: :ref:`filter specification` - .. _job: -Job Types & Replication -======================= - -.. _job-overview: - -Overview & Terminology ----------------------- - -A *job* is the unit of activity tracked by the zrepl daemon. -Every job has a unique ``name``, a ``type`` and type-dependent fields which are documented on this page. - -Replication always happens between a pair of jobs: one is the **active side**, and one the **passive side**. -The active side executes the replication logic whereas the passive side responds to requests after checking the active side's permissions. -For communication, the active side connects to the passive side using a :ref:`transport ` and starts issuing remote procedure calls (RPCs). - -The following table shows how different job types can be combined to achieve both push and pull mode setups: - -+-----------------------+--------------+----------------------------------+------------------------------------------------------------------------------------+ -| Setup name | active side | passive side | use case | -+=======================+==============+==================================+====================================================================================+ -| Push mode | ``push`` | ``sink`` | * Laptop backup | -| | | | * NAS behind NAT to offsite | -+-----------------------+--------------+----------------------------------+------------------------------------------------------------------------------------+ -| Pull mode | ``pull`` | ``source`` | * Central backup-server for many nodes | -| | | | * Remote server to NAS behind NAT | -+-----------------------+--------------+----------------------------------+------------------------------------------------------------------------------------+ -| Local replication | | ``push`` + ``sink`` in one config | * Backup FreeBSD boot pool | -| | | with :ref:`local transport ` | | -+-----------------------+--------------+----------------------------------+------------------------------------------------------------------------------------+ -| Snap & prune-only | ``snap`` | N/A | * | Snapshots & pruning but no replication | -| | | | | required | -| | | | * Workaround for :ref:`source-side pruning ` | -+-----------------------+--------------+----------------------------------+------------------------------------------------------------------------------------+ - -How the Active Side Works -~~~~~~~~~~~~~~~~~~~~~~~~~ - -The active side (:ref:`push ` and :ref:`pull ` job) executes the replication and pruning logic: - -* Wakeup because 
of finished snapshotting (``push`` job) or pull interval ticker (``pull`` job). -* Connect to the corresponding passive side using a :ref:`transport ` and instantiate an RPC client. -* Replicate data from the sending to the receiving side. -* Prune on sender & receiver. - -.. TIP:: - The progress of the active side can be watched live using the ``zrepl status`` subcommand. - -How the Passive Side Works -~~~~~~~~~~~~~~~~~~~~~~~~~~ - -The passive side (:ref:`sink ` and :ref:`source `) waits for connections from the corresponding active side, -using the transport listener type specified in the ``serve`` field of the job configuration. -Each transport listener provides a client's identity to the passive side job. -It uses the client identity for access control: - -* The ``sink`` job maps requests from different client identities to their respective sub-filesystem tree ``root_fs/${client_identity}``. -* The ``source`` job has a whitelist of client identities that are allowed pull access. - -.. TIP:: - The implementation of the ``sink`` job requires that the connecting client identities be a valid ZFS filesystem name components. - -How Replication Works -~~~~~~~~~~~~~~~~~~~~~ - -One of the major design goals of the replication module is to avoid any duplication of the nontrivial logic. -As such, the code works on abstract senders and receiver **endpoints**, where typically one will be implemented by a local program object and the other is an RPC client instance. -Regardless of push- or pull-style setup, the logic executes on the active side, i.e. in the ``push`` or ``pull`` job. - -The following steps take place during replication and can be monitored using the ``zrepl status`` subcommand: - -* Plan the replication: - - * Compare sender and receiver filesystem snapshots - * Build the **replication plan** - - * Per filesystem, compute a diff between sender and receiver snapshots - * Build a list of replication steps - - * If possible, use incremental sends (``zfs send -i``) - * Otherwise, use full send of most recent snapshot on sender - * Give up on filesystems that cannot be replicated without data loss - - * Retry on errors that are likely temporary (i.e. network failures). - * Give up on filesystems where a permanent error was received over RPC. - -* Execute the plan - - * Perform replication steps in the following order: - Among all filesystems with pending replication steps, pick the filesystem whose next replication step's snapshot is the oldest. - * Create placeholder filesystems on the receiving side to mirror the dataset paths on the sender to ``root_fs/${client_identity}``. - * After a successful replication step, update the replication cursor bookmark (see below). - -The idea behind the execution order of replication steps is that if the sender snapshots all filesystems simultaneously at fixed intervals, the receiver will have all filesystems snapshotted at time ``T1`` before the first snapshot at ``T2 = T1 + $interval`` is replicated. - -.. _replication-cursor-bookmark: - -The **replication cursor bookmark** ``#zrepl_replication_cursor`` is kept per filesystem on the sending side of a replication setup: -It is a bookmark of the most recent successfully replicated snapshot to the receiving side. -It is is used by the :ref:`not_replicated ` keep rule to identify all snapshots that have not yet been replicated to the receiving side. -Regardless of whether that keep rule is used, the bookmark ensures that replication can always continue incrementally. 
-Note that there is only one cursor bookmark per filesystem, which prohibits multiple jobs to replicate the same filesystem (:ref:`see below`). - -.. _replication-placeholder-property: - -**Placeholder filesystems** on the receiving side are regular ZFS filesystems with the placeholder property ``zrepl:placeholder=on``. -Placeholders allow the receiving side to mirror the sender's ZFS dataset hierachy without replicating every filesystem at every intermediary dataset path component. -Consider the following example: ``S/H/J`` shall be replicated to ``R/sink/job/S/H/J``, but neither ``S/H`` nor ``S`` shall be replicated. -ZFS requires the existence of ``R/sink/job/S`` and ``R/sink/job/S/H`` in order to receive into ``R/sink/job/S/H/J``. -Thus, zrepl creates the parent filesystems as placeholders on the receiving side. -If at some point ``S/H`` and ``S`` shall be replicated, the receiving side invalidates the placeholder flag automatically. -The ``zrepl test placeholder`` command can be used to check whether a filesystem is a placeholder. - -.. ATTENTION:: - - Currently, zrepl does not replicate filesystem properties. - Whe receiving a filesystem, it is never mounted (`-u` flag) and `mountpoint=none` is set. - This is temporary and being worked on :issue:`24`. - - -.. _job-snapshotting-spec: - -Taking Snaphots ---------------- - -The ``push``, ``source`` and ``snap`` jobs can automatically take periodic snapshots of the filesystems matched by the ``filesystems`` filter field. -The snapshot names are composed of a user-defined prefix followed by a UTC date formatted like ``20060102_150405_000``. -We use UTC because it will avoid name conflicts when switching time zones or between summer and winter time. - -For ``push`` jobs, replication is automatically triggered after all filesystems have been snapshotted. - -:: - - jobs: - - type: push - filesystems: { - "<": true, - "tmp": false - } - snapshotting: - type: periodic - prefix: zrepl_ - interval: 10m - ... - -There is also a ``manual`` snapshotting type, which covers the following use cases: - -* Existing infrastructure for automatic snapshots: you only want to use this zrepl job for replication. -* Run scripts before and after taking snapshots (like locking database tables). - We are working on better integration for this use case: see :issue:`74`. -* Handling snapshotting through a separate ``snap`` job. - -Note that you will have to trigger replication manually using the ``zrepl signal wakeup JOB`` subcommand in that case. - -:: - - jobs: - - type: push - filesystems: { - "<": true, - "tmp": false - } - snapshotting: - type: manual - ... - -.. _jobs-multiple-jobs: - -Multiple Jobs & More than 2 Machines ------------------------------------- - -.. ATTENTION:: - - When using multiple jobs across single or multiple machines, the following rules are critical to avoid race conditions & data loss: - - 1. The sets of ZFS filesystems matched by the ``filesystems`` filter fields must be disjoint across all jobs configured on a machine. - 2. The ZFS filesystem subtrees of jobs with ``root_fs`` must be disjoint. - 3. Across all zrepl instances on all machines in the replication domain, there must be a 1:1 correspondence between active and passive jobs. - - Explanations & exceptions to above rules are detailed below. - -If you would like to see improvements to multi-job setups, please `open an issue on GitHub `_. - -No Overlapping -~~~~~~~~~~~~~~ - -Jobs run independently of each other. 
-If two jobs match the same filesystem with their ``filesystems`` filter, they will operate on that filesystem independently and potentially in parallel.
-For example, if job A prunes snapshots that job B is planning to replicate, the replication will fail because B asssumed the snapshot to still be present.
-More subtle race conditions can occur with the :ref:`replication cursor bookmark `, which currently only exists once per filesystem.
-
-N push jobs to 1 sink
-~~~~~~~~~~~~~~~~~~~~~
-
-The :ref:`sink job ` namespaces by client identity.
-It is thus safe to push to one sink job with different client identities.
-If the push jobs have the same client identity, the filesystems matched by the push jobs must be disjoint to avoid races.
-
-N pull jobs from 1 source
-~~~~~~~~~~~~~~~~~~~~~~~~~
-
-Multiple pull jobs pulling from the same source have potential for race conditions during pruning:
-each pull job prunes the source side independently, causing replication-prune and prune-prune races.
-
-There is currently no way for a pull job to filter which snapshots it should attempt to replicate.
-Thus, it is not possibe to just manually assert that the prune rules of all pull jobs are disjoint to avoid replication-prune and prune-prune races.
-
-
--------------------------------------------------------------------------------
-
+Job Types in Detail
+===================
 .. _job-push:
diff --git a/docs/configuration/misc.rst b/docs/configuration/misc.rst
index 0aaf92b..fcd69ae 100644
--- a/docs/configuration/misc.rst
+++ b/docs/configuration/misc.rst
@@ -6,16 +6,16 @@ Miscellaneous
 Runtime Directories & UNIX Sockets
 ----------------------------------
-zrepl daemon creates various UNIX sockets to allow communicating with it:
+The zrepl daemon needs to open various UNIX sockets in a runtime directory:
-* the :ref:`transport-ssh+stdinserver` transport connects to a socket named after ``client_identity`` parameter
-* the ``control`` CLI subcommand connects to a defined control socket
+* a ``control`` socket that the CLI commands use to interact with the daemon
+* the :ref:`transport-ssh+stdinserver` listener opens one socket per configured client, named after the ``client_identity`` parameter
-There is no further authentication on these sockets.
-Therefore we have to make sure they can only be created and accessed by ``zrepl daemon``.
-In fact, ``zrepl daemon`` will not bind a socket to a path in a directory that is world-accessible.
+There is no authentication on these sockets except the UNIX permissions.
+The zrepl daemon will refuse to bind any of the above sockets in a directory that is world-accessible.
-The directories can be configured in the main configuration file, the defaults are provided below:
+The following sections of the ``global`` config show the default paths.
+The shell script below shows how the default runtime directory can be created.
 ::
@@ -27,6 +27,12 @@ The directories a sockdir: /var/run/zrepl/stdinserver
+::
+
+   mkdir -p /var/run/zrepl/stdinserver
+   chmod -R 0700 /var/run/zrepl
+
+
 Durations & Intervals
 ---------------------
diff --git a/docs/configuration/overview.rst b/docs/configuration/overview.rst
new file mode 100644
index 0000000..fb1666b
--- /dev/null
+++ b/docs/configuration/overview.rst
@@ -0,0 +1,191 @@
+
+Overview & Terminology
+======================
+
+All work zrepl does is performed by the zrepl daemon which is configured in a single YAML configuration file loaded on startup.
+The following paths are considered:
+
+* If set, the location specified via the global ``--config`` flag
+* ``/etc/zrepl/zrepl.yml``
+* ``/usr/local/etc/zrepl/zrepl.yml``
+
+The ``zrepl configcheck`` subcommand can be used to validate the configuration.
+The command will output nothing and exit with a zero status code if the configuration is valid.
+The error messages vary in quality and usefulness: please report confusing config errors to tracking issue :issue:`155`.
+Full example configs such as in the :ref:`tutorial` or the :sampleconf:`/` directory might also be helpful.
+However, copy-pasting examples is no substitute for reading documentation!
+
+Config File Structure
+---------------------
+
+.. code-block:: yaml
+
+   global: ...
+   jobs:
+   - name: backup
+     type: push
+   - ...
+
+zrepl is configured using a single YAML configuration file with two main sections: ``global`` and ``jobs``.
+The ``global`` section is filled with sensible defaults and is covered later in this chapter.
+The ``jobs`` section is a list of jobs which we are going to explain now.
+
+.. _job-overview:
+
+Jobs \& How They Work Together
+------------------------------
+
+A *job* is the unit of activity tracked by the zrepl daemon.
+The ``type`` of a job determines its role in a replication setup and in snapshot management.
+Jobs are identified by their ``name``, both in log files and the ``zrepl status`` command.
+
+Replication always happens between a pair of jobs: one is the **active side**, and one the **passive side**.
+The active side connects to the passive side using a :ref:`transport ` and starts executing the replication logic.
+The passive side responds to requests from the active side after checking its permissions.
+
+The following table shows how different job types can be combined to achieve **both push and pull mode setups**.
+Note that snapshot creation, denoted by "(snap)", is orthogonal to whether a job is active or passive.
+
++-----------------------+--------------+----------------------------------+------------------------------------------------------------------------------------+
+| Setup name | active side | passive side | use case |
++=======================+==============+==================================+====================================================================================+
+| Push mode | ``push`` | ``sink`` | * Laptop backup |
+| | (snap) | | * NAS behind NAT to offsite |
++-----------------------+--------------+----------------------------------+------------------------------------------------------------------------------------+
+| Pull mode | ``pull`` | ``source`` | * Central backup-server for many nodes |
+| | | (snap) | * Remote server to NAS behind NAT |
++-----------------------+--------------+----------------------------------+------------------------------------------------------------------------------------+
+| Local replication | | ``push`` + ``sink`` in one config | * Backup FreeBSD boot pool |
+| | | with :ref:`local transport ` | |
++-----------------------+--------------+----------------------------------+------------------------------------------------------------------------------------+
+| Snap & prune-only | ``snap`` | N/A | * | Snapshots & pruning but no replication |
+| | (snap) | | | required |
+| | | | * Workaround for :ref:`source-side pruning ` |
++-----------------------+--------------+----------------------------------+------------------------------------------------------------------------------------+
+
+How the Active Side Works
+-------------------------
+
+The active side (:ref:`push ` and :ref:`pull ` job) executes the replication and pruning logic:
+
+* Wakeup because of finished snapshotting (``push`` job) or pull interval ticker (``pull`` job).
+* Connect to the corresponding passive side using a :ref:`transport ` and instantiate an RPC client.
+* Replicate data from the sending to the receiving side (see below).
+* Prune on sender & receiver.
+
+.. TIP::
+   The progress of the active side can be watched live using the ``zrepl status`` subcommand.
+
+How the Passive Side Works
+--------------------------
+
+The passive side (:ref:`sink ` and :ref:`source `) waits for connections from the corresponding active side,
+using the transport listener type specified in the ``serve`` field of the job configuration.
+Each transport listener provides a client's identity to the passive side job.
+It uses the client identity for access control:
+
+* The ``sink`` job maps requests from different client identities to their respective sub-filesystem tree ``root_fs/${client_identity}``.
+* The ``source`` job has a whitelist of client identities that are allowed pull access.
+
+.. TIP::
+   The implementation of the ``sink`` job requires that the connecting client identities be valid ZFS filesystem name components.
+
+How Replication Works
+---------------------
+
+One of the major design goals of the replication module is to avoid any duplication of the nontrivial logic.
+As such, the code works on abstract sender and receiver **endpoints**, where typically one will be implemented by a local program object and the other is an RPC client instance.
+Regardless of push- or pull-style setup, the logic executes on the active side, i.e. in the ``push`` or ``pull`` job.
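The sender/receiver **endpoint** split described above can be pictured as a pair of interfaces. The following Go sketch is a hypothetical simplification added for this review; the names and signatures are illustrative assumptions, not zrepl's actual replication API:

.. code-block:: go

   package endpoint

   import "io"

   // FilesystemVersion identifies a snapshot or bookmark on either side.
   type FilesystemVersion struct {
           Name string // e.g. "zrepl_20060102_150405_000"
   }

   // Endpoint is the shape both sides share; the planner does not care
   // whether it talks to a local object or to an RPC client.
   type Endpoint interface {
           ListFilesystems() ([]string, error)
           ListFilesystemVersions(fs string) ([]FilesystemVersion, error)
   }

   type Sender interface {
           Endpoint
           // from == "" requests a full send, otherwise an incremental one.
           Send(fs, from, to string) (io.ReadCloser, error)
   }

   type Receiver interface {
           Endpoint
           Receive(fs string, stream io.Reader) error
   }

One concrete pairing under this sketch: in push mode, the ``push`` job holds a local ``Sender`` and an RPC-client ``Receiver``; in pull mode, the roles of local object and RPC client are swapped.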
+
+The following steps take place during replication and can be monitored using the ``zrepl status`` subcommand:
+
+* Plan the replication:
+
+  * Compare sender and receiver filesystem snapshots
+  * Build the **replication plan**
+
+    * Per filesystem, compute a diff between sender and receiver snapshots
+    * Build a list of replication steps
+
+      * If possible, use incremental sends (``zfs send -i``)
+      * Otherwise, use full send of most recent snapshot on sender
+      * Give up on filesystems that cannot be replicated without data loss
+
+  * Retry on errors that are likely temporary (e.g. network failures).
+  * Give up on filesystems where a permanent error was received over RPC.
+
+* Execute the plan
+
+  * Perform replication steps in the following order:
+    Among all filesystems with pending replication steps, pick the filesystem whose next replication step's snapshot is the oldest.
+  * Create placeholder filesystems on the receiving side to mirror the dataset paths on the sender to ``root_fs/${client_identity}``.
+  * After a successful replication step, update the replication cursor bookmark (see below).
+
+The idea behind the execution order of replication steps is that if the sender snapshots all filesystems simultaneously at fixed intervals, the receiver will have all filesystems snapshotted at time ``T1`` before the first snapshot at ``T2 = T1 + $interval`` is replicated.
+
+.. _replication-cursor-bookmark:
+
+The **replication cursor bookmark** ``#zrepl_replication_cursor`` is kept per filesystem on the sending side of a replication setup:
+It is a bookmark of the most recent successfully replicated snapshot to the receiving side.
+It is used by the :ref:`not_replicated ` keep rule to identify all snapshots that have not yet been replicated to the receiving side.
+Regardless of whether that keep rule is used, the bookmark ensures that replication can always continue incrementally.
+Note that there is only one cursor bookmark per filesystem, which prevents multiple jobs from replicating the same filesystem (:ref:`see below <jobs-multiple-jobs>`).
+
+.. _replication-placeholder-property:
+
+**Placeholder filesystems** on the receiving side are regular ZFS filesystems with the placeholder property ``zrepl:placeholder=on``.
+Placeholders allow the receiving side to mirror the sender's ZFS dataset hierarchy without replicating every filesystem at every intermediary dataset path component.
+Consider the following example: ``S/H/J`` shall be replicated to ``R/sink/job/S/H/J``, but neither ``S/H`` nor ``S`` shall be replicated.
+ZFS requires the existence of ``R/sink/job/S`` and ``R/sink/job/S/H`` in order to receive into ``R/sink/job/S/H/J``.
+Thus, zrepl creates the parent filesystems as placeholders on the receiving side.
+If at some point ``S/H`` and ``S`` shall be replicated, the receiving side invalidates the placeholder flag automatically.
+The ``zrepl test placeholder`` command can be used to check whether a filesystem is a placeholder.
+
+.. ATTENTION::
+
+   Currently, zrepl does not replicate filesystem properties.
+   When receiving a filesystem, it is never mounted (`-u` flag) and `mountpoint=none` is set.
+   This is temporary and being worked on in :issue:`24`.
+
+
+.. _jobs-multiple-jobs:
+
+Multiple Jobs & More than 2 Machines
+------------------------------------
+
+.. ATTENTION::
+
+   When using multiple jobs across single or multiple machines, the following rules are critical to avoid race conditions & data loss:
+
+   1.
The sets of ZFS filesystems matched by the ``filesystems`` filter fields must be disjoint across all jobs configured on a machine.
+   2. The ZFS filesystem subtrees of jobs with ``root_fs`` must be disjoint.
+   3. Across all zrepl instances on all machines in the replication domain, there must be a 1:1 correspondence between active and passive jobs.
+
+   Explanations & exceptions to the above rules are detailed below.
+
+If you would like to see improvements to multi-job setups, please `open an issue on GitHub `_.
+
+No Overlapping
+~~~~~~~~~~~~~~
+
+Jobs run independently of each other.
+If two jobs match the same filesystem with their ``filesystems`` filter, they will operate on that filesystem independently and potentially in parallel.
+For example, if job A prunes snapshots that job B is planning to replicate, the replication will fail because B assumed the snapshot to still be present.
+More subtle race conditions can occur with the :ref:`replication cursor bookmark <replication-cursor-bookmark>`, which currently only exists once per filesystem.
+
+N push jobs to 1 sink
+~~~~~~~~~~~~~~~~~~~~~
+
+The :ref:`sink job ` namespaces by client identity.
+It is thus safe to push to one sink job with different client identities.
+If the push jobs have the same client identity, the filesystems matched by the push jobs must be disjoint to avoid races.
+
+N pull jobs from 1 source
+~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Multiple pull jobs pulling from the same source have potential for race conditions during pruning:
+each pull job prunes the source side independently, causing replication-prune and prune-prune races.
+
+There is currently no way for a pull job to filter which snapshots it should attempt to replicate.
+Thus, it is not possible to just manually assert that the prune rules of all pull jobs are disjoint to avoid replication-prune and prune-prune races.
+
diff --git a/docs/configuration/preface.rst b/docs/configuration/preface.rst
deleted file mode 100644
index b184d44..0000000
--- a/docs/configuration/preface.rst
+++ /dev/null
@@ -1,39 +0,0 @@
-
-.. _configuration_preface:
-
-=======
-Preface
-=======
-
------------------------
-Configuration File Path
------------------------
-
-zrepl searches for its main configuration file in the following locations (in that order):
-
-* If set, the location specified via the global ``--config`` flag
-* ``/etc/zrepl/zrepl.yml``
-* ``/usr/local/etc/zrepl/zrepl.yml``
-
-The examples in the :ref:`tutorial` or the :sampleconf:`/` directory should provide a good starting point.
-
--------------------
-Runtime Directories
--------------------
-
-zrepl requires runtime directories for various UNIX sockets --- they are documented in the :ref:`config file`.
-Your package maintainer / init script should take care of creating them.
-Alternatively, for default settings, the following should to the trick.
-
-::
-
-   mkdir -p /var/run/zrepl/stdinserver
-   chmod -R 0700 /var/run/zrepl
-
-
-----------
-Validating
-----------
-
-The config can be validated using the ``zrepl configcheck`` subcommand.
-
diff --git a/docs/configuration/snapshotting.rst b/docs/configuration/snapshotting.rst
new file mode 100644
index 0000000..d18d63f
--- /dev/null
+++ b/docs/configuration/snapshotting.rst
@@ -0,0 +1,192 @@
+.. include:: ../global.rst.inc
+
+.. _job-snapshotting-spec:
+
+Taking Snapshots
+================
+
+The ``push``, ``source`` and ``snap`` jobs can automatically take periodic snapshots of the filesystems matched by the ``filesystems`` filter field.
+The snapshot names are composed of a user-defined prefix followed by a UTC date formatted like ``20060102_150405_000``.
+We use UTC because it will avoid name conflicts when switching time zones or between summer and winter time.
+
+For ``push`` jobs, replication is automatically triggered after all filesystems have been snapshotted.
+
+::
+
+   jobs:
+   - type: push
+     filesystems: {
+       "<": true,
+       "tmp": false
+     }
+     snapshotting:
+       type: periodic
+       prefix: zrepl_
+       interval: 10m
+       hooks: ...
+     ...
+
+There is also a ``manual`` snapshotting type, which covers the following use cases:
+
+* Existing infrastructure for automatic snapshots: you only want to use this zrepl job for replication.
+* Handling snapshotting through a separate ``snap`` job.
+
+Note that you will have to trigger replication manually using the ``zrepl signal wakeup JOB`` subcommand in that case.
+
+::
+
+   jobs:
+   - type: push
+     filesystems: {
+       "<": true,
+       "tmp": false
+     }
+     snapshotting:
+       type: manual
+     ...
+
+.. _job-snapshotting-hooks:
+
+Pre- and Post-Snapshot Hooks
+----------------------------
+
+Jobs with :ref:`periodic snapshots <job-snapshotting-spec>` can run hooks before and/or after taking the snapshot; the hooks are specified in the ``snapshotting.hooks`` list.
+Hooks are called per filesystem before and after the snapshot is taken (pre- and post-edge).
+Pre-edge invocations are in configuration order, post-edge invocations in reverse order, i.e. like a stack.
+If a pre-snapshot invocation fails, ``err_is_fatal=true`` cuts off subsequent hooks, does not take a snapshot, and only invokes post-edges corresponding to previous successful pre-edges.
+``err_is_fatal=false`` logs the failed pre-edge invocation but does not affect subsequent hooks or snapshotting itself.
+Post-edges are only invoked for hooks whose pre-edges ran without error.
+Note that hook failures for one filesystem never affect other filesystems.
+
+The optional ``timeout`` parameter specifies a period after which zrepl will kill the hook process and report an error.
+The default is 30 seconds and may be specified in any units understood by `time.ParseDuration <https://golang.org/pkg/time/#ParseDuration>`_.
+
+The optional ``filesystems`` filter limits the filesystems the hook runs for. It uses the same |filter-spec| as jobs.
+
+Most hook types take additional parameters, please refer to the respective subsections below.
+
+.. list-table::
+   :widths: 20 10 70
+   :header-rows: 1
+
+   * - Hook ``type``
+     - Details
+     - Description
+   * - ``command``
+     - :ref:`Details <job-hook-type-command>`
+     - Arbitrary pre- and post-snapshot scripts.
+   * - ``postgres-checkpoint``
+     - :ref:`Details <job-hook-type-postgres-checkpoint>`
+     - Execute Postgres ``CHECKPOINT`` SQL command before snapshot.
+   * - ``mysql-lock-tables``
+     - :ref:`Details <job-hook-type-mysql-lock-tables>`
+     - Flush and read-lock MySQL tables while taking the snapshot.
+
+.. _job-hook-type-command:
+
+``command`` Hooks
+~~~~~~~~~~~~~~~~~
+
+::
+
+
+   jobs:
+   - type: push
+     filesystems: {
+       "<": true,
+       "tmp": false
+     }
+     snapshotting:
+       type: periodic
+       prefix: zrepl_
+       interval: 10m
+       hooks:
+       - type: command
+         path: /etc/zrepl/hooks/zrepl-notify.sh
+         timeout: 30s
+         err_is_fatal: false
+       - type: command
+         path: /etc/zrepl/hooks/special-snapshot.sh
+         filesystems: {
+           "tank/special": true
+         }
+     ...
+
+
+The ``command`` hook runs an arbitrary executable.
+The ``path`` to the hook executable must be absolute (e.g. ``/etc/zrepl/hooks/zrepl-notify.sh``).
+No arguments may be specified; create a wrapper script if zrepl must call an executable that requires arguments.
+zrepl will call the hook both before and after the snapshot, but with different values of the ``ZREPL_HOOKTYPE`` environment variable; see below.
+The process's standard output is logged at level INFO. Standard error is logged at level WARN.
+
+zrepl sets a number of environment variables for the hook processes:
+
+* ``ZREPL_HOOKTYPE``: either "pre_snapshot" or "post_snapshot"
+* ``ZREPL_FS``: the ZFS filesystem name being snapshotted
+* ``ZREPL_SNAPNAME``: the zrepl-generated snapshot name (e.g. ``zrepl_20380119_031407_000``)
+* ``ZREPL_DRYRUN``: set to ``"true"`` if a dry run is in progress so scripts can print, but not run, their commands
+
+An empty template hook can be found in :sampleconf:`hooks/template.sh`.
+
+.. _job-hook-type-postgres-checkpoint:
+
+``postgres-checkpoint`` Hook
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Connects to a Postgres server and executes the ``CHECKPOINT`` statement pre-snapshot.
+Checkpointing applies the WAL contents to all data files and syncs the data files to disk.
+This is not required for a consistent database backup: it merely forward-pays the "cost" of WAL replay to the time of snapshotting instead of at restore time.
+However, the Postgres manual recommends against checkpointing during normal operation.
+Further, the operation requires Postgres superuser privileges.
+zrepl users must decide on their own whether this hook is useful for them (it likely isn't).
+
+.. ATTENTION::
+   Note that the WAL and the Postgres data directory (with all database data files) must be on the same filesystem to guarantee a correct point-in-time backup with the ZFS snapshot.
+
+The DSN syntax is documented in the `lib/pq package documentation <https://godoc.org/github.com/lib/pq>`_.
+
+.. code-block:: sql
+
+   CREATE USER zrepl_checkpoint PASSWORD 'yourpasswordhere';
+   ALTER ROLE zrepl_checkpoint SUPERUSER;
+
+.. code-block:: yaml
+
+   - type: postgres-checkpoint
+     dsn: "host=localhost port=5432 user=postgres password=yourpasswordhere sslmode=disable"
+     filesystems: {
+       "p1/postgres/data11": true
+     }
+
+.. _job-hook-type-mysql-lock-tables:
+
+``mysql-lock-tables`` Hook
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Connects to MySQL and executes:
+
+* pre-snapshot ``FLUSH TABLES WITH READ LOCK`` to lock all tables in all databases in the MySQL server we connect to (`docs `_)
+* post-snapshot ``UNLOCK TABLES`` to reverse the above operation.
+
+The above procedure is documented in the `MySQL manual `_
+as a means to produce a consistent backup of a MySQL DBMS installation (i.e., all databases).
+
+`DSN syntax `_: ``[username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN]``
+
+.. ATTENTION::
+   All MySQL databases must be on the same ZFS filesystem to guarantee a consistent point-in-time backup with the ZFS snapshot.
+
+.. code-block:: sql
+
+   CREATE USER zrepl_lock_tables IDENTIFIED BY 'yourpasswordhere';
+   GRANT RELOAD ON *.* TO zrepl_lock_tables;
+   FLUSH PRIVILEGES;
+
+.. code-block:: yaml
+
+   - type: mysql-lock-tables
+     dsn: "zrepl_lock_tables:yourpasswordhere@tcp(localhost)/"
+     filesystems: {
+       "tank/mysql": true
+     }
diff --git a/docs/global.rst.inc b/docs/global.rst.inc
index 89b1538..ac610f6 100644
--- a/docs/global.rst.inc
+++ b/docs/global.rst.inc
@@ -16,4 +16,11 @@
 .. |Donate via Patreon| image:: https://img.shields.io/endpoint.svg?url=https%3A%2F%2Fshieldsio-patreon.herokuapp.com%2Fzrepl%2Fpledges&style=flat&color=yellow
    :target: https://www.patreon.com/zrepl
..
|Twitter| image:: https://img.shields.io/twitter/url/https/github.com/zrepl/zrepl.svg?style=social - :target: https://twitter.com/intent/tweet?text=Wow:&url=https%3A%2F%2Fgithub.com%2Fzrepl%2Fzrepl \ No newline at end of file + :target: https://twitter.com/intent/tweet?text=Wow:&url=https%3A%2F%2Fgithub.com%2Fzrepl%2Fzrepl + + +.. |serve-transport| replace:: :ref:`serve specification` +.. |connect-transport| replace:: :ref:`connect specification` +.. |snapshotting-spec| replace:: :ref:`snapshotting specification ` +.. |pruning-spec| replace:: :ref:`pruning specification ` +.. |filter-spec| replace:: :ref:`filter specification` \ No newline at end of file diff --git a/docs/index.rst b/docs/index.rst index 6344ba5..82da6ca 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -64,6 +64,7 @@ Main Features * **Automatic snapshot management** * [x] Periodic filesystem snapshots + * [x] Support for pre- and post-snapshot hooks with builtins for MySQL & Postgres * [x] Flexible :ref:`pruning rule system ` * [x] Age-based fading (grandfathering scheme) diff --git a/docs/installation.rst b/docs/installation.rst index 1d4470d..3f20b49 100644 --- a/docs/installation.rst +++ b/docs/installation.rst @@ -86,8 +86,7 @@ Either way, all build results are located in the ``artifacts/`` directory. What next? ---------- -Read the :ref:`configuration chapter` and optionally create the :ref:`runtime directories `. -Afterwards, continue with the :ref:`usage chapter`. +Read the :ref:`configuration chapter` and then continue with the :ref:`usage chapter`. **Reminder**: If you want a quick introduction, please read the :ref:`tutorial`. diff --git a/go.mod b/go.mod index 1b7e185..f8b4cb2 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/gdamore/tcell v1.2.0 github.com/go-critic/go-critic v0.3.4 // indirect github.com/go-logfmt/logfmt v0.3.0 + github.com/go-sql-driver/mysql v1.4.1-0.20190907122137-b2c03bcae3d4 github.com/gogo/protobuf v1.2.1 // indirect github.com/golang/mock v1.2.0 // indirect github.com/golang/protobuf v1.2.0 @@ -29,6 +30,7 @@ require ( github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515 // indirect github.com/kr/pretty v0.1.0 + github.com/lib/pq v1.2.0 github.com/mattn/go-isatty v0.0.3 github.com/matttproud/golang_protobuf_extensions v1.0.0 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect @@ -47,7 +49,7 @@ require ( github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/pflag v1.0.3 github.com/spf13/viper v1.3.2 // indirect - github.com/stretchr/testify v1.2.2 + github.com/stretchr/testify v1.4.0 github.com/theckman/goconstraint v1.11.0 // indirect github.com/yudai/gojsondiff v0.0.0-20170107030110-7b1b7adf999d github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82 // indirect diff --git a/go.sum b/go.sum index 6e0e248..74586a8 100644 --- a/go.sum +++ b/go.sum @@ -16,6 +16,7 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/fatih/color v1.6.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= @@ -38,6 +39,10 @@ github.com/go-lintpack/lintpack v0.5.2/go.mod h1:NwZuYi2nUHho8XEIZ6SIxihrnPoqBTD github.com/go-logfmt/logfmt v0.3.0 h1:8HUsc87TaSWLKwrnumgC8/YconD2fJQsRJAsWaPg2ic= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-ole/go-ole v1.2.1/go.mod h1:7FAglXiTm7HKlQRDeOQ6ZNUHidzCWXuZWq/1dTyBNF8= +github.com/go-sql-driver/mysql v1.4.1-0.20190907122137-b2c03bcae3d4 h1:0suja/iKSDbEIYLbrS/8C7iArJiWpgCNcR+zwAHu7Ig= +github.com/go-sql-driver/mysql v1.4.1-0.20190907122137-b2c03bcae3d4/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= +github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA= +github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-toolsmith/astcast v0.0.0-20181028201508-b7a89ed70af1/go.mod h1:TEo3Ghaj7PsZawQHxT/oBvo4HK/sl1RcuUHDKTTju+o= github.com/go-toolsmith/astcast v1.0.0 h1:JojxlmI6STnFVG9yOImLeGREv8W2ocNUM+iOhR6jE7g= github.com/go-toolsmith/astcast v1.0.0/go.mod h1:mt2OdQTeAQcY4DQgPSArJjHCcOwlX+Wl/kwN+LbLGQ4= @@ -153,6 +158,8 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0= +github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/logrusorgru/aurora v0.0.0-20181002194514-a7b3b318ed4e/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4= github.com/lucasb-eyer/go-colorful v1.0.2 h1:mCMFu6PgSozg9tDNMMK3g18oJBX7oYGrC09mS6CXfO4= github.com/lucasb-eyer/go-colorful v1.0.2/go.mod h1:0MS4r+7BZKSJ5mw4/S5MPN+qHFF1fYclkSPilDOKW0s= @@ -244,9 +251,12 @@ github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnIn github.com/spf13/viper v1.0.2/go.mod h1:A8kyI5cUJhb8N+3pkfONlcEcZbueH6nhAm0Fq7SrnBM= github.com/spf13/viper v1.3.2 h1:VUFqw5KcqRf7i70GOzW7N+Q7+gxVBkSSqiXB12+JQ4M= github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/theckman/goconstraint v1.11.0 h1:oBUwN5wpE4dwyPhRGraEgJsFTr+JtLWiDnaJZJeeXI0= github.com/theckman/goconstraint v1.11.0/go.mod h1:zkCR/f2kOULTk/h1ujgyB9BlCNLaqlQ6GN2Zl4mg81g= github.com/timakin/bodyclose v0.0.0-20190407043127-4a873e97b2bb h1:lI9ufgFfvuqRctP9Ny8lDDLbSWCMxBPletcSqrnyFYM= diff --git a/logger/logger.go b/logger/logger.go index a2bd647..e5342d3 100644 --- a/logger/logger.go +++ b/logger/logger.go @@ -20,6 +20,7 @@ type Logger interface { WithField(field string, val interface{}) Logger WithFields(fields Fields) Logger WithError(err error) Logger + Log(level Level, msg string) Debug(msg string) Info(msg string) Warn(msg string) @@ -158,6 
+159,10 @@ func (l *loggerImpl) WithError(err error) Logger { return l.WithField(FieldError, val) } +func (l *loggerImpl) Log(level Level, msg string) { + l.log(level, msg) +} + func (l *loggerImpl) Debug(msg string) { l.log(Debug, msg) } diff --git a/logger/nulllogger.go b/logger/nulllogger.go index ec954f7..9e83b5a 100644 --- a/logger/nulllogger.go +++ b/logger/nulllogger.go @@ -13,6 +13,7 @@ func (n nullLogger) ReplaceField(field string, val interface{}) Logger { return func (n nullLogger) WithField(field string, val interface{}) Logger { return n } func (n nullLogger) WithFields(fields Fields) Logger { return n } func (n nullLogger) WithError(err error) Logger { return n } +func (nullLogger) Log(level Level, msg string) {} func (nullLogger) Debug(msg string) {} func (nullLogger) Info(msg string) {} func (nullLogger) Warn(msg string) {} diff --git a/util/circlog/circlog.go b/util/circlog/circlog.go new file mode 100644 index 0000000..fac959f --- /dev/null +++ b/util/circlog/circlog.go @@ -0,0 +1,167 @@ +package circlog + +import ( + "fmt" + "math/bits" + "sync" +) + +const CIRCULARLOG_INIT_SIZE int = 32 << 10 + +type CircularLog struct { + buf []byte + size int + max int + written int + writeCursor int + /* + Mutex prevents: + concurrent writes: + buf, size, written, writeCursor in Write([]byte) + buf, writeCursor in Reset() + data races vs concurrent Write([]byte) calls: + size in Size() + size, writeCursor in Len() + buf, size, written, writeCursor in Bytes() + */ + mtx sync.Mutex +} + +func NewCircularLog(max int) (*CircularLog, error) { + if max <= 0 { + return nil, fmt.Errorf("max must be positive") + } + + return &CircularLog{ + size: CIRCULARLOG_INIT_SIZE, + buf: make([]byte, CIRCULARLOG_INIT_SIZE), + max: max, + }, nil +} + +func nextPow2Int(n int) int { + if n < 1 { + panic("can only round up positive integers") + } + r := uint(n) + // Credit: https://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2 + r-- + // Assume at least a 32 bit integer + r |= r >> 1 + r |= r >> 2 + r |= r >> 4 + r |= r >> 8 + r |= r >> 16 + if bits.UintSize == 64 { + r |= r >> 32 + } + r++ + // Can't exceed max positive int value + if r > ^uint(0)>>1 { + panic("rounded to larger than int()") + } + return int(r) +} + +func (cl *CircularLog) Write(data []byte) (int, error) { + cl.mtx.Lock() + defer cl.mtx.Unlock() + n := len(data) + + // Keep growing the buffer by doubling until + // hitting cl.max + bufAvail := cl.size - cl.writeCursor + // If cl.writeCursor wrapped on the last write, + // then the buffer is full, not empty. + if cl.writeCursor == 0 && cl.written > 0 { + bufAvail = 0 + } + if n > bufAvail && cl.size < cl.max { + // Add to size, not writeCursor, so as + // to not resize multiple times if this + // Write() immediately fills up the buffer + newSize := nextPow2Int(cl.size + n) + if newSize > cl.max { + newSize = cl.max + } + newBuf := make([]byte, newSize) + // Reset write cursor to old size if wrapped + if cl.writeCursor == 0 && cl.written > 0 { + cl.writeCursor = cl.size + } + copy(newBuf, cl.buf[:cl.writeCursor]) + cl.buf = newBuf + cl.size = newSize + } + + // If data to be written is larger than the max size, + // discard all but the last cl.size bytes + if n > cl.size { + data = data[n-cl.size:] + // Overwrite the beginning of data + // with a string indicating truncation + copy(data, []byte("(...)")) + } + // First copy data to the end of buf. If that wasn't + // all of data, then copy the rest to the beginning + // of buf. 
+ copied := copy(cl.buf[cl.writeCursor:], data) + if copied < n { + copied += copy(cl.buf, data[copied:]) + } + + cl.writeCursor = ((cl.writeCursor + copied) % cl.size) + cl.written += copied + return copied, nil +} + +func (cl *CircularLog) Size() int { + cl.mtx.Lock() + defer cl.mtx.Unlock() + return cl.size +} + +func (cl *CircularLog) Len() int { + cl.mtx.Lock() + defer cl.mtx.Unlock() + if cl.written >= cl.size { + return cl.size + } else { + return cl.writeCursor + } +} + +func (cl *CircularLog) TotalWritten() int { + cl.mtx.Lock() + defer cl.mtx.Unlock() + + return cl.written +} + +func (cl *CircularLog) Reset() { + cl.mtx.Lock() + defer cl.mtx.Unlock() + cl.writeCursor = 0 + cl.buf = make([]byte, CIRCULARLOG_INIT_SIZE) + cl.size = CIRCULARLOG_INIT_SIZE +} + +func (cl *CircularLog) Bytes() []byte { + cl.mtx.Lock() + defer cl.mtx.Unlock() + switch { + case cl.written >= cl.size && cl.writeCursor == 0: + return cl.buf + case cl.written > cl.size: + ret := make([]byte, cl.size) + copy(ret, cl.buf[cl.writeCursor:]) + copy(ret[cl.size-cl.writeCursor:], cl.buf[:cl.writeCursor]) + return ret + default: + return cl.buf[:cl.writeCursor] + } +} + +func (cl *CircularLog) String() string { + return string(cl.Bytes()) +} diff --git a/util/circlog/circlog_test.go b/util/circlog/circlog_test.go new file mode 100644 index 0000000..068f4a6 --- /dev/null +++ b/util/circlog/circlog_test.go @@ -0,0 +1,144 @@ +package circlog_test + +import ( + "bytes" + "io" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/zrepl/zrepl/util/circlog" +) + +func TestCircularLog(t *testing.T) { + var maxCircularLogSize int = 1 << 20 + + writeFixedSize := func(w io.Writer, size int) (int, error) { + pattern := []byte{0xDE, 0xAD, 0xBE, 0xEF} + writeBytes := bytes.Repeat(pattern, size/4) + if len(writeBytes) < size { + writeBytes = append(writeBytes, pattern[:size-len(writeBytes)]...) 
+ } + n, err := w.Write(writeBytes) + if err != nil { + return n, err + } + return n, nil + } + + t.Run("negative-size-error", func(t *testing.T) { + _, err := circlog.NewCircularLog(-1) + require.EqualError(t, err, "max must be positive") + }) + + t.Run("no-resize", func(t *testing.T) { + log, err := circlog.NewCircularLog(maxCircularLogSize) + require.NoError(t, err) + + initSize := log.Size() + writeSize := circlog.CIRCULARLOG_INIT_SIZE - 1 + _, err = writeFixedSize(log, writeSize) + require.NoError(t, err) + finalSize := log.Size() + require.Equal(t, initSize, finalSize) + require.Equal(t, writeSize, log.Len()) + }) + t.Run("one-resize", func(t *testing.T) { + log, err := circlog.NewCircularLog(maxCircularLogSize) + require.NoError(t, err) + + initSize := log.Size() + writeSize := circlog.CIRCULARLOG_INIT_SIZE + 1 + _, err = writeFixedSize(log, writeSize) + require.NoError(t, err) + finalSize := log.Size() + require.Greater(t, finalSize, initSize) + require.LessOrEqual(t, finalSize, maxCircularLogSize) + require.Equal(t, writeSize, log.Len()) + }) + t.Run("reset", func(t *testing.T) { + log, err := circlog.NewCircularLog(maxCircularLogSize) + require.NoError(t, err) + + initSize := log.Size() + _, err = writeFixedSize(log, maxCircularLogSize) + require.NoError(t, err) + log.Reset() + finalSize := log.Size() + require.Equal(t, initSize, finalSize) + }) + t.Run("wrap-exactly-maximum", func(t *testing.T) { + log, err := circlog.NewCircularLog(maxCircularLogSize) + require.NoError(t, err) + + initSize := log.Size() + writeSize := maxCircularLogSize / 4 + for written := 0; written < maxCircularLogSize; { + n, err := writeFixedSize(log, writeSize) + written += n + require.NoError(t, err) + } + finalSize := log.Size() + require.Greater(t, finalSize, initSize) + require.Equal(t, finalSize, maxCircularLogSize) + require.Equal(t, finalSize, log.Len()) + }) + t.Run("wrap-partial", func(t *testing.T) { + log, err := circlog.NewCircularLog(maxCircularLogSize) + require.NoError(t, err) + + initSize := log.Size() + + startSentinel := []byte{0x00, 0x00, 0x00, 0x00} + _, err = log.Write(startSentinel) + require.NoError(t, err) + + nWritesToFill := 4 + writeSize := maxCircularLogSize / nWritesToFill + for i := 0; i < (nWritesToFill + 1); i++ { + _, err := writeFixedSize(log, writeSize) + require.NoError(t, err) + } + + endSentinel := []byte{0xFF, 0xFF, 0xFF, 0xFF} + _, err = log.Write(endSentinel) + require.NoError(t, err) + + finalSize := log.Size() + require.Greater(t, finalSize, initSize) + require.Greater(t, log.TotalWritten(), finalSize) + require.Equal(t, finalSize, maxCircularLogSize) + require.Equal(t, finalSize, log.Len()) + + logBytes := log.Bytes() + + require.Equal(t, endSentinel, logBytes[len(logBytes)-len(endSentinel):]) + require.NotEqual(t, startSentinel, logBytes[:len(startSentinel)]) + }) + t.Run("overflow-write", func(t *testing.T) { + log, err := circlog.NewCircularLog(maxCircularLogSize) + require.NoError(t, err) + + initSize := log.Size() + writeSize := maxCircularLogSize + 1 + n, err := writeFixedSize(log, writeSize) + require.NoError(t, err) + finalSize := log.Size() + require.Less(t, n, writeSize) + require.Greater(t, finalSize, initSize) + require.Equal(t, finalSize, maxCircularLogSize) + logBytes := log.Bytes() + require.Equal(t, []byte("(...)"), logBytes[:5]) + }) + t.Run("stringify", func(t *testing.T) { + log, err := circlog.NewCircularLog(maxCircularLogSize) + require.NoError(t, err) + + writtenString := "A harmful truth is better than a useful lie." 
+ n, err := log.Write([]byte(writtenString)) + require.NoError(t, err) + require.Equal(t, len(writtenString), n) + loggedString := log.String() + require.Equal(t, writtenString, loggedString) + }) +} diff --git a/util/circlog/nextpow2int_test.go b/util/circlog/nextpow2int_test.go new file mode 100644 index 0000000..1504925 --- /dev/null +++ b/util/circlog/nextpow2int_test.go @@ -0,0 +1,16 @@ +package circlog + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestNextPow2Int(t *testing.T) { + require.Equal(t, 512, nextPow2Int(512), "a power of 2 should round to itself") + require.Equal(t, 1024, nextPow2Int(513), "should round up to the next power of 2") + require.PanicsWithValue(t, "can only round up positive integers", func() { nextPow2Int(0) }, "unimplemented: zero is not positive; corner case") + require.PanicsWithValue(t, "can only round up positive integers", func() { nextPow2Int(-1) }, "unimplemented: cannot round up negative numbers") + maxInt := int((^uint(0)) >> 1) + require.PanicsWithValue(t, "rounded to larger than int()", func() { nextPow2Int(maxInt - 1) }, "cannot round to a number bigger than the int type") +}
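A note on intended usage: ``CircularLog`` implements ``io.Writer``, which makes it a natural fit for capturing the output of hook commands with a bounded amount of memory. The following sketch is an assumption about how the buffer might be wired up, using only the API introduced in this diff plus the standard library; the actual hook integration is not part of this change:

.. code-block:: go

   package main

   import (
           "fmt"
           "os/exec"

           "github.com/zrepl/zrepl/util/circlog"
   )

   func main() {
           // Retain at most 1 MiB; once the buffer has grown to its maximum
           // size, the oldest bytes are overwritten.
           out, err := circlog.NewCircularLog(1 << 20)
           if err != nil {
                   panic(err)
           }

           // CircularLog is an io.Writer, so it can capture a child
           // process's stdout and stderr directly.
           cmd := exec.Command("sh", "-c", "echo pre_snapshot hook ran")
           cmd.Stdout = out
           cmd.Stderr = out
           if err := cmd.Run(); err != nil {
                   fmt.Println("hook failed:", err)
           }

           // Bytes()/String() return the retained window in write order;
           // TotalWritten() reports how much was written overall.
           fmt.Printf("captured %d of %d bytes:\n%s", out.Len(), out.TotalWritten(), out.String())
   }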