transport local: named listeners + struct renaming

This commit is contained in:
Christian Schwarz 2018-10-11 13:06:47 +02:00
parent 976c1f3929
commit 01668a989e
4 changed files with 49 additions and 21 deletions

View File

@ -163,6 +163,7 @@ type SSHStdinserverConnect struct {
type LocalConnect struct {
ConnectCommon `yaml:",inline"`
ListenerName string `yaml:"listener_name"`
ClientIdentity string `yaml:"client_identity"`
}
@ -198,6 +199,7 @@ type StdinserverServer struct {
type LocalServe struct {
ServeCommon `yaml:",inline"`
ListenerName string `yaml:"listener_name"`
}
type PruningEnum struct {

View File

@ -5,11 +5,13 @@ jobs:
root_dataset: "storage/zrepl/sink"
serve:
type: local
listener_name: localsink
- type: push
name: "backup_system"
connect:
type: local
listener_name: localsink
client_identity: local_backup
filesystems: {
"system<": true,

View File

@ -9,6 +9,7 @@ import (
)
type LocalConnecter struct {
listenerName string
clientIdentity string
}
@ -16,11 +17,14 @@ func LocalConnecterFromConfig(in *config.LocalConnect) (*LocalConnecter, error)
if in.ClientIdentity == "" {
return nil, fmt.Errorf("ClientIdentity must not be empty")
}
return &LocalConnecter{in.ClientIdentity}, nil
if in.ListenerName == "" {
return nil, fmt.Errorf("ListenerName must not be empty")
}
return &LocalConnecter{listenerName: in.ListenerName, clientIdentity: in.ClientIdentity}, nil
}
func (c *LocalConnecter) Connect(dialCtx context.Context) (conn net.Conn, err error) {
switchboard := serve.GetLocalListenerSwitchboard()
return switchboard.DialContext(dialCtx, c.clientIdentity)
l := serve.GetLocalListener(c.listenerName)
return l.Connect(dialCtx, c.clientIdentity)
}

View File

@ -10,18 +10,28 @@ import (
"sync"
)
var localListenerSwitchboardSingleton struct {
s *LocalListenerSwitchboard
once sync.Once
var localListeners struct {
m map[string]*LocalListener // listenerName -> listener
init sync.Once
mtx sync.Mutex
}
func GetLocalListenerSwitchboard() (*LocalListenerSwitchboard) {
localListenerSwitchboardSingleton.once.Do(func() {
localListenerSwitchboardSingleton.s = &LocalListenerSwitchboard{
connects: make(chan connectRequest),
}
func GetLocalListener(listenerName string) (*LocalListener) {
localListeners.init.Do(func() {
localListeners.m = make(map[string]*LocalListener)
})
return localListenerSwitchboardSingleton.s
localListeners.mtx.Lock()
defer localListeners.mtx.Unlock()
l, ok := localListeners.m[listenerName]
if !ok {
l = newLocalListener()
localListeners.m[listenerName] = l
}
return l
}
type connectRequest struct {
@ -34,11 +44,18 @@ type connectResult struct {
err error
}
type LocalListenerSwitchboard struct {
type LocalListener struct {
connects chan connectRequest
}
func (l *LocalListenerSwitchboard) DialContext(dialCtx context.Context, clientIdentity string) (conn net.Conn, err error) {
func newLocalListener() *LocalListener {
return &LocalListener{
connects: make(chan connectRequest),
}
}
// Connect to the LocalListener from a client with identity clientIdentity
func (l *LocalListener) Connect(dialCtx context.Context, clientIdentity string) (conn net.Conn, err error) {
// place request
req := connectRequest{
@ -71,7 +88,7 @@ func (localAddr) Network() string { return "local" }
func (a localAddr) String() string { return a.S }
func (l *LocalListenerSwitchboard) Addr() (net.Addr) { return localAddr{"<listening>"} }
func (l *LocalListener) Addr() (net.Addr) { return localAddr{"<listening>"} }
type localConn struct {
net.Conn
@ -80,7 +97,7 @@ type localConn struct {
func (l localConn) ClientIdentity() string { return l.clientIdentity }
func (l *LocalListenerSwitchboard) Accept(ctx context.Context) (AuthenticatedConn, error) {
func (l *LocalListener) Accept(ctx context.Context) (AuthenticatedConn, error) {
respondToRequest := func(req connectRequest, res connectResult) (err error) {
getLogger(ctx).
WithField("res.conn", res.conn).WithField("res.err", res.err).
@ -187,7 +204,7 @@ func makeSocketpairConn() (a, b net.Conn, err error) {
return a, b, nil
}
func (l *LocalListenerSwitchboard) Close() error {
func (l *LocalListener) Close() error {
// FIXME: make sure concurrent Accepts return with error, and further Accepts return that error, too
// Example impl: for each accept, do context.WithCancel, and store the cancel in a list
// When closing, set a member variable to state=closed, make sure accept will exit early
@ -197,15 +214,18 @@ func (l *LocalListenerSwitchboard) Close() error {
}
type LocalListenerFactory struct {
clients []string
listenerName string
}
func LocalListenerFactoryFromConfig(g *config.Global, in *config.LocalServe) (f *LocalListenerFactory, err error) {
return &LocalListenerFactory{}, nil
if in.ListenerName == "" {
return nil, fmt.Errorf("ListenerName must not be empty")
}
return &LocalListenerFactory{listenerName: in.ListenerName}, nil
}
func (*LocalListenerFactory) Listen() (AuthenticatedListener, error) {
return GetLocalListenerSwitchboard(), nil
func (lf *LocalListenerFactory) Listen() (AuthenticatedListener, error) {
return GetLocalListener(lf.listenerName), nil
}