diff --git a/cmd/autosnap.go b/cmd/autosnap.go index eaeef4d..0af4a44 100644 --- a/cmd/autosnap.go +++ b/cmd/autosnap.go @@ -3,7 +3,6 @@ package cmd import ( "context" "fmt" - "github.com/zrepl/zrepl/util" "github.com/zrepl/zrepl/zfs" "sort" "time" @@ -31,11 +30,11 @@ func (a *IntervalAutosnap) Run(ctx context.Context, didSnaps chan struct{}) { ds, err := zfs.ZFSListMapping(a.DatasetFilter) if err != nil { - a.log.Printf("error listing datasets: %s", err) + a.log.WithError(err).Error("cannot list datasets") return } if len(ds) == 0 { - a.log.Printf("no datasets matching dataset filter") + a.log.WithError(err).Error("no datasets matching dataset filter") return } @@ -43,18 +42,18 @@ func (a *IntervalAutosnap) Run(ctx context.Context, didSnaps chan struct{}) { now := time.Now() - a.log.Printf("examining filesystem state") + a.log.Debug("examine filesystem state") for i, d := range ds { - l := util.NewPrefixLogger(a.log, d.ToString()) + l := a.log.WithField("filesystem", d.ToString()) fsvs, err := zfs.ZFSListFilesystemVersions(d, &PrefixSnapshotFilter{a.Prefix}) if err != nil { - l.Printf("error listing filesystem versions of %s") + l.WithError(err).Error("cannot list filesystem versions") continue } if len(fsvs) <= 0 { - l.Printf("no filesystem versions with prefix '%s'", a.Prefix) + l.WithField("prefix", a.Prefix).Info("no filesystem versions with prefix") a.snaptimes[i] = snapTime{d, now} continue } @@ -65,11 +64,14 @@ func (a *IntervalAutosnap) Run(ctx context.Context, didSnaps chan struct{}) { }) latest := fsvs[len(fsvs)-1] - l.Printf("latest snapshot at %s (%s old)", latest.Creation.Format(LOG_TIME_FMT), now.Sub(latest.Creation)) + l.WithField("creation", latest.Creation). + Debug("found latest snapshot") since := now.Sub(latest.Creation) if since < 0 { - l.Printf("error: snapshot is from future (created at %s)", latest.Creation.Format(LOG_TIME_FMT)) + l.WithField("snapshot", latest.Name). + WithField("creation", latest.Creation). + Error("snapshot is from the future") continue } next := now @@ -84,15 +86,16 @@ func (a *IntervalAutosnap) Run(ctx context.Context, didSnaps chan struct{}) { }) syncPoint := a.snaptimes[0] - a.log.Printf("sync point at %s (in %s)", syncPoint.time.Format(LOG_TIME_FMT), syncPoint.time.Sub(now)) + a.log.WithField("sync_point", syncPoint.time.Format(LOG_TIME_FMT)). + Info("wait for sync point") select { case <-ctx.Done(): - a.log.Printf("context: %s", ctx.Err()) + a.log.WithError(ctx.Err()).Info("context done") return case <-time.After(syncPoint.time.Sub(now)): - a.log.Printf("snapshotting all filesystems to enable further snaps in lockstep") + a.log.Debug("snapshot all filesystems to enable further snaps in lockstep") a.doSnapshots(didSnaps) } @@ -102,7 +105,7 @@ func (a *IntervalAutosnap) Run(ctx context.Context, didSnaps chan struct{}) { select { case <-ctx.Done(): ticker.Stop() - a.log.Printf("context: %s", ctx.Err()) + a.log.WithError(ctx.Err()).Info("context done") return case <-ticker.C: @@ -117,7 +120,7 @@ func (a *IntervalAutosnap) doSnapshots(didSnaps chan struct{}) { // fetch new dataset list in case user added new dataset ds, err := zfs.ZFSListMapping(a.DatasetFilter) if err != nil { - a.log.Printf("error listing datasets: %s", err) + a.log.WithError(err).Error("cannot list datasets") return } @@ -126,17 +129,20 @@ func (a *IntervalAutosnap) doSnapshots(didSnaps chan struct{}) { suffix := time.Now().In(time.UTC).Format("20060102_150405_000") snapname := fmt.Sprintf("%s%s", a.Prefix, suffix) - a.log.Printf("snapshotting %s@%s", d.ToString(), snapname) + a.log.WithField("filesystem", d.ToString()). + WithField("snapname", snapname). + Info("create snapshot") + err := zfs.ZFSSnapshot(d, snapname, false) if err != nil { - a.log.Printf("error snapshotting %s: %s", d.ToString(), err) + a.log.WithError(err).Error("cannot create snapshot") } } select { case didSnaps <- struct{}{}: default: - a.log.Printf("warning: callback channel is full, discarding") + a.log.Warn("warning: callback channel is full, discarding") } } diff --git a/cmd/config.go b/cmd/config.go index 09ad7cf..5ccc6e1 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -30,6 +30,7 @@ type Global struct { Control struct { Sockpath string } + logging *LoggingConfig } type JobDebugSettings struct { diff --git a/cmd/config_job_control.go b/cmd/config_job_control.go index 0c7087b..8e07399 100644 --- a/cmd/config_job_control.go +++ b/cmd/config_job_control.go @@ -2,9 +2,7 @@ package cmd import ( "context" - "fmt" "github.com/pkg/errors" - "github.com/zrepl/zrepl/util" "net" "net/http" "net/http/pprof" @@ -61,12 +59,12 @@ outer: select { case <-ctx.Done(): - log.Printf("context: %s", ctx.Err()) + log.WithError(err).Info("contex done") server.Shutdown(context.Background()) break outer case err = <-served: if err != nil { - log.Printf("error serving: %s", err) + log.WithError(err).Error("error serving") break outer } } @@ -81,7 +79,7 @@ type requestLogger struct { } func (l requestLogger) ServeHTTP(w http.ResponseWriter, r *http.Request) { - log := util.NewPrefixLogger(l.log, fmt.Sprintf("%s %s", r.Method, r.URL)) + log := l.log.WithField("method", r.Method).WithField("url", r.URL) log.Printf("start") l.handlerFunc(w, r) log.Printf("finish") diff --git a/cmd/config_job_local.go b/cmd/config_job_local.go index fca1961..8cc93d2 100644 --- a/cmd/config_job_local.go +++ b/cmd/config_job_local.go @@ -7,7 +7,6 @@ import ( "github.com/mitchellh/mapstructure" "github.com/pkg/errors" "github.com/zrepl/zrepl/rpc" - "github.com/zrepl/zrepl/util" "github.com/zrepl/zrepl/zfs" "sync" ) @@ -91,7 +90,7 @@ func (j *LocalJob) JobStart(ctx context.Context) { // All local datasets will be passed to its Map() function, // but only those for which a mapping exists will actually be pulled. // We can pay this small performance penalty for now. - handler := NewHandler(log, localPullACL{}, &PrefixSnapshotFilter{j.SnapshotPrefix}) + handler := NewHandler(log.WithField("task", "handler"), localPullACL{}, &PrefixSnapshotFilter{j.SnapshotPrefix}) registerEndpoints(local, handler) @@ -112,8 +111,8 @@ func (j *LocalJob) JobStart(ctx context.Context) { return } - makeCtx := func(parent context.Context, logPrefix string) (ctx context.Context) { - return context.WithValue(parent, contextKeyLog, util.NewPrefixLogger(log, logPrefix)) + makeCtx := func(parent context.Context, taskName string) (ctx context.Context) { + return context.WithValue(parent, contextKeyLog, log.WithField("task", taskName)) } var snapCtx, plCtx, prCtx, pullCtx context.Context snapCtx = makeCtx(ctx, "autosnap") diff --git a/cmd/config_job_pull.go b/cmd/config_job_pull.go index 0855953..15d8000 100644 --- a/cmd/config_job_pull.go +++ b/cmd/config_job_pull.go @@ -119,7 +119,7 @@ start: log.Printf("starting pull") - pullLog := util.NewPrefixLogger(log, "pull") + pullLog := log.WithField("task", "pull") err = doPull(PullContext{client, pullLog, j.Mapping, j.InitialReplPolicy}) if err != nil { log.Printf("error doing pull: %s", err) @@ -128,7 +128,7 @@ start: closeRPCWithTimeout(log, client, time.Second*10, "") log.Printf("starting prune") - prunectx := context.WithValue(ctx, contextKeyLog, util.NewPrefixLogger(log, "prune")) + prunectx := context.WithValue(ctx, contextKeyLog, log.WithField("task", "prune")) pruner, err := j.Pruner(PrunePolicySideDefault, false) if err != nil { log.Printf("error creating pruner: %s", err) diff --git a/cmd/config_job_source.go b/cmd/config_job_source.go index 1dbdb33..735c227 100644 --- a/cmd/config_job_source.go +++ b/cmd/config_job_source.go @@ -84,9 +84,9 @@ func (j *SourceJob) JobStart(ctx context.Context) { return } - snapContext := context.WithValue(ctx, contextKeyLog, util.NewPrefixLogger(log, "autosnap")) - prunerContext := context.WithValue(ctx, contextKeyLog, util.NewPrefixLogger(log, "prune")) - serveContext := context.WithValue(ctx, contextKeyLog, util.NewPrefixLogger(log, "serve")) + snapContext := context.WithValue(ctx, contextKeyLog, log.WithField("task", "autosnap")) + prunerContext := context.WithValue(ctx, contextKeyLog, log.WithField("task", "prune")) + serveContext := context.WithValue(ctx, contextKeyLog, log.WithField("task", "serve")) didSnaps := make(chan struct{}) go j.serve(serveContext) @@ -163,7 +163,7 @@ outer: // handle connection rpcServer := rpc.NewServer(rwc) if j.Debug.RPC.Log { - rpclog := util.NewPrefixLogger(log, "rpc") + rpclog := log.WithField("subsystem", "rpc") rpcServer.SetLogger(rpclog, true) } registerEndpoints(rpcServer, handler) diff --git a/cmd/config_logging.go b/cmd/config_logging.go new file mode 100644 index 0000000..652cbc0 --- /dev/null +++ b/cmd/config_logging.go @@ -0,0 +1,78 @@ +package cmd + +import ( + "github.com/mitchellh/mapstructure" + "github.com/pkg/errors" + //"github.com/rifflock/lfshook" + "github.com/sirupsen/logrus" +) + +type LoggingConfig struct { + Stdout struct { + Level logrus.Level + } + //LFS lfshook.PathMap +} + +func parseLogging(i interface{}) (c *LoggingConfig, err error) { + + c = &LoggingConfig{} + c.Stdout.Level = logrus.WarnLevel + if i == nil { + return c, nil + } + + var asMap struct { + Mate string + Stdout map[string]string + LFS map[string]string + } + if err = mapstructure.Decode(i, &asMap); err != nil { + return nil, errors.Wrap(err, "mapstructure error") + } + + //if asMap.LFS != nil { + // c.LFS = make(map[logrus.Level]string, len(asMap.LFS)) + // for level_str, path := range asMap.LFS { + // level, err := logrus.ParseLevel(level_str) + // if err != nil { + // return nil, errors.Wrapf(err, "cannot parse level '%s'", level_str) + // } + // if len(path) <= 0 { + // return nil, errors.Errorf("path must be longer than 0") + // } + // c.LFS[level] = path + // } + //} + + if asMap.Stdout != nil { + lvl, err := logrus.ParseLevel(asMap.Stdout["level"]) + if err != nil { + return nil, errors.Wrap(err, "cannot parse stdout log level") + } + c.Stdout.Level = lvl + } + + return c, nil + +} + +func (c *LoggingConfig) MakeLogrus() (l logrus.FieldLogger) { + + log := logrus.New() + log.Out = nopWriter(0) + log.Level = logrus.DebugLevel + + //log.Level = logrus.DebugLevel + // + //if len(c.LFS) > 0 { + // lfshook := lfshook.NewHook(c.LFS) + // log.Hooks.Add(lfshook) + //} + + stdhook := NewStdHook() + log.Hooks.Add(stdhook) + + return log + +} diff --git a/cmd/config_parse.go b/cmd/config_parse.go index 555acd4..0953869 100644 --- a/cmd/config_parse.go +++ b/cmd/config_parse.go @@ -80,10 +80,14 @@ func parseConfig(i interface{}) (c *Config, err error) { err = mapstructure.Decode(asMap.Global, &c.Global) if err != nil { - err = errors.Wrap(err, "cannot parse global section: %s") + err = errors.Wrap(err, "mapstructure error on 'global' section: %s") return } + if c.Global.logging, err = parseLogging(asMap.Global["logging"]); err != nil { + return nil, errors.Wrap(err, "cannot parse logging section") + } + cpc := ConfigParsingContext{&c.Global} jpc := JobParsingContext{cpc} diff --git a/cmd/daemon.go b/cmd/daemon.go index e4d524f..9580471 100644 --- a/cmd/daemon.go +++ b/cmd/daemon.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "github.com/spf13/cobra" - "log" "os" "os/signal" "syscall" @@ -21,15 +20,6 @@ func init() { RootCmd.AddCommand(daemonCmd) } -type jobLogger struct { - MainLog Logger - JobName string -} - -func (l jobLogger) Printf(format string, v ...interface{}) { - l.MainLog.Printf(fmt.Sprintf("[%s]: %s", l.JobName, format), v...) -} - type Job interface { JobName() string JobStart(ctxt context.Context) @@ -37,15 +27,15 @@ type Job interface { func doDaemon(cmd *cobra.Command, args []string) { - log := log.New(os.Stderr, "", log.LUTC|log.Ldate|log.Ltime) - conf, err := ParseConfig(rootArgs.configFile) if err != nil { - log.Printf("error parsing config: %s", err) + fmt.Fprintf(os.Stderr, "error parsing config: %s", err) os.Exit(1) } - ctx := context.Background() + log := conf.Global.logging.MakeLogrus() + log.Debug("starting daemon") + ctx := context.WithValue(context.Background(), contextKeyLog, log) ctx = context.WithValue(ctx, contextKeyLog, log) d := NewDaemon(conf) @@ -83,7 +73,7 @@ func (d *Daemon) Loop(ctx context.Context) { for _, job := range d.conf.Jobs { log.Printf("starting job %s", job.JobName()) - logger := jobLogger{log, job.JobName()} + logger := log.WithField("job", job.JobName()) i++ jobCtx := context.WithValue(ctx, contextKeyLog, logger) go func(j Job) { diff --git a/cmd/handler.go b/cmd/handler.go index 865a8a5..1af5cc3 100644 --- a/cmd/handler.go +++ b/cmd/handler.go @@ -64,38 +64,42 @@ func registerEndpoints(server rpc.RPCServer, handler Handler) (err error) { func (h Handler) HandleFilesystemRequest(r *FilesystemRequest, roots *[]*zfs.DatasetPath) (err error) { - h.logger.Printf("handling fsr: %#v", r) + log := h.logger.WithField("endpoint", "FilesystemRequest") - h.logger.Printf("using dsf: %#v", h.dsf) + log.WithField("request", r).Debug("request") + log.WithField("dataset_filter", h.dsf).Debug("dsf") allowed, err := zfs.ZFSListMapping(h.dsf) if err != nil { - h.logger.Printf("handle fsr err: %v\n", err) + log.WithError(err).Error("error listing filesystems") return } - h.logger.Printf("returning: %#v", allowed) + log.WithField("response", allowed).Debug("response") *roots = allowed return } func (h Handler) HandleFilesystemVersionsRequest(r *FilesystemVersionsRequest, versions *[]zfs.FilesystemVersion) (err error) { - h.logger.Printf("handling filesystem versions request: %#v", r) + log := h.logger.WithField("endpoint", "FilesystemVersionsRequest") + + log.WithField("request", r).Debug("request") // allowed to request that? if h.pullACLCheck(r.Filesystem, nil); err != nil { + log.WithError(err).Warn("pull ACL check failed") return } // find our versions vs, err := zfs.ZFSListFilesystemVersions(r.Filesystem, h.fsvf) if err != nil { - h.logger.Printf("our versions error: %#v\n", err) + log.WithError(err).Error("cannot list filesystem versions") return } - h.logger.Printf("our versions: %#v\n", vs) + log.WithField("resposne", vs).Debug("response") *versions = vs return @@ -104,16 +108,19 @@ func (h Handler) HandleFilesystemVersionsRequest(r *FilesystemVersionsRequest, v func (h Handler) HandleInitialTransferRequest(r *InitialTransferRequest, stream *io.Reader) (err error) { - h.logger.Printf("handling initial transfer request: %#v", r) + log := h.logger.WithField("endpoint", "InitialTransferRequest") + + log.WithField("request", r).Debug("request") if err = h.pullACLCheck(r.Filesystem, &r.FilesystemVersion); err != nil { + log.WithError(err).Warn("pull ACL check failed") return } - h.logger.Printf("invoking zfs send") + log.Debug("invoking zfs send") s, err := zfs.ZFSSend(r.Filesystem, &r.FilesystemVersion, nil) if err != nil { - h.logger.Printf("error sending filesystem: %#v", err) + log.WithError(err).Error("cannot send filesystem") } *stream = s @@ -123,19 +130,22 @@ func (h Handler) HandleInitialTransferRequest(r *InitialTransferRequest, stream func (h Handler) HandleIncrementalTransferRequest(r *IncrementalTransferRequest, stream *io.Reader) (err error) { - h.logger.Printf("handling incremental transfer request: %#v", r) + log := h.logger.WithField("endpoint", "IncrementalTransferRequest") + log.WithField("request", r).Debug("request") if err = h.pullACLCheck(r.Filesystem, &r.From); err != nil { + log.WithError(err).Warn("pull ACL check failed") return } if err = h.pullACLCheck(r.Filesystem, &r.To); err != nil { + log.WithError(err).Warn("pull ACL check failed") return } - h.logger.Printf("invoking zfs send") + log.Debug("invoking zfs send") s, err := zfs.ZFSSend(r.Filesystem, &r.From, &r.To) if err != nil { - h.logger.Printf("error sending filesystem: %#v", err) + log.WithError(err).Error("cannot send filesystem") } *stream = s @@ -148,12 +158,10 @@ func (h Handler) pullACLCheck(p *zfs.DatasetPath, v *zfs.FilesystemVersion) (err fsAllowed, err = h.dsf.Filter(p) if err != nil { err = fmt.Errorf("error evaluating ACL: %s", err) - h.logger.Printf(err.Error()) return } if !fsAllowed { err = fmt.Errorf("ACL prohibits access to %s", p.ToString()) - h.logger.Printf(err.Error()) return } if v == nil { @@ -163,12 +171,10 @@ func (h Handler) pullACLCheck(p *zfs.DatasetPath, v *zfs.FilesystemVersion) (err vAllowed, err = h.fsvf.Filter(*v) if err != nil { err = errors.Wrap(err, "error evaluating version filter") - h.logger.Printf(err.Error()) return } if !vAllowed { err = fmt.Errorf("ACL prohibits access to %s", v.ToAbsPath(p)) - h.logger.Printf(err.Error()) return } return diff --git a/cmd/logrus.go b/cmd/logrus.go new file mode 100644 index 0000000..097f9a5 --- /dev/null +++ b/cmd/logrus.go @@ -0,0 +1,55 @@ +package cmd + +import ( + "bytes" + "fmt" + "github.com/sirupsen/logrus" + "os" +) + +type CLIFormatter struct { +} + +func (f CLIFormatter) Format(e *logrus.Entry) (out []byte, err error) { + var buf bytes.Buffer + fmt.Fprintf(&buf, "%s\n", e.Message) + return buf.Bytes(), nil +} + +var stdhookStderrLevels []logrus.Level = []logrus.Level{ + logrus.PanicLevel, logrus.FatalLevel, logrus.ErrorLevel, logrus.WarnLevel, +} + +type Stdhook struct { +} + +func NewStdHook() *Stdhook { + return &Stdhook{} +} + +func (h *Stdhook) Levels() []logrus.Level { + // Accept all so we can filter the output later + return []logrus.Level{ + logrus.PanicLevel, logrus.FatalLevel, logrus.ErrorLevel, logrus.WarnLevel, + logrus.InfoLevel, logrus.DebugLevel, + } +} + +func (h *Stdhook) Fire(entry *logrus.Entry) error { + s, err := entry.String() + if err != nil { + return err + } + for _, l := range stdhookStderrLevels { + if l == entry.Level { + fmt.Fprint(os.Stderr, s) + return nil + } + } + fmt.Fprint(os.Stdout, s) + return nil +} + +type nopWriter int + +func (w nopWriter) Write(p []byte) (n int, err error) { return len(p), nil } diff --git a/cmd/main.go b/cmd/main.go index 5c3e331..ca81efe 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -11,12 +11,16 @@ package cmd import ( + "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) -type Logger interface { - Printf(format string, v ...interface{}) -} +// +//type Logger interface { +// Printf(format string, v ...interface{}) +//} + +type Logger logrus.FieldLogger var RootCmd = &cobra.Command{ Use: "zrepl", diff --git a/cmd/prune.go b/cmd/prune.go index 6300a3d..29ad269 100644 --- a/cmd/prune.go +++ b/cmd/prune.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "github.com/zrepl/zrepl/util" "github.com/zrepl/zrepl/zfs" "time" ) @@ -29,16 +28,16 @@ func (p *Pruner) Run(ctx context.Context) (r []PruneResult, err error) { log := ctx.Value(contextKeyLog).(Logger) if p.DryRun { - log.Printf("doing dry run") + log.Info("doing dry run") } filesystems, err := zfs.ZFSListMapping(p.DatasetFilter) if err != nil { - log.Printf("error applying filesystem filter: %s", err) + log.WithError(err).Error("error applying filesystem filter") return nil, err } if len(filesystems) <= 0 { - log.Printf("no filesystems matching filter") + log.Info("no filesystems matching filter") return nil, err } @@ -46,27 +45,27 @@ func (p *Pruner) Run(ctx context.Context) (r []PruneResult, err error) { for _, fs := range filesystems { + log := log.WithField("filesystem", fs.ToString()) + fsversions, err := zfs.ZFSListFilesystemVersions(fs, &PrefixSnapshotFilter{p.SnapshotPrefix}) if err != nil { - log.Printf("error listing filesytem versions of %s: %s", fs, err) + log.WithError(err).Error("error listing filesytem versions") continue } if len(fsversions) == 0 { - log.Printf("no filesystem versions matching prefix '%s'", p.SnapshotPrefix) + log.WithField("prefix", p.SnapshotPrefix).Info("no filesystem versions matching prefix") continue } - l := util.NewPrefixLogger(log, fs.ToString()) - dbgj, err := json.Marshal(fsversions) if err != nil { panic(err) } - l.Printf("DEBUG: FSVERSIONS=%s", dbgj) + log.WithField("fsversions", string(dbgj)).Debug() keep, remove, err := p.PrunePolicy.Prune(fs, fsversions) if err != nil { - l.Printf("error evaluating prune policy: %s", err) + log.WithError(err).Error("error evaluating prune policy") continue } @@ -74,23 +73,28 @@ func (p *Pruner) Run(ctx context.Context) (r []PruneResult, err error) { if err != nil { panic(err) } - l.Printf("DEBUG: KEEP=%s", dbgj) + log.WithField("keep", string(dbgj)).Debug() dbgj, err = json.Marshal(remove) - l.Printf("DEBUG: REMOVE=%s", dbgj) + log.WithField("remove", string(dbgj)).Debug() r = append(r, PruneResult{fs, fsversions, keep, remove}) - describe := func(v zfs.FilesystemVersion) string { + makeFields := func(v zfs.FilesystemVersion) (fields map[string]interface{}) { + fields = make(map[string]interface{}) + fields["version"] = v.ToAbsPath(fs) timeSince := v.Creation.Sub(p.Now) + fields["age_ns"] = timeSince const day time.Duration = 24 * time.Hour days := timeSince / day remainder := timeSince % day - return fmt.Sprintf("%s@%dd%s from now", v.ToAbsPath(fs), days, remainder) + fields["age_str"] = fmt.Sprintf("%dd%s", days, remainder) + return } for _, v := range remove { - l.Printf("remove %s", describe(v)) + fields := makeFields(v) + log.WithFields(fields).Info("destroying version") // echo what we'll do and exec zfs destroy if not dry run // TODO special handling for EBUSY (zfs hold) // TODO error handling for clones? just echo to cli, skip over, and exit with non-zero status code (we're idempotent) @@ -98,7 +102,7 @@ func (p *Pruner) Run(ctx context.Context) (r []PruneResult, err error) { err := zfs.ZFSDestroyFilesystemVersion(fs, v) if err != nil { // handle - l.Printf("error: %s", err) + log.WithFields(fields).WithError(err).Error("error destroying version") } } } diff --git a/cmd/replication.go b/cmd/replication.go index f0b739f..cf67e62 100644 --- a/cmd/replication.go +++ b/cmd/replication.go @@ -5,6 +5,8 @@ import ( "io" "time" + "bytes" + "encoding/json" "github.com/zrepl/zrepl/rpc" "github.com/zrepl/zrepl/util" "github.com/zrepl/zrepl/zfs" @@ -62,14 +64,14 @@ func doPull(pull PullContext) (err error) { remote := pull.Remote log := pull.Log - log.Printf("requesting remote filesystem list") + log.Info("request remote filesystem list") fsr := FilesystemRequest{} var remoteFilesystems []*zfs.DatasetPath if err = remote.Call("FilesystemRequest", &fsr, &remoteFilesystems); err != nil { return } - log.Printf("map remote filesystems to local paths and determine order for per-filesystem sync") + log.Debug("map remote filesystems to local paths and determine order for per-filesystem sync") type RemoteLocalMapping struct { Remote *zfs.DatasetPath Local *zfs.DatasetPath @@ -82,38 +84,41 @@ func doPull(pull PullContext) (err error) { localFs, err = pull.Mapping.Map(remoteFilesystems[fs]) if err != nil { err := fmt.Errorf("error mapping %s: %s", remoteFilesystems[fs], err) - log.Printf("%s", err) + log.WithError(err).Error() return err } if localFs == nil { continue } - log.Printf("%s => %s", remoteFilesystems[fs].ToString(), localFs.ToString()) + log.WithField("map_remote", remoteFilesystems[fs].ToString()). + WithField("map_local", localFs.ToString()).Debug() m := RemoteLocalMapping{remoteFilesystems[fs], localFs} replMapping[m.Local.ToString()] = m localTraversal.Add(m.Local) } - log.Printf("build cache for already present local filesystem state") + log.Debug("build cache for already present local filesystem state") localFilesystemState, err := zfs.ZFSListFilesystemState() if err != nil { - log.Printf("error requesting local filesystem state: %s", err) + log.WithError(err).Error("cannot request local filesystem state") return err } - log.Printf("start per-filesystem sync") + log.Info("start per-filesystem sync") localTraversal.WalkTopDown(func(v zfs.DatasetPathVisit) bool { + log := log.WithField("filesystem", v.Path.ToString()) + if v.FilledIn { if _, exists := localFilesystemState[v.Path.ToString()]; exists { // No need to verify if this is a placeholder or not. It is sufficient // to know we can add child filesystems to it return true } - log.Printf("creating placeholder filesystem %s", v.Path.ToString()) + log.Debug("create placeholder filesystem") err = zfs.ZFSCreatePlaceholderFilesystem(v.Path) if err != nil { - err = fmt.Errorf("aborting, cannot create placeholder filesystem %s: %s", v.Path, err) + log.Error("cannot create placeholder filesystem") return false } return true @@ -124,41 +129,40 @@ func doPull(pull PullContext) (err error) { panic("internal inconsistency: replMapping should contain mapping for any path that was not filled in by WalkTopDown()") } - log := func(format string, args ...interface{}) { - log.Printf("[%s => %s]: %s", m.Remote.ToString(), m.Local.ToString(), fmt.Sprintf(format, args...)) - } + log = log.WithField("map_remote", m.Remote.ToString()). + WithField("map_local", m.Local.ToString()) - log("examing local filesystem state") + log.Debug("examing local filesystem state") localState, localExists := localFilesystemState[m.Local.ToString()] var versions []zfs.FilesystemVersion switch { case !localExists: - log("local filesystem does not exist") + log.Info("local filesystem does not exist") case localState.Placeholder: - log("local filesystem is marked as placeholder") + log.Info("local filesystem is marked as placeholder") default: - log("local filesystem exists") - log("requesting local filesystem versions") + log.Debug("local filesystem exists") + log.Debug("requesting local filesystem versions") if versions, err = zfs.ZFSListFilesystemVersions(m.Local, nil); err != nil { - log("cannot get local filesystem versions: %s", err) + log.WithError(err).Error("cannot get local filesystem versions") return false } } - log("requesting remote filesystem versions") + log.Info("requesting remote filesystem versions") r := FilesystemVersionsRequest{ Filesystem: m.Remote, } var theirVersions []zfs.FilesystemVersion if err = remote.Call("FilesystemVersionsRequest", &r, &theirVersions); err != nil { - log("error requesting remote filesystem versions: %s", err) - log("stopping replication for all filesystems mapped as children of %s", m.Local.ToString()) + log.WithError(err).Error("cannot get remote filesystem versions") + log.Warn("stopping replication for all filesystems mapped as children of receiving filesystem") return false } - log("computing diff between remote and local filesystem versions") + log.Debug("computing diff between remote and local filesystem versions") diff := zfs.MakeFilesystemDiff(versions, theirVersions) - log("%s", diff) + log.WithField("diff", diff).Debug("diff between local and remote filesystem") if localState.Placeholder && diff.Conflict != zfs.ConflictAllRight { panic("internal inconsistency: local placeholder implies ConflictAllRight") @@ -167,7 +171,7 @@ func doPull(pull PullContext) (err error) { switch diff.Conflict { case zfs.ConflictAllRight: - log("performing initial sync, following policy: '%s'", pull.InitialReplPolicy) + log.WithField("replication_policy", pull.InitialReplPolicy).Info("performing initial sync, following policy") if pull.InitialReplPolicy != InitialReplPolicyMostRecent { panic(fmt.Sprintf("policy '%s' not implemented", pull.InitialReplPolicy)) @@ -181,7 +185,7 @@ func doPull(pull PullContext) (err error) { } if len(snapsOnly) < 1 { - log("cannot perform initial sync: no remote snapshots. stopping...") + log.Warn("cannot perform initial sync: no remote snapshots") return false } @@ -190,62 +194,60 @@ func doPull(pull PullContext) (err error) { FilesystemVersion: snapsOnly[len(snapsOnly)-1], } - log("requesting snapshot stream for %s", r.FilesystemVersion) + log.Debug("requesting snapshot stream for %s", r.FilesystemVersion) var stream io.Reader if err = remote.Call("InitialTransferRequest", &r, &stream); err != nil { - log("error requesting initial transfer: %s", err) + log.WithError(err).Error("cannot request initial transfer") return false } - log("received initial transfer request response") + log.Debug("received initial transfer request response") - log("invoking zfs receive") + log.Debug("invoke zfs receive") watcher := util.IOProgressWatcher{Reader: stream} watcher.KickOff(1*time.Second, func(p util.IOProgress) { - log("progress on receive operation: %v bytes received", p.TotalRX) + log.WithField("total_rx", p.TotalRX).Info("progress on receive operation") }) recvArgs := []string{"-u"} if localState.Placeholder { - log("receive with forced rollback to replace placeholder filesystem") + log.Info("receive with forced rollback to replace placeholder filesystem") recvArgs = append(recvArgs, "-F") } if err = zfs.ZFSRecv(m.Local, &watcher, recvArgs...); err != nil { - log("error receiving stream: %s", err) + log.WithError(err).Error("canot receive stream") return false } - log("finished receiving stream, %v bytes total", watcher.Progress().TotalRX) + log.WithField("total_rx", watcher.Progress().TotalRX). + Info("finished receiving stream") - log("configuring properties of received filesystem") + log.Debug("configuring properties of received filesystem") if err = zfs.ZFSSet(m.Local, "readonly", "on"); err != nil { - + log.WithError(err).Error("cannot set readonly property") } - log("finished initial transfer") + log.Info("finished initial transfer") return true case zfs.ConflictIncremental: if len(diff.IncrementalPath) < 2 { - log("remote and local are in sync") + log.Info("remote and local are in sync") return true } - log("following incremental path from diff") + log.Info("following incremental path from diff") var pathRx uint64 for i := 0; i < len(diff.IncrementalPath)-1; i++ { from, to := diff.IncrementalPath[i], diff.IncrementalPath[i+1] - log := func(format string, args ...interface{}) { - log("[%v/%v][%s => %s]: %s", i+1, len(diff.IncrementalPath)-1, - from.Name, to.Name, fmt.Sprintf(format, args...)) - } + log, _ := log.WithField("inc_from", from.Name).WithField("inc_to", to.Name), 0 - log("requesting incremental snapshot stream") + log.Debug("requesting incremental snapshot stream") r := IncrementalTransferRequest{ Filesystem: m.Remote, From: from, @@ -253,57 +255,57 @@ func doPull(pull PullContext) (err error) { } var stream io.Reader if err = remote.Call("IncrementalTransferRequest", &r, &stream); err != nil { - log("error requesting incremental snapshot stream: %s", err) + log.WithError(err).Error("cannot request incremental snapshot stream") return false } - log("invoking zfs receive") + log.Debug("invoking zfs receive") watcher := util.IOProgressWatcher{Reader: stream} watcher.KickOff(1*time.Second, func(p util.IOProgress) { - log("progress on receive operation: %v bytes received", p.TotalRX) + log.WithField("total_rx", p.TotalRX).Info("progress on receive operation") }) if err = zfs.ZFSRecv(m.Local, &watcher); err != nil { - log("error receiving stream: %s", err) + log.WithError(err).Error("cannot receive stream") return false } totalRx := watcher.Progress().TotalRX pathRx += totalRx - log("finished incremental transfer, %v bytes total", totalRx) + log.WithField("total_rx", totalRx).Info("finished incremental transfer") } - log("finished following incremental path, %v bytes total", pathRx) + log.WithField("total_rx", pathRx).Info("finished following incremental path") return true case zfs.ConflictNoCommonAncestor: - - log("remote and local filesystem have snapshots, but no common one") - log("perform manual replication to establish a common snapshot history") - log("remote versions:") - for _, v := range diff.MRCAPathRight { - log(" %s (GUID %v)", v, v.Guid) - } - log("local versions:") - for _, v := range diff.MRCAPathLeft { - log(" %s (GUID %v)", v, v.Guid) - } - return false - + fallthrough case zfs.ConflictDiverged: - log("remote and local filesystem share a history but have diverged") - log("perform manual replication or delete snapshots on the receiving" + - "side to establish an incremental replication parse") - log("remote-only versions:") - for _, v := range diff.MRCAPathRight { - log(" %s (GUID %v)", v, v.Guid) + var jsonDiff bytes.Buffer + if err := json.NewEncoder(&jsonDiff).Encode(diff); err != nil { + log.WithError(err).Error("cannot JSON-encode diff") + return false } - log("local-only versions:") - for _, v := range diff.MRCAPathLeft { - log(" %s (GUID %v)", v, v.Guid) + + var problem, resolution string + + switch diff.Conflict { + case zfs.ConflictNoCommonAncestor: + problem = "remote and local filesystem have snapshots, but no common one" + resolution = "perform manual establish a common snapshot history" + case zfs.ConflictDiverged: + problem = "remote and local filesystem share a history but have diverged" + resolution = "perform manual replication or delete snapshots on the receiving" + + "side to establish an incremental replication parse" } + + log.WithField("diff", jsonDiff.String()). + WithField("problem", problem). + WithField("resolution", resolution). + Error("manual conflict resolution required") + return false } diff --git a/cmd/test.go b/cmd/test.go index e2c0a18..f03eb94 100644 --- a/cmd/test.go +++ b/cmd/test.go @@ -10,9 +10,9 @@ import ( "strings" "github.com/kr/pretty" + "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/zrepl/zrepl/zfs" - "log" ) var testCmd = &cobra.Command{ @@ -64,7 +64,9 @@ func init() { func testCmdGlobalInit(cmd *cobra.Command, args []string) { - testCmdGlobal.log = log.New(os.Stdout, "", 0) + log := logrus.New() + log.Formatter = CLIFormatter{} + testCmdGlobal.log = log var err error if testCmdGlobal.conf, err = ParseConfig(rootArgs.configFile); err != nil { diff --git a/docs/content/_index.md b/docs/content/_index.md index ca80a62..4405381 100644 --- a/docs/content/_index.md +++ b/docs/content/_index.md @@ -4,10 +4,8 @@ title = "zrepl - ZFS replication" # zrepl - ZFS replication -zrepl is a tool for replicating ZFS filesystems. - {{% notice info %}} -`zrepl` as well as this documentation is still under active development. +zrepl as well as this documentation is still under active development. Use & test at your own risk ;) {{% /notice %}} diff --git a/util/logging.go b/util/logging.go deleted file mode 100644 index 8b67702..0000000 --- a/util/logging.go +++ /dev/null @@ -1,20 +0,0 @@ -package util - -import "fmt" - -type Logger interface { - Printf(format string, args ...interface{}) -} - -type PrefixLogger struct { - Log Logger - Prefix string -} - -func NewPrefixLogger(logger Logger, prefix string) (l PrefixLogger) { - return PrefixLogger{logger, prefix} -} - -func (l PrefixLogger) Printf(format string, v ...interface{}) { - l.Log.Printf(fmt.Sprintf("[%s]: %s", l.Prefix, format), v...) -}