b2: Include custom upload headers in large file info - fixes #7744

This commit is contained in:
Pat Patterson 2024-04-09 14:13:33 -07:00 committed by Nick Craig-Wood
parent 495a5759d3
commit 56caab2033
7 changed files with 226 additions and 74 deletions

View File

@ -306,6 +306,7 @@ type Object struct {
sha1 string // SHA-1 hash if known
size int64 // Size of the object
mimeType string // Content-Type of the object
meta map[string]string // The object metadata if known - may be nil - with lower case keys
}
// ------------------------------------------------------------
@ -1593,7 +1594,14 @@ func (o *Object) decodeMetaDataRaw(ID, SHA1 string, Size int64, UploadTimestamp
o.size = Size
// Use the UploadTimestamp if can't get file info
o.modTime = time.Time(UploadTimestamp)
return o.parseTimeString(Info[timeKey])
err = o.parseTimeString(Info[timeKey])
if err != nil {
return err
}
// For now, just set "mtime" in metadata
o.meta = make(map[string]string, 1)
o.meta["mtime"] = o.modTime.Format(time.RFC3339Nano)
return nil
}
// decodeMetaData sets the metadata in the object from an api.File
@ -1695,6 +1703,16 @@ func timeString(modTime time.Time) string {
return strconv.FormatInt(modTime.UnixNano()/1e6, 10)
}
// parseTimeStringHelper converts a decimal string number of milliseconds
// elapsed since January 1, 1970 UTC into a time.Time
func parseTimeStringHelper(timeString string) (time.Time, error) {
unixMilliseconds, err := strconv.ParseInt(timeString, 10, 64)
if err != nil {
return time.Time{}, err
}
return time.Unix(unixMilliseconds/1e3, (unixMilliseconds%1e3)*1e6).UTC(), nil
}
// parseTimeString converts a decimal string number of milliseconds
// elapsed since January 1, 1970 UTC into a time.Time and stores it in
// the modTime variable.
@ -1702,12 +1720,12 @@ func (o *Object) parseTimeString(timeString string) (err error) {
if timeString == "" {
return nil
}
unixMilliseconds, err := strconv.ParseInt(timeString, 10, 64)
modTime, err := parseTimeStringHelper(timeString)
if err != nil {
fs.Debugf(o, "Failed to parse mod time string %q: %v", timeString, err)
return nil
}
o.modTime = time.Unix(unixMilliseconds/1e3, (unixMilliseconds%1e3)*1e6).UTC()
o.modTime = modTime
return nil
}
@ -1861,6 +1879,14 @@ func (o *Object) getOrHead(ctx context.Context, method string, options []fs.Open
ContentType: resp.Header.Get("Content-Type"),
Info: Info,
}
// Embryonic metadata support - just mtime
o.meta = make(map[string]string, 1)
modTime, err := parseTimeStringHelper(info.Info[timeKey])
if err == nil {
o.meta["mtime"] = modTime.Format(time.RFC3339Nano)
}
// When reading files from B2 via cloudflare using
// --b2-download-url cloudflare strips the Content-Length
// headers (presumably so it can inject stuff) so use the old
@ -1958,7 +1984,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
if err == nil {
fs.Debugf(o, "File is big enough for chunked streaming")
up, err := o.fs.newLargeUpload(ctx, o, in, src, o.fs.opt.ChunkSize, false, nil)
up, err := o.fs.newLargeUpload(ctx, o, in, src, o.fs.opt.ChunkSize, false, nil, options...)
if err != nil {
o.fs.putRW(rw)
return err
@ -1990,7 +2016,10 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
return o.decodeMetaDataFileInfo(up.info)
}
modTime := src.ModTime(ctx)
modTime, err := o.getModTime(ctx, src, options)
if err != nil {
return err
}
calculatedSha1, _ := src.Hash(ctx, hash.SHA1)
if calculatedSha1 == "" {
@ -2095,6 +2124,36 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
return o.decodeMetaDataFileInfo(&response)
}
// Get modTime from the source; if --metadata is set, fetch the src metadata and get it from there.
// When metadata support is added to b2, this method will need a more generic name
func (o *Object) getModTime(ctx context.Context, src fs.ObjectInfo, options []fs.OpenOption) (time.Time, error) {
modTime := src.ModTime(ctx)
// Fetch metadata if --metadata is in use
meta, err := fs.GetMetadataOptions(ctx, o.fs, src, options)
if err != nil {
return time.Time{}, fmt.Errorf("failed to read metadata from source object: %w", err)
}
// merge metadata into request and user metadata
for k, v := range meta {
k = strings.ToLower(k)
// For now, the only metadata we're concerned with is "mtime"
switch k {
case "mtime":
// mtime in meta overrides source ModTime
metaModTime, err := time.Parse(time.RFC3339Nano, v)
if err != nil {
fs.Debugf(o, "failed to parse metadata %s: %q: %v", k, v, err)
} else {
modTime = metaModTime
}
default:
// Do nothing for now
}
}
return modTime, nil
}
// OpenChunkWriter returns the chunk size and a ChunkWriter
//
// Pass in the remote and the src object
@ -2126,7 +2185,7 @@ func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectIn
Concurrency: o.fs.opt.UploadConcurrency,
//LeavePartsOnError: o.fs.opt.LeavePartsOnError,
}
up, err := f.newLargeUpload(ctx, o, nil, src, f.opt.ChunkSize, false, nil)
up, err := f.newLargeUpload(ctx, o, nil, src, f.opt.ChunkSize, false, nil, options...)
return info, up, err
}

