diff --git a/endpoint/endpoint.go b/endpoint/endpoint.go
index 0003e82..842f938 100644
--- a/endpoint/endpoint.go
+++ b/endpoint/endpoint.go
@@ -766,21 +766,30 @@ func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive io.
 		return nil, visitErr
 	}
 
+	log := getLogger(ctx).WithField("proto_fs", req.GetFilesystem()).WithField("local_fs", lp.ToString())
+
 	// determine whether we need to rollback the filesystem / change its placeholder state
 	var clearPlaceholderProperty bool
 	var recvOpts zfs.RecvOptions
 	ph, err := zfs.ZFSGetFilesystemPlaceholderState(ctx, lp)
-	if err == nil && ph.FSExists && ph.IsPlaceholder {
+	if err != nil {
+		return nil, errors.Wrap(err, "cannot get placeholder state")
+	}
+	log.WithField("placeholder_state", fmt.Sprintf("%#v", ph)).Debug("placeholder state")
+	if ph.FSExists && ph.IsPlaceholder {
 		recvOpts.RollbackAndForceRecv = true
 		clearPlaceholderProperty = true
 	}
+
 	if clearPlaceholderProperty {
+		log.Info("clearing placeholder property")
 		if err := zfs.ZFSSetPlaceholder(ctx, lp, false); err != nil {
 			return nil, fmt.Errorf("cannot clear placeholder property for forced receive: %s", err)
 		}
 	}
 
 	if req.ClearResumeToken && ph.FSExists {
+		log.Info("clearing resume token")
 		if err := zfs.ZFSRecvClearResumeToken(ctx, lp.ToString()); err != nil {
 			return nil, errors.Wrap(err, "cannot clear resume token")
 		}
@@ -791,7 +800,7 @@ func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive io.
 		return nil, errors.Wrap(err, "cannot determine whether we can use resumable send & recv")
 	}
 
-	getLogger(ctx).Debug("acquire concurrent recv semaphore")
+	log.Debug("acquire concurrent recv semaphore")
 	// TODO use try-acquire and fail with resource-exhaustion rpc status
 	// => would require handling on the client-side
 	// => this is a dataconn endpoint, doesn't have the status code semantics of gRPC
@@ -801,14 +810,27 @@ func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive io.
 	}
 	defer guard.Release()
 
-	getLogger(ctx).WithField("opts", fmt.Sprintf("%#v", recvOpts)).Debug("start receive command")
+	log.WithField("opts", fmt.Sprintf("%#v", recvOpts)).Debug("start receive command")
 
 	snapFullPath := to.FullPath(lp.ToString())
 	if err := zfs.ZFSRecv(ctx, lp.ToString(), to, receive, recvOpts); err != nil {
-		getLogger(ctx).
+		log.
 			WithError(err).
 			WithField("opts", fmt.Sprintf("%#v", recvOpts)).
 			Error("zfs receive failed")
+
+		// best-effort rollback of placeholder state if the recv didn't start
+		_, resumableStatePresent := err.(*zfs.RecvFailedWithResumeTokenErr)
+		disablePlaceholderRestoration := envconst.Bool("ZREPL_ENDPOINT_DISABLE_PLACEHOLDER_RESTORATION", false)
+		if !disablePlaceholderRestoration && !resumableStatePresent && recvOpts.RollbackAndForceRecv && ph.FSExists && ph.IsPlaceholder && clearPlaceholderProperty {
+			log.Info("restoring placeholder property")
+			if phErr := zfs.ZFSSetPlaceholder(ctx, lp, true); phErr != nil {
+				log.WithError(phErr).Error("cannot restore placeholder property after failed receive, subsequent replications will likely fail with a different error")
+				// fallthrough
+			}
+			// fallthrough
+		}
+
 		return nil, err
 	}
 
@@ -816,13 +838,13 @@ func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive io.
 	toRecvd, err := to.ValidateExistsAndGetVersion(ctx, lp.ToString())
 	if err != nil {
 		msg := "receive request's `To` version does not match what we received in the stream"
-		getLogger(ctx).WithError(err).WithField("snap", snapFullPath).Error(msg)
-		getLogger(ctx).Error("aborting recv request, but keeping received snapshot for inspection")
+		log.WithError(err).WithField("snap", snapFullPath).Error(msg)
+		log.Error("aborting recv request, but keeping received snapshot for inspection")
 		return nil, errors.Wrap(err, msg)
 	}
 
 	if s.conf.UpdateLastReceivedHold {
-		getLogger(ctx).Debug("move last-received-hold")
+		log.Debug("move last-received-hold")
 		if err := MoveLastReceivedHold(ctx, lp.ToString(), toRecvd, s.conf.JobID); err != nil {
 			return nil, errors.Wrap(err, "cannot move last-received-hold")
 		}
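
Reviewer note: the new restoration guard combines six conditions, which is easy to misread in the inlined form. Below is a minimal, self-contained sketch of just that decision. The helper name shouldRestorePlaceholder is hypothetical (the patch inlines the condition in Receive), but the semantics are meant to mirror the hunk above, including the ZREPL_ENDPOINT_DISABLE_PLACEHOLDER_RESTORATION escape hatch read via envconst.Bool.

    package main

    import "fmt"

    // shouldRestorePlaceholder mirrors the guard in the patch: restore the
    // placeholder property only if restoration is not disabled via the env var,
    // the failed recv left no resumable state (a resume token means the next
    // attempt resumes into the partially received state and still needs the
    // property cleared), and we had in fact cleared the property of a
    // pre-existing placeholder filesystem for a forced recv.
    func shouldRestorePlaceholder(disabled, resumableStatePresent, rollbackAndForceRecv, fsExists, isPlaceholder, cleared bool) bool {
    	return !disabled && !resumableStatePresent && rollbackAndForceRecv && fsExists && isPlaceholder && cleared
    }

    func main() {
    	// recv failed with no resume token on a placeholder we had cleared => restore
    	fmt.Println(shouldRestorePlaceholder(false, false, true, true, true, true)) // true
    	// recv failed but left resumable state => keep the property cleared for the resume
    	fmt.Println(shouldRestorePlaceholder(false, true, true, true, true, true)) // false
    }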