zrepl/cmd/endpoint/endpoint.go

package endpoint
import (
"fmt"
"github.com/zrepl/zrepl/replication/pdu"
"github.com/problame/go-streamrpc"
"github.com/zrepl/zrepl/zfs"
"io"
"github.com/pkg/errors"
"github.com/golang/protobuf/proto"
"bytes"
"context"
"github.com/zrepl/zrepl/replication"
)
// FIXME: remove this
type InitialReplPolicy string
const (
InitialReplPolicyMostRecent InitialReplPolicy = "most_recent"
InitialReplPolicyAll InitialReplPolicy = "all"
)
// FIXME: remove this
const DEFAULT_INITIAL_REPL_POLICY = InitialReplPolicyMostRecent
// SenderEndpoint implements replication.ReplicationEndpoint for a sending side
type SenderEndpoint struct {
FSFilter zfs.DatasetFilter
FilesystemVersionFilter zfs.FilesystemVersionFilter
}
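// NewSenderEndpoint constructs a SenderEndpoint from a dataset filter and a filesystem version filter.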
func NewSenderEndpoint(fsf zfs.DatasetFilter, fsvf zfs.FilesystemVersionFilter) *SenderEndpoint {
return &SenderEndpoint{fsf, fsvf}
}
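// ListFilesystems returns the filesystems that pass the sender's dataset filter.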
func (p *SenderEndpoint) ListFilesystems(ctx context.Context) ([]*pdu.Filesystem, error) {
fss, err := zfs.ZFSListMapping(p.FSFilter)
if err != nil {
return nil, err
}
rfss := make([]*pdu.Filesystem, len(fss))
for i := range fss {
rfss[i] = &pdu.Filesystem{
Path: fss[i].ToString(),
// FIXME: not supporting ResumeToken yet
}
}
return rfss, nil
}
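// ListFilesystemVersions lists the versions of fs that pass the version filter; fs itself must pass the dataset filter.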
func (p *SenderEndpoint) ListFilesystemVersions(ctx context.Context, fs string) ([]*pdu.FilesystemVersion, error) {
dp, err := zfs.NewDatasetPath(fs)
if err != nil {
return nil, err
}
pass, err := p.FSFilter.Filter(dp)
if err != nil {
return nil, err
}
if !pass {
return nil, replication.NewFilteredError(fs)
}
fsvs, err := zfs.ZFSListFilesystemVersions(dp, p.FilesystemVersionFilter)
if err != nil {
return nil, err
}
rfsvs := make([]*pdu.FilesystemVersion, len(fsvs))
for i := range fsvs {
rfsvs[i] = pdu.FilesystemVersionFromZFS(fsvs[i])
}
return rfsvs, nil
}
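// Send checks the requested filesystem against the dataset filter and, if it passes, issues a zfs send and returns the resulting stream.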
func (p *SenderEndpoint) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.ReadCloser, error) {
dp, err := zfs.NewDatasetPath(r.Filesystem)
if err != nil {
return nil, nil, err
}
pass, err := p.FSFilter.Filter(dp)
if err != nil {
return nil, nil, err
}
if !pass {
return nil, nil, replication.NewFilteredError(r.Filesystem)
}
stream, err := zfs.ZFSSend(r.Filesystem, r.From, r.To)
if err != nil {
return nil, nil, err
}
return &pdu.SendRes{}, stream, nil
}
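// Receive always returns an error: a SenderEndpoint only sends.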
func (p *SenderEndpoint) Receive(ctx context.Context, r *pdu.ReceiveReq, sendStream io.ReadCloser) (error) {
return fmt.Errorf("sender endpoint does not receive")
}
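// FSFilter passes or rejects dataset paths.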
type FSFilter interface {
Filter(path *zfs.DatasetPath) (pass bool, err error)
}
// FIXME: can we get away without error types here?
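// FSMap maps dataset paths from one naming scheme to another; the mapping can be inverted and reduced to a plain FSFilter.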
type FSMap interface {
FSFilter
Map(path *zfs.DatasetPath) (*zfs.DatasetPath, error)
Invert() (FSMap, error)
AsFilter() (FSFilter)
}
// ReceiverEndpoint implements replication.ReplicationEndpoint for a receiving side
type ReceiverEndpoint struct {
fsmapInv FSMap
fsmap FSMap
fsvf zfs.FilesystemVersionFilter
}
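// NewReceiverEndpoint inverts fsmap so that local filesystems can be mapped back to their client-side names; it fails if the mapping is not invertible.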
func NewReceiverEndpoint(fsmap FSMap, fsvf zfs.FilesystemVersionFilter) (*ReceiverEndpoint, error) {
fsmapInv, err := fsmap.Invert()
if err != nil {
return nil, err
}
return &ReceiverEndpoint{fsmapInv, fsmap, fsvf}, nil
}
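// ListFilesystems lists the local filesystems matched by the inverted mapping and returns them under their client-side names.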
func (e *ReceiverEndpoint) ListFilesystems(ctx context.Context) ([]*pdu.Filesystem, error) {
filtered, err := zfs.ZFSListMapping(e.fsmapInv.AsFilter())
if err != nil {
return nil, errors.Wrap(err, "error checking client permission")
}
fss := make([]*pdu.Filesystem, len(filtered))
for i, a := range filtered {
mapped, err := e.fsmapInv.Map(a)
if err != nil {
return nil, err
}
fss[i] = &pdu.Filesystem{Path: mapped.ToString()}
}
return fss, nil
}
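// ListFilesystemVersions maps fs to its local filesystem and lists the versions that pass the version filter.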
func (e *ReceiverEndpoint) ListFilesystemVersions(ctx context.Context, fs string) ([]*pdu.FilesystemVersion, error) {
p, err := zfs.NewDatasetPath(fs)
if err != nil {
return nil, err
}
lp, err := e.fsmap.Map(p)
if err != nil {
return nil, err
}
if lp == nil {
return nil, errors.New("access to filesystem denied")
}
fsvs, err := zfs.ZFSListFilesystemVersions(lp, e.fsvf)
if err != nil {
return nil, err
}
rfsvs := make([]*pdu.FilesystemVersion, len(fsvs))
for i := range fsvs {
rfsvs[i] = pdu.FilesystemVersionFromZFS(fsvs[i])
}
return rfsvs, nil
}
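// Send always returns an error: a ReceiverEndpoint only receives.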
func (e *ReceiverEndpoint) Send(ctx context.Context, req *pdu.SendReq) (*pdu.SendRes, io.ReadCloser, error) {
return nil, nil, errors.New("receiver endpoint does not send")
}
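// Receive maps the requested filesystem to its local target, creates placeholder parent filesystems as needed, and pipes sendStream into zfs receive, forcing the receive (-F) if the target itself is a placeholder.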
func (e *ReceiverEndpoint) Receive(ctx context.Context, req *pdu.ReceiveReq, sendStream io.ReadCloser) error {
defer sendStream.Close()
p, err := zfs.NewDatasetPath(req.Filesystem)
if err != nil {
return err
}
lp, err := e.fsmap.Map(p)
if err != nil {
return err
}
if lp == nil {
return errors.New("receive to filesystem denied")
}
// create placeholder parent filesystems as appropriate
var visitErr error
f := zfs.NewDatasetPathForest()
f.Add(lp)
getLogger(ctx).Debug("begin tree-walk")
f.WalkTopDown(func(v zfs.DatasetPathVisit) (visitChildTree bool) {
if v.Path.Equal(lp) {
return false
}
_, err := zfs.ZFSGet(v.Path, []string{zfs.ZREPL_PLACEHOLDER_PROPERTY_NAME})
if err != nil {
// interpret this as an early exit of the zfs binary due to the fs not existing
if err := zfs.ZFSCreatePlaceholderFilesystem(v.Path); err != nil {
getLogger(ctx).
WithError(err).
WithField("placeholder_fs", v.Path).
Error("cannot create placeholder filesystem")
visitErr = err
return false
}
}
getLogger(ctx).WithField("filesystem", v.Path.ToString()).Debug("exists")
return true // leave this fs as is
})
getLogger(ctx).WithField("visitErr", visitErr).Debug("complete tree-walk")
if visitErr != nil {
return visitErr
}
needForceRecv := false
props, err := zfs.ZFSGet(lp, []string{zfs.ZREPL_PLACEHOLDER_PROPERTY_NAME})
if err == nil {
if isPlaceholder, _ := zfs.IsPlaceholder(lp, props.Get(zfs.ZREPL_PLACEHOLDER_PROPERTY_NAME)); isPlaceholder {
needForceRecv = true
}
}
args := make([]string, 0, 1)
if needForceRecv {
args = append(args, "-F")
}
getLogger(ctx).Debug("start receive command")
if err := zfs.ZFSRecv(lp.ToString(), sendStream, args...); err != nil {
return err
}
return nil
}
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
// RPC STUBS
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
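// Endpoint names used by both RemoteEndpoint (client side) and HandlerAdaptor (server side).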
const (
RPCListFilesystems = "ListFilesystems"
RPCListFilesystemVersions = "ListFilesystemVersions"
RPCReceive = "Receive"
RPCSend = "Send"
)
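// RemoteEndpoint forwards endpoint calls over a streamrpc.Client, marshaling requests and responses as protobuf messages.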
type RemoteEndpoint struct {
*streamrpc.Client
}
func (s RemoteEndpoint) ListFilesystems(ctx context.Context) ([]*pdu.Filesystem, error) {
req := pdu.ListFilesystemReq{}
b, err := proto.Marshal(&req)
if err != nil {
return nil, err
}
rb, rs, err := s.RequestReply(ctx, RPCListFilesystems, bytes.NewBuffer(b), nil)
if err != nil {
return nil, err
}
if rs != nil {
rs.Close()
return nil, errors.New("response contains unexpected stream")
}
var res pdu.ListFilesystemRes
if err := proto.Unmarshal(rb.Bytes(), &res); err != nil {
return nil, err
}
return res.Filesystems, nil
}
func (s RemoteEndpoint) ListFilesystemVersions(ctx context.Context, fs string) ([]*pdu.FilesystemVersion, error) {
req := pdu.ListFilesystemVersionsReq{
Filesystem: fs,
}
b, err := proto.Marshal(&req)
if err != nil {
return nil, err
}
rb, rs, err := s.RequestReply(ctx, RPCListFilesystemVersions, bytes.NewBuffer(b), nil)
if err != nil {
return nil, err
}
if rs != nil {
rs.Close()
return nil, errors.New("response contains unexpected stream")
}
var res pdu.ListFilesystemVersionsRes
if err := proto.Unmarshal(rb.Bytes(), &res); err != nil {
return nil, err
}
return res.Versions, nil
}
func (s RemoteEndpoint) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.ReadCloser, error) {
b, err := proto.Marshal(r)
if err != nil {
return nil, nil, err
}
rb, rs, err := s.RequestReply(ctx, RPCSend, bytes.NewBuffer(b), nil)
if err != nil {
return nil, nil, err
}
if rs == nil {
return nil, nil, errors.New("response does not contain a stream")
}
var res pdu.SendRes
if err := proto.Unmarshal(rb.Bytes(), &res); err != nil {
rs.Close()
return nil, nil, err
}
// FIXME make sure the consumer will read the reader until the end...
return &res, rs, nil
}
func (s RemoteEndpoint) Receive(ctx context.Context, r *pdu.ReceiveReq, sendStream io.ReadCloser) (error) {
defer sendStream.Close()
b, err := proto.Marshal(r)
if err != nil {
return err
}
rb, rs, err := s.RequestReply(ctx, RPCReceive, bytes.NewBuffer(b), sendStream)
if err != nil {
return err
}
if rs != nil {
rs.Close()
return errors.New("response contains unexpected stream")
}
var res pdu.ReceiveRes
if err := proto.Unmarshal(rb.Bytes(), &res); err != nil {
return err
}
return nil
}
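// HandlerAdaptor dispatches incoming streamrpc requests to the wrapped replication.Endpoint.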
type HandlerAdaptor struct {
ep replication.Endpoint
}
func NewHandlerAdaptor(ep replication.Endpoint) HandlerAdaptor {
return HandlerAdaptor{ep}
}
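// Handle unmarshals the request for the named endpoint, invokes the corresponding method on the wrapped endpoint, and marshals the response. Send and Receive additionally require the endpoint to implement replication.Sender or replication.Receiver, respectively.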
func (a *HandlerAdaptor) Handle(ctx context.Context, endpoint string, reqStructured *bytes.Buffer, reqStream io.ReadCloser) (resStructured *bytes.Buffer, resStream io.ReadCloser, err error) {
switch endpoint {
case RPCListFilesystems:
var req pdu.ListFilesystemReq
if err := proto.Unmarshal(reqStructured.Bytes(), &req); err != nil {
return nil, nil, err
}
fsses, err := a.ep.ListFilesystems(ctx)
if err != nil {
return nil, nil, err
}
res := &pdu.ListFilesystemRes{
Filesystems: fsses,
}
b, err := proto.Marshal(res)
if err != nil {
return nil, nil, err
}
return bytes.NewBuffer(b), nil, nil
case RPCListFilesystemVersions:
var req pdu.ListFilesystemVersionsReq
if err := proto.Unmarshal(reqStructured.Bytes(), &req); err != nil {
return nil, nil, err
}
fsvs, err := a.ep.ListFilesystemVersions(ctx, req.Filesystem)
if err != nil {
return nil, nil, err
}
res := &pdu.ListFilesystemVersionsRes{
Versions: fsvs,
}
b, err := proto.Marshal(res)
if err != nil {
return nil, nil, err
}
return bytes.NewBuffer(b), nil, nil
case RPCSend:
sender, ok := a.ep.(replication.Sender)
if !ok {
goto Err
}
var req pdu.SendReq
if err := proto.Unmarshal(reqStructured.Bytes(), &req); err != nil {
return nil, nil, err
}
res, sendStream, err := sender.Send(ctx, &req)
if err != nil {
return nil, nil, err
}
b, err := proto.Marshal(res)
if err != nil {
return nil, nil, err
}
return bytes.NewBuffer(b), sendStream, err
case RPCReceive:
receiver, ok := a.ep.(replication.Receiver)
if !ok {
goto Err
}
var req pdu.ReceiveReq
if err := proto.Unmarshal(reqStructured.Bytes(), &req); err != nil {
return nil, nil, err
}
err := receiver.Receive(ctx, &req, reqStream)
if err != nil {
return nil, nil, err
}
b, err := proto.Marshal(&pdu.ReceiveRes{})
if err != nil {
return nil, nil, err
}
return bytes.NewBuffer(b), nil, err
}
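// Err is reached via goto when the endpoint does not implement the required interface, or by falling through the switch when the endpoint name is unknown.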
Err:
return nil, nil, errors.New("no handler for given endpoint")
}