View File

@ -184,16 +184,64 @@ func TestParseTimeString(t *testing.T) {
}
// This is adapted from the s3 equivalent.
func (f *Fs) InternalTestMetadata(t *testing.T) {
// Return a map of the headers in the options with keys stripped of the "x-bz-info-" prefix
func OpenOptionToMetaData(options []fs.OpenOption) map[string]string {
var headers = make(map[string]string)
for _, option := range options {
k, v := option.Header()
k = strings.ToLower(k)
if strings.HasPrefix(k, headerPrefix) {
headers[k[len(headerPrefix):]] = v
}
}
return headers
}
func (f *Fs) internalTestMetadata(t *testing.T, size string, uploadCutoff string, chunkSize string) {
what := fmt.Sprintf("Size%s/UploadCutoff%s/ChunkSize%s", size, uploadCutoff, chunkSize)
t.Run(what, func(t *testing.T) {
ctx := context.Background()
original := random.String(1000)
ss := fs.SizeSuffix(0)
err := ss.Set(size)
require.NoError(t, err)
original := random.String(int(ss))
contents := fstest.Gz(t, original)
mimeType := "text/html"
if chunkSize != "" {
ss := fs.SizeSuffix(0)
err := ss.Set(chunkSize)
require.NoError(t, err)
_, err = f.SetUploadChunkSize(ss)
require.NoError(t, err)
}
if uploadCutoff != "" {
ss := fs.SizeSuffix(0)
err := ss.Set(uploadCutoff)
require.NoError(t, err)
_, err = f.SetUploadCutoff(ss)
require.NoError(t, err)
}
item := fstest.NewItem("test-metadata", contents, fstest.Time("2001-05-06T04:05:06.499Z"))
btime := time.Now()
obj := fstests.PutTestContentsMetadata(ctx, t, f, &item, contents, true, mimeType, nil)
metadata := fs.Metadata{
// Just mtime for now - limit to milliseconds since x-bz-info-src_last_modified_millis can't support any
"mtime": "2009-05-06T04:05:06.499Z",
}
// Need to specify HTTP options with the header prefix since they are passed as-is
options := []fs.OpenOption{
&fs.HTTPOption{Key: "X-Bz-Info-a", Value: "1"},
&fs.HTTPOption{Key: "X-Bz-Info-b", Value: "2"},
}
obj := fstests.PutTestContentsMetadata(ctx, t, f, &item, true, contents, true, mimeType, metadata, options...)
defer func() {
assert.NoError(t, obj.Remove(ctx))
}()
@ -201,7 +249,19 @@ func (f *Fs) InternalTestMetadata(t *testing.T) {
gotMetadata, err := o.getMetaData(ctx)
require.NoError(t, err)
// We currently have a limited amount of metadata to test with B2
// X-Bz-Info-a & X-Bz-Info-b
optMetadata := OpenOptionToMetaData(options)
for k, v := range optMetadata {
got := gotMetadata.Info[k]
assert.Equal(t, v, got, k)
}
// mtime
for k, v := range metadata {
got := o.meta[k]
assert.Equal(t, v, got, k)
}
assert.Equal(t, mimeType, gotMetadata.ContentType, "Content-Type")
// Modification time from the x-bz-info-src_last_modified_millis header
@ -233,6 +293,15 @@ func (f *Fs) InternalTestMetadata(t *testing.T) {
checkDownload(contents, int64(len(contents)), sha1Sum(t, contents))
})
})
})
}
func (f *Fs) InternalTestMetadata(t *testing.T) {
// 1 kB regular file
f.internalTestMetadata(t, "1kiB", "", "")
// 10 MiB large file
f.internalTestMetadata(t, "10MiB", "6MiB", "6MiB")
}
func sha1Sum(t *testing.T, s string) string {

View File

@ -91,7 +91,7 @@ type largeUpload struct {
// newLargeUpload starts an upload of object o from in with metadata in src
//
// If newInfo is set then metadata from that will be used instead of reading it from src
func (f *Fs) newLargeUpload(ctx context.Context, o *Object, in io.Reader, src fs.ObjectInfo, defaultChunkSize fs.SizeSuffix, doCopy bool, newInfo *api.File) (up *largeUpload, err error) {
func (f *Fs) newLargeUpload(ctx context.Context, o *Object, in io.Reader, src fs.ObjectInfo, defaultChunkSize fs.SizeSuffix, doCopy bool, newInfo *api.File, options ...fs.OpenOption) (up *largeUpload, err error) {
size := src.Size()
parts := 0
chunkSize := defaultChunkSize
@ -104,11 +104,6 @@ func (f *Fs) newLargeUpload(ctx context.Context, o *Object, in io.Reader, src fs
parts++
}
}
opts := rest.Opts{
Method: "POST",
Path: "/b2_start_large_file",
}
bucket, bucketPath := o.split()
bucketID, err := f.getBucketID(ctx, bucket)
if err != nil {
@ -118,12 +113,27 @@ func (f *Fs) newLargeUpload(ctx context.Context, o *Object, in io.Reader, src fs
BucketID: bucketID,
Name: f.opt.Enc.FromStandardPath(bucketPath),
}
optionsToSend := make([]fs.OpenOption, 0, len(options))
if newInfo == nil {
modTime := src.ModTime(ctx)
modTime, err := o.getModTime(ctx, src, options)
if err != nil {
return nil, err
}
request.ContentType = fs.MimeType(ctx, src)
request.Info = map[string]string{
timeKey: timeString(modTime),
}
// Custom upload headers - remove header prefix since they are sent in the body
for _, option := range options {
k, v := option.Header()
k = strings.ToLower(k)
if strings.HasPrefix(k, headerPrefix) {
request.Info[k[len(headerPrefix):]] = v
} else {
optionsToSend = append(optionsToSend, option)
}
}
// Set the SHA1 if known
if !o.fs.opt.DisableCheckSum || doCopy {
if calculatedSha1, err := src.Hash(ctx, hash.SHA1); err == nil && calculatedSha1 != "" {
@ -134,6 +144,11 @@ func (f *Fs) newLargeUpload(ctx context.Context, o *Object, in io.Reader, src fs
request.ContentType = newInfo.ContentType
request.Info = newInfo.Info
}
opts := rest.Opts{
Method: "POST",
Path: "/b2_start_large_file",
Options: optionsToSend,
}
var response api.StartLargeFileResponse
err = f.pacer.Call(func() (bool, error) {
resp, err := f.srv.CallJSON(ctx, &opts, &request, &response)

View File

@ -59,7 +59,7 @@ func (f *Fs) InternalTestMetadata(t *testing.T) {
//"utime" - read-only
//"content-type" - read-only
}
obj := fstests.PutTestContentsMetadata(ctx, t, f, &item, contents, true, "text/html", metadata)
obj := fstests.PutTestContentsMetadata(ctx, t, f, &item, false, contents, true, "text/html", metadata)
defer func() {
assert.NoError(t, obj.Remove(ctx))
}()

View File

@ -379,7 +379,7 @@ func (f *Fs) putWithMeta(ctx context.Context, t *testing.T, file *fstest.Item, p
}
expectedMeta.Set("permissions", marshalPerms(t, perms))
obj := fstests.PutTestContentsMetadata(ctx, t, f, file, content, true, "plain/text", expectedMeta)
obj := fstests.PutTestContentsMetadata(ctx, t, f, file, false, content, true, "plain/text", expectedMeta)
do, ok := obj.(fs.Metadataer)
require.True(t, ok)
actualMeta, err := do.Metadata(ctx)

View File

@ -58,7 +58,7 @@ func (f *Fs) InternalTestMetadata(t *testing.T) {
// "tier" - read only
// "btime" - read only
}
obj := fstests.PutTestContentsMetadata(ctx, t, f, &item, contents, true, "text/html", metadata)
obj := fstests.PutTestContentsMetadata(ctx, t, f, &item, true, contents, true, "text/html", metadata)
defer func() {
assert.NoError(t, obj.Remove(ctx))
}()

View File

@ -151,7 +151,7 @@ func retry(t *testing.T, what string, f func() error) {
// It uploads the object with the mimeType and metadata passed in if set.
//
// It returns the object which will have been checked if check is set
func PutTestContentsMetadata(ctx context.Context, t *testing.T, f fs.Fs, file *fstest.Item, contents string, check bool, mimeType string, metadata fs.Metadata) fs.Object {
func PutTestContentsMetadata(ctx context.Context, t *testing.T, f fs.Fs, file *fstest.Item, useFileHashes bool, contents string, check bool, mimeType string, metadata fs.Metadata, options ...fs.OpenOption) fs.Object {
var (
err error
obj fs.Object
@ -163,7 +163,13 @@ func PutTestContentsMetadata(ctx context.Context, t *testing.T, f fs.Fs, file *f
in := io.TeeReader(buf, uploadHash)
file.Size = int64(buf.Len())
obji := object.NewStaticObjectInfo(file.Path, file.ModTime, file.Size, true, nil, nil)
// The caller explicitly indicates whether the hashes in the file parameter should be used. If hashes is nil,
// then NewStaticObjectInfo will calculate default hashes for use in the check.
hashes := file.Hashes
if !useFileHashes {
hashes = nil
}
obji := object.NewStaticObjectInfo(file.Path, file.ModTime, file.Size, true, hashes, nil)
if mimeType != "" || metadata != nil {
// force the --metadata flag on temporarily
if metadata != nil {
@ -176,7 +182,7 @@ func PutTestContentsMetadata(ctx context.Context, t *testing.T, f fs.Fs, file *f
}
obji.WithMetadata(metadata).WithMimeType(mimeType)
}
obj, err = f.Put(ctx, in, obji)
obj, err = f.Put(ctx, in, obji, options...)
return err
})
file.Hashes = uploadHash.Sums()
@ -198,19 +204,22 @@ func PutTestContentsMetadata(ctx context.Context, t *testing.T, f fs.Fs, file *f
// PutTestContents puts file with given contents to the remote and checks it but unlike TestPutLarge doesn't remove
func PutTestContents(ctx context.Context, t *testing.T, f fs.Fs, file *fstest.Item, contents string, check bool) fs.Object {
return PutTestContentsMetadata(ctx, t, f, file, contents, check, "", nil)
return PutTestContentsMetadata(ctx, t, f, file, false, contents, check, "", nil)
}
// testPut puts file with random contents to the remote
func testPut(ctx context.Context, t *testing.T, f fs.Fs, file *fstest.Item) (string, fs.Object) {
contents := random.String(100)
return contents, PutTestContents(ctx, t, f, file, contents, true)
return testPutMimeType(ctx, t, f, file, "", nil)
}
// testPutMimeType puts file with random contents to the remote and the mime type given
func testPutMimeType(ctx context.Context, t *testing.T, f fs.Fs, file *fstest.Item, mimeType string, metadata fs.Metadata) (string, fs.Object) {
contents := random.String(100)
return contents, PutTestContentsMetadata(ctx, t, f, file, contents, true, mimeType, metadata)
// We just generated new contents, but file may contain hashes generated by a previous operation
if len(file.Hashes) > 0 {
file.Hashes = make(map[hash.Type]string)
}
return contents, PutTestContentsMetadata(ctx, t, f, file, false, contents, true, mimeType, metadata)
}
// testPutLarge puts file to the remote, checks it and removes it on success.
@ -1284,15 +1293,15 @@ func Run(t *testing.T, opt *Opt) {
const dstName = "test metadata copied.txt"
t1 := fstest.Time("2003-02-03T04:05:06.499999999Z")
t2 := fstest.Time("2004-03-03T04:05:06.499999999Z")
fileSrc := fstest.NewItem(srcName, srcName, t1)
contents := random.String(100)
fileSrc := fstest.NewItem(srcName, contents, t1)
var testMetadata = fs.Metadata{
// System metadata supported by all backends
"mtime": t1.Format(time.RFC3339Nano),
// User metadata
"potato": "jersey",
}
oSrc := PutTestContentsMetadata(ctx, t, f, &fileSrc, contents, true, "text/plain", testMetadata)
oSrc := PutTestContentsMetadata(ctx, t, f, &fileSrc, false, contents, true, "text/plain", testMetadata)
fstest.CheckEntryMetadata(ctx, t, f, oSrc, testMetadata)
// Copy it with --metadata-set
@ -1401,7 +1410,7 @@ func Run(t *testing.T, opt *Opt) {
// User metadata
"potato": "jersey",
}
o := PutTestContentsMetadata(ctx, t, f, &file, contents, true, "text/plain", testMetadata)
o := PutTestContentsMetadata(ctx, t, f, &file, false, contents, true, "text/plain", testMetadata)
fstest.CheckEntryMetadata(ctx, t, f, o, testMetadata)
// Move it with --metadata-set