cmd: logging using logrus

This commit is contained in:
Christian Schwarz 2017-09-22 14:13:58 +02:00
parent a459f0a0f6
commit bfcba7b281
17 changed files with 307 additions and 180 deletions

View File

@ -3,7 +3,6 @@ package cmd
import (
"context"
"fmt"
"github.com/zrepl/zrepl/util"
"github.com/zrepl/zrepl/zfs"
"sort"
"time"
@ -31,11 +30,11 @@ func (a *IntervalAutosnap) Run(ctx context.Context, didSnaps chan struct{}) {
ds, err := zfs.ZFSListMapping(a.DatasetFilter)
if err != nil {
a.log.Printf("error listing datasets: %s", err)
a.log.WithError(err).Error("cannot list datasets")
return
}
if len(ds) == 0 {
a.log.Printf("no datasets matching dataset filter")
a.log.WithError(err).Error("no datasets matching dataset filter")
return
}
@ -43,18 +42,18 @@ func (a *IntervalAutosnap) Run(ctx context.Context, didSnaps chan struct{}) {
now := time.Now()
a.log.Printf("examining filesystem state")
a.log.Debug("examine filesystem state")
for i, d := range ds {
l := util.NewPrefixLogger(a.log, d.ToString())
l := a.log.WithField("filesystem", d.ToString())
fsvs, err := zfs.ZFSListFilesystemVersions(d, &PrefixSnapshotFilter{a.Prefix})
if err != nil {
l.Printf("error listing filesystem versions of %s")
l.WithError(err).Error("cannot list filesystem versions")
continue
}
if len(fsvs) <= 0 {
l.Printf("no filesystem versions with prefix '%s'", a.Prefix)
l.WithField("prefix", a.Prefix).Info("no filesystem versions with prefix")
a.snaptimes[i] = snapTime{d, now}
continue
}
@ -65,11 +64,14 @@ func (a *IntervalAutosnap) Run(ctx context.Context, didSnaps chan struct{}) {
})
latest := fsvs[len(fsvs)-1]
l.Printf("latest snapshot at %s (%s old)", latest.Creation.Format(LOG_TIME_FMT), now.Sub(latest.Creation))
l.WithField("creation", latest.Creation).
Debug("found latest snapshot")
since := now.Sub(latest.Creation)
if since < 0 {
l.Printf("error: snapshot is from future (created at %s)", latest.Creation.Format(LOG_TIME_FMT))
l.WithField("snapshot", latest.Name).
WithField("creation", latest.Creation).
Error("snapshot is from the future")
continue
}
next := now
@ -84,15 +86,16 @@ func (a *IntervalAutosnap) Run(ctx context.Context, didSnaps chan struct{}) {
})
syncPoint := a.snaptimes[0]
a.log.Printf("sync point at %s (in %s)", syncPoint.time.Format(LOG_TIME_FMT), syncPoint.time.Sub(now))
a.log.WithField("sync_point", syncPoint.time.Format(LOG_TIME_FMT)).
Info("wait for sync point")
select {
case <-ctx.Done():
a.log.Printf("context: %s", ctx.Err())
a.log.WithError(ctx.Err()).Info("context done")
return
case <-time.After(syncPoint.time.Sub(now)):
a.log.Printf("snapshotting all filesystems to enable further snaps in lockstep")
a.log.Debug("snapshot all filesystems to enable further snaps in lockstep")
a.doSnapshots(didSnaps)
}
@ -102,7 +105,7 @@ func (a *IntervalAutosnap) Run(ctx context.Context, didSnaps chan struct{}) {
select {
case <-ctx.Done():
ticker.Stop()
a.log.Printf("context: %s", ctx.Err())
a.log.WithError(ctx.Err()).Info("context done")
return
case <-ticker.C:
@ -117,7 +120,7 @@ func (a *IntervalAutosnap) doSnapshots(didSnaps chan struct{}) {
// fetch new dataset list in case user added new dataset
ds, err := zfs.ZFSListMapping(a.DatasetFilter)
if err != nil {
a.log.Printf("error listing datasets: %s", err)
a.log.WithError(err).Error("cannot list datasets")
return
}
@ -126,17 +129,20 @@ func (a *IntervalAutosnap) doSnapshots(didSnaps chan struct{}) {
suffix := time.Now().In(time.UTC).Format("20060102_150405_000")
snapname := fmt.Sprintf("%s%s", a.Prefix, suffix)
a.log.Printf("snapshotting %s@%s", d.ToString(), snapname)
a.log.WithField("filesystem", d.ToString()).
WithField("snapname", snapname).
Info("create snapshot")
err := zfs.ZFSSnapshot(d, snapname, false)
if err != nil {
a.log.Printf("error snapshotting %s: %s", d.ToString(), err)
a.log.WithError(err).Error("cannot create snapshot")
}
}
select {
case didSnaps <- struct{}{}:
default:
a.log.Printf("warning: callback channel is full, discarding")
a.log.Warn("warning: callback channel is full, discarding")
}
}

View File

@ -30,6 +30,7 @@ type Global struct {
Control struct {
Sockpath string
}
logging *LoggingConfig
}
type JobDebugSettings struct {

View File

@ -2,9 +2,7 @@ package cmd
import (
"context"
"fmt"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/util"
"net"
"net/http"
"net/http/pprof"
@ -61,12 +59,12 @@ outer:
select {
case <-ctx.Done():
log.Printf("context: %s", ctx.Err())
log.WithError(err).Info("contex done")
server.Shutdown(context.Background())
break outer
case err = <-served:
if err != nil {
log.Printf("error serving: %s", err)
log.WithError(err).Error("error serving")
break outer
}
}
@ -81,7 +79,7 @@ type requestLogger struct {
}
func (l requestLogger) ServeHTTP(w http.ResponseWriter, r *http.Request) {
log := util.NewPrefixLogger(l.log, fmt.Sprintf("%s %s", r.Method, r.URL))
log := l.log.WithField("method", r.Method).WithField("url", r.URL)
log.Printf("start")
l.handlerFunc(w, r)
log.Printf("finish")

View File

@ -7,7 +7,6 @@ import (
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/rpc"
"github.com/zrepl/zrepl/util"
"github.com/zrepl/zrepl/zfs"
"sync"
)
@ -91,7 +90,7 @@ func (j *LocalJob) JobStart(ctx context.Context) {
// All local datasets will be passed to its Map() function,
// but only those for which a mapping exists will actually be pulled.
// We can pay this small performance penalty for now.
handler := NewHandler(log, localPullACL{}, &PrefixSnapshotFilter{j.SnapshotPrefix})
handler := NewHandler(log.WithField("task", "handler"), localPullACL{}, &PrefixSnapshotFilter{j.SnapshotPrefix})
registerEndpoints(local, handler)
@ -112,8 +111,8 @@ func (j *LocalJob) JobStart(ctx context.Context) {
return
}
makeCtx := func(parent context.Context, logPrefix string) (ctx context.Context) {
return context.WithValue(parent, contextKeyLog, util.NewPrefixLogger(log, logPrefix))
makeCtx := func(parent context.Context, taskName string) (ctx context.Context) {
return context.WithValue(parent, contextKeyLog, log.WithField("task", taskName))
}
var snapCtx, plCtx, prCtx, pullCtx context.Context
snapCtx = makeCtx(ctx, "autosnap")

View File

@ -119,7 +119,7 @@ start:
log.Printf("starting pull")
pullLog := util.NewPrefixLogger(log, "pull")
pullLog := log.WithField("task", "pull")
err = doPull(PullContext{client, pullLog, j.Mapping, j.InitialReplPolicy})
if err != nil {
log.Printf("error doing pull: %s", err)
@ -128,7 +128,7 @@ start:
closeRPCWithTimeout(log, client, time.Second*10, "")
log.Printf("starting prune")
prunectx := context.WithValue(ctx, contextKeyLog, util.NewPrefixLogger(log, "prune"))
prunectx := context.WithValue(ctx, contextKeyLog, log.WithField("task", "prune"))
pruner, err := j.Pruner(PrunePolicySideDefault, false)
if err != nil {
log.Printf("error creating pruner: %s", err)

View File

@ -84,9 +84,9 @@ func (j *SourceJob) JobStart(ctx context.Context) {
return
}
snapContext := context.WithValue(ctx, contextKeyLog, util.NewPrefixLogger(log, "autosnap"))
prunerContext := context.WithValue(ctx, contextKeyLog, util.NewPrefixLogger(log, "prune"))
serveContext := context.WithValue(ctx, contextKeyLog, util.NewPrefixLogger(log, "serve"))
snapContext := context.WithValue(ctx, contextKeyLog, log.WithField("task", "autosnap"))
prunerContext := context.WithValue(ctx, contextKeyLog, log.WithField("task", "prune"))
serveContext := context.WithValue(ctx, contextKeyLog, log.WithField("task", "serve"))
didSnaps := make(chan struct{})
go j.serve(serveContext)
@ -163,7 +163,7 @@ outer:
// handle connection
rpcServer := rpc.NewServer(rwc)
if j.Debug.RPC.Log {
rpclog := util.NewPrefixLogger(log, "rpc")
rpclog := log.WithField("subsystem", "rpc")
rpcServer.SetLogger(rpclog, true)
}
registerEndpoints(rpcServer, handler)

78
cmd/config_logging.go Normal file
View File

@ -0,0 +1,78 @@
package cmd
import (
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
//"github.com/rifflock/lfshook"
"github.com/sirupsen/logrus"
)
type LoggingConfig struct {
Stdout struct {
Level logrus.Level
}
//LFS lfshook.PathMap
}
func parseLogging(i interface{}) (c *LoggingConfig, err error) {
c = &LoggingConfig{}
c.Stdout.Level = logrus.WarnLevel
if i == nil {
return c, nil
}
var asMap struct {
Mate string
Stdout map[string]string
LFS map[string]string
}
if err = mapstructure.Decode(i, &asMap); err != nil {
return nil, errors.Wrap(err, "mapstructure error")
}
//if asMap.LFS != nil {
// c.LFS = make(map[logrus.Level]string, len(asMap.LFS))
// for level_str, path := range asMap.LFS {
// level, err := logrus.ParseLevel(level_str)
// if err != nil {
// return nil, errors.Wrapf(err, "cannot parse level '%s'", level_str)
// }
// if len(path) <= 0 {
// return nil, errors.Errorf("path must be longer than 0")
// }
// c.LFS[level] = path
// }
//}
if asMap.Stdout != nil {
lvl, err := logrus.ParseLevel(asMap.Stdout["level"])
if err != nil {
return nil, errors.Wrap(err, "cannot parse stdout log level")
}
c.Stdout.Level = lvl
}
return c, nil
}
func (c *LoggingConfig) MakeLogrus() (l logrus.FieldLogger) {
log := logrus.New()
log.Out = nopWriter(0)
log.Level = logrus.DebugLevel
//log.Level = logrus.DebugLevel
//
//if len(c.LFS) > 0 {
// lfshook := lfshook.NewHook(c.LFS)
// log.Hooks.Add(lfshook)
//}
stdhook := NewStdHook()
log.Hooks.Add(stdhook)
return log
}

View File

@ -80,10 +80,14 @@ func parseConfig(i interface{}) (c *Config, err error) {
err = mapstructure.Decode(asMap.Global, &c.Global)
if err != nil {
err = errors.Wrap(err, "cannot parse global section: %s")
err = errors.Wrap(err, "mapstructure error on 'global' section: %s")
return
}
if c.Global.logging, err = parseLogging(asMap.Global["logging"]); err != nil {
return nil, errors.Wrap(err, "cannot parse logging section")
}
cpc := ConfigParsingContext{&c.Global}
jpc := JobParsingContext{cpc}

View File

@ -4,7 +4,6 @@ import (
"context"
"fmt"
"github.com/spf13/cobra"
"log"
"os"
"os/signal"
"syscall"
@ -21,15 +20,6 @@ func init() {
RootCmd.AddCommand(daemonCmd)
}
type jobLogger struct {
MainLog Logger
JobName string
}
func (l jobLogger) Printf(format string, v ...interface{}) {
l.MainLog.Printf(fmt.Sprintf("[%s]: %s", l.JobName, format), v...)
}
type Job interface {
JobName() string
JobStart(ctxt context.Context)
@ -37,15 +27,15 @@ type Job interface {
func doDaemon(cmd *cobra.Command, args []string) {
log := log.New(os.Stderr, "", log.LUTC|log.Ldate|log.Ltime)
conf, err := ParseConfig(rootArgs.configFile)
if err != nil {
log.Printf("error parsing config: %s", err)
fmt.Fprintf(os.Stderr, "error parsing config: %s", err)
os.Exit(1)
}
ctx := context.Background()
log := conf.Global.logging.MakeLogrus()
log.Debug("starting daemon")
ctx := context.WithValue(context.Background(), contextKeyLog, log)
ctx = context.WithValue(ctx, contextKeyLog, log)
d := NewDaemon(conf)
@ -83,7 +73,7 @@ func (d *Daemon) Loop(ctx context.Context) {
for _, job := range d.conf.Jobs {
log.Printf("starting job %s", job.JobName())
logger := jobLogger{log, job.JobName()}
logger := log.WithField("job", job.JobName())
i++
jobCtx := context.WithValue(ctx, contextKeyLog, logger)
go func(j Job) {

View File

@ -64,38 +64,42 @@ func registerEndpoints(server rpc.RPCServer, handler Handler) (err error) {
func (h Handler) HandleFilesystemRequest(r *FilesystemRequest, roots *[]*zfs.DatasetPath) (err error) {
h.logger.Printf("handling fsr: %#v", r)
log := h.logger.WithField("endpoint", "FilesystemRequest")
h.logger.Printf("using dsf: %#v", h.dsf)
log.WithField("request", r).Debug("request")
log.WithField("dataset_filter", h.dsf).Debug("dsf")
allowed, err := zfs.ZFSListMapping(h.dsf)
if err != nil {
h.logger.Printf("handle fsr err: %v\n", err)
log.WithError(err).Error("error listing filesystems")
return
}
h.logger.Printf("returning: %#v", allowed)
log.WithField("response", allowed).Debug("response")
*roots = allowed
return
}
func (h Handler) HandleFilesystemVersionsRequest(r *FilesystemVersionsRequest, versions *[]zfs.FilesystemVersion) (err error) {
h.logger.Printf("handling filesystem versions request: %#v", r)
log := h.logger.WithField("endpoint", "FilesystemVersionsRequest")
log.WithField("request", r).Debug("request")
// allowed to request that?
if h.pullACLCheck(r.Filesystem, nil); err != nil {
log.WithError(err).Warn("pull ACL check failed")
return
}
// find our versions
vs, err := zfs.ZFSListFilesystemVersions(r.Filesystem, h.fsvf)
if err != nil {
h.logger.Printf("our versions error: %#v\n", err)
log.WithError(err).Error("cannot list filesystem versions")
return
}
h.logger.Printf("our versions: %#v\n", vs)
log.WithField("resposne", vs).Debug("response")
*versions = vs
return
@ -104,16 +108,19 @@ func (h Handler) HandleFilesystemVersionsRequest(r *FilesystemVersionsRequest, v
func (h Handler) HandleInitialTransferRequest(r *InitialTransferRequest, stream *io.Reader) (err error) {
h.logger.Printf("handling initial transfer request: %#v", r)
log := h.logger.WithField("endpoint", "InitialTransferRequest")
log.WithField("request", r).Debug("request")
if err = h.pullACLCheck(r.Filesystem, &r.FilesystemVersion); err != nil {
log.WithError(err).Warn("pull ACL check failed")
return
}
h.logger.Printf("invoking zfs send")
log.Debug("invoking zfs send")
s, err := zfs.ZFSSend(r.Filesystem, &r.FilesystemVersion, nil)
if err != nil {
h.logger.Printf("error sending filesystem: %#v", err)
log.WithError(err).Error("cannot send filesystem")
}
*stream = s
@ -123,19 +130,22 @@ func (h Handler) HandleInitialTransferRequest(r *InitialTransferRequest, stream
func (h Handler) HandleIncrementalTransferRequest(r *IncrementalTransferRequest, stream *io.Reader) (err error) {
h.logger.Printf("handling incremental transfer request: %#v", r)
log := h.logger.WithField("endpoint", "IncrementalTransferRequest")
log.WithField("request", r).Debug("request")
if err = h.pullACLCheck(r.Filesystem, &r.From); err != nil {
log.WithError(err).Warn("pull ACL check failed")
return
}
if err = h.pullACLCheck(r.Filesystem, &r.To); err != nil {
log.WithError(err).Warn("pull ACL check failed")
return
}
h.logger.Printf("invoking zfs send")
log.Debug("invoking zfs send")
s, err := zfs.ZFSSend(r.Filesystem, &r.From, &r.To)
if err != nil {
h.logger.Printf("error sending filesystem: %#v", err)
log.WithError(err).Error("cannot send filesystem")
}
*stream = s
@ -148,12 +158,10 @@ func (h Handler) pullACLCheck(p *zfs.DatasetPath, v *zfs.FilesystemVersion) (err
fsAllowed, err = h.dsf.Filter(p)
if err != nil {
err = fmt.Errorf("error evaluating ACL: %s", err)
h.logger.Printf(err.Error())
return
}
if !fsAllowed {
err = fmt.Errorf("ACL prohibits access to %s", p.ToString())
h.logger.Printf(err.Error())
return
}
if v == nil {
@ -163,12 +171,10 @@ func (h Handler) pullACLCheck(p *zfs.DatasetPath, v *zfs.FilesystemVersion) (err
vAllowed, err = h.fsvf.Filter(*v)
if err != nil {
err = errors.Wrap(err, "error evaluating version filter")
h.logger.Printf(err.Error())
return
}
if !vAllowed {
err = fmt.Errorf("ACL prohibits access to %s", v.ToAbsPath(p))
h.logger.Printf(err.Error())
return
}
return

55
cmd/logrus.go Normal file
View File

@ -0,0 +1,55 @@
package cmd
import (
"bytes"
"fmt"
"github.com/sirupsen/logrus"
"os"
)
type CLIFormatter struct {
}
func (f CLIFormatter) Format(e *logrus.Entry) (out []byte, err error) {
var buf bytes.Buffer
fmt.Fprintf(&buf, "%s\n", e.Message)
return buf.Bytes(), nil
}
var stdhookStderrLevels []logrus.Level = []logrus.Level{
logrus.PanicLevel, logrus.FatalLevel, logrus.ErrorLevel, logrus.WarnLevel,
}
type Stdhook struct {
}
func NewStdHook() *Stdhook {
return &Stdhook{}
}
func (h *Stdhook) Levels() []logrus.Level {
// Accept all so we can filter the output later
return []logrus.Level{
logrus.PanicLevel, logrus.FatalLevel, logrus.ErrorLevel, logrus.WarnLevel,
logrus.InfoLevel, logrus.DebugLevel,
}
}
func (h *Stdhook) Fire(entry *logrus.Entry) error {
s, err := entry.String()
if err != nil {
return err
}
for _, l := range stdhookStderrLevels {
if l == entry.Level {
fmt.Fprint(os.Stderr, s)
return nil
}
}
fmt.Fprint(os.Stdout, s)
return nil
}
type nopWriter int
func (w nopWriter) Write(p []byte) (n int, err error) { return len(p), nil }

View File

@ -11,12 +11,16 @@
package cmd
import (
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
type Logger interface {
Printf(format string, v ...interface{})
}
//
//type Logger interface {
// Printf(format string, v ...interface{})
//}
type Logger logrus.FieldLogger
var RootCmd = &cobra.Command{
Use: "zrepl",

View File

@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/zrepl/zrepl/util"
"github.com/zrepl/zrepl/zfs"
"time"
)
@ -29,16 +28,16 @@ func (p *Pruner) Run(ctx context.Context) (r []PruneResult, err error) {
log := ctx.Value(contextKeyLog).(Logger)
if p.DryRun {
log.Printf("doing dry run")
log.Info("doing dry run")
}
filesystems, err := zfs.ZFSListMapping(p.DatasetFilter)
if err != nil {
log.Printf("error applying filesystem filter: %s", err)
log.WithError(err).Error("error applying filesystem filter")
return nil, err
}
if len(filesystems) <= 0 {
log.Printf("no filesystems matching filter")
log.Info("no filesystems matching filter")
return nil, err
}
@ -46,27 +45,27 @@ func (p *Pruner) Run(ctx context.Context) (r []PruneResult, err error) {
for _, fs := range filesystems {
log := log.WithField("filesystem", fs.ToString())
fsversions, err := zfs.ZFSListFilesystemVersions(fs, &PrefixSnapshotFilter{p.SnapshotPrefix})
if err != nil {
log.Printf("error listing filesytem versions of %s: %s", fs, err)
log.WithError(err).Error("error listing filesytem versions")
continue
}
if len(fsversions) == 0 {
log.Printf("no filesystem versions matching prefix '%s'", p.SnapshotPrefix)
log.WithField("prefix", p.SnapshotPrefix).Info("no filesystem versions matching prefix")
continue
}
l := util.NewPrefixLogger(log, fs.ToString())
dbgj, err := json.Marshal(fsversions)
if err != nil {
panic(err)
}
l.Printf("DEBUG: FSVERSIONS=%s", dbgj)
log.WithField("fsversions", string(dbgj)).Debug()
keep, remove, err := p.PrunePolicy.Prune(fs, fsversions)
if err != nil {
l.Printf("error evaluating prune policy: %s", err)
log.WithError(err).Error("error evaluating prune policy")
continue
}
@ -74,23 +73,28 @@ func (p *Pruner) Run(ctx context.Context) (r []PruneResult, err error) {
if err != nil {
panic(err)
}
l.Printf("DEBUG: KEEP=%s", dbgj)
log.WithField("keep", string(dbgj)).Debug()
dbgj, err = json.Marshal(remove)
l.Printf("DEBUG: REMOVE=%s", dbgj)
log.WithField("remove", string(dbgj)).Debug()
r = append(r, PruneResult{fs, fsversions, keep, remove})
describe := func(v zfs.FilesystemVersion) string {
makeFields := func(v zfs.FilesystemVersion) (fields map[string]interface{}) {
fields = make(map[string]interface{})
fields["version"] = v.ToAbsPath(fs)
timeSince := v.Creation.Sub(p.Now)
fields["age_ns"] = timeSince
const day time.Duration = 24 * time.Hour
days := timeSince / day
remainder := timeSince % day
return fmt.Sprintf("%s@%dd%s from now", v.ToAbsPath(fs), days, remainder)
fields["age_str"] = fmt.Sprintf("%dd%s", days, remainder)
return
}
for _, v := range remove {
l.Printf("remove %s", describe(v))
fields := makeFields(v)
log.WithFields(fields).Info("destroying version")
// echo what we'll do and exec zfs destroy if not dry run
// TODO special handling for EBUSY (zfs hold)
// TODO error handling for clones? just echo to cli, skip over, and exit with non-zero status code (we're idempotent)
@ -98,7 +102,7 @@ func (p *Pruner) Run(ctx context.Context) (r []PruneResult, err error) {
err := zfs.ZFSDestroyFilesystemVersion(fs, v)
if err != nil {
// handle
l.Printf("error: %s", err)
log.WithFields(fields).WithError(err).Error("error destroying version")
}
}
}

View File

@ -5,6 +5,8 @@ import (
"io"
"time"
"bytes"
"encoding/json"
"github.com/zrepl/zrepl/rpc"
"github.com/zrepl/zrepl/util"
"github.com/zrepl/zrepl/zfs"
@ -62,14 +64,14 @@ func doPull(pull PullContext) (err error) {
remote := pull.Remote
log := pull.Log
log.Printf("requesting remote filesystem list")
log.Info("request remote filesystem list")
fsr := FilesystemRequest{}
var remoteFilesystems []*zfs.DatasetPath
if err = remote.Call("FilesystemRequest", &fsr, &remoteFilesystems); err != nil {
return
}
log.Printf("map remote filesystems to local paths and determine order for per-filesystem sync")
log.Debug("map remote filesystems to local paths and determine order for per-filesystem sync")
type RemoteLocalMapping struct {
Remote *zfs.DatasetPath
Local *zfs.DatasetPath
@ -82,38 +84,41 @@ func doPull(pull PullContext) (err error) {
localFs, err = pull.Mapping.Map(remoteFilesystems[fs])
if err != nil {
err := fmt.Errorf("error mapping %s: %s", remoteFilesystems[fs], err)
log.Printf("%s", err)
log.WithError(err).Error()
return err
}
if localFs == nil {
continue
}
log.Printf("%s => %s", remoteFilesystems[fs].ToString(), localFs.ToString())
log.WithField("map_remote", remoteFilesystems[fs].ToString()).
WithField("map_local", localFs.ToString()).Debug()
m := RemoteLocalMapping{remoteFilesystems[fs], localFs}
replMapping[m.Local.ToString()] = m
localTraversal.Add(m.Local)
}
log.Printf("build cache for already present local filesystem state")
log.Debug("build cache for already present local filesystem state")
localFilesystemState, err := zfs.ZFSListFilesystemState()
if err != nil {
log.Printf("error requesting local filesystem state: %s", err)
log.WithError(err).Error("cannot request local filesystem state")
return err
}
log.Printf("start per-filesystem sync")
log.Info("start per-filesystem sync")
localTraversal.WalkTopDown(func(v zfs.DatasetPathVisit) bool {
log := log.WithField("filesystem", v.Path.ToString())
if v.FilledIn {
if _, exists := localFilesystemState[v.Path.ToString()]; exists {
// No need to verify if this is a placeholder or not. It is sufficient
// to know we can add child filesystems to it
return true
}
log.Printf("creating placeholder filesystem %s", v.Path.ToString())
log.Debug("create placeholder filesystem")
err = zfs.ZFSCreatePlaceholderFilesystem(v.Path)
if err != nil {
err = fmt.Errorf("aborting, cannot create placeholder filesystem %s: %s", v.Path, err)
log.Error("cannot create placeholder filesystem")
return false
}
return true
@ -124,41 +129,40 @@ func doPull(pull PullContext) (err error) {
panic("internal inconsistency: replMapping should contain mapping for any path that was not filled in by WalkTopDown()")
}
log := func(format string, args ...interface{}) {
log.Printf("[%s => %s]: %s", m.Remote.ToString(), m.Local.ToString(), fmt.Sprintf(format, args...))
}
log = log.WithField("map_remote", m.Remote.ToString()).
WithField("map_local", m.Local.ToString())
log("examing local filesystem state")
log.Debug("examing local filesystem state")
localState, localExists := localFilesystemState[m.Local.ToString()]
var versions []zfs.FilesystemVersion
switch {
case !localExists:
log("local filesystem does not exist")
log.Info("local filesystem does not exist")
case localState.Placeholder:
log("local filesystem is marked as placeholder")
log.Info("local filesystem is marked as placeholder")
default:
log("local filesystem exists")
log("requesting local filesystem versions")
log.Debug("local filesystem exists")
log.Debug("requesting local filesystem versions")
if versions, err = zfs.ZFSListFilesystemVersions(m.Local, nil); err != nil {
log("cannot get local filesystem versions: %s", err)
log.WithError(err).Error("cannot get local filesystem versions")
return false
}
}
log("requesting remote filesystem versions")
log.Info("requesting remote filesystem versions")
r := FilesystemVersionsRequest{
Filesystem: m.Remote,
}
var theirVersions []zfs.FilesystemVersion
if err = remote.Call("FilesystemVersionsRequest", &r, &theirVersions); err != nil {
log("error requesting remote filesystem versions: %s", err)
log("stopping replication for all filesystems mapped as children of %s", m.Local.ToString())
log.WithError(err).Error("cannot get remote filesystem versions")
log.Warn("stopping replication for all filesystems mapped as children of receiving filesystem")
return false
}
log("computing diff between remote and local filesystem versions")
log.Debug("computing diff between remote and local filesystem versions")
diff := zfs.MakeFilesystemDiff(versions, theirVersions)
log("%s", diff)
log.WithField("diff", diff).Debug("diff between local and remote filesystem")
if localState.Placeholder && diff.Conflict != zfs.ConflictAllRight {
panic("internal inconsistency: local placeholder implies ConflictAllRight")
@ -167,7 +171,7 @@ func doPull(pull PullContext) (err error) {
switch diff.Conflict {
case zfs.ConflictAllRight:
log("performing initial sync, following policy: '%s'", pull.InitialReplPolicy)
log.WithField("replication_policy", pull.InitialReplPolicy).Info("performing initial sync, following policy")
if pull.InitialReplPolicy != InitialReplPolicyMostRecent {
panic(fmt.Sprintf("policy '%s' not implemented", pull.InitialReplPolicy))
@ -181,7 +185,7 @@ func doPull(pull PullContext) (err error) {
}
if len(snapsOnly) < 1 {
log("cannot perform initial sync: no remote snapshots. stopping...")
log.Warn("cannot perform initial sync: no remote snapshots")
return false
}
@ -190,62 +194,60 @@ func doPull(pull PullContext) (err error) {
FilesystemVersion: snapsOnly[len(snapsOnly)-1],
}
log("requesting snapshot stream for %s", r.FilesystemVersion)
log.Debug("requesting snapshot stream for %s", r.FilesystemVersion)
var stream io.Reader
if err = remote.Call("InitialTransferRequest", &r, &stream); err != nil {
log("error requesting initial transfer: %s", err)
log.WithError(err).Error("cannot request initial transfer")
return false
}
log("received initial transfer request response")
log.Debug("received initial transfer request response")
log("invoking zfs receive")
log.Debug("invoke zfs receive")
watcher := util.IOProgressWatcher{Reader: stream}
watcher.KickOff(1*time.Second, func(p util.IOProgress) {
log("progress on receive operation: %v bytes received", p.TotalRX)
log.WithField("total_rx", p.TotalRX).Info("progress on receive operation")
})
recvArgs := []string{"-u"}
if localState.Placeholder {
log("receive with forced rollback to replace placeholder filesystem")
log.Info("receive with forced rollback to replace placeholder filesystem")
recvArgs = append(recvArgs, "-F")
}
if err = zfs.ZFSRecv(m.Local, &watcher, recvArgs...); err != nil {
log("error receiving stream: %s", err)
log.WithError(err).Error("canot receive stream")
return false
}
log("finished receiving stream, %v bytes total", watcher.Progress().TotalRX)
log.WithField("total_rx", watcher.Progress().TotalRX).
Info("finished receiving stream")
log("configuring properties of received filesystem")
log.Debug("configuring properties of received filesystem")
if err = zfs.ZFSSet(m.Local, "readonly", "on"); err != nil {
log.WithError(err).Error("cannot set readonly property")
}
log("finished initial transfer")
log.Info("finished initial transfer")
return true
case zfs.ConflictIncremental:
if len(diff.IncrementalPath) < 2 {
log("remote and local are in sync")
log.Info("remote and local are in sync")
return true
}
log("following incremental path from diff")
log.Info("following incremental path from diff")
var pathRx uint64
for i := 0; i < len(diff.IncrementalPath)-1; i++ {
from, to := diff.IncrementalPath[i], diff.IncrementalPath[i+1]
log := func(format string, args ...interface{}) {
log("[%v/%v][%s => %s]: %s", i+1, len(diff.IncrementalPath)-1,
from.Name, to.Name, fmt.Sprintf(format, args...))
}
log, _ := log.WithField("inc_from", from.Name).WithField("inc_to", to.Name), 0
log("requesting incremental snapshot stream")
log.Debug("requesting incremental snapshot stream")
r := IncrementalTransferRequest{
Filesystem: m.Remote,
From: from,
@ -253,57 +255,57 @@ func doPull(pull PullContext) (err error) {
}
var stream io.Reader
if err = remote.Call("IncrementalTransferRequest", &r, &stream); err != nil {
log("error requesting incremental snapshot stream: %s", err)
log.WithError(err).Error("cannot request incremental snapshot stream")
return false
}
log("invoking zfs receive")
log.Debug("invoking zfs receive")
watcher := util.IOProgressWatcher{Reader: stream}
watcher.KickOff(1*time.Second, func(p util.IOProgress) {
log("progress on receive operation: %v bytes received", p.TotalRX)
log.WithField("total_rx", p.TotalRX).Info("progress on receive operation")
})
if err = zfs.ZFSRecv(m.Local, &watcher); err != nil {
log("error receiving stream: %s", err)
log.WithError(err).Error("cannot receive stream")
return false
}
totalRx := watcher.Progress().TotalRX
pathRx += totalRx
log("finished incremental transfer, %v bytes total", totalRx)
log.WithField("total_rx", totalRx).Info("finished incremental transfer")
}
log("finished following incremental path, %v bytes total", pathRx)
log.WithField("total_rx", pathRx).Info("finished following incremental path")
return true
case zfs.ConflictNoCommonAncestor:
log("remote and local filesystem have snapshots, but no common one")
log("perform manual replication to establish a common snapshot history")
log("remote versions:")
for _, v := range diff.MRCAPathRight {
log(" %s (GUID %v)", v, v.Guid)
}
log("local versions:")
for _, v := range diff.MRCAPathLeft {
log(" %s (GUID %v)", v, v.Guid)
}
return false
fallthrough
case zfs.ConflictDiverged:
log("remote and local filesystem share a history but have diverged")
log("perform manual replication or delete snapshots on the receiving" +
"side to establish an incremental replication parse")
log("remote-only versions:")
for _, v := range diff.MRCAPathRight {
log(" %s (GUID %v)", v, v.Guid)
var jsonDiff bytes.Buffer
if err := json.NewEncoder(&jsonDiff).Encode(diff); err != nil {
log.WithError(err).Error("cannot JSON-encode diff")
return false
}
log("local-only versions:")
for _, v := range diff.MRCAPathLeft {
log(" %s (GUID %v)", v, v.Guid)
var problem, resolution string
switch diff.Conflict {
case zfs.ConflictNoCommonAncestor:
problem = "remote and local filesystem have snapshots, but no common one"
resolution = "perform manual establish a common snapshot history"
case zfs.ConflictDiverged:
problem = "remote and local filesystem share a history but have diverged"
resolution = "perform manual replication or delete snapshots on the receiving" +
"side to establish an incremental replication parse"
}
log.WithField("diff", jsonDiff.String()).
WithField("problem", problem).
WithField("resolution", resolution).
Error("manual conflict resolution required")
return false
}

View File

@ -10,9 +10,9 @@ import (
"strings"
"github.com/kr/pretty"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/zrepl/zrepl/zfs"
"log"
)
var testCmd = &cobra.Command{
@ -64,7 +64,9 @@ func init() {
func testCmdGlobalInit(cmd *cobra.Command, args []string) {
testCmdGlobal.log = log.New(os.Stdout, "", 0)
log := logrus.New()
log.Formatter = CLIFormatter{}
testCmdGlobal.log = log
var err error
if testCmdGlobal.conf, err = ParseConfig(rootArgs.configFile); err != nil {

View File

@ -4,10 +4,8 @@ title = "zrepl - ZFS replication"
# zrepl - ZFS replication
zrepl is a tool for replicating ZFS filesystems.
{{% notice info %}}
`zrepl` as well as this documentation is still under active development.
zrepl as well as this documentation is still under active development.
Use & test at your own risk ;)
{{% /notice %}}

View File

@ -1,20 +0,0 @@
package util
import "fmt"
type Logger interface {
Printf(format string, args ...interface{})
}
type PrefixLogger struct {
Log Logger
Prefix string
}
func NewPrefixLogger(logger Logger, prefix string) (l PrefixLogger) {
return PrefixLogger{logger, prefix}
}
func (l PrefixLogger) Printf(format string, v ...interface{}) {
l.Log.Printf(fmt.Sprintf("[%s]: %s", l.Prefix, format), v...)
}