chunker: fix NewFs when root points to composite multi-chunk file without metadata

Before this change, calling NewFs on a composite multi-chunk file with
--chunker-meta-format "none"
would fail due to f.base pointing to the wrong Fs. This change fixes the issue,
which was discovered on the bisync integration tests.
This commit is contained in:
nielash 2024-04-09 20:56:35 -04:00 committed by Nick Craig-Wood
parent 21f3ba13f6
commit 8524afa9ce

View File

@ -101,8 +101,10 @@ var (
// //
// And still chunker's primary function is to chunk large files // And still chunker's primary function is to chunk large files
// rather than serve as a generic metadata container. // rather than serve as a generic metadata container.
const maxMetadataSize = 1023 const (
const maxMetadataSizeWritten = 255 maxMetadataSize = 1023
maxMetadataSizeWritten = 255
)
// Current/highest supported metadata format. // Current/highest supported metadata format.
const metadataVersion = 2 const metadataVersion = 2
@ -317,11 +319,13 @@ func NewFs(ctx context.Context, name, rpath string, m configmap.Mapper) (fs.Fs,
// i.e. `rpath` does not exist in the wrapped remote, but chunker // i.e. `rpath` does not exist in the wrapped remote, but chunker
// detects a composite file because it finds the first chunk! // detects a composite file because it finds the first chunk!
// (yet can't satisfy fstest.CheckListing, will ignore) // (yet can't satisfy fstest.CheckListing, will ignore)
if err == nil && !f.useMeta && strings.Contains(rpath, "/") { if err == nil && !f.useMeta {
firstChunkPath := f.makeChunkName(remotePath, 0, "", "") firstChunkPath := f.makeChunkName(remotePath, 0, "", "")
_, testErr := cache.Get(ctx, baseName+firstChunkPath) newBase, testErr := cache.Get(ctx, baseName+firstChunkPath)
if testErr == fs.ErrorIsFile { if testErr == fs.ErrorIsFile {
f.base = newBase
err = testErr err = testErr
cache.PinUntilFinalized(f.base, f)
} }
} }
@ -972,7 +976,7 @@ func (f *Fs) scanObject(ctx context.Context, remote string, quickScan bool) (fs.
} }
continue continue
} }
//fs.Debugf(f, "%q belongs to %q as chunk %d", entryRemote, mainRemote, chunkNo) // fs.Debugf(f, "%q belongs to %q as chunk %d", entryRemote, mainRemote, chunkNo)
if err := o.addChunk(entry, chunkNo); err != nil { if err := o.addChunk(entry, chunkNo); err != nil {
return nil, err return nil, err
} }
@ -1134,8 +1138,8 @@ func (o *Object) readXactID(ctx context.Context) (xactID string, err error) {
// put implements Put, PutStream, PutUnchecked, Update // put implements Put, PutStream, PutUnchecked, Update
func (f *Fs) put( func (f *Fs) put(
ctx context.Context, in io.Reader, src fs.ObjectInfo, remote string, options []fs.OpenOption, ctx context.Context, in io.Reader, src fs.ObjectInfo, remote string, options []fs.OpenOption,
basePut putFn, action string, target fs.Object) (obj fs.Object, err error) { basePut putFn, action string, target fs.Object,
) (obj fs.Object, err error) {
// Perform consistency checks // Perform consistency checks
if err := f.forbidChunk(src, remote); err != nil { if err := f.forbidChunk(src, remote); err != nil {
return nil, fmt.Errorf("%s refused: %w", action, err) return nil, fmt.Errorf("%s refused: %w", action, err)
@ -1956,7 +1960,7 @@ func (f *Fs) ChangeNotify(ctx context.Context, notifyFunc func(string, fs.EntryT
return return
} }
wrappedNotifyFunc := func(path string, entryType fs.EntryType) { wrappedNotifyFunc := func(path string, entryType fs.EntryType) {
//fs.Debugf(f, "ChangeNotify: path %q entryType %d", path, entryType) // fs.Debugf(f, "ChangeNotify: path %q entryType %d", path, entryType)
if entryType == fs.EntryObject { if entryType == fs.EntryObject {
mainPath, _, _, xactID := f.parseChunkName(path) mainPath, _, _, xactID := f.parseChunkName(path)
metaXactID := "" metaXactID := ""