diff --git a/amazonclouddrive/amazonclouddrive.go b/amazonclouddrive/amazonclouddrive.go index c28d166aa..9512903ec 100644 --- a/amazonclouddrive/amazonclouddrive.go +++ b/amazonclouddrive/amazonclouddrive.go @@ -534,7 +534,7 @@ func (f *Fs) checkUpload(resp *http.Response, in io.Reader, src fs.ObjectInfo, i // Copy the reader in to the new object which is returned // // The new object may have been created if an error is returned -func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) { +func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { remote := src.Remote() size := src.Size() // Temporary Object under construction @@ -1002,7 +1002,7 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) { // Update the object with the contents of the io.Reader, modTime and size // // The new object may have been created if an error is returned -func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error { +func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { file := acd.File{Node: o.info} var info *acd.File var resp *http.Response diff --git a/b2/b2.go b/b2/b2.go index 6fd1198ba..18510e1ee 100644 --- a/b2/b2.go +++ b/b2/b2.go @@ -660,7 +660,7 @@ func (f *Fs) clearBucketID() { // Copy the reader in to the new object which is returned // // The new object may have been created if an error is returned -func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) { +func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { // Temporary Object under construction fs := &Object{ fs: f, @@ -1161,7 +1161,7 @@ func urlEncode(in string) string { // Update the object with the contents of the io.Reader, modTime and size // // The new object may have been created if an error is returned -func (o *Object) Update(in io.Reader, src fs.ObjectInfo) (err error) { +func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) { if *b2Versions { return errNotWithVersions } diff --git a/crypt/crypt.go b/crypt/crypt.go index 92c4e9a95..57d8992e3 100644 --- a/crypt/crypt.go +++ b/crypt/crypt.go @@ -162,7 +162,7 @@ func (f *Fs) NewObject(remote string) (fs.Object, error) { // May create the object even if it returns an error - if so // will return the object and the error, otherwise will return // nil and the error -func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) { +func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { wrappedIn, err := f.cipher.EncryptData(in) if err != nil { return nil, err @@ -282,7 +282,7 @@ func (f *Fs) DirMove(src fs.Fs, srcRemote, dstRemote string) error { // // This will create a duplicate if we upload a new file without // checking to see if there is one already - use Put() for that. -func (f *Fs) PutUnchecked(in io.Reader, src fs.ObjectInfo) (fs.Object, error) { +func (f *Fs) PutUnchecked(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { do := f.Fs.Features().PutUnchecked if do == nil { return nil, errors.New("can't PutUnchecked") @@ -455,7 +455,7 @@ func (o *Object) Open(options ...fs.OpenOption) (rc io.ReadCloser, err error) { } // Update in to the object with the modTime given of the given size -func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error { +func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { wrappedIn, err := o.f.cipher.EncryptData(in) if err != nil { return err diff --git a/drive/drive.go b/drive/drive.go index 947badce8..00d704280 100644 --- a/drive/drive.go +++ b/drive/drive.go @@ -584,7 +584,7 @@ func (f *Fs) createFileInfo(remote string, modTime time.Time, size int64) (*Obje // Copy the reader in to the new object which is returned // // The new object may have been created if an error is returned -func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) { +func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { exisitingObj, err := f.newObjectWithInfo(src.Remote(), nil) switch err { case nil: @@ -601,7 +601,7 @@ func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) { // // This will create a duplicate if we upload a new file without // checking to see if there is one already - use Put() for that. -func (f *Fs) PutUnchecked(in io.Reader, src fs.ObjectInfo) (fs.Object, error) { +func (f *Fs) PutUnchecked(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { remote := src.Remote() size := src.Size() modTime := src.ModTime() @@ -1212,7 +1212,7 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) { // Copy the reader into the object updating modTime and size // // The new object may have been created if an error is returned -func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error { +func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { size := src.Size() modTime := src.ModTime() if o.isDocument { diff --git a/dropbox/dropbox.go b/dropbox/dropbox.go index e005cf5ba..69492b3cb 100644 --- a/dropbox/dropbox.go +++ b/dropbox/dropbox.go @@ -424,13 +424,13 @@ func (rc *readCloser) Close() error { // Copy the reader in to the new object which is returned // // The new object may have been created if an error is returned -func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) { +func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { // Temporary Object under construction o := &Object{ fs: f, remote: src.Remote(), } - return o, o.Update(in, src) + return o, o.Update(in, src, options...) } // Mkdir creates the container if it doesn't exist @@ -835,7 +835,7 @@ func (o *Object) uploadChunked(in io.Reader, commitInfo *files.CommitInfo, size // Copy the reader into the object updating modTime and size // // The new object may have been created if an error is returned -func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error { +func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { remote := o.remotePath() if ignoredFiles.MatchString(remote) { fs.Logf(o, "File name disallowed - not uploading") diff --git a/fs/fs.go b/fs/fs.go index 8b918aa87..c55309a5e 100644 --- a/fs/fs.go +++ b/fs/fs.go @@ -129,7 +129,7 @@ type Fs interface { // May create the object even if it returns an error - if so // will return the object and the error, otherwise will return // nil and the error - Put(in io.Reader, src ObjectInfo) (Object, error) + Put(in io.Reader, src ObjectInfo, options ...OpenOption) (Object, error) // Mkdir makes the directory (container, bucket) // @@ -174,7 +174,7 @@ type Object interface { Open(options ...OpenOption) (io.ReadCloser, error) // Update in to the object with the modTime given of the given size - Update(in io.Reader, src ObjectInfo) error + Update(in io.Reader, src ObjectInfo, options ...OpenOption) error // Removes this object Remove() error @@ -287,7 +287,7 @@ type Features struct { // // May create duplicates or return errors if src already // exists. - PutUnchecked func(in io.Reader, src ObjectInfo) (Object, error) + PutUnchecked func(in io.Reader, src ObjectInfo, options ...OpenOption) (Object, error) // CleanUp the trash in the Fs // @@ -466,7 +466,7 @@ type PutUncheckeder interface { // // May create duplicates or return errors if src already // exists. - PutUnchecked(in io.Reader, src ObjectInfo) (Object, error) + PutUnchecked(in io.Reader, src ObjectInfo, options ...OpenOption) (Object, error) } // CleanUpper is an optional interfaces for Fs diff --git a/fs/lister_test.go b/fs/lister_test.go index 7e5f2a881..6fb97635c 100644 --- a/fs/lister_test.go +++ b/fs/lister_test.go @@ -31,8 +31,10 @@ func (o mockObject) Size() int64 { return func (o mockObject) Storable() bool { return true } func (o mockObject) SetModTime(time.Time) error { return errNotImpl } func (o mockObject) Open(options ...OpenOption) (io.ReadCloser, error) { return nil, errNotImpl } -func (o mockObject) Update(in io.Reader, src ObjectInfo) error { return errNotImpl } -func (o mockObject) Remove() error { return errNotImpl } +func (o mockObject) Update(in io.Reader, src ObjectInfo, options ...OpenOption) error { + return errNotImpl +} +func (o mockObject) Remove() error { return errNotImpl } type mockFs struct { listFn func(o ListOpts, dir string) diff --git a/fs/operations.go b/fs/operations.go index 923f53fc6..3f18a6f68 100644 --- a/fs/operations.go +++ b/fs/operations.go @@ -268,6 +268,17 @@ func Copy(f Fs, dst Object, remote string, src Object) (err error) { maxTries := Config.LowLevelRetries tries := 0 doUpdate := dst != nil + // work out which hash to use - limit to 1 hash in common + var common HashSet + hashType := HashNone + if !Config.SizeOnly { + common = src.Fs().Hashes().Overlap(f.Hashes()) + if common.Count() > 0 { + hashType = common.GetOne() + common = HashSet(hashType) + } + } + hashOption := &HashesOption{Hashes: common} var actionTaken string for { // Try server side copy first - if has optional interface and @@ -285,7 +296,7 @@ func Copy(f Fs, dst Object, remote string, src Object) (err error) { // If can't server side copy, do it manually if err == ErrorCantCopy { var in0 io.ReadCloser - in0, err = src.Open() + in0, err = src.Open(hashOption) if err != nil { err = errors.Wrap(err, "failed to open source object") } else { @@ -297,10 +308,10 @@ func Copy(f Fs, dst Object, remote string, src Object) (err error) { } if doUpdate { actionTaken = "Copied (replaced existing)" - err = dst.Update(in, wrappedSrc) + err = dst.Update(in, wrappedSrc, hashOption) } else { actionTaken = "Copied (new)" - dst, err = f.Put(in, wrappedSrc) + dst, err = f.Put(in, wrappedSrc, hashOption) } closeErr := in.Close() if err == nil { @@ -338,12 +349,7 @@ func Copy(f Fs, dst Object, remote string, src Object) (err error) { // Verify hashes are the same after transfer - ignoring blank hashes // TODO(klauspost): This could be extended, so we always create a hash type matching // the destination, and calculate it while sending. - common := src.Fs().Hashes().Overlap(dst.Fs().Hashes()) - // Debugf(src, "common hashes: %v", common) - if !Config.SizeOnly && common.Count() > 0 { - // Get common hash type - hashType := common.GetOne() - + if hashType != HashNone { var srcSum string srcSum, err = src.Hash(hashType) if err != nil { diff --git a/fs/options.go b/fs/options.go index e1f24c004..a1d65b4d8 100644 --- a/fs/options.go +++ b/fs/options.go @@ -94,6 +94,27 @@ func (o *HTTPOption) Mandatory() bool { return false } +// HashesOption defines an option used to tell the local fs to limit +// the number of hashes it calculates. +type HashesOption struct { + Hashes HashSet +} + +// Header formats the option as an http header +func (o *HashesOption) Header() (key string, value string) { + return "", "" +} + +// String formats the option into human readable form +func (o *HashesOption) String() string { + return fmt.Sprintf("HashesOption(%v)", o.Hashes) +} + +// Mandatory returns whether the option must be parsed or can be ignored +func (o *HashesOption) Mandatory() bool { + return false +} + // OpenOptionAddHeaders adds each header found in options to the // headers map provided the key was non empty. func OpenOptionAddHeaders(options []OpenOption, headers map[string]string) { diff --git a/ftp/ftp.go b/ftp/ftp.go index b557409e5..c847b0555 100644 --- a/ftp/ftp.go +++ b/ftp/ftp.go @@ -380,7 +380,7 @@ func (f *Fs) Precision() time.Duration { // May create the object even if it returns an error - if so // will return the object and the error, otherwise will return // nil and the error -func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) { +func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { // fs.Debugf(f, "Trying to put file %s", src.Remote()) err := f.mkParentDir(src.Remote()) if err != nil { @@ -390,7 +390,7 @@ func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) { fs: f, remote: src.Remote(), } - err = o.Update(in, src) + err = o.Update(in, src, options...) return o, err } @@ -674,7 +674,7 @@ func (o *Object) Open(options ...fs.OpenOption) (rc io.ReadCloser, err error) { // Copy the reader into the object updating modTime and size // // The new object may have been created if an error is returned -func (o *Object) Update(in io.Reader, src fs.ObjectInfo) (err error) { +func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) { // defer fs.Trace(o, "src=%v", src)("err=%v", &err) path := path.Join(o.fs.root, o.remote) // remove the file if upload failed diff --git a/googlecloudstorage/googlecloudstorage.go b/googlecloudstorage/googlecloudstorage.go index 19809a32d..128b708c7 100644 --- a/googlecloudstorage/googlecloudstorage.go +++ b/googlecloudstorage/googlecloudstorage.go @@ -445,13 +445,13 @@ func (f *Fs) List(out fs.ListOpts, dir string) { // Copy the reader in to the new object which is returned // // The new object may have been created if an error is returned -func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) { +func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { // Temporary Object under construction o := &Object{ fs: f, remote: src.Remote(), } - return o, o.Update(in, src) + return o, o.Update(in, src, options...) } // Mkdir creates the bucket if it doesn't exist @@ -683,7 +683,7 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) { // Update the object with the contents of the io.Reader, modTime and size // // The new object may have been created if an error is returned -func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error { +func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { size := src.Size() modTime := src.ModTime() diff --git a/local/local.go b/local/local.go index 6d595f788..252e95135 100644 --- a/local/local.go +++ b/local/local.go @@ -372,11 +372,11 @@ func (m *mapper) Save(in, out string) string { } // Put the Object to the local filesystem -func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) { +func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { remote := src.Remote() // Temporary Object under construction - info filled in by Update() o := f.newObject(remote, "") - err := o.Update(in, src) + err := o.Update(in, src, options...) if err != nil { return nil, err } @@ -707,10 +707,13 @@ func (file *localOpenFile) Close() (err error) { // Open an object for read func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) { var offset int64 + hashes := fs.SupportedHashes for _, option := range options { switch x := option.(type) { case *fs.SeekOption: offset = x.Offset + case *fs.HashesOption: + hashes = x.Hashes default: if option.Mandatory() { fs.Logf(o, "Unsupported mandatory option: %v", option) @@ -728,11 +731,15 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) { // don't attempt to make checksums return fd, err } + hash, err := fs.NewMultiHasherTypes(hashes) + if err != nil { + return nil, err + } // Update the md5sum as we go along in = &localOpenFile{ o: o, in: fd, - hash: fs.NewMultiHasher(), + hash: hash, } return in, nil } @@ -744,7 +751,15 @@ func (o *Object) mkdirAll() error { } // Update the object from in with modTime and size -func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error { +func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { + hashes := fs.SupportedHashes + for _, option := range options { + switch x := option.(type) { + case *fs.HashesOption: + hashes = x.Hashes + } + } + err := o.mkdirAll() if err != nil { return err @@ -756,7 +771,10 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error { } // Calculate the hash of the object we are reading as we go along - hash := fs.NewMultiHasher() + hash, err := fs.NewMultiHasherTypes(hashes) + if err != nil { + return err + } in = io.TeeReader(in, hash) _, err = io.Copy(out, in) diff --git a/onedrive/onedrive.go b/onedrive/onedrive.go index 3ac2cbdb9..c89b322fe 100644 --- a/onedrive/onedrive.go +++ b/onedrive/onedrive.go @@ -468,7 +468,7 @@ func (f *Fs) createObject(remote string, modTime time.Time, size int64) (o *Obje // Copy the reader in to the new object which is returned // // The new object may have been created if an error is returned -func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) { +func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { remote := src.Remote() size := src.Size() modTime := src.ModTime() @@ -477,7 +477,7 @@ func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) { if err != nil { return nil, err } - return o, o.Update(in, src) + return o, o.Update(in, src, options...) } // Mkdir creates the container if it doesn't exist @@ -1007,7 +1007,7 @@ func (o *Object) uploadMultipart(in io.Reader, size int64) (err error) { // Update the object with the contents of the io.Reader, modTime and size // // The new object may have been created if an error is returned -func (o *Object) Update(in io.Reader, src fs.ObjectInfo) (err error) { +func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) { o.fs.tokenRenewer.Start() defer o.fs.tokenRenewer.Stop() diff --git a/s3/s3.go b/s3/s3.go index 8c2b6190b..173a95361 100644 --- a/s3/s3.go +++ b/s3/s3.go @@ -624,13 +624,13 @@ func (f *Fs) List(out fs.ListOpts, dir string) { } // Put the Object into the bucket -func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) { +func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { // Temporary Object under construction fs := &Object{ fs: f, remote: src.Remote(), } - return fs, fs.Update(in, src) + return fs, fs.Update(in, src, options...) } // Check if the bucket exists @@ -902,7 +902,7 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) { } // Update the Object from in with modTime and size -func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error { +func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { modTime := src.ModTime() uploader := s3manager.NewUploader(o.fs.ses, func(u *s3manager.Uploader) { diff --git a/sftp/sftp.go b/sftp/sftp.go index e2bc7d1c0..262113708 100644 --- a/sftp/sftp.go +++ b/sftp/sftp.go @@ -290,7 +290,7 @@ func (f *Fs) List(out fs.ListOpts, dir string) { } // Put data from into a new remote sftp file object described by and -func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) { +func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { err := f.mkParentDir(src.Remote()) if err != nil { return nil, errors.Wrap(err, "Put mkParentDir failed") @@ -300,7 +300,7 @@ func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) { fs: f, remote: src.Remote(), } - err = o.Update(in, src) + err = o.Update(in, src, options...) if err != nil { return nil, err } @@ -543,7 +543,7 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) { } // Update a remote sftp file using the data and ModTime from -func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error { +func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { file, err := o.fs.sftpClient.Create(o.path()) if err != nil { return errors.Wrap(err, "Update Create failed") diff --git a/swift/swift.go b/swift/swift.go index e06643e1b..c1b6af14e 100644 --- a/swift/swift.go +++ b/swift/swift.go @@ -399,13 +399,13 @@ func (f *Fs) List(out fs.ListOpts, dir string) { // Copy the reader in to the new object which is returned // // The new object may have been created if an error is returned -func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) { +func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { // Temporary Object under construction fs := &Object{ fs: f, remote: src.Remote(), } - return fs, fs.Update(in, src) + return fs, fs.Update(in, src, options...) } // Mkdir creates the container if it doesn't exist @@ -737,7 +737,7 @@ func (o *Object) updateChunks(in io.Reader, headers swift.Headers, size int64, c // Update the object with the contents of the io.Reader, modTime and size // // The new object may have been created if an error is returned -func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error { +func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { size := src.Size() modTime := src.ModTime() diff --git a/yandex/yandex.go b/yandex/yandex.go index 6e9b58b30..2a5b8bc9b 100644 --- a/yandex/yandex.go +++ b/yandex/yandex.go @@ -397,7 +397,7 @@ func (o *Object) readMetaData() (err error) { // Copy the reader in to the new object which is returned // // The new object may have been created if an error is returned -func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) { +func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { remote := src.Remote() size := src.Size() modTime := src.ModTime() @@ -409,7 +409,7 @@ func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) { modTime: modTime, } //TODO maybe read metadata after upload to check if file uploaded successfully - return o, o.Update(in, src) + return o, o.Update(in, src, options...) } // Mkdir creates the container if it doesn't exist @@ -556,7 +556,7 @@ func (o *Object) remotePath() string { // Copy the reader into the object updating modTime and size // // The new object may have been created if an error is returned -func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error { +func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { size := src.Size() modTime := src.ModTime()