diff --git a/cmd/prune.go b/cmd/prune.go index 73ea7e1..2adb14f 100644 --- a/cmd/prune.go +++ b/cmd/prune.go @@ -3,7 +3,9 @@ package cmd import ( "context" "fmt" + "github.com/zrepl/zrepl/util" "github.com/zrepl/zrepl/zfs" + "sync" "time" ) @@ -23,9 +25,8 @@ type PruneResult struct { Remove []zfs.FilesystemVersion } +// FIXME must not call p.task.Enter because it runs in parallel func (p *Pruner) filterFilesystems() (filesystems []*zfs.DatasetPath, stop bool) { - p.task.Enter("filter_fs") - defer p.task.Finish() filesystems, err := zfs.ZFSListMapping(p.DatasetFilter) if err != nil { p.task.Log().WithError(err).Error("error applying filesystem filter") @@ -38,9 +39,8 @@ func (p *Pruner) filterFilesystems() (filesystems []*zfs.DatasetPath, stop bool) return filesystems, false } +// FIXME must not call p.task.Enter because it runs in parallel func (p *Pruner) filterVersions(fs *zfs.DatasetPath) (fsversions []zfs.FilesystemVersion, stop bool) { - p.task.Enter("filter_versions") - defer p.task.Finish() log := p.task.Log().WithField(logFSField, fs.ToString()) filter := NewPrefixFilter(p.SnapshotPrefix) @@ -56,9 +56,8 @@ func (p *Pruner) filterVersions(fs *zfs.DatasetPath) (fsversions []zfs.Filesyste return fsversions, false } -func (p *Pruner) pruneFilesystem(fs *zfs.DatasetPath) (r PruneResult, valid bool) { - p.task.Enter("prune_fs") - defer p.task.Finish() +// FIXME must not call p.task.Enter because it runs in parallel +func (p *Pruner) pruneFilesystem(fs *zfs.DatasetPath, destroySemaphore util.Semaphore) (r PruneResult, valid bool) { log := p.task.Log().WithField(logFSField, fs.ToString()) fsversions, stop := p.filterVersions(fs) @@ -66,9 +65,7 @@ func (p *Pruner) pruneFilesystem(fs *zfs.DatasetPath) (r PruneResult, valid bool return } - p.task.Enter("prune_policy") keep, remove, err := p.PrunePolicy.Prune(fs, fsversions) - p.task.Finish() if err != nil { log.WithError(err).Error("error evaluating prune policy") return @@ -81,33 +78,37 @@ func (p *Pruner) pruneFilesystem(fs *zfs.DatasetPath) (r PruneResult, valid bool r = PruneResult{fs, fsversions, keep, remove} - makeFields := func(v zfs.FilesystemVersion) (fields map[string]interface{}) { - fields = make(map[string]interface{}) - fields["version"] = v.ToAbsPath(fs) - timeSince := v.Creation.Sub(p.Now) - fields["age_ns"] = timeSince - const day time.Duration = 24 * time.Hour - days := timeSince / day - remainder := timeSince % day - fields["age_str"] = fmt.Sprintf("%dd%s", days, remainder) - return - } + var wg sync.WaitGroup + for v := range remove { + wg.Add(1) + go func(v zfs.FilesystemVersion) { + defer wg.Done() + // log fields + fields := make(map[string]interface{}) + fields["version"] = v.ToAbsPath(fs) + timeSince := v.Creation.Sub(p.Now) + fields["age_ns"] = timeSince + const day time.Duration = 24 * time.Hour + days := timeSince / day + remainder := timeSince % day + fields["age_str"] = fmt.Sprintf("%dd%s", days, remainder) - for _, v := range remove { - fields := makeFields(v) - log.WithFields(fields).Info("destroying version") - // echo what we'll do and exec zfs destroy if not dry run - // TODO special handling for EBUSY (zfs hold) - // TODO error handling for clones? just echo to cli, skip over, and exit with non-zero status code (we're idempotent) - if !p.DryRun { - p.task.Enter("destroy") - err := zfs.ZFSDestroyFilesystemVersion(fs, v) - p.task.Finish() - if err != nil { - log.WithFields(fields).WithError(err).Error("error destroying version") + log.WithFields(fields).Info("destroying version") + // echo what we'll do and exec zfs destroy if not dry run + // TODO special handling for EBUSY (zfs hold) + // TODO error handling for clones? just echo to cli, skip over, and exit with non-zero status code (we're idempotent) + if !p.DryRun { + destroySemaphore.Down() + err := zfs.ZFSDestroyFilesystemVersion(fs, v) + destroySemaphore.Up() + if err != nil { + log.WithFields(fields).WithError(err).Error("error destroying version") + } } - } + }(remove[v]) } + wg.Wait() + return r, true } @@ -124,13 +125,31 @@ func (p *Pruner) Run(ctx context.Context) (r []PruneResult, err error) { return } - r = make([]PruneResult, 0, len(filesystems)) + maxConcurrentDestroy := len(filesystems) + p.task.Log().WithField("max_concurrent_destroy", maxConcurrentDestroy).Info("begin concurrent destroy") + destroySem := util.NewSemaphore(maxConcurrentDestroy) + resChan := make(chan PruneResult, len(filesystems)) + var wg sync.WaitGroup for _, fs := range filesystems { - res, ok := p.pruneFilesystem(fs) - if ok { - r = append(r, res) - } + wg.Add(1) + go func(fs *zfs.DatasetPath) { + defer wg.Done() + res, ok := p.pruneFilesystem(fs, destroySem) + if ok { + resChan <- res + } + + }(fs) + } + wg.Wait() + close(resChan) + + p.task.Log().Info("destroys done") + + r = make([]PruneResult, 0, len(filesystems)) + for res := range resChan { + r = append(r, res) } return