Add options to Put, PutUnchecked and Update, add HashOption and speed up local

* Add options to Put, PutUnchecked and Update for all Fses
  * Use these to create HashOption
  * Implement this in local
  * Pass the option in fs.Copy

This has the effect that we only calculate hashes we need to in the
local Fs which speeds up transfers significantly.
This commit is contained in:
Nick Craig-Wood 2017-05-28 12:44:22 +01:00
parent 6381959850
commit 20da3e6352
17 changed files with 101 additions and 54 deletions

View File

@ -534,7 +534,7 @@ func (f *Fs) checkUpload(resp *http.Response, in io.Reader, src fs.ObjectInfo, i
// Copy the reader in to the new object which is returned // Copy the reader in to the new object which is returned
// //
// The new object may have been created if an error is returned // The new object may have been created if an error is returned
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) { func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
remote := src.Remote() remote := src.Remote()
size := src.Size() size := src.Size()
// Temporary Object under construction // Temporary Object under construction
@ -1002,7 +1002,7 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
// Update the object with the contents of the io.Reader, modTime and size // Update the object with the contents of the io.Reader, modTime and size
// //
// The new object may have been created if an error is returned // The new object may have been created if an error is returned
func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error { func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
file := acd.File{Node: o.info} file := acd.File{Node: o.info}
var info *acd.File var info *acd.File
var resp *http.Response var resp *http.Response

View File

@ -660,7 +660,7 @@ func (f *Fs) clearBucketID() {
// Copy the reader in to the new object which is returned // Copy the reader in to the new object which is returned
// //
// The new object may have been created if an error is returned // The new object may have been created if an error is returned
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) { func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
// Temporary Object under construction // Temporary Object under construction
fs := &Object{ fs := &Object{
fs: f, fs: f,
@ -1161,7 +1161,7 @@ func urlEncode(in string) string {
// Update the object with the contents of the io.Reader, modTime and size // Update the object with the contents of the io.Reader, modTime and size
// //
// The new object may have been created if an error is returned // The new object may have been created if an error is returned
func (o *Object) Update(in io.Reader, src fs.ObjectInfo) (err error) { func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
if *b2Versions { if *b2Versions {
return errNotWithVersions return errNotWithVersions
} }

View File

@ -162,7 +162,7 @@ func (f *Fs) NewObject(remote string) (fs.Object, error) {
// May create the object even if it returns an error - if so // May create the object even if it returns an error - if so
// will return the object and the error, otherwise will return // will return the object and the error, otherwise will return
// nil and the error // nil and the error
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) { func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
wrappedIn, err := f.cipher.EncryptData(in) wrappedIn, err := f.cipher.EncryptData(in)
if err != nil { if err != nil {
return nil, err return nil, err
@ -282,7 +282,7 @@ func (f *Fs) DirMove(src fs.Fs, srcRemote, dstRemote string) error {
// //
// This will create a duplicate if we upload a new file without // This will create a duplicate if we upload a new file without
// checking to see if there is one already - use Put() for that. // checking to see if there is one already - use Put() for that.
func (f *Fs) PutUnchecked(in io.Reader, src fs.ObjectInfo) (fs.Object, error) { func (f *Fs) PutUnchecked(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
do := f.Fs.Features().PutUnchecked do := f.Fs.Features().PutUnchecked
if do == nil { if do == nil {
return nil, errors.New("can't PutUnchecked") return nil, errors.New("can't PutUnchecked")
@ -455,7 +455,7 @@ func (o *Object) Open(options ...fs.OpenOption) (rc io.ReadCloser, err error) {
} }
// Update in to the object with the modTime given of the given size // Update in to the object with the modTime given of the given size
func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error { func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
wrappedIn, err := o.f.cipher.EncryptData(in) wrappedIn, err := o.f.cipher.EncryptData(in)
if err != nil { if err != nil {
return err return err

View File

@ -584,7 +584,7 @@ func (f *Fs) createFileInfo(remote string, modTime time.Time, size int64) (*Obje
// Copy the reader in to the new object which is returned // Copy the reader in to the new object which is returned
// //
// The new object may have been created if an error is returned // The new object may have been created if an error is returned
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) { func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
exisitingObj, err := f.newObjectWithInfo(src.Remote(), nil) exisitingObj, err := f.newObjectWithInfo(src.Remote(), nil)
switch err { switch err {
case nil: case nil:
@ -601,7 +601,7 @@ func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
// //
// This will create a duplicate if we upload a new file without // This will create a duplicate if we upload a new file without
// checking to see if there is one already - use Put() for that. // checking to see if there is one already - use Put() for that.
func (f *Fs) PutUnchecked(in io.Reader, src fs.ObjectInfo) (fs.Object, error) { func (f *Fs) PutUnchecked(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
remote := src.Remote() remote := src.Remote()
size := src.Size() size := src.Size()
modTime := src.ModTime() modTime := src.ModTime()
@ -1212,7 +1212,7 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
// Copy the reader into the object updating modTime and size // Copy the reader into the object updating modTime and size
// //
// The new object may have been created if an error is returned // The new object may have been created if an error is returned
func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error { func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
size := src.Size() size := src.Size()
modTime := src.ModTime() modTime := src.ModTime()
if o.isDocument { if o.isDocument {

View File

@ -424,13 +424,13 @@ func (rc *readCloser) Close() error {
// Copy the reader in to the new object which is returned // Copy the reader in to the new object which is returned
// //
// The new object may have been created if an error is returned // The new object may have been created if an error is returned
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) { func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
// Temporary Object under construction // Temporary Object under construction
o := &Object{ o := &Object{
fs: f, fs: f,
remote: src.Remote(), remote: src.Remote(),
} }
return o, o.Update(in, src) return o, o.Update(in, src, options...)
} }
// Mkdir creates the container if it doesn't exist // Mkdir creates the container if it doesn't exist
@ -835,7 +835,7 @@ func (o *Object) uploadChunked(in io.Reader, commitInfo *files.CommitInfo, size
// Copy the reader into the object updating modTime and size // Copy the reader into the object updating modTime and size
// //
// The new object may have been created if an error is returned // The new object may have been created if an error is returned
func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error { func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
remote := o.remotePath() remote := o.remotePath()
if ignoredFiles.MatchString(remote) { if ignoredFiles.MatchString(remote) {
fs.Logf(o, "File name disallowed - not uploading") fs.Logf(o, "File name disallowed - not uploading")

View File

@ -129,7 +129,7 @@ type Fs interface {
// May create the object even if it returns an error - if so // May create the object even if it returns an error - if so
// will return the object and the error, otherwise will return // will return the object and the error, otherwise will return
// nil and the error // nil and the error
Put(in io.Reader, src ObjectInfo) (Object, error) Put(in io.Reader, src ObjectInfo, options ...OpenOption) (Object, error)
// Mkdir makes the directory (container, bucket) // Mkdir makes the directory (container, bucket)
// //
@ -174,7 +174,7 @@ type Object interface {
Open(options ...OpenOption) (io.ReadCloser, error) Open(options ...OpenOption) (io.ReadCloser, error)
// Update in to the object with the modTime given of the given size // Update in to the object with the modTime given of the given size
Update(in io.Reader, src ObjectInfo) error Update(in io.Reader, src ObjectInfo, options ...OpenOption) error
// Removes this object // Removes this object
Remove() error Remove() error
@ -287,7 +287,7 @@ type Features struct {
// //
// May create duplicates or return errors if src already // May create duplicates or return errors if src already
// exists. // exists.
PutUnchecked func(in io.Reader, src ObjectInfo) (Object, error) PutUnchecked func(in io.Reader, src ObjectInfo, options ...OpenOption) (Object, error)
// CleanUp the trash in the Fs // CleanUp the trash in the Fs
// //
@ -466,7 +466,7 @@ type PutUncheckeder interface {
// //
// May create duplicates or return errors if src already // May create duplicates or return errors if src already
// exists. // exists.
PutUnchecked(in io.Reader, src ObjectInfo) (Object, error) PutUnchecked(in io.Reader, src ObjectInfo, options ...OpenOption) (Object, error)
} }
// CleanUpper is an optional interfaces for Fs // CleanUpper is an optional interfaces for Fs

View File

@ -31,7 +31,9 @@ func (o mockObject) Size() int64 { return
func (o mockObject) Storable() bool { return true } func (o mockObject) Storable() bool { return true }
func (o mockObject) SetModTime(time.Time) error { return errNotImpl } func (o mockObject) SetModTime(time.Time) error { return errNotImpl }
func (o mockObject) Open(options ...OpenOption) (io.ReadCloser, error) { return nil, errNotImpl } func (o mockObject) Open(options ...OpenOption) (io.ReadCloser, error) { return nil, errNotImpl }
func (o mockObject) Update(in io.Reader, src ObjectInfo) error { return errNotImpl } func (o mockObject) Update(in io.Reader, src ObjectInfo, options ...OpenOption) error {
return errNotImpl
}
func (o mockObject) Remove() error { return errNotImpl } func (o mockObject) Remove() error { return errNotImpl }
type mockFs struct { type mockFs struct {

View File

@ -268,6 +268,17 @@ func Copy(f Fs, dst Object, remote string, src Object) (err error) {
maxTries := Config.LowLevelRetries maxTries := Config.LowLevelRetries
tries := 0 tries := 0
doUpdate := dst != nil doUpdate := dst != nil
// work out which hash to use - limit to 1 hash in common
var common HashSet
hashType := HashNone
if !Config.SizeOnly {
common = src.Fs().Hashes().Overlap(f.Hashes())
if common.Count() > 0 {
hashType = common.GetOne()
common = HashSet(hashType)
}
}
hashOption := &HashesOption{Hashes: common}
var actionTaken string var actionTaken string
for { for {
// Try server side copy first - if has optional interface and // Try server side copy first - if has optional interface and
@ -285,7 +296,7 @@ func Copy(f Fs, dst Object, remote string, src Object) (err error) {
// If can't server side copy, do it manually // If can't server side copy, do it manually
if err == ErrorCantCopy { if err == ErrorCantCopy {
var in0 io.ReadCloser var in0 io.ReadCloser
in0, err = src.Open() in0, err = src.Open(hashOption)
if err != nil { if err != nil {
err = errors.Wrap(err, "failed to open source object") err = errors.Wrap(err, "failed to open source object")
} else { } else {
@ -297,10 +308,10 @@ func Copy(f Fs, dst Object, remote string, src Object) (err error) {
} }
if doUpdate { if doUpdate {
actionTaken = "Copied (replaced existing)" actionTaken = "Copied (replaced existing)"
err = dst.Update(in, wrappedSrc) err = dst.Update(in, wrappedSrc, hashOption)
} else { } else {
actionTaken = "Copied (new)" actionTaken = "Copied (new)"
dst, err = f.Put(in, wrappedSrc) dst, err = f.Put(in, wrappedSrc, hashOption)
} }
closeErr := in.Close() closeErr := in.Close()
if err == nil { if err == nil {
@ -338,12 +349,7 @@ func Copy(f Fs, dst Object, remote string, src Object) (err error) {
// Verify hashes are the same after transfer - ignoring blank hashes // Verify hashes are the same after transfer - ignoring blank hashes
// TODO(klauspost): This could be extended, so we always create a hash type matching // TODO(klauspost): This could be extended, so we always create a hash type matching
// the destination, and calculate it while sending. // the destination, and calculate it while sending.
common := src.Fs().Hashes().Overlap(dst.Fs().Hashes()) if hashType != HashNone {
// Debugf(src, "common hashes: %v", common)
if !Config.SizeOnly && common.Count() > 0 {
// Get common hash type
hashType := common.GetOne()
var srcSum string var srcSum string
srcSum, err = src.Hash(hashType) srcSum, err = src.Hash(hashType)
if err != nil { if err != nil {

View File

@ -94,6 +94,27 @@ func (o *HTTPOption) Mandatory() bool {
return false return false
} }
// HashesOption defines an option used to tell the local fs to limit
// the number of hashes it calculates.
type HashesOption struct {
Hashes HashSet
}
// Header formats the option as an http header
func (o *HashesOption) Header() (key string, value string) {
return "", ""
}
// String formats the option into human readable form
func (o *HashesOption) String() string {
return fmt.Sprintf("HashesOption(%v)", o.Hashes)
}
// Mandatory returns whether the option must be parsed or can be ignored
func (o *HashesOption) Mandatory() bool {
return false
}
// OpenOptionAddHeaders adds each header found in options to the // OpenOptionAddHeaders adds each header found in options to the
// headers map provided the key was non empty. // headers map provided the key was non empty.
func OpenOptionAddHeaders(options []OpenOption, headers map[string]string) { func OpenOptionAddHeaders(options []OpenOption, headers map[string]string) {

View File

@ -380,7 +380,7 @@ func (f *Fs) Precision() time.Duration {
// May create the object even if it returns an error - if so // May create the object even if it returns an error - if so
// will return the object and the error, otherwise will return // will return the object and the error, otherwise will return
// nil and the error // nil and the error
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) { func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
// fs.Debugf(f, "Trying to put file %s", src.Remote()) // fs.Debugf(f, "Trying to put file %s", src.Remote())
err := f.mkParentDir(src.Remote()) err := f.mkParentDir(src.Remote())
if err != nil { if err != nil {
@ -390,7 +390,7 @@ func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
fs: f, fs: f,
remote: src.Remote(), remote: src.Remote(),
} }
err = o.Update(in, src) err = o.Update(in, src, options...)
return o, err return o, err
} }
@ -674,7 +674,7 @@ func (o *Object) Open(options ...fs.OpenOption) (rc io.ReadCloser, err error) {
// Copy the reader into the object updating modTime and size // Copy the reader into the object updating modTime and size
// //
// The new object may have been created if an error is returned // The new object may have been created if an error is returned
func (o *Object) Update(in io.Reader, src fs.ObjectInfo) (err error) { func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
// defer fs.Trace(o, "src=%v", src)("err=%v", &err) // defer fs.Trace(o, "src=%v", src)("err=%v", &err)
path := path.Join(o.fs.root, o.remote) path := path.Join(o.fs.root, o.remote)
// remove the file if upload failed // remove the file if upload failed

View File

@ -445,13 +445,13 @@ func (f *Fs) List(out fs.ListOpts, dir string) {
// Copy the reader in to the new object which is returned // Copy the reader in to the new object which is returned
// //
// The new object may have been created if an error is returned // The new object may have been created if an error is returned
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) { func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
// Temporary Object under construction // Temporary Object under construction
o := &Object{ o := &Object{
fs: f, fs: f,
remote: src.Remote(), remote: src.Remote(),
} }
return o, o.Update(in, src) return o, o.Update(in, src, options...)
} }
// Mkdir creates the bucket if it doesn't exist // Mkdir creates the bucket if it doesn't exist
@ -683,7 +683,7 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
// Update the object with the contents of the io.Reader, modTime and size // Update the object with the contents of the io.Reader, modTime and size
// //
// The new object may have been created if an error is returned // The new object may have been created if an error is returned
func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error { func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
size := src.Size() size := src.Size()
modTime := src.ModTime() modTime := src.ModTime()

View File

@ -372,11 +372,11 @@ func (m *mapper) Save(in, out string) string {
} }
// Put the Object to the local filesystem // Put the Object to the local filesystem
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) { func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
remote := src.Remote() remote := src.Remote()
// Temporary Object under construction - info filled in by Update() // Temporary Object under construction - info filled in by Update()
o := f.newObject(remote, "") o := f.newObject(remote, "")
err := o.Update(in, src) err := o.Update(in, src, options...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -707,10 +707,13 @@ func (file *localOpenFile) Close() (err error) {
// Open an object for read // Open an object for read
func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) { func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
var offset int64 var offset int64
hashes := fs.SupportedHashes
for _, option := range options { for _, option := range options {
switch x := option.(type) { switch x := option.(type) {
case *fs.SeekOption: case *fs.SeekOption:
offset = x.Offset offset = x.Offset
case *fs.HashesOption:
hashes = x.Hashes
default: default:
if option.Mandatory() { if option.Mandatory() {
fs.Logf(o, "Unsupported mandatory option: %v", option) fs.Logf(o, "Unsupported mandatory option: %v", option)
@ -728,11 +731,15 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
// don't attempt to make checksums // don't attempt to make checksums
return fd, err return fd, err
} }
hash, err := fs.NewMultiHasherTypes(hashes)
if err != nil {
return nil, err
}
// Update the md5sum as we go along // Update the md5sum as we go along
in = &localOpenFile{ in = &localOpenFile{
o: o, o: o,
in: fd, in: fd,
hash: fs.NewMultiHasher(), hash: hash,
} }
return in, nil return in, nil
} }
@ -744,7 +751,15 @@ func (o *Object) mkdirAll() error {
} }
// Update the object from in with modTime and size // Update the object from in with modTime and size
func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error { func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
hashes := fs.SupportedHashes
for _, option := range options {
switch x := option.(type) {
case *fs.HashesOption:
hashes = x.Hashes
}
}
err := o.mkdirAll() err := o.mkdirAll()
if err != nil { if err != nil {
return err return err
@ -756,7 +771,10 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error {
} }
// Calculate the hash of the object we are reading as we go along // Calculate the hash of the object we are reading as we go along
hash := fs.NewMultiHasher() hash, err := fs.NewMultiHasherTypes(hashes)
if err != nil {
return err
}
in = io.TeeReader(in, hash) in = io.TeeReader(in, hash)
_, err = io.Copy(out, in) _, err = io.Copy(out, in)

View File

@ -468,7 +468,7 @@ func (f *Fs) createObject(remote string, modTime time.Time, size int64) (o *Obje
// Copy the reader in to the new object which is returned // Copy the reader in to the new object which is returned
// //
// The new object may have been created if an error is returned // The new object may have been created if an error is returned
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) { func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
remote := src.Remote() remote := src.Remote()
size := src.Size() size := src.Size()
modTime := src.ModTime() modTime := src.ModTime()
@ -477,7 +477,7 @@ func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
return o, o.Update(in, src) return o, o.Update(in, src, options...)
} }
// Mkdir creates the container if it doesn't exist // Mkdir creates the container if it doesn't exist
@ -1007,7 +1007,7 @@ func (o *Object) uploadMultipart(in io.Reader, size int64) (err error) {
// Update the object with the contents of the io.Reader, modTime and size // Update the object with the contents of the io.Reader, modTime and size
// //
// The new object may have been created if an error is returned // The new object may have been created if an error is returned
func (o *Object) Update(in io.Reader, src fs.ObjectInfo) (err error) { func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
o.fs.tokenRenewer.Start() o.fs.tokenRenewer.Start()
defer o.fs.tokenRenewer.Stop() defer o.fs.tokenRenewer.Stop()

View File

@ -624,13 +624,13 @@ func (f *Fs) List(out fs.ListOpts, dir string) {
} }
// Put the Object into the bucket // Put the Object into the bucket
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) { func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
// Temporary Object under construction // Temporary Object under construction
fs := &Object{ fs := &Object{
fs: f, fs: f,
remote: src.Remote(), remote: src.Remote(),
} }
return fs, fs.Update(in, src) return fs, fs.Update(in, src, options...)
} }
// Check if the bucket exists // Check if the bucket exists
@ -902,7 +902,7 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
} }
// Update the Object from in with modTime and size // Update the Object from in with modTime and size
func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error { func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
modTime := src.ModTime() modTime := src.ModTime()
uploader := s3manager.NewUploader(o.fs.ses, func(u *s3manager.Uploader) { uploader := s3manager.NewUploader(o.fs.ses, func(u *s3manager.Uploader) {

View File

@ -290,7 +290,7 @@ func (f *Fs) List(out fs.ListOpts, dir string) {
} }
// Put data from <in> into a new remote sftp file object described by <src.Remote()> and <src.ModTime()> // Put data from <in> into a new remote sftp file object described by <src.Remote()> and <src.ModTime()>
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) { func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
err := f.mkParentDir(src.Remote()) err := f.mkParentDir(src.Remote())
if err != nil { if err != nil {
return nil, errors.Wrap(err, "Put mkParentDir failed") return nil, errors.Wrap(err, "Put mkParentDir failed")
@ -300,7 +300,7 @@ func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
fs: f, fs: f,
remote: src.Remote(), remote: src.Remote(),
} }
err = o.Update(in, src) err = o.Update(in, src, options...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -543,7 +543,7 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
} }
// Update a remote sftp file using the data <in> and ModTime from <src> // Update a remote sftp file using the data <in> and ModTime from <src>
func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error { func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
file, err := o.fs.sftpClient.Create(o.path()) file, err := o.fs.sftpClient.Create(o.path())
if err != nil { if err != nil {
return errors.Wrap(err, "Update Create failed") return errors.Wrap(err, "Update Create failed")

View File

@ -399,13 +399,13 @@ func (f *Fs) List(out fs.ListOpts, dir string) {
// Copy the reader in to the new object which is returned // Copy the reader in to the new object which is returned
// //
// The new object may have been created if an error is returned // The new object may have been created if an error is returned
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) { func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
// Temporary Object under construction // Temporary Object under construction
fs := &Object{ fs := &Object{
fs: f, fs: f,
remote: src.Remote(), remote: src.Remote(),
} }
return fs, fs.Update(in, src) return fs, fs.Update(in, src, options...)
} }
// Mkdir creates the container if it doesn't exist // Mkdir creates the container if it doesn't exist
@ -737,7 +737,7 @@ func (o *Object) updateChunks(in io.Reader, headers swift.Headers, size int64, c
// Update the object with the contents of the io.Reader, modTime and size // Update the object with the contents of the io.Reader, modTime and size
// //
// The new object may have been created if an error is returned // The new object may have been created if an error is returned
func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error { func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
size := src.Size() size := src.Size()
modTime := src.ModTime() modTime := src.ModTime()

View File

@ -397,7 +397,7 @@ func (o *Object) readMetaData() (err error) {
// Copy the reader in to the new object which is returned // Copy the reader in to the new object which is returned
// //
// The new object may have been created if an error is returned // The new object may have been created if an error is returned
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) { func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
remote := src.Remote() remote := src.Remote()
size := src.Size() size := src.Size()
modTime := src.ModTime() modTime := src.ModTime()
@ -409,7 +409,7 @@ func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
modTime: modTime, modTime: modTime,
} }
//TODO maybe read metadata after upload to check if file uploaded successfully //TODO maybe read metadata after upload to check if file uploaded successfully
return o, o.Update(in, src) return o, o.Update(in, src, options...)
} }
// Mkdir creates the container if it doesn't exist // Mkdir creates the container if it doesn't exist
@ -556,7 +556,7 @@ func (o *Object) remotePath() string {
// Copy the reader into the object updating modTime and size // Copy the reader into the object updating modTime and size
// //
// The new object may have been created if an error is returned // The new object may have been created if an error is returned
func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error { func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
size := src.Size() size := src.Size()
modTime := src.ModTime() modTime := src.ModTime()