mirror of
https://github.com/zrepl/zrepl.git
synced 2024-11-22 00:13:52 +01:00
Merge pull request #311 from zrepl/problame/zfscmd-fixes-backported-from-307-tracing-wip
zfscmd & zfs fixes created during WIP on #307
This commit is contained in:
commit
7b34d6cba5
@ -211,7 +211,7 @@ func doMigrateReplicationCursorFS(ctx context.Context, v1CursorJobs []job.Job, f
|
||||
}
|
||||
fmt.Printf("identified owning job %q\n", owningJob.Name())
|
||||
|
||||
bookmarks, err := zfs.ZFSListFilesystemVersions(fs, zfs.ListFilesystemVersionsOptions{
|
||||
bookmarks, err := zfs.ZFSListFilesystemVersions(ctx, fs, zfs.ListFilesystemVersionsOptions{
|
||||
Types: zfs.Bookmarks,
|
||||
})
|
||||
if err != nil {
|
||||
|
@ -485,7 +485,7 @@ var findSyncPointFSNoFilesystemVersionsErr = fmt.Errorf("no filesystem versions"
|
||||
|
||||
func findSyncPointFSNextOptimalSnapshotTime(l Logger, now time.Time, interval time.Duration, prefix string, d *zfs.DatasetPath) (time.Time, error) {
|
||||
|
||||
fsvs, err := zfs.ZFSListFilesystemVersions(d, zfs.ListFilesystemVersionsOptions{
|
||||
fsvs, err := zfs.ZFSListFilesystemVersions(context.TODO(), d, zfs.ListFilesystemVersionsOptions{
|
||||
Types: zfs.Snapshots,
|
||||
ShortnamePrefix: prefix,
|
||||
})
|
||||
|
@ -96,7 +96,7 @@ func (s *Sender) ListFilesystemVersions(ctx context.Context, r *pdu.ListFilesyst
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fsvs, err := zfs.ZFSListFilesystemVersions(lp, zfs.ListFilesystemVersionsOptions{})
|
||||
fsvs, err := zfs.ZFSListFilesystemVersions(ctx, lp, zfs.ListFilesystemVersionsOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -648,7 +648,7 @@ func (s *Receiver) ListFilesystemVersions(ctx context.Context, req *pdu.ListFile
|
||||
}
|
||||
// TODO share following code with sender
|
||||
|
||||
fsvs, err := zfs.ZFSListFilesystemVersions(lp, zfs.ListFilesystemVersionsOptions{})
|
||||
fsvs, err := zfs.ZFSListFilesystemVersions(ctx, lp, zfs.ListFilesystemVersionsOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -573,7 +573,7 @@ func listAbstractionsImplFS(ctx context.Context, fs string, query *ListZFSHoldsA
|
||||
whatTypes[zfs.Snapshot] = true
|
||||
}
|
||||
}
|
||||
fsvs, err := zfs.ZFSListFilesystemVersions(fsp, zfs.ListFilesystemVersionsOptions{
|
||||
fsvs, err := zfs.ZFSListFilesystemVersions(ctx, fsp, zfs.ListFilesystemVersionsOptions{
|
||||
Types: whatTypes,
|
||||
})
|
||||
if err != nil {
|
||||
|
@ -43,7 +43,7 @@ func ListFilesystemVersionsTypeFilteringAndPrefix(t *platformtest.Context) {
|
||||
fs := fmt.Sprintf("%s/foo bar", t.RootDataset)
|
||||
|
||||
// no options := all types
|
||||
vs, err := zfs.ZFSListFilesystemVersions(mustDatasetPath(fs), zfs.ListFilesystemVersionsOptions{})
|
||||
vs, err := zfs.ZFSListFilesystemVersions(t, mustDatasetPath(fs), zfs.ListFilesystemVersionsOptions{})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []string{
|
||||
"#blup 1", "#bookfoo 1", "#bookfoo 2", "#foo 1", "#foo 2",
|
||||
@ -51,21 +51,21 @@ func ListFilesystemVersionsTypeFilteringAndPrefix(t *platformtest.Context) {
|
||||
}, versionRelnamesSorted(vs))
|
||||
|
||||
// just snapshots
|
||||
vs, err = zfs.ZFSListFilesystemVersions(mustDatasetPath(fs), zfs.ListFilesystemVersionsOptions{
|
||||
vs, err = zfs.ZFSListFilesystemVersions(t, mustDatasetPath(fs), zfs.ListFilesystemVersionsOptions{
|
||||
Types: zfs.Snapshots,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []string{"@ foo with leading whitespace", "@blup 1", "@foo 1", "@foo 2"}, versionRelnamesSorted(vs))
|
||||
|
||||
// just bookmarks
|
||||
vs, err = zfs.ZFSListFilesystemVersions(mustDatasetPath(fs), zfs.ListFilesystemVersionsOptions{
|
||||
vs, err = zfs.ZFSListFilesystemVersions(t, mustDatasetPath(fs), zfs.ListFilesystemVersionsOptions{
|
||||
Types: zfs.Bookmarks,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []string{"#blup 1", "#bookfoo 1", "#bookfoo 2", "#foo 1", "#foo 2"}, versionRelnamesSorted(vs))
|
||||
|
||||
// just with prefix foo
|
||||
vs, err = zfs.ZFSListFilesystemVersions(mustDatasetPath(fs), zfs.ListFilesystemVersionsOptions{
|
||||
vs, err = zfs.ZFSListFilesystemVersions(t, mustDatasetPath(fs), zfs.ListFilesystemVersionsOptions{
|
||||
ShortnamePrefix: "foo",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
@ -82,7 +82,7 @@ func ListFilesystemVersionsZeroExistIsNotAnError(t *platformtest.Context) {
|
||||
|
||||
fs := fmt.Sprintf("%s/foo bar", t.RootDataset)
|
||||
|
||||
vs, err := zfs.ZFSListFilesystemVersions(mustDatasetPath(fs), zfs.ListFilesystemVersionsOptions{})
|
||||
vs, err := zfs.ZFSListFilesystemVersions(t, mustDatasetPath(fs), zfs.ListFilesystemVersionsOptions{})
|
||||
require.Empty(t, vs)
|
||||
require.NoError(t, err)
|
||||
dsne, ok := err.(*zfs.DatasetDoesNotExist)
|
||||
@ -98,7 +98,7 @@ func ListFilesystemVersionsFilesystemNotExist(t *platformtest.Context) {
|
||||
|
||||
nonexistentFS := fmt.Sprintf("%s/not existent", t.RootDataset)
|
||||
|
||||
vs, err := zfs.ZFSListFilesystemVersions(mustDatasetPath(nonexistentFS), zfs.ListFilesystemVersionsOptions{})
|
||||
vs, err := zfs.ZFSListFilesystemVersions(t, mustDatasetPath(nonexistentFS), zfs.ListFilesystemVersionsOptions{})
|
||||
require.Empty(t, vs)
|
||||
require.Error(t, err)
|
||||
t.Logf("err = %T\n%s", err, err)
|
||||
@ -141,7 +141,7 @@ func ListFilesystemVersionsUserrefs(t *platformtest.Context) {
|
||||
|
||||
fs := fmt.Sprintf("%s/foo bar", t.RootDataset)
|
||||
|
||||
vs, err := zfs.ZFSListFilesystemVersions(mustDatasetPath(fs), zfs.ListFilesystemVersionsOptions{})
|
||||
vs, err := zfs.ZFSListFilesystemVersions(t, mustDatasetPath(fs), zfs.ListFilesystemVersionsOptions{})
|
||||
require.NoError(t, err)
|
||||
|
||||
type expectation struct {
|
||||
|
@ -206,13 +206,13 @@ func (o *ListFilesystemVersionsOptions) matches(v FilesystemVersion) bool {
|
||||
}
|
||||
|
||||
// returned versions are sorted by createtxg FIXME drop sort by createtxg requirement
|
||||
func ZFSListFilesystemVersions(fs *DatasetPath, options ListFilesystemVersionsOptions) (res []FilesystemVersion, err error) {
|
||||
func ZFSListFilesystemVersions(ctx context.Context, fs *DatasetPath, options ListFilesystemVersionsOptions) (res []FilesystemVersion, err error) {
|
||||
listResults := make(chan ZFSListResult)
|
||||
|
||||
promTimer := prometheus.NewTimer(prom.ZFSListFilesystemVersionDuration.WithLabelValues(fs.ToString()))
|
||||
defer promTimer.ObserveDuration()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
go ZFSListChan(ctx, listResults,
|
||||
[]string{"name", "guid", "createtxg", "creation", "userrefs"},
|
||||
|
@ -1084,7 +1084,7 @@ func ZFSRecv(ctx context.Context, fs string, v *ZFSSendArgVersion, streamCopier
|
||||
if opts.RollbackAndForceRecv {
|
||||
// destroy all snapshots before `recv -F` because `recv -F`
|
||||
// does not perform a rollback unless `send -R` was used (which we assume hasn't been the case)
|
||||
snaps, err := ZFSListFilesystemVersions(fsdp, ListFilesystemVersionsOptions{
|
||||
snaps, err := ZFSListFilesystemVersions(ctx, fsdp, ListFilesystemVersionsOptions{
|
||||
Types: Snapshots,
|
||||
})
|
||||
if err != nil {
|
||||
|
@ -18,10 +18,10 @@ import (
|
||||
)
|
||||
|
||||
type Cmd struct {
|
||||
cmd *exec.Cmd
|
||||
ctx context.Context
|
||||
mtx sync.RWMutex
|
||||
startedAt, waitReturnedAt time.Time
|
||||
cmd *exec.Cmd
|
||||
ctx context.Context
|
||||
mtx sync.RWMutex
|
||||
startedAt, waitStartedAt, waitReturnedAt time.Time
|
||||
}
|
||||
|
||||
func CommandContext(ctx context.Context, name string, arg ...string) *Cmd {
|
||||
@ -119,19 +119,65 @@ func (c *Cmd) startPost(err error) {
|
||||
}
|
||||
|
||||
func (c *Cmd) waitPre() {
|
||||
waitPreLogging(c, time.Now())
|
||||
now := time.Now()
|
||||
|
||||
// ignore duplicate waits
|
||||
c.mtx.Lock()
|
||||
// ignore duplicate waits
|
||||
if !c.waitStartedAt.IsZero() {
|
||||
c.mtx.Unlock()
|
||||
return
|
||||
}
|
||||
c.waitStartedAt = now
|
||||
c.mtx.Unlock()
|
||||
|
||||
waitPreLogging(c, now)
|
||||
}
|
||||
|
||||
type usage struct {
|
||||
total_secs, system_secs, user_secs float64
|
||||
}
|
||||
|
||||
func (c *Cmd) waitPost(err error) {
|
||||
now := time.Now()
|
||||
|
||||
c.mtx.Lock()
|
||||
// ignore duplicate waits
|
||||
if !c.waitReturnedAt.IsZero() {
|
||||
c.mtx.Unlock()
|
||||
return
|
||||
}
|
||||
c.waitReturnedAt = now
|
||||
c.mtx.Unlock()
|
||||
|
||||
waitPostReport(c, now)
|
||||
waitPostLogging(c, err, now)
|
||||
waitPostPrometheus(c, err, now)
|
||||
// build usage
|
||||
var u usage
|
||||
{
|
||||
var s *os.ProcessState
|
||||
if err == nil {
|
||||
s = c.cmd.ProcessState
|
||||
} else if ee, ok := err.(*exec.ExitError); ok {
|
||||
s = ee.ProcessState
|
||||
}
|
||||
|
||||
if s == nil {
|
||||
u = usage{
|
||||
total_secs: c.Runtime().Seconds(),
|
||||
system_secs: -1,
|
||||
user_secs: -1,
|
||||
}
|
||||
} else {
|
||||
u = usage{
|
||||
total_secs: c.Runtime().Seconds(),
|
||||
system_secs: s.SystemTime().Seconds(),
|
||||
user_secs: s.UserTime().Seconds(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
waitPostReport(c, u, now)
|
||||
waitPostLogging(c, u, err, now)
|
||||
waitPostPrometheus(c, u, err, now)
|
||||
}
|
||||
|
||||
// returns 0 if the command did not yet finish
|
||||
|
@ -1,7 +1,6 @@
|
||||
package zfscmd
|
||||
|
||||
import (
|
||||
"os/exec"
|
||||
"time"
|
||||
)
|
||||
|
||||
@ -28,23 +27,12 @@ func waitPreLogging(c *Cmd, now time.Time) {
|
||||
c.log().Debug("start waiting")
|
||||
}
|
||||
|
||||
func waitPostLogging(c *Cmd, err error, now time.Time) {
|
||||
|
||||
var total, system, user float64
|
||||
|
||||
total = c.Runtime().Seconds()
|
||||
if ee, ok := err.(*exec.ExitError); ok {
|
||||
system = ee.ProcessState.SystemTime().Seconds()
|
||||
user = ee.ProcessState.UserTime().Seconds()
|
||||
} else {
|
||||
system = -1
|
||||
user = -1
|
||||
}
|
||||
func waitPostLogging(c *Cmd, u usage, err error, now time.Time) {
|
||||
|
||||
log := c.log().
|
||||
WithField("total_time_s", total).
|
||||
WithField("systemtime_s", system).
|
||||
WithField("usertime_s", user)
|
||||
WithField("total_time_s", u.total_secs).
|
||||
WithField("systemtime_s", u.system_secs).
|
||||
WithField("usertime_s", u.user_secs)
|
||||
|
||||
if err == nil {
|
||||
log.Info("command exited without error")
|
||||
|
@ -46,7 +46,7 @@ func RegisterMetrics(r prometheus.Registerer) {
|
||||
r.MustRegister(metrics.usertime)
|
||||
}
|
||||
|
||||
func waitPostPrometheus(c *Cmd, err error, now time.Time) {
|
||||
func waitPostPrometheus(c *Cmd, u usage, err error, now time.Time) {
|
||||
|
||||
if len(c.cmd.Args) < 2 {
|
||||
getLogger(c.ctx).WithField("args", c.cmd.Args).
|
||||
@ -64,10 +64,10 @@ func waitPostPrometheus(c *Cmd, err error, now time.Time) {
|
||||
|
||||
metrics.totaltime.
|
||||
WithLabelValues(labelValues...).
|
||||
Observe(c.Runtime().Seconds())
|
||||
Observe(u.total_secs)
|
||||
metrics.systemtime.WithLabelValues(labelValues...).
|
||||
Observe(c.cmd.ProcessState.SystemTime().Seconds())
|
||||
Observe(u.system_secs)
|
||||
metrics.usertime.WithLabelValues(labelValues...).
|
||||
Observe(c.cmd.ProcessState.UserTime().Seconds())
|
||||
Observe(u.user_secs)
|
||||
|
||||
}
|
||||
|
@ -57,7 +57,7 @@ func startPostReport(c *Cmd, err error, now time.Time) {
|
||||
active.mtx.Unlock()
|
||||
}
|
||||
|
||||
func waitPostReport(c *Cmd, now time.Time) {
|
||||
func waitPostReport(c *Cmd, _ usage, now time.Time) {
|
||||
active.mtx.Lock()
|
||||
defer active.mtx.Unlock()
|
||||
prev := active.cmds[c]
|
||||
|
Loading…
Reference in New Issue
Block a user