dropbox: basics of metadata in Dropbox datastore working

This commit is contained in:
Nick Craig-Wood 2014-07-10 00:17:40 +01:00
parent 2149185fc2
commit 2b0911531c

View File

@ -13,6 +13,10 @@ Can only have 25,000 objects in a directory
Setting metadata is problematic! Might have to use a database Setting metadata is problematic! Might have to use a database
Md5sum has to download the file Md5sum has to download the file
FIXME do we need synchronisation for any of the dropbox calls?
FIXME need to delete metadata when we delete files!
*/ */
import ( import (
@ -25,6 +29,7 @@ import (
"time" "time"
"github.com/ncw/rclone/fs" "github.com/ncw/rclone/fs"
"github.com/ncw/swift"
"github.com/stacktic/dropbox" "github.com/stacktic/dropbox"
) )
@ -34,6 +39,10 @@ const (
rcloneAppSecret = "1n9m04y2zx7bf26" rcloneAppSecret = "1n9m04y2zx7bf26"
uploadChunkSize = 64 * 1024 // chunk size for upload uploadChunkSize = 64 * 1024 // chunk size for upload
metadataLimit = dropbox.MetadataLimitDefault // max items to fetch at once metadataLimit = dropbox.MetadataLimitDefault // max items to fetch at once
datastoreName = "rclone"
tableName = "metadata"
md5sumField = "md5sum"
mtimeField = "mtime"
) )
// Register with Fs // Register with Fs
@ -84,9 +93,12 @@ func Config(name string) {
// FsDropbox represents a remote dropbox server // FsDropbox represents a remote dropbox server
type FsDropbox struct { type FsDropbox struct {
db *dropbox.Dropbox // the connection to the dropbox server db *dropbox.Dropbox // the connection to the dropbox server
root string // the path we are working on root string // the path we are working on
slashRoot string // root with "/" prefix and postix slashRoot string // root with "/" prefix and postix
datastoreManager *dropbox.DatastoreManager
datastore *dropbox.Datastore
table *dropbox.Table
} }
// FsObjectDropbox describes a dropbox object // FsObjectDropbox describes a dropbox object
@ -153,25 +165,40 @@ func NewFs(name, path string) (fs.Fs, error) {
// Authorize the client // Authorize the client
db.SetAccessToken(token) db.SetAccessToken(token)
// Make a db to store rclone metadata in
f.datastoreManager = db.NewDatastoreManager()
// Open the rclone datastore
f.datastore, err = f.datastoreManager.OpenDatastore(datastoreName)
if err != nil {
return nil, err
}
// Get the table we are using
f.table, err = f.datastore.GetTable(tableName)
if err != nil {
return nil, err
}
return f, nil return f, nil
} }
// Return an FsObject from a path // Return an FsObject from a path
func (f *FsDropbox) newFsObjectWithInfo(remote string, info *dropbox.Entry) (fs.Object, error) { func (f *FsDropbox) newFsObjectWithInfo(remote string, info *dropbox.Entry) (fs.Object, error) {
fs := &FsObjectDropbox{ o := &FsObjectDropbox{
dropbox: f, dropbox: f,
remote: remote, remote: remote,
} }
if info != nil { if info == nil {
fs.setMetaData(info) o.setMetadataFromEntry(info)
} else { } else {
err := fs.readMetaData() // reads info and meta, returning an error err := o.readEntryAndSetMetadata()
if err != nil { if err != nil {
// logged already fs.Debug("Failed to read info: %s", err) // logged already fs.Debug("Failed to read info: %s", err)
return nil, err return nil, err
} }
} }
return fs, nil return o, nil
} }
// Return an FsObject from a path // Return an FsObject from a path
@ -304,7 +331,7 @@ func (f *FsDropbox) Rmdir() error {
// Return the precision // Return the precision
func (fs *FsDropbox) Precision() time.Duration { func (fs *FsDropbox) Precision() time.Duration {
return time.Second return time.Nanosecond
} }
// Purge deletes all the files and the container // Purge deletes all the files and the container
@ -342,17 +369,26 @@ func (o *FsObjectDropbox) Md5sum() (string, error) {
if o.md5sum != "" { if o.md5sum != "" {
return o.md5sum, nil return o.md5sum, nil
} }
in, err := o.Open() err := o.readMetaData()
if err != nil { if err != nil {
return "", err fs.Log(o, "Failed to read metadata: %s", err)
return "", fmt.Errorf("Failed to read metadata: %s", err)
} }
defer in.Close()
hash := md5.New() // For pre-existing files which have no md5sum can read it and set it?
_, err = io.Copy(hash, in)
if err != nil { // in, err := o.Open()
return "", err // if err != nil {
} // return "", err
o.md5sum = fmt.Sprintf("%x", hash.Sum(nil)) // }
// defer in.Close()
// hash := md5.New()
// _, err = io.Copy(hash, in)
// if err != nil {
// return "", err
// }
// o.md5sum = fmt.Sprintf("%x", hash.Sum(nil))
return o.md5sum, nil return o.md5sum, nil
} }
@ -361,28 +397,102 @@ func (o *FsObjectDropbox) Size() int64 {
return o.bytes return o.bytes
} }
// setMetaData sets the fs data from a dropbox.Entry // setMetadataFromEntry sets the fs data from a dropbox.Entry
func (o *FsObjectDropbox) setMetaData(info *dropbox.Entry) { //
// This isn't a complete set of metadata and has an inacurate date
func (o *FsObjectDropbox) setMetadataFromEntry(info *dropbox.Entry) {
o.bytes = int64(info.Bytes) o.bytes = int64(info.Bytes)
o.modTime = time.Time(info.ClientMtime) o.modTime = time.Time(info.ClientMtime)
} }
// Reads the entry from dropbox
func (o *FsObjectDropbox) readEntry() (*dropbox.Entry, error) {
entry, err := o.dropbox.db.Metadata(o.remotePath(), false, false, "", "", metadataLimit)
if err != nil {
fs.Debug(o, "Error reading file: %s", err)
return nil, fmt.Errorf("Error reading file: %s", err)
}
return entry, nil
}
// Read entry if not set and set metadata from it
func (o *FsObjectDropbox) readEntryAndSetMetadata() error {
// Last resort set time from client
if !o.modTime.IsZero() {
return nil
}
entry, err := o.readEntry()
if err != nil {
return err
}
o.setMetadataFromEntry(entry)
return nil
}
// Returns the remote path for the object // Returns the remote path for the object
func (o *FsObjectDropbox) remotePath() string { func (o *FsObjectDropbox) remotePath() string {
return o.dropbox.slashRoot + o.remote return o.dropbox.slashRoot + o.remote
} }
// Returns the key for the metadata database
func (o *FsObjectDropbox) metadataKey() string {
// FIXME lower case it?
key := o.dropbox.slashRoot + o.remote
return fmt.Sprintf("%x", md5.Sum([]byte(key)))
}
// readMetaData gets the info if it hasn't already been fetched // readMetaData gets the info if it hasn't already been fetched
func (o *FsObjectDropbox) readMetaData() (err error) { func (o *FsObjectDropbox) readMetaData() (err error) {
if !o.modTime.IsZero() { if o.md5sum != "" {
return nil return nil
} }
entry, err := o.dropbox.db.Metadata(o.remotePath(), false, false, "", "", metadataLimit)
record, err := o.dropbox.table.Get(o.metadataKey())
if err != nil { if err != nil {
fs.Debug(o, "Couldn't find directory: %s", err) fs.Debug(o, "Couldn't read metadata: %s", err)
return fmt.Errorf("Couldn't find directory: %s", err) record = nil
} }
o.setMetaData(entry)
if record != nil {
// Read md5sum
md5sumInterface, ok, err := record.Get(md5sumField)
if err != nil {
return err
}
if !ok {
fs.Debug(o, "Couldn't find md5sum in record")
} else {
md5sum, ok := md5sumInterface.(string)
if !ok {
fs.Debug(o, "md5sum not a string")
} else {
o.md5sum = md5sum
}
}
// read mtime
mtimeInterface, ok, err := record.Get(mtimeField)
if err != nil {
return err
}
if !ok {
fs.Debug(o, "Couldn't find mtime in record")
} else {
mtime, ok := mtimeInterface.(string)
if !ok {
fs.Debug(o, "mtime not a string")
} else {
modTime, err := swift.FloatStringToTime(mtime)
if err != nil {
return err
}
o.modTime = modTime
}
}
}
// Last resort
o.readEntryAndSetMetadata()
return nil return nil
} }
@ -399,16 +509,45 @@ func (o *FsObjectDropbox) ModTime() time.Time {
return o.modTime return o.modTime
} }
// Sets the modification time of the local fs object into the record
// FIXME if we don't set md5sum what will that do?
func (o *FsObjectDropbox) setModTimeAndMd5sum(modTime time.Time, md5sum string) error {
record, err := o.dropbox.table.GetOrInsert(o.metadataKey())
if err != nil {
return fmt.Errorf("Couldn't read record: %s", err)
}
if md5sum != "" {
err = record.Set(md5sumField, md5sum)
if err != nil {
return fmt.Errorf("Couldn't set md5sum record: %s", err)
}
}
if !modTime.IsZero() {
mtime := swift.TimeToFloatString(modTime)
err := record.Set(mtimeField, mtime)
if err != nil {
return fmt.Errorf("Couldn't set mtime record: %s", err)
}
}
err = o.dropbox.datastore.Commit()
if err != nil {
return fmt.Errorf("Failed to commit metadata changes: %s", err)
}
return nil
}
// Sets the modification time of the local fs object // Sets the modification time of the local fs object
//
// Commits the datastore
func (o *FsObjectDropbox) SetModTime(modTime time.Time) { func (o *FsObjectDropbox) SetModTime(modTime time.Time) {
err := o.readMetaData() err := o.setModTimeAndMd5sum(modTime, "")
if err != nil { if err != nil {
fs.Stats.Error() fs.Stats.Error()
fs.Log(o, "Failed to read metadata: %s", err) fs.Log(o, err.Error())
return
} }
// fs.Stats.Error()
fs.Log(o, "FIXME can't update dropbox mtime")
} }
// Is this object storable // Is this object storable
@ -428,13 +567,17 @@ func (o *FsObjectDropbox) Open() (in io.ReadCloser, err error) {
// //
// The new object may have been created if an error is returned // The new object may have been created if an error is returned
func (o *FsObjectDropbox) Update(in io.Reader, modTime time.Time, size int64) error { func (o *FsObjectDropbox) Update(in io.Reader, modTime time.Time, size int64) error {
rc := &readCloser{in: in} // Calculate md5sum as we upload it
hash := md5.New()
rc := &readCloser{in: io.TeeReader(in, hash)}
entry, err := o.dropbox.db.UploadByChunk(rc, uploadChunkSize, o.remotePath(), true, "") entry, err := o.dropbox.db.UploadByChunk(rc, uploadChunkSize, o.remotePath(), true, "")
if err != nil { if err != nil {
return fmt.Errorf("Upload failed: %s", err) return fmt.Errorf("Upload failed: %s", err)
} }
o.setMetaData(entry) o.setMetadataFromEntry(entry)
return nil
md5sum := fmt.Sprintf("%x", hash.Sum(nil))
return o.setModTimeAndMd5sum(modTime, md5sum)
} }
// Remove an object // Remove an object