job pull + source: fix broken connection teardown

Issue #56 shows zombie SSH processes.
We fix this by actually Close()ing the RWC in job pull.
If this fixes #56 it also fixes #6 --- it's the same issue.

Additionally, debugging around this revealed another issue: just
Close()ing the sshbytestream in job source will apparently outpace the
normal data stream of stdin and stdout (URG or PUSH flags?).  leading
to ugly errors in the logs.
With proper TCP connections, we would simply set the connection to
linger and close it, letting the kernel handle the final timeout. Meh.

refs #56
refs #6
This commit is contained in:
Christian Schwarz 2018-02-16 20:57:27 +01:00
parent 921bccb960
commit 6b5bd0a43c
2 changed files with 34 additions and 3 deletions

View File

@ -139,10 +139,10 @@ func (j *PullJob) doRun(ctx context.Context) {
j.task.Enter("pull") j.task.Enter("pull")
puller := Puller{j.task, client, j.Mapping, j.InitialReplPolicy} puller := Puller{j.task, client, j.Mapping, j.InitialReplPolicy}
puller.Pull() puller.Pull()
closeRPCWithTimeout(j.task, client, time.Second*1, "")
rwc.Close()
j.task.Finish() j.task.Finish()
closeRPCWithTimeout(j.task, client, time.Second*10, "")
j.task.Enter("prune") j.task.Enter("prune")
pruner, err := j.Pruner(j.task, PrunePolicySideDefault, false) pruner, err := j.Pruner(j.task, PrunePolicySideDefault, false)
if err != nil { if err != nil {

View File

@ -197,6 +197,8 @@ func (j *SourceJob) handleConnection(rwc io.ReadWriteCloser, task *Task) {
task.Enter("handle_connection") task.Enter("handle_connection")
defer task.Finish() defer task.Finish()
task.Log().Info("handling client connection")
rwc, err := util.NewReadWriteCloserLogger(rwc, j.Debug.Conn.ReadDump, j.Debug.Conn.WriteDump) rwc, err := util.NewReadWriteCloserLogger(rwc, j.Debug.Conn.ReadDump, j.Debug.Conn.WriteDump)
if err != nil { if err != nil {
panic(err) panic(err)
@ -215,5 +217,34 @@ func (j *SourceJob) handleConnection(rwc io.ReadWriteCloser, task *Task) {
if err = rpcServer.Serve(); err != nil { if err = rpcServer.Serve(); err != nil {
task.Log().WithError(err).Error("error serving connection") task.Log().WithError(err).Error("error serving connection")
} }
rwc.Close()
// wait for client to close connection
// FIXME: we cannot just close it like we would to with a TCP socket because
// FIXME: sshbytestream's Close() may overtake the remaining data in the pipe
const CLIENT_HANGUP_TIMEOUT = 1 * time.Second
task.Log().
WithField("timeout", CLIENT_HANGUP_TIMEOUT).
Debug("waiting for client to hang up")
wchan := make(chan error)
go func() {
var pseudo [1]byte
_, err := io.ReadFull(rwc, pseudo[:])
wchan <- err
}()
var werr error
select {
case werr = <-wchan:
// all right
case <-time.After(CLIENT_HANGUP_TIMEOUT):
werr = errors.New("client did not close connection within timeout")
}
if werr != nil && werr != io.EOF {
task.Log().WithError(werr).
Error("error waiting for client to hang up")
}
task.Log().Info("closing client connection")
if err = rwc.Close(); err != nil {
task.Log().WithError(err).Error("error force-closing connection")
}
} }