transport/local: configurable dial_timeout for connect, default 2s

This commit is contained in:
Christian Schwarz 2019-09-29 16:58:18 +02:00
parent a6b578b648
commit 5c95c21727
3 changed files with 19 additions and 5 deletions

View File

@ -233,6 +233,7 @@ type LocalConnect struct {
ConnectCommon `yaml:",inline"` ConnectCommon `yaml:",inline"`
ListenerName string `yaml:"listener_name"` ListenerName string `yaml:"listener_name"`
ClientIdentity string `yaml:"client_identity"` ClientIdentity string `yaml:"client_identity"`
DialTimeout time.Duration `yaml:"dial_timeout,zeropositive,default=2s"`
} }
type ServeEnum struct { type ServeEnum struct {

View File

@ -345,5 +345,6 @@ The ``client_identity`` is used by the sink as documented above.
type: local type: local
listener_name: localsink listener_name: localsink
client_identity: local_backup client_identity: local_backup
dial_timeout: 2s # optional, 0 for no timeout
... ...

View File

@ -12,6 +12,7 @@ import (
type LocalConnecter struct { type LocalConnecter struct {
listenerName string listenerName string
clientIdentity string clientIdentity string
dialTimeout time.Duration
} }
func LocalConnecterFromConfig(in *config.LocalConnect) (*LocalConnecter, error) { func LocalConnecterFromConfig(in *config.LocalConnect) (*LocalConnecter, error) {
@ -21,13 +22,24 @@ func LocalConnecterFromConfig(in *config.LocalConnect) (*LocalConnecter, error)
if in.ListenerName == "" { if in.ListenerName == "" {
return nil, fmt.Errorf("ListenerName must not be empty") return nil, fmt.Errorf("ListenerName must not be empty")
} }
return &LocalConnecter{listenerName: in.ListenerName, clientIdentity: in.ClientIdentity}, nil if in.DialTimeout < 0 {
return nil, fmt.Errorf("DialTimeout must be zero or positive")
}
cn := &LocalConnecter{
listenerName: in.ListenerName,
clientIdentity: in.ClientIdentity,
dialTimeout: in.DialTimeout,
}
return cn, nil
} }
func (c *LocalConnecter) Connect(dialCtx context.Context) (transport.Wire, error) { func (c *LocalConnecter) Connect(dialCtx context.Context) (transport.Wire, error) {
l := GetLocalListener(c.listenerName) l := GetLocalListener(c.listenerName)
dialCtx, cancel := context.WithTimeout(dialCtx, 1*time.Second) // fail fast, config error by user is very likely if c.dialTimeout > 0 {
ctx, cancel := context.WithTimeout(dialCtx, c.dialTimeout)
defer cancel() defer cancel()
dialCtx = ctx // shadow
}
w, err := l.Connect(dialCtx, c.clientIdentity) w, err := l.Connect(dialCtx, c.clientIdentity)
if err == context.DeadlineExceeded { if err == context.DeadlineExceeded {
return nil, fmt.Errorf("local listener %q not reachable", c.listenerName) return nil, fmt.Errorf("local listener %q not reachable", c.listenerName)