From 40111ba5e153c96e729d8639d46d59f85978c545 Mon Sep 17 00:00:00 2001
From: Georg Welzel
Date: Tue, 3 Dec 2024 10:52:44 -0700
Subject: [PATCH] pcloud: fix failing large file uploads - fixes #8147

This changes the OpenWriterAt implementation to make client/fd handling
atomic, which stabilizes multi-threaded uploads of large files.

The root cause boils down to the old "fun" property of pcloud's fileops
API: sessions are bound to TCP connections. This forces us to use an HTTP
client with only a single connection underneath. With large files, we
reuse the same connection for each chunk. If that connection is
interrupted (e.g. because we are talking over the internet), all chunks
fail. The probability of this increases with larger files.

As the point of the multi-threaded feature was to speed up large file
uploads in the first place, this change pulls the client creation (and
hence connection handling) into each chunk. This should stabilize the
situation, as each chunk (and each retry) gets its own connection (a
minimal sketch of this per-chunk flow follows the patch).
---
 backend/pcloud/pcloud.go    |  5 +-
 backend/pcloud/writer_at.go | 93 +++++++++++++++++++++++++++----------
 2 files changed, 72 insertions(+), 26 deletions(-)

diff --git a/backend/pcloud/pcloud.go b/backend/pcloud/pcloud.go
index dc475cb9d..6d1524390 100644
--- a/backend/pcloud/pcloud.go
+++ b/backend/pcloud/pcloud.go
@@ -399,14 +399,15 @@ func (f *Fs) OpenWriterAt(ctx context.Context, remote string, size int64) (fs.Wr
 	if err != nil {
 		return nil, fmt.Errorf("open file: %w", err)
 	}
+	if _, err := fileClose(ctx, client, f.pacer, openResult.FileDescriptor); err != nil {
+		return nil, fmt.Errorf("close file: %w", err)
+	}
 
 	writer := &writerAt{
 		ctx:    ctx,
-		client: client,
 		fs:     f,
 		size:   size,
 		remote: remote,
-		fd:     openResult.FileDescriptor,
 		fileID: openResult.Fileid,
 	}
 
diff --git a/backend/pcloud/writer_at.go b/backend/pcloud/writer_at.go
index ee3380a71..adf6a6b44 100644
--- a/backend/pcloud/writer_at.go
+++ b/backend/pcloud/writer_at.go
@@ -18,21 +18,14 @@ import (
 // writerAt implements fs.WriterAtCloser, adding the OpenWrtierAt feature to pcloud.
 type writerAt struct {
 	ctx    context.Context
-	client *rest.Client
 	fs     *Fs
 	size   int64
 	remote string
-	fd     int64
 	fileID int64
 }
 
 // Close implements WriterAt.Close.
 func (c *writerAt) Close() error {
-	// close fd
-	if _, err := c.fileClose(c.ctx); err != nil {
-		return fmt.Errorf("close fd: %w", err)
-	}
-
 	// Avoiding race conditions: Depending on the tcp connection, there might be
 	// caching issues when checking the size immediately after write.
 	// Hence we try avoiding them by checking the resulting size on a different connection.
@@ -72,8 +65,18 @@ func (c *writerAt) WriteAt(buffer []byte, offset int64) (n int, err error) {
 	inSHA1Bytes := sha1.Sum(buffer)
 	inSHA1 := hex.EncodeToString(inSHA1Bytes[:])
 
+	client, err := c.fs.newSingleConnClient(c.ctx)
+	if err != nil {
+		return 0, fmt.Errorf("create client: %w", err)
+	}
+
+	openResult, err := fileOpen(c.ctx, client, c.fs, c.fileID)
+	if err != nil {
+		return 0, fmt.Errorf("open file: %w", err)
+	}
+
 	// get target hash
-	outChecksum, err := c.fileChecksum(c.ctx, offset, int64(contentLength))
+	outChecksum, err := fileChecksum(c.ctx, client, c.fs.pacer, openResult.FileDescriptor, offset, int64(contentLength))
 	if err != nil {
 		return 0, err
 	}
@@ -89,10 +92,15 @@ func (c *writerAt) WriteAt(buffer []byte, offset int64) (n int, err error) {
 	}
 
 	// upload buffer with offset if necessary
-	if _, err := c.filePWrite(c.ctx, offset, buffer); err != nil {
+	if _, err := filePWrite(c.ctx, client, c.fs.pacer, openResult.FileDescriptor, offset, buffer); err != nil {
 		return 0, err
 	}
 
+	// close fd
+	if _, err := fileClose(c.ctx, client, c.fs.pacer, openResult.FileDescriptor); err != nil {
+		return contentLength, fmt.Errorf("close fd: %w", err)
+	}
+
 	return contentLength, nil
 }
 
@@ -125,11 +133,40 @@ func fileOpenNew(ctx context.Context, c *rest.Client, srcFs *Fs, directoryID, fi
 	return result, nil
 }
 
+// Call pcloud file_open using fileid with O_WRITE flags, see [API Doc.]
+// [API Doc]: https://docs.pcloud.com/methods/fileops/file_open.html
+func fileOpen(ctx context.Context, c *rest.Client, srcFs *Fs, fileID int64) (*api.FileOpenResponse, error) {
+	opts := rest.Opts{
+		Method:           "PUT",
+		Path:             "/file_open",
+		Parameters:       url.Values{},
+		TransferEncoding: []string{"identity"}, // pcloud doesn't like chunked encoding
+		ExtraHeaders: map[string]string{
+			"Connection": "keep-alive",
+		},
+	}
+	opts.Parameters.Set("fileid", strconv.FormatInt(fileID, 10))
+	opts.Parameters.Set("flags", "0x0002") // O_WRITE
+
+	result := &api.FileOpenResponse{}
+	err := srcFs.pacer.CallNoRetry(func() (bool, error) {
+		resp, err := c.CallJSON(ctx, &opts, nil, result)
+		err = result.Error.Update(err)
+		return shouldRetry(ctx, resp, err)
+	})
+	if err != nil {
+		return nil, fmt.Errorf("open new file descriptor: %w", err)
+	}
+	return result, nil
+}
+
 // Call pcloud file_checksum, see [API Doc.]
 // [API Doc]: https://docs.pcloud.com/methods/fileops/file_checksum.html
-func (c *writerAt) fileChecksum(
+func fileChecksum(
 	ctx context.Context,
-	offset, count int64,
+	client *rest.Client,
+	pacer *fs.Pacer,
+	fd, offset, count int64,
 ) (*api.FileChecksumResponse, error) {
 	opts := rest.Opts{
 		Method:           "PUT",
@@ -140,26 +177,29 @@ func (c *writerAt) fileChecksum(
 			"Connection": "keep-alive",
 		},
 	}
-	opts.Parameters.Set("fd", strconv.FormatInt(c.fd, 10))
+	opts.Parameters.Set("fd", strconv.FormatInt(fd, 10))
 	opts.Parameters.Set("offset", strconv.FormatInt(offset, 10))
 	opts.Parameters.Set("count", strconv.FormatInt(count, 10))
 
 	result := &api.FileChecksumResponse{}
-	err := c.fs.pacer.CallNoRetry(func() (bool, error) {
-		resp, err := c.client.CallJSON(ctx, &opts, nil, result)
+	err := pacer.CallNoRetry(func() (bool, error) {
+		resp, err := client.CallJSON(ctx, &opts, nil, result)
 		err = result.Error.Update(err)
 		return shouldRetry(ctx, resp, err)
 	})
 	if err != nil {
-		return nil, fmt.Errorf("checksum of fd %d with offset %d and size %d: %w", c.fd, offset, count, err)
+		return nil, fmt.Errorf("checksum of fd %d with offset %d and size %d: %w", fd, offset, count, err)
 	}
 	return result, nil
 }
 
 // Call pcloud file_pwrite, see [API Doc.]
 // [API Doc]: https://docs.pcloud.com/methods/fileops/file_pwrite.html
-func (c *writerAt) filePWrite(
+func filePWrite(
 	ctx context.Context,
+	client *rest.Client,
+	pacer *fs.Pacer,
+	fd int64,
 	offset int64,
 	buf []byte,
 ) (*api.FilePWriteResponse, error) {
@@ -176,24 +216,29 @@ func (c *writerAt) filePWrite(
 			"Connection": "keep-alive",
 		},
 	}
-	opts.Parameters.Set("fd", strconv.FormatInt(c.fd, 10))
+	opts.Parameters.Set("fd", strconv.FormatInt(fd, 10))
 	opts.Parameters.Set("offset", strconv.FormatInt(offset, 10))
 
 	result := &api.FilePWriteResponse{}
-	err := c.fs.pacer.CallNoRetry(func() (bool, error) {
-		resp, err := c.client.CallJSON(ctx, &opts, nil, result)
+	err := pacer.CallNoRetry(func() (bool, error) {
+		resp, err := client.CallJSON(ctx, &opts, nil, result)
 		err = result.Error.Update(err)
 		return shouldRetry(ctx, resp, err)
 	})
 	if err != nil {
-		return nil, fmt.Errorf("write %d bytes to fd %d with offset %d: %w", contentLength, c.fd, offset, err)
+		return nil, fmt.Errorf("write %d bytes to fd %d with offset %d: %w", contentLength, fd, offset, err)
 	}
 	return result, nil
 }
 
 // Call pcloud file_close, see [API Doc.]
 // [API Doc]: https://docs.pcloud.com/methods/fileops/file_close.html
-func (c *writerAt) fileClose(ctx context.Context) (*api.FileCloseResponse, error) {
+func fileClose(
+	ctx context.Context,
+	client *rest.Client,
+	pacer *fs.Pacer,
+	fd int64,
+) (*api.FileCloseResponse, error) {
 	opts := rest.Opts{
 		Method:           "PUT",
 		Path:             "/file_close",
@@ -201,11 +246,11 @@ func (c *writerAt) fileClose(ctx context.Context) (*api.FileCloseResponse, error
 		TransferEncoding: []string{"identity"}, // pcloud doesn't like chunked encoding
 		Close:            true,
 	}
-	opts.Parameters.Set("fd", strconv.FormatInt(c.fd, 10))
+	opts.Parameters.Set("fd", strconv.FormatInt(fd, 10))
 
 	result := &api.FileCloseResponse{}
-	err := c.fs.pacer.CallNoRetry(func() (bool, error) {
-		resp, err := c.client.CallJSON(ctx, &opts, nil, result)
+	err := pacer.CallNoRetry(func() (bool, error) {
+		resp, err := client.CallJSON(ctx, &opts, nil, result)
 		err = result.Error.Update(err)
 		return shouldRetry(ctx, resp, err)
 	})
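For illustration, the sketch below mirrors the per-chunk flow described in the commit message: build a client pinned to a single TCP connection, then open, write and close a pcloud fd entirely within one chunk, so a dropped connection only costs that chunk and its retry. This is a minimal, self-contained sketch, not the rclone backend code: newSingleConnClient here is a plain net/http construction, and fileOpen, filePWrite and fileClose are hypothetical stand-ins for the real file_open, file_pwrite and file_close wrappers shown in the patch.

package main

import (
	"fmt"
	"net/http"
	"time"
)

// newSingleConnClient returns an HTTP client limited to one TCP connection
// per host. pcloud fileops sessions are bound to the TCP connection, so all
// requests that share an fd must travel over the same connection.
// Illustrative construction only, not the backend's implementation.
func newSingleConnClient() *http.Client {
	return &http.Client{
		Transport: &http.Transport{
			MaxConnsPerHost:     1,
			MaxIdleConnsPerHost: 1,
			IdleConnTimeout:     90 * time.Second,
		},
	}
}

// writeChunk creates its own client (and therefore its own connection),
// opens an fd, writes one chunk at the given offset and closes the fd
// again. A broken connection now only fails this chunk, not the whole upload.
func writeChunk(fileID, offset int64, buf []byte) error {
	client := newSingleConnClient()
	defer client.CloseIdleConnections()

	fd, err := fileOpen(client, fileID) // stand-in for pcloud file_open with O_WRITE
	if err != nil {
		return fmt.Errorf("open file: %w", err)
	}
	if err := filePWrite(client, fd, offset, buf); err != nil { // stand-in for file_pwrite
		return fmt.Errorf("write %d bytes at offset %d: %w", len(buf), offset, err)
	}
	if err := fileClose(client, fd); err != nil { // stand-in for file_close
		return fmt.Errorf("close fd: %w", err)
	}
	return nil
}

// Hypothetical stubs marking where the real pcloud API calls would go.
func fileOpen(_ *http.Client, _ int64) (int64, error)       { return 1, nil }
func filePWrite(_ *http.Client, _, _ int64, _ []byte) error { return nil }
func fileClose(_ *http.Client, _ int64) error               { return nil }

func main() {
	if err := writeChunk(42, 0, []byte("chunk data")); err != nil {
		fmt.Println("upload failed:", err)
	}
}

The actual change additionally routes every call through the pacer and retries via shouldRetry, as shown in the diff above.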