diff --git a/config/config.go b/config/config.go index 081e335..e53a1bf 100644 --- a/config/config.go +++ b/config/config.go @@ -59,16 +59,6 @@ type SourceJob struct { Filesystems FilesystemsFilter `yaml:"filesystems"` } -type LocalJob struct { - Type string `yaml:"type"` - Name string `yaml:"name"` - Filesystems FilesystemsFilter `yaml:"filesystems"` - RootDataset string `yaml:"root_dataset"` - Snapshotting Snapshotting `yaml:"snapshotting"` - Pruning PruningSenderReceiver `yaml:"pruning"` - Debug JobDebugSettings `yaml:"debug,optional"` -} - type FilesystemsFilter map[string]bool type Snapshotting struct { @@ -171,6 +161,11 @@ type SSHStdinserverConnect struct { DialTimeout time.Duration `yaml:"dial_timeout,positive,default=10s"` } +type LocalConnect struct { + ConnectCommon `yaml:",inline"` + ClientIdentity string `yaml:"client_identity"` +} + type ServeEnum struct { Ret interface{} } @@ -201,6 +196,10 @@ type StdinserverServer struct { ClientIdentities []string `yaml:"client_identities"` } +type LocalServe struct { + ServeCommon `yaml:",inline"` +} + type PruningEnum struct { Ret interface{} } @@ -311,7 +310,6 @@ func (t *JobEnum) UnmarshalYAML(u func(interface{}, bool) error) (err error) { "sink": &SinkJob{}, "pull": &PullJob{}, "source": &SourceJob{}, - "local": &LocalJob{}, }) return } @@ -321,6 +319,7 @@ func (t *ConnectEnum) UnmarshalYAML(u func(interface{}, bool) error) (err error) "tcp": &TCPConnect{}, "tls": &TLSConnect{}, "ssh+stdinserver": &SSHStdinserverConnect{}, + "local": &LocalConnect{}, }) return } @@ -330,6 +329,7 @@ func (t *ServeEnum) UnmarshalYAML(u func(interface{}, bool) error) (err error) { "tcp": &TCPServe{}, "tls": &TLSServe{}, "stdinserver": &StdinserverServer{}, + "local" : &LocalServe{}, }) return } diff --git a/config/samples/local.yml b/config/samples/local.yml index 56441a2..01d73f0 100644 --- a/config/samples/local.yml +++ b/config/samples/local.yml @@ -1,26 +1,28 @@ jobs: -- name: mirror_local - type: local - # snapshot the filesystems matched by the left-hand-side of the mapping - # every 10m with zrepl_ as prefix - filesystems: { - "pool1/var/db<": true, - "pool1/usr/home<": true, - "pool1/usr/home/paranoid": false, #don't backup paranoid user - "pool1/poudriere/ports<": false #don't backup the ports trees - } - # TODO FIXME enforce that the tree under root_dataset and the trees allowed (true) by filesystems are non-overlapping - root_dataset: "pool2/backups/pool1" + - type: sink + name: "local_sink" + root_dataset: "storage/zrepl/sink" + serve: + type: local - snapshotting: - snapshot_prefix: zrepl_ - interval: 10m - - pruning: - keep_sender: - - type: not_replicated - keep_receiver: - - type: grid - grid: 1x1h(keep=all) | 24x1h | 35x1d | 6x30d - keep_bookmarks: all + - type: push + name: "backup_system" + connect: + type: local + client_identity: local_backup + filesystems: { + "system<": true, + } + snapshotting: + snapshot_prefix: zrepl_ + interval: 10m + pruning: + keep_sender: + - type: not_replicated + - type: last_n + count: 10 + keep_receiver: + - type: grid + grid: 1x1h(keep=all) | 24x1h | 35x1d | 6x30d + keep_bookmarks: all diff --git a/daemon/connecter/connect_local.go b/daemon/connecter/connect_local.go new file mode 100644 index 0000000..252a9b6 --- /dev/null +++ b/daemon/connecter/connect_local.go @@ -0,0 +1,26 @@ +package connecter + +import ( + "context" + "fmt" + "github.com/zrepl/zrepl/config" + "github.com/zrepl/zrepl/daemon/serve" + "net" +) + +type LocalConnecter struct { + clientIdentity string +} + +func LocalConnecterFromConfig(in *config.LocalConnect) (*LocalConnecter, error) { + if in.ClientIdentity == "" { + return nil, fmt.Errorf("ClientIdentity must not be empty") + } + return &LocalConnecter{in.ClientIdentity}, nil +} + +func (c *LocalConnecter) Connect(dialCtx context.Context) (conn net.Conn, err error) { + switchboard := serve.GetLocalListenerSwitchboard() + return switchboard.DialContext(dialCtx, c.clientIdentity) +} + diff --git a/daemon/connecter/connecter.go b/daemon/connecter/connecter.go index 4708cb3..3c0f0de 100644 --- a/daemon/connecter/connecter.go +++ b/daemon/connecter/connecter.go @@ -23,6 +23,9 @@ func FromConfig(g *config.Global, in config.ConnectEnum) (*ClientFactory, error) case *config.TLSConnect: connecter, errConnecter = TLSConnecterFromConfig(v) connConf, errRPC = streamrpcconfig.FromDaemonConfig(g, v.RPC) + case *config.LocalConnect: + connecter, errConnecter = LocalConnecterFromConfig(v) + connConf, errRPC = streamrpcconfig.FromDaemonConfig(g, v.RPC) default: panic(fmt.Sprintf("implementation error: unknown connecter type %T", v)) } diff --git a/daemon/serve/serve.go b/daemon/serve/serve.go index 8000a94..15e6ba3 100644 --- a/daemon/serve/serve.go +++ b/daemon/serve/serve.go @@ -86,6 +86,9 @@ func FromConfig(g *config.Global, in config.ServeEnum) (lf ListenerFactory, conf case *config.StdinserverServer: lf, lfError = MultiStdinserverListenerFactoryFromConfig(g, v) conf, rpcErr = streamrpcconfig.FromDaemonConfig(g, v.RPC) + case *config.LocalServe: + lf, lfError = LocalListenerFactoryFromConfig(g, v) + conf, rpcErr = streamrpcconfig.FromDaemonConfig(g, v.RPC) default: return nil, nil, errors.Errorf("internal error: unknown serve type %T", v) } diff --git a/daemon/serve/serve_local.go b/daemon/serve/serve_local.go new file mode 100644 index 0000000..ba122ae --- /dev/null +++ b/daemon/serve/serve_local.go @@ -0,0 +1,211 @@ +package serve + +import ( + "context" + "fmt" + "github.com/zrepl/zrepl/config" + "golang.org/x/sys/unix" + "net" + "os" + "sync" +) + +var localListenerSwitchboardSingleton struct { + s *LocalListenerSwitchboard + once sync.Once +} + +func GetLocalListenerSwitchboard() (*LocalListenerSwitchboard) { + localListenerSwitchboardSingleton.once.Do(func() { + localListenerSwitchboardSingleton.s = &LocalListenerSwitchboard{ + connects: make(chan connectRequest), + } + }) + return localListenerSwitchboardSingleton.s +} + +type connectRequest struct { + clientIdentity string + callback chan connectResult +} + +type connectResult struct { + conn net.Conn + err error +} + +type LocalListenerSwitchboard struct { + connects chan connectRequest +} + +func (l *LocalListenerSwitchboard) DialContext(dialCtx context.Context, clientIdentity string) (conn net.Conn, err error) { + + // place request + req := connectRequest{ + clientIdentity: clientIdentity, + callback: make(chan connectResult), + } + select { + case l.connects <- req: + case <-dialCtx.Done(): + return nil, dialCtx.Err() + } + + // wait for listener response + select { + case connRes := <- req.callback: + conn, err = connRes.conn, connRes.err + case <-dialCtx.Done(): + close(req.callback) // sending to the channel afterwards will panic, the listener has to catch this + conn, err = nil, dialCtx.Err() + } + + return conn, err +} + +type localAddr struct { + S string +} + +func (localAddr) Network() string { return "local" } + +func (a localAddr) String() string { return a.S } + +func (l *LocalListenerSwitchboard) Addr() (net.Addr) { return localAddr{""} } + +type localConn struct { + net.Conn + clientIdentity string +} + +func (l localConn) ClientIdentity() string { return l.clientIdentity } + +func (l *LocalListenerSwitchboard) Accept(ctx context.Context) (AuthenticatedConn, error) { + respondToRequest := func(req connectRequest, res connectResult) (err error) { + getLogger(ctx). + WithField("res.conn", res.conn).WithField("res.err", res.err). + Debug("responding to client request") + defer func() { + errv := recover() + getLogger(ctx).WithField("recover_err", errv). + Debug("panic on send to client callback, likely a legitimate client-side timeout") + }() + select { + case req.callback <- res: + err = nil + default: + err = fmt.Errorf("client-provided callback did block on send") + } + close(req.callback) + return err + } + + getLogger(ctx).Debug("waiting for local client connect requests") + var req connectRequest + select { + case req = <-l.connects: + case <-ctx.Done(): + return nil, ctx.Err() + } + + getLogger(ctx).WithField("client_identity", req.clientIdentity).Debug("got connect request") + if req.clientIdentity == "" { + res := connectResult{nil, fmt.Errorf("client identity must not be empty")} + if err := respondToRequest(req, res); err != nil { + return nil, err + } + return nil, fmt.Errorf("client connected with empty client identity") + } + + getLogger(ctx).Debug("creating socketpair") + left, right, err := makeSocketpairConn() + if err != nil { + res := connectResult{nil, fmt.Errorf("server error: %s", err)} + if respErr := respondToRequest(req, res); respErr != nil { + // returning the socketpair error properly is more important than the error sent to the client + getLogger(ctx).WithError(respErr).Error("error responding to client") + } + return nil, err + } + + getLogger(ctx).Debug("responding with left side of socketpair") + res := connectResult{left, nil} + if err := respondToRequest(req, res); err != nil { + getLogger(ctx).WithError(err).Error("error responding to client") + if err := left.Close(); err != nil { + getLogger(ctx).WithError(err).Error("cannot close left side of socketpair") + } + if err := right.Close(); err != nil { + getLogger(ctx).WithError(err).Error("cannot close right side of socketpair") + } + return nil, err + } + + return localConn{right, req.clientIdentity}, nil +} + +type fileConn struct { + net.Conn // net.FileConn + f *os.File +} + +func (c fileConn) Close() error { + if err := c.Conn.Close(); err != nil { + return err + } + if err := c.f.Close(); err != nil { + return err + } + return nil +} + +func makeSocketpairConn() (a, b net.Conn, err error) { + // don't use net.Pipe, as it doesn't implement things like lingering, which our code relies on + sockpair, err := unix.Socketpair(unix.AF_UNIX, unix.SOCK_STREAM, 0) + if err != nil { + return nil, nil, err + } + toConn := func(fd int) (net.Conn, error) { + f := os.NewFile(uintptr(fd), "fileconn") + if f == nil { + panic(fd) + } + c, err := net.FileConn(f) + if err != nil { + f.Close() + return nil, err + } + return fileConn{Conn: c, f: f}, nil + } + if a, err = toConn(sockpair[0]); err != nil { // shadowing + return nil, nil, err + } + if b, err = toConn(sockpair[1]); err != nil { // shadowing + a.Close() + return nil, nil, err + } + return a, b, nil +} + +func (l *LocalListenerSwitchboard) Close() error { + // FIXME: make sure concurrent Accepts return with error, and further Accepts return that error, too + // Example impl: for each accept, do context.WithCancel, and store the cancel in a list + // When closing, set a member variable to state=closed, make sure accept will exit early + // and then call all cancels in the list + // The code path from Accept entry over check if state=closed to list entry must be protected by a mutex. + return nil +} + +type LocalListenerFactory struct { + clients []string +} + +func LocalListenerFactoryFromConfig(g *config.Global, in *config.LocalServe) (f *LocalListenerFactory, err error) { + return &LocalListenerFactory{}, nil +} + + +func (*LocalListenerFactory) Listen() (AuthenticatedListener, error) { + return GetLocalListenerSwitchboard(), nil +} +