mirror of
https://github.com/zrepl/zrepl.git
synced 2024-11-22 08:23:50 +01:00
endpoint: serialize dataset hierarchy modification within a receiver job
refs #136 refs #140
This commit is contained in:
parent
2f2e6e6a00
commit
3e71542c78
@ -8,6 +8,7 @@ import (
|
|||||||
|
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/zrepl/zrepl/replication/logic/pdu"
|
"github.com/zrepl/zrepl/replication/logic/pdu"
|
||||||
|
"github.com/zrepl/zrepl/util/chainlock"
|
||||||
"github.com/zrepl/zrepl/zfs"
|
"github.com/zrepl/zrepl/zfs"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -170,13 +171,19 @@ type FSMap interface { // FIXME unused
|
|||||||
type Receiver struct {
|
type Receiver struct {
|
||||||
rootWithoutClientComponent *zfs.DatasetPath
|
rootWithoutClientComponent *zfs.DatasetPath
|
||||||
appendClientIdentity bool
|
appendClientIdentity bool
|
||||||
|
|
||||||
|
recvParentCreationMtx *chainlock.L
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewReceiver(rootDataset *zfs.DatasetPath, appendClientIdentity bool) *Receiver {
|
func NewReceiver(rootDataset *zfs.DatasetPath, appendClientIdentity bool) *Receiver {
|
||||||
if rootDataset.Length() <= 0 {
|
if rootDataset.Length() <= 0 {
|
||||||
panic(fmt.Sprintf("root dataset must not be an empty path: %v", rootDataset))
|
panic(fmt.Sprintf("root dataset must not be an empty path: %v", rootDataset))
|
||||||
}
|
}
|
||||||
return &Receiver{rootWithoutClientComponent: rootDataset.Copy(), appendClientIdentity: appendClientIdentity}
|
return &Receiver{
|
||||||
|
rootWithoutClientComponent: rootDataset.Copy(),
|
||||||
|
appendClientIdentity: appendClientIdentity,
|
||||||
|
recvParentCreationMtx: chainlock.New(),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestClientIdentity(rootFS *zfs.DatasetPath, clientIdentity string) error {
|
func TestClientIdentity(rootFS *zfs.DatasetPath, clientIdentity string) error {
|
||||||
@ -323,40 +330,53 @@ func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive zfs
|
|||||||
}
|
}
|
||||||
|
|
||||||
// create placeholder parent filesystems as appropriate
|
// create placeholder parent filesystems as appropriate
|
||||||
|
//
|
||||||
|
// Manipulating the ZFS dataset hierarchy must happen exclusively.
|
||||||
|
// TODO: Use fine-grained locking to allow separate clients / requests to pass
|
||||||
|
// through the following section concurrently when operating on disjoint
|
||||||
|
// ZFS dataset hierarchy subtrees.
|
||||||
var visitErr error
|
var visitErr error
|
||||||
f := zfs.NewDatasetPathForest()
|
func() {
|
||||||
f.Add(lp)
|
getLogger(ctx).Debug("begin aquire recvParentCreationMtx")
|
||||||
getLogger(ctx).Debug("begin tree-walk")
|
defer s.recvParentCreationMtx.Lock().Unlock()
|
||||||
f.WalkTopDown(func(v zfs.DatasetPathVisit) (visitChildTree bool) {
|
getLogger(ctx).Debug("end aquire recvParentCreationMtx")
|
||||||
if v.Path.Equal(lp) {
|
defer getLogger(ctx).Debug("release recvParentCreationMtx")
|
||||||
return false
|
|
||||||
}
|
|
||||||
ph, err := zfs.ZFSGetFilesystemPlaceholderState(v.Path)
|
|
||||||
if err != nil {
|
|
||||||
visitErr = err
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
getLogger(ctx).
|
|
||||||
WithField("fs", v.Path.ToString()).
|
|
||||||
WithField("placeholder_state", fmt.Sprintf("%#v", ph)).
|
|
||||||
Debug("placeholder state for filesystem")
|
|
||||||
|
|
||||||
if !ph.FSExists {
|
f := zfs.NewDatasetPathForest()
|
||||||
l := getLogger(ctx).WithField("placeholder_fs", v.Path)
|
f.Add(lp)
|
||||||
l.Debug("create placeholder filesystem")
|
getLogger(ctx).Debug("begin tree-walk")
|
||||||
err := zfs.ZFSCreatePlaceholderFilesystem(v.Path)
|
f.WalkTopDown(func(v zfs.DatasetPathVisit) (visitChildTree bool) {
|
||||||
|
if v.Path.Equal(lp) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
ph, err := zfs.ZFSGetFilesystemPlaceholderState(v.Path)
|
||||||
|
getLogger(ctx).
|
||||||
|
WithField("fs", v.Path.ToString()).
|
||||||
|
WithField("placeholder_state", fmt.Sprintf("%#v", ph)).
|
||||||
|
WithField("err", fmt.Sprintf("%s", err)).
|
||||||
|
WithField("errType", fmt.Sprintf("%T", err)).
|
||||||
|
Debug("placeholder state for filesystem")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
l.WithError(err).Error("cannot create placeholder filesystem")
|
|
||||||
visitErr = err
|
visitErr = err
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
return true
|
|
||||||
}
|
|
||||||
getLogger(ctx).WithField("filesystem", v.Path.ToString()).Debug("exists")
|
|
||||||
return true // leave this fs as is
|
|
||||||
})
|
|
||||||
getLogger(ctx).WithField("visitErr", visitErr).Debug("complete tree-walk")
|
|
||||||
|
|
||||||
|
if !ph.FSExists {
|
||||||
|
l := getLogger(ctx).WithField("placeholder_fs", v.Path)
|
||||||
|
l.Debug("create placeholder filesystem")
|
||||||
|
err := zfs.ZFSCreatePlaceholderFilesystem(v.Path)
|
||||||
|
if err != nil {
|
||||||
|
l.WithError(err).Error("cannot create placeholder filesystem")
|
||||||
|
visitErr = err
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
getLogger(ctx).WithField("filesystem", v.Path.ToString()).Debug("exists")
|
||||||
|
return true // leave this fs as is
|
||||||
|
})
|
||||||
|
}()
|
||||||
|
getLogger(ctx).WithField("visitErr", visitErr).Debug("complete tree-walk")
|
||||||
if visitErr != nil {
|
if visitErr != nil {
|
||||||
return nil, visitErr
|
return nil, visitErr
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user