diff --git a/cmd/autosnap.go b/cmd/autosnap.go
index a658fe6..52dbc5b 100644
--- a/cmd/autosnap.go
+++ b/cmd/autosnap.go
@@ -1,11 +1,64 @@
-package main
+package cmd
 
 import (
 	"fmt"
+	"github.com/spf13/cobra"
+	"github.com/zrepl/zrepl/jobrun"
 	"github.com/zrepl/zrepl/zfs"
+	"sync"
 	"time"
 )
 
+var autosnapArgs struct {
+	job string
+}
+
+var AutosnapCmd = &cobra.Command{
+	Use:   "autosnap",
+	Short: "perform automatic snapshotting",
+	Run:   cmdAutosnap,
+}
+
+func init() {
+	AutosnapCmd.Flags().StringVar(&autosnapArgs.job, "job", "", "job to run")
+	RootCmd.AddCommand(AutosnapCmd)
+}
+
+func cmdAutosnap(cmd *cobra.Command, args []string) {
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		runner.Start()
+	}()
+
+	log.Printf("autosnap...")
+
+	for i := range conf.Autosnaps {
+
+		snap := conf.Autosnaps[i]
+
+		if autosnapArgs.job == "" || autosnapArgs.job == snap.Name {
+
+			job := jobrun.Job{
+				Name:           fmt.Sprintf("autosnap.%s", snap.Name),
+				RepeatStrategy: snap.Interval,
+				RunFunc: func(log jobrun.Logger) error {
+					log.Printf("doing autosnap: %v", snap)
+					ctx := AutosnapContext{snap}
+					return doAutosnap(ctx, log)
+				},
+			}
+			runner.AddJob(job)
+
+		}
+	}
+
+	wg.Wait()
+
+}
+
 type AutosnapContext struct {
 	Autosnap Autosnap
 }
diff --git a/cmd/config.go b/cmd/config.go
index 3d5e0a0..7dd1833 100644
--- a/cmd/config.go
+++ b/cmd/config.go
@@ -1,4 +1,4 @@
-package main
+package cmd
 
 import (
 	"errors"
diff --git a/cmd/config_test.go b/cmd/config_test.go
index d0e57a2..aa677fd 100644
--- a/cmd/config_test.go
+++ b/cmd/config_test.go
@@ -1,4 +1,4 @@
-package main
+package cmd
 
 import (
 	"github.com/stretchr/testify/assert"
diff --git a/cmd/handler.go b/cmd/handler.go
index 1f2ec1b..da2f098 100644
--- a/cmd/handler.go
+++ b/cmd/handler.go
@@ -1,4 +1,4 @@
-package main
+package cmd
 
 import (
 	"fmt"
diff --git a/cmd/main.go b/cmd/main.go
index ca327f1..6ccb7a9 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -1,365 +1,104 @@
-package main
+// zrepl replicates ZFS filesystems & volumes between pools
+//
+// Code Organization
+//
+// The cmd package uses github.com/spf13/cobra for its CLI.
+//
+// It combines the other packages in the zrepl project to implement zrepl functionality.
+//
+// Each subcommand's code is in the corresponding *.go file.
+// All other *.go files contain code shared by the subcommands.
+package cmd
 
 import (
 	"fmt"
-	"github.com/urfave/cli"
+	"github.com/spf13/cobra"
 	"github.com/zrepl/zrepl/jobrun"
-	"github.com/zrepl/zrepl/rpc"
-	"github.com/zrepl/zrepl/sshbytestream"
-	"github.com/zrepl/zrepl/zfs"
 	"golang.org/x/sys/unix"
 	"io"
-	"log"
+	golog "log"
 	"net/http"
 	_ "net/http/pprof"
 	"os"
-	"runtime/debug"
-	"sync"
-	"time"
 )
 
 type Logger interface {
 	Printf(format string, v ...interface{})
 }
 
-var conf Config
-var runner *jobrun.JobRunner
-var logFlags int = log.LUTC | log.Ldate | log.Ltime
-var logOut io.Writer
-var defaultLog Logger
+// global state / facilities
+var (
+	conf     Config
+	runner   *jobrun.JobRunner
+	logFlags int = golog.LUTC | golog.Ldate | golog.Ltime
+	logOut   io.Writer
+	log      Logger
+)
 
-func main() {
-
-	defer func() {
-		e := recover()
-		if e != nil {
-			defaultLog.Printf("panic:\n%s\n\n", debug.Stack())
-			defaultLog.Printf("error: %t %s", e, e)
-			os.Exit(1)
-		}
-	}()
-
-	app := cli.NewApp()
-
-	app.Name = "zrepl"
-	app.Usage = "replicate zfs datasets"
-	app.EnableBashCompletion = true
-	app.Flags = []cli.Flag{
-		cli.StringFlag{Name: "config"},
-		cli.StringFlag{Name: "logfile"},
-		cli.StringFlag{Name: "debug.pprof.http"},
-	}
-	app.Before = func(c *cli.Context) (err error) {
-
-		// Logging
-		if c.GlobalIsSet("logfile") {
-			var logFile *os.File
-			logFile, err = os.OpenFile(c.String("logfile"), os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600)
-			if err != nil {
-				return
-			}
-
-			if err = unix.Dup2(int(logFile.Fd()), int(os.Stderr.Fd())); err != nil {
-				logFile.WriteString(fmt.Sprintf("error duping logfile to stderr: %s\n", err))
-				return
-			}
-			logOut = logFile
-		} else {
-			logOut = os.Stderr
-		}
-		defaultLog = log.New(logOut, "", logFlags)
-
-		// CPU profiling
-		if c.GlobalIsSet("debug.pprof.http") {
-			go func() {
-				http.ListenAndServe(c.GlobalString("debug.pprof.http"), nil)
-			}()
-		}
-
-		// Config
-		if !c.GlobalIsSet("config") {
-			return cli.NewExitError("config flag not set", 2)
-		}
-		if conf, err = ParseConfig(c.GlobalString("config")); err != nil {
-			return cli.NewExitError(err, 2)
-		}
-
-		jobrunLogger := log.New(os.Stderr, "jobrun ", logFlags)
-		runner = jobrun.NewJobRunner(jobrunLogger)
-		return
-	}
-	app.Commands = []cli.Command{
-		{
-			Name:    "stdinserver",
-			Aliases: []string{"s"},
-			Usage:   "start in stdin server mode (from authorized keys)",
-			Flags: []cli.Flag{
-				cli.StringFlag{Name: "identity"},
-			},
-			Action: cmdStdinServer,
-		},
-		{
-			Name:    "run",
-			Aliases: []string{"r"},
-			Usage:   "do replication",
-			Action:  cmdRun,
-			Flags: []cli.Flag{
-				cli.StringFlag{Name: "job"},
-				cli.BoolFlag{
-					Name:  "once",
-					Usage: "run jobs only once, regardless of configured repeat behavior",
-				},
-			},
-		},
-		{
-			Name:   "prune",
-			Action: cmdPrune,
-			Flags: []cli.Flag{
-				cli.StringFlag{Name: "job"},
-				cli.BoolFlag{Name: "n", Usage: "simulation (dry run)"},
-			},
-		},
-		{
-			Name:   "autosnap",
-			Action: cmdAutosnap,
-			Flags: []cli.Flag{
-				cli.StringFlag{Name: "job"},
-			},
-		},
-	}
-
-	app.Run(os.Args)
+var RootCmd = &cobra.Command{
+	Use:   "zrepl",
+	Short: "ZFS dataset replication",
+	Long: `Replicate ZFS filesystems & volumes between pools:
+  - push & pull mode
+  - automatic snapshot creation & pruning
+  - local / over the network
+  - ACLs instead of blank SSH access`,
 }
 
-func cmdStdinServer(c *cli.Context) (err error) {
+var rootArgs struct {
+	configFile string
+	stderrFile string
+	httpPprof  string
+}
 
-	if !c.IsSet("identity") {
-		return cli.NewExitError("identity flag not set", 2)
-	}
-	identity := c.String("identity")
+func init() {
+	cobra.OnInitialize(initConfig)
+	RootCmd.PersistentFlags().StringVar(&rootArgs.configFile, "config", "", "config file path")
+	RootCmd.PersistentFlags().StringVar(&rootArgs.stderrFile, "stderrFile", "-", "redirect stderr to given path")
+	RootCmd.PersistentFlags().StringVar(&rootArgs.httpPprof, "debug.pprof.http", "", "run pprof http server on given port")
+}
 
-	var sshByteStream io.ReadWriteCloser
-	if sshByteStream, err = sshbytestream.Incoming(); err != nil {
-		return
-	}
+func initConfig() {
 
-	findMapping := func(cm []ClientMapping, identity string) zfs.DatasetMapping {
-		for i := range cm {
-			if cm[i].From == identity {
-				return cm[i].Mapping
-			}
+	// Logging & stderr redirection
+	if rootArgs.stderrFile != "-" {
+		file, err := os.OpenFile(rootArgs.stderrFile, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600)
+		if err != nil {
+			return
 		}
-		return nil
-	}
-	sinkMapping := func(identity string) (sink zfs.DatasetMapping, err error) {
-		if sink = findMapping(conf.Sinks, identity); sink == nil {
-			return nil, fmt.Errorf("could not find sink for dataset")
+
+		if err = unix.Dup2(int(file.Fd()), int(os.Stderr.Fd())); err != nil {
+			file.WriteString(fmt.Sprintf("error redirecting stderr file %s: %s\n", rootArgs.stderrFile, err))
+			return
 		}
-		return
+		logOut = file
+	} else {
+		logOut = os.Stderr
 	}
 
-	sinkLogger := log.New(logOut, fmt.Sprintf("sink[%s] ", identity), logFlags)
-	handler := Handler{
-		Logger:          sinkLogger,
-		SinkMappingFunc: sinkMapping,
-		PullACL:         findMapping(conf.PullACLs, identity),
+	log = golog.New(logOut, "", logFlags)
+
+	// CPU profiling
+	if rootArgs.httpPprof != "" {
+		go func() {
+			http.ListenAndServe(rootArgs.httpPprof, nil)
+		}()
 	}
 
-	if err = rpc.ListenByteStreamRPC(sshByteStream, identity, handler, sinkLogger); err != nil {
-		//os.Exit(1)
-		err = cli.NewExitError(err, 1)
-		defaultLog.Printf("listenbytestreamerror: %#v\n", err)
+	// Config
+	if rootArgs.configFile == "" {
+		log.Printf("config file not set")
+		os.Exit(1)
+	}
+	var err error
+	if conf, err = ParseConfig(rootArgs.configFile); err != nil {
+		log.Printf("error parsing config: %s", err)
+		os.Exit(1)
 	}
 
+	jobrunLogger := golog.New(os.Stderr, "jobrun ", logFlags)
+	runner = jobrun.NewJobRunner(jobrunLogger)
 	return
 }
-
-func cmdRun(c *cli.Context) error {
-
-	var wg sync.WaitGroup
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-		runner.Start()
-	}()
-
-	jobs := make([]jobrun.Job, len(conf.Pulls)+len(conf.Pushs))
-	i := 0
-	for _, pull := range conf.Pulls {
-		jobs[i] = jobrun.Job{
-			Name:           fmt.Sprintf("pull.%d", i),
-			RepeatStrategy: pull.RepeatStrategy,
-			RunFunc: func(log jobrun.Logger) error {
-				log.Printf("doing pull: %v", pull)
-				return jobPull(pull, c, log)
-			},
-		}
-		i++
-	}
-	for _, push := range conf.Pushs {
-		jobs[i] = jobrun.Job{
-			Name:           fmt.Sprintf("push.%d", i),
-			RepeatStrategy: push.RepeatStrategy,
-			RunFunc: func(log jobrun.Logger) error {
-				log.Printf("doing push: %v", push)
-				return jobPush(push, c, log)
-			},
-		}
-		i++
-	}
-
-	for _, j := range jobs {
-		if c.IsSet("once") {
-			j.RepeatStrategy = jobrun.NoRepeatStrategy{}
-		}
-		if c.IsSet("job") {
-			if c.String("job") == j.Name {
-				runner.AddJob(j)
-				break
-			}
-			continue
-		}
-		runner.AddJob(j)
-	}
-
-	for {
-		select {
-		case job := <-runner.NotificationChan():
-			log.Printf("job %s reported error: %v\n", job.Name, job.LastError)
-		}
-	}
-
-	wg.Wait()
-
-	return nil
-}
-
-func jobPull(pull Pull, c *cli.Context, log jobrun.Logger) (err error) {
-
-	if lt, ok := pull.From.Transport.(LocalTransport); ok {
-		lt.SetHandler(Handler{
-			Logger:  log,
-			PullACL: pull.Mapping,
-		})
-		pull.From.Transport = lt
-		log.Printf("fixing up local transport: %#v", pull.From.Transport)
-	}
-
-	var remote rpc.RPCRequester
-
-	if remote, err = pull.From.Transport.Connect(log); err != nil {
-		return
-	}
-
-	defer closeRPCWithTimeout(log, remote, time.Second*10, "")
-
-	return doPull(PullContext{remote, log, pull.Mapping, pull.InitialReplPolicy})
-}
-
-func jobPush(push Push, c *cli.Context, log jobrun.Logger) (err error) {
-
-	if _, ok := push.To.Transport.(LocalTransport); ok {
-		panic("no support for local pushs")
-	}
-
-	var remote rpc.RPCRequester
-	if remote, err = push.To.Transport.Connect(log); err != nil {
-		return err
-	}
-
-	defer closeRPCWithTimeout(log, remote, time.Second*10, "")
-
-	log.Printf("building handler for PullMeRequest")
-	handler := Handler{
-		Logger:          log,
-		PullACL:         push.Filter,
-		SinkMappingFunc: nil, // no need for that in the handler for PullMe
-	}
-	log.Printf("handler: %#v", handler)
-
-	r := rpc.PullMeRequest{
-		InitialReplPolicy: push.InitialReplPolicy,
-	}
-	log.Printf("doing PullMeRequest: %#v", r)
-
-	if err = remote.PullMeRequest(r, handler); err != nil {
-		log.Printf("PullMeRequest failed: %s", err)
-		return
-	}
-
-	log.Printf("push job finished")
-	return
-
-}
-
-func cmdPrune(c *cli.Context) error {
-
-	log := defaultLog
-
-	jobFailed := false
-
-	log.Printf("retending...")
-
-	for _, prune := range conf.Prunes {
-
-		if !c.IsSet("job") || (c.IsSet("job") && c.String("job") == prune.Name) {
-			log.Printf("Beginning prune job:\n%s", prune)
-			ctx := PruneContext{prune, time.Now(), c.IsSet("n")}
-			err := doPrune(ctx, log)
-			if err != nil {
-				jobFailed = true
-				log.Printf("Prune job failed with error: %s", err)
-			}
-			log.Printf("\n")
-
-		}
-
-	}
-
-	if jobFailed {
-		return cli.NewExitError("At least one job failed with an error. Check log for details.", 1)
-	}
-
-	return nil
-}
-
-func cmdAutosnap(c *cli.Context) error {
-
-	log := defaultLog
-
-	var wg sync.WaitGroup
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-		runner.Start()
-	}()
-
-	log.Printf("autosnap...")
-
-	for i := range conf.Autosnaps {
-
-		snap := conf.Autosnaps[i]
-
-		if !c.IsSet("job") || (c.IsSet("job") && c.String("job") == snap.Name) {
-
-			job := jobrun.Job{
-				Name:           fmt.Sprintf("autosnap.%s", snap.Name),
-				RepeatStrategy: snap.Interval,
-				RunFunc: func(log jobrun.Logger) error {
-					log.Printf("doing autosnap: %v", snap)
-					ctx := AutosnapContext{snap}
-					return doAutosnap(ctx, log)
-				},
-			}
-			runner.AddJob(job)
-
-		}
-	}
-
-	wg.Wait()
-
-	return nil
-
-}
diff --git a/cmd/prune.go b/cmd/prune.go
index 6d27a42..864b46f 100644
--- a/cmd/prune.go
+++ b/cmd/prune.go
@@ -1,13 +1,61 @@
-package main
+package cmd
 
 import (
 	"fmt"
+	"github.com/spf13/cobra"
 	"github.com/zrepl/zrepl/util"
 	"github.com/zrepl/zrepl/zfs"
+	"os"
 	"sort"
 	"time"
 )
 
+var pruneArgs struct {
+	job    string
+	dryRun bool
+}
+
+var PruneCmd = &cobra.Command{
+	Use:   "prune",
+	Short: "perform pruning",
+	Run:   cmdPrune,
+}
+
+func init() {
+	PruneCmd.Flags().StringVar(&pruneArgs.job, "job", "", "job to run")
+	PruneCmd.Flags().BoolVarP(&pruneArgs.dryRun, "dryrun", "n", false, "dry run")
+	RootCmd.AddCommand(PruneCmd)
+}
+
+func cmdPrune(cmd *cobra.Command, args []string) {
+
+	jobFailed := false
+
+	log.Printf("retending...")
+
+	for _, prune := range conf.Prunes {
+
+		if pruneArgs.job == "" || pruneArgs.job == prune.Name {
+			log.Printf("Beginning prune job:\n%s", prune)
+			ctx := PruneContext{prune, time.Now(), pruneArgs.dryRun}
+			err := doPrune(ctx, log)
+			if err != nil {
+				jobFailed = true
+				log.Printf("Prune job failed with error: %s", err)
+			}
log.Printf("\n") + + } + + } + + if jobFailed { + log.Printf("At least one job failed with an error. Check log for details.") + os.Exit(1) + } + +} + type PruneContext struct { Prune Prune Now time.Time diff --git a/cmd/replication.go b/cmd/replication.go index fde1216..afc9729 100644 --- a/cmd/replication.go +++ b/cmd/replication.go @@ -1,13 +1,150 @@ -package main +package cmd import ( "fmt" + "github.com/spf13/cobra" + "github.com/zrepl/zrepl/jobrun" "github.com/zrepl/zrepl/rpc" "github.com/zrepl/zrepl/zfs" "io" + "sync" "time" ) +var runArgs struct { + job string + once bool +} + +var RunCmd = &cobra.Command{ + Use: "run", + Short: "run push & pull replication", + Run: cmdRun, +} + +func init() { + RootCmd.AddCommand(RunCmd) + RunCmd.Flags().BoolVar(&runArgs.once, "once", false, "run jobs only once, regardless of configured repeat behavior") + RunCmd.Flags().StringVar(&runArgs.job, "job", "", "run only the given job") +} + +func cmdRun(cmd *cobra.Command, args []string) { + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + runner.Start() + }() + + jobs := make([]jobrun.Job, len(conf.Pulls)+len(conf.Pushs)) + i := 0 + for _, pull := range conf.Pulls { + jobs[i] = jobrun.Job{ + Name: fmt.Sprintf("pull.%d", i), + RepeatStrategy: pull.RepeatStrategy, + RunFunc: func(log jobrun.Logger) error { + log.Printf("doing pull: %v", pull) + return jobPull(pull, log) + }, + } + i++ + } + for _, push := range conf.Pushs { + jobs[i] = jobrun.Job{ + Name: fmt.Sprintf("push.%d", i), + RepeatStrategy: push.RepeatStrategy, + RunFunc: func(log jobrun.Logger) error { + log.Printf("doing push: %v", push) + return jobPush(push, log) + }, + } + i++ + } + + for _, j := range jobs { + if runArgs.once { + j.RepeatStrategy = jobrun.NoRepeatStrategy{} + } + if runArgs.job != "" { + if runArgs.job == j.Name { + runner.AddJob(j) + break + } + continue + } + runner.AddJob(j) + } + + for { + select { + case job := <-runner.NotificationChan(): + log.Printf("job %s reported error: %v\n", job.Name, job.LastError) + } + } + + wg.Wait() + +} + +func jobPull(pull Pull, log jobrun.Logger) (err error) { + + if lt, ok := pull.From.Transport.(LocalTransport); ok { + lt.SetHandler(Handler{ + Logger: log, + PullACL: pull.Mapping, + }) + pull.From.Transport = lt + log.Printf("fixing up local transport: %#v", pull.From.Transport) + } + + var remote rpc.RPCRequester + + if remote, err = pull.From.Transport.Connect(log); err != nil { + return + } + + defer closeRPCWithTimeout(log, remote, time.Second*10, "") + + return doPull(PullContext{remote, log, pull.Mapping, pull.InitialReplPolicy}) +} + +func jobPush(push Push, log jobrun.Logger) (err error) { + + if _, ok := push.To.Transport.(LocalTransport); ok { + panic("no support for local pushs") + } + + var remote rpc.RPCRequester + if remote, err = push.To.Transport.Connect(log); err != nil { + return err + } + + defer closeRPCWithTimeout(log, remote, time.Second*10, "") + + log.Printf("building handler for PullMeRequest") + handler := Handler{ + Logger: log, + PullACL: push.Filter, + SinkMappingFunc: nil, // no need for that in the handler for PullMe + } + log.Printf("handler: %#v", handler) + + r := rpc.PullMeRequest{ + InitialReplPolicy: push.InitialReplPolicy, + } + log.Printf("doing PullMeRequest: %#v", r) + + if err = remote.PullMeRequest(r, handler); err != nil { + log.Printf("PullMeRequest failed: %s", err) + return + } + + log.Printf("push job finished") + return + +} + func closeRPCWithTimeout(log Logger, remote rpc.RPCRequester, timeout time.Duration, 
 	goodbye string) {
 	log.Printf("closing rpc connection")
diff --git a/cmd/stdinserver.go b/cmd/stdinserver.go
new file mode 100644
index 0000000..5834b9a
--- /dev/null
+++ b/cmd/stdinserver.go
@@ -0,0 +1,79 @@
+package cmd
+
+import (
+	"fmt"
+	"github.com/spf13/cobra"
+	"github.com/zrepl/zrepl/rpc"
+	"github.com/zrepl/zrepl/sshbytestream"
+	"github.com/zrepl/zrepl/zfs"
+	"io"
+	golog "log"
+	"os"
+)
+
+var stdinserver struct {
+	identity string
+}
+
+var StdinserverCmd = &cobra.Command{
+	Use:   "stdinserver",
+	Short: "start in stdin server mode (from authorized_keys file)",
+	Run:   cmdStdinServer,
+}
+
+func init() {
+	StdinserverCmd.Flags().StringVar(&stdinserver.identity, "identity", "", "")
+	RootCmd.AddCommand(StdinserverCmd)
+}
+
+func cmdStdinServer(cmd *cobra.Command, args []string) {
+
+	var err error
+	defer func() {
+		if err != nil {
+			log.Printf("stdinserver exiting with error: %s", err)
+			os.Exit(1)
+		}
+	}()
+
+	if stdinserver.identity == "" {
+		err = fmt.Errorf("identity flag not set")
+		return
+	}
+	identity := stdinserver.identity
+
+	var sshByteStream io.ReadWriteCloser
+	if sshByteStream, err = sshbytestream.Incoming(); err != nil {
+		return
+	}
+
+	findMapping := func(cm []ClientMapping, identity string) zfs.DatasetMapping {
+		for i := range cm {
+			if cm[i].From == identity {
+				return cm[i].Mapping
+			}
+		}
+		return nil
+	}
+	sinkMapping := func(identity string) (sink zfs.DatasetMapping, err error) {
+		if sink = findMapping(conf.Sinks, identity); sink == nil {
+			return nil, fmt.Errorf("could not find sink for dataset")
+		}
+		return
+	}
+
+	sinkLogger := golog.New(logOut, fmt.Sprintf("sink[%s] ", identity), logFlags)
+	handler := Handler{
+		Logger:          sinkLogger,
+		SinkMappingFunc: sinkMapping,
+		PullACL:         findMapping(conf.PullACLs, identity),
+	}
+
+	if err = rpc.ListenByteStreamRPC(sshByteStream, identity, handler, sinkLogger); err != nil {
+		log.Printf("listenbytestreamerror: %#v\n", err)
+		os.Exit(1)
+	}
+
+	return
+
+}
diff --git a/main.go b/main.go
new file mode 100644
index 0000000..a7919b6
--- /dev/null
+++ b/main.go
@@ -0,0 +1,15 @@
+// See cmd package.
+package main
+
+import (
+	"github.com/zrepl/zrepl/cmd"
+	"log"
+	"os"
+)
+
+func main() {
+	if err := cmd.RootCmd.Execute(); err != nil {
+		log.Printf("error executing root command: %s", err)
+		os.Exit(1)
+	}
+}
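
Each subcommand in this change follows the same cobra wiring pattern: declare a package-level *cobra.Command, bind its flags in init(), and register it on RootCmd so that cmd.RootCmd.Execute() in the new top-level main.go dispatches to it. For orientation, a minimal sketch of that pattern using a hypothetical "status" subcommand that is not part of this change (it relies on the package-level log and RootCmd introduced above):

package cmd

import "github.com/spf13/cobra"

// statusArgs holds flag values for the hypothetical subcommand.
var statusArgs struct {
	job string
}

// StatusCmd is purely illustrative; it mirrors the structure of
// autosnap.go, prune.go, replication.go and stdinserver.go.
var StatusCmd = &cobra.Command{
	Use:   "status",
	Short: "illustrative only: inspect a job",
	Run:   cmdStatus,
}

func init() {
	// bind flags, then register on the shared RootCmd defined in cmd/main.go
	StatusCmd.Flags().StringVar(&statusArgs.job, "job", "", "job to inspect")
	RootCmd.AddCommand(StatusCmd)
}

func cmdStatus(cmd *cobra.Command, args []string) {
	// conf, runner and log are the package-level facilities set up by initConfig()
	log.Printf("status requested for job %q", statusArgs.job)
}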