From 7a6f882750464b26141d66936025f5fe64f5c01c Mon Sep 17 00:00:00 2001
From: Fred
Date: Mon, 22 Jul 2024 20:05:28 +0100
Subject: [PATCH] seafile: send large uploads in chunks

---
 backend/seafile/api/types.go                  |  13 +-
 backend/seafile/content_range.go              |  71 ++++
 backend/seafile/content_range_test.go         |  86 +++++
 backend/seafile/object.go                     |  79 +++--
 backend/seafile/pacer.go                      |  12 +-
 backend/seafile/seafile.go                    | 264 ++++++++++-----
 backend/seafile/seafile_mock_server_test.go   | 307 ++++++++++++++++++
 backend/seafile/seafile_reverseproxy_test.go  | 162 +++++++++
 backend/seafile/seafile_test.go               |  24 +-
 backend/seafile/webapi.go                     | 222 +++++++++++--
 docs/content/seafile.md                       |  48 +++
 fstest/test_all/config.yaml                   |   5 -
 fstest/testserver/init.d/TestSeafile          |   6 +-
 fstest/testserver/init.d/TestSeafileEncrypted |  22 +-
 fstest/testserver/init.d/TestSeafileV6        |  48 ---
 15 files changed, 1186 insertions(+), 183 deletions(-)
 create mode 100644 backend/seafile/content_range.go
 create mode 100644 backend/seafile/content_range_test.go
 create mode 100644 backend/seafile/seafile_mock_server_test.go
 create mode 100644 backend/seafile/seafile_reverseproxy_test.go
 delete mode 100755 fstest/testserver/init.d/TestSeafileV6

diff --git a/backend/seafile/api/types.go b/backend/seafile/api/types.go
index d6224709f..9dc52deec 100644
--- a/backend/seafile/api/types.go
+++ b/backend/seafile/api/types.go
@@ -26,7 +26,8 @@ type AccountInfo struct {

 // ServerInfo contains server information
 type ServerInfo struct {
-	Version string `json:"version"`
+	Version  string   `json:"version"`
+	Features []string `json:"features"`
 }

 // DefaultLibrary when none specified
@@ -152,3 +153,13 @@ type BatchSourceDestRequest struct {
 	DstLibraryID string `json:"dst_repo_id"`
 	DstParentDir string `json:"dst_parent_dir"`
 }
+
+// FileUploadedBytes contains the JSON response to the "file-uploaded-bytes" API call
+type FileUploadedBytes struct {
+	FileUploadedBytes int64 `json:"uploadedBytes"`
+}
+
+// ChunkUpload contains the result of uploading one part of a file
+type ChunkUpload struct {
+	Success bool `json:"success"`
+}
diff --git a/backend/seafile/content_range.go b/backend/seafile/content_range.go
new file mode 100644
index 000000000..03bf7d248
--- /dev/null
+++ b/backend/seafile/content_range.go
@@ -0,0 +1,71 @@
+package seafile
+
+import "fmt"
+
+type contentRanger interface {
+	getChunkSize() int64
+	getContentRangeHeader() string
+}
+
+type streamedContentRange struct {
+	size int64
+}
+
+func newStreamedContentRange(size int64) *streamedContentRange {
+	return &streamedContentRange{
+		size: size,
+	}
+}
+func (r *streamedContentRange) getChunkSize() int64           { return r.size }
+func (r *streamedContentRange) getContentRangeHeader() string { return "" }
+
+type chunkedContentRange struct {
+	start     int64
+	chunkSize int64
+	size      int64
+}
+
+// newChunkedContentRange does not support streaming (unknown size)
+func newChunkedContentRange(chunkSize, size int64) *chunkedContentRange {
+	if size <= 0 {
+		panic("content range cannot operate on streaming")
+	}
+	if chunkSize <= 0 {
+		panic("content range cannot operate without a chunk size")
+	}
+	return &chunkedContentRange{
+		start:     0,
+		chunkSize: chunkSize,
+		size:      size,
+	}
+}
+
+func (r *chunkedContentRange) getEnd() int64 {
+	end := r.chunkSize + r.start
+	if end > r.size {
+		end = r.size
+	}
+	return end
+}
+
+func (r *chunkedContentRange) getChunkSize() int64 {
+	return r.getEnd() - r.start
+}
+
+// next moves the range to the next chunk
+// it panics if called on the last chunk
+func (r *chunkedContentRange) next() {
+	r.start += r.chunkSize
+	if r.start >= r.size {
+		panic("no more chunk of data")
+	}
+}
+
+func (r *chunkedContentRange) isLastChunk() bool {
+	return r.getEnd() == r.size
+}
+
+func (r *chunkedContentRange) getContentRangeHeader() string {
+	end := r.getEnd()
+	return fmt.Sprintf("bytes %d-%d/%d", r.start, end-1, r.size)
+}
diff --git a/backend/seafile/content_range_test.go b/backend/seafile/content_range_test.go
new file mode 100644
index 000000000..92f4373e2
--- /dev/null
+++ b/backend/seafile/content_range_test.go
@@ -0,0 +1,86 @@
+package seafile
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestContentRangeHeader(t *testing.T) {
+	fixtures := []struct {
+		start, chunkSize, size int64
+		expect                 string
+	}{
+		{0, 1, 10, "bytes 0-0/10"}, // from byte 0 (inclusive) to byte 0 (inclusive) == 1 byte
+		{0, 10, 10, "bytes 0-9/10"},
+		{0, 20, 10, "bytes 0-9/10"},
+		{1, 1, 10, "bytes 1-1/10"},
+		{1, 10, 10, "bytes 1-9/10"},
+		{1, 10, 10, "bytes 1-9/10"},
+		{9, 1, 10, "bytes 9-9/10"},
+		{9, 2, 10, "bytes 9-9/10"},
+		{9, 5, 10, "bytes 9-9/10"},
+	}
+
+	for _, fixture := range fixtures {
+		t.Run(fmt.Sprintf("%+v", fixture), func(t *testing.T) {
+			r := &chunkedContentRange{start: fixture.start, chunkSize: fixture.chunkSize, size: fixture.size}
+			assert.Equal(t, fixture.expect, r.getContentRangeHeader())
+		})
+	}
+}
+
+func TestChunkSize(t *testing.T) {
+	fixtures := []struct {
+		start, chunkSize, size int64
+		expected               int64
+		isLastChunk            bool
+	}{
+		{0, 10, 10, 10, true},  // chunk size same as size
+		{0, 20, 10, 10, true},  // chunk size bigger than size
+		{0, 10, 20, 10, false}, // chunk size smaller than size
+		{1, 10, 10, 9, true},   // chunk size same as size
+		{1, 20, 10, 9, true},   // chunk size bigger than size
+		{1, 10, 20, 10, false}, // chunk size smaller than size
+		{15, 10, 20, 5, true},  // smaller remaining
+	}
+
+	for _, fixture := range fixtures {
+		t.Run(fmt.Sprintf("%d/%d/%d", fixture.start, fixture.chunkSize, fixture.size), func(t *testing.T) {
+			r := &chunkedContentRange{start: fixture.start, chunkSize: fixture.chunkSize, size: fixture.size}
+			assert.Equal(t, fixture.expected, r.getChunkSize())
+			assert.Equal(t, fixture.isLastChunk, r.isLastChunk())
+		})
+	}
+}
+
+func TestRanges(t *testing.T) {
+	fixtures := []struct {
+		size           int64
+		chunkSize      int64
+		expectedChunks int
+	}{
+		{10, 1, 10},
+		{20, 2, 10},
+		{10, 10, 1},
+		{10, 3, 4},
+	}
+	for _, fixture := range fixtures {
+		t.Run(fmt.Sprintf("%d/%d", fixture.size, fixture.chunkSize), func(t *testing.T) {
+			r := newChunkedContentRange(fixture.chunkSize, fixture.size)
+			// first chunk is counted before the loop
+			count := 1
+			size := r.getChunkSize()
+
+			for !r.isLastChunk() {
+				r.next()
+				count++
+				size += r.getChunkSize()
+			}
+			assert.Panics(t, func() { r.next() })
+			assert.Equal(t, fixture.expectedChunks, count)
+			assert.Equal(t, fixture.size, size)
+		})
+	}
+}
diff --git a/backend/seafile/object.go b/backend/seafile/object.go
index 25257825a..91b39e786 100644
--- a/backend/seafile/object.go
+++ b/backend/seafile/object.go
@@ -6,6 +6,7 @@ import (
 	"time"

 	"github.com/rclone/rclone/fs"
+	"github.com/rclone/rclone/fs/chunksize"
 	"github.com/rclone/rclone/fs/hash"
 )

@@ -89,29 +90,63 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadClo
 // But for unknown-sized objects (indicated by src.Size() == -1), Upload should either
 // return an error or update the object properly (rather than e.g. calling panic).
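 // With this change, Update sends the file in a single request when its size is
 // at or below "--seafile-upload-cutoff" (or when the server predates chunked
 // upload support), and otherwise switches to the chunked upload below.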
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { - // The upload sometimes return a temporary 500 error - // We cannot use the pacer to retry uploading the file as the upload link is single use only - for retry := 0; retry <= 3; retry++ { - uploadLink, err := o.fs.getUploadLink(ctx, o.libraryID) - if err != nil { - return err - } - - uploaded, err := o.fs.upload(ctx, in, uploadLink, o.pathInLibrary) - if err == ErrorInternalDuringUpload { - // This is a temporary error, try again with a new upload link - continue - } - if err != nil { - return err - } - // Set the properties from the upload back to the object - o.size = uploaded.Size - o.id = uploaded.ID - - return nil + size := src.Size() + if size <= int64(o.fs.opt.UploadCutoff) || o.fs.noChunkUpload { + // upload whole file in 1 request + return o.upload(ctx, in, src) } - return ErrorInternalDuringUpload + // upload in parts + chunkSize := chunksize.Calculator(o, size, maxParts, o.fs.opt.ChunkSize) + return o.uploadLargeFile(ctx, in, src, chunkSize) +} + +// upload whole file in 1 request +func (o *Object) upload(ctx context.Context, in io.Reader, src fs.ObjectInfo) error { + uploadLink, err := o.fs.getUploadLink(ctx, o.libraryID) + if err != nil { + return err + } + + uploaded, err := o.fs.upload(ctx, in, uploadLink, o.pathInLibrary, src.Size()) + if err != nil { + return err + } + // Set the properties from the upload back to the object + o.size = uploaded.Size + o.id = uploaded.ID + + return nil +} + +func (o *Object) uploadLargeFile(ctx context.Context, in io.Reader, src fs.ObjectInfo, chunkSize fs.SizeSuffix) error { + uploadLink, err := o.fs.getUploadLink(ctx, o.libraryID) + if err != nil { + return err + } + size := src.Size() + contentRange := newChunkedContentRange(int64(chunkSize), size) + for { + fs.Debugf(nil, "uploading chunk %s", contentRange.getContentRangeHeader()) + err = o.fs.uploadChunk(ctx, in, uploadLink, o.pathInLibrary, contentRange) + if err != nil { + return err + } + contentRange.next() + // the last part is a slightly different API call + if contentRange.isLastChunk() { + break + } + } + fs.Debugf(nil, "uploading last chunk %s", contentRange.getContentRangeHeader()) + uploaded, err := o.fs.uploadLastChunk(ctx, in, uploadLink, o.pathInLibrary, contentRange) + if err != nil { + return err + } + // Set the properties from the upload back to the object + o.size = uploaded.Size + o.id = uploaded.ID + + return nil } // Remove this object diff --git a/backend/seafile/pacer.go b/backend/seafile/pacer.go index 55680193e..109955a80 100644 --- a/backend/seafile/pacer.go +++ b/backend/seafile/pacer.go @@ -12,9 +12,9 @@ import ( ) const ( - minSleep = 100 * time.Millisecond - maxSleep = 10 * time.Second - decayConstant = 2 // bigger for slower decay, exponential + defaultMinSleep = 100 * time.Millisecond + maxSleep = 10 * time.Second + decayConstant = 2 // bigger for slower decay, exponential ) // Use only one pacer per server URL @@ -28,7 +28,7 @@ func init() { } // getPacer returns the unique pacer for that remote URL -func getPacer(ctx context.Context, remote string) *fs.Pacer { +func getPacer(ctx context.Context, remote string, minPacer int) *fs.Pacer { pacerMutex.Lock() defer pacerMutex.Unlock() @@ -37,6 +37,10 @@ func getPacer(ctx context.Context, remote string) *fs.Pacer { return existing } + minSleep := time.Duration(minPacer) * time.Millisecond + if minSleep == 0 { + minSleep = defaultMinSleep + } pacers[remote] = fs.NewPacer( ctx, pacer.NewDefault( 
diff --git a/backend/seafile/seafile.go b/backend/seafile/seafile.go index b38dfa2b5..485d3f07e 100644 --- a/backend/seafile/seafile.go +++ b/backend/seafile/seafile.go @@ -28,6 +28,7 @@ import ( "github.com/rclone/rclone/lib/cache" "github.com/rclone/rclone/lib/encoder" "github.com/rclone/rclone/lib/pacer" + "github.com/rclone/rclone/lib/pool" "github.com/rclone/rclone/lib/random" "github.com/rclone/rclone/lib/rest" ) @@ -43,6 +44,12 @@ const ( configLibraryKey = "library_key" configCreateLibrary = "create_library" configAuthToken = "auth_token" + minChunkSize = 1 * fs.Mebi + defaultChunkSize = 127 * fs.Mebi + maxChunkSize = 511 * fs.Mebi + defaultUploadCutoff = 255 * fs.Mebi + maxParts = 100000 + minPacer = 100 ) // This is global to all instances of fs @@ -59,60 +66,92 @@ func init() { Description: "seafile", NewFs: NewFs, Config: Config, - Options: []fs.Option{{ - Name: configURL, - Help: "URL of seafile host to connect to.", - Required: true, - Examples: []fs.OptionExample{{ - Value: "https://cloud.seafile.com/", - Help: "Connect to cloud.seafile.com.", - }}, - Sensitive: true, - }, { - Name: configUser, - Help: "User name (usually email address).", - Required: true, - Sensitive: true, - }, { - // Password is not required, it will be left blank for 2FA - Name: configPassword, - Help: "Password.", - IsPassword: true, - Sensitive: true, - }, { - Name: config2FA, - Help: "Two-factor authentication ('true' if the account has 2FA enabled).", - Default: false, - }, { - Name: configLibrary, - Help: "Name of the library.\n\nLeave blank to access all non-encrypted libraries.", - }, { - Name: configLibraryKey, - Help: "Library password (for encrypted libraries only).\n\nLeave blank if you pass it through the command line.", - IsPassword: true, - Sensitive: true, - }, { - Name: configCreateLibrary, - Help: "Should rclone create a library if it doesn't exist.", - Advanced: true, - Default: false, - }, { - // Keep the authentication token after entering the 2FA code - Name: configAuthToken, - Help: "Authentication token.", - Hide: fs.OptionHideBoth, - Sensitive: true, - }, { - Name: config.ConfigEncoding, - Help: config.ConfigEncodingHelp, - Advanced: true, - Default: (encoder.EncodeZero | - encoder.EncodeCtl | - encoder.EncodeSlash | - encoder.EncodeBackSlash | - encoder.EncodeDoubleQuote | - encoder.EncodeInvalidUtf8), - }}, + Options: []fs.Option{ + { + Name: configURL, + Help: "URL of seafile host to connect to.", + Required: true, + Examples: []fs.OptionExample{{ + Value: "https://cloud.seafile.com/", + Help: "Connect to cloud.seafile.com.", + }}, + Sensitive: true, + }, { + Name: configUser, + Help: "User name (usually email address).", + Required: true, + Sensitive: true, + }, { + // Password is not required, it will be left blank for 2FA + Name: configPassword, + Help: "Password.", + IsPassword: true, + Sensitive: true, + }, { + Name: config2FA, + Help: "Two-factor authentication ('true' if the account has 2FA enabled).", + Default: false, + }, { + Name: configLibrary, + Help: "Name of the library.\n\nLeave blank to access all non-encrypted libraries.", + }, { + Name: configLibraryKey, + Help: "Library password (for encrypted libraries only).\n\nLeave blank if you pass it through the command line.", + IsPassword: true, + Sensitive: true, + }, { + Name: configCreateLibrary, + Help: "Should rclone create a library if it doesn't exist.", + Advanced: true, + Default: false, + }, { + // Keep the authentication token after entering the 2FA code + Name: configAuthToken, + Help: "Authentication 
token.",
+				Hide:      fs.OptionHideBoth,
+				Sensitive: true,
+			}, {
+				Name:     config.ConfigEncoding,
+				Help:     config.ConfigEncodingHelp,
+				Advanced: true,
+				Default: (encoder.EncodeZero |
+					encoder.EncodeCtl |
+					encoder.EncodeSlash |
+					encoder.EncodeBackSlash |
+					encoder.EncodeDoubleQuote |
+					encoder.EncodeInvalidUtf8),
+			}, {
+				Name: "upload_cutoff",
+				Help: `Cutoff for switching to chunked upload.
+
+Files above this size will be uploaded in chunks of "--seafile-chunk-size".`,
+				Default:  defaultUploadCutoff,
+				Advanced: true,
+			}, {
+				Name: "chunk_size",
+				Help: `Upload chunk size.
+
+When uploading large files, chunk the file into this size.
+
+Must fit in memory. These chunks are buffered in memory, and there
+can be at most "--transfers" chunks in progress at once.
+
+1 MiB is the minimum size.`,
+				Default:  defaultChunkSize,
+				Advanced: true,
+			}, {
+				Name: "min_pacer",
+				Help: `Minimum time between requests (in milliseconds).
+
+The Seafile API is rate limited. By default, rclone waits at least 100ms between requests.
+
+You can lower this value, but you risk triggering the rate limit, which will slow down the overall request rate.
+
+The minimum value is 1ms (0 falls back to the default of 100ms).`,
+				Default:  minPacer,
+				Advanced: true,
+			},
+		},
 	})
}
@@ -127,28 +166,34 @@ type Options struct {
 	LibraryKey    string               `config:"library_key"`
 	CreateLibrary bool                 `config:"create_library"`
 	Enc           encoder.MultiEncoder `config:"encoding"`
+	UploadCutoff  fs.SizeSuffix        `config:"upload_cutoff"`
+	ChunkSize     fs.SizeSuffix        `config:"chunk_size"`
+	MinPacer      int                  `config:"min_pacer"`
 }

 // Fs represents a remote seafile
 type Fs struct {
-	name                string       // name of this remote
-	root                string       // the path we are working on
-	libraryName         string       // current library
-	encrypted           bool         // Is this an encrypted library
-	rootDirectory       string       // directory part of root (if any)
-	opt                 Options      // parsed options
-	libraries           *cache.Cache // Keep a cache of libraries
-	librariesMutex      sync.Mutex   // Mutex to protect getLibraryID
-	features            *fs.Features // optional features
-	endpoint            *url.URL     // URL of the host
-	endpointURL         string       // endpoint as a string
-	srv                 *rest.Client // the connection to the server
-	pacer               *fs.Pacer    // pacer for API calls
-	authMu              sync.Mutex   // Mutex to protect library decryption
-	createDirMutex      sync.Mutex   // Protect creation of directories
-	useOldDirectoryAPI  bool         // Use the old API v2 if seafile < 7
-	moveDirNotAvailable bool         // Version < 7.0 don't have an API to move a directory
-	renew               *Renew       // Renew an encrypted library token
+	name                string          // name of this remote
+	root                string          // the path we are working on
+	libraryName         string          // current library
+	encrypted           bool            // Is this an encrypted library
+	rootDirectory       string          // directory part of root (if any)
+	ci                  *fs.ConfigInfo  // global options
+	opt                 Options         // parsed options
+	libraries           *cache.Cache    // Keep a cache of libraries
+	librariesMutex      sync.Mutex      // Mutex to protect getLibraryID
+	features            *fs.Features    // optional features
+	endpoint            *url.URL        // URL of the host
+	endpointURL         string          // endpoint as a string
+	srv                 *rest.Client    // the connection to the server
+	pacer               *fs.Pacer       // pacer for API calls
+	authMu              sync.Mutex      // Mutex to protect library decryption
+	createDirMutex      sync.Mutex      // Protect creation of directories
+	useOldDirectoryAPI  bool            // Use the old API v2 if seafile < 7
+	moveDirNotAvailable bool            // Versions < 7.0 don't have an API to move a directory
+	noChunkUpload       bool            // Versions < 7.0 don't support chunked upload
+	renew               *Renew          // Renew an encrypted
library token + pool *pool.Pool // memory pool } // ------------------------------------------------------------ @@ -195,17 +240,26 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e return nil, err } + ci := fs.GetConfig(ctx) + f := &Fs{ name: name, root: root, libraryName: libraryName, rootDirectory: rootDirectory, libraries: cache.New(), + ci: ci, opt: *opt, endpoint: u, endpointURL: u.String(), srv: rest.NewClient(fshttp.NewClient(ctx)).SetRoot(u.String()), - pacer: getPacer(ctx, opt.URL), + pacer: getPacer(ctx, opt.URL, opt.MinPacer), + pool: pool.New( + time.Minute, + int(opt.ChunkSize), + ci.Transfers, + ci.UseMmap, + ), } f.features = (&fs.Features{ CanHaveEmptyDirectories: true, @@ -216,7 +270,14 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e if err != nil { return nil, err } - fs.Debugf(nil, "Seafile server version %s", serverInfo.Version) + edition := "Community" + for _, feature := range serverInfo.Features { + if feature == "seafile-pro" { + edition = "Professional" + break + } + } + fs.Debugf(nil, "Seafile server version %s %s Edition", serverInfo.Version, edition) // We don't support lower than seafile v6.0 (version 6.0 is already more than 3 years old) serverVersion := semver.New(serverInfo.Version) @@ -227,8 +288,10 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e // Seafile 6 does not support recursive listing f.useOldDirectoryAPI = true f.features.ListR = nil - // It also does no support moving directories + // It also does not support moving directories f.moveDirNotAvailable = true + // no chunk upload either + f.noChunkUpload = true } // Take the authentication token from the configuration first @@ -1343,6 +1406,57 @@ func (f *Fs) newObject(ctx context.Context, remote string, size int64, modTime t return object } +func checkUploadChunkSize(cs fs.SizeSuffix) error { + if cs < minChunkSize { + return fmt.Errorf("%s is less than %s", cs, minChunkSize) + } + return nil +} + +func (f *Fs) setUploadChunkSize(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) { + err = checkUploadChunkSize(cs) + if err == nil { + old, f.opt.ChunkSize = f.opt.ChunkSize, cs + // this method is only called before starting an upload + // so it should be safe to adjust the memory pool to the new chunk size + f.pool.Flush() + f.pool = pool.New( + time.Minute, + int(f.opt.ChunkSize), + f.ci.Transfers, + f.ci.UseMmap, + ) + } + return +} + +func checkUploadCutoff(opt *Options, cs fs.SizeSuffix) error { + if cs < opt.ChunkSize { + return fmt.Errorf("%v is less than chunk size %v", cs, opt.ChunkSize) + } + return nil +} + +func (f *Fs) setUploadCutoff(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) { + err = checkUploadCutoff(&f.opt, cs) + if err == nil { + old, f.opt.UploadCutoff = f.opt.UploadCutoff, cs + } + return +} + +func (f *Fs) getBuf(size int) []byte { + buf := f.pool.Get() + if size > 0 && len(buf) > size { + buf = buf[:size] + } + return buf +} + +func (f *Fs) putBuf(buf []byte) { + f.pool.Put(buf) +} + // Check the interfaces are satisfied var ( _ fs.Fs = &Fs{} diff --git a/backend/seafile/seafile_mock_server_test.go b/backend/seafile/seafile_mock_server_test.go new file mode 100644 index 000000000..b07bc4a82 --- /dev/null +++ b/backend/seafile/seafile_mock_server_test.go @@ -0,0 +1,307 @@ +package seafile + +import ( + "bytes" + "context" + "crypto/sha256" + "fmt" + "hash" + "io" + "mime" + "mime/multipart" + "net/http" + "net/http/httptest" + "regexp" + "strconv" + "strings" + 
"sync/atomic" + "testing" + "time" + + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/config/configmap" + "github.com/rclone/rclone/fs/object" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var ( + smallContent = []byte("01234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") +) + +func getBasicHandler(t *testing.T, libraryID, libraryName string) *http.ServeMux { + t.Helper() + + handler := http.NewServeMux() + handler.HandleFunc("/api2/server-info/", func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write([]byte(`{"version":"9.0.10"}`)) + }) + handler.HandleFunc("/api2/auth-token/", func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write([]byte(`{"token":"test_token"}`)) + }) + handler.HandleFunc("/api2/repos/", func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write([]byte(fmt.Sprintf(`[{"encrypted":false,"id":"%s","size":10,"name":"%s"}]`, libraryID, libraryName))) + }) + handler.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + t.Logf("unhandled call to %q", r.URL.String()) + w.WriteHeader(http.StatusNotFound) + _, _ = w.Write([]byte("Not found: " + r.URL.String())) + }) + return handler +} + +func TestNewFsWithMockServer(t *testing.T) { + t.Parallel() + + handler := getBasicHandler(t, "library_id", "My Library") + server := httptest.NewServer(handler) + defer server.Close() + + options := configmap.Simple{ + "url": server.URL, + "library": "My Library", + } + fs, err := NewFs(context.Background(), "TestSeafile", "", options) + assert.NoError(t, err) + assert.NotEmpty(t, fs) +} + +func TestUploadWholeFileWithErrorNoRetry(t *testing.T) { + t.Parallel() + + const filename = "new file.txt" + + handler := getBasicHandler(t, "library_id", "My Library") + server := httptest.NewServer(handler) + defer server.Close() + + // call to retrieve an upload slot + handler.HandleFunc("/api2/repos/library_id/upload-link/", func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write([]byte(fmt.Sprintf(`"%s/upload-api/temp_upload"`, server.URL))) + }) + // upload will fail + handler.HandleFunc("/upload-api/temp_upload", func(w http.ResponseWriter, r *http.Request) { + defer func() { _ = r.Body.Close() }() + w.WriteHeader(http.StatusInternalServerError) + }) + + options := configmap.Simple{ + "url": server.URL, + "library": "My Library", + "upload_cutoff": defaultUploadCutoff.String(), + "chunk_size": defaultChunkSize.String(), + } + fs, err := NewFs(context.Background(), "TestSeafile", "", options) + assert.NoError(t, err) + assert.NotEmpty(t, fs) + + src := object.NewStaticObjectInfo(filename, time.Now(), int64(len(smallContent)), true, nil, nil) + // call should fail + in := bytes.NewReader(smallContent) + _, err = fs.Put(context.Background(), in, src) + assert.Error(t, err) +} + +func TestUploadWholeFile(t *testing.T) { + t.Parallel() + + const filename = "new file.txt" + const parallelUploadCount = 3 + + handler := getBasicHandler(t, "library_id", "My Library") + server := httptest.NewServer(handler) + t.Cleanup(server.Close) + + // call to retrieve an upload slot + handler.HandleFunc("/api2/repos/library_id/upload-link/", func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write([]byte(fmt.Sprintf(`"%s/upload-api/temp_upload"`, server.URL))) + }) + handler.HandleFunc("/upload-api/temp_upload", func(w http.ResponseWriter, r *http.Request) { + defer func() { _ = r.Body.Close() }() + + mediaType, params, err := mime.ParseMediaType(r.Header.Get("Content-Type")) + assert.NoError(t, err) + 
assert.Equal(t, "multipart/form-data", mediaType) + mr := multipart.NewReader(r.Body, params["boundary"]) + for { + p, err := mr.NextPart() + if err == io.EOF { + return + } + assert.NoError(t, err) + if p.FileName() == filename { + body, err := io.ReadAll(p) + assert.NoError(t, err) + assert.Equal(t, smallContent, body) + + // sends response now + _, _ = w.Write([]byte(fmt.Sprintf(`[{"name":"%s","size":%d}]`, filename, len(body)))) + } + } + }) + + for i := 0; i < parallelUploadCount; i++ { + t.Run(fmt.Sprintf("parallel upload file %d", i), func(t *testing.T) { + t.Parallel() + + uploadFileContent(t, filename, server.URL, smallContent) + }) + } +} + +func TestUploadFileByChunksWithRetryOnError(t *testing.T) { + t.Parallel() + + const filename = "new file.txt" + const parallelUploadCount = 3 + var chunkSize fs.SizeSuffix = 1048576 + var currentUploadID atomic.Int32 + chunkCount := make([]atomic.Int32, parallelUploadCount) + bytesReceived := make([]atomic.Int32, parallelUploadCount) + hashes := make([]hash.Hash, parallelUploadCount) + for i := 0; i < parallelUploadCount; i++ { + hashes[i] = sha256.New() + } + + handler := getBasicHandler(t, "library_id", "My Library") + server := httptest.NewServer(handler) + t.Cleanup(server.Close) + + // call to retrieve an upload slot + handler.HandleFunc("/api2/repos/library_id/upload-link/", func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write([]byte(fmt.Sprintf(`"%s/upload-api/temp_upload/%d"`, server.URL, currentUploadID.Load()))) + currentUploadID.Add(1) + }) + + // call to upload chunks + handler.HandleFunc("/upload-api/temp_upload/", func(w http.ResponseWriter, r *http.Request) { + defer func() { _ = r.Body.Close() }() + + // quick hack to get the file ID from the URL + rawFileID := strings.TrimPrefix(r.URL.String(), "/upload-api/temp_upload/") + rawFileID = strings.TrimSuffix(rawFileID, "?ret-json=1") + fileID, err := strconv.Atoi(rawFileID) + require.NoError(t, err) + + currentChunk := chunkCount[fileID].Add(1) + if currentChunk == 2 { + // simulate an error on the second chunk + w.WriteHeader(http.StatusInternalServerError) + return + } + partLen := 0 + var totalBytesReceived int32 + // read all the data + mediaType, params, err := mime.ParseMediaType(r.Header.Get("Content-Type")) + assert.NoError(t, err) + assert.Equal(t, "multipart/form-data", mediaType) + mr := multipart.NewReader(r.Body, params["boundary"]) + for { + p, err := mr.NextPart() + if err == io.EOF { + return + } + assert.NoError(t, err) + if p.FileName() == filename { + body, err := io.ReadAll(p) + assert.NoError(t, err) + partLen = len(body) + totalBytesReceived = bytesReceived[fileID].Add(int32(partLen)) + hashes[fileID].Write(body) + break + } + } + t.Logf("file %d: received chunk %d = %d bytes", fileID, currentChunk, totalBytesReceived) + + // check the content-range header + contentRange := r.Header.Get("Content-Range") + t.Logf("uploaded %s", contentRange) + pattern := regexp.MustCompile(`bytes (\d+)-(\d+)\/(\d+)`) + match := pattern.FindStringSubmatch(contentRange) + if len(match) == 4 { + start, err := strconv.Atoi(match[1]) + assert.NoError(t, err) + end, err := strconv.Atoi(match[2]) + assert.NoError(t, err) + size, err := strconv.Atoi(match[3]) + assert.NoError(t, err) + + // make sure the chunk size is right + assert.Equal(t, end-start+1, partLen) + + if end+1 == size { + // this was the last chunk + _, _ = w.Write([]byte(fmt.Sprintf(`[{"name":"%s","id":"new_file_id","size":%d}]`, filename, totalBytesReceived))) + t.Logf("file %d: uploaded hash = %x", 
fileID, hashes[fileID].Sum(nil)) + return + } + if end+1 > size { + t.Fatalf("end %d is bigger than size %d", end, size) + } + } + // keep going to the next chunk + _, _ = w.Write([]byte(`{"success":true}`)) + }) + + for i := 0; i < parallelUploadCount; i++ { + fileID := i // can remove this for go >= 1.22 + t.Run(fmt.Sprintf("parallel upload file %d", fileID), func(t *testing.T) { + t.Parallel() + + dataHash := uploadBigFile(t, filename, server.URL, chunkSize) + t.Logf("file %d: uploaded hash = %x", fileID, dataHash) + }) + } +} + +func uploadBigFile(t *testing.T, name, endpoint string, chunkSize fs.SizeSuffix) []byte { + options := configmap.Simple{ + "url": endpoint, + "library": "My Library", + "upload_cutoff": chunkSize.String(), + "chunk_size": chunkSize.String(), + } + fs, err := NewFs(context.Background(), "TestSeafile", "", options) + assert.NoError(t, err) + assert.NotEmpty(t, fs) + + // should allow for at least 2 chunks + buffer := &bytes.Buffer{} + iterations := int(chunkSize) * 2 / len(smallContent) + for i := 0; i <= iterations; i++ { + buffer.Write(smallContent) + } + + // calculate the sha256 hash while uploading + sha256Hash := sha256.New() + reader := io.TeeReader(buffer, sha256Hash) + + size := int64(buffer.Len()) + src := object.NewStaticObjectInfo(name, time.Now(), size, true, nil, nil) + + object, err := fs.Put(context.Background(), reader, src) + assert.NoError(t, err) + assert.NotEmpty(t, object) + assert.Equal(t, size, object.Size()) + + return sha256Hash.Sum(nil) +} + +func uploadFileContent(t *testing.T, name, endpoint string, content []byte) { + options := configmap.Simple{ + "url": endpoint, + "library": "My Library", + "upload_cutoff": defaultUploadCutoff.String(), + "chunk_size": defaultChunkSize.String(), + } + fs, err := NewFs(context.Background(), "TestSeafile", "", options) + assert.NoError(t, err) + assert.NotEmpty(t, fs) + + src := object.NewStaticObjectInfo(name, time.Now(), int64(len(content)), true, nil, nil) + in := bytes.NewReader(content) + object, err := fs.Put(context.Background(), in, src) + assert.NoError(t, err) + assert.NotEmpty(t, object) + assert.Equal(t, int64(len(content)), object.Size()) +} diff --git a/backend/seafile/seafile_reverseproxy_test.go b/backend/seafile/seafile_reverseproxy_test.go new file mode 100644 index 000000000..2b7748a30 --- /dev/null +++ b/backend/seafile/seafile_reverseproxy_test.go @@ -0,0 +1,162 @@ +//go:build go1.20 + +package seafile + +import ( + "bytes" + "context" + "fmt" + "io" + "net/http" + "net/http/httptest" + "net/http/httputil" + "net/url" + "strconv" + "strings" + "testing" + "time" + + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/config/configmap" + "github.com/rclone/rclone/fs/object" + "github.com/stretchr/testify/assert" +) + +func TestNewFsWithProxiedServer(t *testing.T) { + // creates a reverse proxy to a local instance of seafile + host := "localhost:8088" + target, _ := url.Parse("http://" + host) + handler := &httputil.ReverseProxy{ + Rewrite: func(pr *httputil.ProxyRequest) { + pr.SetURL(target) + pr.Out.Host = host + t.Logf("calling %s on %s", pr.Out.Method, pr.Out.URL.String()) + }, + ModifyResponse: func(r *http.Response) error { + t.Logf("%s response: %s", r.Request.URL.String(), r.Status) + return nil + }, + } + server := httptest.NewServer(handler) + defer server.Close() + + options := configmap.Simple{ + "url": server.URL, + "library": "My Library", + "user": "seafile@rclone.org", + "pass": "GYdWLJQb55COZYnO9Zl0GcKc_SYDr0EMVcl6rnZVFxV8zoLPBjJ7NQ", + "create_library": 
"true", + } + fs, err := NewFs(context.Background(), "TestSeafile", "", options) + if err != nil && strings.Contains(err.Error(), "502 Bad Gateway") { + t.Skip("cannot contact local seafile instance") + } + assert.NoError(t, err) + assert.NotEmpty(t, fs) +} + +// this test is using a reverse proxy to simulate one broken chunk during an upload +// a local instance of seafile needs to be started from the script "fstest/testserver/init.d/TestSeafile" +func TestFailedChunkUploadWithProxiedServer(t *testing.T) { + minimumChunks := 3 + var chunkSize fs.SizeSuffix = 1048576 + + // should allow for at least minimumChunks + writer := &bytes.Buffer{} + iterations := int(chunkSize) * minimumChunks / len(smallContent) + for i := 0; i <= iterations; i++ { + writer.Write(smallContent) + } + data := writer.Bytes() + + // each test will fail one chunk from 0 to 3 + for failedChunk := 0; failedChunk < minimumChunks+1; failedChunk++ { + t.Run(strconv.Itoa(failedChunk), func(t *testing.T) { + chunkCount := 0 + var proxyURL []byte + + // creates a reverse proxy to a local instance of seafile + host := "127.0.0.1:8088" + target, _ := url.Parse("http://" + host) + handler := &httputil.ReverseProxy{ + Rewrite: func(pr *httputil.ProxyRequest) { + pr.SetURL(target) + pr.Out.Host = host + pr.Out.Header.Del("Accept-Encoding") // we don't want to decompress and recompress the response + if strings.Contains(pr.Out.URL.String(), "/upload-api/") { + t.Logf("uploading chunk %s (%d)", pr.Out.Header.Get("Content-Range"), chunkCount) + if chunkCount == failedChunk { + t.Logf("this chunk should fail (%d)", chunkCount) + // the length of the data won't match with the Content-Length header + pr.Out.Body = io.NopCloser(io.LimitReader(pr.In.Body, 100)) + } + chunkCount++ + } + }, + ModifyResponse: func(r *http.Response) error { + b, _ := io.ReadAll(r.Body) + _ = r.Body.Close() + + // replace the URLs with the reverse proxy + b = bytes.ReplaceAll(b, []byte("http://"+host), proxyURL) + buf := bytes.NewBuffer(b) + r.Body = io.NopCloser(buf) + r.Header.Set("Content-Length", strconv.Itoa(buf.Len())) + return nil + }, + ErrorHandler: func(w http.ResponseWriter, r *http.Request, err error) { + if strings.Contains(err.Error(), "transport connection broken") { + // we need to send a 500 error like the seafile server would do in case of a transmission error + w.WriteHeader(http.StatusInternalServerError) + return + } + t.Log(err) + w.WriteHeader(http.StatusBadGateway) + }, + } + server := httptest.NewServer(handler) + defer server.Close() + proxyURL = []byte(server.URL) + + options := configmap.Simple{ + "url": server.URL, + "library": "My Library", + "user": "seafile@rclone.org", + "pass": "GYdWLJQb55COZYnO9Zl0GcKc_SYDr0EMVcl6rnZVFxV8zoLPBjJ7NQ", + "create_library": "true", + "upload_cutoff": chunkSize.String(), + "chunk_size": chunkSize.String(), + } + fs, err := NewFs(context.Background(), "TestSeafile", "", options) + if err != nil && strings.Contains(err.Error(), "502 Bad Gateway") { + t.Skip("cannot contact local seafile instance") + } + assert.NoError(t, err) + assert.NotEmpty(t, fs) + + buffer := bytes.NewBuffer(data) + + size := int64(buffer.Len()) + filename := fmt.Sprintf("new file %d.txt", failedChunk) + src := object.NewStaticObjectInfo(filename, time.Now(), size, true, nil, nil) + + object, err := fs.Put(context.Background(), buffer, src) + assert.NoError(t, err) + assert.NotEmpty(t, object) + assert.Equal(t, size, object.Size()) + + // read the file back for comparison + object, err = fs.NewObject(context.Background(), 
filename) + assert.NoError(t, err) + reader, err := object.Open(context.Background()) + assert.NoError(t, err) + read, err := io.ReadAll(reader) + assert.NoError(t, err) + assert.Equal(t, data, read) + + // clean up + err = object.Remove(context.Background()) + assert.NoError(t, err) + }) + } +} diff --git a/backend/seafile/seafile_test.go b/backend/seafile/seafile_test.go index 669478548..e12fc44c6 100644 --- a/backend/seafile/seafile_test.go +++ b/backend/seafile/seafile_test.go @@ -1,10 +1,10 @@ // Test Seafile filesystem interface -package seafile_test +package seafile import ( "testing" - "github.com/rclone/rclone/backend/seafile" + "github.com/rclone/rclone/fs" "github.com/rclone/rclone/fstest/fstests" ) @@ -12,6 +12,24 @@ import ( func TestIntegration(t *testing.T) { fstests.Run(t, &fstests.Opt{ RemoteName: "TestSeafile:", - NilObject: (*seafile.Object)(nil), + NilObject: (*Object)(nil), + ChunkedUpload: fstests.ChunkedUploadConfig{ + MinChunkSize: minChunkSize, + MaxChunkSize: maxChunkSize, + NeedMultipleChunks: true, + }, }) } + +func (f *Fs) SetUploadChunkSize(cs fs.SizeSuffix) (fs.SizeSuffix, error) { + return f.setUploadChunkSize(cs) +} + +func (f *Fs) SetUploadCutoff(cs fs.SizeSuffix) (fs.SizeSuffix, error) { + return f.setUploadCutoff(cs) +} + +var ( + _ fstests.SetUploadChunkSizer = (*Fs)(nil) + _ fstests.SetUploadCutoffer = (*Fs)(nil) +) diff --git a/backend/seafile/webapi.go b/backend/seafile/webapi.go index 1cc05a14b..ed66af3a9 100644 --- a/backend/seafile/webapi.go +++ b/backend/seafile/webapi.go @@ -31,14 +31,28 @@ var ( // ==================== Seafile API ==================== func (f *Fs) getAuthorizationToken(ctx context.Context) (string, error) { - return getAuthorizationToken(ctx, f.srv, f.opt.User, f.opt.Password, "") + opts, request := prepareAuthorizationRequest(f.opt.User, f.opt.Password, "") + result := api.AuthenticationResult{} + + err := f.pacer.Call(func() (bool, error) { + resp, err := f.srv.CallJSON(ctx, &opts, &request, &result) + return f.shouldRetry(ctx, resp, err) + }) + if err != nil { + // This is only going to be http errors here + return "", fmt.Errorf("failed to authenticate: %w", err) + } + if result.Errors != nil && len(result.Errors) > 0 { + return "", errors.New(strings.Join(result.Errors, ", ")) + } + if result.Token == "" { + // No error in "non_field_errors" field but still empty token + return "", errors.New("failed to authenticate") + } + return result.Token, nil } -// getAuthorizationToken can be called outside of an fs (during configuration of the remote to get the authentication token) -// it's doing a single call (no pacer involved) -func getAuthorizationToken(ctx context.Context, srv *rest.Client, user, password, oneTimeCode string) (string, error) { - // API Documentation - // https://download.seafile.com/published/web-api/home.md#user-content-Quick%20Start +func prepareAuthorizationRequest(user, password, oneTimeCode string) (rest.Opts, api.AuthenticationRequest) { opts := rest.Opts{ Method: "POST", Path: "api2/auth-token/", @@ -55,6 +69,15 @@ func getAuthorizationToken(ctx context.Context, srv *rest.Client, user, password Username: user, Password: password, } + return opts, request +} + +// getAuthorizationToken is called outside of an fs (during configuration of the remote to get the authentication token) +// it's doing a single call (no pacer involved) +func getAuthorizationToken(ctx context.Context, srv *rest.Client, user, password, oneTimeCode string) (string, error) { + // API Documentation + // 
https://download.seafile.com/published/web-api/home.md#user-content-Quick%20Start + opts, request := prepareAuthorizationRequest(user, password, oneTimeCode) result := api.AuthenticationResult{} _, err := srv.CallJSON(ctx, &opts, &request, &result) @@ -480,6 +503,9 @@ func (f *Fs) deleteDir(ctx context.Context, libraryID, filePath string) error { if resp.StatusCode == 401 || resp.StatusCode == 403 { return fs.ErrorPermissionDenied } + if resp.StatusCode == 404 { + return fs.ErrorDirNotFound + } } return fmt.Errorf("failed to delete directory: %w", err) } @@ -678,27 +704,76 @@ func (f *Fs) getUploadLink(ctx context.Context, libraryID string) (string, error return result, nil } -func (f *Fs) upload(ctx context.Context, in io.Reader, uploadLink, filePath string) (*api.FileDetail, error) { +// getFileUploadedSize returns the size already uploaded on the server +// +//nolint:unused +func (f *Fs) getFileUploadedSize(ctx context.Context, libraryID, filePath string) (int64, error) { // API Documentation // https://download.seafile.com/published/web-api/v2.1/file-upload.md + if libraryID == "" { + return 0, errors.New("cannot get file uploaded size without a library") + } + fs.Debugf(nil, "filePath=%q", filePath) fileDir, filename := path.Split(filePath) + fileDir = "/" + strings.TrimSuffix(fileDir, "/") + if fileDir == "" { + fileDir = "/" + } + opts := rest.Opts{ + Method: "GET", + Path: APIv21 + libraryID + "/file-uploaded-bytes/", + Parameters: url.Values{ + "parent_dir": {f.opt.Enc.FromStandardPath(fileDir)}, + "file_name": {f.opt.Enc.FromStandardPath(filename)}, + }, + } + + result := api.FileUploadedBytes{} + var resp *http.Response + var err error + err = f.pacer.Call(func() (bool, error) { + resp, err = f.srv.CallJSON(ctx, &opts, nil, &result) + return f.shouldRetry(ctx, resp, err) + }) + if err != nil { + if resp != nil { + if resp.StatusCode == 401 || resp.StatusCode == 403 { + return 0, fs.ErrorPermissionDenied + } + } + return 0, fmt.Errorf("failed to get file uploaded size for parent_dir=%q and file_name=%q: %w", fileDir, filename, err) + } + return result.FileUploadedBytes, nil +} + +func (f *Fs) prepareFileUpload(ctx context.Context, in io.Reader, uploadLink, filePath string, contentRange contentRanger) (*rest.Opts, error) { + fileDir, filename := path.Split(filePath) + safeFilename := f.opt.Enc.FromStandardName(filename) parameters := url.Values{ "parent_dir": {"/"}, "relative_path": {f.opt.Enc.FromStandardPath(fileDir)}, "need_idx_progress": {"true"}, "replace": {"1"}, } - formReader, contentType, _, err := rest.MultipartUpload(ctx, in, parameters, "file", f.opt.Enc.FromStandardName(filename)) - if err != nil { - return nil, fmt.Errorf("failed to make multipart upload: %w", err) + + contentRangeHeader := contentRange.getContentRangeHeader() + opts := &rest.Opts{ + Method: http.MethodPost, + Body: in, + ContentRange: contentRangeHeader, + Parameters: url.Values{"ret-json": {"1"}}, // It needs to be on the url, not in the body parameters + MultipartParams: parameters, + MultipartContentName: "file", + MultipartFileName: safeFilename, + } + if contentRangeHeader != "" { + // When using resumable upload, the name of the file is no longer retrieved from the "file" field of the form. + // It's instead retrieved from the header. 
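+		// A chunked request therefore carries headers like the following (illustrative values):
+		//   Content-Range: bytes 0-1048575/3145728
+		//   Content-Disposition: attachment; filename="photo.jpg"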
+		opts.ExtraHeaders = map[string]string{
+			"Content-Disposition": "attachment; filename=\"" + safeFilename + "\"",
+		}
 	}
-	opts := rest.Opts{
-		Method:      "POST",
-		Body:        formReader,
-		ContentType: contentType,
-		Parameters:  url.Values{"ret-json": {"1"}}, // It needs to be on the url, not in the body parameters
-	}
 	parsedURL, err := url.Parse(uploadLink)
 	if err != nil {
 		return nil, fmt.Errorf("failed to parse upload url: %w", err)
 	}
@@ -708,11 +783,29 @@
 	} else {
 		opts.Path = uploadLink
 	}
+
+	chunkSize := contentRange.getChunkSize()
+	if chunkSize > 0 {
+		// seafile might not make use of the Content-Length header but a proxy (or reverse proxy) in the middle might
+		opts.ContentLength = &chunkSize
+	}
+	return opts, nil
+}
+
+func (f *Fs) upload(ctx context.Context, in io.Reader, uploadLink, filePath string, size int64) (*api.FileDetail, error) {
+	// API Documentation
+	// https://download.seafile.com/published/web-api/v2.1/file-upload.md
+	contentRange := newStreamedContentRange(size)
+	opts, err := f.prepareFileUpload(ctx, in, uploadLink, filePath, contentRange)
+	if err != nil {
+		return nil, err
+	}
+
 	result := make([]api.FileDetail, 1)
 	var resp *http.Response
-	// If an error occurs during the call, do not attempt to retry: The upload link is single use only
+	// We do not attempt to retry if an error occurs during the call, as we don't know the state of the reader
 	err = f.pacer.CallNoRetry(func() (bool, error) {
-		resp, err = f.srv.CallJSON(ctx, &opts, nil, &result)
+		resp, err = f.srv.CallJSON(ctx, opts, nil, &result)
 		return f.shouldRetryUpload(ctx, resp, err)
 	})
 	if err != nil {
@@ -721,7 +814,7 @@ func (f *Fs) upload(ctx context.Context, in io.Reader, uploadLink, filePath stri
 			return nil, fs.ErrorPermissionDenied
 		}
 		if resp.StatusCode == 500 {
-			// This is a temporary error - we will get a new upload link before retrying
+			// This is quite common under heavy load
 			return nil, ErrorInternalDuringUpload
 		}
 	}
@@ -732,6 +825,97 @@
 		result[0].Name = f.opt.Enc.ToStandardName(result[0].Name)
 		return &result[0], nil
 	}
+	// no file results sent back
+	return nil, ErrorInternalDuringUpload
+}
+
+func (f *Fs) uploadChunk(ctx context.Context, in io.Reader, uploadLink, filePath string, contentRange contentRanger) error {
+	// API Documentation
+	// https://download.seafile.com/published/web-api/v2.1/file-upload.md
+	chunkSize := int(contentRange.getChunkSize())
+	buffer := f.getBuf(chunkSize)
+	defer f.putBuf(buffer)
+
+	read, err := io.ReadFull(in, buffer)
+	if err != nil {
+		return fmt.Errorf("error reading from source: %w", err)
+	}
+	if chunkSize > 0 && read != chunkSize {
+		return fmt.Errorf("expected to read %d from source, but got %d", chunkSize, read)
+	}
+
+	result := api.ChunkUpload{}
+	var resp *http.Response
+	err = f.pacer.Call(func() (bool, error) {
+		// recreate a reader on the temporary buffer
+		in = bytes.NewReader(buffer)
+		opts, err := f.prepareFileUpload(ctx, in, uploadLink, filePath, contentRange)
+		if err != nil {
+			return false, err
+		}
+		resp, err = f.srv.CallJSON(ctx, opts, nil, &result)
+		return f.shouldRetry(ctx, resp, err)
+	})
+	if err != nil {
+		if resp != nil {
+			if resp.StatusCode == 401 || resp.StatusCode == 403 {
+				return fs.ErrorPermissionDenied
+			}
+			if resp.StatusCode == 500 {
+				return fmt.Errorf("chunk upload %s: %w", contentRange.getContentRangeHeader(), ErrorInternalDuringUpload)
+			}
+		}
+		return fmt.Errorf("failed to upload chunk %s: %w", contentRange.getContentRangeHeader(), err)
+	}
+	if !result.Success {
+		return errors.New("upload failed")
+	}
+	return nil
+}
+
+func (f *Fs) uploadLastChunk(ctx context.Context, in io.Reader, uploadLink, filePath string, contentRange contentRanger) (*api.FileDetail, error) {
+	// API Documentation
+	// https://download.seafile.com/published/web-api/v2.1/file-upload.md
+	chunkSize := int(contentRange.getChunkSize())
+	buffer := f.getBuf(chunkSize)
+	defer f.putBuf(buffer)
+
+	read, err := io.ReadFull(in, buffer)
+	if err != nil {
+		return nil, fmt.Errorf("error reading from source: %w", err)
+	}
+	if chunkSize > 0 && read != chunkSize {
+		return nil, fmt.Errorf("expected to read %d from source, but got %d", chunkSize, read)
+	}
+
+	result := make([]api.FileDetail, 1)
+	var resp *http.Response
+	err = f.pacer.Call(func() (bool, error) {
+		// recreate a reader on the buffer
+		in = bytes.NewReader(buffer)
+		opts, err := f.prepareFileUpload(ctx, in, uploadLink, filePath, contentRange)
+		if err != nil {
+			return false, err
+		}
+		resp, err = f.srv.CallJSON(ctx, opts, nil, &result)
+		return f.shouldRetry(ctx, resp, err)
+	})
+	if err != nil {
+		if resp != nil {
+			if resp.StatusCode == 401 || resp.StatusCode == 403 {
+				return nil, fs.ErrorPermissionDenied
+			}
+			if resp.StatusCode == 500 {
+				return nil, fmt.Errorf("last chunk: %w", ErrorInternalDuringUpload)
+			}
+		}
+		return nil, fmt.Errorf("failed to upload last chunk: %w", err)
+	}
+	if len(result) > 0 {
+		result[0].Parent = f.opt.Enc.ToStandardPath(result[0].Parent)
+		result[0].Name = f.opt.Enc.ToStandardName(result[0].Name)
+		return &result[0], nil
+	}
+	// no file results sent back
+	return nil, ErrorInternalDuringUpload
+}
diff --git a/docs/content/seafile.md b/docs/content/seafile.md
index a7b32f597..1f909aeb7 100644
--- a/docs/content/seafile.md
+++ b/docs/content/seafile.md
@@ -400,5 +400,53 @@ Properties:
 - Type:        string
 - Required:    false
 
+#### --seafile-upload-cutoff
+
+Cutoff for switching to chunked upload.
+
+Files above this size will be uploaded in chunks of "--seafile-chunk-size".
+
+Properties:
+
+- Config:      upload_cutoff
+- Env Var:     RCLONE_SEAFILE_UPLOAD_CUTOFF
+- Type:        SizeSuffix
+- Default:     255Mi
+
+#### --seafile-chunk-size
+
+Upload chunk size.
+
+When uploading large files, chunk the file into this size.
+
+Must fit in memory. These chunks are buffered in memory, and there
+can be at most "--transfers" chunks in progress at once.
+
+1 MiB is the minimum size.
+
+Properties:
+
+- Config:      chunk_size
+- Env Var:     RCLONE_SEAFILE_CHUNK_SIZE
+- Type:        SizeSuffix
+- Default:     127Mi
+
+#### --seafile-min-pacer
+
+Minimum time between requests (in milliseconds).
+
+The Seafile API is rate limited. By default, rclone waits at least 100ms between requests.
+
+You can lower this value, but you risk triggering the rate limit, which will slow down the overall request rate.
+
+The minimum value is 1ms (0 falls back to the default of 100ms).
+
+Properties:
+
+- Config:      min_pacer
+- Env Var:     RCLONE_SEAFILE_MIN_PACER
+- Type:        int
+- Default:     100
+
 {{< rem autogenerated options stop >}}
diff --git a/fstest/test_all/config.yaml b/fstest/test_all/config.yaml
index 10279dd66..569eb30ee 100644
--- a/fstest/test_all/config.yaml
+++ b/fstest/test_all/config.yaml
@@ -418,11 +418,6 @@ backends:
     fastlist: false
     ignore:
       - TestApplyTransforms
-  - backend: "seafile"
-    remote: "TestSeafileV6:"
-    fastlist: false
-    ignore:
-      - TestIntegration/FsMkdir/FsPutFiles/FsDirMove
   - backend: "seafile"
     remote: "TestSeafile:"
     fastlist: true
diff --git a/fstest/testserver/init.d/TestSeafile b/fstest/testserver/init.d/TestSeafile
index c18a1d0db..6fc680902 100755
--- a/fstest/testserver/init.d/TestSeafile
+++ b/fstest/testserver/init.d/TestSeafile
@@ -3,14 +3,14 @@ set -e
 
 # environment variables passed on docker-compose
-export NAME=seafile7
+export NAME=seafile
 export MYSQL_ROOT_PASSWORD=pixenij4zacoguq0kopamid6
 export SEAFILE_ADMIN_EMAIL=seafile@rclone.org
 export SEAFILE_ADMIN_PASSWORD=pixenij4zacoguq0kopamid6
 export SEAFILE_IP=127.0.0.1
 export SEAFILE_PORT=8087
 export SEAFILE_TEST_DATA=${SEAFILE_TEST_DATA:-/tmp/seafile-test-data}
-export SEAFILE_VERSION=latest
+export SEAFILE_VERSION=${SEAFILE_VERSION:-latest}
 
 # make sure the data directory exists
 mkdir -p ${SEAFILE_TEST_DATA}/${NAME}
@@ -45,12 +45,14 @@ start() {
 
     # create default library
     curl -X POST -H "Authorization: Token ${TOKEN}" "http://${SEAFILE_IP}:${SEAFILE_PORT}/api2/default-repo/"
+    echo
 
     echo _connect=${SEAFILE_IP}:${SEAFILE_PORT}
     echo type=seafile
     echo url=http://${SEAFILE_IP}:${SEAFILE_PORT}/
     echo user=${SEAFILE_ADMIN_EMAIL}
     echo pass=$(rclone obscure ${SEAFILE_ADMIN_PASSWORD})
     echo library=My Library
+    echo min_pacer=100
 }
 
 stop() {
diff --git a/fstest/testserver/init.d/TestSeafileEncrypted b/fstest/testserver/init.d/TestSeafileEncrypted
index 0493e9082..a4eb2791c 100755
--- a/fstest/testserver/init.d/TestSeafileEncrypted
+++ b/fstest/testserver/init.d/TestSeafileEncrypted
@@ -7,14 +7,14 @@ TEST_LIBRARY=Encrypted
 TEST_LIBRARY_PASSWORD=SecretKey
 
 # environment variables passed on docker-compose
-export NAME=seafile7encrypted
+export NAME=seafile-encrypted
 export MYSQL_ROOT_PASSWORD=pixenij4zacoguq0kopamid6
 export SEAFILE_ADMIN_EMAIL=seafile@rclone.org
 export SEAFILE_ADMIN_PASSWORD=pixenij4zacoguq0kopamid6
 export SEAFILE_IP=127.0.0.1
 export SEAFILE_PORT=8088
 export SEAFILE_TEST_DATA=${SEAFILE_TEST_DATA:-/tmp/seafile-test-data}
-export SEAFILE_VERSION=latest
+export SEAFILE_VERSION=${SEAFILE_VERSION:-latest}
 
 # make sure the data directory exists
 mkdir -p ${SEAFILE_TEST_DATA}/${NAME}
@@ -25,8 +25,20 @@ COMPOSE_DIR=$(dirname "$0")/seafile
 
 start() {
     docker-compose --project-directory ${COMPOSE_DIR} --project-name ${NAME} --file ${COMPOSE_DIR}/docker-compose.yml up -d
-    # it takes some time for the database to be created
-    sleep 60
+    # wait for Seafile server to start
+    seafile_endpoint="http://${SEAFILE_IP}:${SEAFILE_PORT}/"
+    wait_seconds=1
+    echo -n "Waiting for Seafile server to start"
+    for iterations in `seq 1 60`;
+    do
+        http_code=$(curl -s -o /dev/null -L -w '%{http_code}' "$seafile_endpoint" || true;)
+        if [ "$http_code" -eq 200 ]; then
+            echo
+            break
+        fi
+        echo -n "."
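+        # poll roughly once per second, giving the server up to 60 attempts to come up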
+ sleep $wait_seconds + done # authentication token answer should be like: {"token":"dbf58423f1632b5b679a13b0929f1d0751d9250c"} TOKEN=`curl --silent \ @@ -37,6 +49,7 @@ start() { # create encrypted library curl -X POST -d "name=${TEST_LIBRARY}&passwd=${TEST_LIBRARY_PASSWORD}" -H "Authorization: Token ${TOKEN}" "http://${SEAFILE_IP}:${SEAFILE_PORT}/api2/repos/" + echo echo _connect=${SEAFILE_IP}:${SEAFILE_PORT} echo type=seafile echo url=http://${SEAFILE_IP}:${SEAFILE_PORT}/ @@ -44,6 +57,7 @@ start() { echo pass=$(rclone obscure ${SEAFILE_ADMIN_PASSWORD}) echo library=${TEST_LIBRARY} echo library_key=$(rclone obscure ${TEST_LIBRARY_PASSWORD}) + echo min_pacer=100 } stop() { diff --git a/fstest/testserver/init.d/TestSeafileV6 b/fstest/testserver/init.d/TestSeafileV6 deleted file mode 100755 index 1ed076c44..000000000 --- a/fstest/testserver/init.d/TestSeafileV6 +++ /dev/null @@ -1,48 +0,0 @@ -#!/usr/bin/env bash - -set -e - -# local variables -NAME=seafile6 -SEAFILE_IP=127.0.0.1 -SEAFILE_PORT=8086 -SEAFILE_ADMIN_EMAIL=seafile@rclone.org -SEAFILE_ADMIN_PASSWORD=qebiwob7wafixif8sojiboj4 -SEAFILE_TEST_DATA=${SEAFILE_TEST_DATA:-/tmp/seafile-test-data} -SEAFILE_VERSION=latest - -. $(dirname "$0")/docker.bash - -start() { - # make sure the data directory exists - mkdir -p ${SEAFILE_TEST_DATA}/${NAME} - - docker run --rm -d --name $NAME \ - -e SEAFILE_SERVER_HOSTNAME=${SEAFILE_IP}:${SEAFILE_PORT} \ - -e SEAFILE_ADMIN_EMAIL=${SEAFILE_ADMIN_EMAIL} \ - -e SEAFILE_ADMIN_PASSWORD=${SEAFILE_ADMIN_PASSWORD} \ - -v ${SEAFILE_TEST_DATA}/${NAME}:/shared \ - -p ${SEAFILE_IP}:${SEAFILE_PORT}:80 \ - seafileltd/seafile:${SEAFILE_VERSION} - - # it takes some time for the database to be created - sleep 60 - - # authentication token answer should be like: {"token":"dbf58423f1632b5b679a13b0929f1d0751d9250c"} - TOKEN=`curl --silent \ - --data-urlencode username=${SEAFILE_ADMIN_EMAIL} -d password=${SEAFILE_ADMIN_PASSWORD} \ - http://${SEAFILE_IP}:${SEAFILE_PORT}/api2/auth-token/ \ - | sed 's/^{"token":"\(.*\)"}$/\1/'` - - # create default library - curl -X POST -H "Authorization: Token ${TOKEN}" "http://${SEAFILE_IP}:${SEAFILE_PORT}/api2/default-repo/" - - echo _connect=${SEAFILE_IP}:${SEAFILE_PORT} - echo type=seafile - echo url=http://${SEAFILE_IP}:${SEAFILE_PORT}/ - echo user=${SEAFILE_ADMIN_EMAIL} - echo pass=$(rclone obscure ${SEAFILE_ADMIN_PASSWORD}) - echo library=My Library -} - -. $(dirname "$0")/run.bash
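
With this patch applied, the new options can be exercised from the command line.
A hypothetical example (remote name, file, and sizes are illustrative; the flags
themselves are the ones documented in docs/content/seafile.md above):

    rclone copy ./big-video.mkv seafile:backups \
        --seafile-upload-cutoff 100Mi \
        --seafile-chunk-size 64Mi \
        --seafile-min-pacer 100 -vv

Files larger than the cutoff (100 MiB here) are sent in 64 MiB Content-Range
chunks, while smaller files still go up in a single request. Note that the chunk
size must not be larger than the cutoff, and each chunk is buffered in memory
before being sent.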