From 29c6c86c009a25c26dde4639766f171164f697f3 Mon Sep 17 00:00:00 2001 From: Ivan Andreev Date: Tue, 24 Aug 2021 17:37:04 +0300 Subject: [PATCH] ftp: fix timeout after long uploads #5596 --- backend/ftp/ftp.go | 15 ++++- backend/ftp/ftp_internal_test.go | 98 ++++++++++++++++++++++++++++++++ 2 files changed, 112 insertions(+), 1 deletion(-) create mode 100644 backend/ftp/ftp_internal_test.go diff --git a/backend/ftp/ftp.go b/backend/ftp/ftp.go index 7a9e68e79..330596e31 100644 --- a/backend/ftp/ftp.go +++ b/backend/ftp/ftp.go @@ -128,6 +128,11 @@ Enabled by default. Use 0 to disable.`, Help: "Disable TLS 1.3 (workaround for FTP servers with buggy TLS)", Default: false, Advanced: true, + }, { + Name: "shut_timeout", + Help: "Maximum time to wait for data connection closing status.", + Default: fs.Duration(60 * time.Second), + Advanced: true, }, { Name: config.ConfigEncoding, Help: config.ConfigEncodingHelp, @@ -166,6 +171,7 @@ type Options struct { DisableMLSD bool `config:"disable_mlsd"` IdleTimeout fs.Duration `config:"idle_timeout"` CloseTimeout fs.Duration `config:"close_timeout"` + ShutTimeout fs.Duration `config:"shut_timeout"` Enc encoder.MultiEncoder `config:"encoding"` } @@ -311,6 +317,9 @@ func (f *Fs) ftpConnection(ctx context.Context) (c *ftp.ServerConn, err error) { if f.opt.DisableMLSD { ftpConfig = append(ftpConfig, ftp.DialWithDisabledMLSD(true)) } + if f.opt.ShutTimeout != 0 && f.opt.ShutTimeout != fs.DurationOff { + ftpConfig = append(ftpConfig, ftp.DialWithShutTimeout(time.Duration(f.opt.ShutTimeout))) + } if f.ci.Dump&(fs.DumpHeaders|fs.DumpBodies|fs.DumpRequests|fs.DumpResponses) != 0 { ftpConfig = append(ftpConfig, ftp.DialWithDebugOutput(&debugLog{auth: f.ci.Dump&fs.DumpAuth != 0})) } @@ -990,7 +999,11 @@ func (f *ftpReadCloser) Close() error { errchan <- f.rc.Close() }() // Wait for Close for up to 60 seconds by default - timer := time.NewTimer(time.Duration(f.f.opt.CloseTimeout)) + closeTimeout := f.f.opt.CloseTimeout + if closeTimeout == 0 { + closeTimeout = fs.DurationOff + } + timer := time.NewTimer(time.Duration(closeTimeout)) select { case err = <-errchan: timer.Stop() diff --git a/backend/ftp/ftp_internal_test.go b/backend/ftp/ftp_internal_test.go new file mode 100644 index 000000000..ada0e9fb1 --- /dev/null +++ b/backend/ftp/ftp_internal_test.go @@ -0,0 +1,98 @@ +package ftp + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/config/configmap" + "github.com/rclone/rclone/fs/object" + "github.com/rclone/rclone/fstest" + "github.com/rclone/rclone/fstest/fstests" + "github.com/rclone/rclone/lib/readers" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type settings map[string]interface{} + +func deriveFs(ctx context.Context, t *testing.T, f fs.Fs, opts settings) fs.Fs { + fsName := strings.Split(f.Name(), "{")[0] // strip off hash + configMap := configmap.Simple{} + for key, val := range opts { + configMap[key] = fmt.Sprintf("%v", val) + } + remote := fmt.Sprintf("%s,%s:%s", fsName, configMap.String(), f.Root()) + fixFs, err := fs.NewFs(ctx, remote) + require.NoError(t, err) + return fixFs +} + +// test that big file uploads do not cause network i/o timeout +func (f *Fs) testUploadTimeout(t *testing.T) { + const ( + fileSize = 100000000 // 100 MiB + idleTimeout = 40 * time.Millisecond // small because test server is local + maxTime = 5 * time.Second // prevent test hangup + ) + + if testing.Short() { + t.Skip("not running with -short") + } + + ctx := context.Background() + ci := fs.GetConfig(ctx) + saveLowLevelRetries := ci.LowLevelRetries + saveTimeout := ci.Timeout + defer func() { + ci.LowLevelRetries = saveLowLevelRetries + ci.Timeout = saveTimeout + }() + ci.LowLevelRetries = 1 + ci.Timeout = idleTimeout + + upload := func(concurrency int, shutTimeout time.Duration) (obj fs.Object, err error) { + fixFs := deriveFs(ctx, t, f, settings{ + "concurrency": concurrency, + "shut_timeout": shutTimeout, + }) + + // Make test object + fileTime := fstest.Time("2020-03-08T09:30:00.000000000Z") + meta := object.NewStaticObjectInfo("upload-timeout.test", fileTime, int64(fileSize), true, nil, nil) + data := readers.NewPatternReader(int64(fileSize)) + + // Run upload and ensure maximum time + done := make(chan bool) + deadline := time.After(maxTime) + go func() { + obj, err = fixFs.Put(ctx, data, meta) + done <- true + }() + select { + case <-done: + case <-deadline: + t.Fatalf("Upload got stuck for %v !", maxTime) + } + + return obj, err + } + + // non-zero shut_timeout should fix i/o errors + obj, err := upload(f.opt.Concurrency, time.Second) + assert.NoError(t, err) + assert.NotNil(t, obj) + if obj != nil { + _ = obj.Remove(ctx) + } +} + +// InternalTest dispatches all internal tests +func (f *Fs) InternalTest(t *testing.T) { + t.Run("UploadTimeout", f.testUploadTimeout) +} + +var _ fstests.InternalTester = (*Fs)(nil)