Mirror of https://github.com/zrepl/zrepl.git (synced 2024-12-22 15:11:16 +01:00)
commit b95e983d0d (parent f387e23214)

    bump go-streamrpc to 0.2, cleanup logging

    logging should be user-friendly in INFO mode

 Gopkg.lock (generated) | 6
@@ -170,15 +170,15 @@
   revision = "391d2c78c8404a9683d79f75dd24ab53040f89f7"

 [[projects]]
-  digest = "1:f80c558f761d9cbfb0cd1a317ece8b12a55ec670f8cef52acc515ab76463e6b6"
+  digest = "1:23576326ee8d287fd295807d5de5db9fcc9572ec2b4123c8ec2394a683edf1e1"
   name = "github.com/problame/go-streamrpc"
   packages = [
     ".",
     "internal/pdu",
   ]
   pruneopts = ""
-  revision = "de264a1c39cd8fc42a6b5e902c6eac7fd9683521"
-  version = "0.1"
+  revision = "504ffed1faf6af51c057d7b11d79e9367678c666"
+  version = "v0.2"

 [[projects]]
   branch = "master"
@@ -56,10 +56,6 @@ ignored = [ "github.com/inconshreveable/mousetrap" ]
   name = "github.com/prometheus/client_golang"
   branch = "master"

-[[constraint]]
-  name = "github.com/problame/go-streamrpc"
-  version = "0.1.0"
-
 [[constraint]]
   name = "github.com/golang/protobuf"
   version = "1.2.0"
@@ -70,3 +66,7 @@ ignored = [ "github.com/inconshreveable/mousetrap" ]
 [[constraint]]
   name = "github.com/fatih/color"
   version = "1.7.0"
+
+[[constraint]]
+  name = "github.com/problame/go-streamrpc"
+  version = "0.2.0"
@@ -185,7 +185,7 @@ type requestLogger struct {

 func (l requestLogger) ServeHTTP(w http.ResponseWriter, r *http.Request) {
     log := l.log.WithField("method", r.Method).WithField("url", r.URL)
-    log.Info("start")
+    log.Debug("start")
     if l.handlerFunc != nil {
         l.handlerFunc(w, r)
     } else if l.handler != nil {
@@ -193,5 +193,5 @@ func (l requestLogger) ServeHTTP(w http.ResponseWriter, r *http.Request) {
     } else {
         log.Error("no handler or handlerFunc configured")
     }
-    log.Info("finish")
+    log.Debug("finish")
 }
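Note: the two hunks above demote the control socket's per-request start/finish
lines from INFO to DEBUG, matching the commit's goal that INFO output stay
user-relevant. A minimal sketch of the wrapper pattern; Logger and the package
name are hypothetical stand-ins for zrepl's own types:

    package control // hypothetical package name

    import "net/http"

    // Logger is a stand-in for zrepl's structured logger interface.
    type Logger interface {
        WithField(field string, val interface{}) Logger
        Debug(msg string)
    }

    // requestLogger wraps an http.Handler and logs each request at DEBUG,
    // so routine control-socket traffic stays out of INFO output.
    type requestLogger struct {
        log     Logger
        handler http.Handler
    }

    func (l requestLogger) ServeHTTP(w http.ResponseWriter, r *http.Request) {
        log := l.log.WithField("method", r.Method).WithField("url", r.URL)
        log.Debug("start")
        l.handler.ServeHTTP(w, r)
        log.Debug("finish")
    }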
@@ -64,10 +64,7 @@ var STREAMRPC_CONFIG = &streamrpc.ConnConfig{ // FIXME oversight and configurabi
     RxStructuredMaxLen:   4096 * 4096,
     RxStreamMaxChunkSize: 4096 * 4096,
     TxChunkSize:          4096 * 4096,
-    RxTimeout: streamrpc.Timeout{
-        Progress: 10 * time.Second,
-    },
-    TxTimeout: streamrpc.Timeout{
+    Timeout: streamrpc.Timeout{
         Progress: 10 * time.Second,
     },
 }
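Note: go-streamrpc 0.2 folds the separate RxTimeout/TxTimeout fields into a
single Timeout covering both directions, which is what this hunk adapts to.
A fragment sketching client setup against the 0.2 API as exercised elsewhere
in this commit (connecter, ctx, and log are assumed from the surrounding job
code; only calls visible in this diff are used):

    cfg := &streamrpc.ConnConfig{
        RxStructuredMaxLen:   4096 * 4096,
        RxStreamMaxChunkSize: 4096 * 4096,
        TxChunkSize:          4096 * 4096,
        Timeout: streamrpc.Timeout{
            Progress: 10 * time.Second, // give up if no progress in either direction
        },
    }
    client, err := streamrpc.NewClient(connecter, &streamrpc.ClientConfig{cfg})
    if err != nil {
        log.WithError(err).Error("cannot create streamrpc client")
        return
    }
    defer client.Close(ctx) // 0.2: Close takes a context (see the push.go hunk below)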
@@ -7,11 +7,11 @@ import (
     "github.com/zrepl/zrepl/config"
     "github.com/zrepl/zrepl/daemon/connecter"
     "github.com/zrepl/zrepl/daemon/filters"
+    "github.com/zrepl/zrepl/daemon/logging"
     "github.com/zrepl/zrepl/daemon/pruner"
     "github.com/zrepl/zrepl/endpoint"
     "github.com/zrepl/zrepl/replication"
     "sync"
-    "github.com/zrepl/zrepl/daemon/logging"
 )

 type Push struct {
@@ -66,11 +66,10 @@ func (j *Push) Run(ctx context.Context) {

     defer log.Info("job exiting")

-    log.Debug("wait for wakeups")
-
     invocationCount := 0
 outer:
     for {
+        log.Info("wait for wakeups")
         select {
         case <-ctx.Done():
             log.WithError(ctx.Err()).Info("context")
@@ -86,12 +85,13 @@ outer:

 func (j *Push) do(ctx context.Context) {

     log := GetLogger(ctx)
+    ctx = logging.WithSubsystemLoggers(ctx, log)

     client, err := streamrpc.NewClient(j.connecter, &streamrpc.ClientConfig{STREAMRPC_CONFIG})
     if err != nil {
         log.WithError(err).Error("cannot create streamrpc client")
     }
-    defer client.Close()
+    defer client.Close(ctx)

     sender := endpoint.NewSender(j.fsfilter, filters.NewAnyFSVFilter())
     receiver := endpoint.NewRemote(client)
@@ -100,15 +100,17 @@ func (j *Push) do(ctx context.Context) {
     j.replication = replication.NewReplication()
     j.mtx.Unlock()

-    ctx = logging.WithSubsystemLoggers(ctx, log)
+    log.Info("start replication")
     j.replication.Drive(ctx, sender, receiver)

     // Prune sender
-    senderPruner := j.prunerFactory.BuildSenderPruner(ctx, sender, sender)
+    log.Info("start pruning sender")
+    psCtx := pruner.WithLogger(ctx, pruner.GetLogger(ctx).WithField("prune_side", "sender"))
+    senderPruner := j.prunerFactory.BuildSenderPruner(psCtx, sender, sender) // FIXME ctx as member
     senderPruner.Prune()

     // Prune receiver
-    receiverPruner := j.prunerFactory.BuildReceiverPruner(ctx, receiver, sender)
+    log.Info("start pruning receiver")
+    prCtx := pruner.WithLogger(ctx, pruner.GetLogger(ctx).WithField("prune_side", "receiver"))
+    receiverPruner := j.prunerFactory.BuildReceiverPruner(prCtx, receiver, sender) // FIXME ctx as member
     receiverPruner.Prune()

 }
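Note: the hunk above tags each pruner run with a prune_side field by deriving
a child logger and re-attaching it to a derived context. A runnable toy
illustrating how WithField child loggers keep the two pruners' output
distinguishable (the Logger type here is a hypothetical stand-in):

    package main

    import "fmt"

    // Logger is a toy structured logger; WithField returns a child logger
    // that carries the accumulated fields into every line it emits.
    type Logger struct {
        fields map[string]string
    }

    func (l Logger) WithField(k, v string) Logger {
        child := Logger{fields: make(map[string]string, len(l.fields)+1)}
        for key, val := range l.fields {
            child.fields[key] = val
        }
        child.fields[k] = v
        return child
    }

    func (l Logger) Info(msg string) {
        fmt.Println("INFO", l.fields, msg)
    }

    func main() {
        root := Logger{fields: map[string]string{}}
        sender := root.WithField("prune_side", "sender")
        receiver := root.WithField("prune_side", "receiver")
        sender.Info("start pruning")   // INFO map[prune_side:sender] start pruning
        receiver.Info("start pruning") // INFO map[prune_side:receiver] start pruning
    }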
@@ -86,7 +86,7 @@ func (j *Sink) handleConnection(ctx context.Context, conn net.Conn) {
     log.WithField("addr", conn.RemoteAddr()).Info("handling connection")
     defer log.Info("finished handling connection")

-    logging.WithSubsystemLoggers(ctx, log)
+    ctx = logging.WithSubsystemLoggers(ctx, log)

     local, err := endpoint.NewReceiver(j.fsmap, filters.NewAnyFSVFilter())
     if err != nil {
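Note: the sink fix above is easy to miss: logging.WithSubsystemLoggers returns
a derived context, and the old code discarded that return value, so the
subsystem loggers were never actually attached. Contexts in Go are immutable;
a runnable illustration of why the reassignment matters:

    package main

    import (
        "context"
        "fmt"
    )

    type ctxKey struct{}

    func main() {
        ctx := context.Background()

        // WithValue does not mutate ctx; it returns a derived context.
        // Discarding the return value loses the attached value, which is
        // exactly the bug the hunk above fixes.
        derived := context.WithValue(ctx, ctxKey{}, "subsystem loggers")

        fmt.Println(ctx.Value(ctxKey{}))     // <nil>
        fmt.Println(derived.Value(ctxKey{})) // subsystem loggers
    }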
@@ -28,5 +28,5 @@ func (a twoClassLogAdaptor) Errorf(fmtStr string, args ...interface{}) {
 }

 func (a twoClassLogAdaptor) Infof(fmtStr string, args ...interface{}) {
-    a.Logger.Info(fmt.Sprintf(fmtStr, args...))
+    a.Logger.Debug(fmt.Sprintf(fmtStr, args...))
 }
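Note: this adaptor feeds a third-party library's printf-style logging into the
structured logger; after the change, the library's Infof chatter lands at
DEBUG while Errorf stays visible. A self-contained sketch of the idea (Logger
and stderrLogger are stand-ins for zrepl's logger types):

    package main

    import "fmt"

    type Logger interface {
        Error(msg string)
        Debug(msg string)
    }

    // twoClassLogAdaptor collapses a library's levels into two classes:
    // errors stay at ERROR, everything else is demoted to DEBUG.
    type twoClassLogAdaptor struct{ Logger }

    func (a twoClassLogAdaptor) Errorf(fmtStr string, args ...interface{}) {
        a.Logger.Error(fmt.Sprintf(fmtStr, args...))
    }

    func (a twoClassLogAdaptor) Infof(fmtStr string, args ...interface{}) {
        a.Logger.Debug(fmt.Sprintf(fmtStr, args...))
    }

    type stderrLogger struct{}

    func (stderrLogger) Error(msg string) { fmt.Println("ERROR:", msg) }
    func (stderrLogger) Debug(msg string) { fmt.Println("DEBUG:", msg) }

    func main() {
        a := twoClassLogAdaptor{stderrLogger{}}
        a.Infof("connected to %s", "peer")     // DEBUG: connected to peer
        a.Errorf("lost connection: %v", "EOF") // ERROR: lost connection: EOF
    }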
@@ -18,9 +18,8 @@ func WithLogger(ctx context.Context, log Logger) context.Context {
 }

 func getLogger(ctx context.Context) Logger {
-    l, ok := ctx.Value(contextKeyLogger).(Logger)
-    if !ok {
-        l = logger.NewNullLogger()
+    if l, ok := ctx.Value(contextKeyLogger).(Logger); ok {
+        return l
     }
-    return l
+    return logger.NewNullLogger()
 }
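Note: both versions of getLogger behave identically: the comma-ok type
assertion yields false when the context carries no logger (or a value of the
wrong type), and the function falls back to a null logger instead of returning
nil. The rewrite merely drops the mutable local. A runnable sketch of the whole
context-logger pattern, with hypothetical stand-ins for the logger types:

    package main

    import (
        "context"
        "fmt"
    )

    type Logger interface{ Info(msg string) }

    type nullLogger struct{}

    func (nullLogger) Info(string) {}

    type stdoutLogger struct{}

    func (stdoutLogger) Info(msg string) { fmt.Println("INFO:", msg) }

    // An unexported key type keeps this package's context entries from
    // colliding with any other package's keys.
    type contextKey int

    const contextKeyLogger contextKey = iota

    func WithLogger(ctx context.Context, l Logger) context.Context {
        return context.WithValue(ctx, contextKeyLogger, l)
    }

    func getLogger(ctx context.Context) Logger {
        if l, ok := ctx.Value(contextKeyLogger).(Logger); ok {
            return l
        }
        return nullLogger{} // never nil: callers may log unconditionally
    }

    func main() {
        getLogger(context.Background()).Info("dropped") // null logger: no output
        ctx := WithLogger(context.Background(), stdoutLogger{})
        getLogger(ctx).Info("visible") // INFO: visible
    }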
@@ -230,6 +230,8 @@ func (e *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, sendStream
         return errors.New("receive to filesystem denied")
     }

+    getLogger(ctx).Debug("incoming Receive")
+
     // create placeholder parent filesystems as appropriate
     var visitErr error
     f := zfs.NewDatasetPathForest()
@@ -276,6 +278,11 @@ func (e *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, sendStream
+    getLogger(ctx).Debug("start receive command")

     if err := zfs.ZFSRecv(lp.ToString(), sendStream, args...); err != nil {
+        getLogger(ctx).
+            WithError(err).
+            WithField("args", args).
+            Error("zfs receive failed")
         sendStream.Close()
         return err
     }
     return nil
@@ -418,6 +425,7 @@ func (s Remote) Receive(ctx context.Context, r *pdu.ReceiveReq, sendStream io.Re
         return err
     }
     rb, rs, err := s.c.RequestReply(ctx, RPCReceive, bytes.NewBuffer(b), sendStream)
+    getLogger(ctx).WithField("err", err).Debug("Remote.Receive RequestReplyReturned")
     if err != nil {
         return err
     }
@@ -357,7 +357,7 @@ func (s *ReplicationStep) doReplication(ctx context.Context, sender Sender, rece

     sr := s.buildSendRequest(false)

-    log.WithField("request", sr).Debug("initiate send request")
+    log.Debug("initiate send request")
     sres, sstream, err := sender.Send(ctx, sr)
     if err != nil {
         log.WithError(err).Error("send request failed")
@@ -375,10 +375,13 @@
         Filesystem: fs,
         ClearResumeToken: !sres.UsedResumeToken,
     }
-    log.WithField("request", rr).Debug("initiate receive request")
+    log.Debug("initiate receive request")
     err = receiver.Receive(ctx, rr, sstream)
     if err != nil {
-        log.WithError(err).Error("receive request failed (might also be error on sender)")
+        log.
+            WithError(err).
+            WithField("errType", fmt.Sprintf("%T", err)).
+            Error("receive request failed (might also be error on sender)")
         sstream.Close()
         // This failure could be due to
         //  - an unexpected exit of ZFS on the sending side
@@ -386,7 +389,7 @@
         //  - a connectivity issue
         return updateStateError(err)
     }
-    log.Info("receive finished")
+    log.Debug("receive finished")

     updateStateCompleted()

@@ -420,7 +423,7 @@ func (s *ReplicationStep) doMarkReplicated(ctx context.Context, sender Sender) S
         return s.state
     }

-    log.Info("mark snapshot as replicated")
+    log.Debug("mark snapshot as replicated")
     req := pdu.SnapshotReplicationStatusReq{
         Filesystem: s.parent.fs,
         Snapshot: s.to.GetName(),
@@ -450,7 +453,7 @@ func (s *ReplicationStep) updateSizeEstimate(ctx context.Context, sender Sender)

     sr := s.buildSendRequest(true)

-    log.WithField("request", sr).Debug("initiate dry run send request")
+    log.Debug("initiate dry run send request")
     sres, _, err := sender.Send(ctx, sr)
     if err != nil {
         log.WithError(err).Error("dry run send request failed")
|
||||
if s.from == nil { // FIXME: ZFS semantics are that to is nil on non-incremental send
|
||||
return fmt.Sprintf("%s%s (full)", s.parent.fs, s.to.RelName())
|
||||
} else {
|
||||
return fmt.Sprintf("%s(%s => %s)", s.parent.fs, s.from, s.to.RelName())
|
||||
return fmt.Sprintf("%s(%s => %s)", s.parent.fs, s.from.RelName(), s.to.RelName())
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -147,6 +147,9 @@ func (r *Replication) Drive(ctx context.Context, sender Sender, receiver Receive
             WithField("transition", fmt.Sprintf("%s => %s", pre, post)).
             WithField("duration", delta).
             Debug("main state transition")
+        if post == Working && pre != post {
+            getLogger(ctx).Info("start working")
+        }
     }

     getLogger(ctx).
@@ -178,6 +181,8 @@ func statePlanning(ctx context.Context, sender Sender, receiver Receiver, u upda

     log := getLogger(ctx)

+    log.Info("start planning")
+
     handlePlanningError := func(err error) state {
         return u(func(r *Replication) {
             r.planningError = err
@@ -203,7 +208,7 @@ func statePlanning(ctx context.Context, sender Sender, receiver Receiver, u upda

         log := mainlog.WithField("filesystem", fs.Path)

-        log.Info("assessing filesystem")
+        log.Debug("assessing filesystem")

         sfsvs, err := sender.ListFilesystemVersions(ctx, fs.Path)
         if err != nil {
@@ -266,8 +271,10 @@ func statePlanning(ctx context.Context, sender Sender, receiver Receiver, u upda
             }
         }
         qitem := fsrfsm.Done()

+        log.Debug("compute send size estimate")
         if err = qitem.UpdateSizeEsitmate(ctx, sender); err != nil {
-            log.WithError(err).Error("cannot get size estimate")
+            log.WithError(err).Error("error computing size estimate")
             return handlePlanningError(err)
         }
         q.Add(qitem)
@@ -284,10 +291,13 @@
 var RetrySleepDuration = 10 * time.Second // FIXME make constant onfigurable

 func statePlanningError(ctx context.Context, sender Sender, receiver Receiver, u updater) state {
+    sleepUntil := time.Now().Add(RetrySleepDuration)
     u(func(r *Replication) {
-        r.sleepUntil = time.Now().Add(RetrySleepDuration)
+        r.sleepUntil = sleepUntil
     })
     t := time.NewTimer(RetrySleepDuration)
+    getLogger(ctx).WithField("until", sleepUntil).Info("retry wait after planning error")
     defer t.Stop()
     select {
     case <-ctx.Done():
@@ -328,10 +338,12 @@ func stateWorking(ctx context.Context, sender Sender, receiver Receiver, u updat
 }

 func stateWorkingWait(ctx context.Context, sender Sender, receiver Receiver, u updater) state {
+    sleepUntil := time.Now().Add(RetrySleepDuration)
     u(func(r *Replication) {
-        r.sleepUntil = time.Now().Add(RetrySleepDuration)
+        r.sleepUntil = sleepUntil
     })
     t := time.NewTimer(RetrySleepDuration)
+    getLogger(ctx).WithField("until", sleepUntil).Info("retry wait after send/recv error")
     defer t.Stop()
     select {
     case <-ctx.Done():
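Note: the last two hunks share one refactor: time.Now() is evaluated once and
the result reused for both the stored wake-up time and the new INFO line. If
the deadline were computed twice, the logged "until" value would drift
slightly from the one the state machine actually sleeps toward. A minimal
sketch of the hoisting pattern:

    package main

    import (
        "fmt"
        "time"
    )

    const retrySleepDuration = 10 * time.Second

    func main() {
        // Compute the deadline once; store and log the same value so the
        // reported wait time and the actual timer always agree.
        sleepUntil := time.Now().Add(retrySleepDuration)
        fmt.Println("retry wait until", sleepUntil)

        t := time.NewTimer(time.Until(sleepUntil))
        defer t.Stop()
        <-t.C // blocks until the logged deadline
    }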