From 93d85015af8d68bb9eaf6fd22bb0a0e9ebafcb1d Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Wed, 6 Oct 2021 11:50:35 +0100 Subject: [PATCH] sftp: fix timeout when doing MD5SUM of large file Before this change we were timing out MD5SUMs after 1 minute because rclone was closing the SSH session when there were sessions still aftive. This change counts sessions active for all SSH sessions now (Upload, Download, Hashes and running commands). See: https://forum.rclone.org/t/while-rclone-copying-large-files-md5sum-failed-with-exit-status/26845/ --- backend/sftp/sftp.go | 39 ++++++++++++++++++++++----------------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/backend/sftp/sftp.go b/backend/sftp/sftp.go index 4f063c75b..af2d0abbe 100644 --- a/backend/sftp/sftp.go +++ b/backend/sftp/sftp.go @@ -300,7 +300,7 @@ type Fs struct { drain *time.Timer // used to drain the pool when we stop using the connections pacer *fs.Pacer // pacer for operations savedpswd string - transfers int32 // count in use references + sessions int32 // count in use sessions } // Object is a remote SFTP file that has been stat'd (so it exists, but is not necessarily open for reading) @@ -363,21 +363,21 @@ func (c *conn) closed() error { return nil } -// Show that we are doing an upload or download +// Show that we are using an ssh session // -// Call removeTransfer() when done -func (f *Fs) addTransfer() { - atomic.AddInt32(&f.transfers, 1) +// Call removeSession() when done +func (f *Fs) addSession() { + atomic.AddInt32(&f.sessions, 1) } -// Show the upload or download done -func (f *Fs) removeTransfer() { - atomic.AddInt32(&f.transfers, -1) +// Show the ssh session is no longer in use +func (f *Fs) removeSession() { + atomic.AddInt32(&f.sessions, -1) } -// getTransfers shows whether there are any transfers in progress -func (f *Fs) getTransfers() int32 { - return atomic.LoadInt32(&f.transfers) +// getSessions shows whether there are any sessions in use +func (f *Fs) getSessions() int32 { + return atomic.LoadInt32(&f.sessions) } // Open a new connection to the SFTP server. @@ -506,8 +506,8 @@ func (f *Fs) putSftpConnection(pc **conn, err error) { func (f *Fs) drainPool(ctx context.Context) (err error) { f.poolMu.Lock() defer f.poolMu.Unlock() - if transfers := f.getTransfers(); transfers != 0 { - fs.Debugf(f, "Not closing %d unused connections as %d transfers in progress", len(f.pool), transfers) + if sessions := f.getSessions(); sessions != 0 { + fs.Debugf(f, "Not closing %d unused connections as %d sessions active", len(f.pool), sessions) if f.opt.IdleTimeout > 0 { f.drain.Reset(time.Duration(f.opt.IdleTimeout)) // nudge on the pool emptying timer } @@ -1093,6 +1093,9 @@ func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string // run runds cmd on the remote end returning standard output func (f *Fs) run(ctx context.Context, cmd string) ([]byte, error) { + f.addSession() // Show session in use + defer f.removeSession() + c, err := f.getSftpConnection(ctx) if err != nil { return nil, errors.Wrap(err, "run: get SFTP connection") @@ -1231,6 +1234,8 @@ func (o *Object) Remote() string { // Hash returns the selected checksum of the file // If no checksum is available it returns "" func (o *Object) Hash(ctx context.Context, r hash.Type) (string, error) { + o.fs.addSession() // Show session in use + defer o.fs.removeSession() if o.fs.opt.DisableHashCheck { return "", nil } @@ -1434,7 +1439,7 @@ func (f *Fs) newObjectReader(sftpFile *sftp.File) *objectReader { done: make(chan struct{}), } // Show connection in use - f.addTransfer() + f.addSession() go func() { // Use sftpFile.WriteTo to pump data so that it gets a @@ -1465,7 +1470,7 @@ func (file *objectReader) Close() (err error) { // Wait for the background process to finish <-file.done // Show connection no longer in use - file.f.removeTransfer() + file.f.removeSession() return err } @@ -1518,8 +1523,8 @@ func (sr *sizeReader) Size() int64 { // Update a remote sftp file using the data and ModTime from func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { - o.fs.addTransfer() // Show transfer in progress - defer o.fs.removeTransfer() + o.fs.addSession() // Show session in use + defer o.fs.removeSession() // Clear the hash cache since we are about to update the object o.md5sum = nil o.sha1sum = nil