pre- and post-snapshot hooks

* stack-based execution model, documented in the hooks documentation
* circular buffer (util/circlog) for capturing hook output
* built-in hooks for PostgreSQL and MySQL
* refactor docs: the jobs page had too much info, which made snapshotting
  & hooks too difficult to discover

Co-authored-by: Ross Williams <ross@ross-williams.net>
Co-authored-by: Christian Schwarz <me@cschwarz.com>

fixes #74
Author: Ross Williams
Date: 2019-07-26 19:12:21 +00:00
Committed by: Christian Schwarz
Parent: 00434f4ac9
Commit: 729c83ee72
39 changed files with 2580 additions and 279 deletions


@@ -0,0 +1,35 @@
// Code generated by "stringer -type=Edge"; DO NOT EDIT.
package hooks
import "strconv"
func _() {
// An "invalid array index" compiler error signifies that the constant values have changed.
// Re-run the stringer command to generate them again.
var x [1]struct{}
_ = x[Pre-1]
_ = x[Callback-2]
_ = x[Post-4]
}
const (
_Edge_name_0 = "PreCallback"
_Edge_name_1 = "Post"
)
var (
_Edge_index_0 = [...]uint8{0, 3, 11}
)
func (i Edge) String() string {
switch {
case 1 <= i && i <= 2:
i -= 1
return _Edge_name_0[_Edge_index_0[i]:_Edge_index_0[i+1]]
case i == 4:
return _Edge_name_1
default:
return "Edge(" + strconv.FormatInt(int64(i), 10) + ")"
}
}


@@ -0,0 +1,52 @@
package hooks
import (
"fmt"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/zfs"
)
type List []Hook
func HookFromConfig(in config.HookEnum) (Hook, error) {
switch v := in.Ret.(type) {
case *config.HookCommand:
return NewCommandHook(v)
case *config.HookPostgresCheckpoint:
return PgChkptHookFromConfig(v)
case *config.HookMySQLLockTables:
return MyLockTablesFromConfig(v)
default:
return nil, fmt.Errorf("unknown hook type %T", v)
}
}
func ListFromConfig(in *config.HookList) (r *List, err error) {
hl := make(List, len(*in))
for i, h := range *in {
hl[i], err = HookFromConfig(h)
if err != nil {
return nil, fmt.Errorf("create hook #%d: %s", i+1, err)
}
}
return &hl, nil
}
func (l List) CopyFilteredForFilesystem(fs *zfs.DatasetPath) (ret List, err error) {
ret = make(List, 0, len(l))
for _, h := range l {
var passFilesystem bool
if passFilesystem, err = h.Filesystems().Filter(fs); err != nil {
return nil, err
}
if passFilesystem {
ret = append(ret, h)
}
}
return ret, nil
}

daemon/hooks/hook_docs.go (new file, 34 lines)

@@ -0,0 +1,34 @@
// Package hooks implements pre- and post-snapshot hooks.
//
// Plan is a generic executor for hooks before and after an activity specified in a callback.
// It provides a reporting facility that can be polled while the plan is executing to gather progress information.
//
// This package also provides all supported hook type implementations and abstractions around them.
//
// Use For Other Kinds Of Hooks
//
// This package REQUIRES REFACTORING before it can be used for other activities than snapshots, e.g. pre- and post-replication:
//
// The Hook interface requires a hook to provide a Filesystems() filter, which doesn't make sense for
// all kinds of activities.
//
// The hook implementations should move out of this package.
// However, there is a lot of tight coupling that is not worth untangling at the moment.
//
// How This Package Is Used By Package Snapper
//
// Deserialize a config.HookList using ListFromConfig().
// The caller MUST then filter the list to only contain hooks for a particular filesystem using
// hooksList.CopyFilteredForFilesystem(fs).
//
// Then create a CallbackHook using NewCallbackHookForFilesystem().
//
// Pass all of the above to NewPlan() which provides a Report() and Run() method:
//
// Plan.Run(ctx context.Context, dryRun bool) executes the plan; the context should carry a logger added via hooks.WithLogger().
// The value of dryRun is passed through to the hooks' Run() method.
// Command hooks make it available in the environment variable ZREPL_DRYRUN.
//
// Plan.Report() can be called while Plan.Run() is executing to give an overview of plan execution progress (future use in "zrepl status").
//
package hooks
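
To make the doc comment above concrete, here is a minimal caller sketch; it is not part of this commit, and the helper name runSnapshotWithHooks, the takeSnap callback, and the example snapshot name are hypothetical. The hooks, config, and zfs calls are the ones introduced by this change.

package main

import (
	"context"

	"github.com/zrepl/zrepl/config"
	"github.com/zrepl/zrepl/daemon/hooks"
	"github.com/zrepl/zrepl/zfs"
)

func runSnapshotWithHooks(ctx context.Context, conf config.HookList, fs *zfs.DatasetPath, takeSnap hooks.HookJobCallback) (hooks.PlanReport, error) {
	// deserialize the configured hooks
	hookList, err := hooks.ListFromConfig(&conf)
	if err != nil {
		return nil, err
	}
	// keep only hooks whose filesystem filter matches fs
	filtered, err := hookList.CopyFilteredForFilesystem(fs)
	if err != nil {
		return nil, err
	}
	// wrap the actual snapshot operation in a callback hook
	cb := hooks.NewCallbackHookForFilesystem("snapshot", fs, takeSnap)
	// the extra env is exposed to command hooks as ZREPL_FS / ZREPL_SNAPNAME
	plan, err := hooks.NewPlan(&filtered, hooks.PhaseSnapshot, cb, hooks.Env{
		hooks.EnvFS:       fs.ToString(),
		hooks.EnvSnapshot: "zrepl_example_snap", // hypothetical snapshot name
	})
	if err != nil {
		return nil, err
	}
	// ctx should carry a logger added via hooks.WithLogger; dryRun=false
	plan.Run(ctx, false)
	return plan.Report(), nil
}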

daemon/hooks/hook_exec.go (new file, 292 lines)

@@ -0,0 +1,292 @@
package hooks
import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/zrepl/zrepl/zfs"
)
// Re-export type here so that
// every file in package hooks doesn't
// have to import github.com/zrepl/zrepl/zfs
type Filter zfs.DatasetFilter
type Hook interface {
Filesystems() Filter
// If this returns true and the Pre-edge invocation of Run fails, the Post edge will not run and the remaining Pre edges will not run either.
ErrIsFatal() bool
// Run is invoked by Plan for a Pre edge.
// If HookReport.HadError() == false, the Post edge will be invoked, too.
Run(ctx context.Context, edge Edge, phase Phase, dryRun bool, extra Env, state map[interface{}]interface{}) HookReport
String() string
}
type Phase string
const (
PhaseSnapshot = Phase("snapshot")
PhaseTesting = Phase("testing")
)
func (p Phase) String() string {
return string(p)
}
//go:generate stringer -type=Edge
type Edge uint
const (
Pre = Edge(1 << iota)
Callback
Post
)
func (e Edge) StringForPhase(phase Phase) string {
return fmt.Sprintf("%s_%s", e.String(), phase.String())
}
//go:generate enumer -type=StepStatus -trimprefix=Step
type StepStatus int
const (
StepPending StepStatus = 1 << iota
StepExec
StepOk
StepErr
StepSkippedDueToFatalErr
StepSkippedDueToPreErr
)
type HookReport interface {
String() string
HadError() bool
Error() string
}
type Step struct {
Hook Hook
Edge Edge
Status StepStatus
Begin, End time.Time
// Report may be nil
// FIXME cannot serialize this for client status, but contains interesting info (like what error happened)
Report HookReport
state map[interface{}]interface{}
}
func (s Step) String() (out string) {
fatal := "~"
if s.Hook.ErrIsFatal() && s.Edge == Pre {
fatal = "!"
}
runTime := "..."
if s.Status != StepPending {
t := s.End.Sub(s.Begin)
runTime = t.Round(time.Millisecond).String()
}
return fmt.Sprintf("[%s] [%5s] %s [%s] %s", s.Status, runTime, fatal, s.Edge, s.Hook)
}
type Plan struct {
mtx sync.RWMutex
steps []*Step
pre []*Step // protected by mtx
cb *Step
post []*Step // not reversed, i.e. entry at index i corresponds to pre-edge in pre[i]
phase Phase
env Env
}
func NewPlan(hooks *List, phase Phase, cb *CallbackHook, extra Env) (*Plan, error) {
var pre, post []*Step
// TODO sanity check unique name of hook?
for _, hook := range *hooks {
state := make(map[interface{}]interface{})
preE := &Step{
Hook: hook,
Edge: Pre,
Status: StepPending,
state: state,
}
pre = append(pre, preE)
postE := &Step{
Hook: hook,
Edge: Post,
Status: StepPending,
state: state,
}
post = append(post, postE)
}
cbE := &Step{
Hook: cb,
Edge: Callback,
Status: StepPending,
}
steps := make([]*Step, 0, len(pre)+len(post)+1)
steps = append(steps, pre...)
steps = append(steps, cbE)
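// append post-edges in reverse so that steps reflects execution order: pre (config order), callback, post (reverse config order)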
for i := len(post) - 1; i >= 0; i-- {
steps = append(steps, post[i])
}
plan := &Plan{
phase: phase,
env: extra,
steps: steps,
pre: pre,
post: post,
cb: cbE,
}
return plan, nil
}
type PlanReport []Step
func (p *Plan) Report() PlanReport {
p.mtx.RLock()
defer p.mtx.RUnlock()
rep := make([]Step, len(p.steps))
for i := range rep {
rep[i] = *p.steps[i]
}
return rep
}
func (r PlanReport) HadError() bool {
for _, e := range r {
if e.Status == StepErr {
return true
}
}
return false
}
func (r PlanReport) HadFatalError() bool {
for _, e := range r {
if e.Status == StepSkippedDueToFatalErr {
return true
}
}
return false
}
func (r PlanReport) String() string {
stepStrings := make([]string, len(r))
for i, e := range r {
stepStrings[i] = fmt.Sprintf("%02d %s", i+1, e)
}
return strings.Join(stepStrings, "\n")
}
func (p *Plan) Run(ctx context.Context, dryRun bool) {
p.mtx.RLock()
defer p.mtx.RUnlock()
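// w temporarily swaps the read lock held during Run for the write lock, so step status updates don't race with concurrent Report() calls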
w := func(f func()) {
p.mtx.RUnlock()
defer p.mtx.RLock()
p.mtx.Lock()
defer p.mtx.Unlock()
f()
}
runHook := func(s *Step, ctx context.Context, edge Edge) HookReport {
w(func() { s.Status = StepExec })
begin := time.Now()
r := s.Hook.Run(ctx, edge, p.phase, dryRun, p.env, s.state)
end := time.Now()
w(func() {
s.Report = r
s.Status = StepOk
if r.HadError() {
s.Status = StepErr
}
s.Begin, s.End = begin, end
})
return r
}
l := getLogger(ctx)
// it's a stack: execute pre-edges until we reach the end of the list (last item in)
// or hit a fatal error in between
l.Info("run pre-edges in configuration order")
next := 0
for ; next < len(p.pre); next++ {
e := p.pre[next]
l := l.WithField("hook", e.Hook)
r := runHook(e, ctx, Pre)
if r.HadError() {
l.WithError(r).Error("hook invocation failed for pre-edge")
if e.Hook.ErrIsFatal() {
l.Error("the hook run was aborted due to a fatal error in this hook")
break
}
}
}
hadFatalErr := next != len(p.pre)
if hadFatalErr {
l.Error("fatal error in a pre-snapshot hook invocation")
l.Error("no snapshot will be taken")
l.Error("only running post-edges for successful pre-edges")
w(func() {
p.post[next].Status = StepSkippedDueToFatalErr
for i := next + 1; i < len(p.pre); i++ {
p.pre[i].Status = StepSkippedDueToFatalErr
p.post[i].Status = StepSkippedDueToFatalErr
}
p.cb.Status = StepSkippedDueToFatalErr
})
return
}
l.Info("running callback")
cbR := runHook(p.cb, ctx, Callback)
if cbR.HadError() {
l.WithError(cbR).Error("callback failed")
}
l.Info("run post-edges for successful pre-edges in reverse configuration order")
// the constructor produces pre and post entries
// post is NOT reversed
next-- // now at index of last executed pre-edge
for ; next >= 0; next-- {
e := p.post[next]
l := l.WithField("hook", e.Hook)
if p.pre[next].Status != StepOk {
if p.pre[next].Status != StepErr {
panic(fmt.Sprintf("expecting a pre-edge hook report to be either Ok or Err, got %s", p.pre[next].Status))
}
l.Info("skip post-edge because pre-edge failed")
w(func() {
e.Status = StepSkippedDueToPreErr
})
continue
}
report := runHook(e, ctx, Post)
if report.HadError() {
l.WithError(report).Error("hook invocation failed for post-edge")
l.Error("subsequent post-edges run regardless of this post-edge failure")
}
// ErrIsFatal is only relevant for Pre
}
}
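
As a reference point for the Hook interface defined above, here is a minimal sketch of a custom implementation. It is not part of this commit; it accepts every filesystem, treats its errors as non-fatal, and reuses the CallbackHookReport type defined later in this commit for its report.

package hooks

import (
	"context"

	"github.com/zrepl/zrepl/daemon/filters"
)

// noopHook is a hypothetical example hook: it matches all filesystems and
// merely logs which edge and phase it was invoked for.
type noopHook struct {
	filter Filter
}

func newNoopHook() (*noopHook, error) {
	// equivalent to the match-all filter `filesystems: {"<": true}` in the config syntax
	f, err := filters.DatasetMapFilterFromConfig(map[string]bool{"<": true})
	if err != nil {
		return nil, err
	}
	return &noopHook{filter: f}, nil
}

func (h *noopHook) Filesystems() Filter { return h.filter }
func (h *noopHook) ErrIsFatal() bool    { return false }
func (h *noopHook) String() string      { return "noop" }

func (h *noopHook) Run(ctx context.Context, edge Edge, phase Phase, dryRun bool, extra Env, state map[interface{}]interface{}) HookReport {
	getLogger(ctx).WithField("edge", edge.StringForPhase(phase)).Debug("noop hook invoked")
	return &CallbackHookReport{Name: "noop", Err: nil}
}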


@@ -0,0 +1,103 @@
package hooks
import (
"bufio"
"bytes"
"context"
"sync"
"github.com/zrepl/zrepl/logger"
"github.com/zrepl/zrepl/util/envconst"
)
type contextKey int
const (
contextKeyLog contextKey = 0
)
type Logger = logger.Logger
func WithLogger(ctx context.Context, log Logger) context.Context {
return context.WithValue(ctx, contextKeyLog, log)
}
func GetLogger(ctx context.Context) Logger { return getLogger(ctx) }
func getLogger(ctx context.Context) Logger {
if log, ok := ctx.Value(contextKeyLog).(Logger); ok {
return log
}
return logger.NewNullLogger()
}
const MAX_HOOK_LOG_SIZE_DEFAULT int = 1 << 20
type logWriter struct {
/*
Mutex prevents:
concurrent writes to buf, scanner in Write([]byte)
data race on scanner vs Write([]byte)
and concurrent write to buf (call to buf.Reset())
in Close()
(Also, Close() should generally block until any Write() call completes.)
*/
mtx *sync.Mutex
buf bytes.Buffer
scanner *bufio.Scanner
logger Logger
level logger.Level
field string
}
func NewLogWriter(mtx *sync.Mutex, logger Logger, level logger.Level, field string) *logWriter {
w := new(logWriter)
w.mtx = mtx
w.scanner = bufio.NewScanner(&w.buf)
w.logger = logger
w.level = level
w.field = field
return w
}
func (w *logWriter) log(line string) {
w.logger.WithField(w.field, line).Log(w.level, "hook output")
}
func (w *logWriter) logUnreadBytes() error {
for w.scanner.Scan() {
w.log(w.scanner.Text())
}
if w.buf.Cap() > envconst.Int("ZREPL_MAX_HOOK_LOG_SIZE", MAX_HOOK_LOG_SIZE_DEFAULT) {
w.buf.Reset()
}
return nil
}
func (w *logWriter) Write(in []byte) (int, error) {
w.mtx.Lock()
defer w.mtx.Unlock()
n, bufErr := w.buf.Write(in)
if bufErr != nil {
return n, bufErr
}
err := w.logUnreadBytes()
if err != nil {
return n, err
}
// Always reset the scanner for the next Write
w.scanner = bufio.NewScanner(&w.buf)
return n, nil
}
func (w *logWriter) Close() (err error) {
w.mtx.Lock()
defer w.mtx.Unlock()
return w.logUnreadBytes()
}
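
A short, hypothetical usage sketch (not in this commit): the command hook implementation later in this commit wires two of these writers into an exec.Cmd, and every newline-terminated line written to a writer surfaces as its own "hook output" log entry under the configured field.

package hooks

import (
	"fmt"
	"sync"

	"github.com/zrepl/zrepl/logger"
)

// logWriterExample illustrates the per-line logging behavior of logWriter.
func logWriterExample(log Logger) error {
	var mtx sync.Mutex
	w := NewLogWriter(&mtx, log, logger.Info, "stdout")
	defer w.Close()
	// emitted as two separate "hook output" entries: "line one" and "line two"
	_, err := fmt.Fprintf(w, "line one\nline two\n")
	return err
}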


@@ -0,0 +1,65 @@
package hooks
import (
"context"
"fmt"
"github.com/zrepl/zrepl/daemon/filters"
"github.com/zrepl/zrepl/zfs"
)
type HookJobCallback func(ctx context.Context) error
type CallbackHook struct {
cb HookJobCallback
filter Filter
displayString string
}
func NewCallbackHookForFilesystem(displayString string, fs *zfs.DatasetPath, cb HookJobCallback) *CallbackHook {
filter, _ := filters.DatasetMapFilterFromConfig(map[string]bool{fs.ToString(): true})
return NewCallbackHook(displayString, cb, filter)
}
func NewCallbackHook(displayString string, cb HookJobCallback, filter Filter) *CallbackHook {
return &CallbackHook{
cb: cb,
filter: filter,
displayString: displayString,
}
}
func (h *CallbackHook) Filesystems() Filter {
return h.filter
}
func (h *CallbackHook) ErrIsFatal() bool {
return false // a callback failure is never fatal, by definition
}
func (h *CallbackHook) String() string {
return h.displayString
}
type CallbackHookReport struct {
Name string
Err error
}
func (r *CallbackHookReport) String() string {
if r.HadError() {
return r.Error()
}
return r.Name
}
func (r *CallbackHookReport) HadError() bool { return r.Err != nil }
func (r *CallbackHookReport) Error() string {
return fmt.Sprintf("%s error: %s", r.Name, r.Err)
}
func (h *CallbackHook) Run(ctx context.Context, edge Edge, phase Phase, dryRun bool, extra Env, state map[interface{}]interface{}) HookReport {
err := h.cb(ctx)
return &CallbackHookReport{h.displayString, err}
}


@@ -0,0 +1,192 @@
package hooks
import (
"context"
"fmt"
"io"
"math"
"os"
"os/exec"
"sort"
"strings"
"sync"
"time"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/filters"
"github.com/zrepl/zrepl/logger"
"github.com/zrepl/zrepl/util/circlog"
"github.com/zrepl/zrepl/util/envconst"
)
type HookEnvVar string
const (
EnvType HookEnvVar = "ZREPL_HOOKTYPE"
EnvDryRun HookEnvVar = "ZREPL_DRYRUN"
EnvFS HookEnvVar = "ZREPL_FS"
EnvSnapshot HookEnvVar = "ZREPL_SNAPNAME"
EnvTimeout HookEnvVar = "ZREPL_TIMEOUT"
)
type Env map[HookEnvVar]string
func NewHookEnv(edge Edge, phase Phase, dryRun bool, timeout time.Duration, extra Env) Env {
r := Env{
EnvTimeout: fmt.Sprintf("%.f", math.Floor(timeout.Seconds())),
}
edgeString := edge.StringForPhase(phase)
r[EnvType] = strings.ToLower(edgeString)
var dryRunString string
if dryRun {
dryRunString = "true"
} else {
dryRunString = ""
}
r[EnvDryRun] = dryRunString
for k, v := range extra {
r[k] = v
}
return r
}
type CommandHook struct {
edge Edge
filter Filter
errIsFatal bool
command string
timeout time.Duration
}
type CommandHookReport struct {
Command string
Args []string // currently always empty
Env Env
Err error
CapturedStdoutStderrCombined []byte
}
func (r *CommandHookReport) String() string {
// Reproduces a POSIX shell-compatible command line
var cmdLine strings.Builder
sep := ""
// Make sure environment variables are always
// printed in the same order
var hookEnvKeys []HookEnvVar
for k := range r.Env {
hookEnvKeys = append(hookEnvKeys, k)
}
sort.Slice(hookEnvKeys, func(i, j int) bool { return string(hookEnvKeys[i]) < string(hookEnvKeys[j]) })
for _, k := range hookEnvKeys {
cmdLine.WriteString(fmt.Sprintf("%s%s='%s'", sep, k, r.Env[k]))
sep = " "
}
cmdLine.WriteString(fmt.Sprintf("%s%s", sep, r.Command))
for _, a := range r.Args {
cmdLine.WriteString(fmt.Sprintf("%s'%s'", sep, a))
}
return fmt.Sprintf("command hook invocation: \"%s\"", cmdLine.String()) // no %q to make copy-pastable
}
func (r *CommandHookReport) Error() string {
if r.Err == nil {
return ""
}
return fmt.Sprintf("%s FAILED with error: %s", r.String(), r.Err)
}
func (r *CommandHookReport) HadError() bool {
return r.Err != nil
}
func NewCommandHook(in *config.HookCommand) (r *CommandHook, err error) {
r = &CommandHook{
errIsFatal: in.ErrIsFatal,
command: in.Path,
timeout: in.Timeout,
}
r.filter, err = filters.DatasetMapFilterFromConfig(in.Filesystems)
if err != nil {
return nil, fmt.Errorf("cannot parse filesystem filter: %s", err)
}
r.edge = Pre | Post
return r, nil
}
func (h *CommandHook) Filesystems() Filter {
return h.filter
}
func (h *CommandHook) ErrIsFatal() bool {
return h.errIsFatal
}
func (h *CommandHook) String() string {
return h.command
}
func (h *CommandHook) Run(ctx context.Context, edge Edge, phase Phase, dryRun bool, extra Env, state map[interface{}]interface{}) HookReport {
l := getLogger(ctx).WithField("command", h.command)
cmdCtx, cancel := context.WithTimeout(ctx, h.timeout)
defer cancel()
cmdExec := exec.CommandContext(cmdCtx, h.command)
hookEnv := NewHookEnv(edge, phase, dryRun, h.timeout, extra)
cmdEnv := os.Environ()
for k, v := range hookEnv {
cmdEnv = append(cmdEnv, fmt.Sprintf("%s=%s", k, v))
}
cmdExec.Env = cmdEnv
var scanMutex sync.Mutex
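// capture combined stdout+stderr in a circular buffer so the output kept for the report never exceeds ZREPL_MAX_HOOK_LOG_SIZE (default 1 MiB)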
combinedOutput, err := circlog.NewCircularLog(envconst.Int("ZREPL_MAX_HOOK_LOG_SIZE", MAX_HOOK_LOG_SIZE_DEFAULT))
if err != nil {
return &CommandHookReport{Err: err}
}
logErrWriter := NewLogWriter(&scanMutex, l, logger.Warn, "stderr")
logOutWriter := NewLogWriter(&scanMutex, l, logger.Info, "stdout")
defer logErrWriter.Close()
defer logOutWriter.Close()
cmdExec.Stderr = io.MultiWriter(logErrWriter, combinedOutput)
cmdExec.Stdout = io.MultiWriter(logOutWriter, combinedOutput)
report := &CommandHookReport{
Command: h.command,
Env: hookEnv,
// no report.Args
}
err = cmdExec.Start()
if err != nil {
report.Err = err
return report
}
err = cmdExec.Wait()
combinedOutputBytes := combinedOutput.Bytes()
report.CapturedStdoutStderrCombined = make([]byte, len(combinedOutputBytes))
copy(report.CapturedStdoutStderrCombined, combinedOutputBytes)
if err != nil {
if cmdCtx.Err() == context.DeadlineExceeded {
report.Err = fmt.Errorf("timed out after %s: %s", h.timeout, err)
return report
}
report.Err = err
return report
}
return report
}
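
For reference, a hedged illustration (not in this commit) of the environment contract implemented by NewHookEnv above; the filesystem and snapshot names are example values supplied by the caller.

package hooks

import "time"

// exampleHookEnv shows what a command hook process receives (exported as
// KEY=VALUE pairs) for a pre-snapshot edge with dry-run enabled and a 30s timeout:
//
//	ZREPL_HOOKTYPE=pre_snapshot
//	ZREPL_DRYRUN=true   (empty string when not a dry run)
//	ZREPL_TIMEOUT=30    (whole seconds)
//	ZREPL_FS=tank/data
//	ZREPL_SNAPNAME=zrepl_example_snap
func exampleHookEnv() Env {
	return NewHookEnv(Pre, PhaseSnapshot, true, 30*time.Second, Env{
		EnvFS:       "tank/data",          // example filesystem
		EnvSnapshot: "zrepl_example_snap", // example snapshot name
	})
}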


@@ -0,0 +1,145 @@
package hooks
import (
"context"
"database/sql"
"fmt"
"strings"
sqldriver "database/sql/driver"
"github.com/go-sql-driver/mysql"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/filters"
"github.com/zrepl/zrepl/zfs"
)
// https://dev.mysql.com/doc/mysql-backup-excerpt/5.7/en/backup-methods.html
//
// Making Backups Using a File System Snapshot:
//
// If you are using a Veritas file system, you can make a backup like this:
//
// From a client program, execute FLUSH TABLES WITH READ LOCK.
// From another shell, execute mount vxfs snapshot.
// From the first client, execute UNLOCK TABLES.
// Copy files from the snapshot.
// Unmount the snapshot.
//
// Similar snapshot capabilities may be available in other file systems, such as LVM or ZFS.
type MySQLLockTables struct {
errIsFatal bool
connector sqldriver.Connector
filesystems Filter
}
type myLockTablesStateKey int
const (
myLockTablesConnection myLockTablesStateKey = 1 + iota
)
func MyLockTablesFromConfig(in *config.HookMySQLLockTables) (*MySQLLockTables, error) {
conf, err := mysql.ParseDSN(in.DSN)
if err != nil {
return nil, errors.Wrap(err, "`dsn` invalid")
}
cn, err := mysql.NewConnector(conf)
if err != nil {
return nil, errors.Wrap(err, "`connect` invalid")
}
filesystems, err := filters.DatasetMapFilterFromConfig(in.Filesystems)
if err != nil {
return nil, errors.Wrap(err, "`filesystems` invalid")
}
return &MySQLLockTables{
in.ErrIsFatal,
cn,
filesystems,
}, nil
}
func (h *MySQLLockTables) ErrIsFatal() bool { return h.errIsFatal }
func (h *MySQLLockTables) Filesystems() Filter { return h.filesystems }
func (h *MySQLLockTables) String() string { return "MySQL FLUSH TABLES WITH READ LOCK" }
type MyLockTablesReport struct {
What string
Err error
}
func (r *MyLockTablesReport) HadError() bool { return r.Err != nil }
func (r *MyLockTablesReport) Error() string { return r.String() }
func (r *MyLockTablesReport) String() string {
var s strings.Builder
s.WriteString(r.What)
if r.Err != nil {
fmt.Fprintf(&s, ": %s", r.Err)
}
return s.String()
}
func (h *MySQLLockTables) Run(ctx context.Context, edge Edge, phase Phase, dryRun bool, extra Env, state map[interface{}]interface{}) HookReport {
fs, ok := extra[EnvFS]
if !ok {
panic(extra)
}
dp, err := zfs.NewDatasetPath(fs)
if err != nil {
panic(err)
}
if pass, err := h.filesystems.Filter(dp); err != nil {
return &MyLockTablesReport{What: "filesystem filter", Err: err}
} else if !pass {
getLogger(ctx).Debug("filesystem does not match filter, skipping")
return &MyLockTablesReport{What: "filesystem filter skipped this filesystem", Err: nil}
}
switch edge {
case Pre:
err := h.doRunPre(ctx, dp, dryRun, state)
return &MyLockTablesReport{"FLUSH TABLES WITH READ LOCK", err}
case Post:
err := h.doRunPost(ctx, dp, dryRun, state)
return &MyLockTablesReport{"UNLOCK TABLES", err}
}
return &MyLockTablesReport{What: "skipped this edge", Err: nil}
}
func (h *MySQLLockTables) doRunPre(ctx context.Context, fs *zfs.DatasetPath, dry bool, state map[interface{}]interface{}) (err error) {
db := sql.OpenDB(h.connector)
defer func(err *error) {
if *err != nil {
db.Close()
}
}(&err)
getLogger(ctx).Debug("do FLUSH TABLES WITH READ LOCK")
_, err = db.ExecContext(ctx, "FLUSH TABLES WITH READ LOCK")
if err != nil {
return
}
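// stash the open connection in the per-hook state map; the Post edge retrieves it in doRunPost to UNLOCK TABLES and close it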
state[myLockTablesConnection] = db
return nil
}
func (h *MySQLLockTables) doRunPost(ctx context.Context, fs *zfs.DatasetPath, dry bool, state map[interface{}]interface{}) error {
db := state[myLockTablesConnection].(*sql.DB)
defer db.Close()
getLogger(ctx).Debug("do UNLOCK TABLES")
_, err := db.ExecContext(ctx, "UNLOCK TABLES")
if err != nil {
return err
}
return nil
}


@@ -0,0 +1,98 @@
package hooks
import (
"context"
"database/sql"
"fmt"
"math"
"time"
"github.com/lib/pq"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/filters"
"github.com/zrepl/zrepl/zfs"
)
type PgChkptHook struct {
errIsFatal bool
connector *pq.Connector
filesystems Filter
}
func PgChkptHookFromConfig(in *config.HookPostgresCheckpoint) (*PgChkptHook, error) {
filesystems, err := filters.DatasetMapFilterFromConfig(in.Filesystems)
if err != nil {
return nil, errors.Wrap(err, "`filesystems` invalid")
}
cn, err := pq.NewConnector(in.DSN)
if err != nil {
return nil, errors.Wrap(err, "`dsn` invalid")
}
return &PgChkptHook{
in.ErrIsFatal,
cn,
filesystems,
}, nil
}
func (h *PgChkptHook) ErrIsFatal() bool { return h.errIsFatal }
func (h *PgChkptHook) Filesystems() Filter { return h.filesystems }
func (h *PgChkptHook) String() string { return "postgres checkpoint" }
type PgChkptHookReport struct{ Err error }
func (r *PgChkptHookReport) HadError() bool { return r.Err != nil }
func (r *PgChkptHookReport) Error() string { return r.Err.Error() }
func (r *PgChkptHookReport) String() string {
if r.Err != nil {
return fmt.Sprintf("postgres CHECKPOINT failed: %s", r.Err)
} else {
return "postgres CHECKPOINT completed"
}
}
func (h *PgChkptHook) Run(ctx context.Context, edge Edge, phase Phase, dryRun bool, extra Env, state map[interface{}]interface{}) HookReport {
if edge != Pre {
return &PgChkptHookReport{nil}
}
fs, ok := extra[EnvFS]
if !ok {
panic(extra)
}
dp, err := zfs.NewDatasetPath(fs)
if err != nil {
panic(err)
}
err = h.doRunPre(ctx, dp, dryRun)
return &PgChkptHookReport{err}
}
func (h *PgChkptHook) doRunPre(ctx context.Context, fs *zfs.DatasetPath, dry bool) error {
if pass, err := h.filesystems.Filter(fs); err != nil || !pass {
getLogger(ctx).Debug("filesystem does not match filter, skipping")
return err
}
db := sql.OpenDB(h.connector)
defer db.Close()
dl, ok := ctx.Deadline()
if ok {
timeout := uint64(math.Floor(time.Until(dl).Seconds() * 1000)) // TODO go1.13 milliseconds
getLogger(ctx).WithField("statement_timeout", timeout).Debug("setting statement timeout for CHECKPOINT")
// SET cannot be parameterized (and lib/pq uses $N, not ?, placeholders); timeout is an integer, so interpolate it directly
_, err := db.ExecContext(ctx, fmt.Sprintf("SET statement_timeout TO %d", timeout))
if err != nil {
return err
}
}
if dry {
getLogger(ctx).Debug("dry-run - use ping instead of CHECKPOINT")
return db.PingContext(ctx)
}
getLogger(ctx).Info("execute CHECKPOINT command")
_, err := db.ExecContext(ctx, "CHECKPOINT")
return err
}

daemon/hooks/hooks_test.go (new file, 484 lines)

@@ -0,0 +1,484 @@
package hooks_test
import (
"bytes"
"context"
"fmt"
"os"
"regexp"
"testing"
"text/template"
"github.com/stretchr/testify/require"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/hooks"
"github.com/zrepl/zrepl/logger"
"github.com/zrepl/zrepl/zfs"
)
type comparisonAssertionFunc func(require.TestingT, interface{}, interface{}, ...interface{})
type valueAssertionFunc func(require.TestingT, interface{}, ...interface{})
type expectStep struct {
ExpectedEdge hooks.Edge
ExpectStatus hooks.StepStatus
OutputTest valueAssertionFunc
ErrorTest valueAssertionFunc
}
type testCase struct {
Name string
Config []string
IsSlow bool
SuppressOutput bool
ExpectCallbackSkipped bool
ExpectHadFatalErr bool
ExpectHadError bool
ExpectStepReports []expectStep
}
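// curryLeft/curryRight pre-bind the expected value as the left or right argument of a two-argument comparison assertion (require.Regexp, require.Contains, ...), yielding a one-argument valueAssertionFunc for the test table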
func curryLeft(f comparisonAssertionFunc, expected interface{}) valueAssertionFunc {
return curry(f, expected, false)
}
func curryRight(f comparisonAssertionFunc, expected interface{}) valueAssertionFunc {
return curry(f, expected, true)
}
func curry(f comparisonAssertionFunc, expected interface{}, right bool) (ret valueAssertionFunc) {
ret = func(t require.TestingT, s interface{}, v ...interface{}) {
var x interface{}
var y interface{}
if right {
x = s
y = expected
} else {
x = expected
y = s
}
if len(v) > 0 {
f(t, x, y, v)
} else {
f(t, x, y)
}
}
return
}
func TestHooks(t *testing.T) {
testFSName := "testpool/testdataset"
testSnapshotName := "testsnap"
tmpl, err := template.New("TestHooks").Parse(`
jobs:
- name: TestHooks
type: snap
filesystems: {"<": true}
snapshotting:
type: periodic
interval: 1m
prefix: zrepl_snapjob_
hooks:
{{- template "List" . }}
pruning:
keep:
- type: last_n
count: 10
`)
if err != nil {
panic(err)
}
regexpTest := func(s string) valueAssertionFunc {
return curryLeft(require.Regexp, regexp.MustCompile(s))
}
containsTest := func(s string) valueAssertionFunc {
return curryRight(require.Contains, s)
}
testTable := []testCase{
testCase{
Name: "no_hooks",
ExpectStepReports: []expectStep{
{ExpectedEdge: hooks.Callback, ExpectStatus: hooks.StepOk},
},
},
testCase{
Name: "timeout",
IsSlow: true,
ExpectHadError: true,
Config: []string{`{type: command, path: {{.WorkDir}}/test/test-timeout.sh, timeout: 2s}`},
ExpectStepReports: []expectStep{
expectStep{
ExpectedEdge: hooks.Pre,
ExpectStatus: hooks.StepErr,
OutputTest: containsTest(fmt.Sprintf("TEST pre_testing %s@%s ZREPL_TIMEOUT=2", testFSName, testSnapshotName)),
ErrorTest: regexpTest(`timed out after 2(.\d+)?s`),
},
expectStep{ExpectedEdge: hooks.Callback, ExpectStatus: hooks.StepOk},
expectStep{
ExpectedEdge: hooks.Post,
ExpectStatus: hooks.StepSkippedDueToPreErr,
},
},
},
testCase{
Name: "check_env",
Config: []string{`{type: command, path: {{.WorkDir}}/test/test-report-env.sh}`},
ExpectHadError: false,
ExpectStepReports: []expectStep{
expectStep{
ExpectedEdge: hooks.Pre,
ExpectStatus: hooks.StepOk,
OutputTest: containsTest(fmt.Sprintf("TEST pre_testing %s@%s", testFSName, testSnapshotName)),
},
expectStep{ExpectedEdge: hooks.Callback, ExpectStatus: hooks.StepOk},
expectStep{
ExpectedEdge: hooks.Post,
ExpectStatus: hooks.StepOk,
OutputTest: containsTest(fmt.Sprintf("TEST post_testing %s@%s", testFSName, testSnapshotName)),
},
},
},
testCase{
Name: "nonfatal_pre_error_continues",
ExpectCallbackSkipped: false,
ExpectHadError: true,
Config: []string{`{type: command, path: {{.WorkDir}}/test/test-error.sh}`},
ExpectStepReports: []expectStep{
expectStep{
ExpectedEdge: hooks.Pre,
ExpectStatus: hooks.StepErr,
OutputTest: containsTest(fmt.Sprintf("TEST ERROR pre_testing %s@%s", testFSName, testSnapshotName)),
ErrorTest: regexpTest("^command hook invocation.*exit status 1$"),
},
expectStep{ExpectedEdge: hooks.Callback, ExpectStatus: hooks.StepOk},
expectStep{
ExpectedEdge: hooks.Post,
ExpectStatus: hooks.StepSkippedDueToPreErr, // post-edge is not executed for failing pre-edge
},
},
},
testCase{
Name: "pre_error_fatal_skips_subsequent_pre_edges_and_callback_and_its_post_edge_and_post_edges",
ExpectCallbackSkipped: true,
ExpectHadFatalErr: true,
ExpectHadError: true,
Config: []string{
`{type: command, path: {{.WorkDir}}/test/test-error.sh, err_is_fatal: true}`,
`{type: command, path: {{.WorkDir}}/test/test-report-env.sh}`,
},
ExpectStepReports: []expectStep{
expectStep{
ExpectedEdge: hooks.Pre,
ExpectStatus: hooks.StepErr,
OutputTest: containsTest(fmt.Sprintf("TEST ERROR pre_testing %s@%s", testFSName, testSnapshotName)),
ErrorTest: regexpTest("^command hook invocation.*exit status 1$"),
},
expectStep{
ExpectedEdge: hooks.Pre,
ExpectStatus: hooks.StepSkippedDueToFatalErr,
},
expectStep{ExpectedEdge: hooks.Callback, ExpectStatus: hooks.StepSkippedDueToFatalErr},
expectStep{
ExpectedEdge: hooks.Post,
ExpectStatus: hooks.StepSkippedDueToFatalErr,
},
expectStep{
ExpectedEdge: hooks.Post,
ExpectStatus: hooks.StepSkippedDueToFatalErr,
},
},
},
testCase{
Name: "post_error_fails_are_ignored_even_if_fatal",
ExpectHadFatalErr: false, // only occurs during Post, so it's not a fatal error
ExpectHadError: true,
Config: []string{
`{type: command, path: {{.WorkDir}}/test/test-post-error.sh, err_is_fatal: true}`,
`{type: command, path: {{.WorkDir}}/test/test-report-env.sh}`,
},
ExpectStepReports: []expectStep{
expectStep{
// No-action run of test-post-error.sh
ExpectedEdge: hooks.Pre,
ExpectStatus: hooks.StepOk,
OutputTest: require.Empty,
},
expectStep{
// Pre run of test-report-env.sh
ExpectedEdge: hooks.Pre,
ExpectStatus: hooks.StepOk,
OutputTest: containsTest(fmt.Sprintf("TEST pre_testing %s@%s", testFSName, testSnapshotName)),
},
expectStep{ExpectedEdge: hooks.Callback, ExpectStatus: hooks.StepOk},
expectStep{
ExpectedEdge: hooks.Post,
ExpectStatus: hooks.StepOk,
OutputTest: containsTest(fmt.Sprintf("TEST post_testing %s@%s", testFSName, testSnapshotName)),
},
expectStep{
ExpectedEdge: hooks.Post,
ExpectStatus: hooks.StepErr,
OutputTest: containsTest(fmt.Sprintf("TEST ERROR post_testing %s@%s", testFSName, testSnapshotName)),
ErrorTest: regexpTest("^command hook invocation.*exit status 1$"),
},
},
},
testCase{
Name: "cleanup_check_env",
Config: []string{`{type: command, path: {{.WorkDir}}/test/test-report-env.sh}`},
ExpectStepReports: []expectStep{
expectStep{
ExpectedEdge: hooks.Pre,
ExpectStatus: hooks.StepOk,
OutputTest: containsTest(fmt.Sprintf("TEST pre_testing %s@%s", testFSName, testSnapshotName)),
},
expectStep{ExpectedEdge: hooks.Callback, ExpectStatus: hooks.StepOk},
expectStep{
ExpectedEdge: hooks.Post,
ExpectStatus: hooks.StepOk,
OutputTest: containsTest(fmt.Sprintf("TEST post_testing %s@%s", testFSName, testSnapshotName)),
},
},
},
testCase{
Name: "pre_error_cancels_post",
Config: []string{`{type: command, path: {{.WorkDir}}/test/test-pre-error-post-ok.sh}`},
ExpectHadError: true,
ExpectHadFatalErr: false,
ExpectStepReports: []expectStep{
expectStep{
ExpectedEdge: hooks.Pre,
ExpectStatus: hooks.StepErr,
OutputTest: containsTest(fmt.Sprintf("TEST ERROR pre_testing %s@%s", testFSName, testSnapshotName)),
ErrorTest: regexpTest("^command hook invocation.*exit status 1$"),
},
expectStep{ExpectedEdge: hooks.Callback, ExpectStatus: hooks.StepOk},
expectStep{
ExpectedEdge: hooks.Post,
ExpectStatus: hooks.StepSkippedDueToPreErr,
},
},
},
testCase{
Name: "pre_error_does_not_cancel_other_posts_but_itself",
Config: []string{
`{type: command, path: {{.WorkDir}}/test/test-report-env.sh}`,
`{type: command, path: {{.WorkDir}}/test/test-pre-error-post-ok.sh}`,
},
ExpectHadError: true,
ExpectHadFatalErr: false,
ExpectStepReports: []expectStep{
expectStep{
ExpectedEdge: hooks.Pre,
ExpectStatus: hooks.StepOk,
OutputTest: containsTest(fmt.Sprintf("TEST pre_testing %s@%s", testFSName, testSnapshotName)),
},
expectStep{
ExpectedEdge: hooks.Pre,
ExpectStatus: hooks.StepErr,
OutputTest: containsTest(fmt.Sprintf("TEST ERROR pre_testing %s@%s", testFSName, testSnapshotName)),
ErrorTest: regexpTest("^command hook invocation.*exit status 1$"),
},
expectStep{ExpectedEdge: hooks.Callback, ExpectStatus: hooks.StepOk},
expectStep{
ExpectedEdge: hooks.Post,
ExpectStatus: hooks.StepSkippedDueToPreErr,
},
expectStep{
ExpectedEdge: hooks.Post,
ExpectStatus: hooks.StepOk,
OutputTest: containsTest(fmt.Sprintf("TEST post_testing %s@%s", testFSName, testSnapshotName)),
},
},
},
testCase{
Name: "exceed_buffer_limit",
SuppressOutput: true,
Config: []string{`{type: command, path: {{.WorkDir}}/test/test-large-stdout.sh}`},
ExpectHadError: false,
ExpectHadFatalErr: false,
ExpectStepReports: []expectStep{
expectStep{
ExpectedEdge: hooks.Pre,
ExpectStatus: hooks.StepOk,
OutputTest: func(t require.TestingT, s interface{}, v ...interface{}) {
require.Len(t, s, 1<<20)
},
},
expectStep{ExpectedEdge: hooks.Callback, ExpectStatus: hooks.StepOk},
expectStep{
// No-action run of above hook
ExpectedEdge: hooks.Post,
ExpectStatus: hooks.StepOk,
OutputTest: require.Empty,
},
},
},
/*
The following is not intended to test the functionality of the
filter package, only to demonstrate that hook filters are applied.
It should result in NO hooks running; if one does run, the plan
report will contain a fatal error.
*/
testCase{
Name: "exclude_all_filesystems",
Config: []string{
`{type: command, path: {{.WorkDir}}/test/test-error.sh, err_is_fatal: true, filesystems: {"<": false}}`,
},
ExpectStepReports: []expectStep{
expectStep{
ExpectedEdge: hooks.Callback,
ExpectStatus: hooks.StepOk,
},
},
},
}
parseHookConfig := func(t *testing.T, in string) *config.Config {
t.Helper()
conf, err := config.ParseConfigBytes([]byte(in))
require.NoError(t, err)
require.NotNil(t, conf)
return conf
}
fillHooks := func(tt *testCase) string {
// make hook path absolute
cwd, err := os.Getwd()
if err != nil {
panic("os.Getwd() failed")
}
var hooksTmpl string = "\n"
for _, l := range tt.Config {
hooksTmpl += fmt.Sprintf(" - %s\n", l)
}
tmpl.New("List").Parse(hooksTmpl)
var outBytes bytes.Buffer
data := struct {
WorkDir string
}{
WorkDir: cwd,
}
if err := tmpl.Execute(&outBytes, data); err != nil {
panic(err)
}
return outBytes.String()
}
var c *config.Config
fs, err := zfs.NewDatasetPath(testFSName)
require.NoError(t, err)
log := logger.NewTestLogger(t)
var cbReached bool
cb := hooks.NewCallbackHookForFilesystem("testcallback", fs, func(_ context.Context) error {
cbReached = true
return nil
})
hookEnvExtra := hooks.Env{
hooks.EnvFS: fs.ToString(),
hooks.EnvSnapshot: testSnapshotName,
}
for _, tt := range testTable {
if testing.Short() && tt.IsSlow {
continue
}
t.Run(tt.Name, func(t *testing.T) {
c = parseHookConfig(t, fillHooks(&tt))
snp := c.Jobs[0].Ret.(*config.SnapJob).Snapshotting.Ret.(*config.SnapshottingPeriodic)
hookList, err := hooks.ListFromConfig(&snp.Hooks)
require.NoError(t, err)
filteredHooks, err := hookList.CopyFilteredForFilesystem(fs)
require.NoError(t, err)
plan, err := hooks.NewPlan(&filteredHooks, hooks.PhaseTesting, cb, hookEnvExtra)
require.NoError(t, err)
t.Logf("REPORT PRE EXECUTION:\n%s", plan.Report())
cbReached = false
ctx := context.Background()
if testing.Verbose() && !tt.SuppressOutput {
ctx = hooks.WithLogger(ctx, log)
}
plan.Run(ctx, false)
report := plan.Report()
t.Logf("REPORT POST EXECUTION:\n%s", report)
/*
* TEST ASSERTIONS
*/
t.Logf("len(runReports)=%v", len(report))
t.Logf("len(tt.ExpectStepReports)=%v", len(tt.ExpectStepReports))
require.Equal(t, len(tt.ExpectStepReports), len(report), "ExpectStepReports must be same length as expected number of hook runs, excluding possible Callback")
// Check if callback ran, when required
if tt.ExpectCallbackSkipped {
require.False(t, cbReached, "callback ran but should not have run")
} else {
require.True(t, cbReached, "callback should have run but did not")
}
// Check if a fatal run error occurred and was expected
require.Equal(t, tt.ExpectHadFatalErr, report.HadFatalError(), "non-matching HadFatalError")
require.Equal(t, tt.ExpectHadError, report.HadError(), "non-matching HadError")
if tt.ExpectHadFatalErr {
require.True(t, tt.ExpectHadError, "ExpectHadFatalErr implies ExpectHadError")
}
if !tt.ExpectHadError {
require.False(t, tt.ExpectHadFatalErr, "!ExpectHadError implies !ExpectHadFatalErr")
}
// Iterate through each expected hook run
for i, hook := range tt.ExpectStepReports {
t.Logf("expecting report conforming to %v", hook)
exp, act := hook.ExpectStatus, report[i].Status
require.Equal(t, exp, act, "%s != %s", exp, act)
// Check for required ExpectedEdge
require.NotZero(t, hook.ExpectedEdge, "each hook must have an ExpectedEdge")
require.Equal(t, hook.ExpectedEdge, report[i].Edge,
"incorrect edge: expected %q, actual %q", hook.ExpectedEdge.String(), report[i].Edge.String(),
)
// Check for expected output
if hook.OutputTest != nil {
require.IsType(t, (*hooks.CommandHookReport)(nil), report[i].Report)
chr := report[i].Report.(*hooks.CommandHookReport)
hook.OutputTest(t, string(chr.CapturedStdoutStderrCombined))
}
// Check for expected errors
if hook.ErrorTest != nil {
hook.ErrorTest(t, string(report[i].Report.Error()))
}
}
})
}
}


@@ -0,0 +1,77 @@
// Code generated by "enumer -type=StepStatus -trimprefix=Step"; DO NOT EDIT.
//
package hooks
import (
"fmt"
)
const (
_StepStatusName_0 = "PendingExec"
_StepStatusName_1 = "Ok"
_StepStatusName_2 = "Err"
_StepStatusName_3 = "SkippedDueToFatalErr"
_StepStatusName_4 = "SkippedDueToPreErr"
)
var (
_StepStatusIndex_0 = [...]uint8{0, 7, 11}
_StepStatusIndex_1 = [...]uint8{0, 2}
_StepStatusIndex_2 = [...]uint8{0, 3}
_StepStatusIndex_3 = [...]uint8{0, 20}
_StepStatusIndex_4 = [...]uint8{0, 18}
)
func (i StepStatus) String() string {
switch {
case 1 <= i && i <= 2:
i -= 1
return _StepStatusName_0[_StepStatusIndex_0[i]:_StepStatusIndex_0[i+1]]
case i == 4:
return _StepStatusName_1
case i == 8:
return _StepStatusName_2
case i == 16:
return _StepStatusName_3
case i == 32:
return _StepStatusName_4
default:
return fmt.Sprintf("StepStatus(%d)", i)
}
}
var _StepStatusValues = []StepStatus{1, 2, 4, 8, 16, 32}
var _StepStatusNameToValueMap = map[string]StepStatus{
_StepStatusName_0[0:7]: 1,
_StepStatusName_0[7:11]: 2,
_StepStatusName_1[0:2]: 4,
_StepStatusName_2[0:3]: 8,
_StepStatusName_3[0:20]: 16,
_StepStatusName_4[0:18]: 32,
}
// StepStatusString retrieves an enum value from the enum constants string name.
// Throws an error if the param is not part of the enum.
func StepStatusString(s string) (StepStatus, error) {
if val, ok := _StepStatusNameToValueMap[s]; ok {
return val, nil
}
return 0, fmt.Errorf("%s does not belong to StepStatus values", s)
}
// StepStatusValues returns all values of the enum
func StepStatusValues() []StepStatus {
return _StepStatusValues
}
// IsAStepStatus returns "true" if the value is listed in the enum definition. "false" otherwise
func (i StepStatus) IsAStepStatus() bool {
for _, v := range _StepStatusValues {
if i == v {
return true
}
}
return false
}


@@ -0,0 +1,5 @@
#!/bin/sh -eu
>&2 echo "TEST ERROR $ZREPL_HOOKTYPE $ZREPL_FS@$ZREPL_SNAPNAME"
exit 1


@@ -0,0 +1,18 @@
#!/bin/sh -eu
# Exceed default hook log size of 1<<20 bytes by double
# The Go test should fail if buffer size exceeds 1MB
pre_testing() {
ln=0
while :; do printf '%06d: 012345678901234567890123456789012345678901234567890123456789012345678901\n' "$ln"; ln="$(($ln + 1))"; done \
| head -c$(( 2 << 20 ))
}
case "$ZREPL_HOOKTYPE" in
pre_testing)
"$ZREPL_HOOKTYPE";;
*)
# Not handled by this script
exit 0;;
esac


@@ -0,0 +1,15 @@
#!/bin/sh -eu
post_testing() {
>&2 echo "TEST ERROR $ZREPL_HOOKTYPE $ZREPL_FS@$ZREPL_SNAPNAME"
exit 1
}
case "$ZREPL_HOOKTYPE" in
post_testing)
"$ZREPL_HOOKTYPE";;
*)
# Not handled by this script
exit 0;;
esac


@@ -0,0 +1,11 @@
#!/bin/sh -eu
if [ "$ZREPL_HOOKTYPE" = "pre_testing" ]; then
>&2 echo "TEST ERROR $ZREPL_HOOKTYPE $ZREPL_FS@$ZREPL_SNAPNAME"
exit 1
elif [ "$ZREPL_HOOKTYPE" = "post_testing" ]; then
echo "TEST $ZREPL_HOOKTYPE $ZREPL_FS@$ZREPL_SNAPNAME"
else
printf "Unknown hook type: %s" "$ZREPL_HOOKTYPE"
exit 255
fi


@@ -0,0 +1,3 @@
#!/bin/sh -eu
echo "TEST $ZREPL_HOOKTYPE $ZREPL_FS@$ZREPL_SNAPNAME ${ZREPL_DRYRUN:+(dry run)}"


@@ -0,0 +1,5 @@
#!/bin/sh -eu
echo "TEST $ZREPL_HOOKTYPE $ZREPL_FS@$ZREPL_SNAPNAME ZREPL_TIMEOUT=$ZREPL_TIMEOUT"
exec sleep $(($ZREPL_TIMEOUT + 1))