streamrpc 0.3 + config from daemon/config

This commit is contained in:
Christian Schwarz 2018-08-31 21:51:44 +02:00
parent d55a271ac7
commit 3d8e552c6a
9 changed files with 144 additions and 62 deletions

View File

@ -69,4 +69,4 @@ ignored = [ "github.com/inconshreveable/mousetrap" ]
[[constraint]] [[constraint]]
name = "github.com/problame/go-streamrpc" name = "github.com/problame/go-streamrpc"
version = "0.2.0" version = "0.3.0"

View File

@ -6,15 +6,15 @@ import (
"github.com/zrepl/yaml-config" "github.com/zrepl/yaml-config"
"io/ioutil" "io/ioutil"
"os" "os"
"reflect"
"regexp" "regexp"
"strconv" "strconv"
"time" "time"
"reflect"
) )
type Config struct { type Config struct {
Jobs []JobEnum `yaml:"jobs"` Jobs []JobEnum `yaml:"jobs"`
Global *Global `yaml:"global,optional,fromdefaults"` Global *Global `yaml:"global,optional,fromdefaults"`
} }
type JobEnum struct { type JobEnum struct {
@ -124,10 +124,10 @@ var _ yaml.Defaulter = &LoggingOutletEnumList{}
type Global struct { type Global struct {
Logging *LoggingOutletEnumList `yaml:"logging,optional,fromdefaults"` Logging *LoggingOutletEnumList `yaml:"logging,optional,fromdefaults"`
Monitoring []MonitoringEnum `yaml:"monitoring,optional"` Monitoring []MonitoringEnum `yaml:"monitoring,optional"`
Control *GlobalControl `yaml:"control,optional,fromdefaults"` Control *GlobalControl `yaml:"control,optional,fromdefaults"`
Serve *GlobalServe `yaml:"serve,optional,fromdefaults"` Serve *GlobalServe `yaml:"serve,optional,fromdefaults"`
RPC *RPCConfig `yaml:"rpc,optional,fromdefaults"` RPC *RPCConfig `yaml:"rpc,optional,fromdefaults"`
} }
func Default(i interface{}) { func Default(i interface{}) {
@ -143,38 +143,40 @@ func Default(i interface{}) {
} }
type RPCConfig struct { type RPCConfig struct {
Timeout time.Duration `yaml:"timeout,optional,positive,default=10s"` Timeout time.Duration `yaml:"timeout,optional,positive,default=10s"`
TxChunkSize uint `yaml:"tx_chunk_size,optional,default=32768"` TxChunkSize uint32 `yaml:"tx_chunk_size,optional,default=32768"`
RxStructuredMaxLen uint `yaml:"rx_structured_max,optional,default=16777216"` RxStructuredMaxLen uint32 `yaml:"rx_structured_max,optional,default=16777216"`
RxStreamChunkMaxLen uint `yaml:"rx_stream_chunk_max,optional,default=16777216"` RxStreamChunkMaxLen uint32 `yaml:"rx_stream_chunk_max,optional,default=16777216"`
RxHeaderMaxLen uint `yaml:"rx_header_max,optional,default=40960"` RxHeaderMaxLen uint32 `yaml:"rx_header_max,optional,default=40960"`
} }
type ConnectEnum struct { type ConnectEnum struct {
Ret interface{} Ret interface{}
} }
type ConnectCommon struct {
Type string `yaml:"type"`
RPC *RPCConfig `yaml:"rpc,optional"`
}
type TCPConnect struct { type TCPConnect struct {
Type string `yaml:"type"` ConnectCommon `yaml:",inline"`
RPC *RPCConfig `yaml:"rpc,optional"` Address string `yaml:"address"`
Address string `yaml:"address"` DialTimeout time.Duration `yaml:"dial_timeout,positive,default=10s"`
DialTimeout time.Duration `yaml:"dial_timeout,positive,default=10s"`
} }
type TLSConnect struct { type TLSConnect struct {
Type string `yaml:"type"` ConnectCommon `yaml:",inline"`
RPC *RPCConfig `yaml:"rpc,optional"` Address string `yaml:"address"`
Address string `yaml:"address"` Ca string `yaml:"ca"`
Ca string `yaml:"ca"` Cert string `yaml:"cert"`
Cert string `yaml:"cert"` Key string `yaml:"key"`
Key string `yaml:"key"` ServerCN string `yaml:"server_cn"`
ServerCN string `yaml:"server_cn"` DialTimeout time.Duration `yaml:"dial_timeout,positive,default=10s"`
DialTimeout time.Duration `yaml:"dial_timeout,positive,default=10s"`
} }
type SSHStdinserverConnect struct { type SSHStdinserverConnect struct {
Type string `yaml:"type"` ConnectCommon `yaml:",inline"`
RPC *RPCConfig `yaml:"rpc,optional"`
Host string `yaml:"host"` Host string `yaml:"host"`
User string `yaml:"user"` User string `yaml:"user"`
Port uint16 `yaml:"port"` Port uint16 `yaml:"port"`
@ -189,14 +191,19 @@ type ServeEnum struct {
Ret interface{} Ret interface{}
} }
type ServeCommon struct {
Type string `yaml:"type"`
RPC *RPCConfig `yaml:"rpc,optional"`
}
type TCPServe struct { type TCPServe struct {
Type string `yaml:"type"` ServeCommon `yaml:",inline"`
Listen string `yaml:"listen"` Listen string `yaml:"listen"`
Clients map[string]string `yaml:"clients"` Clients map[string]string `yaml:"clients"`
} }
type TLSServe struct { type TLSServe struct {
Type string `yaml:"type"` ServeCommon `yaml:",inline"`
Listen string `yaml:"listen"` Listen string `yaml:"listen"`
Ca string `yaml:"ca"` Ca string `yaml:"ca"`
Cert string `yaml:"cert"` Cert string `yaml:"cert"`
@ -206,7 +213,7 @@ type TLSServe struct {
} }
type StdinserverServer struct { type StdinserverServer struct {
Type string `yaml:"type"` ServeCommon `yaml:",inline"`
ClientIdentity string `yaml:"client_identity"` ClientIdentity string `yaml:"client_identity"`
} }

View File

@ -1,12 +1,12 @@
package config package config
import ( import (
"testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"testing"
"time" "time"
) )
func TestRPC (t *testing.T) { func TestRPC(t *testing.T) {
conf := testValidConfig(t, ` conf := testValidConfig(t, `
jobs: jobs:
- name: pull_servers - name: pull_servers
@ -42,10 +42,25 @@ jobs:
keep_receiver: keep_receiver:
- type: last_n - type: last_n
count: 100 count: 100
- type: sink
name: "laptop_sink"
replication:
root_dataset: "pool2/backup_laptops"
serve:
type: tcp
listen: "192.168.122.189:8888"
clients: {
"10.23.42.23":"client1"
}
rpc:
rx_structured_max: 0x2342
`) `)
assert.Equal(t, 20*time.Second, conf.Jobs[0].Ret.(*PullJob).Replication.Connect.Ret.(*TCPConnect).RPC.Timeout) assert.Equal(t, 20*time.Second, conf.Jobs[0].Ret.(*PullJob).Replication.Connect.Ret.(*TCPConnect).RPC.Timeout)
assert.Equal(t, uint(0xabcd), conf.Jobs[1].Ret.(*PullJob).Replication.Connect.Ret.(*TCPConnect).RPC.TxChunkSize) assert.Equal(t, uint32(0xabcd), conf.Jobs[1].Ret.(*PullJob).Replication.Connect.Ret.(*TCPConnect).RPC.TxChunkSize)
assert.Equal(t, uint32(0x2342), conf.Jobs[2].Ret.(*SinkJob).Replication.Serve.Ret.(*TCPServe).RPC.RxStructuredMaxLen)
defConf := RPCConfig{} defConf := RPCConfig{}
Default(&defConf) Default(&defConf)
assert.Equal(t, defConf.Timeout, conf.Global.RPC.Timeout) assert.Equal(t, defConf.Timeout, conf.Global.RPC.Timeout)
@ -56,6 +71,6 @@ func TestGlobal_DefaultRPCConfig(t *testing.T) {
var c RPCConfig var c RPCConfig
Default(&c) Default(&c)
assert.NotNil(t, c) assert.NotNil(t, c)
assert.Equal(t, c.TxChunkSize, uint(1)<<15) assert.Equal(t, c.TxChunkSize, uint32(1)<<15)
}) })
} }

View File

@ -4,17 +4,44 @@ import (
"fmt" "fmt"
"github.com/problame/go-streamrpc" "github.com/problame/go-streamrpc"
"github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/streamrpcconfig"
) )
func FromConfig(g *config.Global, in config.ConnectEnum) (streamrpc.Connecter, error) { func FromConfig(g *config.Global, in config.ConnectEnum) (*ClientFactory, error) {
var (
connecter streamrpc.Connecter
errConnecter, errRPC error
connConf *streamrpc.ConnConfig
)
switch v := in.Ret.(type) { switch v := in.Ret.(type) {
case *config.SSHStdinserverConnect: case *config.SSHStdinserverConnect:
return SSHStdinserverConnecterFromConfig(v) connecter, errConnecter = SSHStdinserverConnecterFromConfig(v)
connConf, errRPC = streamrpcconfig.FromDaemonConfig(g, v.RPC)
case *config.TCPConnect: case *config.TCPConnect:
return TCPConnecterFromConfig(v) connecter, errConnecter = TCPConnecterFromConfig(v)
connConf, errRPC = streamrpcconfig.FromDaemonConfig(g, v.RPC)
case *config.TLSConnect: case *config.TLSConnect:
return TLSConnecterFromConfig(v) connecter, errConnecter = TLSConnecterFromConfig(v)
connConf, errRPC = streamrpcconfig.FromDaemonConfig(g, v.RPC)
default: default:
panic(fmt.Sprintf("implementation error: unknown connecter type %T", v)) panic(fmt.Sprintf("implementation error: unknown connecter type %T", v))
} }
if errConnecter != nil {
return nil, errConnecter
}
if errRPC != nil {
return nil, errRPC
}
return &ClientFactory{connecter: connecter, config: &streamrpc.ClientConfig{connConf}}, nil
}
type ClientFactory struct {
connecter streamrpc.Connecter
config *streamrpc.ClientConfig
}
func (f ClientFactory) NewClient() (*streamrpc.Client, error) {
return streamrpc.NewClient(f.connecter, f.config)
} }

View File

@ -3,9 +3,7 @@ package job
import ( import (
"context" "context"
"errors" "errors"
"github.com/problame/go-streamrpc"
"github.com/zrepl/zrepl/logger" "github.com/zrepl/zrepl/logger"
"time"
) )
type Logger = logger.Logger type Logger = logger.Logger
@ -59,12 +57,3 @@ func WaitWakeup(ctx context.Context) <-chan struct{} {
return wc return wc
} }
var STREAMRPC_CONFIG = &streamrpc.ConnConfig{ // FIXME oversight and configurability
RxHeaderMaxLen: 4096,
RxStructuredMaxLen: 4096 * 4096,
RxStreamMaxChunkSize: 4096 * 4096,
TxChunkSize: 4096 * 4096,
Timeout: streamrpc.Timeout{
Progress: 10 * time.Second,
},
}

