Implement push support.

Pushing is achieved by inverting the roles on the established
connection, i.e. the client tells the server what data it should pull
from the client (PullMeRequest).

Role inversion is achieved by moving the server loop to the serverLoop
function of ByteStreamRPC, which can be called from both the Listen()
function (server-side) and the PullMeRequest() client-side function.

A donwside of this PullMe approach is that the replication policies
become part of the rpc, because the puller must follow the policy.
This commit is contained in:
Christian Schwarz 2017-05-16 16:57:24 +02:00
parent c7161cf8e6
commit 35dcfc234e
6 changed files with 200 additions and 29 deletions

View File

@ -21,7 +21,7 @@ type Pool struct {
}
type Transport interface {
Connect() (rpc.RPCRequester, error)
Connect(rpcLog Logger) (rpc.RPCRequester, error)
}
type LocalTransport struct {
Handler rpc.RPCHandler
@ -40,7 +40,7 @@ type SSHTransport struct {
type Push struct {
To *Pool
Datasets []zfs.DatasetPath
Filter zfs.DatasetMapping
InitialReplPolicy rpc.InitialReplPolicy
}
type Pull struct {
@ -169,7 +169,7 @@ func parsePushs(v interface{}, pl poolLookup) (p []Push, err error) {
asList := make([]struct {
To string
Datasets []string
Filter map[string]string
InitialReplPolicy string
}, 0)
@ -187,14 +187,11 @@ func parsePushs(v interface{}, pl poolLookup) (p []Push, err error) {
}
push := Push{
To: toPool,
Datasets: make([]zfs.DatasetPath, len(e.Datasets)),
}
for i, ds := range e.Datasets {
if push.Datasets[i], err = zfs.NewDatasetPath(ds); err != nil {
if push.Filter, err = parseComboMapping(e.Filter); err != nil {
return
}
}
if push.InitialReplPolicy, err = parseInitialReplPolicy(e.InitialReplPolicy, rpc.DEFAULT_INITIAL_REPL_POLICY); err != nil {
return
@ -376,7 +373,7 @@ func parseComboMapping(m map[string]string) (c zfs.ComboMapping, err error) {
}
func (t SSHTransport) Connect() (r rpc.RPCRequester, err error) {
func (t SSHTransport) Connect(rpcLog Logger) (r rpc.RPCRequester, err error) {
var stream io.ReadWriteCloser
var rpcTransport sshbytestream.SSHTransport
if err = copier.Copy(&rpcTransport, t); err != nil {
@ -389,10 +386,10 @@ func (t SSHTransport) Connect() (r rpc.RPCRequester, err error) {
if err != nil {
return
}
return rpc.ConnectByteStreamRPC(stream)
return rpc.ConnectByteStreamRPC(stream, rpcLog)
}
func (t LocalTransport) Connect() (r rpc.RPCRequester, err error) {
func (t LocalTransport) Connect(rpcLog Logger) (r rpc.RPCRequester, err error) {
if t.Handler == nil {
panic("local transport with uninitialized handler")
}

View File

@ -1,6 +1,7 @@
package main
import (
"fmt"
"github.com/zrepl/zrepl/rpc"
"github.com/zrepl/zrepl/zfs"
"io"
@ -9,18 +10,21 @@ import (
type Handler struct {
Logger Logger
PullACL zfs.DatasetMapping
SinkMappingFunc func(clientIdentity string) (mapping zfs.DatasetMapping, err error)
}
func (h Handler) HandleFilesystemRequest(r rpc.FilesystemRequest) (roots []zfs.DatasetPath, err error) {
h.Logger.Printf("handling fsr: %#v", r)
h.Logger.Printf("using PullACL: %#v", h.PullACL)
if roots, err = zfs.ZFSListMapping(h.PullACL); err != nil {
h.Logger.Printf("handle fsr err: %v\n", err)
return
}
h.Logger.Printf("got filesystems: %#v", roots)
h.Logger.Printf("returning: %#v", roots)
return
}
@ -88,3 +92,36 @@ func (h Handler) HandleIncrementalTransferRequest(r rpc.IncrementalTransferReque
return
}
func (h Handler) HandlePullMeRequest(r rpc.PullMeRequest, clientIdentity string, client rpc.RPCRequester) (err error) {
// Check if we have a sink for this request
// Use that mapping to do what happens in doPull
h.Logger.Printf("handling PullMeRequest: %#v", r)
var sinkMapping zfs.DatasetMapping
sinkMapping, err = h.SinkMappingFunc(clientIdentity)
if err != nil {
h.Logger.Printf("no sink mapping for client identity '%s', denying PullMeRequest", clientIdentity)
err = fmt.Errorf("no sink for client identity '%s'", clientIdentity)
return
}
h.Logger.Printf("doing pull...")
err = doPull(PullContext{
Remote: client,
Log: h.Logger,
Mapping: sinkMapping,
InitialReplPolicy: r.InitialReplPolicy,
})
if err != nil {
h.Logger.Printf("PullMeRequest failed with error: %s", err)
return
}
h.Logger.Printf("finished handling PullMeRequest: %#v", r)
return
}

View File

@ -111,7 +111,7 @@ func cmdStdinServer(c *cli.Context) (err error) {
return
}
findMapping := func(cm []ClientMapping) zfs.DatasetMapping {
findMapping := func(cm []ClientMapping, identity string) zfs.DatasetMapping {
for i := range cm {
if cm[i].From == identity {
return cm[i].Mapping
@ -119,14 +119,21 @@ func cmdStdinServer(c *cli.Context) (err error) {
}
return nil
}
sinkMapping := func(identity string) (sink zfs.DatasetMapping, err error) {
if sink = findMapping(conf.Sinks, identity); sink == nil {
return nil, fmt.Errorf("could not find sink for dataset")
}
return
}
sinkLogger := log.New(logOut, fmt.Sprintf("sink[%s] ", identity), logFlags)
handler := Handler{
Logger: sinkLogger,
PullACL: findMapping(conf.PullACLs),
SinkMappingFunc: sinkMapping,
PullACL: findMapping(conf.PullACLs, identity),
}
if err = rpc.ListenByteStreamRPC(sshByteStream, handler, sinkLogger); err != nil {
if err = rpc.ListenByteStreamRPC(sshByteStream, identity, handler, sinkLogger); err != nil {
//os.Exit(1)
err = cli.NewExitError(err, 1)
defaultLog.Printf("listenbytestreamerror: %#v\n", err)
@ -172,8 +179,8 @@ func cmdRun(c *cli.Context) error {
Interval: time.Duration(5 * time.Second),
Repeats: true,
RunFunc: func(log jobrun.Logger) error {
log.Printf("%v: %#v\n", time.Now(), push)
return nil
log.Printf("doing push: %v", push)
return jobPush(push, c, log)
},
}
@ -205,7 +212,7 @@ func jobPull(pull Pull, c *cli.Context, log jobrun.Logger) (err error) {
var remote rpc.RPCRequester
if remote, err = pull.From.Transport.Connect(); err != nil {
if remote, err = pull.From.Transport.Connect(log); err != nil {
return
}
@ -213,3 +220,39 @@ func jobPull(pull Pull, c *cli.Context, log jobrun.Logger) (err error) {
return doPull(PullContext{remote, log, pull.Mapping, pull.InitialReplPolicy})
}
func jobPush(push Push, c *cli.Context, log jobrun.Logger) (err error) {
if _, ok := push.To.Transport.(LocalTransport); ok {
panic("no support for local pushs")
}
var remote rpc.RPCRequester
if remote, err = push.To.Transport.Connect(log); err != nil {
return err
}
defer closeRPCWithTimeout(log, remote, time.Second*10, "")
log.Printf("building handler for PullMeRequest")
handler := Handler{
Logger: log,
PullACL: push.Filter,
SinkMappingFunc: nil, // no need for that in the handler for PullMe
}
log.Printf("handler: %#v", handler)
r := rpc.PullMeRequest{
InitialReplPolicy: push.InitialReplPolicy,
}
log.Printf("doing PullMeRequest: %#v", r)
if err = remote.PullMeRequest(r, handler); err != nil {
log.Printf("PullMeRequest failed: %s", err)
return
}
log.Printf("push job finished")
return
}

View File

@ -9,9 +9,10 @@ pools:
pushs:
- to: offsite_backups
datasets:
- tank/var/db
- tank/usr/home
filter: {
"tank/var/db*":"ok",
"tank/usr/home*":"ok"
}
pulls:
- from: offsite_backups

View File

@ -17,6 +17,7 @@ type RPCRequester interface {
FilesystemVersionsRequest(r FilesystemVersionsRequest) (versions []zfs.FilesystemVersion, err error)
InitialTransferRequest(r InitialTransferRequest) (io.Reader, error)
IncrementalTransferRequest(r IncrementalTransferRequest) (io.Reader, error)
PullMeRequest(r PullMeRequest, handler RPCHandler) (err error)
CloseRequest(r CloseRequest) (err error)
ForceClose() (err error)
}
@ -29,6 +30,9 @@ type RPCHandler interface {
HandleInitialTransferRequest(r InitialTransferRequest) (io.Reader, error)
HandleIncrementalTransferRequest(r IncrementalTransferRequest) (io.Reader, error)
// invert roles, i.e. handler becomes server and performs the requested pull using the client connection
HandlePullMeRequest(r PullMeRequest, clientIdentity string, client RPCRequester) (err error)
}
type Logger interface {
@ -44,12 +48,14 @@ type ByteStream interface {
type ByteStreamRPC struct {
conn ByteStream
log Logger
clientIdentity string
}
func ConnectByteStreamRPC(conn ByteStream) (RPCRequester, error) {
func ConnectByteStreamRPC(conn ByteStream, log Logger) (RPCRequester, error) {
rpc := ByteStreamRPC{
conn: conn,
log: log,
}
// Assert protocol versions are equal
@ -70,7 +76,18 @@ func (e ByteStreamRPCDecodeJSONError) Error() string {
return fmt.Sprintf("cannot decode %s: %s", e.Type, e.DecoderErr)
}
func ListenByteStreamRPC(conn ByteStream, handler RPCHandler, log Logger) error {
func ListenByteStreamRPC(conn ByteStream, clientIdentity string, handler RPCHandler, log Logger) error {
c := ByteStreamRPC{
conn: conn,
log: log,
clientIdentity: clientIdentity,
}
return c.serverLoop(handler)
}
func (c ByteStreamRPC) serverLoop(handler RPCHandler) error {
// A request consists of two subsequent chunked JSON objects
// Object 1: RequestHeader => contains type of Request Body
@ -80,7 +97,16 @@ func ListenByteStreamRPC(conn ByteStream, handler RPCHandler, log Logger) error
// a) a chunked response
// b) or another JSON object
defer conn.Close()
conn := c.conn
log := c.log
defer func() {
panicObj := recover()
// if we just exited, we don't want to close the connection (PullMeRequest depends on this)
log.Printf("exiting server loop, panic object %#v", panicObj)
if panicObj != nil {
conn.Close()
}
}()
send := func(r interface{}) {
if err := writeChunkedJSON(conn, r); err != nil {
@ -193,6 +219,8 @@ func ListenByteStreamRPC(conn ByteStream, handler RPCHandler, log Logger) error
send(&diff)
}
log.Printf("finished FilesystemVersionReqeust")
case RTInitialTransferRequest:
var rq InitialTransferRequest
if err := recv(&rq); err != nil {
@ -246,6 +274,50 @@ func ListenByteStreamRPC(conn ByteStream, handler RPCHandler, log Logger) error
}
}
case RTPullMeRequest:
var rq PullMeRequest
if err := recv(&rq); err != nil {
sendError(EDecodeRequestBody, err.Error())
return conn.Close()
}
if rq.Finished {
// we are the client that sent a PullMeRequest with Finished = false
// and then entered this server loop
log.Printf("PullMeRequest.Finished == true, exiting server loop")
send(ResponseHeader{
RequestId: header.Id,
ResponseType: ROK,
})
return nil
}
// We are a server receiving a PullMeRequest from a client
log.Printf("confirming PullMeRequest")
send(ResponseHeader{
RequestId: header.Id,
ResponseType: ROK,
})
log.Printf("pulling from client '%s', expecting client is in server loop", c.clientIdentity)
if c.clientIdentity == "" || c.clientIdentity == LOCAL_TRANSPORT_IDENTITY {
err := fmt.Errorf("client has bad name: '%s'", c.clientIdentity)
log.Printf(err.Error())
panic(err)
}
pullErr := handler.HandlePullMeRequest(rq, c.clientIdentity, c)
if pullErr != nil {
log.Printf("pulling failed with error: %s", pullErr)
panic(pullErr)
}
log.Printf("finished handling PullMeRequest, sending Finished = true")
req := PullMeRequest{Finished: true}
c.sendRequestReceiveHeader(req, ROK)
default:
sendError(EUnknownRequestType, "")
return conn.Close()
@ -293,6 +365,8 @@ func inferRequestType(v interface{}) (RequestType, error) {
return RTInitialTransferRequest, nil
case IncrementalTransferRequest:
return RTIncrementalTransferRequest, nil
case PullMeRequest:
return RTPullMeRequest, nil
case CloseRequest:
return RTCloseRequest, nil
default:
@ -407,6 +481,11 @@ func (c ByteStreamRPC) IncrementalTransferRequest(r IncrementalTransferRequest)
return
}
func (c ByteStreamRPC) PullMeRequest(r PullMeRequest, handler RPCHandler) (err error) {
err = c.sendRequestReceiveHeader(r, ROK)
return c.serverLoop(handler)
}
func (c ByteStreamRPC) CloseRequest(r CloseRequest) (err error) {
if err = c.sendRequestReceiveHeader(r, ROK); err != nil {
return
@ -445,6 +524,13 @@ func (c LocalRPC) IncrementalTransferRequest(r IncrementalTransferRequest) (read
return
}
func (c LocalRPC) PullMeRequest(r PullMeRequest, handler RPCHandler) (err error) {
// The config syntactically only allows local Pulls, hence this is never called
// In theory, the following line should work:
// return handler.HandlePullMeRequest(r, LOCAL_TRANSPORT_IDENTITY, c)
panic("internal inconsistency: local pull me request unsupported")
}
func (c LocalRPC) CloseRequest(r CloseRequest) error { return nil }
func (c LocalRPC) ForceClose() error { return nil }

View File

@ -12,7 +12,8 @@ const (
RTFilesystemVersionsRequest = 0x11
RTInitialTransferRequest = 0x12
RTIncrementalTransferRequest = 0x13
RTCloseRequest = 0x20
RTPullMeRequest = 0x20
RTCloseRequest = 0xf0
)
type RequestHeader struct {
@ -62,6 +63,12 @@ const (
InitialReplPolicyAll InitialReplPolicy = "all"
)
type PullMeRequest struct {
// if true, the other fields are undefined
Finished bool
InitialReplPolicy InitialReplPolicy
}
type CloseRequest struct {
Goodbye string
}