operations: fix missing metadata for multipart transfers to local disk

Before this change multipart downloads to the local disk with
--metadata failed to have their metadata set properly.

This was because the OpenWriterAt interface doesn't receive metadata
when creating the object.

This patch fixes the problem by using the recently introduced
Object.SetMetadata method to set the metadata on the object after the
download has completed (when using --metadata). If the backend we are
copying to is using OpenWriterAt but the Object doesn't support
SetMetadata then it will write an ERROR level log but complete
successfully. This should not happen at the moment as only the local
backend supports metadata and OpenWriterAt but it may in the future.

It also adds a test to check metadata is preserved when doing
multipart transfers.

Fixes #7424
This commit is contained in:
Nick Craig-Wood
2024-05-08 17:50:31 +01:00
parent 629e895da8
commit 6a0a54ab97
2 changed files with 70 additions and 21 deletions

View File

@ -146,6 +146,9 @@ func TestMultithreadCopy(t *testing.T) {
r := fstest.NewRun(t)
ctx := context.Background()
chunkSize := skipIfNotMultithread(ctx, t, r)
// Check every other transfer for metadata
checkMetadata := false
ctx, ci := fs.AddConfig(ctx)
for _, upload := range []bool{false, true} {
for _, test := range []struct {
@ -156,32 +159,54 @@ func TestMultithreadCopy(t *testing.T) {
{size: chunkSize * 2, streams: 2},
{size: chunkSize*2 + 1, streams: 2},
} {
checkMetadata = !checkMetadata
ci.Metadata = checkMetadata
fileName := fmt.Sprintf("test-multithread-copy-%v-%d-%d", upload, test.size, test.streams)
t.Run(fmt.Sprintf("upload=%v,size=%v,streams=%v", upload, test.size, test.streams), func(t *testing.T) {
if *fstest.SizeLimit > 0 && int64(test.size) > *fstest.SizeLimit {
t.Skipf("exceeded file size limit %d > %d", test.size, *fstest.SizeLimit)
}
var (
contents = random.String(test.size)
t1 = fstest.Time("2001-02-03T04:05:06.499999999Z")
file1 fstest.Item
src, dst fs.Object
err error
contents = random.String(test.size)
t1 = fstest.Time("2001-02-03T04:05:06.499999999Z")
file1 fstest.Item
src, dst fs.Object
err error
testMetadata = fs.Metadata{
// System metadata supported by all backends
"mtime": t1.Format(time.RFC3339Nano),
// User metadata
"potato": "jersey",
}
)
var fSrc, fDst fs.Fs
if upload {
file1 = r.WriteFile(fileName, contents, t1)
r.CheckRemoteItems(t)
r.CheckLocalItems(t, file1)
src, err = r.Flocal.NewObject(ctx, fileName)
fDst, fSrc = r.Fremote, r.Flocal
} else {
file1 = r.WriteObject(ctx, fileName, contents, t1)
r.CheckRemoteItems(t, file1)
r.CheckLocalItems(t)
src, err = r.Fremote.NewObject(ctx, fileName)
fDst, fSrc = r.Flocal, r.Fremote
}
src, err = fSrc.NewObject(ctx, fileName)
require.NoError(t, err)
do, canSetMetadata := src.(fs.SetMetadataer)
if checkMetadata && canSetMetadata {
// Set metadata on the source if required
err := do.SetMetadata(ctx, testMetadata)
if err == fs.ErrorNotImplemented {
canSetMetadata = false
} else {
require.NoError(t, err)
fstest.CheckEntryMetadata(ctx, t, r.Flocal, src, testMetadata)
}
}
accounting.GlobalStats().ResetCounters()
tr := accounting.GlobalStats().NewTransfer(src, nil)
@ -189,19 +214,21 @@ func TestMultithreadCopy(t *testing.T) {
tr.Done(ctx, err)
}()
if upload {
dst, err = multiThreadCopy(ctx, r.Fremote, fileName, src, test.streams, tr)
} else {
dst, err = multiThreadCopy(ctx, r.Flocal, fileName, src, test.streams, tr)
}
dst, err = multiThreadCopy(ctx, fDst, fileName, src, test.streams, tr)
require.NoError(t, err)
assert.Equal(t, src.Size(), dst.Size())
assert.Equal(t, fileName, dst.Remote())
fstest.CheckListingWithPrecision(t, r.Fremote, []fstest.Item{file1}, nil, fs.GetModifyWindow(ctx, r.Flocal, r.Fremote))
fstest.CheckListingWithPrecision(t, r.Flocal, []fstest.Item{file1}, nil, fs.GetModifyWindow(ctx, r.Flocal, r.Fremote))
fstest.CheckListingWithPrecision(t, fSrc, []fstest.Item{file1}, nil, fs.GetModifyWindow(ctx, fDst, fSrc))
fstest.CheckListingWithPrecision(t, fDst, []fstest.Item{file1}, nil, fs.GetModifyWindow(ctx, fDst, fSrc))
if checkMetadata && canSetMetadata && fDst.Features().ReadMetadata {
fstest.CheckEntryMetadata(ctx, t, fDst, dst, testMetadata)
}
require.NoError(t, dst.Remove(ctx))
require.NoError(t, src.Remove(ctx))
})
}
}