Merge branch 'problame/zfs-command-logging-and-status' into problame/holds-release-and-hold-leak-fix-v2

This commit is contained in:
Christian Schwarz 2020-03-29 19:04:21 +02:00
commit bc291e622f
6 changed files with 410 additions and 2 deletions

View File

@ -0,0 +1,11 @@
The tool in this package (`go run . -h`) scrapes log lines produces by the `github.com/zrepl/zrepl/zfs/zfscmd` package
into a stream of JSON objects.
The `analysis.ipynb` then runs some basic analysis on the collected log output.
## Deps for the `scrape_graylog_csv.bash` script
```
pip install --upgrade git+https://github.com/lk-jeffpeck/csvfilter.git@ec433f14330fbbf5d41f56febfeedac22868a949
```

View File

@ -0,0 +1,180 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"import seaborn as sns\n",
"import matplotlib.pyplot as plt\n",
"import re\n",
"\n",
"%matplotlib notebook"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# ./parsed.json is the stdout of the scraper tool in this directory\n",
"df = pd.read_json(\"./parsed.json\", lines=True)\n",
"df"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def parse_ds(entity):\n",
" m = re.search(r\"(?P<dataset>[^@#]*)([@#].+)?\", entity)\n",
" return m.group(\"dataset\")\n",
" \n",
"def parse_cmd(row):\n",
" cmd = row.Cmd\n",
" binary, verb, *tail = re.split(r\"\\s+\", cmd) # NOTE whitespace in dataset names => don't use comp\n",
" \n",
" dataset = None\n",
" if binary == \"zfs\":\n",
" if verb == \"send\": \n",
" dataset = parse_ds(tail[-1])\n",
" if \"-n\" in tail:\n",
" verb = \"send-dry\"\n",
" elif verb == \"recv\" or verb == \"receive\":\n",
" verb = \"receive\"\n",
" if len(tail) > 0:\n",
" dataset = parse_ds(tail[-1])\n",
" else:\n",
" verb = \"receive-CLI-test\"\n",
" elif verb == \"get\":\n",
" dataset = parse_ds(tail[-1])\n",
" elif verb == \"list\":\n",
" if \"-r\" in tail and \"-d\" in tail and \"1\" in tail:\n",
" dataset = parse_ds(tail[-1])\n",
" verb = \"list-single-dataset\"\n",
" else:\n",
" dataset = \"!ALL_POOLS!\"\n",
" verb = \"list-all-filesystems\"\n",
" elif verb == \"bookmark\":\n",
" dataset = parse_ds(tail[-2])\n",
" elif verb == \"hold\":\n",
" dataset = parse_ds(tail[-1])\n",
" elif verb == \"snapshot\":\n",
" dataset = parse_ds(tail[-1])\n",
" elif verb == \"release\":\n",
" dss = tail[-1].split(\",\")\n",
" if len(dss) > 1:\n",
" raise Exception(\"cannot handle batch-release\")\n",
" dataset = parse_ds(dss[0])\n",
" elif verb == \"holds\" and \"-H\" in tail:\n",
" dss = tail[-1].split(\",\")\n",
" if len(dss) > 1:\n",
" raise Exception(\"cannot handle batch-holds\")\n",
" dataset = parse_ds(dss[0])\n",
" elif verb == \"destroy\":\n",
" dss = tail[-1].split(\",\")\n",
" if len(dss) > 1:\n",
" raise Exception(\"cannot handle batch-holds\")\n",
" dataset = parse_ds(dss[0])\n",
" \n",
" return {'action':binary + \"-\" + verb, 'dataset': dataset }\n",
" \n",
" \n",
"res = df.apply(parse_cmd, axis='columns', result_type='expand')\n",
"res = pd.concat([df, res], axis='columns')\n",
"for cat in [\"action\", \"dataset\"]:\n",
" res[cat] = res[cat].astype('category')\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"res[\"OtherTime\"] = res.TotalTime - res.Usertime - res.Systime\n",
"x = res.melt(id_vars=[\"action\", \"dataset\"], value_vars=[\"TotalTime\", \"OtherTime\", \"Usertime\", \"Systime\"])\n",
"x"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print(\"commands with NaN values\")\n",
"set(x[x.isna().any(axis=1)].action.values)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# (~x.action.astype('str').isin([\"zfs-send\", \"zfs-recv\"]))\n",
"totaltimes = x[(x.variable == \"TotalTime\")].groupby([\"action\", \"dataset\"]).sum().reset_index()\n",
"display(totaltimes)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"totaltimes_by_action = totaltimes.groupby(\"action\").sum().sort_values(by=\"value\")\n",
"totaltimes_by_action.plot.barh()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"totaltimes.groupby(\"dataset\").sum().plot.barh(fontsize=5)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"most_expensive_action = totaltimes_by_action.idxmax().value\n",
"display(most_expensive_action)\n",
"most_expensive_action_by_dataset = totaltimes[totaltimes.action == most_expensive_action].groupby(\"dataset\").sum().sort_values(by=\"value\")\n",
"most_expensive_action_by_dataset.plot.barh(rot=50, fontsize=5, figsize=(10, 20))\n",
"plt.savefig('most-expensive-command.pdf')"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.6"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

View File

@ -0,0 +1,16 @@
#!/usr/bin/env bash
# This script converts output that was produced by zrepl and captured by Graylog
# back to something that the scraper in this package's main can understand
# Intended for human syslog
# logging:
# - type: syslog
# level: debug
# format: human
csvfilter --skip 1 -f 0,2 -q '"' --out-quotechar=' ' /dev/stdin | sed -E 's/^\s*([^,]*), /\1 [LEVEL]/' | \
go run . -v \
--dateRE '^([^\[]+) (\[.*)' \
--dateFormat '2006-01-02T15:04:05.999999999Z'

View File

@ -0,0 +1,122 @@
package main
import (
"bufio"
"encoding/json"
"fmt"
"os"
"regexp"
"strings"
"time"
"github.com/go-logfmt/logfmt"
"github.com/pkg/errors"
"github.com/spf13/pflag"
"github.com/zrepl/zrepl/daemon/logging"
)
type RuntimeLine struct {
LogTime time.Time
Cmd string
TotalTime, Usertime, Systime time.Duration
Error string
}
var humanFormatterLineRE = regexp.MustCompile(`^(\[[^\]]+\]){2}\[zfs.cmd\]:\s+command\s+exited\s+(with|without)\s+error\s+(.+)`)
func parseSecs(s string) (time.Duration, error) {
d, err := time.ParseDuration(s + "s")
if err != nil {
return 0, errors.Wrapf(err, "parse duration %q", s)
}
return d, nil
}
func parseHumanFormatterNodate(line string) (l RuntimeLine, err error) {
m := humanFormatterLineRE.FindStringSubmatch(line)
if m == nil {
return l, errors.New("human formatter regex does not match")
}
d := logfmt.NewDecoder(strings.NewReader(m[3]))
for d.ScanRecord() {
for d.ScanKeyval() {
k := string(d.Key())
v := string(d.Value())
switch k {
case "cmd":
l.Cmd = v
case "total_time_s":
l.TotalTime, err = parseSecs(v)
case "usertime_s":
l.Usertime, err = parseSecs(v)
case "systemtime_s":
l.Systime, err = parseSecs(v)
case "err":
l.Error = v
case "invocation":
continue // pass
default:
return l, errors.Errorf("unknown key %q", k)
}
}
}
if d.Err() != nil {
return l, errors.Wrap(d.Err(), "decode key value pairs")
}
return l, nil
}
func parseLogLine(line string) (l RuntimeLine, err error) {
m := dateRegex.FindStringSubmatch(line)
if len(m) != 3 {
return l, errors.Errorf("invalid date regex match %v", m)
}
date, err := time.Parse(dateFormat, strings.TrimSpace(m[1]))
if err != nil {
panic(fmt.Sprintf("cannot parse date %q: %s", m[1], err))
}
logLine := m[2]
l, err = parseHumanFormatterNodate(strings.TrimSpace(logLine))
l.LogTime = date
return l, err
}
var verbose bool
var dateRegexArg string
var dateRegex *regexp.Regexp
var dateFormat string
func main() {
pflag.StringVarP(&dateRegexArg, "dateRE", "d", `^([^\[]+)(.*)`, "date regex")
pflag.StringVar(&dateFormat, "dateFormat", logging.HumanFormatterDateFormat, "go date format")
pflag.BoolVarP(&verbose, "verbose", "v", false, "verbose")
pflag.Parse()
dateRegex = regexp.MustCompile(dateRegexArg)
input := bufio.NewScanner(os.Stdin)
input.Split(bufio.ScanLines)
enc := json.NewEncoder(os.Stdout)
for input.Scan() {
l, err := parseLogLine(input.Text())
if err != nil && verbose {
fmt.Fprintf(os.Stderr, "ignoring line after error %v\n", err)
fmt.Fprintf(os.Stderr, "offending line was: %s\n", input.Text())
}
if err == nil {
if err := enc.Encode(l); err != nil {
panic(err)
}
}
}
if input.Err() != nil {
panic(input.Err())
}
}

View File

@ -0,0 +1,78 @@
package main
import (
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestParseHumanFormatterNodate(t *testing.T) {
type testCase struct {
Name string
Input string
Expect *RuntimeLine
ExpectErr string
}
secs := func(s string) time.Duration {
d, err := parseSecs(s)
require.NoError(t, err)
return d
}
tcs := []testCase{
{
Name: "human-formatter-noerror",
Input: `[jobname][zfs.cmd]: command exited without error usertime_s="0.008445" cmd="zfs list -H -p -o name -r -t filesystem,volume" systemtime_s="0.033783" invocation="84" total_time_s="0.037828619"`,
Expect: &RuntimeLine{
Cmd: "zfs list -H -p -o name -r -t filesystem,volume",
TotalTime: secs("0.037828619"),
Usertime: secs("0.008445"),
Systime: secs("0.033783"),
Error: "",
},
},
{
Name: "human-formatter-witherror",
Input: `[jobname][zfs.cmd]: command exited with error usertime_s="0.008445" cmd="zfs list -H -p -o name -r -t filesystem,volume" systemtime_s="0.033783" invocation="84" total_time_s="0.037828619" err="some error"`,
Expect: &RuntimeLine{
Cmd: "zfs list -H -p -o name -r -t filesystem,volume",
TotalTime: secs("0.037828619"),
Usertime: secs("0.008445"),
Systime: secs("0.033783"),
Error: "some error",
},
},
{
Name: "from graylog",
Input: `[csnas][zfs.cmd]: command exited without error usertime_s="0" cmd="zfs send -i zroot/ezjail/synapse-12@zrepl_20200329_095518_000 zroot/ezjail/synapse-12@zrepl_20200329_102454_000" total_time_s="0.101598591" invocation="85" systemtime_s="0.041581"`,
Expect: &RuntimeLine{
Cmd: "zfs send -i zroot/ezjail/synapse-12@zrepl_20200329_095518_000 zroot/ezjail/synapse-12@zrepl_20200329_102454_000",
TotalTime: secs("0.101598591"),
Systime: secs("0.041581"),
Usertime: secs("0"),
Error: "",
},
},
}
for _, c := range tcs {
t.Run(c.Name, func(t *testing.T) {
l, err := parseHumanFormatterNodate(c.Input)
t.Logf("l=%v", l)
t.Logf("err=%T %v", err, err)
if (c.Expect != nil && c.ExpectErr != "") || (c.Expect == nil && c.ExpectErr == "") {
t.Fatal("bad test case", c)
}
if c.Expect != nil {
require.Equal(t, *c.Expect, l)
}
if c.ExpectErr != "" {
require.EqualError(t, err, c.ExpectErr)
}
})
}
}

View File

@ -8,7 +8,8 @@ import (
// //
// Pre-events logged with debug // Pre-events logged with debug
// Post-event without error logged with info // Post-event without error logged with info
// Post-events with error logged at error level // Post-events with error _also_ logged with info
// (Not all errors we observe at this layer) are actual errors in higher-level layers)
func startPreLogging(c *Cmd, now time.Time) { func startPreLogging(c *Cmd, now time.Time) {
c.log().Debug("starting command") c.log().Debug("starting command")
@ -35,6 +36,6 @@ func waitPostLogging(c *Cmd, err error, now time.Time) {
if err == nil { if err == nil {
log.Info("command exited without error") log.Info("command exited without error")
} else { } else {
log.WithError(err).Error("command exited with error") log.WithError(err).Info("command exited with error")
} }
} }