diff --git a/backend/sftp/sftp.go b/backend/sftp/sftp.go index 4f063c75b..af2d0abbe 100644 --- a/backend/sftp/sftp.go +++ b/backend/sftp/sftp.go @@ -300,7 +300,7 @@ type Fs struct { drain *time.Timer // used to drain the pool when we stop using the connections pacer *fs.Pacer // pacer for operations savedpswd string - transfers int32 // count in use references + sessions int32 // count in use sessions } // Object is a remote SFTP file that has been stat'd (so it exists, but is not necessarily open for reading) @@ -363,21 +363,21 @@ func (c *conn) closed() error { return nil } -// Show that we are doing an upload or download +// Show that we are using an ssh session // -// Call removeTransfer() when done -func (f *Fs) addTransfer() { - atomic.AddInt32(&f.transfers, 1) +// Call removeSession() when done +func (f *Fs) addSession() { + atomic.AddInt32(&f.sessions, 1) } -// Show the upload or download done -func (f *Fs) removeTransfer() { - atomic.AddInt32(&f.transfers, -1) +// Show the ssh session is no longer in use +func (f *Fs) removeSession() { + atomic.AddInt32(&f.sessions, -1) } -// getTransfers shows whether there are any transfers in progress -func (f *Fs) getTransfers() int32 { - return atomic.LoadInt32(&f.transfers) +// getSessions shows whether there are any sessions in use +func (f *Fs) getSessions() int32 { + return atomic.LoadInt32(&f.sessions) } // Open a new connection to the SFTP server. @@ -506,8 +506,8 @@ func (f *Fs) putSftpConnection(pc **conn, err error) { func (f *Fs) drainPool(ctx context.Context) (err error) { f.poolMu.Lock() defer f.poolMu.Unlock() - if transfers := f.getTransfers(); transfers != 0 { - fs.Debugf(f, "Not closing %d unused connections as %d transfers in progress", len(f.pool), transfers) + if sessions := f.getSessions(); sessions != 0 { + fs.Debugf(f, "Not closing %d unused connections as %d sessions active", len(f.pool), sessions) if f.opt.IdleTimeout > 0 { f.drain.Reset(time.Duration(f.opt.IdleTimeout)) // nudge on the pool emptying timer } @@ -1093,6 +1093,9 @@ func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string // run runds cmd on the remote end returning standard output func (f *Fs) run(ctx context.Context, cmd string) ([]byte, error) { + f.addSession() // Show session in use + defer f.removeSession() + c, err := f.getSftpConnection(ctx) if err != nil { return nil, errors.Wrap(err, "run: get SFTP connection") @@ -1231,6 +1234,8 @@ func (o *Object) Remote() string { // Hash returns the selected checksum of the file // If no checksum is available it returns "" func (o *Object) Hash(ctx context.Context, r hash.Type) (string, error) { + o.fs.addSession() // Show session in use + defer o.fs.removeSession() if o.fs.opt.DisableHashCheck { return "", nil } @@ -1434,7 +1439,7 @@ func (f *Fs) newObjectReader(sftpFile *sftp.File) *objectReader { done: make(chan struct{}), } // Show connection in use - f.addTransfer() + f.addSession() go func() { // Use sftpFile.WriteTo to pump data so that it gets a @@ -1465,7 +1470,7 @@ func (file *objectReader) Close() (err error) { // Wait for the background process to finish <-file.done // Show connection no longer in use - file.f.removeTransfer() + file.f.removeSession() return err } @@ -1518,8 +1523,8 @@ func (sr *sizeReader) Size() int64 { // Update a remote sftp file using the data and ModTime from func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { - o.fs.addTransfer() // Show transfer in progress - defer o.fs.removeTransfer() + o.fs.addSession() // Show session in use + defer o.fs.removeSession() // Clear the hash cache since we are about to update the object o.md5sum = nil o.sha1sum = nil