From 0b9671313b14ffe839ecbd7dd2ae5ac7f6f05db8 Mon Sep 17 00:00:00 2001
From: Klaas Freitag
Date: Fri, 11 Apr 2025 13:23:55 +0200
Subject: [PATCH] webdav: add an ownCloud Infinite Scale vendor that enables tus chunked upload support

This change adds a new vendor called "infinitescale" to the webdav
backend. It enables support for the ownCloud Infinite Scale project
(https://github.com/owncloud/ocis) and implements its chunked uploader
following the tus protocol (https://tus.io).

Signed-off-by: Christian Richter
Co-authored-by: Klaas Freitag
Co-authored-by: Christian Richter
Co-authored-by: Christian Richter <1058116+dragonchaser@users.noreply.github.com>
Co-authored-by: Ralf Haferkamp
---
 backend/webdav/tus-errors.go                  |  40 ++++
 backend/webdav/tus-upload.go                  |  88 ++++++++
 backend/webdav/tus-uploader.go                | 191 ++++++++++++++++++
 backend/webdav/tus.go                         | 108 ++++++++++
 backend/webdav/webdav.go                      |  42 +++-
 docs/content/webdav.md                        |  23 ++-
 fstest/test_all/config.yaml                   |   6 +
 .../testserver/init.d/TestWebdavInfiniteScale |  49 +++++
 8 files changed, 531 insertions(+), 16 deletions(-)
 create mode 100644 backend/webdav/tus-errors.go
 create mode 100644 backend/webdav/tus-upload.go
 create mode 100644 backend/webdav/tus-uploader.go
 create mode 100644 backend/webdav/tus.go
 create mode 100755 fstest/testserver/init.d/TestWebdavInfiniteScale

diff --git a/backend/webdav/tus-errors.go b/backend/webdav/tus-errors.go
new file mode 100644
index 000000000..aac7b95a4
--- /dev/null
+++ b/backend/webdav/tus-errors.go
@@ -0,0 +1,40 @@
+package webdav
+
+import (
+	"errors"
+	"fmt"
+)
+
+var (
+	// ErrChunkSize is returned when the chunk size is zero
+	ErrChunkSize = errors.New("tus chunk size must be greater than zero")
+	// ErrNilLogger is returned when the logger is nil
+	ErrNilLogger = errors.New("tus logger can't be nil")
+	// ErrNilStore is returned when the store is nil
+	ErrNilStore = errors.New("tus store can't be nil if resume is enabled")
+	// ErrNilUpload is returned when the upload is nil
+	ErrNilUpload = errors.New("tus upload can't be nil")
+	// ErrLargeUpload is returned when the upload body is too large
+	ErrLargeUpload = errors.New("tus upload body is too large")
+	// ErrVersionMismatch is returned when the tus protocol versions don't match
+	ErrVersionMismatch = errors.New("tus protocol version mismatch")
+	// ErrOffsetMismatch is returned when the tus upload offset doesn't match the server's
+	ErrOffsetMismatch = errors.New("tus upload offset mismatch")
+	// ErrUploadNotFound is returned when the tus upload is not found
+	ErrUploadNotFound = errors.New("tus upload not found")
+	// ErrResumeNotEnabled is returned when tus resuming is not enabled
+	ErrResumeNotEnabled = errors.New("tus resuming not enabled")
+	// ErrFingerprintNotSet is returned when the tus fingerprint is not set
+	ErrFingerprintNotSet = errors.New("tus fingerprint not set")
+)
+
+// ClientError represents an error state of a client
+type ClientError struct {
+	Code int
+	Body []byte
+}
+
+// Error returns an error string containing the client error code
+func (c ClientError) Error() string {
+	return fmt.Sprintf("unexpected status code: %d", c.Code)
+}
diff --git a/backend/webdav/tus-upload.go b/backend/webdav/tus-upload.go
new file mode 100644
index 000000000..4b4cc2272
--- /dev/null
+++ b/backend/webdav/tus-upload.go
@@ -0,0 +1,88 @@
+package webdav
+
+import (
+	"bytes"
+	"encoding/base64"
+	"fmt"
+	"io"
+	"strings"
+)
+
+// Metadata is a string-to-string map holding upload metadata
+type Metadata map[string]string
+
+// Upload is a struct
containing the file status during upload +type Upload struct { + stream io.ReadSeeker + size int64 + offset int64 + + Fingerprint string + Metadata Metadata +} + +// Updates the Upload information based on offset. +func (u *Upload) updateProgress(offset int64) { + u.offset = offset +} + +// Finished returns whether this upload is finished or not. +func (u *Upload) Finished() bool { + return u.offset >= u.size +} + +// Progress returns the progress in a percentage. +func (u *Upload) Progress() int64 { + return (u.offset * 100) / u.size +} + +// Offset returns the current upload offset. +func (u *Upload) Offset() int64 { + return u.offset +} + +// Size returns the size of the upload body. +func (u *Upload) Size() int64 { + return u.size +} + +// EncodedMetadata encodes the upload metadata. +func (u *Upload) EncodedMetadata() string { + var encoded []string + + for k, v := range u.Metadata { + encoded = append(encoded, fmt.Sprintf("%s %s", k, b64encode(v))) + } + + return strings.Join(encoded, ",") +} + +func b64encode(s string) string { + return base64.StdEncoding.EncodeToString([]byte(s)) +} + +// NewUpload creates a new upload from an io.Reader. +func NewUpload(reader io.Reader, size int64, metadata Metadata, fingerprint string) *Upload { + stream, ok := reader.(io.ReadSeeker) + + if !ok { + buf := new(bytes.Buffer) + _, err := buf.ReadFrom(reader) + if err != nil { + return nil + } + stream = bytes.NewReader(buf.Bytes()) + } + + if metadata == nil { + metadata = make(Metadata) + } + + return &Upload{ + stream: stream, + size: size, + + Fingerprint: fingerprint, + Metadata: metadata, + } +} diff --git a/backend/webdav/tus-uploader.go b/backend/webdav/tus-uploader.go new file mode 100644 index 000000000..4a0233ccc --- /dev/null +++ b/backend/webdav/tus-uploader.go @@ -0,0 +1,191 @@ +package webdav + +import ( + "bytes" + "context" + "fmt" + "io" + "net/http" + "net/url" + "strconv" + + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/lib/rest" +) + +// Uploader holds all information about a currently running upload +type Uploader struct { + fs *Fs + url string + upload *Upload + offset int64 + aborted bool + uploadSubs []chan Upload + notifyChan chan bool + overridePatchMethod bool +} + +// NotifyUploadProgress subscribes to progress updates. +func (u *Uploader) NotifyUploadProgress(c chan Upload) { + u.uploadSubs = append(u.uploadSubs, c) +} + +func (f *Fs) shouldRetryChunk(ctx context.Context, resp *http.Response, err error, newOff *int64) (bool, error) { + if resp == nil { + return true, err + } + + switch resp.StatusCode { + case 204: + if off, err := strconv.ParseInt(resp.Header.Get("Upload-Offset"), 10, 64); err == nil { + *newOff = off + return false, nil + } + return false, err + + case 409: + return false, ErrOffsetMismatch + case 412: + return false, ErrVersionMismatch + case 413: + return false, ErrLargeUpload + } + + return f.shouldRetry(ctx, resp, err) +} + +func (u *Uploader) uploadChunk(ctx context.Context, body io.Reader, size int64, offset int64, options ...fs.OpenOption) (int64, error) { + var method string + + if !u.overridePatchMethod { + method = "PATCH" + } else { + method = "POST" + } + + extraHeaders := map[string]string{} // FIXME: Use extraHeaders(ctx, src) from Object maybe? 
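+	// Each tus PATCH request carries the current Upload-Offset and the
+	// Tus-Resumable protocol version, and sends the chunk body as
+	// application/offset+octet-stream
+	// (see https://tus.io/protocols/resumable-upload).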
+	extraHeaders["Upload-Offset"] = strconv.FormatInt(offset, 10)
+	extraHeaders["Tus-Resumable"] = "1.0.0"
+	extraHeaders["filetype"] = u.upload.Metadata["filetype"]
+	if u.overridePatchMethod {
+		extraHeaders["X-HTTP-Method-Override"] = "PATCH"
+	}
+
+	url, err := url.Parse(u.url)
+	if err != nil {
+		return 0, fmt.Errorf("uploadChunk failed: could not parse URL: %w", err)
+	}
+
+	// FIXME: Use GetBody func as in chunking.go
+	opts := rest.Opts{
+		Method:        method,
+		Path:          url.Path,
+		NoResponse:    true,
+		RootURL:       fmt.Sprintf("%s://%s", url.Scheme, url.Host),
+		ContentLength: &size,
+		Body:          body,
+		ContentType:   "application/offset+octet-stream",
+		ExtraHeaders:  extraHeaders,
+		Options:       options,
+	}
+
+	var newOffset int64
+
+	err = u.fs.pacer.CallNoRetry(func() (bool, error) {
+		res, err := u.fs.srv.Call(ctx, &opts)
+		return u.fs.shouldRetryChunk(ctx, res, err, &newOffset)
+	})
+	if err != nil {
+		return 0, fmt.Errorf("uploadChunk failed: %w", err)
+		// FIXME What do we do here? Remove the entire upload?
+		// See https://github.com/tus/tusd/issues/176
+	}
+
+	return newOffset, nil
+}
+
+// Upload uploads the entire body to the server.
+func (u *Uploader) Upload(ctx context.Context, options ...fs.OpenOption) error {
+	cnt := 1
+
+	fs.Debug(u.fs, "Upload starts")
+	for u.offset < u.upload.size && !u.aborted {
+		err := u.UploadChunk(ctx, cnt, options...)
+		cnt++
+		if err != nil {
+			return err
+		}
+	}
+	fs.Debug(u.fs, "-- Upload finished")
+
+	return nil
+}
+
+// UploadChunk uploads a single chunk.
+func (u *Uploader) UploadChunk(ctx context.Context, cnt int, options ...fs.OpenOption) error {
+	chunkSize := u.fs.opt.ChunkSize
+	data := make([]byte, chunkSize)
+
+	_, err := u.upload.stream.Seek(u.offset, 0)
+
+	if err != nil {
+		fs.Errorf(u.fs, "Chunk %d: seek in stream failed: %v", cnt, err)
+		return err
+	}
+
+	size, err := u.upload.stream.Read(data)
+
+	if err != nil {
+		fs.Errorf(u.fs, "Chunk %d: cannot read from data stream: %v", cnt, err)
+		return err
+	}
+
+	body := bytes.NewBuffer(data[:size])
+
+	newOffset, err := u.uploadChunk(ctx, body, int64(size), u.offset, options...)
+
+	if err == nil {
+		fs.Debugf(u.fs, "Uploaded chunk %d OK, range %d -> %d", cnt, u.offset, newOffset)
+	} else {
+		fs.Errorf(u.fs, "Uploading chunk %d failed: %v", cnt, err)
+
+		return err
+	}
+
+	u.offset = newOffset
+
+	u.upload.updateProgress(u.offset)
+
+	u.notifyChan <- true
+
+	return nil
+}
+
+// broadcastProgress waits for a signal and broadcasts the upload state to all subscribers
+func (u *Uploader) broadcastProgress() {
+	for range u.notifyChan {
+		for _, c := range u.uploadSubs {
+			c <- *u.upload
+		}
+	}
+}
+
+// NewUploader creates a new Uploader.
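+// The url is the tus upload location returned by the server when the upload
+// was created, and offset is the byte position to resume sending from
+// (0 for a fresh upload).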
+func NewUploader(f *Fs, url string, upload *Upload, offset int64) *Uploader { + notifyChan := make(chan bool) + + uploader := &Uploader{ + f, + url, + upload, + offset, + false, + nil, + notifyChan, + false, + } + + go uploader.broadcastProgress() + + return uploader +} diff --git a/backend/webdav/tus.go b/backend/webdav/tus.go new file mode 100644 index 000000000..3f6a843bb --- /dev/null +++ b/backend/webdav/tus.go @@ -0,0 +1,108 @@ +package webdav + +/* + Chunked upload based on the tus protocol for ownCloud Infinite Scale + See https://tus.io/protocols/resumable-upload +*/ + +import ( + "context" + "fmt" + "io" + "net/http" + "path/filepath" + "strconv" + + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/lib/rest" +) + +func (o *Object) updateViaTus(ctx context.Context, in io.Reader, contentType string, src fs.ObjectInfo, options ...fs.OpenOption) (err error) { + + fn := filepath.Base(src.Remote()) + metadata := map[string]string{ + "filename": fn, + "mtime": strconv.FormatInt(src.ModTime(ctx).Unix(), 10), + "filetype": contentType, + } + + // Fingerprint is used to identify the upload when resuming. That is not yet implemented + fingerprint := "" + + // create an upload from a file. + upload := NewUpload(in, src.Size(), metadata, fingerprint) + + // create the uploader. + uploader, err := o.CreateUploader(ctx, upload, options...) + if err == nil { + // start the uploading process. + err = uploader.Upload(ctx, options...) + } + + return err +} + +func (f *Fs) getTusLocationOrRetry(ctx context.Context, resp *http.Response, err error) (bool, string, error) { + + switch resp.StatusCode { + case 201: + location := resp.Header.Get("Location") + return false, location, nil + case 412: + return false, "", ErrVersionMismatch + case 413: + return false, "", ErrLargeUpload + } + + retry, err := f.shouldRetry(ctx, resp, err) + return retry, "", err +} + +// CreateUploader creates a new upload to the server. 
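+// It issues the tus creation POST against the parent collection, sending the
+// Upload-Length, Upload-Metadata and Tus-Resumable headers; the upload URL
+// returned in the 201 response's Location header is handed to NewUploader.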
+func (o *Object) CreateUploader(ctx context.Context, u *Upload, options ...fs.OpenOption) (*Uploader, error) { + if u == nil { + return nil, ErrNilUpload + } + + // if c.Config.Resume && len(u.Fingerprint) == 0 { + // return nil, ErrFingerprintNotSet + // } + + l := int64(0) + p := o.filePath() + // cut the filename off + dir, _ := filepath.Split(p) + if dir == "" { + dir = "/" + } + + opts := rest.Opts{ + Method: "POST", + Path: dir, + NoResponse: true, + RootURL: o.fs.endpointURL, + ContentLength: &l, + ExtraHeaders: o.extraHeaders(ctx, o), + Options: options, + } + opts.ExtraHeaders["Upload-Length"] = strconv.FormatInt(u.size, 10) + opts.ExtraHeaders["Upload-Metadata"] = u.EncodedMetadata() + opts.ExtraHeaders["Tus-Resumable"] = "1.0.0" + // opts.ExtraHeaders["mtime"] = strconv.FormatInt(src.ModTime(ctx).Unix(), 10) + + var tusLocation string + // rclone http call + err := o.fs.pacer.CallNoRetry(func() (bool, error) { + var retry bool + res, err := o.fs.srv.Call(ctx, &opts) + retry, tusLocation, err = o.fs.getTusLocationOrRetry(ctx, res, err) + return retry, err + }) + if err != nil { + return nil, fmt.Errorf("making upload directory failed: %w", err) + } + + uploader := NewUploader(o.fs, tusLocation, u, 0) + + return uploader, nil +} diff --git a/backend/webdav/webdav.go b/backend/webdav/webdav.go index 3624f4a8c..b707116bb 100644 --- a/backend/webdav/webdav.go +++ b/backend/webdav/webdav.go @@ -84,7 +84,10 @@ func init() { Help: "Nextcloud", }, { Value: "owncloud", - Help: "Owncloud", + Help: "Owncloud 10 PHP based WebDAV server", + }, { + Value: "infinitescale", + Help: "ownCloud Infinite Scale", }, { Value: "sharepoint", Help: "Sharepoint Online, authenticated by Microsoft account", @@ -212,6 +215,7 @@ type Fs struct { pacer *fs.Pacer // pacer for API calls precision time.Duration // mod time precision canStream bool // set if can stream + canTus bool // supports the TUS upload protocol useOCMtime bool // set if can use X-OC-Mtime propsetMtime bool // set if can use propset retryWithZeroDepth bool // some vendors (sharepoint) won't list files when Depth is 1 (our default) @@ -632,6 +636,15 @@ func (f *Fs) setQuirks(ctx context.Context, vendor string) error { f.propsetMtime = true f.hasOCMD5 = true f.hasOCSHA1 = true + case "infinitescale": + f.precision = time.Second + f.useOCMtime = true + f.propsetMtime = true + f.hasOCMD5 = false + f.hasOCSHA1 = true + f.canChunk = false + f.canTus = true + f.opt.ChunkSize = 10 * fs.Mebi case "nextcloud": f.precision = time.Second f.useOCMtime = true @@ -1329,7 +1342,7 @@ func (o *Object) Size() int64 { ctx := context.TODO() err := o.readMetaData(ctx) if err != nil { - fs.Logf(o, "Failed to read metadata: %v", err) + fs.Infof(o, "Failed to read metadata: %v", err) return 0 } return o.size @@ -1373,7 +1386,7 @@ func (o *Object) readMetaData(ctx context.Context) (err error) { func (o *Object) ModTime(ctx context.Context) time.Time { err := o.readMetaData(ctx) if err != nil { - fs.Logf(o, "Failed to read metadata: %v", err) + fs.Infof(o, "Failed to read metadata: %v", err) return time.Now() } return o.modTime @@ -1499,9 +1512,21 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op return fmt.Errorf("Update mkParentDir failed: %w", err) } - if o.shouldUseChunkedUpload(src) { - fs.Debugf(src, "Update will use the chunked upload strategy") - err = o.updateChunked(ctx, in, src, options...) + if o.fs.canTus { // supports the tus upload protocol, ie. 
InfiniteScale + fs.Debugf(src, "Update will use the tus protocol to upload") + contentType := fs.MimeType(ctx, src) + err = o.updateViaTus(ctx, in, contentType, src, options...) + if err != nil { + fs.Debug(src, "tus update failed.") + return fmt.Errorf("tus update failed: %w", err) + } + } else if o.shouldUseChunkedUpload(src) { + if o.fs.opt.Vendor == "nextcloud" { + fs.Debugf(src, "Update will use the chunked upload strategy") + err = o.updateChunked(ctx, in, src, options...) + } else { + fs.Debug(src, "Chunking - unknown vendor") + } if err != nil { return err } @@ -1513,10 +1538,9 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op // TODO: define getBody() to enable low-level HTTP/2 retries err = o.updateSimple(ctx, in, nil, filePath, src.Size(), contentType, extraHeaders, o.fs.endpointURL, options...) if err != nil { - return err + return fmt.Errorf("unchunked simple update failed: %w", err) } } - // read metadata from remote o.hasMetaData = false return o.readMetaData(ctx) @@ -1526,7 +1550,7 @@ func (o *Object) extraHeaders(ctx context.Context, src fs.ObjectInfo) map[string extraHeaders := map[string]string{} if o.fs.useOCMtime || o.fs.hasOCMD5 || o.fs.hasOCSHA1 { if o.fs.useOCMtime { - extraHeaders["X-OC-Mtime"] = fmt.Sprintf("%d", src.ModTime(ctx).Unix()) + extraHeaders["X-OC-Mtime"] = fmt.Sprintf("%d", o.modTime.Unix()) } // Set one upload checksum // Owncloud uses one checksum only to check the upload and stores its own SHA1 and MD5 diff --git a/docs/content/webdav.md b/docs/content/webdav.md index 357fea066..7c9835365 100644 --- a/docs/content/webdav.md +++ b/docs/content/webdav.md @@ -104,11 +104,11 @@ To copy a local directory to an WebDAV directory called backup ### Modification times and hashes Plain WebDAV does not support modified times. However when used with -Fastmail Files, Owncloud or Nextcloud rclone will support modified times. +Fastmail Files, ownCloud or Nextcloud rclone will support modified times. Likewise plain WebDAV does not support hashes, however when used with -Fastmail Files, Owncloud or Nextcloud rclone will support SHA1 and MD5 hashes. -Depending on the exact version of Owncloud or Nextcloud hashes may +Fastmail Files, ownCloud or Nextcloud rclone will support SHA1 and MD5 hashes. +Depending on the exact version of ownCloud or Nextcloud hashes may appear on all objects, or only on objects which had a hash uploaded with them. @@ -355,19 +355,28 @@ this as the password. Fastmail supports modified times using the `X-OC-Mtime` header. -### Owncloud +### ownCloud Click on the settings cog in the bottom right of the page and this will show the WebDAV URL that rclone needs in the config step. It will look something like `https://example.com/remote.php/webdav/`. -Owncloud supports modified times using the `X-OC-Mtime` header. +ownCloud supports modified times using the `X-OC-Mtime` header. ### Nextcloud -This is configured in an identical way to Owncloud. Note that +This is configured in an identical way to ownCloud. Note that Nextcloud initially did not support streaming of files (`rcat`) whereas -Owncloud did, but [this](https://github.com/nextcloud/nextcloud-snap/issues/365) seems to be fixed as of 2020-11-27 (tested with rclone v1.53.1 and Nextcloud Server v19). +ownCloud did, but [this](https://github.com/nextcloud/nextcloud-snap/issues/365) seems to be fixed as of 2020-11-27 (tested with rclone v1.53.1 and Nextcloud Server v19). 
+
+### ownCloud Infinite Scale
+
+The WebDAV URL for Infinite Scale can be found in the details panel of
+any space, provided the user has enabled showing the WebDAV URL via the
+corresponding checkbox in their personal settings.
+
+Infinite Scale uploads files with the chunked [tus](https://tus.io)
+upload protocol. The chunk size is currently fixed at 10 MiB.
 
 ### Sharepoint Online
 
diff --git a/fstest/test_all/config.yaml b/fstest/test_all/config.yaml
index 976485011..9facf06c1 100644
--- a/fstest/test_all/config.yaml
+++ b/fstest/test_all/config.yaml
@@ -399,6 +399,12 @@ backends:
       - TestIntegration/FsMkdir/FsEncoding/punctuation
       - TestIntegration/FsMkdir/FsEncoding/invalid_UTF-8
     fastlist: false
+  - backend: "webdav"
+    remote: "TestWebdavInfiniteScale:"
+    ignore:
+      - TestIntegration/FsMkdir/FsEncoding/punctuation
+      - TestIntegration/FsMkdir/FsEncoding/invalid_UTF-8
+    fastlist: false
   - backend: "webdav"
     remote: "TestWebdavRclone:"
     ignore:
diff --git a/fstest/testserver/init.d/TestWebdavInfiniteScale b/fstest/testserver/init.d/TestWebdavInfiniteScale
new file mode 100755
index 000000000..7226055c4
--- /dev/null
+++ b/fstest/testserver/init.d/TestWebdavInfiniteScale
@@ -0,0 +1,49 @@
+#!/usr/bin/env bash
+
+set -e
+
+NAME=infinitescale
+USER=admin
+PASS=admin
+PORT=9200
+
+. $(dirname "$0")/docker.bash
+
+start() {
+
+    docker run --rm --name $NAME \
+        -v $(pwd):/etc/ocis \
+        -e "OCIS_INSECURE=true" \
+        -e "IDM_ADMIN_PASSWORD=$PASS" \
+        -e "OCIS_FORCE_CONFIG_OVERWRITE=true" \
+        -e "OCIS_URL=https://127.0.0.1:$PORT" \
+        owncloud/ocis \
+        init
+
+    docker run --rm -d --name $NAME \
+        -e "OCIS_LOG_LEVEL=debug" \
+        -e "OCIS_LOG_PRETTY=true" \
+        -e "OCIS_URL=https://127.0.0.1:$PORT" \
+        -e "OCIS_ADMIN_USER_ID=some-admin-user-id-0000-100000000000" \
+        -e "IDM_ADMIN_PASSWORD=$PASS" \
+        -e "OCIS_INSECURE=true" \
+        -e "PROXY_ENABLE_BASIC_AUTH=true" \
+        -v $(pwd):/etc/ocis \
+        -p 127.0.0.1:${PORT}:9200 \
+        owncloud/ocis
+
+    echo type=webdav
+    echo url=https://127.0.0.1:${PORT}/dav/spaces/some-admin-user-id-0000-100000000000
+    echo user=$USER
+    echo pass=$(rclone obscure $PASS)
+    echo vendor=infinitescale
+    echo _connect=127.0.0.1:${PORT}
+}
+
+stop() {
+    # Clean up the mess
+    docker stop $NAME
+    rm -f ./ocis.yaml
+}
+
+. $(dirname "$0")/run.bash
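
For reference, the tus exchange that the new uploader implements boils down to two HTTP requests: a creation POST announcing the upload length, followed by PATCH requests that carry the data together with the current offset. Below is a minimal standalone sketch of that exchange in Go; the endpoint, space ID and payload are made up for illustration, and this is not how rclone wires the calls internally (rclone drives the same requests through rest.Opts and the pacer as in the patch above).

package main

import (
	"bytes"
	"fmt"
	"net/http"
	"strconv"
)

func main() {
	data := []byte("hello tus")
	// Hypothetical space URL -- a real one comes from the WebDAV URL shown in
	// the Infinite Scale web UI or from the test server above.
	endpoint := "https://127.0.0.1:9200/dav/spaces/some-space-id/"

	// 1. Creation: announce the total size and the protocol version.
	create, err := http.NewRequest("POST", endpoint, nil)
	if err != nil {
		panic(err)
	}
	create.Header.Set("Tus-Resumable", "1.0.0")
	create.Header.Set("Upload-Length", strconv.Itoa(len(data)))
	resp, err := http.DefaultClient.Do(create)
	if err != nil {
		panic(err)
	}
	resp.Body.Close()
	location := resp.Header.Get("Location") // upload URL used for the PATCH requests

	// 2. Transfer: send the body (here as a single chunk) to the returned location.
	patch, err := http.NewRequest("PATCH", location, bytes.NewReader(data))
	if err != nil {
		panic(err)
	}
	patch.Header.Set("Tus-Resumable", "1.0.0")
	patch.Header.Set("Upload-Offset", "0")
	patch.Header.Set("Content-Type", "application/offset+octet-stream")
	resp, err = http.DefaultClient.Do(patch)
	if err != nil {
		panic(err)
	}
	resp.Body.Close()

	// A 204 response echoes the new offset; the upload is complete when it
	// equals the announced Upload-Length.
	fmt.Println("status:", resp.StatusCode, "offset:", resp.Header.Get("Upload-Offset"))
}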