From 3d8e552c6aa5b134932c1839f812571a96c6870f Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 31 Aug 2018 21:51:44 +0200 Subject: [PATCH] streamrpc 0.3 + config from daemon/config --- Gopkg.toml | 2 +- config/config.go | 67 +++++++++++++---------- config/config_rpc_test.go | 23 ++++++-- daemon/connecter/connecter.go | 35 ++++++++++-- daemon/job/job.go | 11 ---- daemon/job/push.go | 12 ++-- daemon/job/sink.go | 5 +- daemon/serve/serve.go | 29 ++++++++-- daemon/streamrpcconfig/streamrpcconfig.go | 22 ++++++++ 9 files changed, 144 insertions(+), 62 deletions(-) create mode 100644 daemon/streamrpcconfig/streamrpcconfig.go diff --git a/Gopkg.toml b/Gopkg.toml index 49065ab..b08613e 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -69,4 +69,4 @@ ignored = [ "github.com/inconshreveable/mousetrap" ] [[constraint]] name = "github.com/problame/go-streamrpc" - version = "0.2.0" + version = "0.3.0" diff --git a/config/config.go b/config/config.go index d6a334f..d312f57 100644 --- a/config/config.go +++ b/config/config.go @@ -6,15 +6,15 @@ import ( "github.com/zrepl/yaml-config" "io/ioutil" "os" + "reflect" "regexp" "strconv" "time" - "reflect" ) type Config struct { Jobs []JobEnum `yaml:"jobs"` - Global *Global `yaml:"global,optional,fromdefaults"` + Global *Global `yaml:"global,optional,fromdefaults"` } type JobEnum struct { @@ -124,10 +124,10 @@ var _ yaml.Defaulter = &LoggingOutletEnumList{} type Global struct { Logging *LoggingOutletEnumList `yaml:"logging,optional,fromdefaults"` - Monitoring []MonitoringEnum `yaml:"monitoring,optional"` - Control *GlobalControl `yaml:"control,optional,fromdefaults"` - Serve *GlobalServe `yaml:"serve,optional,fromdefaults"` - RPC *RPCConfig `yaml:"rpc,optional,fromdefaults"` + Monitoring []MonitoringEnum `yaml:"monitoring,optional"` + Control *GlobalControl `yaml:"control,optional,fromdefaults"` + Serve *GlobalServe `yaml:"serve,optional,fromdefaults"` + RPC *RPCConfig `yaml:"rpc,optional,fromdefaults"` } func Default(i interface{}) { @@ -143,38 +143,40 @@ func Default(i interface{}) { } type RPCConfig struct { - Timeout time.Duration `yaml:"timeout,optional,positive,default=10s"` - TxChunkSize uint `yaml:"tx_chunk_size,optional,default=32768"` - RxStructuredMaxLen uint `yaml:"rx_structured_max,optional,default=16777216"` - RxStreamChunkMaxLen uint `yaml:"rx_stream_chunk_max,optional,default=16777216"` - RxHeaderMaxLen uint `yaml:"rx_header_max,optional,default=40960"` + Timeout time.Duration `yaml:"timeout,optional,positive,default=10s"` + TxChunkSize uint32 `yaml:"tx_chunk_size,optional,default=32768"` + RxStructuredMaxLen uint32 `yaml:"rx_structured_max,optional,default=16777216"` + RxStreamChunkMaxLen uint32 `yaml:"rx_stream_chunk_max,optional,default=16777216"` + RxHeaderMaxLen uint32 `yaml:"rx_header_max,optional,default=40960"` } type ConnectEnum struct { Ret interface{} } +type ConnectCommon struct { + Type string `yaml:"type"` + RPC *RPCConfig `yaml:"rpc,optional"` +} + type TCPConnect struct { - Type string `yaml:"type"` - RPC *RPCConfig `yaml:"rpc,optional"` - Address string `yaml:"address"` - DialTimeout time.Duration `yaml:"dial_timeout,positive,default=10s"` + ConnectCommon `yaml:",inline"` + Address string `yaml:"address"` + DialTimeout time.Duration `yaml:"dial_timeout,positive,default=10s"` } type TLSConnect struct { - Type string `yaml:"type"` - RPC *RPCConfig `yaml:"rpc,optional"` - Address string `yaml:"address"` - Ca string `yaml:"ca"` - Cert string `yaml:"cert"` - Key string `yaml:"key"` - ServerCN string `yaml:"server_cn"` - DialTimeout time.Duration `yaml:"dial_timeout,positive,default=10s"` + ConnectCommon `yaml:",inline"` + Address string `yaml:"address"` + Ca string `yaml:"ca"` + Cert string `yaml:"cert"` + Key string `yaml:"key"` + ServerCN string `yaml:"server_cn"` + DialTimeout time.Duration `yaml:"dial_timeout,positive,default=10s"` } type SSHStdinserverConnect struct { - Type string `yaml:"type"` - RPC *RPCConfig `yaml:"rpc,optional"` + ConnectCommon `yaml:",inline"` Host string `yaml:"host"` User string `yaml:"user"` Port uint16 `yaml:"port"` @@ -189,14 +191,19 @@ type ServeEnum struct { Ret interface{} } +type ServeCommon struct { + Type string `yaml:"type"` + RPC *RPCConfig `yaml:"rpc,optional"` +} + type TCPServe struct { - Type string `yaml:"type"` - Listen string `yaml:"listen"` - Clients map[string]string `yaml:"clients"` + ServeCommon `yaml:",inline"` + Listen string `yaml:"listen"` + Clients map[string]string `yaml:"clients"` } type TLSServe struct { - Type string `yaml:"type"` + ServeCommon `yaml:",inline"` Listen string `yaml:"listen"` Ca string `yaml:"ca"` Cert string `yaml:"cert"` @@ -206,7 +213,7 @@ type TLSServe struct { } type StdinserverServer struct { - Type string `yaml:"type"` + ServeCommon `yaml:",inline"` ClientIdentity string `yaml:"client_identity"` } diff --git a/config/config_rpc_test.go b/config/config_rpc_test.go index 0fd2a4c..fa041b4 100644 --- a/config/config_rpc_test.go +++ b/config/config_rpc_test.go @@ -1,12 +1,12 @@ package config import ( - "testing" "github.com/stretchr/testify/assert" + "testing" "time" ) -func TestRPC (t *testing.T) { +func TestRPC(t *testing.T) { conf := testValidConfig(t, ` jobs: - name: pull_servers @@ -42,10 +42,25 @@ jobs: keep_receiver: - type: last_n count: 100 + +- type: sink + name: "laptop_sink" + replication: + root_dataset: "pool2/backup_laptops" + serve: + type: tcp + listen: "192.168.122.189:8888" + clients: { + "10.23.42.23":"client1" + } + rpc: + rx_structured_max: 0x2342 + `) assert.Equal(t, 20*time.Second, conf.Jobs[0].Ret.(*PullJob).Replication.Connect.Ret.(*TCPConnect).RPC.Timeout) - assert.Equal(t, uint(0xabcd), conf.Jobs[1].Ret.(*PullJob).Replication.Connect.Ret.(*TCPConnect).RPC.TxChunkSize) + assert.Equal(t, uint32(0xabcd), conf.Jobs[1].Ret.(*PullJob).Replication.Connect.Ret.(*TCPConnect).RPC.TxChunkSize) + assert.Equal(t, uint32(0x2342), conf.Jobs[2].Ret.(*SinkJob).Replication.Serve.Ret.(*TCPServe).RPC.RxStructuredMaxLen) defConf := RPCConfig{} Default(&defConf) assert.Equal(t, defConf.Timeout, conf.Global.RPC.Timeout) @@ -56,6 +71,6 @@ func TestGlobal_DefaultRPCConfig(t *testing.T) { var c RPCConfig Default(&c) assert.NotNil(t, c) - assert.Equal(t, c.TxChunkSize, uint(1)<<15) + assert.Equal(t, c.TxChunkSize, uint32(1)<<15) }) } diff --git a/daemon/connecter/connecter.go b/daemon/connecter/connecter.go index a5d95ad..e6803ce 100644 --- a/daemon/connecter/connecter.go +++ b/daemon/connecter/connecter.go @@ -4,17 +4,44 @@ import ( "fmt" "github.com/problame/go-streamrpc" "github.com/zrepl/zrepl/config" + "github.com/zrepl/zrepl/daemon/streamrpcconfig" ) -func FromConfig(g *config.Global, in config.ConnectEnum) (streamrpc.Connecter, error) { +func FromConfig(g *config.Global, in config.ConnectEnum) (*ClientFactory, error) { + var ( + connecter streamrpc.Connecter + errConnecter, errRPC error + connConf *streamrpc.ConnConfig + ) switch v := in.Ret.(type) { case *config.SSHStdinserverConnect: - return SSHStdinserverConnecterFromConfig(v) + connecter, errConnecter = SSHStdinserverConnecterFromConfig(v) + connConf, errRPC = streamrpcconfig.FromDaemonConfig(g, v.RPC) case *config.TCPConnect: - return TCPConnecterFromConfig(v) + connecter, errConnecter = TCPConnecterFromConfig(v) + connConf, errRPC = streamrpcconfig.FromDaemonConfig(g, v.RPC) case *config.TLSConnect: - return TLSConnecterFromConfig(v) + connecter, errConnecter = TLSConnecterFromConfig(v) + connConf, errRPC = streamrpcconfig.FromDaemonConfig(g, v.RPC) default: panic(fmt.Sprintf("implementation error: unknown connecter type %T", v)) } + + if errConnecter != nil { + return nil, errConnecter + } + if errRPC != nil { + return nil, errRPC + } + + return &ClientFactory{connecter: connecter, config: &streamrpc.ClientConfig{connConf}}, nil +} + +type ClientFactory struct { + connecter streamrpc.Connecter + config *streamrpc.ClientConfig +} + +func (f ClientFactory) NewClient() (*streamrpc.Client, error) { + return streamrpc.NewClient(f.connecter, f.config) } diff --git a/daemon/job/job.go b/daemon/job/job.go index b95f852..12f738e 100644 --- a/daemon/job/job.go +++ b/daemon/job/job.go @@ -3,9 +3,7 @@ package job import ( "context" "errors" - "github.com/problame/go-streamrpc" "github.com/zrepl/zrepl/logger" - "time" ) type Logger = logger.Logger @@ -59,12 +57,3 @@ func WaitWakeup(ctx context.Context) <-chan struct{} { return wc } -var STREAMRPC_CONFIG = &streamrpc.ConnConfig{ // FIXME oversight and configurability - RxHeaderMaxLen: 4096, - RxStructuredMaxLen: 4096 * 4096, - RxStreamMaxChunkSize: 4096 * 4096, - TxChunkSize: 4096 * 4096, - Timeout: streamrpc.Timeout{ - Progress: 10 * time.Second, - }, -} diff --git a/daemon/job/push.go b/daemon/job/push.go index 088e863..5c7004f 100644 --- a/daemon/job/push.go +++ b/daemon/job/push.go @@ -3,7 +3,6 @@ package job import ( "context" "github.com/pkg/errors" - "github.com/problame/go-streamrpc" "github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/daemon/connecter" "github.com/zrepl/zrepl/daemon/filters" @@ -16,7 +15,7 @@ import ( type Push struct { name string - connecter streamrpc.Connecter + clientFactory *connecter.ClientFactory fsfilter endpoint.FSFilter prunerFactory *pruner.PrunerFactory @@ -30,7 +29,10 @@ func PushFromConfig(g *config.Global, in *config.PushJob) (j *Push, err error) { j = &Push{} j.name = in.Name - j.connecter, err = connecter.FromConfig(g, in.Replication.Connect) + j.clientFactory, err = connecter.FromConfig(g, in.Replication.Connect) + if err != nil { + return nil, errors.Wrap(err, "cannot build client") + } if j.fsfilter, err = filters.DatasetMapFilterFromConfig(in.Replication.Filesystems); err != nil { return nil, errors.Wrap(err, "cannnot build filesystem filter") @@ -87,9 +89,9 @@ func (j *Push) do(ctx context.Context) { log := GetLogger(ctx) ctx = logging.WithSubsystemLoggers(ctx, log) - client, err := streamrpc.NewClient(j.connecter, &streamrpc.ClientConfig{STREAMRPC_CONFIG}) + client, err := j.clientFactory.NewClient() if err != nil { - log.WithError(err).Error("cannot create streamrpc client") + log.WithError(err).Error("factory cannot instantiate streamrpc client") } defer client.Close(ctx) diff --git a/daemon/job/sink.go b/daemon/job/sink.go index 3c20ae3..6afbb5d 100644 --- a/daemon/job/sink.go +++ b/daemon/job/sink.go @@ -15,6 +15,7 @@ import ( type Sink struct { name string l serve.ListenerFactory + rpcConf *streamrpc.ConnConfig fsmap endpoint.FSMap fsmapInv endpoint.FSFilter } @@ -24,7 +25,7 @@ func SinkFromConfig(g *config.Global, in *config.SinkJob) (s *Sink, err error) { // FIXME multi client support s = &Sink{name: in.Name} - if s.l, err = serve.FromConfig(g, in.Replication.Serve); err != nil { + if s.l, s.rpcConf, err = serve.FromConfig(g, in.Replication.Serve); err != nil { return nil, errors.Wrap(err, "cannot build server") } @@ -95,7 +96,7 @@ func (j *Sink) handleConnection(ctx context.Context, conn net.Conn) { } handler := endpoint.NewHandler(local) - if err := streamrpc.ServeConn(ctx, conn, STREAMRPC_CONFIG, handler.Handle); err != nil { + if err := streamrpc.ServeConn(ctx, conn, j.rpcConf, handler.Handle); err != nil { log.WithError(err).Error("error serving client") } } diff --git a/daemon/serve/serve.go b/daemon/serve/serve.go index 4df8488..fa7bb4a 100644 --- a/daemon/serve/serve.go +++ b/daemon/serve/serve.go @@ -4,23 +4,42 @@ import ( "github.com/pkg/errors" "github.com/zrepl/zrepl/config" "net" + "github.com/zrepl/zrepl/daemon/streamrpcconfig" + "github.com/problame/go-streamrpc" ) type ListenerFactory interface { Listen() (net.Listener, error) } -func FromConfig(g *config.Global, in config.ServeEnum) (ListenerFactory, error) { +func FromConfig(g *config.Global, in config.ServeEnum) (lf ListenerFactory, conf *streamrpc.ConnConfig, _ error) { + var ( + lfError, rpcErr error + ) switch v := in.Ret.(type) { case *config.TCPServe: - return TCPListenerFactoryFromConfig(g, v) + lf, lfError = TCPListenerFactoryFromConfig(g, v) + conf, rpcErr = streamrpcconfig.FromDaemonConfig(g, v.RPC) case *config.TLSServe: - return TLSListenerFactoryFromConfig(g, v) + lf, lfError = TLSListenerFactoryFromConfig(g, v) + conf, rpcErr = streamrpcconfig.FromDaemonConfig(g, v.RPC) case *config.StdinserverServer: - return StdinserverListenerFactoryFromConfig(g, v) + lf, lfError = StdinserverListenerFactoryFromConfig(g, v) + conf, rpcErr = streamrpcconfig.FromDaemonConfig(g, v.RPC) default: - return nil, errors.Errorf("internal error: unknown serve type %T", v) + return nil, nil, errors.Errorf("internal error: unknown serve type %T", v) } + if lfError != nil { + return nil, nil, lfError + } + if rpcErr != nil { + return nil, nil, rpcErr + } + + return lf, conf, nil + } + + diff --git a/daemon/streamrpcconfig/streamrpcconfig.go b/daemon/streamrpcconfig/streamrpcconfig.go new file mode 100644 index 0000000..3292577 --- /dev/null +++ b/daemon/streamrpcconfig/streamrpcconfig.go @@ -0,0 +1,22 @@ +package streamrpcconfig + +import ( + "github.com/problame/go-streamrpc" + "github.com/zrepl/zrepl/config" +) + +func FromDaemonConfig(g *config.Global, in *config.RPCConfig) (*streamrpc.ConnConfig, error) { + conf := in + if conf == nil { + conf = g.RPC + } + return &streamrpc.ConnConfig{ + RxHeaderMaxLen: conf.RxHeaderMaxLen, + RxStructuredMaxLen: conf.RxStructuredMaxLen, + RxStreamMaxChunkSize: conf.RxStreamChunkMaxLen, + TxChunkSize: conf.TxChunkSize, + Timeout: streamrpc.Timeout{ + Progress: conf.Timeout, + }, + }, nil +}