chunker: improve detection of incompatible metadata #4917

Before this patch chunker required that there is at least one
data chunk to start checking for a composite object.

Now if chunker finds at least one potential temporary or control
chunk, it marks found files as a suspected composite object.
When later rclone tries a concrete operation on the object,
it performs postponed metadata read and decides: is this a native
composite object, incompatible metadata or just garbage.
This commit is contained in:
Ivan Andreev 2021-01-04 04:08:22 +03:00
parent 3877df4e62
commit 847625822f

View File

@ -121,6 +121,8 @@ const maxTransactionProbes = 100
// standard chunker errors
var (
ErrChunkOverflow = errors.New("chunk number overflow")
ErrMetaTooBig = errors.New("metadata is too big")
ErrMetaUnknown = errors.New("unknown metadata, please upgrade rclone")
)
// variants of baseMove's parameter delMode
@ -693,43 +695,47 @@ func (f *Fs) processEntries(ctx context.Context, origEntries fs.DirEntries, dirP
switch entry := dirOrObject.(type) {
case fs.Object:
remote := entry.Remote()
if mainRemote, chunkNo, ctrlType, xactID := f.parseChunkName(remote); mainRemote != "" {
if xactID != "" {
if revealHidden {
fs.Infof(f, "ignore temporary chunk %q", remote)
}
break
mainRemote, chunkNo, ctrlType, xactID := f.parseChunkName(remote)
if mainRemote == "" {
// this is meta object or standalone file
object := f.newObject("", entry, nil)
byRemote[remote] = object
tempEntries = append(tempEntries, object)
break
}
// this is some kind of chunk
// metobject should have been created above if present
isSpecial := xactID != "" || ctrlType != ""
mainObject := byRemote[mainRemote]
if mainObject == nil && f.useMeta && !isSpecial {
fs.Debugf(f, "skip orphan data chunk %q", remote)
break
}
if mainObject == nil && !f.useMeta {
// this is the "nometa" case
// create dummy chunked object without metadata
mainObject = f.newObject(mainRemote, nil, nil)
byRemote[mainRemote] = mainObject
if !badEntry[mainRemote] {
tempEntries = append(tempEntries, mainObject)
}
if ctrlType != "" {
if revealHidden {
fs.Infof(f, "ignore control chunk %q", remote)
}
break
}
if isSpecial {
if revealHidden {
fs.Infof(f, "ignore non-data chunk %q", remote)
}
mainObject := byRemote[mainRemote]
if mainObject == nil && f.useMeta {
fs.Debugf(f, "skip chunk %q without meta object", remote)
break
}
if mainObject == nil {
// useMeta is false - create chunked object without metadata
mainObject = f.newObject(mainRemote, nil, nil)
byRemote[mainRemote] = mainObject
if !badEntry[mainRemote] {
tempEntries = append(tempEntries, mainObject)
}
}
if err := mainObject.addChunk(entry, chunkNo); err != nil {
if f.opt.FailHard {
return nil, err
}
badEntry[mainRemote] = true
// need to read metadata to ensure actual object type
if f.useMeta && mainObject != nil && mainObject.size <= maxMetadataSize {
mainObject.unsure = true
}
break
}
object := f.newObject("", entry, nil)
byRemote[remote] = object
tempEntries = append(tempEntries, object)
if err := mainObject.addChunk(entry, chunkNo); err != nil {
if f.opt.FailHard {
return nil, err
}
badEntry[mainRemote] = true
}
case fs.Directory:
isSubdir[entry.Remote()] = true
wrapDir := fs.NewDirCopy(ctx, entry)
@ -784,6 +790,13 @@ func (f *Fs) processEntries(ctx context.Context, origEntries fs.DirEntries, dirP
// but opening even a small file can be slow on some backends.
//
func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
return f.scanObject(ctx, remote, false)
}
// scanObject is like NewObject with optional quick scan mode.
// The quick mode avoids directory requests other than `List`,
// ignores non-chunked objects and skips chunk size checks.
func (f *Fs) scanObject(ctx context.Context, remote string, quickScan bool) (fs.Object, error) {
if err := f.forbidChunk(false, remote); err != nil {
return nil, errors.Wrap(err, "can't access")
}
@ -844,8 +857,15 @@ func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
continue // bypass regexp to save cpu
}
mainRemote, chunkNo, ctrlType, xactID := f.parseChunkName(entryRemote)
if mainRemote == "" || mainRemote != remote || ctrlType != "" || xactID != "" {
continue // skip non-conforming, temporary and control chunks
if mainRemote == "" || mainRemote != remote {
continue // skip non-conforming chunks
}
if ctrlType != "" || xactID != "" {
if f.useMeta {
// temporary/control chunk calls for lazy metadata read
o.unsure = true
}
continue
}
//fs.Debugf(f, "%q belongs to %q as chunk %d", entryRemote, mainRemote, chunkNo)
if err := o.addChunk(entry, chunkNo); err != nil {
@ -855,7 +875,7 @@ func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
if o.main == nil && (o.chunks == nil || len(o.chunks) == 0) {
// Scanning hasn't found data chunks with conforming names.
if f.useMeta {
if f.useMeta || quickScan {
// Metadata is required but absent and there are no chunks.
return nil, fs.ErrorObjectNotFound
}
@ -878,8 +898,10 @@ func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
// file without metadata. Validate it and update the total data size.
// As an optimization, skip metadata reading here - we will call
// readMetadata lazily when needed (reading can be expensive).
if err := o.validate(); err != nil {
return nil, err
if !quickScan {
if err := o.validate(); err != nil {
return nil, err
}
}
return o, nil
}
@ -888,13 +910,24 @@ func (o *Object) readMetadata(ctx context.Context) error {
if o.isFull {
return nil
}
if !o.isComposite() || !o.f.useMeta {
if !o.f.useMeta || (!o.isComposite() && !o.unsure) {
o.isFull = true
return nil
}
// validate metadata
metaObject := o.main
if metaObject.Size() > maxMetadataSize {
if o.unsure {
// this is not metadata but a foreign object
o.unsure = false
o.chunks = nil // make isComposite return false
o.isFull = true // cache results
return nil
}
return ErrMetaTooBig
}
reader, err := metaObject.Open(ctx)
if err != nil {
return err
@ -907,8 +940,22 @@ func (o *Object) readMetadata(ctx context.Context) error {
switch o.f.opt.MetaFormat {
case "simplejson":
metaInfo, err := unmarshalSimpleJSON(ctx, metaObject, metadata, true)
if err != nil {
metaInfo, madeByChunker, err := unmarshalSimpleJSON(ctx, metaObject, metadata)
if o.unsure {
o.unsure = false
if !madeByChunker {
// this is not metadata but a foreign object
o.chunks = nil // make isComposite return false
o.isFull = true // cache results
return nil
}
}
switch err {
case nil:
// fall thru
case ErrMetaTooBig, ErrMetaUnknown:
return err // return these errors unwrapped for unit tests
default:
return errors.Wrap(err, "invalid metadata")
}
if o.size != metaInfo.Size() || len(o.chunks) != metaInfo.nChunks {
@ -923,7 +970,27 @@ func (o *Object) readMetadata(ctx context.Context) error {
}
// put implements Put, PutStream, PutUnchecked, Update
func (f *Fs) put(ctx context.Context, in io.Reader, src fs.ObjectInfo, remote string, options []fs.OpenOption, basePut putFn) (obj fs.Object, err error) {
func (f *Fs) put(
ctx context.Context, in io.Reader, src fs.ObjectInfo, remote string, options []fs.OpenOption,
basePut putFn, action string, target fs.Object) (obj fs.Object, err error) {
if err := f.forbidChunk(src, remote); err != nil {
return nil, errors.Wrap(err, action+" refused")
}
if target == nil {
// Get target object with a quick directory scan
if obj, err := f.scanObject(ctx, remote, true); err == nil {
target = obj
}
}
if target != nil {
obj := target.(*Object)
if err := obj.readMetadata(ctx); err == ErrMetaUnknown {
// refuse to update a file of unsupported format
return nil, errors.Wrap(err, "refusing to "+action)
}
}
c := f.newChunkingReader(src)
wrapIn := c.wrapStream(ctx, in, src)
@ -1013,8 +1080,8 @@ func (f *Fs) put(ctx context.Context, in io.Reader, src fs.ObjectInfo, remote st
// Check for input that looks like valid metadata
needMeta := len(c.chunks) > 1
if c.readCount <= maxMetadataSize && len(c.chunks) == 1 {
_, err := unmarshalSimpleJSON(ctx, c.chunks[0], c.smallHead, false)
needMeta = err == nil
_, madeByChunker, _ := unmarshalSimpleJSON(ctx, c.chunks[0], c.smallHead)
needMeta = madeByChunker
}
// Finalize small object as non-chunked.
@ -1273,29 +1340,16 @@ func (f *Fs) removeOldChunks(ctx context.Context, remote string) {
// will return the object and the error, otherwise will return
// nil and the error
func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
if err := f.forbidChunk(src, src.Remote()); err != nil {
return nil, errors.Wrap(err, "refusing to put")
}
return f.put(ctx, in, src, src.Remote(), options, f.base.Put)
return f.put(ctx, in, src, src.Remote(), options, f.base.Put, "put", nil)
}
// PutStream uploads to the remote path with the modTime given of indeterminate size
func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
if err := f.forbidChunk(src, src.Remote()); err != nil {
return nil, errors.Wrap(err, "refusing to upload")
}
return f.put(ctx, in, src, src.Remote(), options, f.base.Features().PutStream)
return f.put(ctx, in, src, src.Remote(), options, f.base.Features().PutStream, "upload", nil)
}
// Update in to the object with the modTime given of the given size
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
if err := o.f.forbidChunk(o, o.Remote()); err != nil {
return errors.Wrap(err, "update refused")
}
if err := o.readMetadata(ctx); err != nil {
// refuse to update a file of unsupported format
return errors.Wrap(err, "refusing to update")
}
basePut := o.f.base.Put
if src.Size() < 0 {
basePut = o.f.base.Features().PutStream
@ -1303,7 +1357,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
return errors.New("wrapped file system does not support streaming uploads")
}
}
oNew, err := o.f.put(ctx, in, src, o.Remote(), options, basePut)
oNew, err := o.f.put(ctx, in, src, o.Remote(), options, basePut, "update", o)
if err == nil {
*o = *oNew.(*Object)
}
@ -1417,7 +1471,7 @@ func (o *Object) Remove(ctx context.Context) (err error) {
// to corrupt file in hard mode. Hence, refuse to Remove, too.
return errors.Wrap(err, "refuse to corrupt")
}
if err := o.readMetadata(ctx); err != nil {
if err := o.readMetadata(ctx); err == ErrMetaUnknown {
// Proceed but warn user that unexpected things can happen.
fs.Errorf(o, "Removing a file with unsupported metadata: %v", err)
}
@ -1445,6 +1499,11 @@ func (f *Fs) copyOrMove(ctx context.Context, o *Object, remote string, do copyMo
if err := f.forbidChunk(o, remote); err != nil {
return nil, errors.Wrapf(err, "can't %s", opName)
}
if err := o.readMetadata(ctx); err != nil {
// Refuse to copy/move composite files with invalid or future
// metadata format which might involve unsupported chunk types.
return nil, errors.Wrapf(err, "can't %s this file", opName)
}
if !o.isComposite() {
fs.Debugf(o, "%s non-chunked object...", opName)
oResult, err := do(ctx, o.mainChunk(), remote) // chain operation to a single wrapped chunk
@ -1453,11 +1512,6 @@ func (f *Fs) copyOrMove(ctx context.Context, o *Object, remote string, do copyMo
}
return f.newObject("", oResult, nil), nil
}
if err := o.readMetadata(ctx); err != nil {
// Refuse to copy/move composite files with invalid or future
// metadata format which might involve unsupported chunk types.
return nil, errors.Wrapf(err, "can't %s this file", opName)
}
fs.Debugf(o, "%s %d data chunks...", opName, len(o.chunks))
mainRemote := o.remote
@ -1548,6 +1602,10 @@ func (f *Fs) okForServerSide(ctx context.Context, src fs.Object, opName string)
return
}
if obj.unsure {
// ensure object is composite if need to re-read metadata
_ = obj.readMetadata(ctx)
}
requireMetaHash := obj.isComposite() && f.opt.MetaFormat == "simplejson"
if !requireMetaHash && !f.hashAll {
ok = true // hash is not required for metadata
@ -1741,6 +1799,7 @@ type Object struct {
chunks []fs.Object // active data chunks if file is composite, or wrapped file as a single chunk if meta format is 'none'
size int64 // cached total size of chunks in a composite file or -1 for non-chunked files
isFull bool // true if metadata has been read
unsure bool // true if need to read metadata to detect object type
md5 string
sha1 string
f *Fs
@ -1891,15 +1950,16 @@ func (o *Object) SetModTime(ctx context.Context, mtime time.Time) error {
// on the level of wrapped remote but chunker is unaware of that.
//
func (o *Object) Hash(ctx context.Context, hashType hash.Type) (string, error) {
if err := o.readMetadata(ctx); err != nil {
return "", err // valid metadata is required to get hash, abort
}
if !o.isComposite() {
// First, chain to the wrapped non-chunked file if possible.
if value, err := o.mainChunk().Hash(ctx, hashType); err == nil && value != "" {
return value, nil
}
}
if err := o.readMetadata(ctx); err != nil {
return "", err // valid metadata is required to get hash, abort
}
// Try hash from metadata if the file is composite or if wrapped remote fails.
switch hashType {
case hash.MD5:
@ -1924,13 +1984,13 @@ func (o *Object) UnWrap() fs.Object {
// Open opens the file for read. Call Close() on the returned io.ReadCloser
func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (rc io.ReadCloser, err error) {
if !o.isComposite() {
return o.mainChunk().Open(ctx, options...) // chain to wrapped non-chunked file
}
if err := o.readMetadata(ctx); err != nil {
// refuse to open unsupported format
return nil, errors.Wrap(err, "can't open")
}
if !o.isComposite() {
return o.mainChunk().Open(ctx, options...) // chain to wrapped non-chunked file
}
var openOptions []fs.OpenOption
var offset, limit int64 = 0, -1
@ -2203,57 +2263,57 @@ func marshalSimpleJSON(ctx context.Context, size int64, nChunks int, md5, sha1 s
// handled by current implementation.
// The version check below will then explicitly ask user to upgrade rclone.
//
func unmarshalSimpleJSON(ctx context.Context, metaObject fs.Object, data []byte, strictChecks bool) (info *ObjectInfo, err error) {
func unmarshalSimpleJSON(ctx context.Context, metaObject fs.Object, data []byte) (info *ObjectInfo, madeByChunker bool, err error) {
// Be strict about JSON format
// to reduce possibility that a random small file resembles metadata.
if data != nil && len(data) > maxMetadataSize {
return nil, errors.New("too big")
return nil, false, ErrMetaTooBig
}
if data == nil || len(data) < 2 || data[0] != '{' || data[len(data)-1] != '}' {
return nil, errors.New("invalid json")
return nil, false, errors.New("invalid json")
}
var metadata metaSimpleJSON
err = json.Unmarshal(data, &metadata)
if err != nil {
return nil, err
return nil, false, err
}
// Basic fields are strictly required
// to reduce possibility that a random small file resembles metadata.
if metadata.Version == nil || metadata.Size == nil || metadata.ChunkNum == nil {
return nil, errors.New("missing required field")
return nil, false, errors.New("missing required field")
}
// Perform strict checks, avoid corruption of future metadata formats.
if *metadata.Version < 1 {
return nil, errors.New("wrong version")
return nil, false, errors.New("wrong version")
}
if *metadata.Size < 0 {
return nil, errors.New("negative file size")
return nil, false, errors.New("negative file size")
}
if *metadata.ChunkNum < 0 {
return nil, errors.New("negative number of chunks")
return nil, false, errors.New("negative number of chunks")
}
if *metadata.ChunkNum > maxSafeChunkNumber {
return nil, ErrChunkOverflow
return nil, true, ErrChunkOverflow // produced by incompatible version of rclone
}
if metadata.MD5 != "" {
_, err = hex.DecodeString(metadata.MD5)
if len(metadata.MD5) != 32 || err != nil {
return nil, errors.New("wrong md5 hash")
return nil, false, errors.New("wrong md5 hash")
}
}
if metadata.SHA1 != "" {
_, err = hex.DecodeString(metadata.SHA1)
if len(metadata.SHA1) != 40 || err != nil {
return nil, errors.New("wrong sha1 hash")
return nil, false, errors.New("wrong sha1 hash")
}
}
// ChunkNum is allowed to be 0 in future versions
if *metadata.ChunkNum < 1 && *metadata.Version <= metadataVersion {
return nil, errors.New("wrong number of chunks")
return nil, false, errors.New("wrong number of chunks")
}
// Non-strict mode also accepts future metadata versions
if *metadata.Version > metadataVersion && strictChecks {
return nil, fmt.Errorf("version %d is not supported, please upgrade rclone", metadata.Version)
if *metadata.Version > metadataVersion {
return nil, true, ErrMetaUnknown // produced by incompatible version of rclone
}
var nilFs *Fs // nil object triggers appropriate type method
@ -2261,7 +2321,7 @@ func unmarshalSimpleJSON(ctx context.Context, metaObject fs.Object, data []byte,
info.nChunks = *metadata.ChunkNum
info.md5 = metadata.MD5
info.sha1 = metadata.SHA1
return info, nil
return info, true, nil
}
func silentlyRemove(ctx context.Context, o fs.Object) {