bring back prometheus metrics, with new metrics for replication state machine

Christian Schwarz 2018-09-07 22:03:41 -07:00
parent ab9446137f
commit fa47667f31
12 changed files with 156 additions and 52 deletions
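The core of the change is a new RegisterMetrics hook on the job interface: every job either registers its collectors or provides a no-op, and the daemon calls the hook with prometheus.DefaultRegisterer before starting the job (see the daemon and job-interface hunks below). A minimal sketch of that contract; the noopJob type and the start helper are illustrative stand-ins, only the interface method and the registration call are taken from the diff:

package main

import (
	"context"

	"github.com/prometheus/client_golang/prometheus"
)

// Job mirrors the interface this commit extends.
type Job interface {
	Name() string
	Run(ctx context.Context)
	Status() interface{}
	RegisterMetrics(registerer prometheus.Registerer)
}

// noopJob is a hypothetical job without collectors of its own; the
// control and sink jobs in this commit implement the same empty hook.
type noopJob struct{ name string }

func (j *noopJob) Name() string                                     { return j.name }
func (j *noopJob) Run(ctx context.Context)                          {}
func (j *noopJob) Status() interface{}                              { return nil }
func (j *noopJob) RegisterMetrics(registerer prometheus.Registerer) {}

// start mirrors jobs.start in the daemon hunk: collectors are registered
// on the default registerer before the job is run.
func start(ctx context.Context, j Job) {
	j.RegisterMetrics(prometheus.DefaultRegisterer)
	j.Run(ctx)
}

func main() {
	start(context.Background(), &noopJob{name: "control"})
}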

View File

@ -5,6 +5,7 @@ import (
"context"
"encoding/json"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/zrepl/zrepl/daemon/job"
"github.com/zrepl/zrepl/daemon/nethelpers"
"github.com/zrepl/zrepl/logger"
@ -35,6 +36,8 @@ func (j *controlJob) Name() string { return jobNameControl }
func (j *controlJob) Status() interface{} { return nil }
func (j *controlJob) RegisterMetrics(registerer prometheus.Registerer) {}
const (
ControlJobEndpointPProf string = "/debug/pprof"
ControlJobEndpointVersion string = "/version"

View File

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/job"
"github.com/zrepl/zrepl/daemon/logging"
@ -59,9 +60,23 @@ func Run(conf *config.Config) error {
}
jobs.start(ctx, controlJob, true)
// start prometheus
//var promJob *prometheusJob // FIXME
//jobs.start(ctx, promJob, true)
for i, jc := range conf.Global.Monitoring {
var (
job job.Job
err error
)
switch v := jc.Ret.(type) {
case *config.PrometheusMonitoring:
job, err = newPrometheusJobFromConfig(v)
default:
return errors.Errorf("unknown monitoring job #%d (type %T)", i, v)
}
if err != nil {
return errors.Wrapf(err, "cannot build monitoring job #%d", i)
}
jobs.start(ctx, job, true)
}
log.Info("starting daemon")
@ -160,7 +175,9 @@ func (s *jobs) start(ctx context.Context, j job.Job, internal bool) {
s.m.Lock()
defer s.m.Unlock()
jobLog := job.GetLogger(ctx).WithField(logJobField, j.Name())
jobLog := job.GetLogger(ctx).
WithField(logJobField, j.Name()).
WithOutlet(newPrometheusLogOutlet(j.Name()), logger.Debug)
jobName := j.Name()
if !internal && IsInternalJobName(jobName) {
panic(fmt.Sprintf("internal job name used for non-internal job %s", jobName))
@ -171,6 +188,9 @@ func (s *jobs) start(ctx context.Context, j job.Job, internal bool) {
if _, ok := s.jobs[jobName]; ok {
panic(fmt.Sprintf("duplicate job name %s", jobName))
}
j.RegisterMetrics(prometheus.DefaultRegisterer)
s.jobs[jobName] = j
ctx = job.WithLogger(ctx, jobLog)
ctx, wakeupChan := job.WithWakeup(ctx)

View File

@ -3,6 +3,7 @@ package job
import (
"context"
"errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/zrepl/zrepl/logger"
)
@ -47,6 +48,7 @@ type Job interface {
Name() string
Run(ctx context.Context)
Status() interface{}
RegisterMetrics(registerer prometheus.Registerer)
}
func WaitWakeup(ctx context.Context) <-chan struct{} {

View File

@ -3,6 +3,7 @@ package job
import (
"context"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/connecter"
"github.com/zrepl/zrepl/daemon/filters"
@ -23,6 +24,10 @@ type Push struct {
snapper *snapper.Snapper
promRepStateSecs *prometheus.HistogramVec // labels: state
promPruneSecs *prometheus.HistogramVec // labels: prune_side
promBytesReplicated *prometheus.CounterVec // labels: filesystem
mtx sync.Mutex
replication *replication.Replication
}
@ -31,6 +36,20 @@ func PushFromConfig(g *config.Global, in *config.PushJob) (j *Push, err error) {
j = &Push{}
j.name = in.Name
j.promRepStateSecs = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "zrepl",
Subsystem: "replication",
Name: "state_time",
Help: "seconds spent during replication",
ConstLabels: prometheus.Labels{"zrepl_job": j.name},
}, []string{"state"})
j.promBytesReplicated = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "zrepl",
Subsystem: "replication",
Name: "bytes_replicated",
Help: "number of bytes replicated from sender to receiver per filesystem",
ConstLabels: prometheus.Labels{"zrepl_job": j.name},
}, []string{"filesystem"})
j.clientFactory, err = connecter.FromConfig(g, in.Connect)
if err != nil {
@ -43,7 +62,14 @@ func PushFromConfig(g *config.Global, in *config.PushJob) (j *Push, err error) {
}
j.fsfilter = fsf
j.prunerFactory, err = pruner.NewPrunerFactory(in.Pruning)
j.promPruneSecs = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "zrepl",
Subsystem: "pruning",
Name: "time",
Help: "seconds spent in pruner",
ConstLabels: prometheus.Labels{"zrepl_job": j.name},
}, []string{"prune_side"})
j.prunerFactory, err = pruner.NewPrunerFactory(in.Pruning, j.promPruneSecs)
if err != nil {
return nil, err
}
@ -55,6 +81,12 @@ func PushFromConfig(g *config.Global, in *config.PushJob) (j *Push, err error) {
return j, nil
}
func (j *Push) RegisterMetrics(registerer prometheus.Registerer) {
registerer.MustRegister(j.promRepStateSecs)
registerer.MustRegister(j.promPruneSecs)
registerer.MustRegister(j.promBytesReplicated)
}
func (j *Push) Name() string { return j.name }
func (j *Push) Status() interface{} {
@ -118,7 +150,7 @@ func (j *Push) do(ctx context.Context) {
receiver := endpoint.NewRemote(client)
j.mtx.Lock()
j.replication = replication.NewReplication()
j.replication = replication.NewReplication(j.promRepStateSecs, j.promBytesReplicated)
j.mtx.Unlock()
log.Info("start replication")

View File

@ -4,6 +4,7 @@ import (
"context"
"github.com/pkg/errors"
"github.com/problame/go-streamrpc"
"github.com/prometheus/client_golang/prometheus"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/logging"
"github.com/zrepl/zrepl/daemon/serve"
@ -45,6 +46,8 @@ func (*Sink) Status() interface{} {
return nil
}
func (*Sink) RegisterMetrics(registerer prometheus.Registerer) {}
func (j *Sink) Run(ctx context.Context) {
log := GetLogger(ctx)

View File

@ -4,7 +4,9 @@ import (
"context"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/job"
"github.com/zrepl/zrepl/logger"
"github.com/zrepl/zrepl/zfs"
"net"
"net/http"
@ -14,37 +16,24 @@ type prometheusJob struct {
listen string
}
func newPrometheusJob(listen string) *prometheusJob {
return &prometheusJob{listen}
func newPrometheusJobFromConfig(in *config.PrometheusMonitoring) (*prometheusJob, error) {
if _, _, err := net.SplitHostPort(in.Listen); err != nil {
return nil, err
}
return &prometheusJob{in.Listen}, nil
}
var prom struct {
taskLastActiveStart *prometheus.GaugeVec
taskLastActiveDuration *prometheus.GaugeVec
taskLogEntries *prometheus.CounterVec
}
func init() {
prom.taskLastActiveStart = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "zrepl",
Subsystem: "daemon",
Name: "task_last_active_start",
Help: "point in time at which the job task last left idle state",
}, []string{"zrepl_job", "job_type", "task"})
prom.taskLastActiveDuration = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "zrepl",
Subsystem: "daemon",
Name: "task_last_active_duration",
Help: "seconds that the last run ob a job task spent between leaving and re-entering idle state",
}, []string{"zrepl_job", "job_type", "task"})
prom.taskLogEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "zrepl",
Subsystem: "daemon",
Name: "task_log_entries",
Name: "log_entries",
Help: "number of log entries per job task and level",
}, []string{"zrepl_job", "job_type", "task", "level"})
prometheus.MustRegister(prom.taskLastActiveStart)
prometheus.MustRegister(prom.taskLastActiveDuration)
}, []string{"zrepl_job", "level"})
prometheus.MustRegister(prom.taskLogEntries)
}
@ -52,6 +41,8 @@ func (j *prometheusJob) Name() string { return jobNamePrometheus }
func (j *prometheusJob) Status() interface{} { return nil }
func (j *prometheusJob) RegisterMetrics(registerer prometheus.Registerer) {}
func (j *prometheusJob) Run(ctx context.Context) {
if err := zfs.PrometheusRegister(prometheus.DefaultRegisterer); err != nil {
@ -80,3 +71,19 @@ func (j *prometheusJob) Run(ctx context.Context) {
}
}
type prometheusJobOutlet struct {
jobName string
}
var _ logger.Outlet = prometheusJobOutlet{}
func newPrometheusLogOutlet(jobName string) prometheusJobOutlet {
return prometheusJobOutlet{jobName}
}
func (o prometheusJobOutlet) WriteEntry(entry logger.Entry) error {
prom.taskLogEntries.WithLabelValues(o.jobName, entry.Level.String()).Inc()
return nil
}
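The prometheus job's Run body above is truncated; judging by the imports (net, net/http, promhttp) it presumably serves the default registry over HTTP on the configured listen address. A hedged sketch of that serving loop; the /metrics path, the mux, and the listen address in main are assumptions, not taken from the diff:

package main

import (
	"log"
	"net"
	"net/http"

	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// serveMetrics exposes everything registered on the default registerer.
func serveMetrics(listen string) error {
	l, err := net.Listen("tcp", listen)
	if err != nil {
		return err
	}
	mux := http.NewServeMux()
	mux.Handle("/metrics", promhttp.Handler()) // assumed endpoint path
	log.Printf("serving metrics on %s", l.Addr())
	return http.Serve(l, mux)
}

func main() {
	// arbitrary example address; the real job uses the configured Listen value
	if err := serveMetrics(":9811"); err != nil {
		log.Fatal(err)
	}
}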

View File

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/logger"
"github.com/zrepl/zrepl/pruning"
@ -49,6 +50,7 @@ type args struct {
rules []pruning.KeepRule
retryWait time.Duration
considerSnapAtCursorReplicated bool
promPruneSecs prometheus.Observer
}
type Pruner struct {
@ -72,6 +74,7 @@ type PrunerFactory struct {
receiverRules []pruning.KeepRule
retryWait time.Duration
considerSnapAtCursorReplicated bool
promPruneSecs *prometheus.HistogramVec
}
func checkContainsKeep1(rules []pruning.KeepRule) error {
@ -87,7 +90,7 @@ func checkContainsKeep1(rules []pruning.KeepRule) error {
return errors.New("sender keep rules must contain last_n or be empty so that the last snapshot is definitely kept")
}
func NewPrunerFactory(in config.PruningSenderReceiver) (*PrunerFactory, error) {
func NewPrunerFactory(in config.PruningSenderReceiver, promPruneSecs *prometheus.HistogramVec) (*PrunerFactory, error) {
keepRulesReceiver, err := pruning.RulesFromConfig(in.KeepReceiver)
if err != nil {
return nil, errors.Wrap(err, "cannot build receiver pruning rules")
@ -111,6 +114,7 @@ func NewPrunerFactory(in config.PruningSenderReceiver) (*PrunerFactory, error) {
keepRulesReceiver,
10 * time.Second, //FIXME constant
considerSnapAtCursorReplicated,
promPruneSecs,
}
return f, nil
}
@ -124,6 +128,7 @@ func (f *PrunerFactory) BuildSenderPruner(ctx context.Context, target Target, re
f.senderRules,
f.retryWait,
f.considerSnapAtCursorReplicated,
f.promPruneSecs.WithLabelValues("sender"),
},
state: Plan,
}
@ -139,6 +144,7 @@ func (f *PrunerFactory) BuildReceiverPruner(ctx context.Context, target Target,
f.receiverRules,
f.retryWait,
false, // senseless here anyways
f.promPruneSecs.WithLabelValues("receiver"),
},
state: Plan,
}
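The factory hands each pruner side a pre-labelled prometheus.Observer (prune_side = "sender" or "receiver"), so the pruner itself only ever sees an Observer and can time a run without knowing about the vector. A sketch of that observation pattern; the actual call site inside the pruner is not part of this diff, so timedPrune is an assumption about how the Observer is consumed:

package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// timedPrune shows the Observer pattern used by the pruner args:
// the caller picks the label, the worker only observes.
func timedPrune(promPruneSecs prometheus.Observer, prune func()) {
	defer prometheus.NewTimer(promPruneSecs).ObserveDuration()
	prune()
}

func main() {
	pruneSecs := prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Namespace: "zrepl",
		Subsystem: "pruning",
		Name:      "time",
		Help:      "seconds spent in pruner",
	}, []string{"prune_side"})
	prometheus.MustRegister(pruneSecs)

	timedPrune(pruneSecs.WithLabelValues("sender"), func() {
		time.Sleep(10 * time.Millisecond) // stand-in for the real prune run
	})
}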

View File

@ -6,6 +6,7 @@ import (
"context"
"errors"
"fmt"
"github.com/prometheus/client_golang/prometheus"
"io"
"net"
"sync"
@ -90,7 +91,9 @@ func (s State) fsrsf() state {
}
type Replication struct {
// lock protects all fields in this struct, but not the data behind pointers
promBytesReplicated prometheus.Counter
// lock protects all fields below it in this struct, but not the data behind pointers
lock sync.Mutex
state State
fs string
@ -120,8 +123,8 @@ type ReplicationBuilder struct {
r *Replication
}
func BuildReplication(fs string) *ReplicationBuilder {
return &ReplicationBuilder{&Replication{fs: fs}}
func BuildReplication(fs string, promBytesReplicated prometheus.Counter) *ReplicationBuilder {
return &ReplicationBuilder{&Replication{fs: fs, promBytesReplicated: promBytesReplicated}}
}
func (b *ReplicationBuilder) AddStep(from, to FilesystemVersion) *ReplicationBuilder {
@ -204,6 +207,7 @@ func (f *Replication) TakeStep(ctx context.Context, sender Sender, receiver Rece
preTime := time.Now()
s = s(ctx, sender, receiver, u)
delta := time.Now().Sub(preTime)
post = u(func(f *Replication) {
if len(f.pending) == 0 {
return
@ -369,6 +373,9 @@ func (s *ReplicationStep) doReplication(ctx context.Context, sender Sender, rece
}
s.byteCounter = util.NewByteCounterReader(sstream)
defer func() {
s.parent.promBytesReplicated.Add(float64(s.byteCounter.Bytes()))
}()
sstream = s.byteCounter
rr := &pdu.ReceiveReq{
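doReplication wraps the send stream in a counting reader and adds the byte count to the per-filesystem counter in a defer, so partially transferred bytes are counted even when the step fails. util.NewByteCounterReader is zrepl's helper; the stand-in below is a minimal sketch of the same wrap-count-observe pattern, with a plain float64 standing in for the prometheus counter:

package main

import (
	"bytes"
	"fmt"
	"io"
	"io/ioutil"
	"sync/atomic"
)

// byteCounterReader counts the bytes that actually pass through the stream.
type byteCounterReader struct {
	r io.Reader
	n int64
}

func (b *byteCounterReader) Read(p []byte) (int, error) {
	n, err := b.r.Read(p)
	atomic.AddInt64(&b.n, int64(n))
	return n, err
}

func (b *byteCounterReader) Bytes() int64 { return atomic.LoadInt64(&b.n) }

func main() {
	var bytesReplicated float64 // stands in for promBytesReplicated.Add

	counter := &byteCounterReader{r: bytes.NewReader(make([]byte, 1<<20))}
	defer func() {
		// observed even if the transfer below fails half-way
		bytesReplicated += float64(counter.Bytes())
		fmt.Printf("replicated %.0f bytes\n", bytesReplicated)
	}()

	// stand-in for streaming the send stream to the receiver
	if _, err := io.Copy(ioutil.Discard, counter); err != nil {
		fmt.Println("receive failed:", err)
	}
}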

View File

@ -6,6 +6,7 @@ import (
"context"
"errors"
"fmt"
"github.com/prometheus/client_golang/prometheus"
"math/bits"
"sync"
"time"
@ -49,6 +50,10 @@ func (s State) rsf() state {
// It is a state machine that is driven by the Drive method
// and provides asynchronous reporting via the Report method (i.e. from another goroutine).
type Replication struct {
// not protected by lock
promSecsPerState *prometheus.HistogramVec // labels: state
promBytesReplicated *prometheus.CounterVec // labels: filesystem
// lock protects all fields of this struct (but not the fields behind pointers!)
lock sync.Mutex
@ -77,8 +82,10 @@ type Report struct {
Active *fsrep.Report
}
func NewReplication() *Replication {
func NewReplication(secsPerState *prometheus.HistogramVec, bytesReplicated *prometheus.CounterVec) *Replication {
r := Replication{
promSecsPerState: secsPerState,
promBytesReplicated: bytesReplicated,
state: Planning,
}
return &r
@ -142,6 +149,7 @@ func (r *Replication) Drive(ctx context.Context, sender Sender, receiver Receive
pre = u(nil)
s = s(ctx, sender, receiver, u)
delta := time.Now().Sub(preTime)
r.promSecsPerState.WithLabelValues(pre.String()).Observe(delta.Seconds())
post = u(nil)
getLogger(ctx).
WithField("transition", fmt.Sprintf("%s => %s", pre, post)).
@ -262,7 +270,11 @@ func statePlanning(ctx context.Context, sender Sender, receiver Receiver, u upda
continue
}
fsrfsm := fsrep.BuildReplication(fs.Path)
var promBytesReplicated *prometheus.CounterVec
u(func(replication *Replication) { // FIXME args struct like in pruner (also use for sender and receiver)
promBytesReplicated = replication.promBytesReplicated
})
fsrfsm := fsrep.BuildReplication(fs.Path, promBytesReplicated.WithLabelValues(fs.Path))
if len(path) == 1 {
fsrfsm.AddStep(nil, path[0])
} else {
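Drive now times every state-function invocation and observes the duration under the label of the state it is leaving. The pattern generalizes to any func-valued state machine; a condensed sketch with the state names and the update mechanism simplified relative to the real Replication type:

package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// step is a simplified stand-in for the replication state functions:
// it does its work and returns the next step (nil terminates the machine).
type step struct {
	name string
	run  func() *step
}

func drive(secsPerState *prometheus.HistogramVec, s *step) {
	for s != nil {
		preTime := time.Now()
		next := s.run()
		// mirror Drive above: observe how long the state we are leaving
		// was active, labelled with that state's name
		secsPerState.WithLabelValues(s.name).Observe(time.Since(preTime).Seconds())
		s = next
	}
}

func main() {
	secsPerState := prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Namespace: "zrepl",
		Subsystem: "replication",
		Name:      "state_time",
		Help:      "seconds spent during replication",
	}, []string{"state"})
	prometheus.MustRegister(secsPerState)

	working := &step{name: "Working", run: func() *step { time.Sleep(20 * time.Millisecond); return nil }}
	planning := &step{name: "Planning", run: func() *step { time.Sleep(5 * time.Millisecond); return working }}
	drive(secsPerState, planning)
}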

View File

@ -4,9 +4,9 @@ import "github.com/prometheus/client_golang/prometheus"
var prom struct {
ZFSListFilesystemVersionDuration *prometheus.HistogramVec
ZFSDestroyFilesystemVersionDuration *prometheus.HistogramVec
ZFSSnapshotDuration *prometheus.HistogramVec
ZFSBookmarkDuration *prometheus.HistogramVec
ZFSDestroyDuration *prometheus.HistogramVec
}
func init() {
@ -16,12 +16,6 @@ func init() {
Name: "list_filesystem_versions_duration",
Help: "Seconds it took for listing the versions of a given filesystem",
}, []string{"filesystem"})
prom.ZFSDestroyFilesystemVersionDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "zrepl",
Subsystem: "zfs",
Name: "destroy_filesystem_version_duration",
Help: "Seconds it took to destroy a version of a given filesystem",
}, []string{"filesystem", "version_type"})
prom.ZFSSnapshotDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "zrepl",
Subsystem: "zfs",
@ -34,20 +28,26 @@ func init() {
Name: "bookmark_duration",
Help: "Duration it took to bookmark a given snapshot",
}, []string{"filesystem"})
prom.ZFSDestroyDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "zrepl",
Subsystem: "zfs",
Name: "destroy_duration",
Help: "Duration it took to destroy a dataset",
}, []string{"dataset_type", "filesystem"})
}
func PrometheusRegister(registry prometheus.Registerer) error {
if err := registry.Register(prom.ZFSListFilesystemVersionDuration); err != nil {
return err
}
if err := registry.Register(prom.ZFSDestroyFilesystemVersionDuration); err != nil {
return err
}
if err := registry.Register(prom.ZFSBookmarkDuration); err != nil {
return err
}
if err := registry.Register(prom.ZFSSnapshotDuration); err != nil {
return err
}
if err := registry.Register(prom.ZFSDestroyDuration); err != nil {
return err
}
return nil
}

View File

@ -156,9 +156,6 @@ func ZFSListFilesystemVersions(fs *DatasetPath, filter FilesystemVersionFilter)
func ZFSDestroyFilesystemVersion(filesystem *DatasetPath, version *FilesystemVersion) (err error) {
promTimer := prometheus.NewTimer(prom.ZFSDestroyFilesystemVersionDuration.WithLabelValues(filesystem.ToString(), version.Type.String()))
defer promTimer.ObserveDuration()
datasetPath := version.ToAbsPath(filesystem)
// Sanity check...

View File

@ -566,6 +566,21 @@ func zfsGet(path string, props []string, allowedSources zfsPropertySource) (*ZFS
func ZFSDestroy(dataset string) (err error) {
var dstype, filesystem string
idx := strings.IndexAny(dataset, "@#")
if idx == -1 {
dstype = "filesystem"
filesystem = dataset
} else {
switch dataset[idx] {
case '@': dstype = "snapshot"
case '#': dstype = "bookmark"
}
filesystem = dataset[:idx]
}
defer prometheus.NewTimer(prom.ZFSDestroyDuration.WithLabelValues(dstype, filesystem)).ObserveDuration()
cmd := exec.Command(ZFS_BINARY, "destroy", dataset)
stderr := bytes.NewBuffer(make([]byte, 0, 1024))
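
For the new destroy_duration metric, ZFSDestroy classifies its argument by the first '@' or '#': "pool/ds@snap" yields dataset_type=snapshot with filesystem=pool/ds, "pool/ds#cursor" a bookmark, and a bare "pool/ds" counts as a filesystem. A small self-contained sketch of just that classification, mirroring the hunk above with a few worked example inputs:

package main

import (
	"fmt"
	"strings"
)

// classify mirrors the dstype/filesystem split added to ZFSDestroy.
func classify(dataset string) (dstype, filesystem string) {
	idx := strings.IndexAny(dataset, "@#")
	if idx == -1 {
		return "filesystem", dataset
	}
	switch dataset[idx] {
	case '@':
		dstype = "snapshot"
	case '#':
		dstype = "bookmark"
	}
	return dstype, dataset[:idx]
}

func main() {
	for _, d := range []string{"pool/ds", "pool/ds@snap", "pool/ds#cursor"} {
		t, fs := classify(d)
		fmt.Printf("%-16s -> dataset_type=%s filesystem=%s\n", d, t, fs)
	}
}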