mirror of
https://github.com/rclone/rclone.git
synced 2025-01-03 04:49:47 +01:00
yandex: support deduplicated uploads - fixes #6032
This commit is contained in:
parent
83ea146ea8
commit
7b3caf8d11
@ -28,6 +28,7 @@ import (
|
||||
"github.com/rclone/rclone/lib/readers"
|
||||
"github.com/rclone/rclone/lib/rest"
|
||||
"golang.org/x/oauth2"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
//oAuth
|
||||
@ -1073,7 +1074,7 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read
|
||||
return resp.Body, err
|
||||
}
|
||||
|
||||
func (o *Object) upload(ctx context.Context, in io.Reader, overwrite bool, mimeType string, options ...fs.OpenOption) (err error) {
|
||||
func (o *Object) upload(ctx context.Context, in io.Reader, overwrite bool, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
|
||||
// prepare upload
|
||||
var resp *http.Response
|
||||
var ur api.AsyncInfo
|
||||
@ -1087,6 +1088,29 @@ func (o *Object) upload(ctx context.Context, in io.Reader, overwrite bool, mimeT
|
||||
opts.Parameters.Set("path", o.fs.opt.Enc.FromStandardPath(o.filePath()))
|
||||
opts.Parameters.Set("overwrite", strconv.FormatBool(overwrite))
|
||||
|
||||
// Check to see if we can calculate a MD5 and SHA256 hash and
|
||||
// if so start calculating them to do de-dupe the uploads.
|
||||
var (
|
||||
hashes = src.Fs().Hashes()
|
||||
size = src.Size()
|
||||
dedupe = size >= 0 && hashes.Contains(hash.MD5) && hashes.Contains(hash.SHA256)
|
||||
g *errgroup.Group
|
||||
gCtx context.Context
|
||||
md5sum string
|
||||
sha256sum string
|
||||
)
|
||||
if dedupe {
|
||||
g, gCtx = errgroup.WithContext(ctx)
|
||||
g.Go(func() (err error) {
|
||||
md5sum, err = src.Hash(gCtx, hash.MD5)
|
||||
return err
|
||||
})
|
||||
g.Go(func() (err error) {
|
||||
sha256sum, err = src.Hash(gCtx, hash.SHA256)
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
err = o.fs.pacer.Call(func() (bool, error) {
|
||||
resp, err = o.fs.srv.CallJSON(ctx, &opts, nil, &ur)
|
||||
return shouldRetry(ctx, resp, err)
|
||||
@ -1098,11 +1122,27 @@ func (o *Object) upload(ctx context.Context, in io.Reader, overwrite bool, mimeT
|
||||
|
||||
// perform the actual upload
|
||||
opts = rest.Opts{
|
||||
RootURL: ur.HRef,
|
||||
Method: "PUT",
|
||||
ContentType: mimeType,
|
||||
Body: in,
|
||||
NoResponse: true,
|
||||
RootURL: ur.HRef,
|
||||
Method: "PUT",
|
||||
ContentType: fs.MimeType(ctx, src),
|
||||
Body: in,
|
||||
ExtraHeaders: map[string]string{},
|
||||
NoResponse: true,
|
||||
}
|
||||
if size >= 0 {
|
||||
opts.ContentLength = &size
|
||||
}
|
||||
|
||||
// Add the hashes to the PUT to dedupe the upload if possible
|
||||
if dedupe {
|
||||
err = g.Wait()
|
||||
if err != nil {
|
||||
fs.Debugf(o, "failed to calculate MD5 or SHA256: %v", err)
|
||||
} else {
|
||||
opts.ExtraHeaders["Expect"] = "100-continue"
|
||||
opts.ExtraHeaders["Etag"] = md5sum
|
||||
opts.ExtraHeaders["Sha256"] = sha256sum
|
||||
}
|
||||
}
|
||||
|
||||
err = o.fs.pacer.Call(func() (bool, error) {
|
||||
@ -1130,7 +1170,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
||||
}
|
||||
|
||||
//upload file
|
||||
err = o.upload(ctx, in1, true, fs.MimeType(ctx, src), options...)
|
||||
err = o.upload(ctx, in1, true, src, options...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user