local: fix race conditions updating and reading Object metadata

This commit is contained in:
Nick Craig-Wood 2020-06-21 13:02:46 +01:00
parent ed32a759ed
commit 20f4fda3c9

View File

@ -182,20 +182,22 @@ type Fs struct {
warned map[string]struct{} // whether we have warned about this string warned map[string]struct{} // whether we have warned about this string
// do os.Lstat or os.Stat // do os.Lstat or os.Stat
lstat func(name string) (os.FileInfo, error) lstat func(name string) (os.FileInfo, error)
objectHashesMu sync.Mutex // global lock for Object.hashes objectMetaMu sync.RWMutex // global lock for Object metadata
} }
// Object represents a local filesystem object // Object represents a local filesystem object
type Object struct { type Object struct {
fs *Fs // The Fs this object is part of fs *Fs // The Fs this object is part of
remote string // The remote path (encoded path) remote string // The remote path (encoded path)
path string // The local path (OS path) path string // The local path (OS path)
size int64 // file metadata - always present // When using these items the fs.objectMetaMu must be held
mode os.FileMode size int64 // file metadata - always present
modTime time.Time mode os.FileMode
hashes map[hash.Type]string // Hashes modTime time.Time
translatedLink bool // Is this object a translated link hashes map[hash.Type]string // Hashes
// these are read only and don't need the mutex held
translatedLink bool // Is this object a translated link
} }
// ------------------------------------------------------------ // ------------------------------------------------------------
@ -632,6 +634,9 @@ func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object,
// Temporary Object under construction // Temporary Object under construction
dstObj := f.newObject(remote) dstObj := f.newObject(remote)
dstObj.fs.objectMetaMu.RLock()
dstObjMode := dstObj.mode
dstObj.fs.objectMetaMu.RUnlock()
// Check it is a file if it exists // Check it is a file if it exists
err := dstObj.lstat() err := dstObj.lstat()
@ -639,7 +644,7 @@ func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object,
// OK // OK
} else if err != nil { } else if err != nil {
return nil, err return nil, err
} else if !dstObj.fs.isRegular(dstObj.mode) { } else if !dstObj.fs.isRegular(dstObjMode) {
// It isn't a file // It isn't a file
return nil, errors.New("can't move file onto non-file") return nil, errors.New("can't move file onto non-file")
} }
@ -793,8 +798,10 @@ func (o *Object) Remote() string {
// Hash returns the requested hash of a file as a lowercase hex string // Hash returns the requested hash of a file as a lowercase hex string
func (o *Object) Hash(ctx context.Context, r hash.Type) (string, error) { func (o *Object) Hash(ctx context.Context, r hash.Type) (string, error) {
// Check that the underlying file hasn't changed // Check that the underlying file hasn't changed
o.fs.objectMetaMu.RLock()
oldtime := o.modTime oldtime := o.modTime
oldsize := o.size oldsize := o.size
o.fs.objectMetaMu.RUnlock()
err := o.lstat() err := o.lstat()
var changed bool var changed bool
if err != nil { if err != nil {
@ -806,15 +813,16 @@ func (o *Object) Hash(ctx context.Context, r hash.Type) (string, error) {
return "", errors.Wrap(err, "hash: failed to stat") return "", errors.Wrap(err, "hash: failed to stat")
} }
} else { } else {
o.fs.objectMetaMu.RLock()
changed = !o.modTime.Equal(oldtime) || oldsize != o.size changed = !o.modTime.Equal(oldtime) || oldsize != o.size
o.fs.objectMetaMu.RUnlock()
} }
o.fs.objectHashesMu.Lock() o.fs.objectMetaMu.RLock()
hashes := o.hashes
hashValue, hashFound := o.hashes[r] hashValue, hashFound := o.hashes[r]
o.fs.objectHashesMu.Unlock() o.fs.objectMetaMu.RUnlock()
if changed || hashes == nil || !hashFound { if changed || !hashFound {
var in io.ReadCloser var in io.ReadCloser
if !o.translatedLink { if !o.translatedLink {
@ -833,6 +841,7 @@ func (o *Object) Hash(ctx context.Context, r hash.Type) (string, error) {
if err != nil { if err != nil {
return "", errors.Wrap(err, "hash: failed to open") return "", errors.Wrap(err, "hash: failed to open")
} }
var hashes map[hash.Type]string
hashes, err = hash.StreamTypes(in, hash.NewHashSet(r)) hashes, err = hash.StreamTypes(in, hash.NewHashSet(r))
closeErr := in.Close() closeErr := in.Close()
if err != nil { if err != nil {
@ -842,24 +851,28 @@ func (o *Object) Hash(ctx context.Context, r hash.Type) (string, error) {
return "", errors.Wrap(closeErr, "hash: failed to close") return "", errors.Wrap(closeErr, "hash: failed to close")
} }
hashValue = hashes[r] hashValue = hashes[r]
o.fs.objectHashesMu.Lock() o.fs.objectMetaMu.Lock()
if o.hashes == nil { if o.hashes == nil {
o.hashes = hashes o.hashes = hashes
} else { } else {
o.hashes[r] = hashValue o.hashes[r] = hashValue
} }
o.fs.objectHashesMu.Unlock() o.fs.objectMetaMu.Unlock()
} }
return hashValue, nil return hashValue, nil
} }
// Size returns the size of an object in bytes // Size returns the size of an object in bytes
func (o *Object) Size() int64 { func (o *Object) Size() int64 {
o.fs.objectMetaMu.RLock()
defer o.fs.objectMetaMu.RUnlock()
return o.size return o.size
} }
// ModTime returns the modification time of the object // ModTime returns the modification time of the object
func (o *Object) ModTime(ctx context.Context) time.Time { func (o *Object) ModTime(ctx context.Context) time.Time {
o.fs.objectMetaMu.RLock()
defer o.fs.objectMetaMu.RUnlock()
return o.modTime return o.modTime
} }
@ -880,7 +893,9 @@ func (o *Object) SetModTime(ctx context.Context, modTime time.Time) error {
// Storable returns a boolean showing if this object is storable // Storable returns a boolean showing if this object is storable
func (o *Object) Storable() bool { func (o *Object) Storable() bool {
o.fs.objectMetaMu.RLock()
mode := o.mode mode := o.mode
o.fs.objectMetaMu.RUnlock()
if mode&os.ModeSymlink != 0 && !o.fs.opt.TranslateSymlinks { if mode&os.ModeSymlink != 0 && !o.fs.opt.TranslateSymlinks {
if !o.fs.opt.SkipSymlinks { if !o.fs.opt.SkipSymlinks {
fs.Logf(o, "Can't follow symlink without -L/--copy-links") fs.Logf(o, "Can't follow symlink without -L/--copy-links")
@ -913,11 +928,15 @@ func (file *localOpenFile) Read(p []byte) (n int, err error) {
if err != nil { if err != nil {
return 0, errors.Wrap(err, "can't read status of source file while transferring") return 0, errors.Wrap(err, "can't read status of source file while transferring")
} }
if file.o.size != fi.Size() { file.o.fs.objectMetaMu.RLock()
return 0, fserrors.NoLowLevelRetryError(errors.Errorf("can't copy - source file is being updated (size changed from %d to %d)", file.o.size, fi.Size())) oldtime := file.o.modTime
oldsize := file.o.size
file.o.fs.objectMetaMu.RUnlock()
if oldsize != fi.Size() {
return 0, fserrors.NoLowLevelRetryError(errors.Errorf("can't copy - source file is being updated (size changed from %d to %d)", oldsize, fi.Size()))
} }
if !file.o.modTime.Equal(fi.ModTime()) { if !oldtime.Equal(fi.ModTime()) {
return 0, fserrors.NoLowLevelRetryError(errors.Errorf("can't copy - source file is being updated (mod time changed from %v to %v)", file.o.modTime, fi.ModTime())) return 0, fserrors.NoLowLevelRetryError(errors.Errorf("can't copy - source file is being updated (mod time changed from %v to %v)", oldtime, fi.ModTime()))
} }
} }
@ -934,9 +953,9 @@ func (file *localOpenFile) Close() (err error) {
err = file.in.Close() err = file.in.Close()
if err == nil { if err == nil {
if file.hash.Size() == file.o.Size() { if file.hash.Size() == file.o.Size() {
file.o.fs.objectHashesMu.Lock() file.o.fs.objectMetaMu.Lock()
file.o.hashes = file.hash.Sums() file.o.hashes = file.hash.Sums()
file.o.fs.objectHashesMu.Unlock() file.o.fs.objectMetaMu.Unlock()
} }
} }
return err return err
@ -961,7 +980,7 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read
case *fs.SeekOption: case *fs.SeekOption:
offset = x.Offset offset = x.Offset
case *fs.RangeOption: case *fs.RangeOption:
offset, limit = x.Decode(o.size) offset, limit = x.Decode(o.Size())
case *fs.HashesOption: case *fs.HashesOption:
if x.Hashes.Count() > 0 { if x.Hashes.Count() > 0 {
hasher, err = hash.NewMultiHasherTypes(x.Hashes) hasher, err = hash.NewMultiHasherTypes(x.Hashes)
@ -1119,9 +1138,9 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
// All successful so update the hashes // All successful so update the hashes
if hasher != nil { if hasher != nil {
o.fs.objectHashesMu.Lock() o.fs.objectMetaMu.Lock()
o.hashes = hasher.Sums() o.hashes = hasher.Sums()
o.fs.objectHashesMu.Unlock() o.fs.objectMetaMu.Unlock()
} }
// Set the mtime // Set the mtime
@ -1183,17 +1202,11 @@ func (o *Object) setMetadata(info os.FileInfo) {
if o.fs.opt.NoCheckUpdated && !o.modTime.IsZero() { if o.fs.opt.NoCheckUpdated && !o.modTime.IsZero() {
return return
} }
// Don't overwrite the info if we don't need to o.fs.objectMetaMu.Lock()
// this avoids upsetting the race detector o.size = info.Size()
if o.size != info.Size() { o.modTime = info.ModTime()
o.size = info.Size() o.mode = info.Mode()
} o.fs.objectMetaMu.Unlock()
if !o.modTime.Equal(info.ModTime()) {
o.modTime = info.ModTime()
}
if o.mode != info.Mode() {
o.mode = info.Mode()
}
} }
// Stat an Object into info // Stat an Object into info