drive: add --drive-uploads-per-second and -burst for rate limiting uploads

According to the Google docs here: https://support.google.com/a/answer/10445916

> The rate of Drive API write requests is limited—avoid exceeding 3
> requests per second of sustained write or insert requests, per
> account. Note: This rate limit can’t be increased

This adds a rate limiter set to 3 per second for uploads. This is in
the hope that we can reduce the value of the main drive pacer to speed
everything else up.

Fixes #7384
This commit is contained in:
Nick Craig-Wood 2023-10-22 10:54:41 +01:00
parent 6e4dd2ab96
commit 45c6cf5891
2 changed files with 25 additions and 6 deletions

View File

@ -47,6 +47,7 @@ import (
"github.com/rclone/rclone/lib/readers" "github.com/rclone/rclone/lib/readers"
"golang.org/x/oauth2" "golang.org/x/oauth2"
"golang.org/x/oauth2/google" "golang.org/x/oauth2/google"
"golang.org/x/time/rate"
drive_v2 "google.golang.org/api/drive/v2" drive_v2 "google.golang.org/api/drive/v2"
drive "google.golang.org/api/drive/v3" drive "google.golang.org/api/drive/v3"
"google.golang.org/api/googleapi" "google.golang.org/api/googleapi"
@ -75,6 +76,8 @@ const (
listRGrouping = 50 // number of IDs to search at once when using ListR listRGrouping = 50 // number of IDs to search at once when using ListR
listRInputBuffer = 1000 // size of input buffer when using ListR listRInputBuffer = 1000 // size of input buffer when using ListR
defaultXDGIcon = "text-html" defaultXDGIcon = "text-html"
uploadsPerSecond = 3.0 // default number of uploads per second
uploadsPerSecondBurst = 3 // burst for the above
) )
// Globals // Globals
@ -558,6 +561,16 @@ need to use --ignore size also.`,
Default: defaultBurst, Default: defaultBurst,
Help: "Number of API calls to allow without sleeping.", Help: "Number of API calls to allow without sleeping.",
Advanced: true, Advanced: true,
}, {
Name: "uploads_per_second",
Default: uploadsPerSecond,
Help: "Number of uploads per second limit.",
Advanced: true,
}, {
Name: "uploads_per_second_burst",
Default: uploadsPerSecondBurst,
Help: "Burst for number of uploads per second limit.",
Advanced: true,
}, { }, {
Name: "server_side_across_configs", Name: "server_side_across_configs",
Default: false, Default: false,
@ -796,6 +809,8 @@ type Options struct {
V2DownloadMinSize fs.SizeSuffix `config:"v2_download_min_size"` V2DownloadMinSize fs.SizeSuffix `config:"v2_download_min_size"`
PacerMinSleep fs.Duration `config:"pacer_min_sleep"` PacerMinSleep fs.Duration `config:"pacer_min_sleep"`
PacerBurst int `config:"pacer_burst"` PacerBurst int `config:"pacer_burst"`
UploadsPerSecond float64 `config:"uploads_per_second"`
UploadsPerSecondBurst int `config:"uploads_per_second_burst"`
ServerSideAcrossConfigs bool `config:"server_side_across_configs"` ServerSideAcrossConfigs bool `config:"server_side_across_configs"`
DisableHTTP2 bool `config:"disable_http2"` DisableHTTP2 bool `config:"disable_http2"`
StopOnUploadLimit bool `config:"stop_on_upload_limit"` StopOnUploadLimit bool `config:"stop_on_upload_limit"`
@ -835,6 +850,7 @@ type Fs struct {
dirResourceKeys *sync.Map // map directory ID to resource key dirResourceKeys *sync.Map // map directory ID to resource key
permissionsMu *sync.Mutex // protect the below permissionsMu *sync.Mutex // protect the below
permissions map[string]*drive.Permission // map permission IDs to Permissions permissions map[string]*drive.Permission // map permission IDs to Permissions
uploadsLimiter *rate.Limiter // rate limit uploads
} }
type baseObject struct { type baseObject struct {
@ -1371,6 +1387,7 @@ func newFs(ctx context.Context, name, path string, m configmap.Mapper) (*Fs, err
dirResourceKeys: new(sync.Map), dirResourceKeys: new(sync.Map),
permissionsMu: new(sync.Mutex), permissionsMu: new(sync.Mutex),
permissions: make(map[string]*drive.Permission), permissions: make(map[string]*drive.Permission),
uploadsLimiter: rate.NewLimiter(rate.Limit(opt.UploadsPerSecond), opt.UploadsPerSecondBurst),
} }
f.isTeamDrive = opt.TeamDriveID != "" f.isTeamDrive = opt.TeamDriveID != ""
f.features = (&fs.Features{ f.features = (&fs.Features{
@ -2448,6 +2465,7 @@ func (f *Fs) PutUnchecked(ctx context.Context, in io.Reader, src fs.ObjectInfo,
// Make the API request to upload metadata and file data. // Make the API request to upload metadata and file data.
// Don't retry, return a retry error instead // Don't retry, return a retry error instead
err = f.pacer.CallNoRetry(func() (bool, error) { err = f.pacer.CallNoRetry(func() (bool, error) {
_ = f.uploadsLimiter.Wait(ctx) // obey upslimit
info, err = f.svc.Files.Create(createInfo). info, err = f.svc.Files.Create(createInfo).
Media(in, googleapi.ContentType(srcMimeType), googleapi.ChunkSize(0)). Media(in, googleapi.ContentType(srcMimeType), googleapi.ChunkSize(0)).
Fields(partialFields). Fields(partialFields).

View File

@ -71,6 +71,7 @@ func (f *Fs) Upload(ctx context.Context, in io.Reader, size int64, contentType,
var res *http.Response var res *http.Response
var err error var err error
err = f.pacer.Call(func() (bool, error) { err = f.pacer.Call(func() (bool, error) {
_ = f.uploadsLimiter.Wait(ctx) // obey upslimit
var body io.Reader var body io.Reader
body, err = googleapi.WithoutDataWrapper.JSONReader(info) body, err = googleapi.WithoutDataWrapper.JSONReader(info)
if err != nil { if err != nil {