diff --git a/backend/yandex/yandex.go b/backend/yandex/yandex.go index 265802cb5..d03c2320d 100644 --- a/backend/yandex/yandex.go +++ b/backend/yandex/yandex.go @@ -28,6 +28,7 @@ import ( "github.com/rclone/rclone/lib/readers" "github.com/rclone/rclone/lib/rest" "golang.org/x/oauth2" + "golang.org/x/sync/errgroup" ) //oAuth @@ -1073,7 +1074,7 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read return resp.Body, err } -func (o *Object) upload(ctx context.Context, in io.Reader, overwrite bool, mimeType string, options ...fs.OpenOption) (err error) { +func (o *Object) upload(ctx context.Context, in io.Reader, overwrite bool, src fs.ObjectInfo, options ...fs.OpenOption) (err error) { // prepare upload var resp *http.Response var ur api.AsyncInfo @@ -1087,6 +1088,29 @@ func (o *Object) upload(ctx context.Context, in io.Reader, overwrite bool, mimeT opts.Parameters.Set("path", o.fs.opt.Enc.FromStandardPath(o.filePath())) opts.Parameters.Set("overwrite", strconv.FormatBool(overwrite)) + // Check to see if we can calculate a MD5 and SHA256 hash and + // if so start calculating them to do de-dupe the uploads. + var ( + hashes = src.Fs().Hashes() + size = src.Size() + dedupe = size >= 0 && hashes.Contains(hash.MD5) && hashes.Contains(hash.SHA256) + g *errgroup.Group + gCtx context.Context + md5sum string + sha256sum string + ) + if dedupe { + g, gCtx = errgroup.WithContext(ctx) + g.Go(func() (err error) { + md5sum, err = src.Hash(gCtx, hash.MD5) + return err + }) + g.Go(func() (err error) { + sha256sum, err = src.Hash(gCtx, hash.SHA256) + return err + }) + } + err = o.fs.pacer.Call(func() (bool, error) { resp, err = o.fs.srv.CallJSON(ctx, &opts, nil, &ur) return shouldRetry(ctx, resp, err) @@ -1098,11 +1122,27 @@ func (o *Object) upload(ctx context.Context, in io.Reader, overwrite bool, mimeT // perform the actual upload opts = rest.Opts{ - RootURL: ur.HRef, - Method: "PUT", - ContentType: mimeType, - Body: in, - NoResponse: true, + RootURL: ur.HRef, + Method: "PUT", + ContentType: fs.MimeType(ctx, src), + Body: in, + ExtraHeaders: map[string]string{}, + NoResponse: true, + } + if size >= 0 { + opts.ContentLength = &size + } + + // Add the hashes to the PUT to dedupe the upload if possible + if dedupe { + err = g.Wait() + if err != nil { + fs.Debugf(o, "failed to calculate MD5 or SHA256: %v", err) + } else { + opts.ExtraHeaders["Expect"] = "100-continue" + opts.ExtraHeaders["Etag"] = md5sum + opts.ExtraHeaders["Sha256"] = sha256sum + } } err = o.fs.pacer.Call(func() (bool, error) { @@ -1130,7 +1170,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op } //upload file - err = o.upload(ctx, in1, true, fs.MimeType(ctx, src), options...) + err = o.upload(ctx, in1, true, src, options...) if err != nil { return err }