diff --git a/client/status.go b/client/status.go index bd743d7..5d7a95a 100644 --- a/client/status.go +++ b/client/status.go @@ -2,12 +2,12 @@ package client import ( "fmt" - "github.com/mitchellh/mapstructure" "github.com/nsf/termbox-go" "github.com/pkg/errors" + "github.com/zrepl/yaml-config" "github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/daemon" - "github.com/zrepl/zrepl/replication" + "github.com/zrepl/zrepl/daemon/job" "github.com/zrepl/zrepl/replication/fsrep" "sort" "sync" @@ -22,7 +22,7 @@ type tui struct { indent int lock sync.Mutex //For report and error - report map[string]interface{} + report map[string]job.Status err error } @@ -42,6 +42,10 @@ func (t *tui) moveLine(dl int, col int) { func (t *tui) write(text string) { for _, c := range text { + if c == '\n' { + t.newline() + continue + } termbox.SetCell(t.x, t.y, c, termbox.ColorDefault, termbox.ColorDefault) t.x += 1 } @@ -104,7 +108,7 @@ func RunStatus(flags StatusFlags, config *config.Config, args []string) error { defer termbox.Close() update := func() { - m := make(map[string]interface{}) + m := make(map[string]job.Status) err2 := jsonRequestResponse(httpc, daemon.ControlJobEndpointStatus, struct{}{}, @@ -172,7 +176,7 @@ func (t *tui) draw() { for _, k := range keys { v := t.report[k] - if len(k) == 0 || k[0] == '_' { //Internal job + if len(k) == 0 || daemon.IsInternalJobName(k) { //Internal job continue } t.setIndent(0) @@ -180,20 +184,35 @@ func (t *tui) draw() { t.printf("Job: %s", k) t.setIndent(1) t.newline() + t.printf("Type: %s", v.Type) + t.setIndent(1) + t.newline() - if v == nil { - t.printf("No report generated yet") + if v.Type != job.TypePush { + t.printf("No status representation for job type '%s', dumping as YAML", v.Type) t.newline() - continue - } - rep := replication.Report{} - err := mapstructure.Decode(v, &rep) - if err != nil { - t.printf("Failed to decode report: %s", err.Error()) + asYaml, err := yaml.Marshal(v.JobSpecific) + if err != nil { + t.printf("Error marshaling status to YAML: %s", err) + t.newline() + continue + } + t.write(string(asYaml)) t.newline() continue } + pushStatus, ok := v.JobSpecific.(*job.PushStatus) + if !ok || pushStatus == nil { + t.printf("PushStatus is null") + t.newline() + continue + } + rep := pushStatus.Replication + if rep == nil { + t.newline() + continue + } all := make([]*fsrep.Report, 0, len(rep.Completed)+len(rep.Pending) + 1) all = append(all, rep.Completed...) diff --git a/daemon/control.go b/daemon/control.go index ffacfc8..0be8a0d 100644 --- a/daemon/control.go +++ b/daemon/control.go @@ -35,7 +35,7 @@ func newControlJob(sockpath string, jobs *jobs) (j *controlJob, err error) { func (j *controlJob) Name() string { return jobNameControl } -func (j *controlJob) Status() interface{} { return nil } +func (j *controlJob) Status() *job.Status { return &job.Status{Type: job.TypeInternal} } var promControl struct { requestBegin *prometheus.CounterVec diff --git a/daemon/daemon.go b/daemon/daemon.go index 91cc71b..8ada3c9 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -125,13 +125,13 @@ func (s *jobs) wait() <-chan struct{} { return ch } -func (s *jobs) status() map[string]interface{} { +func (s *jobs) status() map[string]*job.Status { s.m.RLock() defer s.m.RUnlock() type res struct { name string - status interface{} + status *job.Status } var wg sync.WaitGroup c := make(chan res, len(s.jobs)) @@ -144,7 +144,7 @@ func (s *jobs) status() map[string]interface{} { } wg.Wait() close(c) - ret := make(map[string]interface{}, len(s.jobs)) + ret := make(map[string]*job.Status, len(s.jobs)) for res := range c { ret[res.name] = res.status } diff --git a/daemon/job/job.go b/daemon/job/job.go index 3e338c0..23f580d 100644 --- a/daemon/job/job.go +++ b/daemon/job/job.go @@ -2,7 +2,9 @@ package job import ( "context" + "encoding/json" "errors" + "fmt" "github.com/prometheus/client_golang/prometheus" "github.com/zrepl/zrepl/logger" ) @@ -47,10 +49,73 @@ func WithWakeup(ctx context.Context) (context.Context, WakeupFunc) { type Job interface { Name() string Run(ctx context.Context) - Status() interface{} + Status() *Status RegisterMetrics(registerer prometheus.Registerer) } +type Type string + +const ( + TypeInternal Type = "internal" + TypePush Type = "push" + TypeSink Type = "sink" +) + +type Status struct { + Type Type + JobSpecific interface{} +} + +func (s *Status) MarshalJSON() ([]byte, error) { + typeJson, err := json.Marshal(s.Type) + if err != nil { + return nil, err + } + jobJSON, err := json.Marshal(s.JobSpecific) + if err != nil { + return nil, err + } + m := map[string]json.RawMessage { + "type": typeJson, + string(s.Type): jobJSON, + } + return json.Marshal(m) +} + +func (s *Status) UnmarshalJSON(in []byte) (err error) { + var m map[string]json.RawMessage + if err := json.Unmarshal(in, &m); err != nil { + return err + } + tJSON, ok := m["type"] + if !ok { + return fmt.Errorf("field 'type' not found") + } + if err := json.Unmarshal(tJSON, &s.Type); err != nil { + return err + } + key := string(s.Type) + jobJSON, ok := m[key] + if !ok { + return fmt.Errorf("field '%s', not found", key) + } + switch s.Type { + case TypePush: + var st PushStatus + err = json.Unmarshal(jobJSON, &st) + s.JobSpecific = &st + case TypeSink: + var st SinkStatus + err = json.Unmarshal(jobJSON, &st) + s.JobSpecific = &st + case TypeInternal: + // internal jobs do not report specifics + default: + err = fmt.Errorf("unknown job type '%s'", key) + } + return err +} + func WaitWakeup(ctx context.Context) <-chan struct{} { wc, ok := ctx.Value(contextKeyWakeup).(chan struct{}) if !ok { diff --git a/daemon/job/push.go b/daemon/job/push.go index 496a44c..c77e4d3 100644 --- a/daemon/job/push.go +++ b/daemon/job/push.go @@ -89,7 +89,11 @@ func (j *Push) RegisterMetrics(registerer prometheus.Registerer) { func (j *Push) Name() string { return j.name } -func (j *Push) Status() interface{} { +type PushStatus struct { + Replication *replication.Report +} + +func (j *Push) Status() *Status { rep := func() *replication.Replication { j.mtx.Lock() defer j.mtx.Unlock() @@ -98,10 +102,12 @@ func (j *Push) Status() interface{} { } return j.replication }() + s := &PushStatus{} if rep == nil { - return nil + return &Status{Type: TypePush, JobSpecific: s} } - return rep.Report() + s.Replication = rep.Report() + return &Status{Type: TypePush, JobSpecific: s} } func (j *Push) Run(ctx context.Context) { diff --git a/daemon/job/sink.go b/daemon/job/sink.go index 627b999..0c1c114 100644 --- a/daemon/job/sink.go +++ b/daemon/job/sink.go @@ -41,9 +41,10 @@ func SinkFromConfig(g *config.Global, in *config.SinkJob) (s *Sink, err error) { func (j *Sink) Name() string { return j.name } -func (*Sink) Status() interface{} { - // FIXME - return nil +type SinkStatus struct {} + +func (*Sink) Status() *Status { + return &Status{Type: TypeSink} // FIXME SinkStatus } func (*Sink) RegisterMetrics(registerer prometheus.Registerer) {} diff --git a/daemon/prometheus.go b/daemon/prometheus.go index b39910c..7607b94 100644 --- a/daemon/prometheus.go +++ b/daemon/prometheus.go @@ -39,7 +39,7 @@ func init() { func (j *prometheusJob) Name() string { return jobNamePrometheus } -func (j *prometheusJob) Status() interface{} { return nil } +func (j *prometheusJob) Status() *job.Status { return &job.Status{Type: job.TypeInternal} } func (j *prometheusJob) RegisterMetrics(registerer prometheus.Registerer) {}