From 6085dc1b5acc0cb9890a0a37f9807d91411c4298 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Thu, 27 Jun 2013 20:13:07 +0100 Subject: [PATCH] Refactor into sub filesystems --- fs_drive.go => drive/fs.go | 116 +++++++++++---------- accounting.go => fs/accounting.go | 61 ++++++----- fs.go => fs/fs.go | 70 +++++++++---- fs_local.go => local/fs.go | 112 +++++++++++--------- rclone.go | 168 ++++++++++++++++-------------- fs_s3.go => s3/fs.go | 136 ++++++++++++------------ fs_swift.go => swift/fs.go | 108 ++++++++++--------- 7 files changed, 420 insertions(+), 351 deletions(-) rename fs_drive.go => drive/fs.go (89%) rename accounting.go => fs/accounting.go (71%) rename fs.go => fs/fs.go (83%) rename fs_local.go => local/fs.go (75%) rename fs_s3.go => s3/fs.go (74%) rename fs_swift.go => swift/fs.go (74%) diff --git a/fs_drive.go b/drive/fs.go similarity index 89% rename from fs_drive.go rename to drive/fs.go index d4ee7fb39..629435fd1 100644 --- a/fs_drive.go +++ b/drive/fs.go @@ -1,8 +1,9 @@ // Drive interface -package main +package drive // FIXME drive code is leaking goroutines somehow - reported bug // https://code.google.com/p/google-api-go-client/issues/detail?id=23 +// Now fixed! // FIXME list containers equivalent should list directories? @@ -22,6 +23,7 @@ package main // * files with / in name import ( + "../fs" "code.google.com/p/goauth2/oauth" "code.google.com/p/google-api-go-client/drive/v2" "errors" @@ -287,7 +289,7 @@ func NewFsDrive(path string) (*FsDrive, error) { // Return an FsObject from a path // // May return nil if an error occurred -func (f *FsDrive) NewFsObjectWithInfo(remote string, info *drive.File) FsObject { +func (f *FsDrive) NewFsObjectWithInfo(remote string, info *drive.File) fs.FsObject { fs := &FsObjectDrive{ drive: f, remote: remote, @@ -297,7 +299,7 @@ func (f *FsDrive) NewFsObjectWithInfo(remote string, info *drive.File) FsObject } else { err := fs.readMetaData() // reads info and meta, returning an error if err != nil { - // logged already FsDebug("Failed to read info: %s", err) + // logged already fs.FsDebug("Failed to read info: %s", err) return nil } } @@ -307,7 +309,7 @@ func (f *FsDrive) NewFsObjectWithInfo(remote string, info *drive.File) FsObject // Return an FsObject from a path // // May return nil if an error occurred -func (f *FsDrive) NewFsObject(remote string) FsObject { +func (f *FsDrive) NewFsObject(remote string) fs.FsObject { return f.NewFsObjectWithInfo(remote, nil) } @@ -317,7 +319,7 @@ func (f *FsDrive) NewFsObject(remote string) FsObject { // // This fetches the minimum amount of stuff but does more API calls // which makes it slow -func (f *FsDrive) listDirRecursive(dirId string, path string, out FsObjectsChan) error { +func (f *FsDrive) listDirRecursive(dirId string, path string, out fs.FsObjectsChan) error { var subError error // Make the API request _, err := f.listAll(dirId, "", false, false, func(item *drive.File) bool { @@ -355,7 +357,7 @@ func (f *FsDrive) listDirRecursive(dirId string, path string, out FsObjectsChan) // // This is fast in terms of number of API calls, but slow in terms of // fetching more data than it needs -func (f *FsDrive) listDirFull(dirId string, path string, out FsObjectsChan) error { +func (f *FsDrive) listDirFull(dirId string, path string, out fs.FsObjectsChan) error { // Orphans waiting for their parent orphans := make(map[string][]*drive.File) @@ -545,13 +547,13 @@ func (f *FsDrive) findRoot(create bool) error { } // Walk the path returning a channel of FsObjects -func (f *FsDrive) List() FsObjectsChan { - out := make(FsObjectsChan, *checkers) +func (f *FsDrive) List() fs.FsObjectsChan { + out := make(fs.FsObjectsChan, fs.Config.Checkers) go func() { defer close(out) err := f.findRoot(false) if err != nil { - stats.Error() + fs.Stats.Error() log.Printf("Couldn't find root: %s", err) } else { if *driveFullList { @@ -560,7 +562,7 @@ func (f *FsDrive) List() FsObjectsChan { err = f.listDirRecursive(f.rootId, "", out) } if err != nil { - stats.Error() + fs.Stats.Error() log.Printf("List failed: %s", err) } } @@ -569,17 +571,17 @@ func (f *FsDrive) List() FsObjectsChan { } // Walk the path returning a channel of FsObjects -func (f *FsDrive) ListDir() FsDirChan { - out := make(FsDirChan, *checkers) +func (f *FsDrive) ListDir() fs.FsDirChan { + out := make(fs.FsDirChan, fs.Config.Checkers) go func() { defer close(out) err := f.findRoot(false) if err != nil { - stats.Error() + fs.Stats.Error() log.Printf("Couldn't find root: %s", err) } else { _, err := f.listAll(f.rootId, "", true, false, func(item *drive.File) bool { - dir := &FsDir{ + dir := &fs.FsDir{ Name: item.Title, Bytes: -1, Count: -1, @@ -589,7 +591,7 @@ func (f *FsDrive) ListDir() FsDirChan { return false }) if err != nil { - stats.Error() + fs.Stats.Error() log.Printf("ListDir failed: %s", err) } } @@ -602,7 +604,7 @@ func (f *FsDrive) ListDir() FsDirChan { // Copy the reader in to the new object which is returned // // The new object may have been created -func (f *FsDrive) Put(in io.Reader, remote string, modTime time.Time, size int64) (FsObject, error) { +func (f *FsDrive) Put(in io.Reader, remote string, modTime time.Time, size int64) (fs.FsObject, error) { // Temporary FsObject under construction fs := &FsObjectDrive{drive: f, remote: remote} @@ -710,45 +712,45 @@ func (f *FsDrive) Purge() error { // ------------------------------------------------------------ // Return the remote path -func (fs *FsObjectDrive) Remote() string { - return fs.remote +func (o *FsObjectDrive) Remote() string { + return o.remote } // Md5sum returns the Md5sum of an object returning a lowercase hex string -func (fs *FsObjectDrive) Md5sum() (string, error) { - return fs.md5sum, nil +func (o *FsObjectDrive) Md5sum() (string, error) { + return o.md5sum, nil } // Size returns the size of an object in bytes -func (fs *FsObjectDrive) Size() int64 { - return fs.bytes +func (o *FsObjectDrive) Size() int64 { + return o.bytes } // setMetaData sets the fs data from a drive.File -func (fs *FsObjectDrive) setMetaData(info *drive.File) { - fs.id = info.Id - fs.url = info.DownloadUrl - fs.md5sum = strings.ToLower(info.Md5Checksum) - fs.bytes = info.FileSize - fs.modifiedDate = info.ModifiedDate +func (o *FsObjectDrive) setMetaData(info *drive.File) { + o.id = info.Id + o.url = info.DownloadUrl + o.md5sum = strings.ToLower(info.Md5Checksum) + o.bytes = info.FileSize + o.modifiedDate = info.ModifiedDate } // readMetaData gets the info if it hasn't already been fetched -func (fs *FsObjectDrive) readMetaData() (err error) { - if fs.id != "" { +func (o *FsObjectDrive) readMetaData() (err error) { + if o.id != "" { return nil } - directory, leaf := splitPath(fs.remote) - directoryId, err := fs.drive.findDir(directory, false) + directory, leaf := splitPath(o.remote) + directoryId, err := o.drive.findDir(directory, false) if err != nil { - FsDebug(fs, "Couldn't find directory: %s", err) + fs.FsDebug(o, "Couldn't find directory: %s", err) return fmt.Errorf("Couldn't find directory: %s", err) } - found, err := fs.drive.listAll(directoryId, leaf, false, true, func(item *drive.File) bool { + found, err := o.drive.listAll(directoryId, leaf, false, true, func(item *drive.File) bool { if item.Title == leaf { - fs.setMetaData(item) + o.setMetaData(item) return true } return false @@ -757,7 +759,7 @@ func (fs *FsObjectDrive) readMetaData() (err error) { return err } if !found { - FsDebug(fs, "Couldn't find object") + fs.FsDebug(o, "Couldn't find object") return fmt.Errorf("Couldn't find object") } return nil @@ -768,26 +770,26 @@ func (fs *FsObjectDrive) readMetaData() (err error) { // // It attempts to read the objects mtime and if that isn't present the // LastModified returned in the http headers -func (fs *FsObjectDrive) ModTime() time.Time { - err := fs.readMetaData() +func (o *FsObjectDrive) ModTime() time.Time { + err := o.readMetaData() if err != nil { - FsLog(fs, "Failed to read metadata: %s", err) + fs.FsLog(o, "Failed to read metadata: %s", err) return time.Now() } - modTime, err := time.Parse(time.RFC3339, fs.modifiedDate) + modTime, err := time.Parse(time.RFC3339, o.modifiedDate) if err != nil { - FsLog(fs, "Failed to read mtime from object: %s", err) + fs.FsLog(o, "Failed to read mtime from object: %s", err) return time.Now() } return modTime } // Sets the modification time of the local fs object -func (fs *FsObjectDrive) SetModTime(modTime time.Time) { - err := fs.readMetaData() +func (o *FsObjectDrive) SetModTime(modTime time.Time) { + err := o.readMetaData() if err != nil { - stats.Error() - FsLog(fs, "Failed to read metadata: %s", err) + fs.Stats.Error() + fs.FsLog(o, "Failed to read metadata: %s", err) return } // New metadata @@ -795,23 +797,23 @@ func (fs *FsObjectDrive) SetModTime(modTime time.Time) { ModifiedDate: modTime.Format(time.RFC3339Nano), } // Set modified date - _, err = fs.drive.svc.Files.Update(fs.id, info).SetModifiedDate(true).Do() + _, err = o.drive.svc.Files.Update(o.id, info).SetModifiedDate(true).Do() if err != nil { - stats.Error() - FsLog(fs, "Failed to update remote mtime: %s", err) + fs.Stats.Error() + fs.FsLog(o, "Failed to update remote mtime: %s", err) } } // Is this object storable -func (fs *FsObjectDrive) Storable() bool { +func (o *FsObjectDrive) Storable() bool { return true } // Open an object for read -func (fs *FsObjectDrive) Open() (in io.ReadCloser, err error) { - req, _ := http.NewRequest("GET", fs.url, nil) +func (o *FsObjectDrive) Open() (in io.ReadCloser, err error) { + req, _ := http.NewRequest("GET", o.url, nil) req.Header.Set("User-Agent", "rclone/1.0") - res, err := fs.drive.client.Do(req) + res, err := o.drive.client.Do(req) if err != nil { return nil, err } @@ -823,11 +825,11 @@ func (fs *FsObjectDrive) Open() (in io.ReadCloser, err error) { } // Remove an object -func (fs *FsObjectDrive) Remove() error { - return fs.drive.svc.Files.Delete(fs.id).Do() +func (o *FsObjectDrive) Remove() error { + return o.drive.svc.Files.Delete(o.id).Do() } // Check the interfaces are satisfied -var _ Fs = &FsDrive{} -var _ Purger = &FsDrive{} -var _ FsObject = &FsObjectDrive{} +var _ fs.Fs = &FsDrive{} +var _ fs.Purger = &FsDrive{} +var _ fs.FsObject = &FsObjectDrive{} diff --git a/accounting.go b/fs/accounting.go similarity index 71% rename from accounting.go rename to fs/accounting.go index e50336427..9af5a4caa 100644 --- a/accounting.go +++ b/fs/accounting.go @@ -1,6 +1,6 @@ // Accounting and limiting reader -package main +package fs import ( "bytes" @@ -14,7 +14,7 @@ import ( // Globals var ( - stats = NewStats() + Stats = NewStats() ) // Stringset holds some strings @@ -35,7 +35,7 @@ func (ss StringSet) String() string { } // Stats limits and accounts all transfers -type Stats struct { +type StatsInfo struct { lock sync.RWMutex bytes int64 errors int64 @@ -46,24 +46,24 @@ type Stats struct { start time.Time } -// NewStats cretates an initialised Stats -func NewStats() *Stats { - return &Stats{ - checking: make(StringSet, *checkers), - transferring: make(StringSet, *transfers), +// NewStats cretates an initialised StatsInfo +func NewStats() *StatsInfo { + return &StatsInfo{ + checking: make(StringSet, Config.Checkers), + transferring: make(StringSet, Config.Transfers), start: time.Now(), } } -// String convert the Stats to a string for printing -func (s *Stats) String() string { +// String convert the StatsInfo to a string for printing +func (s *StatsInfo) String() string { s.lock.RLock() defer s.lock.RUnlock() - dt := time.Now().Sub(stats.start) + dt := time.Now().Sub(s.start) dt_seconds := dt.Seconds() speed := 0.0 if dt > 0 { - speed = float64(stats.bytes) / 1024 / dt_seconds + speed = float64(s.bytes) / 1024 / dt_seconds } buf := &bytes.Buffer{} fmt.Fprintf(buf, ` @@ -73,10 +73,10 @@ Checks: %10d Transferred: %10d Elapsed time: %v `, - stats.bytes, speed, - stats.errors, - stats.checks, - stats.transfers, + s.bytes, speed, + s.errors, + s.checks, + s.transfers, dt) if len(s.checking) > 0 { fmt.Fprintf(buf, "Checking: %s\n", s.checking) @@ -87,41 +87,48 @@ Elapsed time: %v return buf.String() } -// Log outputs the Stats to the log -func (s *Stats) Log() { - log.Printf("%v\n", stats) +// Log outputs the StatsInfo to the log +func (s *StatsInfo) Log() { + log.Printf("%v\n", s) } // Bytes updates the stats for bytes bytes -func (s *Stats) Bytes(bytes int64) { +func (s *StatsInfo) Bytes(bytes int64) { s.lock.Lock() defer s.lock.Unlock() s.bytes += bytes } // Errors updates the stats for errors -func (s *Stats) Errors(errors int64) { +func (s *StatsInfo) Errors(errors int64) { s.lock.Lock() defer s.lock.Unlock() s.errors += errors } +// Errored returns whether there have been any errors +func (s *StatsInfo) Errored() bool { + s.lock.Lock() + defer s.lock.Unlock() + return s.errors != 0 +} + // Error adds a single error into the stats -func (s *Stats) Error() { +func (s *StatsInfo) Error() { s.lock.Lock() defer s.lock.Unlock() s.errors += 1 } // Checking adds a check into the stats -func (s *Stats) Checking(fs FsObject) { +func (s *StatsInfo) Checking(fs FsObject) { s.lock.Lock() defer s.lock.Unlock() s.checking[fs.Remote()] = true } // DoneChecking removes a check from the stats -func (s *Stats) DoneChecking(fs FsObject) { +func (s *StatsInfo) DoneChecking(fs FsObject) { s.lock.Lock() defer s.lock.Unlock() delete(s.checking, fs.Remote()) @@ -129,14 +136,14 @@ func (s *Stats) DoneChecking(fs FsObject) { } // Transferring adds a transfer into the stats -func (s *Stats) Transferring(fs FsObject) { +func (s *StatsInfo) Transferring(fs FsObject) { s.lock.Lock() defer s.lock.Unlock() s.transferring[fs.Remote()] = true } // DoneTransferring removes a transfer from the stats -func (s *Stats) DoneTransferring(fs FsObject) { +func (s *StatsInfo) DoneTransferring(fs FsObject) { s.lock.Lock() defer s.lock.Unlock() delete(s.transferring, fs.Remote()) @@ -160,7 +167,7 @@ func NewAccount(in io.ReadCloser) *Account { func (file *Account) Read(p []byte) (n int, err error) { n, err = file.in.Read(p) file.bytes += int64(n) - stats.Bytes(int64(n)) + Stats.Bytes(int64(n)) if err == io.EOF { // FIXME Do something? } diff --git a/fs.go b/fs/fs.go similarity index 83% rename from fs.go rename to fs/fs.go index f51b6b233..2b574cbec 100644 --- a/fs.go +++ b/fs/fs.go @@ -1,14 +1,49 @@ // File system interface -package main +package fs import ( "fmt" "io" "log" + "regexp" "time" ) +// Globals +var ( + // Global config + Config = &FsConfig{} + // Filesystem registry + fsRegistry []fsRegistryItem +) + +// Filesystem config options +type FsConfig struct { + Verbose bool + Quiet bool + ModifyWindow time.Duration + Checkers int + Transfers int +} + +// FIXME need local to go last + +// Filesystem registry item +type fsRegistryItem struct { + match *regexp.Regexp // if this matches then can call newFs + newFs func(string) (Fs, error) // create a new file system +} + +// Register a filesystem +// +// If a path matches with match then can call newFs on it +// +// Fs modules should use this in an init() function +func Register(match *regexp.Regexp, newFs func(string) (Fs, error)) { + fsRegistry = append(fsRegistry, fsRegistryItem{match: match, newFs: newFs}) +} + // A Filesystem, describes the local filesystem and the remote object store type Fs interface { // String returns a description of the FS @@ -100,21 +135,17 @@ type FsDirChan chan *FsDir // // FIXME make more generic func NewFs(path string) (Fs, error) { - if swiftMatch.MatchString(path) { - return NewFsSwift(path) + for _, item := range fsRegistry { + if item.match.MatchString(path) { + return item.newFs(path) + } } - if s3Match.MatchString(path) { - return NewFsS3(path) - } - if driveMatch.MatchString(path) { - return NewFsDrive(path) - } - return NewFsLocal(path) + panic("Not found") // FIXME } // Write debuging output for this FsObject func FsDebug(fs FsObject, text string, args ...interface{}) { - if *verbose { + if Config.Verbose { out := fmt.Sprintf(text, args...) log.Printf("%s: %s", fs.Remote(), out) } @@ -122,7 +153,7 @@ func FsDebug(fs FsObject, text string, args ...interface{}) { // Write log output for this FsObject func FsLog(fs FsObject, text string, args ...interface{}) { - if !*quiet { + if !Config.Quiet { out := fmt.Sprintf(text, args...) log.Printf("%s: %s", fs.Remote(), out) } @@ -145,13 +176,13 @@ func checkClose(c io.Closer, err *error) { func CheckMd5sums(src, dst FsObject) (bool, error) { srcMd5, err := src.Md5sum() if err != nil { - stats.Error() + Stats.Error() FsLog(src, "Failed to calculate src md5: %s", err) return false, err } dstMd5, err := dst.Md5sum() if err != nil { - stats.Error() + Stats.Error() FsLog(dst, "Failed to calculate dst md5: %s", err) return false, err } @@ -186,10 +217,11 @@ func Equal(src, dst FsObject) bool { srcModTime := src.ModTime() dstModTime := dst.ModTime() dt := dstModTime.Sub(srcModTime) - if dt >= *modifyWindow || dt <= -*modifyWindow { + ModifyWindow := Config.ModifyWindow + if dt >= ModifyWindow || dt <= -ModifyWindow { FsDebug(src, "Modification times differ by %s: %v, %v", dt, srcModTime, dstModTime) } else { - FsDebug(src, "Size and modification time differ by %s (within %s)", dt, *modifyWindow) + FsDebug(src, "Size and modification time differ by %s (within %s)", dt, ModifyWindow) return true } @@ -213,7 +245,7 @@ func Equal(src, dst FsObject) bool { func Copy(f Fs, src FsObject) { in0, err := src.Open() if err != nil { - stats.Error() + Stats.Error() FsLog(src, "Failed to open: %s", err) return } @@ -225,13 +257,13 @@ func Copy(f Fs, src FsObject) { err = inErr } if err != nil { - stats.Error() + Stats.Error() FsLog(src, "Failed to copy: %s", err) if dst != nil { FsDebug(dst, "Removing failed copy") removeErr := dst.Remove() if removeErr != nil { - stats.Error() + Stats.Error() FsLog(dst, "Failed to remove failed copy: %s", removeErr) } } diff --git a/fs_local.go b/local/fs.go similarity index 75% rename from fs_local.go rename to local/fs.go index f7050974b..5f8afdc11 100644 --- a/fs_local.go +++ b/local/fs.go @@ -1,7 +1,8 @@ // Local filesystem interface -package main +package local import ( + "../fs" "crypto/md5" "fmt" "io" @@ -10,10 +11,19 @@ import ( "os" "path" "path/filepath" + "regexp" "sync" "time" ) +// Pattern to match a local url (matches anything) +var Match = regexp.MustCompile(``) + +// Register with Fs +func init() { + fs.Register(Match, NewFs) +} + // FsLocal represents a local filesystem rooted at root type FsLocal struct { root string // The root directory @@ -30,8 +40,8 @@ type FsObjectLocal struct { // ------------------------------------------------------------ -// NewFsLocal contstructs an FsLocal from the path -func NewFsLocal(root string) (*FsLocal, error) { +// NewFs contstructs an FsLocal from the path +func NewFs(root string) (fs.Fs, error) { root = path.Clean(root) f := &FsLocal{root: root} return f, nil @@ -45,42 +55,42 @@ func (f *FsLocal) String() string { // Return an FsObject from a path // // May return nil if an error occurred -func (f *FsLocal) NewFsObjectWithInfo(remote string, info os.FileInfo) FsObject { +func (f *FsLocal) NewFsObjectWithInfo(remote string, info os.FileInfo) fs.FsObject { path := filepath.Join(f.root, remote) - fs := &FsObjectLocal{remote: remote, path: path} + o := &FsObjectLocal{remote: remote, path: path} if info != nil { - fs.info = info + o.info = info } else { - err := fs.lstat() + err := o.lstat() if err != nil { - FsDebug(fs, "Failed to stat %s: %s", path, err) + fs.FsDebug(o, "Failed to stat %s: %s", path, err) return nil } } - return fs + return o } // Return an FsObject from a path // // May return nil if an error occurred -func (f *FsLocal) NewFsObject(remote string) FsObject { +func (f *FsLocal) NewFsObject(remote string) fs.FsObject { return f.NewFsObjectWithInfo(remote, nil) } // List the path returning a channel of FsObjects // // Ignores everything which isn't Storable, eg links etc -func (f *FsLocal) List() FsObjectsChan { - out := make(FsObjectsChan, *checkers) +func (f *FsLocal) List() fs.FsObjectsChan { + out := make(fs.FsObjectsChan, fs.Config.Checkers) go func() { err := filepath.Walk(f.root, func(path string, fi os.FileInfo, err error) error { if err != nil { - stats.Error() + fs.Stats.Error() log.Printf("Failed to open directory: %s: %s", path, err) } else { remote, err := filepath.Rel(f.root, path) if err != nil { - stats.Error() + fs.Stats.Error() log.Printf("Failed to get relative path %s: %s", path, err) return nil } @@ -97,7 +107,7 @@ func (f *FsLocal) List() FsObjectsChan { return nil }) if err != nil { - stats.Error() + fs.Stats.Error() log.Printf("Failed to open directory: %s: %s", f.root, err) } close(out) @@ -106,18 +116,18 @@ func (f *FsLocal) List() FsObjectsChan { } // Walk the path returning a channel of FsObjects -func (f *FsLocal) ListDir() FsDirChan { - out := make(FsDirChan, *checkers) +func (f *FsLocal) ListDir() fs.FsDirChan { + out := make(fs.FsDirChan, fs.Config.Checkers) go func() { defer close(out) items, err := ioutil.ReadDir(f.root) if err != nil { - stats.Error() + fs.Stats.Error() log.Printf("Couldn't find read directory: %s", err) } else { for _, item := range items { if item.IsDir() { - dir := &FsDir{ + dir := &fs.FsDir{ Name: item.Name(), When: item.ModTime(), Bytes: 0, @@ -127,7 +137,7 @@ func (f *FsLocal) ListDir() FsDirChan { dirpath := path.Join(f.root, item.Name()) err := filepath.Walk(dirpath, func(path string, fi os.FileInfo, err error) error { if err != nil { - stats.Error() + fs.Stats.Error() log.Printf("Failed to open directory: %s: %s", path, err) } else { dir.Count += 1 @@ -136,7 +146,7 @@ func (f *FsLocal) ListDir() FsDirChan { return nil }) if err != nil { - stats.Error() + fs.Stats.Error() log.Printf("Failed to open directory: %s: %s", dirpath, err) } out <- dir @@ -149,7 +159,7 @@ func (f *FsLocal) ListDir() FsDirChan { } // Puts the FsObject to the local filesystem -func (f *FsLocal) Put(in io.Reader, remote string, modTime time.Time, size int64) (FsObject, error) { +func (f *FsLocal) Put(in io.Reader, remote string, modTime time.Time, size int64) (fs.FsObject, error) { dstPath := filepath.Join(f.root, remote) // Temporary FsObject under construction fs := &FsObjectLocal{remote: remote, path: dstPath} @@ -225,7 +235,7 @@ func (f *FsLocal) readPrecision() (precision time.Duration) { for duration := time.Duration(1); duration < time.Second; duration *= 10 { // Current time with delta t := time.Unix(time.Now().Unix(), int64(duration)) - err := Chtimes(path, t, t) + err := os.Chtimes(path, t, t) if err != nil { // fmt.Println("Failed to Chtimes", err) break @@ -251,78 +261,78 @@ func (f *FsLocal) readPrecision() (precision time.Duration) { // ------------------------------------------------------------ // Return the remote path -func (fs *FsObjectLocal) Remote() string { - return fs.remote +func (o *FsObjectLocal) Remote() string { + return o.remote } // Md5sum calculates the Md5sum of a file returning a lowercase hex string -func (fs *FsObjectLocal) Md5sum() (string, error) { - in, err := os.Open(fs.path) +func (o *FsObjectLocal) Md5sum() (string, error) { + in, err := os.Open(o.path) if err != nil { - stats.Error() - FsLog(fs, "Failed to open: %s", err) + fs.Stats.Error() + fs.FsLog(o, "Failed to open: %s", err) return "", err } defer in.Close() // FIXME ignoring error hash := md5.New() _, err = io.Copy(hash, in) if err != nil { - stats.Error() - FsLog(fs, "Failed to read: %s", err) + fs.Stats.Error() + fs.FsLog(o, "Failed to read: %s", err) return "", err } return fmt.Sprintf("%x", hash.Sum(nil)), nil } // Size returns the size of an object in bytes -func (fs *FsObjectLocal) Size() int64 { - return fs.info.Size() +func (o *FsObjectLocal) Size() int64 { + return o.info.Size() } // ModTime returns the modification time of the object -func (fs *FsObjectLocal) ModTime() time.Time { - return fs.info.ModTime() +func (o *FsObjectLocal) ModTime() time.Time { + return o.info.ModTime() } // Sets the modification time of the local fs object -func (fs *FsObjectLocal) SetModTime(modTime time.Time) { - err := Chtimes(fs.path, modTime, modTime) +func (o *FsObjectLocal) SetModTime(modTime time.Time) { + err := os.Chtimes(o.path, modTime, modTime) if err != nil { - FsDebug(fs, "Failed to set mtime on file: %s", err) + fs.FsDebug(o, "Failed to set mtime on file: %s", err) } } // Is this object storable -func (fs *FsObjectLocal) Storable() bool { - mode := fs.info.Mode() +func (o *FsObjectLocal) Storable() bool { + mode := o.info.Mode() if mode&(os.ModeSymlink|os.ModeNamedPipe|os.ModeSocket|os.ModeDevice) != 0 { - FsDebug(fs, "Can't transfer non file/directory") + fs.FsDebug(o, "Can't transfer non file/directory") return false } else if mode&os.ModeDir != 0 { - FsDebug(fs, "FIXME Skipping directory") + fs.FsDebug(o, "FIXME Skipping directory") return false } return true } // Open an object for read -func (fs *FsObjectLocal) Open() (in io.ReadCloser, err error) { - in, err = os.Open(fs.path) +func (o *FsObjectLocal) Open() (in io.ReadCloser, err error) { + in, err = os.Open(o.path) return } // Stat a FsObject into info -func (fs *FsObjectLocal) lstat() error { - info, err := os.Lstat(fs.path) - fs.info = info +func (o *FsObjectLocal) lstat() error { + info, err := os.Lstat(o.path) + o.info = info return err } // Remove an object -func (fs *FsObjectLocal) Remove() error { - return os.Remove(fs.path) +func (o *FsObjectLocal) Remove() error { + return os.Remove(o.path) } // Check the interfaces are satisfied -var _ Fs = &FsLocal{} -var _ FsObject = &FsObjectLocal{} +var _ fs.Fs = &FsLocal{} +var _ fs.FsObject = &FsObjectLocal{} diff --git a/rclone.go b/rclone.go index aaab94630..2a94340c6 100644 --- a/rclone.go +++ b/rclone.go @@ -4,6 +4,7 @@ package main import ( + "./fs" "flag" "fmt" "log" @@ -13,6 +14,11 @@ import ( "strings" "sync" "time" + // Active file systems + _ "./drive" + _ "./local" + _ "./s3" + _ "./swift" ) // Globals @@ -28,17 +34,17 @@ var ( modifyWindow = flag.Duration("modify-window", time.Nanosecond, "Max time diff to be considered the same") ) -// A pair of FsObjects +// A pair of fs.FsObjects type PairFsObjects struct { - src, dst FsObject + src, dst fs.FsObject } type PairFsObjectsChan chan PairFsObjects // Check to see if src needs to be copied to dst and if so puts it in out -func checkOne(src, dst FsObject, out FsObjectsChan) { +func checkOne(src, dst fs.FsObject, out fs.FsObjectsChan) { if dst == nil { - FsDebug(src, "Couldn't find local file - download") + fs.FsDebug(src, "Couldn't find local file - download") out <- src return } @@ -47,8 +53,8 @@ func checkOne(src, dst FsObject, out FsObjectsChan) { return } // Check to see if changed or not - if Equal(src, dst) { - FsDebug(src, "Unchanged skipping") + if fs.Equal(src, dst) { + fs.FsDebug(src, "Unchanged skipping") return } out <- src @@ -57,49 +63,49 @@ func checkOne(src, dst FsObject, out FsObjectsChan) { // Read FsObjects~s on in send to out if they need uploading // // FIXME potentially doing lots of MD5SUMS at once -func PairChecker(in PairFsObjectsChan, out FsObjectsChan, wg *sync.WaitGroup) { +func PairChecker(in PairFsObjectsChan, out fs.FsObjectsChan, wg *sync.WaitGroup) { defer wg.Done() for pair := range in { src := pair.src - stats.Checking(src) + fs.Stats.Checking(src) checkOne(src, pair.dst, out) - stats.DoneChecking(src) + fs.Stats.DoneChecking(src) } } // Read FsObjects~s on in send to out if they need uploading // // FIXME potentially doing lots of MD5SUMS at once -func Checker(in, out FsObjectsChan, fdst Fs, wg *sync.WaitGroup) { +func Checker(in, out fs.FsObjectsChan, fdst fs.Fs, wg *sync.WaitGroup) { defer wg.Done() for src := range in { - stats.Checking(src) + fs.Stats.Checking(src) dst := fdst.NewFsObject(src.Remote()) checkOne(src, dst, out) - stats.DoneChecking(src) + fs.Stats.DoneChecking(src) } } // Read FsObjects on in and copy them -func Copier(in FsObjectsChan, fdst Fs, wg *sync.WaitGroup) { +func Copier(in fs.FsObjectsChan, fdst fs.Fs, wg *sync.WaitGroup) { defer wg.Done() for src := range in { - stats.Transferring(src) - Copy(fdst, src) - stats.DoneTransferring(src) + fs.Stats.Transferring(src) + fs.Copy(fdst, src) + fs.Stats.DoneTransferring(src) } } // Copies fsrc into fdst -func CopyFs(fdst, fsrc Fs) { +func CopyFs(fdst, fsrc fs.Fs) { err := fdst.Mkdir() if err != nil { - stats.Error() + fs.Stats.Error() log.Fatal("Failed to make destination") } to_be_checked := fsrc.List() - to_be_uploaded := make(FsObjectsChan, *transfers) + to_be_uploaded := make(fs.FsObjectsChan, *transfers) var checkerWg sync.WaitGroup checkerWg.Add(*checkers) @@ -121,7 +127,7 @@ func CopyFs(fdst, fsrc Fs) { } // Delete all the files passed in the channel -func DeleteFiles(to_be_deleted FsObjectsChan) { +func DeleteFiles(to_be_deleted fs.FsObjectsChan) { var wg sync.WaitGroup wg.Add(*transfers) for i := 0; i < *transfers; i++ { @@ -129,16 +135,16 @@ func DeleteFiles(to_be_deleted FsObjectsChan) { defer wg.Done() for dst := range to_be_deleted { if *dry_run { - FsDebug(dst, "Not deleting as -dry-run") + fs.FsDebug(dst, "Not deleting as -dry-run") } else { - stats.Checking(dst) + fs.Stats.Checking(dst) err := dst.Remove() - stats.DoneChecking(dst) + fs.Stats.DoneChecking(dst) if err != nil { - stats.Error() - FsLog(dst, "Couldn't delete: %s", err) + fs.Stats.Error() + fs.FsLog(dst, "Couldn't delete: %s", err) } else { - FsDebug(dst, "Deleted") + fs.FsDebug(dst, "Deleted") } } } @@ -150,10 +156,10 @@ func DeleteFiles(to_be_deleted FsObjectsChan) { } // Syncs fsrc into fdst -func Sync(fdst, fsrc Fs) { +func Sync(fdst, fsrc fs.Fs) { err := fdst.Mkdir() if err != nil { - stats.Error() + fs.Stats.Error() log.Fatal("Failed to make destination") } @@ -161,14 +167,14 @@ func Sync(fdst, fsrc Fs) { // Read the destination files first // FIXME could do this in parallel and make it use less memory - delFiles := make(map[string]FsObject) + delFiles := make(map[string]fs.FsObject) for dst := range fdst.List() { delFiles[dst.Remote()] = dst } // Read source files checking them off against dest files to_be_checked := make(PairFsObjectsChan, *transfers) - to_be_uploaded := make(FsObjectsChan, *transfers) + to_be_uploaded := make(fs.FsObjectsChan, *transfers) var checkerWg sync.WaitGroup checkerWg.Add(*checkers) @@ -203,13 +209,13 @@ func Sync(fdst, fsrc Fs) { log.Printf("Waiting for transfers to finish") copierWg.Wait() - if stats.errors != 0 { + if fs.Stats.Errored() { log.Printf("Not deleting files as there were IO errors") return } // Delete the spare files - toDelete := make(FsObjectsChan, *transfers) + toDelete := make(fs.FsObjectsChan, *transfers) go func() { for _, fs := range delFiles { toDelete <- fs @@ -220,24 +226,24 @@ func Sync(fdst, fsrc Fs) { } // Checks the files in fsrc and fdst according to Size and MD5SUM -func Check(fdst, fsrc Fs) { +func Check(fdst, fsrc fs.Fs) { log.Printf("Building file list") // Read the destination files first // FIXME could do this in parallel and make it use less memory - dstFiles := make(map[string]FsObject) + dstFiles := make(map[string]fs.FsObject) for dst := range fdst.List() { dstFiles[dst.Remote()] = dst } // Read the source files checking them against dstFiles // FIXME could do this in parallel and make it use less memory - srcFiles := make(map[string]FsObject) - commonFiles := make(map[string][]FsObject) + srcFiles := make(map[string]fs.FsObject) + commonFiles := make(map[string][]fs.FsObject) for src := range fsrc.List() { remote := src.Remote() if dst, ok := dstFiles[remote]; ok { - commonFiles[remote] = []FsObject{dst, src} + commonFiles[remote] = []fs.FsObject{dst, src} delete(dstFiles, remote) } else { srcFiles[remote] = src @@ -246,17 +252,17 @@ func Check(fdst, fsrc Fs) { log.Printf("Files in %s but not in %s", fdst, fsrc) for remote := range dstFiles { - stats.Error() + fs.Stats.Error() log.Printf(remote) } log.Printf("Files in %s but not in %s", fsrc, fdst) for remote := range srcFiles { - stats.Error() + fs.Stats.Error() log.Printf(remote) } - checks := make(chan []FsObject, *transfers) + checks := make(chan []fs.FsObject, *transfers) go func() { for _, check := range commonFiles { checks <- check @@ -271,47 +277,47 @@ func Check(fdst, fsrc Fs) { defer checkerWg.Done() for check := range checks { dst, src := check[0], check[1] - stats.Checking(src) + fs.Stats.Checking(src) if src.Size() != dst.Size() { - stats.DoneChecking(src) - stats.Error() - FsLog(src, "Sizes differ") + fs.Stats.DoneChecking(src) + fs.Stats.Error() + fs.FsLog(src, "Sizes differ") continue } - same, err := CheckMd5sums(src, dst) - stats.DoneChecking(src) + same, err := fs.CheckMd5sums(src, dst) + fs.Stats.DoneChecking(src) if err != nil { continue } if !same { - stats.Error() - FsLog(src, "Md5sums differ") + fs.Stats.Error() + fs.FsLog(src, "Md5sums differ") } - FsDebug(src, "OK") + fs.FsDebug(src, "OK") } }() } log.Printf("Waiting for checks to finish") checkerWg.Wait() - log.Printf("%d differences found", stats.errors) + log.Printf("%d differences found", fs.Stats.Errors) } // List the Fs to stdout // // Lists in parallel which may get them out of order -func List(f, _ Fs) { +func List(f, _ fs.Fs) { in := f.List() var wg sync.WaitGroup wg.Add(*checkers) for i := 0; i < *checkers; i++ { go func() { defer wg.Done() - for fs := range in { - stats.Checking(fs) - modTime := fs.ModTime() - stats.DoneChecking(fs) - fmt.Printf("%9d %19s %s\n", fs.Size(), modTime.Format("2006-01-02 15:04:05.00000000"), fs.Remote()) + for o := range in { + fs.Stats.Checking(o) + modTime := o.ModTime() + fs.Stats.DoneChecking(o) + fmt.Printf("%9d %19s %s\n", o.Size(), modTime.Format("2006-01-02 15:04:05.00000000"), o.Remote()) } }() } @@ -319,29 +325,29 @@ func List(f, _ Fs) { } // List the directories/buckets/containers in the Fs to stdout -func ListDir(f, _ Fs) { +func ListDir(f, _ fs.Fs) { for dir := range f.ListDir() { fmt.Printf("%12d %13s %9d %s\n", dir.Bytes, dir.When.Format("2006-01-02 15:04:05"), dir.Count, dir.Name) } } // Makes a destination directory or container -func mkdir(fdst, fsrc Fs) { +func mkdir(fdst, fsrc fs.Fs) { err := fdst.Mkdir() if err != nil { - stats.Error() + fs.Stats.Error() log.Fatalf("Mkdir failed: %s", err) } } // Removes a container but not if not empty -func rmdir(fdst, fsrc Fs) { +func rmdir(fdst, fsrc fs.Fs) { if *dry_run { log.Printf("Not deleting %s as -dry-run", fdst) } else { err := fdst.Rmdir() if err != nil { - stats.Error() + fs.Stats.Error() log.Fatalf("Rmdir failed: %s", err) } } @@ -350,11 +356,11 @@ func rmdir(fdst, fsrc Fs) { // Removes a container and all of its contents // // FIXME doesn't delete local directories -func purge(fdst, fsrc Fs) { - if f, ok := fdst.(Purger); ok { +func purge(fdst, fsrc fs.Fs) { + if f, ok := fdst.(fs.Purger); ok { err := f.Purge() if err != nil { - stats.Error() + fs.Stats.Error() log.Fatalf("Purge failed: %s", err) } } else { @@ -367,7 +373,7 @@ func purge(fdst, fsrc Fs) { type Command struct { name string help string - run func(fdst, fsrc Fs) + run func(fdst, fsrc fs.Fs) minArgs, maxArgs int } @@ -516,7 +522,7 @@ func main() { if *cpuprofile != "" { f, err := os.Create(*cpuprofile) if err != nil { - stats.Error() + fs.Stats.Error() log.Fatal(err) } pprof.StartCPUProfile(f) @@ -540,32 +546,32 @@ func main() { break } else if strings.HasPrefix(command.name, cmd) { if found != nil { - stats.Error() + fs.Stats.Error() log.Fatalf("Not unique - matches multiple commands %q", cmd) } found = command } } if found == nil { - stats.Error() + fs.Stats.Error() log.Fatalf("Unknown command %q", cmd) } found.checkArgs(args) // Make source and destination fs - var fdst, fsrc Fs + var fdst, fsrc fs.Fs var err error if len(args) >= 1 { - fdst, err = NewFs(args[0]) + fdst, err = fs.NewFs(args[0]) if err != nil { - stats.Error() + fs.Stats.Error() log.Fatal("Failed to create file system: ", err) } } if len(args) >= 2 { - fsrc, err = NewFs(args[1]) + fsrc, err = fs.NewFs(args[1]) if err != nil { - stats.Error() + fs.Stats.Error() log.Fatal("Failed to create destination file system: ", err) } fsrc, fdst = fdst, fsrc @@ -575,34 +581,34 @@ func main() { if fsrc != nil { precision := fsrc.Precision() log.Printf("Source precision %s\n", precision) - if precision > *modifyWindow { - *modifyWindow = precision + if precision > fs.Config.ModifyWindow { + fs.Config.ModifyWindow = precision } } if fdst != nil { precision := fdst.Precision() log.Printf("Destination precision %s\n", precision) - if precision > *modifyWindow { - *modifyWindow = precision + if precision > fs.Config.ModifyWindow { + fs.Config.ModifyWindow = precision } } - log.Printf("Modify window is %s\n", *modifyWindow) + log.Printf("Modify window is %s\n", fs.Config.ModifyWindow) // Print the stats every statsInterval go func() { ch := time.Tick(*statsInterval) for { <-ch - stats.Log() + fs.Stats.Log() } }() // Run the actual command if found.run != nil { found.run(fdst, fsrc) - fmt.Println(stats) + fmt.Println(fs.Stats) log.Printf("*** Go routines at exit %d\n", runtime.NumGoroutine()) - if stats.errors > 0 { + if fs.Stats.Errored() { os.Exit(1) } os.Exit(0) diff --git a/fs_s3.go b/s3/fs.go similarity index 74% rename from fs_s3.go rename to s3/fs.go index ed84b3f9b..93a1764ac 100644 --- a/fs_s3.go +++ b/s3/fs.go @@ -1,9 +1,10 @@ // S3 interface -package main +package s3 // FIXME need to prevent anything but ListDir working for s3:// import ( + "../fs" "errors" "flag" "fmt" @@ -22,6 +23,14 @@ import ( "time" ) +// Pattern to match a s3 url +var Match = regexp.MustCompile(`^s3://([^/]*)(.*)$`) + +// Register with Fs +func init() { + fs.Register(Match, NewFs) +} + // Constants const ( metaMtime = "X-Amz-Meta-Mtime" // the meta key to store mtime in @@ -66,12 +75,9 @@ func (f *FsS3) String() string { return fmt.Sprintf("S3 bucket %s", f.bucket) } -// Pattern to match a s3 url -var s3Match = regexp.MustCompile(`^s3://([^/]*)(.*)$`) - // parseParse parses a s3 'url' func s3ParsePath(path string) (bucket, directory string, err error) { - parts := s3Match.FindAllStringSubmatch(path, -1) + parts := Match.FindAllStringSubmatch(path, -1) if len(parts) != 1 || len(parts[0]) != 3 { err = fmt.Errorf("Couldn't parse s3 url %q", path) } else { @@ -113,7 +119,7 @@ func s3Connection() (*s3.S3, error) { } // NewFsS3 contstructs an FsS3 from the path, bucket:path -func NewFsS3(path string) (*FsS3, error) { +func NewFs(path string) (fs.Fs, error) { bucket, directory, err := s3ParsePath(path) if err != nil { return nil, err @@ -137,46 +143,46 @@ func NewFsS3(path string) (*FsS3, error) { // Return an FsObject from a path // // May return nil if an error occurred -func (f *FsS3) NewFsObjectWithInfo(remote string, info *s3.Key) FsObject { - fs := &FsObjectS3{ +func (f *FsS3) NewFsObjectWithInfo(remote string, info *s3.Key) fs.FsObject { + o := &FsObjectS3{ s3: f, remote: remote, } if info != nil { // Set info but not meta var err error - fs.lastModified, err = time.Parse(time.RFC3339, info.LastModified) + o.lastModified, err = time.Parse(time.RFC3339, info.LastModified) if err != nil { - FsLog(fs, "Failed to read last modified: %s", err) - fs.lastModified = time.Now() + fs.FsLog(o, "Failed to read last modified: %s", err) + o.lastModified = time.Now() } - fs.etag = info.ETag - fs.bytes = info.Size + o.etag = info.ETag + o.bytes = info.Size } else { - err := fs.readMetaData() // reads info and meta, returning an error + err := o.readMetaData() // reads info and meta, returning an error if err != nil { // logged already FsDebug("Failed to read info: %s", err) return nil } } - return fs + return o } // Return an FsObject from a path // // May return nil if an error occurred -func (f *FsS3) NewFsObject(remote string) FsObject { +func (f *FsS3) NewFsObject(remote string) fs.FsObject { return f.NewFsObjectWithInfo(remote, nil) } // Walk the path returning a channel of FsObjects -func (f *FsS3) List() FsObjectsChan { - out := make(FsObjectsChan, *checkers) +func (f *FsS3) List() fs.FsObjectsChan { + out := make(fs.FsObjectsChan, fs.Config.Checkers) go func() { // FIXME need to implement ALL loop objects, err := f.b.List("", "", "", 10000) if err != nil { - stats.Error() + fs.Stats.Error() log.Printf("Couldn't read bucket %q: %s", f.bucket, err) } else { for i := range objects.Contents { @@ -192,17 +198,17 @@ func (f *FsS3) List() FsObjectsChan { } // Lists the buckets -func (f *FsS3) ListDir() FsDirChan { - out := make(FsDirChan, *checkers) +func (f *FsS3) ListDir() fs.FsDirChan { + out := make(fs.FsDirChan, fs.Config.Checkers) go func() { defer close(out) buckets, err := f.c.ListBuckets() if err != nil { - stats.Error() + fs.Stats.Error() log.Printf("Couldn't list buckets: %s", err) } else { for _, bucket := range buckets { - out <- &FsDir{ + out <- &fs.FsDir{ Name: bucket.Name, When: bucket.CreationDate, Bytes: -1, @@ -215,7 +221,7 @@ func (f *FsS3) ListDir() FsDirChan { } // Put the FsObject into the bucket -func (f *FsS3) Put(in io.Reader, remote string, modTime time.Time, size int64) (FsObject, error) { +func (f *FsS3) Put(in io.Reader, remote string, modTime time.Time, size int64) (fs.FsObject, error) { // Temporary FsObject under construction fs := &FsObjectS3{s3: f, remote: remote} @@ -253,51 +259,51 @@ func (f *FsS3) Rmdir() error { } // Return the precision -func (fs *FsS3) Precision() time.Duration { +func (f *FsS3) Precision() time.Duration { return time.Nanosecond } // ------------------------------------------------------------ // Return the remote path -func (fs *FsObjectS3) Remote() string { - return fs.remote +func (o *FsObjectS3) Remote() string { + return o.remote } // Md5sum returns the Md5sum of an object returning a lowercase hex string -func (fs *FsObjectS3) Md5sum() (string, error) { - return strings.Trim(strings.ToLower(fs.etag), `"`), nil +func (o *FsObjectS3) Md5sum() (string, error) { + return strings.Trim(strings.ToLower(o.etag), `"`), nil } // Size returns the size of an object in bytes -func (fs *FsObjectS3) Size() int64 { - return fs.bytes +func (o *FsObjectS3) Size() int64 { + return o.bytes } // readMetaData gets the metadata if it hasn't already been fetched // // it also sets the info -func (fs *FsObjectS3) readMetaData() (err error) { - if fs.meta != nil { +func (o *FsObjectS3) readMetaData() (err error) { + if o.meta != nil { return nil } - headers, err := fs.s3.b.Head(fs.remote, nil) + headers, err := o.s3.b.Head(o.remote, nil) if err != nil { - FsDebug(fs, "Failed to read info: %s", err) + fs.FsDebug(o, "Failed to read info: %s", err) return err } size, err := strconv.ParseInt(headers["Content-Length"], 10, 64) if err != nil { - FsDebug(fs, "Failed to read size from: %q", headers) + fs.FsDebug(o, "Failed to read size from: %q", headers) return err } - fs.etag = headers["Etag"] - fs.bytes = size - fs.meta = headers - if fs.lastModified, err = time.Parse(http.TimeFormat, headers["Last-Modified"]); err != nil { - FsLog(fs, "Failed to read last modified from HEAD: %s", err) - fs.lastModified = time.Now() + o.etag = headers["Etag"] + o.bytes = size + o.meta = headers + if o.lastModified, err = time.Parse(http.TimeFormat, headers["Last-Modified"]); err != nil { + fs.FsLog(o, "Failed to read last modified from HEAD: %s", err) + o.lastModified = time.Now() } return nil } @@ -306,58 +312,58 @@ func (fs *FsObjectS3) readMetaData() (err error) { // // It attempts to read the objects mtime and if that isn't present the // LastModified returned in the http headers -func (fs *FsObjectS3) ModTime() time.Time { - err := fs.readMetaData() +func (o *FsObjectS3) ModTime() time.Time { + err := o.readMetaData() if err != nil { - FsLog(fs, "Failed to read metadata: %s", err) + fs.FsLog(o, "Failed to read metadata: %s", err) return time.Now() } // read mtime out of metadata if available - d, ok := fs.meta[metaMtime] + d, ok := o.meta[metaMtime] if !ok { - // FsDebug(fs, "No metadata") - return fs.lastModified + // fs.FsDebug(o, "No metadata") + return o.lastModified } modTime, err := swift.FloatStringToTime(d) if err != nil { - FsLog(fs, "Failed to read mtime from object: %s", err) - return fs.lastModified + fs.FsLog(o, "Failed to read mtime from object: %s", err) + return o.lastModified } return modTime } // Sets the modification time of the local fs object -func (fs *FsObjectS3) SetModTime(modTime time.Time) { - err := fs.readMetaData() +func (o *FsObjectS3) SetModTime(modTime time.Time) { + err := o.readMetaData() if err != nil { - stats.Error() - FsLog(fs, "Failed to read metadata: %s", err) + fs.Stats.Error() + fs.FsLog(o, "Failed to read metadata: %s", err) return } - fs.meta[metaMtime] = swift.TimeToFloatString(modTime) - _, err = fs.s3.b.Update(fs.remote, fs.s3.perm, fs.meta) + o.meta[metaMtime] = swift.TimeToFloatString(modTime) + _, err = o.s3.b.Update(o.remote, o.s3.perm, o.meta) if err != nil { - stats.Error() - FsLog(fs, "Failed to update remote mtime: %s", err) + fs.Stats.Error() + fs.FsLog(o, "Failed to update remote mtime: %s", err) } } // Is this object storable -func (fs *FsObjectS3) Storable() bool { +func (o *FsObjectS3) Storable() bool { return true } // Open an object for read -func (fs *FsObjectS3) Open() (in io.ReadCloser, err error) { - in, err = fs.s3.b.GetReader(fs.remote) +func (o *FsObjectS3) Open() (in io.ReadCloser, err error) { + in, err = o.s3.b.GetReader(o.remote) return } // Remove an object -func (fs *FsObjectS3) Remove() error { - return fs.s3.b.Del(fs.remote) +func (o *FsObjectS3) Remove() error { + return o.s3.b.Del(o.remote) } // Check the interfaces are satisfied -var _ Fs = &FsS3{} -var _ FsObject = &FsObjectS3{} +var _ fs.Fs = &FsS3{} +var _ fs.FsObject = &FsObjectS3{} diff --git a/fs_swift.go b/swift/fs.go similarity index 74% rename from fs_swift.go rename to swift/fs.go index 0d70d2d9c..6c12cc22a 100644 --- a/fs_swift.go +++ b/swift/fs.go @@ -1,9 +1,10 @@ // Swift interface -package main +package swift // FIXME need to prevent anything but ListDir working for swift:// import ( + "../fs" "errors" "flag" "fmt" @@ -16,6 +17,14 @@ import ( "time" ) +// Pattern to match a swift url +var Match = regexp.MustCompile(`^swift://([^/]*)(.*)$`) + +// Register with Fs +func init() { + fs.Register(Match, NewFs) +} + // FsSwift represents a remote swift server type FsSwift struct { c swift.Connection // the connection to the swift server @@ -50,12 +59,9 @@ func (f *FsSwift) String() string { return fmt.Sprintf("Swift container %s", f.container) } -// Pattern to match a swift url -var swiftMatch = regexp.MustCompile(`^swift://([^/]*)(.*)$`) - // parseParse parses a swift 'url' func parsePath(path string) (container, directory string, err error) { - parts := swiftMatch.FindAllStringSubmatch(path, -1) + parts := Match.FindAllStringSubmatch(path, -1) if len(parts) != 1 || len(parts[0]) != 3 { err = fmt.Errorf("Couldn't parse swift url %q", path) } else { @@ -88,8 +94,8 @@ func swiftConnection() (*swift.Connection, error) { return c, nil } -// NewFsSwift contstructs an FsSwift from the path, container:path -func NewFsSwift(path string) (*FsSwift, error) { +// NewFs contstructs an FsSwift from the path, container:path +func NewFs(path string) (fs.Fs, error) { container, directory, err := parsePath(path) if err != nil { return nil, err @@ -108,7 +114,7 @@ func NewFsSwift(path string) (*FsSwift, error) { // Return an FsObject from a path // // May return nil if an error occurred -func (f *FsSwift) NewFsObjectWithInfo(remote string, info *swift.Object) FsObject { +func (f *FsSwift) NewFsObjectWithInfo(remote string, info *swift.Object) fs.FsObject { fs := &FsObjectSwift{ swift: f, remote: remote, @@ -129,13 +135,13 @@ func (f *FsSwift) NewFsObjectWithInfo(remote string, info *swift.Object) FsObjec // Return an FsObject from a path // // May return nil if an error occurred -func (f *FsSwift) NewFsObject(remote string) FsObject { +func (f *FsSwift) NewFsObject(remote string) fs.FsObject { return f.NewFsObjectWithInfo(remote, nil) } // Walk the path returning a channel of FsObjects -func (f *FsSwift) List() FsObjectsChan { - out := make(FsObjectsChan, *checkers) +func (f *FsSwift) List() fs.FsObjectsChan { + out := make(fs.FsObjectsChan, fs.Config.Checkers) go func() { // FIXME use a smaller limit? err := f.c.ObjectsWalk(f.container, nil, func(opts *swift.ObjectsOpts) (interface{}, error) { @@ -151,7 +157,7 @@ func (f *FsSwift) List() FsObjectsChan { return objects, err }) if err != nil { - stats.Error() + fs.Stats.Error() log.Printf("Couldn't read container %q: %s", f.container, err) } close(out) @@ -160,17 +166,17 @@ func (f *FsSwift) List() FsObjectsChan { } // Lists the containers -func (f *FsSwift) ListDir() FsDirChan { - out := make(FsDirChan, *checkers) +func (f *FsSwift) ListDir() fs.FsDirChan { + out := make(fs.FsDirChan, fs.Config.Checkers) go func() { defer close(out) containers, err := f.c.ContainersAll(nil) if err != nil { - stats.Error() + fs.Stats.Error() log.Printf("Couldn't list containers: %s", err) } else { for _, container := range containers { - out <- &FsDir{ + out <- &fs.FsDir{ Name: container.Name, Bytes: container.Bytes, Count: container.Count, @@ -186,7 +192,7 @@ func (f *FsSwift) ListDir() FsDirChan { // Copy the reader in to the new object which is returned // // The new object may have been created -func (f *FsSwift) Put(in io.Reader, remote string, modTime time.Time, size int64) (FsObject, error) { +func (f *FsSwift) Put(in io.Reader, remote string, modTime time.Time, size int64) (fs.FsObject, error) { // Temporary FsObject under construction fs := &FsObjectSwift{swift: f, remote: remote} @@ -217,35 +223,35 @@ func (fs *FsSwift) Precision() time.Duration { // ------------------------------------------------------------ // Return the remote path -func (fs *FsObjectSwift) Remote() string { - return fs.remote +func (o *FsObjectSwift) Remote() string { + return o.remote } // Md5sum returns the Md5sum of an object returning a lowercase hex string -func (fs *FsObjectSwift) Md5sum() (string, error) { - return strings.ToLower(fs.info.Hash), nil +func (o *FsObjectSwift) Md5sum() (string, error) { + return strings.ToLower(o.info.Hash), nil } // Size returns the size of an object in bytes -func (fs *FsObjectSwift) Size() int64 { - return fs.info.Bytes +func (o *FsObjectSwift) Size() int64 { + return o.info.Bytes } // readMetaData gets the metadata if it hasn't already been fetched // // it also sets the info -func (fs *FsObjectSwift) readMetaData() (err error) { - if fs.meta != nil { +func (o *FsObjectSwift) readMetaData() (err error) { + if o.meta != nil { return nil } - info, h, err := fs.swift.c.Object(fs.swift.container, fs.remote) + info, h, err := o.swift.c.Object(o.swift.container, o.remote) if err != nil { - FsDebug(fs, "Failed to read info: %s", err) + fs.FsDebug(o, "Failed to read info: %s", err) return err } meta := h.ObjectMetadata() - fs.info = info - fs.meta = &meta + o.info = info + o.meta = &meta return nil } @@ -254,52 +260,52 @@ func (fs *FsObjectSwift) readMetaData() (err error) { // // It attempts to read the objects mtime and if that isn't present the // LastModified returned in the http headers -func (fs *FsObjectSwift) ModTime() time.Time { - err := fs.readMetaData() +func (o *FsObjectSwift) ModTime() time.Time { + err := o.readMetaData() if err != nil { - // FsLog(fs, "Failed to read metadata: %s", err) - return fs.info.LastModified + // fs.FsLog(o, "Failed to read metadata: %s", err) + return o.info.LastModified } - modTime, err := fs.meta.GetModTime() + modTime, err := o.meta.GetModTime() if err != nil { - // FsLog(fs, "Failed to read mtime from object: %s", err) - return fs.info.LastModified + // fs.FsLog(o, "Failed to read mtime from object: %s", err) + return o.info.LastModified } return modTime } // Sets the modification time of the local fs object -func (fs *FsObjectSwift) SetModTime(modTime time.Time) { - err := fs.readMetaData() +func (o *FsObjectSwift) SetModTime(modTime time.Time) { + err := o.readMetaData() if err != nil { - stats.Error() - FsLog(fs, "Failed to read metadata: %s", err) + fs.Stats.Error() + fs.FsLog(o, "Failed to read metadata: %s", err) return } - fs.meta.SetModTime(modTime) - err = fs.swift.c.ObjectUpdate(fs.swift.container, fs.remote, fs.meta.ObjectHeaders()) + o.meta.SetModTime(modTime) + err = o.swift.c.ObjectUpdate(o.swift.container, o.remote, o.meta.ObjectHeaders()) if err != nil { - stats.Error() - FsLog(fs, "Failed to update remote mtime: %s", err) + fs.Stats.Error() + fs.FsLog(o, "Failed to update remote mtime: %s", err) } } // Is this object storable -func (fs *FsObjectSwift) Storable() bool { +func (o *FsObjectSwift) Storable() bool { return true } // Open an object for read -func (fs *FsObjectSwift) Open() (in io.ReadCloser, err error) { - in, _, err = fs.swift.c.ObjectOpen(fs.swift.container, fs.remote, true, nil) +func (o *FsObjectSwift) Open() (in io.ReadCloser, err error) { + in, _, err = o.swift.c.ObjectOpen(o.swift.container, o.remote, true, nil) return } // Remove an object -func (fs *FsObjectSwift) Remove() error { - return fs.swift.c.ObjectDelete(fs.swift.container, fs.remote) +func (o *FsObjectSwift) Remove() error { + return o.swift.c.ObjectDelete(o.swift.container, o.remote) } // Check the interfaces are satisfied -var _ Fs = &FsSwift{} -var _ FsObject = &FsObjectSwift{} +var _ fs.Fs = &FsSwift{} +var _ fs.FsObject = &FsObjectSwift{}