zrepl/cmd/replication.go

322 lines
9.4 KiB
Go
Raw Normal View History

package cmd
import (
"fmt"
"io"
2017-09-22 14:13:58 +02:00
"bytes"
"encoding/json"
"github.com/zrepl/zrepl/rpc"
"github.com/zrepl/zrepl/zfs"
)
// localPullACL is a dataset filter that lets every dataset path pass.
type localPullACL struct{}

// Filter accepts any dataset path: it always reports pass == true together
// with a nil error, regardless of p.
func (a localPullACL) Filter(p *zfs.DatasetPath) (pass bool, err error) {
	pass = true
	return pass, nil
}
reimplement io.ReadWriteCloser based RPC mechanism The existing ByteStreamRPC requires writing RPC stub + server code for each RPC endpoint. Does not scale well. Goal: adding a new RPC call should - not require writing an RPC stub / handler - not require modifications to the RPC lib The wire format is inspired by HTTP2, the API by net/rpc. Frames are used for framing messages, i.e. a message is made of multiple frames which are glued together using a frame-bridging reader / writer. This roughly corresponds to HTTP2 streams, although we're happy with just one stream at any time and the resulting non-need for flow control, etc. Frames are typed using a header. The two most important types are 'Header' and 'Data'. The RPC protocol is built on top of this: - Client sends a header => multiple frames of type 'header' - Client sends request body => multiple frames of type 'data' - Server reads a header => multiple frames of type 'header' - Server reads request body => multiple frames of type 'data' - Server sends response header => ... - Server sends response body => ... An RPC header is serialized JSON and always the same structure. The body is of the type specified in the header. The RPC server and client use some semi-fancy reflection techniques to automatically infer the data type of the request/response body based on the method signature of the server handler; or the client parameters, respectively. This boils down to a special-case for io.Reader, which are just dumped into a series of data frames as efficiently as possible. All other types are (de)serialized using encoding/json. The RPC layer and Frame Layer log some arbitrary messages that proved useful during debugging. By default, they log to a non-logger, which should not have a big impact on performance. pprof analysis shows the implementation spends its CPU time 60% waiting for syscalls 30% in memmove 10% ... 
On an Intel(R) Core(TM) i7-6600U CPU @ 2.60GHz CPU, Linux 4.12, the implementation achieved ~3.6GiB/s. Future optimization may include splice(2) / vmsplice(2) on Linux, although this doesn't fit so well with the heavy use of io.Reader / io.Writer throughout the codebase. The existing hackaround for local calls was re-implemented to fit the new interface of RPCServer and RPCClient. The 'R'PC method invocation is a bit slower because reflection is involved in between, but otherwise performance should be no different. The RPC code currently does not support multipart requests and thus does not support the equivalent of a POST. Thus, the switch to the new rpc code had the following fallout: - Move request objects + constants from rpc package to main app code - Sacrifice the hacky 'push = pull me' way of doing push -> need to further extend RPC to support multipart requests or something to implement this properly with additional interfaces -> should be done after replication is abstracted better than separate algorithms for doPull() and doPush()
2017-08-19 22:37:14 +02:00
// DEFAULT_INITIAL_REPL_POLICY is the default initial replication policy; it is
// defined as InitialReplPolicyMostRecent.
// NOTE(review): the place where this default is applied is not visible in this
// file — presumably the configuration parsing code; confirm.
const DEFAULT_INITIAL_REPL_POLICY = InitialReplPolicyMostRecent
reimplement io.ReadWriteCloser based RPC mechanism The existing ByteStreamRPC requires writing RPC stub + server code for each RPC endpoint. Does not scale well. Goal: adding a new RPC call should - not require writing an RPC stub / handler - not require modifications to the RPC lib The wire format is inspired by HTTP2, the API by net/rpc. Frames are used for framing messages, i.e. a message is made of multiple frames which are glued together using a frame-bridging reader / writer. This roughly corresponds to HTTP2 streams, although we're happy with just one stream at any time and the resulting non-need for flow control, etc. Frames are typed using a header. The two most important types are 'Header' and 'Data'. The RPC protocol is built on top of this: - Client sends a header => multiple frames of type 'header' - Client sends request body => multiple frames of type 'data' - Server reads a header => multiple frames of type 'header' - Server reads request body => multiple frames of type 'data' - Server sends response header => ... - Server sends response body => ... An RPC header is serialized JSON and always the same structure. The body is of the type specified in the header. The RPC server and client use some semi-fancy reflection techniques to automatically infer the data type of the request/response body based on the method signature of the server handler; or the client parameters, respectively. This boils down to a special-case for io.Reader, which are just dumped into a series of data frames as efficiently as possible. All other types are (de)serialized using encoding/json. The RPC layer and Frame Layer log some arbitrary messages that proved useful during debugging. By default, they log to a non-logger, which should not have a big impact on performance. pprof analysis shows the implementation spends its CPU time 60% waiting for syscalls 30% in memmove 10% ... 
On an Intel(R) Core(TM) i7-6600U CPU @ 2.60GHz CPU, Linux 4.12, the implementation achieved ~3.6GiB/s. Future optimization may include splice(2) / vmsplice(2) on Linux, although this doesn't fit so well with the heavy use of io.Reader / io.Writer throughout the codebase. The existing hackaround for local calls was re-implemented to fit the new interface of RPCServer and RPCClient. The 'R'PC method invocation is a bit slower because reflection is involved in between, but otherwise performance should be no different. The RPC code currently does not support multipart requests and thus does not support the equivalent of a POST. Thus, the switch to the new rpc code had the following fallout: - Move request objects + constants from rpc package to main app code - Sacrifice the hacky 'push = pull me' way of doing push -> need to further extend RPC to support multipart requests or something to implement this properly with additional interfaces -> should be done after replication is abstracted better than separate algorithms for doPull() and doPush()
2017-08-19 22:37:14 +02:00
// InitialReplPolicy names the strategy used for the initial sync of a
// filesystem, i.e. the transfer performed by Puller.replFilesystem when the
// version diff reports zfs.ConflictAllRight.
type InitialReplPolicy string
const (
// InitialReplPolicyMostRecent is the only policy replFilesystem implements:
// it requests the last snapshot-type entry of the diff's MRCAPathRight.
InitialReplPolicyMostRecent InitialReplPolicy = "most_recent"
// InitialReplPolicyAll is declared, but replFilesystem panics with
// "policy '...' not implemented" for any policy other than
// InitialReplPolicyMostRecent.
InitialReplPolicyAll InitialReplPolicy = "all"
)
type Puller struct {
task *Task
reimplement io.ReadWriteCloser based RPC mechanism The existing ByteStreamRPC requires writing RPC stub + server code for each RPC endpoint. Does not scale well. Goal: adding a new RPC call should - not require writing an RPC stub / handler - not require modifications to the RPC lib The wire format is inspired by HTTP2, the API by net/rpc. Frames are used for framing messages, i.e. a message is made of multiple frames which are glued together using a frame-bridging reader / writer. This roughly corresponds to HTTP2 streams, although we're happy with just one stream at any time and the resulting non-need for flow control, etc. Frames are typed using a header. The two most important types are 'Header' and 'Data'. The RPC protocol is built on top of this: - Client sends a header => multiple frames of type 'header' - Client sends request body => mulitiple frames of type 'data' - Server reads a header => multiple frames of type 'header' - Server reads request body => mulitiple frames of type 'data' - Server sends response header => ... - Server sends response body => ... An RPC header is serialized JSON and always the same structure. The body is of the type specified in the header. The RPC server and client use some semi-fancy reflection tequniques to automatically infer the data type of the request/response body based on the method signature of the server handler; or the client parameters, respectively. This boils down to a special-case for io.Reader, which are just dumped into a series of data frames as efficiently as possible. All other types are (de)serialized using encoding/json. The RPC layer and Frame Layer log some arbitrary messages that proved useful during debugging. By default, they log to a non-logger, which should not have a big impact on performance. pprof analysis shows the implementation spends its CPU time 60% waiting for syscalls 30% in memmove 10% ... 
On a Intel(R) Core(TM) i7-6600U CPU @ 2.60GHz CPU, Linux 4.12, the implementation achieved ~3.6GiB/s. Future optimization may include spice(2) / vmspice(2) on Linux, although this doesn't fit so well with the heavy use of io.Reader / io.Writer throughout the codebase. The existing hackaround for local calls was re-implemented to fit the new interface of PRCServer and RPCClient. The 'R'PC method invocation is a bit slower because reflection is involved inbetween, but otherwise performance should be no different. The RPC code currently does not support multipart requests and thus does not support the equivalent of a POST. Thus, the switch to the new rpc code had the following fallout: - Move request objects + constants from rpc package to main app code - Sacrifice the hacky 'push = pull me' way of doing push -> need to further extend RPC to support multipart requests or something to implement this properly with additional interfaces -> should be done after replication is abstracted better than separate algorithms for doPull() and doPush()
2017-08-19 22:37:14 +02:00
Remote rpc.RPCClient
Mapping DatasetMapping
reimplement io.ReadWriteCloser based RPC mechanism The existing ByteStreamRPC requires writing RPC stub + server code for each RPC endpoint. Does not scale well. Goal: adding a new RPC call should - not require writing an RPC stub / handler - not require modifications to the RPC lib The wire format is inspired by HTTP2, the API by net/rpc. Frames are used for framing messages, i.e. a message is made of multiple frames which are glued together using a frame-bridging reader / writer. This roughly corresponds to HTTP2 streams, although we're happy with just one stream at any time and the resulting non-need for flow control, etc. Frames are typed using a header. The two most important types are 'Header' and 'Data'. The RPC protocol is built on top of this: - Client sends a header => multiple frames of type 'header' - Client sends request body => mulitiple frames of type 'data' - Server reads a header => multiple frames of type 'header' - Server reads request body => mulitiple frames of type 'data' - Server sends response header => ... - Server sends response body => ... An RPC header is serialized JSON and always the same structure. The body is of the type specified in the header. The RPC server and client use some semi-fancy reflection tequniques to automatically infer the data type of the request/response body based on the method signature of the server handler; or the client parameters, respectively. This boils down to a special-case for io.Reader, which are just dumped into a series of data frames as efficiently as possible. All other types are (de)serialized using encoding/json. The RPC layer and Frame Layer log some arbitrary messages that proved useful during debugging. By default, they log to a non-logger, which should not have a big impact on performance. pprof analysis shows the implementation spends its CPU time 60% waiting for syscalls 30% in memmove 10% ... 
On a Intel(R) Core(TM) i7-6600U CPU @ 2.60GHz CPU, Linux 4.12, the implementation achieved ~3.6GiB/s. Future optimization may include spice(2) / vmspice(2) on Linux, although this doesn't fit so well with the heavy use of io.Reader / io.Writer throughout the codebase. The existing hackaround for local calls was re-implemented to fit the new interface of PRCServer and RPCClient. The 'R'PC method invocation is a bit slower because reflection is involved inbetween, but otherwise performance should be no different. The RPC code currently does not support multipart requests and thus does not support the equivalent of a POST. Thus, the switch to the new rpc code had the following fallout: - Move request objects + constants from rpc package to main app code - Sacrifice the hacky 'push = pull me' way of doing push -> need to further extend RPC to support multipart requests or something to implement this properly with additional interfaces -> should be done after replication is abstracted better than separate algorithms for doPull() and doPush()
2017-08-19 22:37:14 +02:00
InitialReplPolicy InitialReplPolicy
}
// remoteLocalMapping pairs a remote dataset path with the local dataset path
// it is replicated to. Puller.buildReplMapping creates these and keys them by
// the string form of the local path.
type remoteLocalMapping struct {
// Remote is the dataset path on the remote side (the source of the streams).
Remote *zfs.DatasetPath
// Local is the dataset path on the receiving side.
Local *zfs.DatasetPath
}
func (p *Puller) getRemoteFilesystems() (rfs []*zfs.DatasetPath, ok bool) {
p.task.Enter("fetch_remote_fs_list")
defer p.task.Finish()
reimplement io.ReadWriteCloser based RPC mechanism The existing ByteStreamRPC requires writing RPC stub + server code for each RPC endpoint. Does not scale well. Goal: adding a new RPC call should - not require writing an RPC stub / handler - not require modifications to the RPC lib The wire format is inspired by HTTP2, the API by net/rpc. Frames are used for framing messages, i.e. a message is made of multiple frames which are glued together using a frame-bridging reader / writer. This roughly corresponds to HTTP2 streams, although we're happy with just one stream at any time and the resulting non-need for flow control, etc. Frames are typed using a header. The two most important types are 'Header' and 'Data'. The RPC protocol is built on top of this: - Client sends a header => multiple frames of type 'header' - Client sends request body => mulitiple frames of type 'data' - Server reads a header => multiple frames of type 'header' - Server reads request body => mulitiple frames of type 'data' - Server sends response header => ... - Server sends response body => ... An RPC header is serialized JSON and always the same structure. The body is of the type specified in the header. The RPC server and client use some semi-fancy reflection tequniques to automatically infer the data type of the request/response body based on the method signature of the server handler; or the client parameters, respectively. This boils down to a special-case for io.Reader, which are just dumped into a series of data frames as efficiently as possible. All other types are (de)serialized using encoding/json. The RPC layer and Frame Layer log some arbitrary messages that proved useful during debugging. By default, they log to a non-logger, which should not have a big impact on performance. pprof analysis shows the implementation spends its CPU time 60% waiting for syscalls 30% in memmove 10% ... 
On a Intel(R) Core(TM) i7-6600U CPU @ 2.60GHz CPU, Linux 4.12, the implementation achieved ~3.6GiB/s. Future optimization may include spice(2) / vmspice(2) on Linux, although this doesn't fit so well with the heavy use of io.Reader / io.Writer throughout the codebase. The existing hackaround for local calls was re-implemented to fit the new interface of PRCServer and RPCClient. The 'R'PC method invocation is a bit slower because reflection is involved inbetween, but otherwise performance should be no different. The RPC code currently does not support multipart requests and thus does not support the equivalent of a POST. Thus, the switch to the new rpc code had the following fallout: - Move request objects + constants from rpc package to main app code - Sacrifice the hacky 'push = pull me' way of doing push -> need to further extend RPC to support multipart requests or something to implement this properly with additional interfaces -> should be done after replication is abstracted better than separate algorithms for doPull() and doPush()
2017-08-19 22:37:14 +02:00
fsr := FilesystemRequest{}
if err := p.Remote.Call("FilesystemRequest", &fsr, &rfs); err != nil {
p.task.Log().WithError(err).Error("cannot fetch remote filesystem list")
return nil, false
}
return rfs, true
}
func (p *Puller) buildReplMapping(remoteFilesystems []*zfs.DatasetPath) (replMapping map[string]remoteLocalMapping, ok bool) {
p.task.Enter("build_repl_mapping")
defer p.task.Finish()
replMapping = make(map[string]remoteLocalMapping, len(remoteFilesystems))
for fs := range remoteFilesystems {
var err error
var localFs *zfs.DatasetPath
localFs, err = p.Mapping.Map(remoteFilesystems[fs])
if err != nil {
2017-09-02 12:40:22 +02:00
err := fmt.Errorf("error mapping %s: %s", remoteFilesystems[fs], err)
p.task.Log().WithError(err).WithField(logMapFromField, remoteFilesystems[fs]).Error("cannot map")
return nil, false
2017-09-02 12:40:22 +02:00
}
if localFs == nil {
continue
}
p.task.Log().WithField(logMapFromField, remoteFilesystems[fs].ToString()).
WithField(logMapToField, localFs.ToString()).Debug("mapping")
m := remoteLocalMapping{remoteFilesystems[fs], localFs}
replMapping[m.Local.ToString()] = m
}
return replMapping, true
}
// returns true if the receiving filesystem (local side) exists and can have child filesystems
func (p *Puller) replFilesystem(m remoteLocalMapping, localFilesystemState map[string]zfs.FilesystemState) (localExists bool) {
p.task.Enter("repl_fs")
defer p.task.Finish()
var err error
remote := p.Remote
log := p.task.Log().
WithField(logMapFromField, m.Remote.ToString()).
WithField(logMapToField, m.Local.ToString())
log.Debug("examining local filesystem state")
localState, localExists := localFilesystemState[m.Local.ToString()]
var versions []zfs.FilesystemVersion
switch {
case !localExists:
log.Info("local filesystem does not exist")
case localState.Placeholder:
log.Info("local filesystem is marked as placeholder")
default:
log.Debug("local filesystem exists")
log.Debug("requesting local filesystem versions")
if versions, err = zfs.ZFSListFilesystemVersions(m.Local, nil); err != nil {
log.WithError(err).Error("cannot get local filesystem versions")
return false
}
}
log.Info("requesting remote filesystem versions")
r := FilesystemVersionsRequest{
Filesystem: m.Remote,
}
var theirVersions []zfs.FilesystemVersion
if err = remote.Call("FilesystemVersionsRequest", &r, &theirVersions); err != nil {
log.WithError(err).Error("cannot get remote filesystem versions")
log.Warn("stopping replication for all filesystems mapped as children of receiving filesystem")
return false
}
log.Debug("computing diff between remote and local filesystem versions")
diff := zfs.MakeFilesystemDiff(versions, theirVersions)
log.WithField("diff", diff).Debug("diff between local and remote filesystem")
2017-09-22 14:13:58 +02:00
if localState.Placeholder && diff.Conflict != zfs.ConflictAllRight {
panic("internal inconsistency: local placeholder implies ConflictAllRight")
}
switch diff.Conflict {
case zfs.ConflictAllRight:
log.WithField("replication_policy", p.InitialReplPolicy).Info("performing initial sync, following policy")
if p.InitialReplPolicy != InitialReplPolicyMostRecent {
panic(fmt.Sprintf("policy '%s' not implemented", p.InitialReplPolicy))
}
snapsOnly := make([]zfs.FilesystemVersion, 0, len(diff.MRCAPathRight))
for s := range diff.MRCAPathRight {
if diff.MRCAPathRight[s].Type == zfs.Snapshot {
snapsOnly = append(snapsOnly, diff.MRCAPathRight[s])
}
}
if len(snapsOnly) < 1 {
log.Warn("cannot perform initial sync: no remote snapshots")
return false
}
r := InitialTransferRequest{
Filesystem: m.Remote,
FilesystemVersion: snapsOnly[len(snapsOnly)-1],
}
log.WithField("version", r.FilesystemVersion).Debug("requesting snapshot stream")
var stream io.Reader
if err = remote.Call("InitialTransferRequest", &r, &stream); err != nil {
log.WithError(err).Error("cannot request initial transfer")
return false
}
log.Debug("received initial transfer request response")
log.Debug("invoke zfs receive")
recvArgs := []string{"-u"}
if localState.Placeholder {
log.Info("receive with forced rollback to replace placeholder filesystem")
recvArgs = append(recvArgs, "-F")
}
progressStream := p.task.ProgressUpdater(stream)
if err = zfs.ZFSRecv(m.Local, progressStream, recvArgs...); err != nil {
log.WithError(err).Error("cannot receive stream")
return false
}
log.Info("finished receiving stream") // TODO rx delta
// TODO unify with recv path of ConflictIncremental
log.Debug("configuring properties of received filesystem")
if err = zfs.ZFSSet(m.Local, "readonly", "on"); err != nil {
log.WithError(err).Error("cannot set readonly property")
}
log.Info("finished initial transfer")
return true
case zfs.ConflictIncremental:
if len(diff.IncrementalPath) < 2 {
log.Info("remote and local are in sync")
return true
}
log.Info("following incremental path from diff")
for i := 0; i < len(diff.IncrementalPath)-1; i++ {
from, to := diff.IncrementalPath[i], diff.IncrementalPath[i+1]
log, _ := log.WithField(logIncFromField, from.Name).WithField(logIncToField, to.Name), 0
reimplement io.ReadWriteCloser based RPC mechanism The existing ByteStreamRPC requires writing RPC stub + server code for each RPC endpoint. Does not scale well. Goal: adding a new RPC call should - not require writing an RPC stub / handler - not require modifications to the RPC lib The wire format is inspired by HTTP2, the API by net/rpc. Frames are used for framing messages, i.e. a message is made of multiple frames which are glued together using a frame-bridging reader / writer. This roughly corresponds to HTTP2 streams, although we're happy with just one stream at any time and the resulting non-need for flow control, etc. Frames are typed using a header. The two most important types are 'Header' and 'Data'. The RPC protocol is built on top of this: - Client sends a header => multiple frames of type 'header' - Client sends request body => mulitiple frames of type 'data' - Server reads a header => multiple frames of type 'header' - Server reads request body => mulitiple frames of type 'data' - Server sends response header => ... - Server sends response body => ... An RPC header is serialized JSON and always the same structure. The body is of the type specified in the header. The RPC server and client use some semi-fancy reflection tequniques to automatically infer the data type of the request/response body based on the method signature of the server handler; or the client parameters, respectively. This boils down to a special-case for io.Reader, which are just dumped into a series of data frames as efficiently as possible. All other types are (de)serialized using encoding/json. The RPC layer and Frame Layer log some arbitrary messages that proved useful during debugging. By default, they log to a non-logger, which should not have a big impact on performance. pprof analysis shows the implementation spends its CPU time 60% waiting for syscalls 30% in memmove 10% ... 
On a Intel(R) Core(TM) i7-6600U CPU @ 2.60GHz CPU, Linux 4.12, the implementation achieved ~3.6GiB/s. Future optimization may include spice(2) / vmspice(2) on Linux, although this doesn't fit so well with the heavy use of io.Reader / io.Writer throughout the codebase. The existing hackaround for local calls was re-implemented to fit the new interface of PRCServer and RPCClient. The 'R'PC method invocation is a bit slower because reflection is involved inbetween, but otherwise performance should be no different. The RPC code currently does not support multipart requests and thus does not support the equivalent of a POST. Thus, the switch to the new rpc code had the following fallout: - Move request objects + constants from rpc package to main app code - Sacrifice the hacky 'push = pull me' way of doing push -> need to further extend RPC to support multipart requests or something to implement this properly with additional interfaces -> should be done after replication is abstracted better than separate algorithms for doPull() and doPush()
2017-08-19 22:37:14 +02:00
log.Debug("requesting incremental snapshot stream")
r := IncrementalTransferRequest{
Filesystem: m.Remote,
From: from,
To: to,
}
var stream io.Reader
if err = remote.Call("IncrementalTransferRequest", &r, &stream); err != nil {
log.WithError(err).Error("cannot request incremental snapshot stream")
return false
}
log.Debug("invoking zfs receive")
progressStream := p.task.ProgressUpdater(stream)
// TODO protect against malicious incremental stream
if err = zfs.ZFSRecv(m.Local, progressStream); err != nil {
log.WithError(err).Error("cannot receive stream")
return false
}
log.Info("finished incremental transfer") // TODO increment rx
}
log.Info("finished following incremental path") // TODO path rx
return true
case zfs.ConflictNoCommonAncestor:
fallthrough
case zfs.ConflictDiverged:
var jsonDiff bytes.Buffer
if err := json.NewEncoder(&jsonDiff).Encode(diff); err != nil {
log.WithError(err).Error("cannot JSON-encode diff")
return false
}
var problem, resolution string
switch diff.Conflict {
case zfs.ConflictNoCommonAncestor:
problem = "remote and local filesystem have snapshots, but no common one"
resolution = "perform manual establish a common snapshot history"
case zfs.ConflictDiverged:
problem = "remote and local filesystem share a history but have diverged"
resolution = "perform manual replication or delete snapshots on the receiving" +
"side to establish an incremental replication parse"
}
log.WithField("diff", jsonDiff.String()).
WithField("problem", problem).
WithField("resolution", resolution).
Error("manual conflict resolution required")
return false
}
panic("should not be reached")
}
func (p *Puller) Pull() {
p.task.Enter("run")
defer p.task.Finish()
p.task.Log().Info("request remote filesystem list")
remoteFilesystems, ok := p.getRemoteFilesystems()
if !ok {
return
}
p.task.Log().Debug("map remote filesystems to local paths and determine order for per-filesystem sync")
replMapping, ok := p.buildReplMapping(remoteFilesystems)
if !ok {
}
p.task.Log().Debug("build cache for already present local filesystem state")
p.task.Enter("cache_local_fs_state")
localFilesystemState, err := zfs.ZFSListFilesystemState()
p.task.Finish()
if err != nil {
p.task.Log().WithError(err).Error("cannot request local filesystem state")
return
}
localTraversal := zfs.NewDatasetPathForest()
for _, m := range replMapping {
localTraversal.Add(m.Local)
}
p.task.Log().Info("start per-filesystem sync")
localTraversal.WalkTopDown(func(v zfs.DatasetPathVisit) bool {
2017-09-22 14:13:58 +02:00
p.task.Enter("tree_walk")
defer p.task.Finish()
2017-09-22 14:13:58 +02:00
log := p.task.Log().WithField(logFSField, v.Path.ToString())
if v.FilledIn {
if _, exists := localFilesystemState[v.Path.ToString()]; exists {
// No need to verify if this is a placeholder or not. It is sufficient
// to know we can add child filesystems to it
return true
}
log.Debug("create placeholder filesystem")
p.task.Enter("create_placeholder")
err = zfs.ZFSCreatePlaceholderFilesystem(v.Path)
p.task.Finish()
if err != nil {
log.Error("cannot create placeholder filesystem")
return false
}
return true
}
m, ok := replMapping[v.Path.ToString()]
if !ok {
panic("internal inconsistency: replMapping should contain mapping for any path that was not filled in by WalkTopDown()")
}
return p.replFilesystem(m, localFilesystemState)
})
return
}