WIP: recurring jobs

Done:

* implement autosnapper that enforces the configured interval between snapshots
* implement pruner

* job pull: pulling + pruning
* job source: autosnapping + serving

TODO:

* job source: pruning
* job local: everything
* fatal errors, such as a serve job that cannot bind its socket, must be
more visible
* couldn't things that need a snapshot prefix just use an interface
Prefixer() instead? Then we could have a single PrefixSnapshotFilter and not
duplicate it for every job type... (see the sketch after this list)
* either go full context.Context or not at all? Or just wait, since the
community climate around it isn't that great and we only need it for
cancellation? Roll our own? (see the second sketch below)
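Rough sketch of the Prefixer idea from the list above. The Prefixer and prefixFilter names are hypothetical and not part of this commit; the Filter signature is copied from the existing PrefixSnapshotFilter:

package cmd

import (
	"strings"

	"github.com/zrepl/zrepl/zfs"
)

// Hypothetical: anything that knows its snapshot prefix exposes it.
type Prefixer interface {
	SnapshotPrefix() string
}

// One shared filter implementation instead of constructing a
// PrefixSnapshotFilter separately in every job type.
type prefixFilter struct {
	p Prefixer
}

func (f prefixFilter) Filter(fsv zfs.FilesystemVersion) (accept bool, err error) {
	return fsv.Type == zfs.Snapshot && strings.HasPrefix(fsv.Name, f.p.SnapshotPrefix()), nil
}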
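And a rough sketch of what "roll our own" cancellation could look like: a plain close-on-stop channel instead of context.Context. It reuses IntervalAutosnap from this commit, but the runUntilStopped name and signature are made up for illustration:

// In package cmd, next to IntervalAutosnap (sketch only):
func (a *IntervalAutosnap) runUntilStopped(stop <-chan struct{}) {
	ticker := time.NewTicker(a.SnapshotInterval)
	defer ticker.Stop()
	for {
		select {
		case <-stop:
			// stop channel closed: caller wants us to exit
			return
		case <-ticker.C:
			a.doSnapshots()
		}
	}
}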
Christian Schwarz 2017-09-13 23:27:18 +02:00
parent c6ca1efaae
commit e70b6f3071
8 changed files with 425 additions and 45 deletions

cmd/autosnap.go (new file, 137 lines)

@@ -0,0 +1,137 @@
package cmd
import (
"context"
"fmt"
"github.com/zrepl/zrepl/util"
"github.com/zrepl/zrepl/zfs"
"sort"
"time"
)
type IntervalAutosnap struct {
DatasetFilter zfs.DatasetFilter
Prefix string
SnapshotInterval time.Duration
log Logger
snaptimes []snapTime
timer time.Timer
}
type snapTime struct {
ds *zfs.DatasetPath
time time.Time
}
func (a *IntervalAutosnap) Run(ctx context.Context) {
a.log = ctx.Value(contextKeyLog).(Logger)
const LOG_TIME_FMT string = time.ANSIC
ds, err := zfs.ZFSListMapping(a.DatasetFilter)
if err != nil {
a.log.Printf("error listing datasets: %s", err)
return
}
if len(ds) == 0 {
a.log.Printf("no datasets matching dataset filter")
return
}
a.snaptimes = make([]snapTime, len(ds))
now := time.Now()
a.log.Printf("examining filesystem state")
for i, d := range ds {
l := util.NewPrefixLogger(a.log, d.ToString())
fsvs, err := zfs.ZFSListFilesystemVersions(d, &PrefixSnapshotFilter{a.Prefix})
if err != nil {
l.Printf("error listing filesystem versions of %s")
continue
}
if len(fsvs) <= 0 {
l.Printf("no filesystem versions with prefix '%s'", a.Prefix)
a.snaptimes[i] = snapTime{d, now}
continue
}
// Sort versions by creation
sort.SliceStable(fsvs, func(i, j int) bool {
return fsvs[i].CreateTXG < fsvs[j].CreateTXG
})
latest := fsvs[len(fsvs)-1]
l.Printf("latest snapshot at %s (%s old)", latest.Creation.Format(LOG_TIME_FMT), now.Sub(latest.Creation))
since := now.Sub(latest.Creation)
if since < 0 {
l.Printf("error: snapshot is from future (created at %s)", latest.Creation.Format(LOG_TIME_FMT))
continue
}
next := now
if since < a.SnapshotInterval {
next = latest.Creation.Add(a.SnapshotInterval)
}
a.snaptimes[i] = snapTime{d, next}
}
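// Wait until the earliest per-filesystem next-snapshot time is due, snapshot
// everything at that point, and from then on keep all filesystems in lockstep
// on a single ticker.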
sort.Slice(a.snaptimes, func(i, j int) bool {
return a.snaptimes[i].time.Before(a.snaptimes[j].time)
})
syncPoint := a.snaptimes[0]
a.log.Printf("sync point at %s (in %s)", syncPoint.time.Format(LOG_TIME_FMT), syncPoint.time.Sub(now))
select {
case <-ctx.Done():
a.log.Printf("context: %s", ctx.Err())
return
case <-time.After(syncPoint.time.Sub(now)):
a.log.Printf("snapshotting all filesystems to enable further snaps in lockstep")
a.doSnapshots()
}
ticker := time.NewTicker(a.SnapshotInterval)
for {
select {
case <-ctx.Done():
ticker.Stop()
a.log.Printf("context: %s", ctx.Err())
return
case <-ticker.C:
a.doSnapshots()
}
}
}
func (a *IntervalAutosnap) doSnapshots() {
// fetch new dataset list in case user added new dataset
ds, err := zfs.ZFSListMapping(a.DatasetFilter)
if err != nil {
a.log.Printf("error listing datasets: %s", err)
return
}
// TODO channel programs -> allow a little jitter?
for _, d := range ds {
suffix := time.Now().In(time.UTC).Format("20060102_150405_000")
snapname := fmt.Sprintf("%s%s", a.Prefix, suffix)
a.log.Printf("snapshotting %s@%s", d.ToString(), snapname)
err := zfs.ZFSSnapshot(d, snapname, false)
if err != nil {
a.log.Printf("error snapshotting %s: %s", d.ToString(), err)
}
}
}


@@ -1,15 +1,24 @@
package cmd

import (
+"github.com/pkg/errors"
"github.com/zrepl/zrepl/zfs"
"strings"
-"github.com/pkg/errors"
)

type PrefixSnapshotFilter struct {
Prefix string
}

+func parseSnapshotPrefix(i string) (p string, err error) {
+if len(i) <= 0 {
+err = errors.Errorf("snapshot prefix must not be empty string")
+return
+}
+p = i
+return
+}
+
func parsePrefixSnapshotFilter(i string) (f *PrefixSnapshotFilter, err error) {
if !(len(i) > 0) {
err = errors.Errorf("snapshot prefix must be longer than 0 characters")
@@ -22,4 +31,3 @@ func parsePrefixSnapshotFilter(i string) (f *PrefixSnapshotFilter, err error) {
func (f *PrefixSnapshotFilter) Filter(fsv zfs.FilesystemVersion) (accept bool, err error) {
return fsv.Type == zfs.Snapshot && strings.HasPrefix(fsv.Name, f.Prefix), nil
}


@@ -3,6 +3,7 @@ package cmd
import (
"time"
+"context"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/rpc"
@@ -76,7 +77,9 @@ func (j *LocalJob) JobName() string {
return j.Name
}

-func (j *LocalJob) JobDo(log Logger) (err error) {
+func (j *LocalJob) JobStart(ctx context.Context) {
+log := ctx.Value(contextKeyLog).(Logger)
local := rpc.NewLocalRPC()
handler := Handler{
@@ -90,5 +93,8 @@ func (j *LocalJob) JobDo(log Logger) (err error) {
}
registerEndpoints(local, handler)
-return doPull(PullContext{local, log, j.Mapping, j.InitialReplPolicy})
+err := doPull(PullContext{local, log, j.Mapping, j.InitialReplPolicy})
+if err != nil {
+log.Printf("error doing pull: %s", err)
+}
}


@@ -3,6 +3,7 @@ package cmd
import (
"time"
+"context"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/rpc"
@@ -12,8 +13,11 @@ import (
type PullJob struct {
Name string
Connect RWCConnecter
+Interval time.Duration
Mapping *DatasetMapFilter
-SnapshotFilter *PrefixSnapshotFilter
+// constructed from mapping during parsing
+pruneFilter *DatasetMapFilter
+SnapshotPrefix string
InitialReplPolicy InitialReplPolicy
Prune PrunePolicy
Debug JobDebugSettings
@@ -23,6 +27,7 @@ func parsePullJob(name string, i map[string]interface{}) (j *PullJob, err error)
var asMap struct {
Connect map[string]interface{}
+Interval string
Mapping map[string]string
InitialReplPolicy string `mapstructure:"initial_repl_policy"`
Prune map[string]interface{}
@@ -43,19 +48,29 @@ func parsePullJob(name string, i map[string]interface{}) (j *PullJob, err error)
return nil, err
}

+if j.Interval, err = time.ParseDuration(asMap.Interval); err != nil {
+err = errors.Wrap(err, "cannot parse 'interval'")
+return nil, err
+}
+
j.Mapping, err = parseDatasetMapFilter(asMap.Mapping, false)
if err != nil {
err = errors.Wrap(err, "cannot parse 'mapping'")
return nil, err
}

+if j.pruneFilter, err = j.Mapping.InvertedFilter(); err != nil {
+err = errors.Wrap(err, "cannot automatically invert 'mapping' for prune job")
+return nil, err
+}
+
j.InitialReplPolicy, err = parseInitialReplPolicy(asMap.InitialReplPolicy, DEFAULT_INITIAL_REPL_POLICY)
if err != nil {
err = errors.Wrap(err, "cannot parse 'initial_repl_policy'")
return
}

-if j.SnapshotFilter, err = parsePrefixSnapshotFilter(asMap.SnapshotPrefix); err != nil {
+if j.SnapshotPrefix, err = parseSnapshotPrefix(asMap.SnapshotPrefix); err != nil {
return
}
@@ -76,12 +91,19 @@ func (j *PullJob) JobName() string {
return j.Name
}

-func (j *PullJob) JobDo(log Logger) (err error) {
+func (j *PullJob) JobStart(ctx context.Context) {
+ticker := time.NewTicker(j.Interval)
+start:
+log := ctx.Value(contextKeyLog).(Logger)
+log.Printf("connecting")
rwc, err := j.Connect.Connect()
if err != nil {
-log.Printf("error connect: %s", err)
-return err
+log.Printf("error connecting: %s", err)
+return
}
rwc, err = util.NewReadWriteCloserLogger(rwc, j.Debug.Conn.ReadDump, j.Debug.Conn.WriteDump)
@@ -94,6 +116,33 @@ func (j *PullJob) JobDo(log Logger) (err error) {
client.SetLogger(log, true)
}

-defer closeRPCWithTimeout(log, client, time.Second*10, "")
-return doPull(PullContext{client, log, j.Mapping, j.InitialReplPolicy})
+log.Printf("starting pull")
+pullLog := util.NewPrefixLogger(log, "pull")
+err = doPull(PullContext{client, pullLog, j.Mapping, j.InitialReplPolicy})
+if err != nil {
+log.Printf("error doing pull: %s", err)
+}
+closeRPCWithTimeout(log, client, time.Second*10, "")
+
+log.Printf("starting prune")
+prunectx := context.WithValue(ctx, contextKeyLog, util.NewPrefixLogger(log, "prune"))
+pruner := Pruner{time.Now(), false, j.pruneFilter, j.SnapshotPrefix, j.Prune}
+pruner.Run(prunectx)
+log.Printf("finish prune")
+
+log.Printf("wait for next interval")
+select {
+case <-ctx.Done():
+log.Printf("context: %s", ctx.Err())
+return
+case <-ticker.C:
+goto start
+}
+}
+
+func (j *PullJob) doRun(ctx context.Context) {
+
}


@@ -1,14 +1,13 @@
package cmd

import (
+"context"
mapstructure "github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/rpc"
"github.com/zrepl/zrepl/util"
"io"
-"os"
-"os/signal"
-"syscall"
+"sync"
"time"
)
@@ -16,10 +15,13 @@ type SourceJob struct {
Name string
Serve AuthenticatedChannelListenerFactory
Datasets *DatasetMapFilter
-SnapshotFilter *PrefixSnapshotFilter
+SnapshotPrefix string
Interval time.Duration
Prune PrunePolicy
Debug JobDebugSettings
+snapCancel context.CancelFunc
+serveCancel context.CancelFunc
}

func parseSourceJob(name string, i map[string]interface{}) (j *SourceJob, err error) {
@@ -48,7 +50,7 @@ func parseSourceJob(name string, i map[string]interface{}) (j *SourceJob, err error) {
return
}

-if j.SnapshotFilter, err = parsePrefixSnapshotFilter(asMap.SnapshotPrefix); err != nil {
+if j.SnapshotPrefix, err = parseSnapshotPrefix(asMap.SnapshotPrefix); err != nil {
return
}
@@ -74,16 +76,46 @@ func (j *SourceJob) JobName() string {
return j.Name
}

-func (j *SourceJob) JobDo(log Logger) (err error) {
+func (j *SourceJob) JobStart(ctx context.Context) {
+var wg sync.WaitGroup
+log := ctx.Value(contextKeyLog).(Logger)
+
+log.Printf("starting autosnap")
+var snapContext context.Context
+snapContext, j.snapCancel = context.WithCancel(ctx)
+snapContext = context.WithValue(snapContext, contextKeyLog, util.NewPrefixLogger(log, "autosnap"))
+a := IntervalAutosnap{DatasetFilter: j.Datasets, Prefix: j.SnapshotPrefix, SnapshotInterval: j.Interval}
+wg.Add(1)
+go func() {
+a.Run(snapContext)
+wg.Done()
+}()
+
+log.Printf("starting serve")
+var serveContext context.Context
+serveContext, j.serveCancel = context.WithCancel(ctx)
+serveContext = context.WithValue(serveContext, contextKeyLog, util.NewPrefixLogger(log, "serve"))
+wg.Add(1)
+go func() {
+j.serve(serveContext)
+wg.Done()
+}()
+
+wg.Wait()
+}
+
+func (j *SourceJob) serve(ctx context.Context) {
+log := ctx.Value(contextKeyLog).(Logger)
+
listener, err := j.Serve.Listen()
if err != nil {
-return err
+log.Printf("error listening: %s", err)
+return
}

-sigChan := make(chan os.Signal, 1)
-signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
-
rwcChan := make(chan io.ReadWriteCloser)

// Serve connections until interrupted or error
@@ -131,24 +163,20 @@ outer:
}
rwc.Close()

-case sig := <-sigChan:
-log.Printf("%s received", sig)
+case <-ctx.Done():
+log.Printf("context: %s", ctx.Err())
break outer
}
}

-signal.Stop(sigChan)
-close(sigChan)
-
log.Printf("closing listener")
err = listener.Close()
if err != nil {
log.Printf("error closing listener: %s", err)
}

-return nil
+return
}


@@ -1,9 +1,12 @@
package cmd

import (
-"github.com/spf13/cobra"
+"context"
"fmt"
-"sync"
+"github.com/spf13/cobra"
+"os"
+"os/signal"
+"syscall"
)

// daemonCmd represents the daemon command
@@ -26,32 +29,75 @@ func (l jobLogger) Printf(format string, v ...interface{}) {
l.MainLog.Printf(fmt.Sprintf("[%s]: %s", l.JobName, format), v...)
}

type Job interface {
JobName() string
-JobDo(log Logger) (err error)
+JobStart(ctxt context.Context)
}

func doDaemon(cmd *cobra.Command, args []string) {
+d := Daemon{}
+d.Loop()
+}
+
-var wg sync.WaitGroup
+type contextKey string
+
+const (
+contextKeyLog contextKey = contextKey("log")
+)
+
+type Daemon struct {
+log Logger
+}
+
+func (d *Daemon) Loop() {
+
+finishs := make(chan Job)
+cancels := make([]context.CancelFunc, len(conf.Jobs))
+
log.Printf("starting jobs from config")
+i := 0
for _, job := range conf.Jobs {
log.Printf("starting job %s", job.JobName())
logger := jobLogger{log, job.JobName()}
-wg.Add(1)
+ctx := context.Background()
+ctx, cancels[i] = context.WithCancel(ctx)
+i++
+ctx = context.WithValue(ctx, contextKeyLog, logger)
go func(j Job) {
-defer wg.Done()
-err := job.JobDo(logger)
-if err != nil {
-logger.Printf("returned error: %+v", err)
-}
+job.JobStart(ctx)
+finishs <- j
}(job)
}

-log.Printf("waiting for jobs from config to finish")
-wg.Wait()
+sigChan := make(chan os.Signal, 1)
+signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
+
+finishCount := 0
+outer:
+for {
+select {
+case j := <-finishs:
+log.Printf("job finished: %s", j.JobName())
+finishCount++
+if finishCount == len(conf.Jobs) {
+log.Printf("all jobs finished")
+break outer
+}
+
+case sig := <-sigChan:
+log.Printf("received signal: %s", sig)
+log.Printf("cancelling all jobs")
+for _, c := range cancels {
+log.Printf("cancelling job")
+c()
+}
+}
+}
+
+signal.Stop(sigChan)
+log.Printf("exiting")
}

cmd/prune.go (new file, 102 lines)

@@ -0,0 +1,102 @@
package cmd
import (
"context"
"encoding/json"
"fmt"
"github.com/zrepl/zrepl/util"
"github.com/zrepl/zrepl/zfs"
"time"
)
type Pruner struct {
Now time.Time
DryRun bool
DatasetFilter zfs.DatasetFilter
SnapshotPrefix string
PrunePolicy PrunePolicy
}
func (p *Pruner) Run(ctx context.Context) {
log := ctx.Value(contextKeyLog).(Logger)
if p.DryRun {
log.Printf("doing dry run")
}
// ZFSListSnapsFiltered --> todo can extend fsfilter or need new? Have already something per fs
// Dedicated snapshot object? Adaptor object to FilesystemVersion?
filesystems, err := zfs.ZFSListMapping(p.DatasetFilter)
if err != nil {
log.Printf("error applying filesystem filter: %s", err)
return
}
if len(filesystems) <= 0 {
log.Printf("no filesystems matching filter")
return
}
for _, fs := range filesystems {
fsversions, err := zfs.ZFSListFilesystemVersions(fs, &PrefixSnapshotFilter{p.SnapshotPrefix})
if err != nil {
log.Printf("error listing filesytem versions of %s: %s", fs, err)
continue
}
if len(fsversions) == 0 {
log.Printf("no filesystem versions matching prefix '%s'", p.SnapshotPrefix)
continue
}
l := util.NewPrefixLogger(log, fs.ToString())
dbgj, err := json.Marshal(fsversions)
if err != nil {
panic(err)
}
l.Printf("DEBUG: FSVERSIONS=%s", dbgj)
keep, remove, err := p.PrunePolicy.Prune(fs, fsversions)
if err != nil {
l.Printf("error evaluating prune policy: %s", err)
continue
}
dbgj, err = json.Marshal(keep)
if err != nil {
panic(err)
}
l.Printf("DEBUG: KEEP=%s", dbgj)
dbgj, err = json.Marshal(remove)
l.Printf("DEBUG: REMOVE=%s", dbgj)
describe := func(v zfs.FilesystemVersion) string {
timeSince := v.Creation.Sub(p.Now)
const day time.Duration = 24 * time.Hour
days := timeSince / day
remainder := timeSince % day
return fmt.Sprintf("%s@%dd%s from now", v.ToAbsPath(fs), days, remainder)
}
for _, v := range remove {
l.Printf("remove %s", describe(v))
// echo what we'll do and exec zfs destroy if not dry run
// TODO special handling for EBUSY (zfs hold)
// TODO error handling for clones? just echo to cli, skip over, and exit with non-zero status code (we're idempotent)
if !p.DryRun {
err := zfs.ZFSDestroyFilesystemVersion(fs, v)
if err != nil {
// handle
l.Printf("error: %s", err)
}
}
}
}
return
}


@@ -9,6 +9,10 @@ jobs:
port: 22
identity_file: /root/.ssh/id_ed25519

+# pull (=ask for new snapshots) every 10m, prune afterwards
+# this will leave us at most 10m behind production
+interval: 10m
+
# pull all offered filesystems to storage/backups/zrepl/pull/prod1.example.com
mapping: {
"<":"storage/backups/zrepl/pull/prod1.example.com"