mirror of
https://github.com/zrepl/zrepl.git
synced 2025-06-20 01:37:45 +02:00
parent
2716c75ad5
commit
583a63a68f
@ -32,17 +32,6 @@ func (j *ControlJob) JobName() string {
|
|||||||
return j.Name
|
return j.Name
|
||||||
}
|
}
|
||||||
|
|
||||||
func (j *ControlJob) EndpointVersion(w http.ResponseWriter, r *http.Request) {
|
|
||||||
var buf bytes.Buffer
|
|
||||||
err := json.NewEncoder(&buf).Encode(NewZreplVersionInformation())
|
|
||||||
if err != nil {
|
|
||||||
w.WriteHeader(http.StatusInternalServerError)
|
|
||||||
io.WriteString(w, err.Error())
|
|
||||||
} else {
|
|
||||||
io.Copy(w, &buf)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const (
|
const (
|
||||||
ControlJobEndpointProfile string = "/debug/pprof/profile"
|
ControlJobEndpointProfile string = "/debug/pprof/profile"
|
||||||
ControlJobEndpointVersion string = "/version"
|
ControlJobEndpointVersion string = "/version"
|
||||||
@ -60,8 +49,11 @@ func (j *ControlJob) JobStart(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
mux := http.NewServeMux()
|
mux := http.NewServeMux()
|
||||||
mux.Handle(ControlJobEndpointProfile, requestLogger{log, pprof.Profile})
|
mux.Handle(ControlJobEndpointProfile, requestLogger{log: log, handlerFunc: pprof.Profile})
|
||||||
mux.Handle(ControlJobEndpointVersion, requestLogger{log, j.EndpointVersion})
|
mux.Handle(ControlJobEndpointVersion,
|
||||||
|
requestLogger{log: log, handler: jsonResponder{func() (interface{}, error) {
|
||||||
|
return NewZreplVersionInformation(), nil
|
||||||
|
}}})
|
||||||
server := http.Server{Handler: mux}
|
server := http.Server{Handler: mux}
|
||||||
|
|
||||||
outer:
|
outer:
|
||||||
@ -89,14 +81,42 @@ outer:
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type jsonResponder struct {
|
||||||
|
producer func() (interface{}, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (j jsonResponder) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
|
res, err := j.producer()
|
||||||
|
if err != nil {
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
io.WriteString(w, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var buf bytes.Buffer
|
||||||
|
err = json.NewEncoder(&buf).Encode(res)
|
||||||
|
if err != nil {
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
io.WriteString(w, err.Error())
|
||||||
|
} else {
|
||||||
|
io.Copy(w, &buf)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type requestLogger struct {
|
type requestLogger struct {
|
||||||
log Logger
|
log Logger
|
||||||
handlerFunc func(w http.ResponseWriter, r *http.Request)
|
handler http.Handler
|
||||||
|
handlerFunc http.HandlerFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l requestLogger) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
func (l requestLogger) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
log := l.log.WithField("method", r.Method).WithField("url", r.URL)
|
log := l.log.WithField("method", r.Method).WithField("url", r.URL)
|
||||||
log.Info("start")
|
log.Info("start")
|
||||||
|
if l.handlerFunc != nil {
|
||||||
l.handlerFunc(w, r)
|
l.handlerFunc(w, r)
|
||||||
|
} else if l.handler != nil {
|
||||||
|
l.handler.ServeHTTP(w, r)
|
||||||
|
} else {
|
||||||
|
log.Error("no handler or handlerFunc configured")
|
||||||
|
}
|
||||||
log.Info("finish")
|
log.Info("finish")
|
||||||
}
|
}
|
||||||
|
@ -137,8 +137,8 @@ outer:
|
|||||||
{
|
{
|
||||||
log := pullCtx.Value(contextKeyLog).(Logger)
|
log := pullCtx.Value(contextKeyLog).(Logger)
|
||||||
log.Debug("replicating from lhs to rhs")
|
log.Debug("replicating from lhs to rhs")
|
||||||
err := doPull(PullContext{local, log, j.Mapping, j.InitialReplPolicy})
|
puller := Puller{local, log, j.Mapping, j.InitialReplPolicy}
|
||||||
if err != nil {
|
if err := puller.doPull(); err != nil {
|
||||||
log.WithError(err).Error("error replicating lhs to rhs")
|
log.WithError(err).Error("error replicating lhs to rhs")
|
||||||
}
|
}
|
||||||
// use a ctx as soon as doPull gains ctx support
|
// use a ctx as soon as doPull gains ctx support
|
||||||
|
@ -120,8 +120,8 @@ start:
|
|||||||
log.Info("starting pull")
|
log.Info("starting pull")
|
||||||
|
|
||||||
pullLog := log.WithField(logTaskField, "pull")
|
pullLog := log.WithField(logTaskField, "pull")
|
||||||
err = doPull(PullContext{client, pullLog, j.Mapping, j.InitialReplPolicy})
|
puller := Puller{client, pullLog, j.Mapping, j.InitialReplPolicy}
|
||||||
if err != nil {
|
if err = puller.doPull(); err != nil {
|
||||||
log.WithError(err).Error("error doing pull")
|
log.WithError(err).Error("error doing pull")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -52,14 +52,14 @@ func closeRPCWithTimeout(log Logger, remote rpc.RPCClient, timeout time.Duration
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
type PullContext struct {
|
type Puller struct {
|
||||||
Remote rpc.RPCClient
|
Remote rpc.RPCClient
|
||||||
Log Logger
|
Log Logger
|
||||||
Mapping DatasetMapping
|
Mapping DatasetMapping
|
||||||
InitialReplPolicy InitialReplPolicy
|
InitialReplPolicy InitialReplPolicy
|
||||||
}
|
}
|
||||||
|
|
||||||
func doPull(pull PullContext) (err error) {
|
func (pull *Puller) doPull() (err error) {
|
||||||
|
|
||||||
remote := pull.Remote
|
remote := pull.Remote
|
||||||
log := pull.Log
|
log := pull.Log
|
||||||
|
Loading…
x
Reference in New Issue
Block a user