status: infra for reporting jobs instead of just replication.Report

This commit is contained in:
Christian Schwarz 2018-09-23 21:08:03 +02:00
parent 4a6160baf3
commit 9446b51a1f
7 changed files with 116 additions and 25 deletions

View File

@ -2,12 +2,12 @@ package client
import ( import (
"fmt" "fmt"
"github.com/mitchellh/mapstructure"
"github.com/nsf/termbox-go" "github.com/nsf/termbox-go"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/zrepl/yaml-config"
"github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon" "github.com/zrepl/zrepl/daemon"
"github.com/zrepl/zrepl/replication" "github.com/zrepl/zrepl/daemon/job"
"github.com/zrepl/zrepl/replication/fsrep" "github.com/zrepl/zrepl/replication/fsrep"
"sort" "sort"
"sync" "sync"
@ -22,7 +22,7 @@ type tui struct {
indent int indent int
lock sync.Mutex //For report and error lock sync.Mutex //For report and error
report map[string]interface{} report map[string]job.Status
err error err error
} }
@ -42,6 +42,10 @@ func (t *tui) moveLine(dl int, col int) {
func (t *tui) write(text string) { func (t *tui) write(text string) {
for _, c := range text { for _, c := range text {
if c == '\n' {
t.newline()
continue
}
termbox.SetCell(t.x, t.y, c, termbox.ColorDefault, termbox.ColorDefault) termbox.SetCell(t.x, t.y, c, termbox.ColorDefault, termbox.ColorDefault)
t.x += 1 t.x += 1
} }
@ -104,7 +108,7 @@ func RunStatus(flags StatusFlags, config *config.Config, args []string) error {
defer termbox.Close() defer termbox.Close()
update := func() { update := func() {
m := make(map[string]interface{}) m := make(map[string]job.Status)
err2 := jsonRequestResponse(httpc, daemon.ControlJobEndpointStatus, err2 := jsonRequestResponse(httpc, daemon.ControlJobEndpointStatus,
struct{}{}, struct{}{},
@ -172,7 +176,7 @@ func (t *tui) draw() {
for _, k := range keys { for _, k := range keys {
v := t.report[k] v := t.report[k]
if len(k) == 0 || k[0] == '_' { //Internal job if len(k) == 0 || daemon.IsInternalJobName(k) { //Internal job
continue continue
} }
t.setIndent(0) t.setIndent(0)
@ -180,20 +184,35 @@ func (t *tui) draw() {
t.printf("Job: %s", k) t.printf("Job: %s", k)
t.setIndent(1) t.setIndent(1)
t.newline() t.newline()
t.printf("Type: %s", v.Type)
if v == nil { t.setIndent(1)
t.printf("No report generated yet")
t.newline() t.newline()
continue
} if v.Type != job.TypePush {
rep := replication.Report{} t.printf("No status representation for job type '%s', dumping as YAML", v.Type)
err := mapstructure.Decode(v, &rep) t.newline()
asYaml, err := yaml.Marshal(v.JobSpecific)
if err != nil { if err != nil {
t.printf("Failed to decode report: %s", err.Error()) t.printf("Error marshaling status to YAML: %s", err)
t.newline()
continue
}
t.write(string(asYaml))
t.newline() t.newline()
continue continue
} }
pushStatus, ok := v.JobSpecific.(*job.PushStatus)
if !ok || pushStatus == nil {
t.printf("PushStatus is null")
t.newline()
continue
}
rep := pushStatus.Replication
if rep == nil {
t.newline()
continue
}
all := make([]*fsrep.Report, 0, len(rep.Completed)+len(rep.Pending) + 1) all := make([]*fsrep.Report, 0, len(rep.Completed)+len(rep.Pending) + 1)
all = append(all, rep.Completed...) all = append(all, rep.Completed...)

View File

@ -35,7 +35,7 @@ func newControlJob(sockpath string, jobs *jobs) (j *controlJob, err error) {
func (j *controlJob) Name() string { return jobNameControl } func (j *controlJob) Name() string { return jobNameControl }
func (j *controlJob) Status() interface{} { return nil } func (j *controlJob) Status() *job.Status { return &job.Status{Type: job.TypeInternal} }
var promControl struct { var promControl struct {
requestBegin *prometheus.CounterVec requestBegin *prometheus.CounterVec

View File

@ -125,13 +125,13 @@ func (s *jobs) wait() <-chan struct{} {
return ch return ch
} }
func (s *jobs) status() map[string]interface{} { func (s *jobs) status() map[string]*job.Status {
s.m.RLock() s.m.RLock()
defer s.m.RUnlock() defer s.m.RUnlock()
type res struct { type res struct {
name string name string
status interface{} status *job.Status
} }
var wg sync.WaitGroup var wg sync.WaitGroup
c := make(chan res, len(s.jobs)) c := make(chan res, len(s.jobs))
@ -144,7 +144,7 @@ func (s *jobs) status() map[string]interface{} {
} }
wg.Wait() wg.Wait()
close(c) close(c)
ret := make(map[string]interface{}, len(s.jobs)) ret := make(map[string]*job.Status, len(s.jobs))
for res := range c { for res := range c {
ret[res.name] = res.status ret[res.name] = res.status
} }

View File

@ -2,7 +2,9 @@ package job
import ( import (
"context" "context"
"encoding/json"
"errors" "errors"
"fmt"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/zrepl/zrepl/logger" "github.com/zrepl/zrepl/logger"
) )
@ -47,10 +49,73 @@ func WithWakeup(ctx context.Context) (context.Context, WakeupFunc) {
type Job interface { type Job interface {
Name() string Name() string
Run(ctx context.Context) Run(ctx context.Context)
Status() interface{} Status() *Status
RegisterMetrics(registerer prometheus.Registerer) RegisterMetrics(registerer prometheus.Registerer)
} }
type Type string
const (
TypeInternal Type = "internal"
TypePush Type = "push"
TypeSink Type = "sink"
)
type Status struct {
Type Type
JobSpecific interface{}
}
func (s *Status) MarshalJSON() ([]byte, error) {
typeJson, err := json.Marshal(s.Type)
if err != nil {
return nil, err
}
jobJSON, err := json.Marshal(s.JobSpecific)
if err != nil {
return nil, err
}
m := map[string]json.RawMessage {
"type": typeJson,
string(s.Type): jobJSON,
}
return json.Marshal(m)
}
func (s *Status) UnmarshalJSON(in []byte) (err error) {
var m map[string]json.RawMessage
if err := json.Unmarshal(in, &m); err != nil {
return err
}
tJSON, ok := m["type"]
if !ok {
return fmt.Errorf("field 'type' not found")
}
if err := json.Unmarshal(tJSON, &s.Type); err != nil {
return err
}
key := string(s.Type)
jobJSON, ok := m[key]
if !ok {
return fmt.Errorf("field '%s', not found", key)
}
switch s.Type {
case TypePush:
var st PushStatus
err = json.Unmarshal(jobJSON, &st)
s.JobSpecific = &st
case TypeSink:
var st SinkStatus
err = json.Unmarshal(jobJSON, &st)
s.JobSpecific = &st
case TypeInternal:
// internal jobs do not report specifics
default:
err = fmt.Errorf("unknown job type '%s'", key)
}
return err
}
func WaitWakeup(ctx context.Context) <-chan struct{} { func WaitWakeup(ctx context.Context) <-chan struct{} {
wc, ok := ctx.Value(contextKeyWakeup).(chan struct{}) wc, ok := ctx.Value(contextKeyWakeup).(chan struct{})
if !ok { if !ok {

View File

@ -89,7 +89,11 @@ func (j *Push) RegisterMetrics(registerer prometheus.Registerer) {
func (j *Push) Name() string { return j.name } func (j *Push) Name() string { return j.name }
func (j *Push) Status() interface{} { type PushStatus struct {
Replication *replication.Report
}
func (j *Push) Status() *Status {
rep := func() *replication.Replication { rep := func() *replication.Replication {
j.mtx.Lock() j.mtx.Lock()
defer j.mtx.Unlock() defer j.mtx.Unlock()
@ -98,10 +102,12 @@ func (j *Push) Status() interface{} {
} }
return j.replication return j.replication
}() }()
s := &PushStatus{}
if rep == nil { if rep == nil {
return nil return &Status{Type: TypePush, JobSpecific: s}
} }
return rep.Report() s.Replication = rep.Report()
return &Status{Type: TypePush, JobSpecific: s}
} }
func (j *Push) Run(ctx context.Context) { func (j *Push) Run(ctx context.Context) {

View File

@ -41,9 +41,10 @@ func SinkFromConfig(g *config.Global, in *config.SinkJob) (s *Sink, err error) {
func (j *Sink) Name() string { return j.name } func (j *Sink) Name() string { return j.name }
func (*Sink) Status() interface{} { type SinkStatus struct {}
// FIXME
return nil func (*Sink) Status() *Status {
return &Status{Type: TypeSink} // FIXME SinkStatus
} }
func (*Sink) RegisterMetrics(registerer prometheus.Registerer) {} func (*Sink) RegisterMetrics(registerer prometheus.Registerer) {}

View File

@ -39,7 +39,7 @@ func init() {
func (j *prometheusJob) Name() string { return jobNamePrometheus } func (j *prometheusJob) Name() string { return jobNamePrometheus }
func (j *prometheusJob) Status() interface{} { return nil } func (j *prometheusJob) Status() *job.Status { return &job.Status{Type: job.TypeInternal} }
func (j *prometheusJob) RegisterMetrics(registerer prometheus.Registerer) {} func (j *prometheusJob) RegisterMetrics(registerer prometheus.Registerer) {}