cmd control status + expose DaemonStatus via control API

refs #10
This commit is contained in:
Christian Schwarz 2017-12-24 15:35:12 +01:00
parent 8c7e373049
commit 14b8d69a63
4 changed files with 137 additions and 2 deletions

8
Gopkg.lock generated
View File

@ -7,6 +7,12 @@
revision = "346938d642f2ec3594ed81d874461961cd0faa76"
version = "v1.1.0"
[[projects]]
branch = "master"
name = "github.com/dustin/go-humanize"
packages = ["."]
revision = "bb3d318650d48840a39aa21a027c6630e198e626"
[[projects]]
branch = "master"
name = "github.com/ftrvxmtrx/fd"
@ -106,6 +112,6 @@
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
inputs-digest = "b2174984fa0452d4469597063f891904b70081586b239262690cf1b6477a3116"
inputs-digest = "94133f88486f1d0e1eff6b87c7bb2f0b0f87e981f91d89865ec48ad74671473a"
solver-name = "gps-cdcl"
solver-version = 1

View File

@ -39,6 +39,7 @@ func (j *ControlJob) JobStatus(ctx context.Context) (*JobStatus, error) {
const (
ControlJobEndpointProfile string = "/debug/pprof/profile"
ControlJobEndpointVersion string = "/version"
ControlJobEndpointStatus string = "/status"
)
func (j *ControlJob) JobStart(ctx context.Context) {
@ -46,6 +47,8 @@ func (j *ControlJob) JobStart(ctx context.Context) {
log := ctx.Value(contextKeyLog).(Logger)
defer log.Info("control job finished")
daemon := ctx.Value(contextKeyDaemon).(*Daemon)
l, err := ListenUnixPrivate(j.sockaddr)
if err != nil {
log.WithError(err).Error("error listening")
@ -58,6 +61,10 @@ func (j *ControlJob) JobStart(ctx context.Context) {
requestLogger{log: log, handler: jsonResponder{func() (interface{}, error) {
return NewZreplVersionInformation(), nil
}}})
mux.Handle(ControlJobEndpointStatus,
requestLogger{log: log, handler: jsonResponder{func() (interface{}, error) {
return daemon.Status(), nil
}}})
server := http.Server{Handler: mux}
outer:

View File

@ -5,13 +5,17 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/dustin/go-humanize"
"github.com/spf13/cobra"
"github.com/zrepl/zrepl/logger"
"io"
golog "log"
"net"
"net/http"
"net/url"
"os"
"sort"
"strings"
)
var controlCmd = &cobra.Command{
@ -34,11 +38,23 @@ var controlVersionCmd = &cobra.Command{
Run: doControLVersionCmd,
}
var controlStatusCmdArgs struct {
format string
}
var controlStatusCmd = &cobra.Command{
Use: "status",
Short: "get current status",
Run: doControlStatusCmd,
}
func init() {
RootCmd.AddCommand(controlCmd)
controlCmd.AddCommand(pprofCmd)
pprofCmd.Flags().Int64Var(&pprofCmdArgs.seconds, "seconds", 30, "seconds to profile")
controlCmd.AddCommand(controlVersionCmd)
controlCmd.AddCommand(controlStatusCmd)
controlStatusCmd.Flags().StringVar(&controlStatusCmdArgs.format, "format", "human", "output format (human|json)")
}
func controlHttpClient() (client http.Client, err error) {
@ -153,3 +169,107 @@ func doControLVersionCmd(cmd *cobra.Command, args []string) {
fmt.Println(info.String())
}
func doControlStatusCmd(cmd *cobra.Command, args []string) {
log := golog.New(os.Stderr, "", 0)
die := func() {
log.Print("exiting after error")
os.Exit(1)
}
httpc, err := controlHttpClient()
if err != nil {
log.Printf("could not connect to daemon: %s", err)
die()
}
resp, err := httpc.Get("http://unix" + ControlJobEndpointStatus)
if err != nil {
log.Printf("error: %s", err)
die()
} else if resp.StatusCode != http.StatusOK {
var msg bytes.Buffer
io.CopyN(&msg, resp.Body, 4096)
log.Printf("error: %s", msg.String())
die()
}
var status DaemonStatus
err = json.NewDecoder(resp.Body).Decode(&status)
if err != nil {
log.Printf("error unmarshaling response: %s", err)
die()
}
switch controlStatusCmdArgs.format {
case "json":
enc := json.NewEncoder(os.Stdout)
enc.SetIndent("", " ")
if err := enc.Encode(status); err != nil {
log.Panic(err)
}
case "human":
formatter := HumanFormatter{}
formatter.SetMetadataFlags(MetadataAll)
formatter.SetIgnoreFields([]string{
logJobField, logTaskField,
})
jobNames := make([]string, 0, len(status.Jobs))
for name, _ := range status.Jobs {
jobNames = append(jobNames, name)
}
sort.Slice(jobNames, func(i, j int) bool {
return strings.Compare(jobNames[i], jobNames[j]) == -1
})
for _, name := range jobNames {
job := status.Jobs[name]
fmt.Printf("Job '%s':\n", name)
for _, task := range job.Tasks {
var header bytes.Buffer
fmt.Fprintf(&header, " Task '%s': ", task.Name)
if !task.Idle {
fmt.Fprint(&header, strings.Join(task.ActivityStack, "."))
} else {
fmt.Fprint(&header, "<idle>")
}
fmt.Fprint(&header, " ")
if !task.Idle || task.ProgressRx != 0 || task.ProgressTx != 0 {
fmt.Fprintf(&header, "(%s / %s , Rx/Tx",
humanize.Bytes(uint64(task.ProgressRx)),
humanize.Bytes(uint64(task.ProgressTx)))
if task.Idle {
fmt.Fprint(&header, ", values from last run")
}
fmt.Fprint(&header, ")")
}
fmt.Fprint(&header, "\n")
io.Copy(os.Stdout, &header)
var logBuf bytes.Buffer
fmt.Fprint(&logBuf, "\n")
for _, e := range task.LogEntries {
formatted, err := formatter.Format(&e)
if err != nil {
panic(err)
}
fmt.Fprintf(&logBuf, " %s\n", string(formatted))
}
fmt.Fprint(&logBuf, "\n")
if task.MaxLogLevel > logger.Info {
fmt.Println(" WARNING: This task encountered problems since the last time it left idle state:")
fmt.Println(" check the logs below or your log file for more information.")
io.Copy(os.Stdout, &logBuf)
}
}
}
default:
log.Printf("invalid output format '%s'", controlStatusCmdArgs.format)
die()
}
}

View File

@ -56,6 +56,7 @@ type contextKey string
const (
contextKeyLog contextKey = contextKey("log")
contextKeyDaemon contextKey = contextKey("daemon")
)
type Daemon struct {
@ -74,6 +75,7 @@ func (d *Daemon) Loop(ctx context.Context) {
log := ctx.Value(contextKeyLog).(Logger)
ctx, cancel := context.WithCancel(ctx)
ctx = context.WithValue(ctx, contextKeyDaemon, d)
sigChan := make(chan os.Signal, 1)
finishs := make(chan Job)