cmd: prometheus job type and Task instrumentation

refs #67
This commit is contained in:
Christian Schwarz 2018-04-13 23:37:49 +02:00
parent aa3865d0a3
commit a4da029105
3 changed files with 140 additions and 0 deletions

View File

@ -0,0 +1,100 @@
package cmd
import (
"context"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/zrepl/zrepl/zfs"
"net"
"net/http"
)
type PrometheusJob struct {
Listen string
}
var prom struct {
taskLastActiveStart *prometheus.GaugeVec
taskLastActiveDuration *prometheus.GaugeVec
taskLogEntries *prometheus.CounterVec
}
func init() {
prom.taskLastActiveStart = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "zrepl",
Subsystem: "daemon",
Name: "task_last_active_start",
Help: "point in time at which the job task last left idle state",
}, []string{"zrepl_job", "job_type", "task"})
prom.taskLastActiveDuration = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "zrepl",
Subsystem: "daemon",
Name: "task_last_active_duration",
Help: "seconds that the last run ob a job task spent between leaving and re-entering idle state",
}, []string{"zrepl_job", "job_type", "task"})
prom.taskLogEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "zrepl",
Subsystem: "daemon",
Name: "task_log_entries",
Help: "number of log entries per job task and level",
}, []string{"zrepl_job", "job_type", "task", "level"})
prometheus.MustRegister(prom.taskLastActiveStart)
prometheus.MustRegister(prom.taskLastActiveDuration)
prometheus.MustRegister(prom.taskLogEntries)
}
func parsePrometheusJob(c JobParsingContext, name string, i map[string]interface{}) (j *PrometheusJob, err error) {
var s struct {
Listen string
}
if err := mapstructure.Decode(i, &s); err != nil {
return nil, errors.Wrap(err, "mapstructure error")
}
if s.Listen == "" {
return nil, errors.New("must specify 'listen' attribute")
}
return &PrometheusJob{s.Listen}, nil
}
func (*PrometheusJob) JobName() string {
return "prometheus"
}
func (j *PrometheusJob) JobType() JobType { return JobTypePrometheus }
func (j *PrometheusJob) JobStart(ctx context.Context) {
if err := zfs.PrometheusRegister(prometheus.DefaultRegisterer); err != nil {
panic(err)
}
log := ctx.Value(contextKeyLog).(Logger)
task := NewTask("main", j, log)
log = task.Log()
l, err := net.Listen("tcp", j.Listen)
if err != nil {
log.WithError(err).Error("cannot listen")
}
go func() {
select {
case <-ctx.Done():
l.Close()
}
}()
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler())
err = http.Serve(l, mux)
if err != nil {
log.WithError(err).Error("error while serving")
}
}
func (*PrometheusJob) JobStatus(ctxt context.Context) (*JobStatus, error) {
return &JobStatus{}, nil
}

View File

@ -179,6 +179,8 @@ func parseJob(c JobParsingContext, i map[string]interface{}) (j Job, err error)
return parseSourceJob(c, name, i)
case JobTypeLocal:
return parseLocalJob(c, name, i)
case JobTypePrometheus:
return parsePrometheusJob(c, name, i)
default:
panic(fmt.Sprintf("implementation error: unknown job type %s", jobtype))
}

View File

@ -39,6 +39,7 @@ const (
JobTypePull JobType = "pull"
JobTypeSource JobType = "source"
JobTypeLocal JobType = "local"
JobTypePrometheus JobType = "prometheus"
JobTypeControl JobType = "control"
)
@ -50,6 +51,8 @@ func ParseUserJobType(s string) (JobType, error) {
return JobTypeSource, nil
case "local":
return JobTypeLocal, nil
case "prometheus":
return JobTypePrometheus, nil
}
return "", fmt.Errorf("unknown job type '%s'", s)
}
@ -204,6 +207,7 @@ type TaskStatus struct {
// An instance of Task tracks a single thread of activity that is part of a Job.
type Task struct {
name string // immutable
parent Job // immutable
// Stack of activities the task is currently in
@ -219,6 +223,7 @@ type Task struct {
type taskProgress struct {
rx int64
tx int64
creation time.Time
lastUpdate time.Time
logEntries []logger.Entry
mtx sync.RWMutex
@ -226,6 +231,7 @@ type taskProgress struct {
func newTaskProgress() (p *taskProgress) {
return &taskProgress{
creation: time.Now(),
logEntries: make([]logger.Entry, 0),
}
}
@ -250,6 +256,7 @@ func (p *taskProgress) DeepCopy() (out taskProgress) {
p.mtx.RLock()
defer p.mtx.RUnlock()
out.rx, out.tx = p.rx, p.tx
out.creation = p.creation
out.lastUpdate = p.lastUpdate
out.logEntries = make([]logger.Entry, len(p.logEntries))
for i := range p.logEntries {
@ -280,6 +287,7 @@ type taskActivity struct {
func NewTask(name string, parent Job, lg *logger.Logger) *Task {
t := &Task{
name: name,
parent: parent,
activities: list.New(),
}
@ -318,6 +326,13 @@ func (t *Task) Enter(activity string) {
// reset progress when leaving idle task
// we leave the old progress dangling to have the user not worry about
prev.progress = newTaskProgress()
prom.taskLastActiveStart.WithLabelValues(
t.parent.JobName(),
t.parent.JobType().String(),
t.name).
Set(float64(prev.progress.creation.UnixNano()) / 1e9)
}
act := &taskActivity{activity, false, nil, prev.progress}
t.activities.PushFront(act)
@ -391,6 +406,21 @@ func (t *Task) Finish() {
t.activities.Remove(top)
t.activitiesLastUpdate = time.Now()
// prometheus
front := t.activities.Front()
if front != nil && front == t.activities.Back() {
idleAct := front.Value.(*taskActivity)
if !idleAct.idle {
panic("inconsistent implementation")
}
progress := idleAct.progress.Read()
non_idle_time := t.activitiesLastUpdate.Sub(progress.creation) // use same time
prom.taskLastActiveDuration.WithLabelValues(
t.parent.JobName(),
t.parent.JobType().String(),
t.name).Set(non_idle_time.Seconds())
}
}
// Returns a logger derived from the logger passed to the constructor function.
@ -407,6 +437,14 @@ func (t *Task) WriteEntry(entry logger.Entry) error {
t.rwl.RLock()
defer t.rwl.RUnlock()
t.cur().progress.UpdateLogEntry(entry)
prom.taskLogEntries.WithLabelValues(
t.parent.JobName(),
t.parent.JobType().String(),
t.name,
entry.Level.String()).
Inc()
return nil
}