union: fix uploading files to union of all bucket based remotes

Before this fix, if uploading to a union consisting of all bucket
based remotes (eg s3), uploads failed with:

    Failed to copy: object not found

This was because the union backend was relying on parent directories
being created to work out which files to upload. If all the upstreams
were bucket based backends which can't hold empty directories, no
directories were created and the upload failed.

This fixes the problem by returning the upstreams used when creating
the directory for the upload, rather than searching for them again
after they've been created.

This will also make the union backend a little more efficient.

Fixes #6170
This commit is contained in:
Nick Craig-Wood 2022-05-13 16:08:52 +01:00
parent fcec4bedbe
commit c2baacc0a4

View File

@ -141,22 +141,20 @@ func (f *Fs) Hashes() hash.Set {
return f.hashSet
}
// Mkdir makes the root directory of the Fs object
func (f *Fs) Mkdir(ctx context.Context, dir string) error {
// mkdir makes the directory passed in and returns the upstreams used
func (f *Fs) mkdir(ctx context.Context, dir string) ([]*upstream.Fs, error) {
upstreams, err := f.create(ctx, dir)
if err == fs.ErrorObjectNotFound {
if dir != parentDir(dir) {
if err := f.Mkdir(ctx, parentDir(dir)); err != nil {
return err
}
upstreams, err = f.create(ctx, dir)
parent := parentDir(dir)
if dir != parent {
upstreams, err = f.mkdir(ctx, parent)
} else if dir == "" {
// If root dirs not created then create them
upstreams, err = f.upstreams, nil
}
}
if err != nil {
return err
return nil, err
}
errs := Errors(make([]error, len(upstreams)))
multithread(len(upstreams), func(i int) {
@ -165,7 +163,17 @@ func (f *Fs) Mkdir(ctx context.Context, dir string) error {
errs[i] = fmt.Errorf("%s: %w", upstreams[i].Name(), err)
}
})
return errs.Err()
err = errs.Err()
if err != nil {
return nil, err
}
return upstreams, nil
}
// Mkdir makes the root directory of the Fs object
func (f *Fs) Mkdir(ctx context.Context, dir string) error {
_, err := f.mkdir(ctx, dir)
return err
}
// Purge all files in the directory
@ -449,10 +457,7 @@ func (f *Fs) put(ctx context.Context, in io.Reader, src fs.ObjectInfo, stream bo
srcPath := src.Remote()
upstreams, err := f.create(ctx, srcPath)
if err == fs.ErrorObjectNotFound {
if err := f.Mkdir(ctx, parentDir(srcPath)); err != nil {
return nil, err
}
upstreams, err = f.create(ctx, srcPath)
upstreams, err = f.mkdir(ctx, parentDir(srcPath))
}
if err != nil {
return nil, err