View File

@ -3,7 +3,6 @@ package job
import ( import (
"context" "context"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/problame/go-streamrpc"
"github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/connecter" "github.com/zrepl/zrepl/daemon/connecter"
"github.com/zrepl/zrepl/daemon/filters" "github.com/zrepl/zrepl/daemon/filters"
@ -16,7 +15,7 @@ import (
type Push struct { type Push struct {
name string name string
connecter streamrpc.Connecter clientFactory *connecter.ClientFactory
fsfilter endpoint.FSFilter fsfilter endpoint.FSFilter
prunerFactory *pruner.PrunerFactory prunerFactory *pruner.PrunerFactory
@ -30,7 +29,10 @@ func PushFromConfig(g *config.Global, in *config.PushJob) (j *Push, err error) {
j = &Push{} j = &Push{}
j.name = in.Name j.name = in.Name
j.connecter, err = connecter.FromConfig(g, in.Replication.Connect) j.clientFactory, err = connecter.FromConfig(g, in.Replication.Connect)
if err != nil {
return nil, errors.Wrap(err, "cannot build client")
}
if j.fsfilter, err = filters.DatasetMapFilterFromConfig(in.Replication.Filesystems); err != nil { if j.fsfilter, err = filters.DatasetMapFilterFromConfig(in.Replication.Filesystems); err != nil {
return nil, errors.Wrap(err, "cannnot build filesystem filter") return nil, errors.Wrap(err, "cannnot build filesystem filter")
@ -87,9 +89,9 @@ func (j *Push) do(ctx context.Context) {
log := GetLogger(ctx) log := GetLogger(ctx)
ctx = logging.WithSubsystemLoggers(ctx, log) ctx = logging.WithSubsystemLoggers(ctx, log)
client, err := streamrpc.NewClient(j.connecter, &streamrpc.ClientConfig{STREAMRPC_CONFIG}) client, err := j.clientFactory.NewClient()
if err != nil { if err != nil {
log.WithError(err).Error("cannot create streamrpc client") log.WithError(err).Error("factory cannot instantiate streamrpc client")
} }
defer client.Close(ctx) defer client.Close(ctx)

View File

@ -15,6 +15,7 @@ import (
type Sink struct { type Sink struct {
name string name string
l serve.ListenerFactory l serve.ListenerFactory
rpcConf *streamrpc.ConnConfig
fsmap endpoint.FSMap fsmap endpoint.FSMap
fsmapInv endpoint.FSFilter fsmapInv endpoint.FSFilter
} }
@ -24,7 +25,7 @@ func SinkFromConfig(g *config.Global, in *config.SinkJob) (s *Sink, err error) {
// FIXME multi client support // FIXME multi client support
s = &Sink{name: in.Name} s = &Sink{name: in.Name}
if s.l, err = serve.FromConfig(g, in.Replication.Serve); err != nil { if s.l, s.rpcConf, err = serve.FromConfig(g, in.Replication.Serve); err != nil {
return nil, errors.Wrap(err, "cannot build server") return nil, errors.Wrap(err, "cannot build server")
} }
@ -95,7 +96,7 @@ func (j *Sink) handleConnection(ctx context.Context, conn net.Conn) {
} }
handler := endpoint.NewHandler(local) handler := endpoint.NewHandler(local)
if err := streamrpc.ServeConn(ctx, conn, STREAMRPC_CONFIG, handler.Handle); err != nil { if err := streamrpc.ServeConn(ctx, conn, j.rpcConf, handler.Handle); err != nil {
log.WithError(err).Error("error serving client") log.WithError(err).Error("error serving client")
} }
} }

View File

@ -4,23 +4,42 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/config"
"net" "net"
"github.com/zrepl/zrepl/daemon/streamrpcconfig"
"github.com/problame/go-streamrpc"
) )
type ListenerFactory interface { type ListenerFactory interface {
Listen() (net.Listener, error) Listen() (net.Listener, error)
} }
func FromConfig(g *config.Global, in config.ServeEnum) (ListenerFactory, error) { func FromConfig(g *config.Global, in config.ServeEnum) (lf ListenerFactory, conf *streamrpc.ConnConfig, _ error) {
var (
lfError, rpcErr error
)
switch v := in.Ret.(type) { switch v := in.Ret.(type) {
case *config.TCPServe: case *config.TCPServe:
return TCPListenerFactoryFromConfig(g, v) lf, lfError = TCPListenerFactoryFromConfig(g, v)
conf, rpcErr = streamrpcconfig.FromDaemonConfig(g, v.RPC)
case *config.TLSServe: case *config.TLSServe:
return TLSListenerFactoryFromConfig(g, v) lf, lfError = TLSListenerFactoryFromConfig(g, v)
conf, rpcErr = streamrpcconfig.FromDaemonConfig(g, v.RPC)
case *config.StdinserverServer: case *config.StdinserverServer:
return StdinserverListenerFactoryFromConfig(g, v) lf, lfError = StdinserverListenerFactoryFromConfig(g, v)
conf, rpcErr = streamrpcconfig.FromDaemonConfig(g, v.RPC)
default: default:
return nil, errors.Errorf("internal error: unknown serve type %T", v) return nil, nil, errors.Errorf("internal error: unknown serve type %T", v)
} }
if lfError != nil {
return nil, nil, lfError
}
if rpcErr != nil {
return nil, nil, rpcErr
}
return lf, conf, nil
} }

View File

@ -0,0 +1,22 @@
package streamrpcconfig
import (
"github.com/problame/go-streamrpc"
"github.com/zrepl/zrepl/config"
)
func FromDaemonConfig(g *config.Global, in *config.RPCConfig) (*streamrpc.ConnConfig, error) {
conf := in
if conf == nil {
conf = g.RPC
}
return &streamrpc.ConnConfig{
RxHeaderMaxLen: conf.RxHeaderMaxLen,
RxStructuredMaxLen: conf.RxStructuredMaxLen,
RxStreamMaxChunkSize: conf.RxStreamChunkMaxLen,
TxChunkSize: conf.TxChunkSize,
Timeout: streamrpc.Timeout{
Progress: conf.Timeout,
},
}, nil
}