diff --git a/Gopkg.lock b/Gopkg.lock index 4c185ab..e6bdc39 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -170,15 +170,15 @@ revision = "391d2c78c8404a9683d79f75dd24ab53040f89f7" [[projects]] - digest = "1:f80c558f761d9cbfb0cd1a317ece8b12a55ec670f8cef52acc515ab76463e6b6" + digest = "1:23576326ee8d287fd295807d5de5db9fcc9572ec2b4123c8ec2394a683edf1e1" name = "github.com/problame/go-streamrpc" packages = [ ".", "internal/pdu", ] pruneopts = "" - revision = "de264a1c39cd8fc42a6b5e902c6eac7fd9683521" - version = "0.1" + revision = "504ffed1faf6af51c057d7b11d79e9367678c666" + version = "v0.2" [[projects]] branch = "master" diff --git a/Gopkg.toml b/Gopkg.toml index ecb2919..49065ab 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -56,10 +56,6 @@ ignored = [ "github.com/inconshreveable/mousetrap" ] name = "github.com/prometheus/client_golang" branch = "master" -[[constraint]] - name = "github.com/problame/go-streamrpc" - version = "0.1.0" - [[constraint]] name = "github.com/golang/protobuf" version = "1.2.0" @@ -70,3 +66,7 @@ ignored = [ "github.com/inconshreveable/mousetrap" ] [[constraint]] name = "github.com/fatih/color" version = "1.7.0" + +[[constraint]] + name = "github.com/problame/go-streamrpc" + version = "0.2.0" diff --git a/daemon/control.go b/daemon/control.go index c7e1307..35a278c 100644 --- a/daemon/control.go +++ b/daemon/control.go @@ -185,7 +185,7 @@ type requestLogger struct { func (l requestLogger) ServeHTTP(w http.ResponseWriter, r *http.Request) { log := l.log.WithField("method", r.Method).WithField("url", r.URL) - log.Info("start") + log.Debug("start") if l.handlerFunc != nil { l.handlerFunc(w, r) } else if l.handler != nil { @@ -193,5 +193,5 @@ func (l requestLogger) ServeHTTP(w http.ResponseWriter, r *http.Request) { } else { log.Error("no handler or handlerFunc configured") } - log.Info("finish") + log.Debug("finish") } diff --git a/daemon/job/job.go b/daemon/job/job.go index 4ea199a..b95f852 100644 --- a/daemon/job/job.go +++ b/daemon/job/job.go @@ -64,10 +64,7 @@ var STREAMRPC_CONFIG = &streamrpc.ConnConfig{ // FIXME oversight and configurabi RxStructuredMaxLen: 4096 * 4096, RxStreamMaxChunkSize: 4096 * 4096, TxChunkSize: 4096 * 4096, - RxTimeout: streamrpc.Timeout{ - Progress: 10 * time.Second, - }, - TxTimeout: streamrpc.Timeout{ + Timeout: streamrpc.Timeout{ Progress: 10 * time.Second, }, } diff --git a/daemon/job/push.go b/daemon/job/push.go index 62fbb24..e4750c2 100644 --- a/daemon/job/push.go +++ b/daemon/job/push.go @@ -7,11 +7,11 @@ import ( "github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/daemon/connecter" "github.com/zrepl/zrepl/daemon/filters" - "github.com/zrepl/zrepl/daemon/logging" "github.com/zrepl/zrepl/daemon/pruner" "github.com/zrepl/zrepl/endpoint" "github.com/zrepl/zrepl/replication" "sync" + "github.com/zrepl/zrepl/daemon/logging" ) type Push struct { @@ -66,11 +66,10 @@ func (j *Push) Run(ctx context.Context) { defer log.Info("job exiting") - log.Debug("wait for wakeups") - invocationCount := 0 outer: for { + log.Info("wait for wakeups") select { case <-ctx.Done(): log.WithError(ctx.Err()).Info("context") @@ -86,12 +85,13 @@ outer: func (j *Push) do(ctx context.Context) { log := GetLogger(ctx) + ctx = logging.WithSubsystemLoggers(ctx, log) client, err := streamrpc.NewClient(j.connecter, &streamrpc.ClientConfig{STREAMRPC_CONFIG}) if err != nil { log.WithError(err).Error("cannot create streamrpc client") } - defer client.Close() + defer client.Close(ctx) sender := endpoint.NewSender(j.fsfilter, filters.NewAnyFSVFilter()) receiver := endpoint.NewRemote(client) @@ -100,15 +100,17 @@ func (j *Push) do(ctx context.Context) { j.replication = replication.NewReplication() j.mtx.Unlock() - ctx = logging.WithSubsystemLoggers(ctx, log) + log.Info("start replication") j.replication.Drive(ctx, sender, receiver) - // Prune sender - senderPruner := j.prunerFactory.BuildSenderPruner(ctx, sender, sender) + log.Info("start pruning sender") + psCtx := pruner.WithLogger(ctx, pruner.GetLogger(ctx).WithField("prune_side", "sender")) + senderPruner := j.prunerFactory.BuildSenderPruner(psCtx, sender, sender) // FIXME ctx as member senderPruner.Prune() - // Prune receiver - receiverPruner := j.prunerFactory.BuildReceiverPruner(ctx, receiver, sender) + log.Info("start pruning receiver") + prCtx := pruner.WithLogger(ctx, pruner.GetLogger(ctx).WithField("prune_side", "receiver")) + receiverPruner := j.prunerFactory.BuildReceiverPruner(prCtx, receiver, sender) // FIXME ctx as member receiverPruner.Prune() } diff --git a/daemon/job/sink.go b/daemon/job/sink.go index 618ced7..8c9f496 100644 --- a/daemon/job/sink.go +++ b/daemon/job/sink.go @@ -86,7 +86,7 @@ func (j *Sink) handleConnection(ctx context.Context, conn net.Conn) { log.WithField("addr", conn.RemoteAddr()).Info("handling connection") defer log.Info("finished handling connection") - logging.WithSubsystemLoggers(ctx, log) + ctx = logging.WithSubsystemLoggers(ctx, log) local, err := endpoint.NewReceiver(j.fsmap, filters.NewAnyFSVFilter()) if err != nil { diff --git a/daemon/logging/adaptors.go b/daemon/logging/adaptors.go index 7f0b21b..c5a7196 100644 --- a/daemon/logging/adaptors.go +++ b/daemon/logging/adaptors.go @@ -28,5 +28,5 @@ func (a twoClassLogAdaptor) Errorf(fmtStr string, args ...interface{}) { } func (a twoClassLogAdaptor) Infof(fmtStr string, args ...interface{}) { - a.Logger.Info(fmt.Sprintf(fmtStr, args...)) + a.Logger.Debug(fmt.Sprintf(fmtStr, args...)) } diff --git a/endpoint/context.go b/endpoint/context.go index a528930..09f9032 100644 --- a/endpoint/context.go +++ b/endpoint/context.go @@ -18,9 +18,8 @@ func WithLogger(ctx context.Context, log Logger) context.Context { } func getLogger(ctx context.Context) Logger { - l, ok := ctx.Value(contextKeyLogger).(Logger) - if !ok { - l = logger.NewNullLogger() + if l, ok := ctx.Value(contextKeyLogger).(Logger); ok { + return l } - return l + return logger.NewNullLogger() } diff --git a/endpoint/endpoint.go b/endpoint/endpoint.go index 8a26730..561e509 100644 --- a/endpoint/endpoint.go +++ b/endpoint/endpoint.go @@ -230,6 +230,8 @@ func (e *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, sendStream return errors.New("receive to filesystem denied") } + getLogger(ctx).Debug("incoming Receive") + // create placeholder parent filesystems as appropriate var visitErr error f := zfs.NewDatasetPathForest() @@ -276,6 +278,11 @@ func (e *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, sendStream getLogger(ctx).Debug("start receive command") if err := zfs.ZFSRecv(lp.ToString(), sendStream, args...); err != nil { + getLogger(ctx). + WithError(err). + WithField("args", args). + Error("zfs receive failed") + sendStream.Close() return err } return nil @@ -418,6 +425,7 @@ func (s Remote) Receive(ctx context.Context, r *pdu.ReceiveReq, sendStream io.Re return err } rb, rs, err := s.c.RequestReply(ctx, RPCReceive, bytes.NewBuffer(b), sendStream) + getLogger(ctx).WithField("err", err).Debug("Remote.Receive RequestReplyReturned") if err != nil { return err } diff --git a/replication/fsrep/fsfsm.go b/replication/fsrep/fsfsm.go index df550cc..2203639 100644 --- a/replication/fsrep/fsfsm.go +++ b/replication/fsrep/fsfsm.go @@ -357,7 +357,7 @@ func (s *ReplicationStep) doReplication(ctx context.Context, sender Sender, rece sr := s.buildSendRequest(false) - log.WithField("request", sr).Debug("initiate send request") + log.Debug("initiate send request") sres, sstream, err := sender.Send(ctx, sr) if err != nil { log.WithError(err).Error("send request failed") @@ -375,10 +375,13 @@ func (s *ReplicationStep) doReplication(ctx context.Context, sender Sender, rece Filesystem: fs, ClearResumeToken: !sres.UsedResumeToken, } - log.WithField("request", rr).Debug("initiate receive request") + log.Debug("initiate receive request") err = receiver.Receive(ctx, rr, sstream) if err != nil { - log.WithError(err).Error("receive request failed (might also be error on sender)") + log. + WithError(err). + WithField("errType", fmt.Sprintf("%T", err)). + Error("receive request failed (might also be error on sender)") sstream.Close() // This failure could be due to // - an unexpected exit of ZFS on the sending side @@ -386,7 +389,7 @@ func (s *ReplicationStep) doReplication(ctx context.Context, sender Sender, rece // - a connectivity issue return updateStateError(err) } - log.Info("receive finished") + log.Debug("receive finished") updateStateCompleted() @@ -420,7 +423,7 @@ func (s *ReplicationStep) doMarkReplicated(ctx context.Context, sender Sender) S return s.state } - log.Info("mark snapshot as replicated") + log.Debug("mark snapshot as replicated") req := pdu.SnapshotReplicationStatusReq{ Filesystem: s.parent.fs, Snapshot: s.to.GetName(), @@ -450,7 +453,7 @@ func (s *ReplicationStep) updateSizeEstimate(ctx context.Context, sender Sender) sr := s.buildSendRequest(true) - log.WithField("request", sr).Debug("initiate dry run send request") + log.Debug("initiate dry run send request") sres, _, err := sender.Send(ctx, sr) if err != nil { log.WithError(err).Error("dry run send request failed") @@ -482,7 +485,7 @@ func (s *ReplicationStep) String() string { if s.from == nil { // FIXME: ZFS semantics are that to is nil on non-incremental send return fmt.Sprintf("%s%s (full)", s.parent.fs, s.to.RelName()) } else { - return fmt.Sprintf("%s(%s => %s)", s.parent.fs, s.from, s.to.RelName()) + return fmt.Sprintf("%s(%s => %s)", s.parent.fs, s.from.RelName(), s.to.RelName()) } } diff --git a/replication/mainfsm.go b/replication/mainfsm.go index 1cae748..c829442 100644 --- a/replication/mainfsm.go +++ b/replication/mainfsm.go @@ -147,6 +147,9 @@ func (r *Replication) Drive(ctx context.Context, sender Sender, receiver Receive WithField("transition", fmt.Sprintf("%s => %s", pre, post)). WithField("duration", delta). Debug("main state transition") + if post == Working && pre != post { + getLogger(ctx).Info("start working") + } } getLogger(ctx). @@ -178,6 +181,8 @@ func statePlanning(ctx context.Context, sender Sender, receiver Receiver, u upda log := getLogger(ctx) + log.Info("start planning") + handlePlanningError := func(err error) state { return u(func(r *Replication) { r.planningError = err @@ -203,7 +208,7 @@ func statePlanning(ctx context.Context, sender Sender, receiver Receiver, u upda log := mainlog.WithField("filesystem", fs.Path) - log.Info("assessing filesystem") + log.Debug("assessing filesystem") sfsvs, err := sender.ListFilesystemVersions(ctx, fs.Path) if err != nil { @@ -266,8 +271,10 @@ func statePlanning(ctx context.Context, sender Sender, receiver Receiver, u upda } } qitem := fsrfsm.Done() + + log.Debug("compute send size estimate") if err = qitem.UpdateSizeEsitmate(ctx, sender); err != nil { - log.WithError(err).Error("cannot get size estimate") + log.WithError(err).Error("error computing size estimate") return handlePlanningError(err) } q.Add(qitem) @@ -284,10 +291,13 @@ func statePlanning(ctx context.Context, sender Sender, receiver Receiver, u upda var RetrySleepDuration = 10 * time.Second // FIXME make constant onfigurable func statePlanningError(ctx context.Context, sender Sender, receiver Receiver, u updater) state { + + sleepUntil := time.Now().Add(RetrySleepDuration) u(func(r *Replication) { - r.sleepUntil = time.Now().Add(RetrySleepDuration) + r.sleepUntil = sleepUntil }) t := time.NewTimer(RetrySleepDuration) + getLogger(ctx).WithField("until", sleepUntil).Info("retry wait after planning error") defer t.Stop() select { case <-ctx.Done(): @@ -328,10 +338,12 @@ func stateWorking(ctx context.Context, sender Sender, receiver Receiver, u updat } func stateWorkingWait(ctx context.Context, sender Sender, receiver Receiver, u updater) state { + sleepUntil := time.Now().Add(RetrySleepDuration) u(func(r *Replication) { - r.sleepUntil = time.Now().Add(RetrySleepDuration) + r.sleepUntil = sleepUntil }) t := time.NewTimer(RetrySleepDuration) + getLogger(ctx).WithField("until", sleepUntil).Info("retry wait after send/recv error") defer t.Stop() select { case <-ctx.Done():