mirror of
https://github.com/zrepl/zrepl.git
synced 2024-11-25 18:04:58 +01:00
endpoint: Receiver.Receive: better logging + placeholder state error early exit
This commit is contained in:
parent
5aaac49382
commit
347d0f1aa2
@ -766,21 +766,30 @@ func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive io.
|
|||||||
return nil, visitErr
|
return nil, visitErr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log := getLogger(ctx).WithField("proto_fs", req.GetFilesystem()).WithField("local_fs", lp.ToString())
|
||||||
|
|
||||||
// determine whether we need to rollback the filesystem / change its placeholder state
|
// determine whether we need to rollback the filesystem / change its placeholder state
|
||||||
var clearPlaceholderProperty bool
|
var clearPlaceholderProperty bool
|
||||||
var recvOpts zfs.RecvOptions
|
var recvOpts zfs.RecvOptions
|
||||||
ph, err := zfs.ZFSGetFilesystemPlaceholderState(ctx, lp)
|
ph, err := zfs.ZFSGetFilesystemPlaceholderState(ctx, lp)
|
||||||
if err == nil && ph.FSExists && ph.IsPlaceholder {
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "cannot get placeholder state")
|
||||||
|
}
|
||||||
|
log.WithField("placeholder_state", fmt.Sprintf("%#v", ph)).Debug("placeholder state")
|
||||||
|
if ph.FSExists && ph.IsPlaceholder {
|
||||||
recvOpts.RollbackAndForceRecv = true
|
recvOpts.RollbackAndForceRecv = true
|
||||||
clearPlaceholderProperty = true
|
clearPlaceholderProperty = true
|
||||||
}
|
}
|
||||||
|
|
||||||
if clearPlaceholderProperty {
|
if clearPlaceholderProperty {
|
||||||
|
log.Info("clearing placeholder property")
|
||||||
if err := zfs.ZFSSetPlaceholder(ctx, lp, false); err != nil {
|
if err := zfs.ZFSSetPlaceholder(ctx, lp, false); err != nil {
|
||||||
return nil, fmt.Errorf("cannot clear placeholder property for forced receive: %s", err)
|
return nil, fmt.Errorf("cannot clear placeholder property for forced receive: %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if req.ClearResumeToken && ph.FSExists {
|
if req.ClearResumeToken && ph.FSExists {
|
||||||
|
log.Info("clearing resume token")
|
||||||
if err := zfs.ZFSRecvClearResumeToken(ctx, lp.ToString()); err != nil {
|
if err := zfs.ZFSRecvClearResumeToken(ctx, lp.ToString()); err != nil {
|
||||||
return nil, errors.Wrap(err, "cannot clear resume token")
|
return nil, errors.Wrap(err, "cannot clear resume token")
|
||||||
}
|
}
|
||||||
@ -791,7 +800,7 @@ func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive io.
|
|||||||
return nil, errors.Wrap(err, "cannot determine whether we can use resumable send & recv")
|
return nil, errors.Wrap(err, "cannot determine whether we can use resumable send & recv")
|
||||||
}
|
}
|
||||||
|
|
||||||
getLogger(ctx).Debug("acquire concurrent recv semaphore")
|
log.Debug("acquire concurrent recv semaphore")
|
||||||
// TODO use try-acquire and fail with resource-exhaustion rpc status
|
// TODO use try-acquire and fail with resource-exhaustion rpc status
|
||||||
// => would require handling on the client-side
|
// => would require handling on the client-side
|
||||||
// => this is a dataconn endpoint, doesn't have the status code semantics of gRPC
|
// => this is a dataconn endpoint, doesn't have the status code semantics of gRPC
|
||||||
@ -801,14 +810,27 @@ func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive io.
|
|||||||
}
|
}
|
||||||
defer guard.Release()
|
defer guard.Release()
|
||||||
|
|
||||||
getLogger(ctx).WithField("opts", fmt.Sprintf("%#v", recvOpts)).Debug("start receive command")
|
log.WithField("opts", fmt.Sprintf("%#v", recvOpts)).Debug("start receive command")
|
||||||
|
|
||||||
snapFullPath := to.FullPath(lp.ToString())
|
snapFullPath := to.FullPath(lp.ToString())
|
||||||
if err := zfs.ZFSRecv(ctx, lp.ToString(), to, receive, recvOpts); err != nil {
|
if err := zfs.ZFSRecv(ctx, lp.ToString(), to, receive, recvOpts); err != nil {
|
||||||
getLogger(ctx).
|
log.
|
||||||
WithError(err).
|
WithError(err).
|
||||||
WithField("opts", fmt.Sprintf("%#v", recvOpts)).
|
WithField("opts", fmt.Sprintf("%#v", recvOpts)).
|
||||||
Error("zfs receive failed")
|
Error("zfs receive failed")
|
||||||
|
|
||||||
|
// best-effort rollback of placeholder state if the recv didn't start
|
||||||
|
_, resumableStatePresent := err.(*zfs.RecvFailedWithResumeTokenErr)
|
||||||
|
disablePlaceholderRestoration := envconst.Bool("ZREPL_ENDPOINT_DISABLE_PLACEHOLDER_RESTORATION", false)
|
||||||
|
if !disablePlaceholderRestoration && !resumableStatePresent && recvOpts.RollbackAndForceRecv && ph.FSExists && ph.IsPlaceholder && clearPlaceholderProperty {
|
||||||
|
log.Info("restoring placeholder property")
|
||||||
|
if phErr := zfs.ZFSSetPlaceholder(ctx, lp, true); phErr != nil {
|
||||||
|
log.WithError(phErr).Error("cannot restore placeholder property after failed receive, subsequent replications will likely fail with a different error")
|
||||||
|
// fallthrough
|
||||||
|
}
|
||||||
|
// fallthrough
|
||||||
|
}
|
||||||
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -816,13 +838,13 @@ func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive io.
|
|||||||
toRecvd, err := to.ValidateExistsAndGetVersion(ctx, lp.ToString())
|
toRecvd, err := to.ValidateExistsAndGetVersion(ctx, lp.ToString())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
msg := "receive request's `To` version does not match what we received in the stream"
|
msg := "receive request's `To` version does not match what we received in the stream"
|
||||||
getLogger(ctx).WithError(err).WithField("snap", snapFullPath).Error(msg)
|
log.WithError(err).WithField("snap", snapFullPath).Error(msg)
|
||||||
getLogger(ctx).Error("aborting recv request, but keeping received snapshot for inspection")
|
log.Error("aborting recv request, but keeping received snapshot for inspection")
|
||||||
return nil, errors.Wrap(err, msg)
|
return nil, errors.Wrap(err, msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
if s.conf.UpdateLastReceivedHold {
|
if s.conf.UpdateLastReceivedHold {
|
||||||
getLogger(ctx).Debug("move last-received-hold")
|
log.Debug("move last-received-hold")
|
||||||
if err := MoveLastReceivedHold(ctx, lp.ToString(), toRecvd, s.conf.JobID); err != nil {
|
if err := MoveLastReceivedHold(ctx, lp.ToString(), toRecvd, s.conf.JobID); err != nil {
|
||||||
return nil, errors.Wrap(err, "cannot move last-received-hold")
|
return nil, errors.Wrap(err, "cannot move last-received-hold")
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user