diff --git a/cmd/main.go b/cmd/main.go new file mode 100644 index 0000000..1f8ded2 --- /dev/null +++ b/cmd/main.go @@ -0,0 +1,73 @@ +package main + +type Role uint + +const ( + ROLE_IPC Role = iota + ROLE_ACTION Role = iota +) + +func main() { + + role = ROLE_IPC // TODO: if argv[1] == ipc then... + switch (role) { + case ROLE_IPC: + doIPC() + case ROLE_ACTION: + doAction() + } + +} + +func doIPC() { + + sshByteStream = sshbytestream.Incoming() + handler = Handler{} + if err := ListenByteStreamRPC(sshByteStream, handler); err != nil { + // PANIC + } + + // exit(0) + +} + +func doAction() { + + sshByteStream = sshbytestream.Outgoing(model.SSHTransport{}) + + remote,_ := ConnectByteStreamRPC(sshByteStream) + + request := NewFilesystemRequest(["zroot/var/db", "zroot/home"]) + forest, _ := remote.FilesystemRequest(request) + + for tree := forest { + fmt.Println(tree) + } + +} + + +type Handler struct {} + +func (h Handler) HandleFilesystemRequest(r FilesystemRequest) (roots []model.Filesystem, err error) { + + roots = make([]model.Filesystem, 0, 10) + + for _, root := range r.Roots { + if zfsRoot, err := zfs.FilesystemsAtRoot(root); err != nil { + return nil, err + } + roots = append(roots, zfsRoot) + } + + return +} + +func (h Handler) HandleInitialTransferRequest(r InitialTransferRequest) (io.Read, error) { + // TODO ACL + return zfs.InitialSend(r.Snapshot) +} +func (h Handler) HandleIncrementalTransferRequestRequest(r IncrementalTransferRequest) (io.Read, error) { + // TODO ACL + return zfs.IncrementalSend(r.FromSnapshot, r.ToSnapshot) +} diff --git a/model/model.go b/model/model.go new file mode 100644 index 0000000..dd11ede --- /dev/null +++ b/model/model.go @@ -0,0 +1,33 @@ +package model + +type Filesytem struct { + Name string + Children []Filesytem + Snapshots []Snaphot +} + +type FilesytemMapping struct { + From Filesytem + To Filesytem +} + +type Snapshot struct { + Name String +} + +type Pool struct { + Transport Transport + Root Filesytem +} + +type Transport interface { + Connect() Connection +} + +type SSHTransport struct { + Host string + User string + Port uint16 +} + +type LocalTransport struct{} diff --git a/rpc/bytestream.go b/rpc/bytestream.go new file mode 100644 index 0000000..b8492f8 --- /dev/null +++ b/rpc/bytestream.go @@ -0,0 +1,26 @@ +package rpc + +struct Unchunker { + In io.Reader + internalBuffer []byte + remainingChunkSize uint +} + +func NewUnchunker(conn io.Reader) { + return Unchunker{In: conn} // TODO +} + +func (c Unchunker) Read(b []byte) (n int, error) { + // read min(c.internalBuffer.len, b.len) from c.internalBuffer into b + // fill up internalBuffer + // 1 read up to max(remainingChunkSize,internalBuffer.len) from In + // 2 if remainingChunkSize == 0, read next chunk size, update remainingChunkSize + // 3 goto 1 +} + +struct Chunker { + In io.Reader + MaxChunkSize uint + + ChunkBuf []byte // length of buf determines chunk size? --> would be fixed then +} \ No newline at end of file diff --git a/rpc/rpc.go b/rpc/rpc.go new file mode 100644 index 0000000..f4ad965 --- /dev/null +++ b/rpc/rpc.go @@ -0,0 +1,76 @@ +package rpc + +type RPCRequester interface { + FilesystemRequest(r FilesystemRequest) (root model.Filesystem, err error) + InitialTransferRequest(r InitialTransferRequest) (io.Read, error) + IncrementalTransferRequest(r IncrementalTransferRequest) (io.Read, error) +} + +type RPCHandler interface { + HandleFilesystemRequest(r FilesystemRequest) (root model.Filesystem, err error) + HandleInitialTransferRequest(r InitialTransferRequest) (io.Read, error) + HandleIncrementalTransferRequestRequest(r IncrementalTransferRequest) (io.Read, error) +} + + +type ByteStreamRPC struct { + conn io.ReadWriteCloser +} + +func ConnectByteStreamRPC(conn io.ReadWriteCloser) (ByteStreamRPC, error) { + // TODO do ssh connection to transport, establish TCP-like communication channel + conn := sshtransport.New() + rpc := ByteStreamRPC{ + conn: conn, + } + return conn, nil +} + +func ListenByteStreamRPC(conn io.ReadWriteCloser, handler RPCHandler) (error) { + // Read from connection, decode wire protocol, route requests to handler + return nil +} + +func (c ByteStreamRPC) FilesystemRequest(r FilesystemRequest) (roots []model.Filesystem, err error) { + encodedReader := protobuf.Encode(r) + c.conn.Write(NewChunker(encodedReader)) + encodedResponseReader := NewUnchunker(c.conn.Read()) + roots = protobuf.Decode(encodedResponse) + return +} + +func (c ByteStreamRPC) InitialTransferRequest(r InitialTransferRequest) (io.Read, error) { + // send request header using protobuf or similar + encodedReader := protobuf.Encode(r) + c.conn.Write(NewChunker(encodedReader)) + // expect chunked response -> use unchunker on c.conn to read snapshot stream + return NewUmainnchunker(c.conn.Read()) +} + +func (c ByteStreamRPC) IncrementalTransferRequest(r IncrementalTransferRequest) (io.Read, error) { + +} + + +type LocalRPC struct { + handler RPCHandler +} + + +func ConnectLocalRPC(handler RPCHandler) LocalRPC { + return LocalRPC{handler} +} + +func (c LocalRPC) FilesystemRequest(r FilesystemRequest) (root model.Filesystem, err error) { + return c.handler.HandleFilesystemRequest(r) +} + +func (c LocalRPC) InitialTransferRequest(r InitialTransferRequest) (io.Read, error) { + return c.handler.HandleInitialTransferRequest(r) +} + +func (c LocalRPC) IncrementalTransferRequest(r IncrementalTransferRequest) (io.Read, error) { + return c.handler.HandleIncrementalTransferRequest(r) +} + + diff --git a/rpc/structs.go b/rpc/structs.go new file mode 100644 index 0000000..455f22c --- /dev/null +++ b/rpc/structs.go @@ -0,0 +1,29 @@ +package protocol + +type RequestId uint8 +type Request struct { + RequestType RequestType + RequestId [16]byte // UUID +} + +type FilesystemRequest struct { + Request + Roots []string +} + +type InitialTransferRequest struct { + Request + Snapshot string // tank/my/db@ljlsdjflksdf +} +func (r InitialTransferRequest) Respond(snapshotReader io.Reader) { + +} + +type IncrementalTransferRequest struct { + Request + FromSnapshot string + ToSnapshot string +} +func (r IncrementalTransferRequest) Respond(snapshotReader io.Reader) { + +} diff --git a/sshbytestream/ssh.go b/sshbytestream/ssh.go new file mode 100644 index 0000000..b1b15f4 --- /dev/null +++ b/sshbytestream/ssh.go @@ -0,0 +1,28 @@ +package sshbytestream + +func Incoming() (io.ReadWriteCloser, error) { + // derivce ReadWriteCloser from stdin & stdout +} + +func Outgoing(name string, remote model.SSHTransport) (io.ReadWriteCloser, error) { + // encapsulate + // fork(),exec(ssh $remote zrepl ipc ssh $name) + // stdin and stdout in a ReadWriteCloser + return ForkedSSHReadWriteCloser{} +} + +struct ForkedSSHReadWriteCloser { + +} + +func (f ForkedSSHReadWriteCloser) Read(p []byte) (n int, err error) { + +} + +func (f ForkedSSHReadWriteCloser) Write(p []byte) (n int, err error) { + +} + +func (f ForkedSSHReadWriteCloser) Close() error { + +} \ No newline at end of file diff --git a/zfs/zfs.go b/zfs/zfs.go new file mode 100644 index 0000000..6167c72 --- /dev/null +++ b/zfs/zfs.go @@ -0,0 +1,13 @@ +package zfs + +func InitialSend(snapshot string) (io.Read, error) { + +} + +func IncrementalSend(from, to string) (io.Read, error) { + +} + +func FilesystemsAtRoot(root string) (model.Filesystem, error) { + +} \ No newline at end of file