mirror of
https://github.com/rclone/rclone.git
synced 2024-11-25 01:44:41 +01:00
Refactor into sub filesystems
This commit is contained in:
parent
2bca3a45bd
commit
6085dc1b5a
@ -1,8 +1,9 @@
|
|||||||
// Drive interface
|
// Drive interface
|
||||||
package main
|
package drive
|
||||||
|
|
||||||
// FIXME drive code is leaking goroutines somehow - reported bug
|
// FIXME drive code is leaking goroutines somehow - reported bug
|
||||||
// https://code.google.com/p/google-api-go-client/issues/detail?id=23
|
// https://code.google.com/p/google-api-go-client/issues/detail?id=23
|
||||||
|
// Now fixed!
|
||||||
|
|
||||||
// FIXME list containers equivalent should list directories?
|
// FIXME list containers equivalent should list directories?
|
||||||
|
|
||||||
@ -22,6 +23,7 @@ package main
|
|||||||
// * files with / in name
|
// * files with / in name
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"../fs"
|
||||||
"code.google.com/p/goauth2/oauth"
|
"code.google.com/p/goauth2/oauth"
|
||||||
"code.google.com/p/google-api-go-client/drive/v2"
|
"code.google.com/p/google-api-go-client/drive/v2"
|
||||||
"errors"
|
"errors"
|
||||||
@ -287,7 +289,7 @@ func NewFsDrive(path string) (*FsDrive, error) {
|
|||||||
// Return an FsObject from a path
|
// Return an FsObject from a path
|
||||||
//
|
//
|
||||||
// May return nil if an error occurred
|
// May return nil if an error occurred
|
||||||
func (f *FsDrive) NewFsObjectWithInfo(remote string, info *drive.File) FsObject {
|
func (f *FsDrive) NewFsObjectWithInfo(remote string, info *drive.File) fs.FsObject {
|
||||||
fs := &FsObjectDrive{
|
fs := &FsObjectDrive{
|
||||||
drive: f,
|
drive: f,
|
||||||
remote: remote,
|
remote: remote,
|
||||||
@ -297,7 +299,7 @@ func (f *FsDrive) NewFsObjectWithInfo(remote string, info *drive.File) FsObject
|
|||||||
} else {
|
} else {
|
||||||
err := fs.readMetaData() // reads info and meta, returning an error
|
err := fs.readMetaData() // reads info and meta, returning an error
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// logged already FsDebug("Failed to read info: %s", err)
|
// logged already fs.FsDebug("Failed to read info: %s", err)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -307,7 +309,7 @@ func (f *FsDrive) NewFsObjectWithInfo(remote string, info *drive.File) FsObject
|
|||||||
// Return an FsObject from a path
|
// Return an FsObject from a path
|
||||||
//
|
//
|
||||||
// May return nil if an error occurred
|
// May return nil if an error occurred
|
||||||
func (f *FsDrive) NewFsObject(remote string) FsObject {
|
func (f *FsDrive) NewFsObject(remote string) fs.FsObject {
|
||||||
return f.NewFsObjectWithInfo(remote, nil)
|
return f.NewFsObjectWithInfo(remote, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -317,7 +319,7 @@ func (f *FsDrive) NewFsObject(remote string) FsObject {
|
|||||||
//
|
//
|
||||||
// This fetches the minimum amount of stuff but does more API calls
|
// This fetches the minimum amount of stuff but does more API calls
|
||||||
// which makes it slow
|
// which makes it slow
|
||||||
func (f *FsDrive) listDirRecursive(dirId string, path string, out FsObjectsChan) error {
|
func (f *FsDrive) listDirRecursive(dirId string, path string, out fs.FsObjectsChan) error {
|
||||||
var subError error
|
var subError error
|
||||||
// Make the API request
|
// Make the API request
|
||||||
_, err := f.listAll(dirId, "", false, false, func(item *drive.File) bool {
|
_, err := f.listAll(dirId, "", false, false, func(item *drive.File) bool {
|
||||||
@ -355,7 +357,7 @@ func (f *FsDrive) listDirRecursive(dirId string, path string, out FsObjectsChan)
|
|||||||
//
|
//
|
||||||
// This is fast in terms of number of API calls, but slow in terms of
|
// This is fast in terms of number of API calls, but slow in terms of
|
||||||
// fetching more data than it needs
|
// fetching more data than it needs
|
||||||
func (f *FsDrive) listDirFull(dirId string, path string, out FsObjectsChan) error {
|
func (f *FsDrive) listDirFull(dirId string, path string, out fs.FsObjectsChan) error {
|
||||||
// Orphans waiting for their parent
|
// Orphans waiting for their parent
|
||||||
orphans := make(map[string][]*drive.File)
|
orphans := make(map[string][]*drive.File)
|
||||||
|
|
||||||
@ -545,13 +547,13 @@ func (f *FsDrive) findRoot(create bool) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Walk the path returning a channel of FsObjects
|
// Walk the path returning a channel of FsObjects
|
||||||
func (f *FsDrive) List() FsObjectsChan {
|
func (f *FsDrive) List() fs.FsObjectsChan {
|
||||||
out := make(FsObjectsChan, *checkers)
|
out := make(fs.FsObjectsChan, fs.Config.Checkers)
|
||||||
go func() {
|
go func() {
|
||||||
defer close(out)
|
defer close(out)
|
||||||
err := f.findRoot(false)
|
err := f.findRoot(false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
stats.Error()
|
fs.Stats.Error()
|
||||||
log.Printf("Couldn't find root: %s", err)
|
log.Printf("Couldn't find root: %s", err)
|
||||||
} else {
|
} else {
|
||||||
if *driveFullList {
|
if *driveFullList {
|
||||||
@ -560,7 +562,7 @@ func (f *FsDrive) List() FsObjectsChan {
|
|||||||
err = f.listDirRecursive(f.rootId, "", out)
|
err = f.listDirRecursive(f.rootId, "", out)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
stats.Error()
|
fs.Stats.Error()
|
||||||
log.Printf("List failed: %s", err)
|
log.Printf("List failed: %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -569,17 +571,17 @@ func (f *FsDrive) List() FsObjectsChan {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Walk the path returning a channel of FsObjects
|
// Walk the path returning a channel of FsObjects
|
||||||
func (f *FsDrive) ListDir() FsDirChan {
|
func (f *FsDrive) ListDir() fs.FsDirChan {
|
||||||
out := make(FsDirChan, *checkers)
|
out := make(fs.FsDirChan, fs.Config.Checkers)
|
||||||
go func() {
|
go func() {
|
||||||
defer close(out)
|
defer close(out)
|
||||||
err := f.findRoot(false)
|
err := f.findRoot(false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
stats.Error()
|
fs.Stats.Error()
|
||||||
log.Printf("Couldn't find root: %s", err)
|
log.Printf("Couldn't find root: %s", err)
|
||||||
} else {
|
} else {
|
||||||
_, err := f.listAll(f.rootId, "", true, false, func(item *drive.File) bool {
|
_, err := f.listAll(f.rootId, "", true, false, func(item *drive.File) bool {
|
||||||
dir := &FsDir{
|
dir := &fs.FsDir{
|
||||||
Name: item.Title,
|
Name: item.Title,
|
||||||
Bytes: -1,
|
Bytes: -1,
|
||||||
Count: -1,
|
Count: -1,
|
||||||
@ -589,7 +591,7 @@ func (f *FsDrive) ListDir() FsDirChan {
|
|||||||
return false
|
return false
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
stats.Error()
|
fs.Stats.Error()
|
||||||
log.Printf("ListDir failed: %s", err)
|
log.Printf("ListDir failed: %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -602,7 +604,7 @@ func (f *FsDrive) ListDir() FsDirChan {
|
|||||||
// Copy the reader in to the new object which is returned
|
// Copy the reader in to the new object which is returned
|
||||||
//
|
//
|
||||||
// The new object may have been created
|
// The new object may have been created
|
||||||
func (f *FsDrive) Put(in io.Reader, remote string, modTime time.Time, size int64) (FsObject, error) {
|
func (f *FsDrive) Put(in io.Reader, remote string, modTime time.Time, size int64) (fs.FsObject, error) {
|
||||||
// Temporary FsObject under construction
|
// Temporary FsObject under construction
|
||||||
fs := &FsObjectDrive{drive: f, remote: remote}
|
fs := &FsObjectDrive{drive: f, remote: remote}
|
||||||
|
|
||||||
@ -710,45 +712,45 @@ func (f *FsDrive) Purge() error {
|
|||||||
// ------------------------------------------------------------
|
// ------------------------------------------------------------
|
||||||
|
|
||||||
// Return the remote path
|
// Return the remote path
|
||||||
func (fs *FsObjectDrive) Remote() string {
|
func (o *FsObjectDrive) Remote() string {
|
||||||
return fs.remote
|
return o.remote
|
||||||
}
|
}
|
||||||
|
|
||||||
// Md5sum returns the Md5sum of an object returning a lowercase hex string
|
// Md5sum returns the Md5sum of an object returning a lowercase hex string
|
||||||
func (fs *FsObjectDrive) Md5sum() (string, error) {
|
func (o *FsObjectDrive) Md5sum() (string, error) {
|
||||||
return fs.md5sum, nil
|
return o.md5sum, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Size returns the size of an object in bytes
|
// Size returns the size of an object in bytes
|
||||||
func (fs *FsObjectDrive) Size() int64 {
|
func (o *FsObjectDrive) Size() int64 {
|
||||||
return fs.bytes
|
return o.bytes
|
||||||
}
|
}
|
||||||
|
|
||||||
// setMetaData sets the fs data from a drive.File
|
// setMetaData sets the fs data from a drive.File
|
||||||
func (fs *FsObjectDrive) setMetaData(info *drive.File) {
|
func (o *FsObjectDrive) setMetaData(info *drive.File) {
|
||||||
fs.id = info.Id
|
o.id = info.Id
|
||||||
fs.url = info.DownloadUrl
|
o.url = info.DownloadUrl
|
||||||
fs.md5sum = strings.ToLower(info.Md5Checksum)
|
o.md5sum = strings.ToLower(info.Md5Checksum)
|
||||||
fs.bytes = info.FileSize
|
o.bytes = info.FileSize
|
||||||
fs.modifiedDate = info.ModifiedDate
|
o.modifiedDate = info.ModifiedDate
|
||||||
}
|
}
|
||||||
|
|
||||||
// readMetaData gets the info if it hasn't already been fetched
|
// readMetaData gets the info if it hasn't already been fetched
|
||||||
func (fs *FsObjectDrive) readMetaData() (err error) {
|
func (o *FsObjectDrive) readMetaData() (err error) {
|
||||||
if fs.id != "" {
|
if o.id != "" {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
directory, leaf := splitPath(fs.remote)
|
directory, leaf := splitPath(o.remote)
|
||||||
directoryId, err := fs.drive.findDir(directory, false)
|
directoryId, err := o.drive.findDir(directory, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
FsDebug(fs, "Couldn't find directory: %s", err)
|
fs.FsDebug(o, "Couldn't find directory: %s", err)
|
||||||
return fmt.Errorf("Couldn't find directory: %s", err)
|
return fmt.Errorf("Couldn't find directory: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
found, err := fs.drive.listAll(directoryId, leaf, false, true, func(item *drive.File) bool {
|
found, err := o.drive.listAll(directoryId, leaf, false, true, func(item *drive.File) bool {
|
||||||
if item.Title == leaf {
|
if item.Title == leaf {
|
||||||
fs.setMetaData(item)
|
o.setMetaData(item)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
@ -757,7 +759,7 @@ func (fs *FsObjectDrive) readMetaData() (err error) {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if !found {
|
if !found {
|
||||||
FsDebug(fs, "Couldn't find object")
|
fs.FsDebug(o, "Couldn't find object")
|
||||||
return fmt.Errorf("Couldn't find object")
|
return fmt.Errorf("Couldn't find object")
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@ -768,26 +770,26 @@ func (fs *FsObjectDrive) readMetaData() (err error) {
|
|||||||
//
|
//
|
||||||
// It attempts to read the objects mtime and if that isn't present the
|
// It attempts to read the objects mtime and if that isn't present the
|
||||||
// LastModified returned in the http headers
|
// LastModified returned in the http headers
|
||||||
func (fs *FsObjectDrive) ModTime() time.Time {
|
func (o *FsObjectDrive) ModTime() time.Time {
|
||||||
err := fs.readMetaData()
|
err := o.readMetaData()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
FsLog(fs, "Failed to read metadata: %s", err)
|
fs.FsLog(o, "Failed to read metadata: %s", err)
|
||||||
return time.Now()
|
return time.Now()
|
||||||
}
|
}
|
||||||
modTime, err := time.Parse(time.RFC3339, fs.modifiedDate)
|
modTime, err := time.Parse(time.RFC3339, o.modifiedDate)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
FsLog(fs, "Failed to read mtime from object: %s", err)
|
fs.FsLog(o, "Failed to read mtime from object: %s", err)
|
||||||
return time.Now()
|
return time.Now()
|
||||||
}
|
}
|
||||||
return modTime
|
return modTime
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sets the modification time of the local fs object
|
// Sets the modification time of the local fs object
|
||||||
func (fs *FsObjectDrive) SetModTime(modTime time.Time) {
|
func (o *FsObjectDrive) SetModTime(modTime time.Time) {
|
||||||
err := fs.readMetaData()
|
err := o.readMetaData()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
stats.Error()
|
fs.Stats.Error()
|
||||||
FsLog(fs, "Failed to read metadata: %s", err)
|
fs.FsLog(o, "Failed to read metadata: %s", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// New metadata
|
// New metadata
|
||||||
@ -795,23 +797,23 @@ func (fs *FsObjectDrive) SetModTime(modTime time.Time) {
|
|||||||
ModifiedDate: modTime.Format(time.RFC3339Nano),
|
ModifiedDate: modTime.Format(time.RFC3339Nano),
|
||||||
}
|
}
|
||||||
// Set modified date
|
// Set modified date
|
||||||
_, err = fs.drive.svc.Files.Update(fs.id, info).SetModifiedDate(true).Do()
|
_, err = o.drive.svc.Files.Update(o.id, info).SetModifiedDate(true).Do()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
stats.Error()
|
fs.Stats.Error()
|
||||||
FsLog(fs, "Failed to update remote mtime: %s", err)
|
fs.FsLog(o, "Failed to update remote mtime: %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Is this object storable
|
// Is this object storable
|
||||||
func (fs *FsObjectDrive) Storable() bool {
|
func (o *FsObjectDrive) Storable() bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open an object for read
|
// Open an object for read
|
||||||
func (fs *FsObjectDrive) Open() (in io.ReadCloser, err error) {
|
func (o *FsObjectDrive) Open() (in io.ReadCloser, err error) {
|
||||||
req, _ := http.NewRequest("GET", fs.url, nil)
|
req, _ := http.NewRequest("GET", o.url, nil)
|
||||||
req.Header.Set("User-Agent", "rclone/1.0")
|
req.Header.Set("User-Agent", "rclone/1.0")
|
||||||
res, err := fs.drive.client.Do(req)
|
res, err := o.drive.client.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -823,11 +825,11 @@ func (fs *FsObjectDrive) Open() (in io.ReadCloser, err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Remove an object
|
// Remove an object
|
||||||
func (fs *FsObjectDrive) Remove() error {
|
func (o *FsObjectDrive) Remove() error {
|
||||||
return fs.drive.svc.Files.Delete(fs.id).Do()
|
return o.drive.svc.Files.Delete(o.id).Do()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check the interfaces are satisfied
|
// Check the interfaces are satisfied
|
||||||
var _ Fs = &FsDrive{}
|
var _ fs.Fs = &FsDrive{}
|
||||||
var _ Purger = &FsDrive{}
|
var _ fs.Purger = &FsDrive{}
|
||||||
var _ FsObject = &FsObjectDrive{}
|
var _ fs.FsObject = &FsObjectDrive{}
|
@ -1,6 +1,6 @@
|
|||||||
// Accounting and limiting reader
|
// Accounting and limiting reader
|
||||||
|
|
||||||
package main
|
package fs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
@ -14,7 +14,7 @@ import (
|
|||||||
|
|
||||||
// Globals
|
// Globals
|
||||||
var (
|
var (
|
||||||
stats = NewStats()
|
Stats = NewStats()
|
||||||
)
|
)
|
||||||
|
|
||||||
// Stringset holds some strings
|
// Stringset holds some strings
|
||||||
@ -35,7 +35,7 @@ func (ss StringSet) String() string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Stats limits and accounts all transfers
|
// Stats limits and accounts all transfers
|
||||||
type Stats struct {
|
type StatsInfo struct {
|
||||||
lock sync.RWMutex
|
lock sync.RWMutex
|
||||||
bytes int64
|
bytes int64
|
||||||
errors int64
|
errors int64
|
||||||
@ -46,24 +46,24 @@ type Stats struct {
|
|||||||
start time.Time
|
start time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewStats cretates an initialised Stats
|
// NewStats cretates an initialised StatsInfo
|
||||||
func NewStats() *Stats {
|
func NewStats() *StatsInfo {
|
||||||
return &Stats{
|
return &StatsInfo{
|
||||||
checking: make(StringSet, *checkers),
|
checking: make(StringSet, Config.Checkers),
|
||||||
transferring: make(StringSet, *transfers),
|
transferring: make(StringSet, Config.Transfers),
|
||||||
start: time.Now(),
|
start: time.Now(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// String convert the Stats to a string for printing
|
// String convert the StatsInfo to a string for printing
|
||||||
func (s *Stats) String() string {
|
func (s *StatsInfo) String() string {
|
||||||
s.lock.RLock()
|
s.lock.RLock()
|
||||||
defer s.lock.RUnlock()
|
defer s.lock.RUnlock()
|
||||||
dt := time.Now().Sub(stats.start)
|
dt := time.Now().Sub(s.start)
|
||||||
dt_seconds := dt.Seconds()
|
dt_seconds := dt.Seconds()
|
||||||
speed := 0.0
|
speed := 0.0
|
||||||
if dt > 0 {
|
if dt > 0 {
|
||||||
speed = float64(stats.bytes) / 1024 / dt_seconds
|
speed = float64(s.bytes) / 1024 / dt_seconds
|
||||||
}
|
}
|
||||||
buf := &bytes.Buffer{}
|
buf := &bytes.Buffer{}
|
||||||
fmt.Fprintf(buf, `
|
fmt.Fprintf(buf, `
|
||||||
@ -73,10 +73,10 @@ Checks: %10d
|
|||||||
Transferred: %10d
|
Transferred: %10d
|
||||||
Elapsed time: %v
|
Elapsed time: %v
|
||||||
`,
|
`,
|
||||||
stats.bytes, speed,
|
s.bytes, speed,
|
||||||
stats.errors,
|
s.errors,
|
||||||
stats.checks,
|
s.checks,
|
||||||
stats.transfers,
|
s.transfers,
|
||||||
dt)
|
dt)
|
||||||
if len(s.checking) > 0 {
|
if len(s.checking) > 0 {
|
||||||
fmt.Fprintf(buf, "Checking: %s\n", s.checking)
|
fmt.Fprintf(buf, "Checking: %s\n", s.checking)
|
||||||
@ -87,41 +87,48 @@ Elapsed time: %v
|
|||||||
return buf.String()
|
return buf.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Log outputs the Stats to the log
|
// Log outputs the StatsInfo to the log
|
||||||
func (s *Stats) Log() {
|
func (s *StatsInfo) Log() {
|
||||||
log.Printf("%v\n", stats)
|
log.Printf("%v\n", s)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Bytes updates the stats for bytes bytes
|
// Bytes updates the stats for bytes bytes
|
||||||
func (s *Stats) Bytes(bytes int64) {
|
func (s *StatsInfo) Bytes(bytes int64) {
|
||||||
s.lock.Lock()
|
s.lock.Lock()
|
||||||
defer s.lock.Unlock()
|
defer s.lock.Unlock()
|
||||||
s.bytes += bytes
|
s.bytes += bytes
|
||||||
}
|
}
|
||||||
|
|
||||||
// Errors updates the stats for errors
|
// Errors updates the stats for errors
|
||||||
func (s *Stats) Errors(errors int64) {
|
func (s *StatsInfo) Errors(errors int64) {
|
||||||
s.lock.Lock()
|
s.lock.Lock()
|
||||||
defer s.lock.Unlock()
|
defer s.lock.Unlock()
|
||||||
s.errors += errors
|
s.errors += errors
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Errored returns whether there have been any errors
|
||||||
|
func (s *StatsInfo) Errored() bool {
|
||||||
|
s.lock.Lock()
|
||||||
|
defer s.lock.Unlock()
|
||||||
|
return s.errors != 0
|
||||||
|
}
|
||||||
|
|
||||||
// Error adds a single error into the stats
|
// Error adds a single error into the stats
|
||||||
func (s *Stats) Error() {
|
func (s *StatsInfo) Error() {
|
||||||
s.lock.Lock()
|
s.lock.Lock()
|
||||||
defer s.lock.Unlock()
|
defer s.lock.Unlock()
|
||||||
s.errors += 1
|
s.errors += 1
|
||||||
}
|
}
|
||||||
|
|
||||||
// Checking adds a check into the stats
|
// Checking adds a check into the stats
|
||||||
func (s *Stats) Checking(fs FsObject) {
|
func (s *StatsInfo) Checking(fs FsObject) {
|
||||||
s.lock.Lock()
|
s.lock.Lock()
|
||||||
defer s.lock.Unlock()
|
defer s.lock.Unlock()
|
||||||
s.checking[fs.Remote()] = true
|
s.checking[fs.Remote()] = true
|
||||||
}
|
}
|
||||||
|
|
||||||
// DoneChecking removes a check from the stats
|
// DoneChecking removes a check from the stats
|
||||||
func (s *Stats) DoneChecking(fs FsObject) {
|
func (s *StatsInfo) DoneChecking(fs FsObject) {
|
||||||
s.lock.Lock()
|
s.lock.Lock()
|
||||||
defer s.lock.Unlock()
|
defer s.lock.Unlock()
|
||||||
delete(s.checking, fs.Remote())
|
delete(s.checking, fs.Remote())
|
||||||
@ -129,14 +136,14 @@ func (s *Stats) DoneChecking(fs FsObject) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Transferring adds a transfer into the stats
|
// Transferring adds a transfer into the stats
|
||||||
func (s *Stats) Transferring(fs FsObject) {
|
func (s *StatsInfo) Transferring(fs FsObject) {
|
||||||
s.lock.Lock()
|
s.lock.Lock()
|
||||||
defer s.lock.Unlock()
|
defer s.lock.Unlock()
|
||||||
s.transferring[fs.Remote()] = true
|
s.transferring[fs.Remote()] = true
|
||||||
}
|
}
|
||||||
|
|
||||||
// DoneTransferring removes a transfer from the stats
|
// DoneTransferring removes a transfer from the stats
|
||||||
func (s *Stats) DoneTransferring(fs FsObject) {
|
func (s *StatsInfo) DoneTransferring(fs FsObject) {
|
||||||
s.lock.Lock()
|
s.lock.Lock()
|
||||||
defer s.lock.Unlock()
|
defer s.lock.Unlock()
|
||||||
delete(s.transferring, fs.Remote())
|
delete(s.transferring, fs.Remote())
|
||||||
@ -160,7 +167,7 @@ func NewAccount(in io.ReadCloser) *Account {
|
|||||||
func (file *Account) Read(p []byte) (n int, err error) {
|
func (file *Account) Read(p []byte) (n int, err error) {
|
||||||
n, err = file.in.Read(p)
|
n, err = file.in.Read(p)
|
||||||
file.bytes += int64(n)
|
file.bytes += int64(n)
|
||||||
stats.Bytes(int64(n))
|
Stats.Bytes(int64(n))
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
// FIXME Do something?
|
// FIXME Do something?
|
||||||
}
|
}
|
@ -1,14 +1,49 @@
|
|||||||
// File system interface
|
// File system interface
|
||||||
|
|
||||||
package main
|
package fs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"log"
|
"log"
|
||||||
|
"regexp"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Globals
|
||||||
|
var (
|
||||||
|
// Global config
|
||||||
|
Config = &FsConfig{}
|
||||||
|
// Filesystem registry
|
||||||
|
fsRegistry []fsRegistryItem
|
||||||
|
)
|
||||||
|
|
||||||
|
// Filesystem config options
|
||||||
|
type FsConfig struct {
|
||||||
|
Verbose bool
|
||||||
|
Quiet bool
|
||||||
|
ModifyWindow time.Duration
|
||||||
|
Checkers int
|
||||||
|
Transfers int
|
||||||
|
}
|
||||||
|
|
||||||
|
// FIXME need local to go last
|
||||||
|
|
||||||
|
// Filesystem registry item
|
||||||
|
type fsRegistryItem struct {
|
||||||
|
match *regexp.Regexp // if this matches then can call newFs
|
||||||
|
newFs func(string) (Fs, error) // create a new file system
|
||||||
|
}
|
||||||
|
|
||||||
|
// Register a filesystem
|
||||||
|
//
|
||||||
|
// If a path matches with match then can call newFs on it
|
||||||
|
//
|
||||||
|
// Fs modules should use this in an init() function
|
||||||
|
func Register(match *regexp.Regexp, newFs func(string) (Fs, error)) {
|
||||||
|
fsRegistry = append(fsRegistry, fsRegistryItem{match: match, newFs: newFs})
|
||||||
|
}
|
||||||
|
|
||||||
// A Filesystem, describes the local filesystem and the remote object store
|
// A Filesystem, describes the local filesystem and the remote object store
|
||||||
type Fs interface {
|
type Fs interface {
|
||||||
// String returns a description of the FS
|
// String returns a description of the FS
|
||||||
@ -100,21 +135,17 @@ type FsDirChan chan *FsDir
|
|||||||
//
|
//
|
||||||
// FIXME make more generic
|
// FIXME make more generic
|
||||||
func NewFs(path string) (Fs, error) {
|
func NewFs(path string) (Fs, error) {
|
||||||
if swiftMatch.MatchString(path) {
|
for _, item := range fsRegistry {
|
||||||
return NewFsSwift(path)
|
if item.match.MatchString(path) {
|
||||||
|
return item.newFs(path)
|
||||||
}
|
}
|
||||||
if s3Match.MatchString(path) {
|
|
||||||
return NewFsS3(path)
|
|
||||||
}
|
}
|
||||||
if driveMatch.MatchString(path) {
|
panic("Not found") // FIXME
|
||||||
return NewFsDrive(path)
|
|
||||||
}
|
|
||||||
return NewFsLocal(path)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write debuging output for this FsObject
|
// Write debuging output for this FsObject
|
||||||
func FsDebug(fs FsObject, text string, args ...interface{}) {
|
func FsDebug(fs FsObject, text string, args ...interface{}) {
|
||||||
if *verbose {
|
if Config.Verbose {
|
||||||
out := fmt.Sprintf(text, args...)
|
out := fmt.Sprintf(text, args...)
|
||||||
log.Printf("%s: %s", fs.Remote(), out)
|
log.Printf("%s: %s", fs.Remote(), out)
|
||||||
}
|
}
|
||||||
@ -122,7 +153,7 @@ func FsDebug(fs FsObject, text string, args ...interface{}) {
|
|||||||
|
|
||||||
// Write log output for this FsObject
|
// Write log output for this FsObject
|
||||||
func FsLog(fs FsObject, text string, args ...interface{}) {
|
func FsLog(fs FsObject, text string, args ...interface{}) {
|
||||||
if !*quiet {
|
if !Config.Quiet {
|
||||||
out := fmt.Sprintf(text, args...)
|
out := fmt.Sprintf(text, args...)
|
||||||
log.Printf("%s: %s", fs.Remote(), out)
|
log.Printf("%s: %s", fs.Remote(), out)
|
||||||
}
|
}
|
||||||
@ -145,13 +176,13 @@ func checkClose(c io.Closer, err *error) {
|
|||||||
func CheckMd5sums(src, dst FsObject) (bool, error) {
|
func CheckMd5sums(src, dst FsObject) (bool, error) {
|
||||||
srcMd5, err := src.Md5sum()
|
srcMd5, err := src.Md5sum()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
stats.Error()
|
Stats.Error()
|
||||||
FsLog(src, "Failed to calculate src md5: %s", err)
|
FsLog(src, "Failed to calculate src md5: %s", err)
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
dstMd5, err := dst.Md5sum()
|
dstMd5, err := dst.Md5sum()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
stats.Error()
|
Stats.Error()
|
||||||
FsLog(dst, "Failed to calculate dst md5: %s", err)
|
FsLog(dst, "Failed to calculate dst md5: %s", err)
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
@ -186,10 +217,11 @@ func Equal(src, dst FsObject) bool {
|
|||||||
srcModTime := src.ModTime()
|
srcModTime := src.ModTime()
|
||||||
dstModTime := dst.ModTime()
|
dstModTime := dst.ModTime()
|
||||||
dt := dstModTime.Sub(srcModTime)
|
dt := dstModTime.Sub(srcModTime)
|
||||||
if dt >= *modifyWindow || dt <= -*modifyWindow {
|
ModifyWindow := Config.ModifyWindow
|
||||||
|
if dt >= ModifyWindow || dt <= -ModifyWindow {
|
||||||
FsDebug(src, "Modification times differ by %s: %v, %v", dt, srcModTime, dstModTime)
|
FsDebug(src, "Modification times differ by %s: %v, %v", dt, srcModTime, dstModTime)
|
||||||
} else {
|
} else {
|
||||||
FsDebug(src, "Size and modification time differ by %s (within %s)", dt, *modifyWindow)
|
FsDebug(src, "Size and modification time differ by %s (within %s)", dt, ModifyWindow)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -213,7 +245,7 @@ func Equal(src, dst FsObject) bool {
|
|||||||
func Copy(f Fs, src FsObject) {
|
func Copy(f Fs, src FsObject) {
|
||||||
in0, err := src.Open()
|
in0, err := src.Open()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
stats.Error()
|
Stats.Error()
|
||||||
FsLog(src, "Failed to open: %s", err)
|
FsLog(src, "Failed to open: %s", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -225,13 +257,13 @@ func Copy(f Fs, src FsObject) {
|
|||||||
err = inErr
|
err = inErr
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
stats.Error()
|
Stats.Error()
|
||||||
FsLog(src, "Failed to copy: %s", err)
|
FsLog(src, "Failed to copy: %s", err)
|
||||||
if dst != nil {
|
if dst != nil {
|
||||||
FsDebug(dst, "Removing failed copy")
|
FsDebug(dst, "Removing failed copy")
|
||||||
removeErr := dst.Remove()
|
removeErr := dst.Remove()
|
||||||
if removeErr != nil {
|
if removeErr != nil {
|
||||||
stats.Error()
|
Stats.Error()
|
||||||
FsLog(dst, "Failed to remove failed copy: %s", removeErr)
|
FsLog(dst, "Failed to remove failed copy: %s", removeErr)
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -1,7 +1,8 @@
|
|||||||
// Local filesystem interface
|
// Local filesystem interface
|
||||||
package main
|
package local
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"../fs"
|
||||||
"crypto/md5"
|
"crypto/md5"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
@ -10,10 +11,19 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"regexp"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Pattern to match a local url (matches anything)
|
||||||
|
var Match = regexp.MustCompile(``)
|
||||||
|
|
||||||
|
// Register with Fs
|
||||||
|
func init() {
|
||||||
|
fs.Register(Match, NewFs)
|
||||||
|
}
|
||||||
|
|
||||||
// FsLocal represents a local filesystem rooted at root
|
// FsLocal represents a local filesystem rooted at root
|
||||||
type FsLocal struct {
|
type FsLocal struct {
|
||||||
root string // The root directory
|
root string // The root directory
|
||||||
@ -30,8 +40,8 @@ type FsObjectLocal struct {
|
|||||||
|
|
||||||
// ------------------------------------------------------------
|
// ------------------------------------------------------------
|
||||||
|
|
||||||
// NewFsLocal contstructs an FsLocal from the path
|
// NewFs contstructs an FsLocal from the path
|
||||||
func NewFsLocal(root string) (*FsLocal, error) {
|
func NewFs(root string) (fs.Fs, error) {
|
||||||
root = path.Clean(root)
|
root = path.Clean(root)
|
||||||
f := &FsLocal{root: root}
|
f := &FsLocal{root: root}
|
||||||
return f, nil
|
return f, nil
|
||||||
@ -45,42 +55,42 @@ func (f *FsLocal) String() string {
|
|||||||
// Return an FsObject from a path
|
// Return an FsObject from a path
|
||||||
//
|
//
|
||||||
// May return nil if an error occurred
|
// May return nil if an error occurred
|
||||||
func (f *FsLocal) NewFsObjectWithInfo(remote string, info os.FileInfo) FsObject {
|
func (f *FsLocal) NewFsObjectWithInfo(remote string, info os.FileInfo) fs.FsObject {
|
||||||
path := filepath.Join(f.root, remote)
|
path := filepath.Join(f.root, remote)
|
||||||
fs := &FsObjectLocal{remote: remote, path: path}
|
o := &FsObjectLocal{remote: remote, path: path}
|
||||||
if info != nil {
|
if info != nil {
|
||||||
fs.info = info
|
o.info = info
|
||||||
} else {
|
} else {
|
||||||
err := fs.lstat()
|
err := o.lstat()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
FsDebug(fs, "Failed to stat %s: %s", path, err)
|
fs.FsDebug(o, "Failed to stat %s: %s", path, err)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return fs
|
return o
|
||||||
}
|
}
|
||||||
|
|
||||||
// Return an FsObject from a path
|
// Return an FsObject from a path
|
||||||
//
|
//
|
||||||
// May return nil if an error occurred
|
// May return nil if an error occurred
|
||||||
func (f *FsLocal) NewFsObject(remote string) FsObject {
|
func (f *FsLocal) NewFsObject(remote string) fs.FsObject {
|
||||||
return f.NewFsObjectWithInfo(remote, nil)
|
return f.NewFsObjectWithInfo(remote, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// List the path returning a channel of FsObjects
|
// List the path returning a channel of FsObjects
|
||||||
//
|
//
|
||||||
// Ignores everything which isn't Storable, eg links etc
|
// Ignores everything which isn't Storable, eg links etc
|
||||||
func (f *FsLocal) List() FsObjectsChan {
|
func (f *FsLocal) List() fs.FsObjectsChan {
|
||||||
out := make(FsObjectsChan, *checkers)
|
out := make(fs.FsObjectsChan, fs.Config.Checkers)
|
||||||
go func() {
|
go func() {
|
||||||
err := filepath.Walk(f.root, func(path string, fi os.FileInfo, err error) error {
|
err := filepath.Walk(f.root, func(path string, fi os.FileInfo, err error) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
stats.Error()
|
fs.Stats.Error()
|
||||||
log.Printf("Failed to open directory: %s: %s", path, err)
|
log.Printf("Failed to open directory: %s: %s", path, err)
|
||||||
} else {
|
} else {
|
||||||
remote, err := filepath.Rel(f.root, path)
|
remote, err := filepath.Rel(f.root, path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
stats.Error()
|
fs.Stats.Error()
|
||||||
log.Printf("Failed to get relative path %s: %s", path, err)
|
log.Printf("Failed to get relative path %s: %s", path, err)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -97,7 +107,7 @@ func (f *FsLocal) List() FsObjectsChan {
|
|||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
stats.Error()
|
fs.Stats.Error()
|
||||||
log.Printf("Failed to open directory: %s: %s", f.root, err)
|
log.Printf("Failed to open directory: %s: %s", f.root, err)
|
||||||
}
|
}
|
||||||
close(out)
|
close(out)
|
||||||
@ -106,18 +116,18 @@ func (f *FsLocal) List() FsObjectsChan {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Walk the path returning a channel of FsObjects
|
// Walk the path returning a channel of FsObjects
|
||||||
func (f *FsLocal) ListDir() FsDirChan {
|
func (f *FsLocal) ListDir() fs.FsDirChan {
|
||||||
out := make(FsDirChan, *checkers)
|
out := make(fs.FsDirChan, fs.Config.Checkers)
|
||||||
go func() {
|
go func() {
|
||||||
defer close(out)
|
defer close(out)
|
||||||
items, err := ioutil.ReadDir(f.root)
|
items, err := ioutil.ReadDir(f.root)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
stats.Error()
|
fs.Stats.Error()
|
||||||
log.Printf("Couldn't find read directory: %s", err)
|
log.Printf("Couldn't find read directory: %s", err)
|
||||||
} else {
|
} else {
|
||||||
for _, item := range items {
|
for _, item := range items {
|
||||||
if item.IsDir() {
|
if item.IsDir() {
|
||||||
dir := &FsDir{
|
dir := &fs.FsDir{
|
||||||
Name: item.Name(),
|
Name: item.Name(),
|
||||||
When: item.ModTime(),
|
When: item.ModTime(),
|
||||||
Bytes: 0,
|
Bytes: 0,
|
||||||
@ -127,7 +137,7 @@ func (f *FsLocal) ListDir() FsDirChan {
|
|||||||
dirpath := path.Join(f.root, item.Name())
|
dirpath := path.Join(f.root, item.Name())
|
||||||
err := filepath.Walk(dirpath, func(path string, fi os.FileInfo, err error) error {
|
err := filepath.Walk(dirpath, func(path string, fi os.FileInfo, err error) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
stats.Error()
|
fs.Stats.Error()
|
||||||
log.Printf("Failed to open directory: %s: %s", path, err)
|
log.Printf("Failed to open directory: %s: %s", path, err)
|
||||||
} else {
|
} else {
|
||||||
dir.Count += 1
|
dir.Count += 1
|
||||||
@ -136,7 +146,7 @@ func (f *FsLocal) ListDir() FsDirChan {
|
|||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
stats.Error()
|
fs.Stats.Error()
|
||||||
log.Printf("Failed to open directory: %s: %s", dirpath, err)
|
log.Printf("Failed to open directory: %s: %s", dirpath, err)
|
||||||
}
|
}
|
||||||
out <- dir
|
out <- dir
|
||||||
@ -149,7 +159,7 @@ func (f *FsLocal) ListDir() FsDirChan {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Puts the FsObject to the local filesystem
|
// Puts the FsObject to the local filesystem
|
||||||
func (f *FsLocal) Put(in io.Reader, remote string, modTime time.Time, size int64) (FsObject, error) {
|
func (f *FsLocal) Put(in io.Reader, remote string, modTime time.Time, size int64) (fs.FsObject, error) {
|
||||||
dstPath := filepath.Join(f.root, remote)
|
dstPath := filepath.Join(f.root, remote)
|
||||||
// Temporary FsObject under construction
|
// Temporary FsObject under construction
|
||||||
fs := &FsObjectLocal{remote: remote, path: dstPath}
|
fs := &FsObjectLocal{remote: remote, path: dstPath}
|
||||||
@ -225,7 +235,7 @@ func (f *FsLocal) readPrecision() (precision time.Duration) {
|
|||||||
for duration := time.Duration(1); duration < time.Second; duration *= 10 {
|
for duration := time.Duration(1); duration < time.Second; duration *= 10 {
|
||||||
// Current time with delta
|
// Current time with delta
|
||||||
t := time.Unix(time.Now().Unix(), int64(duration))
|
t := time.Unix(time.Now().Unix(), int64(duration))
|
||||||
err := Chtimes(path, t, t)
|
err := os.Chtimes(path, t, t)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// fmt.Println("Failed to Chtimes", err)
|
// fmt.Println("Failed to Chtimes", err)
|
||||||
break
|
break
|
||||||
@ -251,78 +261,78 @@ func (f *FsLocal) readPrecision() (precision time.Duration) {
|
|||||||
// ------------------------------------------------------------
|
// ------------------------------------------------------------
|
||||||
|
|
||||||
// Return the remote path
|
// Return the remote path
|
||||||
func (fs *FsObjectLocal) Remote() string {
|
func (o *FsObjectLocal) Remote() string {
|
||||||
return fs.remote
|
return o.remote
|
||||||
}
|
}
|
||||||
|
|
||||||
// Md5sum calculates the Md5sum of a file returning a lowercase hex string
|
// Md5sum calculates the Md5sum of a file returning a lowercase hex string
|
||||||
func (fs *FsObjectLocal) Md5sum() (string, error) {
|
func (o *FsObjectLocal) Md5sum() (string, error) {
|
||||||
in, err := os.Open(fs.path)
|
in, err := os.Open(o.path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
stats.Error()
|
fs.Stats.Error()
|
||||||
FsLog(fs, "Failed to open: %s", err)
|
fs.FsLog(o, "Failed to open: %s", err)
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
defer in.Close() // FIXME ignoring error
|
defer in.Close() // FIXME ignoring error
|
||||||
hash := md5.New()
|
hash := md5.New()
|
||||||
_, err = io.Copy(hash, in)
|
_, err = io.Copy(hash, in)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
stats.Error()
|
fs.Stats.Error()
|
||||||
FsLog(fs, "Failed to read: %s", err)
|
fs.FsLog(o, "Failed to read: %s", err)
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
return fmt.Sprintf("%x", hash.Sum(nil)), nil
|
return fmt.Sprintf("%x", hash.Sum(nil)), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Size returns the size of an object in bytes
|
// Size returns the size of an object in bytes
|
||||||
func (fs *FsObjectLocal) Size() int64 {
|
func (o *FsObjectLocal) Size() int64 {
|
||||||
return fs.info.Size()
|
return o.info.Size()
|
||||||
}
|
}
|
||||||
|
|
||||||
// ModTime returns the modification time of the object
|
// ModTime returns the modification time of the object
|
||||||
func (fs *FsObjectLocal) ModTime() time.Time {
|
func (o *FsObjectLocal) ModTime() time.Time {
|
||||||
return fs.info.ModTime()
|
return o.info.ModTime()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sets the modification time of the local fs object
|
// Sets the modification time of the local fs object
|
||||||
func (fs *FsObjectLocal) SetModTime(modTime time.Time) {
|
func (o *FsObjectLocal) SetModTime(modTime time.Time) {
|
||||||
err := Chtimes(fs.path, modTime, modTime)
|
err := os.Chtimes(o.path, modTime, modTime)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
FsDebug(fs, "Failed to set mtime on file: %s", err)
|
fs.FsDebug(o, "Failed to set mtime on file: %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Is this object storable
|
// Is this object storable
|
||||||
func (fs *FsObjectLocal) Storable() bool {
|
func (o *FsObjectLocal) Storable() bool {
|
||||||
mode := fs.info.Mode()
|
mode := o.info.Mode()
|
||||||
if mode&(os.ModeSymlink|os.ModeNamedPipe|os.ModeSocket|os.ModeDevice) != 0 {
|
if mode&(os.ModeSymlink|os.ModeNamedPipe|os.ModeSocket|os.ModeDevice) != 0 {
|
||||||
FsDebug(fs, "Can't transfer non file/directory")
|
fs.FsDebug(o, "Can't transfer non file/directory")
|
||||||
return false
|
return false
|
||||||
} else if mode&os.ModeDir != 0 {
|
} else if mode&os.ModeDir != 0 {
|
||||||
FsDebug(fs, "FIXME Skipping directory")
|
fs.FsDebug(o, "FIXME Skipping directory")
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open an object for read
|
// Open an object for read
|
||||||
func (fs *FsObjectLocal) Open() (in io.ReadCloser, err error) {
|
func (o *FsObjectLocal) Open() (in io.ReadCloser, err error) {
|
||||||
in, err = os.Open(fs.path)
|
in, err = os.Open(o.path)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stat a FsObject into info
|
// Stat a FsObject into info
|
||||||
func (fs *FsObjectLocal) lstat() error {
|
func (o *FsObjectLocal) lstat() error {
|
||||||
info, err := os.Lstat(fs.path)
|
info, err := os.Lstat(o.path)
|
||||||
fs.info = info
|
o.info = info
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove an object
|
// Remove an object
|
||||||
func (fs *FsObjectLocal) Remove() error {
|
func (o *FsObjectLocal) Remove() error {
|
||||||
return os.Remove(fs.path)
|
return os.Remove(o.path)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check the interfaces are satisfied
|
// Check the interfaces are satisfied
|
||||||
var _ Fs = &FsLocal{}
|
var _ fs.Fs = &FsLocal{}
|
||||||
var _ FsObject = &FsObjectLocal{}
|
var _ fs.FsObject = &FsObjectLocal{}
|
168
rclone.go
168
rclone.go
@ -4,6 +4,7 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"./fs"
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
@ -13,6 +14,11 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
// Active file systems
|
||||||
|
_ "./drive"
|
||||||
|
_ "./local"
|
||||||
|
_ "./s3"
|
||||||
|
_ "./swift"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Globals
|
// Globals
|
||||||
@ -28,17 +34,17 @@ var (
|
|||||||
modifyWindow = flag.Duration("modify-window", time.Nanosecond, "Max time diff to be considered the same")
|
modifyWindow = flag.Duration("modify-window", time.Nanosecond, "Max time diff to be considered the same")
|
||||||
)
|
)
|
||||||
|
|
||||||
// A pair of FsObjects
|
// A pair of fs.FsObjects
|
||||||
type PairFsObjects struct {
|
type PairFsObjects struct {
|
||||||
src, dst FsObject
|
src, dst fs.FsObject
|
||||||
}
|
}
|
||||||
|
|
||||||
type PairFsObjectsChan chan PairFsObjects
|
type PairFsObjectsChan chan PairFsObjects
|
||||||
|
|
||||||
// Check to see if src needs to be copied to dst and if so puts it in out
|
// Check to see if src needs to be copied to dst and if so puts it in out
|
||||||
func checkOne(src, dst FsObject, out FsObjectsChan) {
|
func checkOne(src, dst fs.FsObject, out fs.FsObjectsChan) {
|
||||||
if dst == nil {
|
if dst == nil {
|
||||||
FsDebug(src, "Couldn't find local file - download")
|
fs.FsDebug(src, "Couldn't find local file - download")
|
||||||
out <- src
|
out <- src
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -47,8 +53,8 @@ func checkOne(src, dst FsObject, out FsObjectsChan) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
// Check to see if changed or not
|
// Check to see if changed or not
|
||||||
if Equal(src, dst) {
|
if fs.Equal(src, dst) {
|
||||||
FsDebug(src, "Unchanged skipping")
|
fs.FsDebug(src, "Unchanged skipping")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
out <- src
|
out <- src
|
||||||
@ -57,49 +63,49 @@ func checkOne(src, dst FsObject, out FsObjectsChan) {
|
|||||||
// Read FsObjects~s on in send to out if they need uploading
|
// Read FsObjects~s on in send to out if they need uploading
|
||||||
//
|
//
|
||||||
// FIXME potentially doing lots of MD5SUMS at once
|
// FIXME potentially doing lots of MD5SUMS at once
|
||||||
func PairChecker(in PairFsObjectsChan, out FsObjectsChan, wg *sync.WaitGroup) {
|
func PairChecker(in PairFsObjectsChan, out fs.FsObjectsChan, wg *sync.WaitGroup) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
for pair := range in {
|
for pair := range in {
|
||||||
src := pair.src
|
src := pair.src
|
||||||
stats.Checking(src)
|
fs.Stats.Checking(src)
|
||||||
checkOne(src, pair.dst, out)
|
checkOne(src, pair.dst, out)
|
||||||
stats.DoneChecking(src)
|
fs.Stats.DoneChecking(src)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read FsObjects~s on in send to out if they need uploading
|
// Read FsObjects~s on in send to out if they need uploading
|
||||||
//
|
//
|
||||||
// FIXME potentially doing lots of MD5SUMS at once
|
// FIXME potentially doing lots of MD5SUMS at once
|
||||||
func Checker(in, out FsObjectsChan, fdst Fs, wg *sync.WaitGroup) {
|
func Checker(in, out fs.FsObjectsChan, fdst fs.Fs, wg *sync.WaitGroup) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
for src := range in {
|
for src := range in {
|
||||||
stats.Checking(src)
|
fs.Stats.Checking(src)
|
||||||
dst := fdst.NewFsObject(src.Remote())
|
dst := fdst.NewFsObject(src.Remote())
|
||||||
checkOne(src, dst, out)
|
checkOne(src, dst, out)
|
||||||
stats.DoneChecking(src)
|
fs.Stats.DoneChecking(src)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read FsObjects on in and copy them
|
// Read FsObjects on in and copy them
|
||||||
func Copier(in FsObjectsChan, fdst Fs, wg *sync.WaitGroup) {
|
func Copier(in fs.FsObjectsChan, fdst fs.Fs, wg *sync.WaitGroup) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
for src := range in {
|
for src := range in {
|
||||||
stats.Transferring(src)
|
fs.Stats.Transferring(src)
|
||||||
Copy(fdst, src)
|
fs.Copy(fdst, src)
|
||||||
stats.DoneTransferring(src)
|
fs.Stats.DoneTransferring(src)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Copies fsrc into fdst
|
// Copies fsrc into fdst
|
||||||
func CopyFs(fdst, fsrc Fs) {
|
func CopyFs(fdst, fsrc fs.Fs) {
|
||||||
err := fdst.Mkdir()
|
err := fdst.Mkdir()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
stats.Error()
|
fs.Stats.Error()
|
||||||
log.Fatal("Failed to make destination")
|
log.Fatal("Failed to make destination")
|
||||||
}
|
}
|
||||||
|
|
||||||
to_be_checked := fsrc.List()
|
to_be_checked := fsrc.List()
|
||||||
to_be_uploaded := make(FsObjectsChan, *transfers)
|
to_be_uploaded := make(fs.FsObjectsChan, *transfers)
|
||||||
|
|
||||||
var checkerWg sync.WaitGroup
|
var checkerWg sync.WaitGroup
|
||||||
checkerWg.Add(*checkers)
|
checkerWg.Add(*checkers)
|
||||||
@ -121,7 +127,7 @@ func CopyFs(fdst, fsrc Fs) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Delete all the files passed in the channel
|
// Delete all the files passed in the channel
|
||||||
func DeleteFiles(to_be_deleted FsObjectsChan) {
|
func DeleteFiles(to_be_deleted fs.FsObjectsChan) {
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
wg.Add(*transfers)
|
wg.Add(*transfers)
|
||||||
for i := 0; i < *transfers; i++ {
|
for i := 0; i < *transfers; i++ {
|
||||||
@ -129,16 +135,16 @@ func DeleteFiles(to_be_deleted FsObjectsChan) {
|
|||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
for dst := range to_be_deleted {
|
for dst := range to_be_deleted {
|
||||||
if *dry_run {
|
if *dry_run {
|
||||||
FsDebug(dst, "Not deleting as -dry-run")
|
fs.FsDebug(dst, "Not deleting as -dry-run")
|
||||||
} else {
|
} else {
|
||||||
stats.Checking(dst)
|
fs.Stats.Checking(dst)
|
||||||
err := dst.Remove()
|
err := dst.Remove()
|
||||||
stats.DoneChecking(dst)
|
fs.Stats.DoneChecking(dst)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
stats.Error()
|
fs.Stats.Error()
|
||||||
FsLog(dst, "Couldn't delete: %s", err)
|
fs.FsLog(dst, "Couldn't delete: %s", err)
|
||||||
} else {
|
} else {
|
||||||
FsDebug(dst, "Deleted")
|
fs.FsDebug(dst, "Deleted")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -150,10 +156,10 @@ func DeleteFiles(to_be_deleted FsObjectsChan) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Syncs fsrc into fdst
|
// Syncs fsrc into fdst
|
||||||
func Sync(fdst, fsrc Fs) {
|
func Sync(fdst, fsrc fs.Fs) {
|
||||||
err := fdst.Mkdir()
|
err := fdst.Mkdir()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
stats.Error()
|
fs.Stats.Error()
|
||||||
log.Fatal("Failed to make destination")
|
log.Fatal("Failed to make destination")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -161,14 +167,14 @@ func Sync(fdst, fsrc Fs) {
|
|||||||
|
|
||||||
// Read the destination files first
|
// Read the destination files first
|
||||||
// FIXME could do this in parallel and make it use less memory
|
// FIXME could do this in parallel and make it use less memory
|
||||||
delFiles := make(map[string]FsObject)
|
delFiles := make(map[string]fs.FsObject)
|
||||||
for dst := range fdst.List() {
|
for dst := range fdst.List() {
|
||||||
delFiles[dst.Remote()] = dst
|
delFiles[dst.Remote()] = dst
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read source files checking them off against dest files
|
// Read source files checking them off against dest files
|
||||||
to_be_checked := make(PairFsObjectsChan, *transfers)
|
to_be_checked := make(PairFsObjectsChan, *transfers)
|
||||||
to_be_uploaded := make(FsObjectsChan, *transfers)
|
to_be_uploaded := make(fs.FsObjectsChan, *transfers)
|
||||||
|
|
||||||
var checkerWg sync.WaitGroup
|
var checkerWg sync.WaitGroup
|
||||||
checkerWg.Add(*checkers)
|
checkerWg.Add(*checkers)
|
||||||
@ -203,13 +209,13 @@ func Sync(fdst, fsrc Fs) {
|
|||||||
log.Printf("Waiting for transfers to finish")
|
log.Printf("Waiting for transfers to finish")
|
||||||
copierWg.Wait()
|
copierWg.Wait()
|
||||||
|
|
||||||
if stats.errors != 0 {
|
if fs.Stats.Errored() {
|
||||||
log.Printf("Not deleting files as there were IO errors")
|
log.Printf("Not deleting files as there were IO errors")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete the spare files
|
// Delete the spare files
|
||||||
toDelete := make(FsObjectsChan, *transfers)
|
toDelete := make(fs.FsObjectsChan, *transfers)
|
||||||
go func() {
|
go func() {
|
||||||
for _, fs := range delFiles {
|
for _, fs := range delFiles {
|
||||||
toDelete <- fs
|
toDelete <- fs
|
||||||
@ -220,24 +226,24 @@ func Sync(fdst, fsrc Fs) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Checks the files in fsrc and fdst according to Size and MD5SUM
|
// Checks the files in fsrc and fdst according to Size and MD5SUM
|
||||||
func Check(fdst, fsrc Fs) {
|
func Check(fdst, fsrc fs.Fs) {
|
||||||
log.Printf("Building file list")
|
log.Printf("Building file list")
|
||||||
|
|
||||||
// Read the destination files first
|
// Read the destination files first
|
||||||
// FIXME could do this in parallel and make it use less memory
|
// FIXME could do this in parallel and make it use less memory
|
||||||
dstFiles := make(map[string]FsObject)
|
dstFiles := make(map[string]fs.FsObject)
|
||||||
for dst := range fdst.List() {
|
for dst := range fdst.List() {
|
||||||
dstFiles[dst.Remote()] = dst
|
dstFiles[dst.Remote()] = dst
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read the source files checking them against dstFiles
|
// Read the source files checking them against dstFiles
|
||||||
// FIXME could do this in parallel and make it use less memory
|
// FIXME could do this in parallel and make it use less memory
|
||||||
srcFiles := make(map[string]FsObject)
|
srcFiles := make(map[string]fs.FsObject)
|
||||||
commonFiles := make(map[string][]FsObject)
|
commonFiles := make(map[string][]fs.FsObject)
|
||||||
for src := range fsrc.List() {
|
for src := range fsrc.List() {
|
||||||
remote := src.Remote()
|
remote := src.Remote()
|
||||||
if dst, ok := dstFiles[remote]; ok {
|
if dst, ok := dstFiles[remote]; ok {
|
||||||
commonFiles[remote] = []FsObject{dst, src}
|
commonFiles[remote] = []fs.FsObject{dst, src}
|
||||||
delete(dstFiles, remote)
|
delete(dstFiles, remote)
|
||||||
} else {
|
} else {
|
||||||
srcFiles[remote] = src
|
srcFiles[remote] = src
|
||||||
@ -246,17 +252,17 @@ func Check(fdst, fsrc Fs) {
|
|||||||
|
|
||||||
log.Printf("Files in %s but not in %s", fdst, fsrc)
|
log.Printf("Files in %s but not in %s", fdst, fsrc)
|
||||||
for remote := range dstFiles {
|
for remote := range dstFiles {
|
||||||
stats.Error()
|
fs.Stats.Error()
|
||||||
log.Printf(remote)
|
log.Printf(remote)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("Files in %s but not in %s", fsrc, fdst)
|
log.Printf("Files in %s but not in %s", fsrc, fdst)
|
||||||
for remote := range srcFiles {
|
for remote := range srcFiles {
|
||||||
stats.Error()
|
fs.Stats.Error()
|
||||||
log.Printf(remote)
|
log.Printf(remote)
|
||||||
}
|
}
|
||||||
|
|
||||||
checks := make(chan []FsObject, *transfers)
|
checks := make(chan []fs.FsObject, *transfers)
|
||||||
go func() {
|
go func() {
|
||||||
for _, check := range commonFiles {
|
for _, check := range commonFiles {
|
||||||
checks <- check
|
checks <- check
|
||||||
@ -271,47 +277,47 @@ func Check(fdst, fsrc Fs) {
|
|||||||
defer checkerWg.Done()
|
defer checkerWg.Done()
|
||||||
for check := range checks {
|
for check := range checks {
|
||||||
dst, src := check[0], check[1]
|
dst, src := check[0], check[1]
|
||||||
stats.Checking(src)
|
fs.Stats.Checking(src)
|
||||||
if src.Size() != dst.Size() {
|
if src.Size() != dst.Size() {
|
||||||
stats.DoneChecking(src)
|
fs.Stats.DoneChecking(src)
|
||||||
stats.Error()
|
fs.Stats.Error()
|
||||||
FsLog(src, "Sizes differ")
|
fs.FsLog(src, "Sizes differ")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
same, err := CheckMd5sums(src, dst)
|
same, err := fs.CheckMd5sums(src, dst)
|
||||||
stats.DoneChecking(src)
|
fs.Stats.DoneChecking(src)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if !same {
|
if !same {
|
||||||
stats.Error()
|
fs.Stats.Error()
|
||||||
FsLog(src, "Md5sums differ")
|
fs.FsLog(src, "Md5sums differ")
|
||||||
}
|
}
|
||||||
FsDebug(src, "OK")
|
fs.FsDebug(src, "OK")
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("Waiting for checks to finish")
|
log.Printf("Waiting for checks to finish")
|
||||||
checkerWg.Wait()
|
checkerWg.Wait()
|
||||||
log.Printf("%d differences found", stats.errors)
|
log.Printf("%d differences found", fs.Stats.Errors)
|
||||||
}
|
}
|
||||||
|
|
||||||
// List the Fs to stdout
|
// List the Fs to stdout
|
||||||
//
|
//
|
||||||
// Lists in parallel which may get them out of order
|
// Lists in parallel which may get them out of order
|
||||||
func List(f, _ Fs) {
|
func List(f, _ fs.Fs) {
|
||||||
in := f.List()
|
in := f.List()
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
wg.Add(*checkers)
|
wg.Add(*checkers)
|
||||||
for i := 0; i < *checkers; i++ {
|
for i := 0; i < *checkers; i++ {
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
for fs := range in {
|
for o := range in {
|
||||||
stats.Checking(fs)
|
fs.Stats.Checking(o)
|
||||||
modTime := fs.ModTime()
|
modTime := o.ModTime()
|
||||||
stats.DoneChecking(fs)
|
fs.Stats.DoneChecking(o)
|
||||||
fmt.Printf("%9d %19s %s\n", fs.Size(), modTime.Format("2006-01-02 15:04:05.00000000"), fs.Remote())
|
fmt.Printf("%9d %19s %s\n", o.Size(), modTime.Format("2006-01-02 15:04:05.00000000"), o.Remote())
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
@ -319,29 +325,29 @@ func List(f, _ Fs) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// List the directories/buckets/containers in the Fs to stdout
|
// List the directories/buckets/containers in the Fs to stdout
|
||||||
func ListDir(f, _ Fs) {
|
func ListDir(f, _ fs.Fs) {
|
||||||
for dir := range f.ListDir() {
|
for dir := range f.ListDir() {
|
||||||
fmt.Printf("%12d %13s %9d %s\n", dir.Bytes, dir.When.Format("2006-01-02 15:04:05"), dir.Count, dir.Name)
|
fmt.Printf("%12d %13s %9d %s\n", dir.Bytes, dir.When.Format("2006-01-02 15:04:05"), dir.Count, dir.Name)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Makes a destination directory or container
|
// Makes a destination directory or container
|
||||||
func mkdir(fdst, fsrc Fs) {
|
func mkdir(fdst, fsrc fs.Fs) {
|
||||||
err := fdst.Mkdir()
|
err := fdst.Mkdir()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
stats.Error()
|
fs.Stats.Error()
|
||||||
log.Fatalf("Mkdir failed: %s", err)
|
log.Fatalf("Mkdir failed: %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Removes a container but not if not empty
|
// Removes a container but not if not empty
|
||||||
func rmdir(fdst, fsrc Fs) {
|
func rmdir(fdst, fsrc fs.Fs) {
|
||||||
if *dry_run {
|
if *dry_run {
|
||||||
log.Printf("Not deleting %s as -dry-run", fdst)
|
log.Printf("Not deleting %s as -dry-run", fdst)
|
||||||
} else {
|
} else {
|
||||||
err := fdst.Rmdir()
|
err := fdst.Rmdir()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
stats.Error()
|
fs.Stats.Error()
|
||||||
log.Fatalf("Rmdir failed: %s", err)
|
log.Fatalf("Rmdir failed: %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -350,11 +356,11 @@ func rmdir(fdst, fsrc Fs) {
|
|||||||
// Removes a container and all of its contents
|
// Removes a container and all of its contents
|
||||||
//
|
//
|
||||||
// FIXME doesn't delete local directories
|
// FIXME doesn't delete local directories
|
||||||
func purge(fdst, fsrc Fs) {
|
func purge(fdst, fsrc fs.Fs) {
|
||||||
if f, ok := fdst.(Purger); ok {
|
if f, ok := fdst.(fs.Purger); ok {
|
||||||
err := f.Purge()
|
err := f.Purge()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
stats.Error()
|
fs.Stats.Error()
|
||||||
log.Fatalf("Purge failed: %s", err)
|
log.Fatalf("Purge failed: %s", err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -367,7 +373,7 @@ func purge(fdst, fsrc Fs) {
|
|||||||
type Command struct {
|
type Command struct {
|
||||||
name string
|
name string
|
||||||
help string
|
help string
|
||||||
run func(fdst, fsrc Fs)
|
run func(fdst, fsrc fs.Fs)
|
||||||
minArgs, maxArgs int
|
minArgs, maxArgs int
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -516,7 +522,7 @@ func main() {
|
|||||||
if *cpuprofile != "" {
|
if *cpuprofile != "" {
|
||||||
f, err := os.Create(*cpuprofile)
|
f, err := os.Create(*cpuprofile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
stats.Error()
|
fs.Stats.Error()
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
pprof.StartCPUProfile(f)
|
pprof.StartCPUProfile(f)
|
||||||
@ -540,32 +546,32 @@ func main() {
|
|||||||
break
|
break
|
||||||
} else if strings.HasPrefix(command.name, cmd) {
|
} else if strings.HasPrefix(command.name, cmd) {
|
||||||
if found != nil {
|
if found != nil {
|
||||||
stats.Error()
|
fs.Stats.Error()
|
||||||
log.Fatalf("Not unique - matches multiple commands %q", cmd)
|
log.Fatalf("Not unique - matches multiple commands %q", cmd)
|
||||||
}
|
}
|
||||||
found = command
|
found = command
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if found == nil {
|
if found == nil {
|
||||||
stats.Error()
|
fs.Stats.Error()
|
||||||
log.Fatalf("Unknown command %q", cmd)
|
log.Fatalf("Unknown command %q", cmd)
|
||||||
}
|
}
|
||||||
found.checkArgs(args)
|
found.checkArgs(args)
|
||||||
|
|
||||||
// Make source and destination fs
|
// Make source and destination fs
|
||||||
var fdst, fsrc Fs
|
var fdst, fsrc fs.Fs
|
||||||
var err error
|
var err error
|
||||||
if len(args) >= 1 {
|
if len(args) >= 1 {
|
||||||
fdst, err = NewFs(args[0])
|
fdst, err = fs.NewFs(args[0])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
stats.Error()
|
fs.Stats.Error()
|
||||||
log.Fatal("Failed to create file system: ", err)
|
log.Fatal("Failed to create file system: ", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(args) >= 2 {
|
if len(args) >= 2 {
|
||||||
fsrc, err = NewFs(args[1])
|
fsrc, err = fs.NewFs(args[1])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
stats.Error()
|
fs.Stats.Error()
|
||||||
log.Fatal("Failed to create destination file system: ", err)
|
log.Fatal("Failed to create destination file system: ", err)
|
||||||
}
|
}
|
||||||
fsrc, fdst = fdst, fsrc
|
fsrc, fdst = fdst, fsrc
|
||||||
@ -575,34 +581,34 @@ func main() {
|
|||||||
if fsrc != nil {
|
if fsrc != nil {
|
||||||
precision := fsrc.Precision()
|
precision := fsrc.Precision()
|
||||||
log.Printf("Source precision %s\n", precision)
|
log.Printf("Source precision %s\n", precision)
|
||||||
if precision > *modifyWindow {
|
if precision > fs.Config.ModifyWindow {
|
||||||
*modifyWindow = precision
|
fs.Config.ModifyWindow = precision
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if fdst != nil {
|
if fdst != nil {
|
||||||
precision := fdst.Precision()
|
precision := fdst.Precision()
|
||||||
log.Printf("Destination precision %s\n", precision)
|
log.Printf("Destination precision %s\n", precision)
|
||||||
if precision > *modifyWindow {
|
if precision > fs.Config.ModifyWindow {
|
||||||
*modifyWindow = precision
|
fs.Config.ModifyWindow = precision
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.Printf("Modify window is %s\n", *modifyWindow)
|
log.Printf("Modify window is %s\n", fs.Config.ModifyWindow)
|
||||||
|
|
||||||
// Print the stats every statsInterval
|
// Print the stats every statsInterval
|
||||||
go func() {
|
go func() {
|
||||||
ch := time.Tick(*statsInterval)
|
ch := time.Tick(*statsInterval)
|
||||||
for {
|
for {
|
||||||
<-ch
|
<-ch
|
||||||
stats.Log()
|
fs.Stats.Log()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Run the actual command
|
// Run the actual command
|
||||||
if found.run != nil {
|
if found.run != nil {
|
||||||
found.run(fdst, fsrc)
|
found.run(fdst, fsrc)
|
||||||
fmt.Println(stats)
|
fmt.Println(fs.Stats)
|
||||||
log.Printf("*** Go routines at exit %d\n", runtime.NumGoroutine())
|
log.Printf("*** Go routines at exit %d\n", runtime.NumGoroutine())
|
||||||
if stats.errors > 0 {
|
if fs.Stats.Errored() {
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
os.Exit(0)
|
os.Exit(0)
|
||||||
|
@ -1,9 +1,10 @@
|
|||||||
// S3 interface
|
// S3 interface
|
||||||
package main
|
package s3
|
||||||
|
|
||||||
// FIXME need to prevent anything but ListDir working for s3://
|
// FIXME need to prevent anything but ListDir working for s3://
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"../fs"
|
||||||
"errors"
|
"errors"
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
@ -22,6 +23,14 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Pattern to match a s3 url
|
||||||
|
var Match = regexp.MustCompile(`^s3://([^/]*)(.*)$`)
|
||||||
|
|
||||||
|
// Register with Fs
|
||||||
|
func init() {
|
||||||
|
fs.Register(Match, NewFs)
|
||||||
|
}
|
||||||
|
|
||||||
// Constants
|
// Constants
|
||||||
const (
|
const (
|
||||||
metaMtime = "X-Amz-Meta-Mtime" // the meta key to store mtime in
|
metaMtime = "X-Amz-Meta-Mtime" // the meta key to store mtime in
|
||||||
@ -66,12 +75,9 @@ func (f *FsS3) String() string {
|
|||||||
return fmt.Sprintf("S3 bucket %s", f.bucket)
|
return fmt.Sprintf("S3 bucket %s", f.bucket)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pattern to match a s3 url
|
|
||||||
var s3Match = regexp.MustCompile(`^s3://([^/]*)(.*)$`)
|
|
||||||
|
|
||||||
// parseParse parses a s3 'url'
|
// parseParse parses a s3 'url'
|
||||||
func s3ParsePath(path string) (bucket, directory string, err error) {
|
func s3ParsePath(path string) (bucket, directory string, err error) {
|
||||||
parts := s3Match.FindAllStringSubmatch(path, -1)
|
parts := Match.FindAllStringSubmatch(path, -1)
|
||||||
if len(parts) != 1 || len(parts[0]) != 3 {
|
if len(parts) != 1 || len(parts[0]) != 3 {
|
||||||
err = fmt.Errorf("Couldn't parse s3 url %q", path)
|
err = fmt.Errorf("Couldn't parse s3 url %q", path)
|
||||||
} else {
|
} else {
|
||||||
@ -113,7 +119,7 @@ func s3Connection() (*s3.S3, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewFsS3 contstructs an FsS3 from the path, bucket:path
|
// NewFsS3 contstructs an FsS3 from the path, bucket:path
|
||||||
func NewFsS3(path string) (*FsS3, error) {
|
func NewFs(path string) (fs.Fs, error) {
|
||||||
bucket, directory, err := s3ParsePath(path)
|
bucket, directory, err := s3ParsePath(path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -137,46 +143,46 @@ func NewFsS3(path string) (*FsS3, error) {
|
|||||||
// Return an FsObject from a path
|
// Return an FsObject from a path
|
||||||
//
|
//
|
||||||
// May return nil if an error occurred
|
// May return nil if an error occurred
|
||||||
func (f *FsS3) NewFsObjectWithInfo(remote string, info *s3.Key) FsObject {
|
func (f *FsS3) NewFsObjectWithInfo(remote string, info *s3.Key) fs.FsObject {
|
||||||
fs := &FsObjectS3{
|
o := &FsObjectS3{
|
||||||
s3: f,
|
s3: f,
|
||||||
remote: remote,
|
remote: remote,
|
||||||
}
|
}
|
||||||
if info != nil {
|
if info != nil {
|
||||||
// Set info but not meta
|
// Set info but not meta
|
||||||
var err error
|
var err error
|
||||||
fs.lastModified, err = time.Parse(time.RFC3339, info.LastModified)
|
o.lastModified, err = time.Parse(time.RFC3339, info.LastModified)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
FsLog(fs, "Failed to read last modified: %s", err)
|
fs.FsLog(o, "Failed to read last modified: %s", err)
|
||||||
fs.lastModified = time.Now()
|
o.lastModified = time.Now()
|
||||||
}
|
}
|
||||||
fs.etag = info.ETag
|
o.etag = info.ETag
|
||||||
fs.bytes = info.Size
|
o.bytes = info.Size
|
||||||
} else {
|
} else {
|
||||||
err := fs.readMetaData() // reads info and meta, returning an error
|
err := o.readMetaData() // reads info and meta, returning an error
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// logged already FsDebug("Failed to read info: %s", err)
|
// logged already FsDebug("Failed to read info: %s", err)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return fs
|
return o
|
||||||
}
|
}
|
||||||
|
|
||||||
// Return an FsObject from a path
|
// Return an FsObject from a path
|
||||||
//
|
//
|
||||||
// May return nil if an error occurred
|
// May return nil if an error occurred
|
||||||
func (f *FsS3) NewFsObject(remote string) FsObject {
|
func (f *FsS3) NewFsObject(remote string) fs.FsObject {
|
||||||
return f.NewFsObjectWithInfo(remote, nil)
|
return f.NewFsObjectWithInfo(remote, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Walk the path returning a channel of FsObjects
|
// Walk the path returning a channel of FsObjects
|
||||||
func (f *FsS3) List() FsObjectsChan {
|
func (f *FsS3) List() fs.FsObjectsChan {
|
||||||
out := make(FsObjectsChan, *checkers)
|
out := make(fs.FsObjectsChan, fs.Config.Checkers)
|
||||||
go func() {
|
go func() {
|
||||||
// FIXME need to implement ALL loop
|
// FIXME need to implement ALL loop
|
||||||
objects, err := f.b.List("", "", "", 10000)
|
objects, err := f.b.List("", "", "", 10000)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
stats.Error()
|
fs.Stats.Error()
|
||||||
log.Printf("Couldn't read bucket %q: %s", f.bucket, err)
|
log.Printf("Couldn't read bucket %q: %s", f.bucket, err)
|
||||||
} else {
|
} else {
|
||||||
for i := range objects.Contents {
|
for i := range objects.Contents {
|
||||||
@ -192,17 +198,17 @@ func (f *FsS3) List() FsObjectsChan {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Lists the buckets
|
// Lists the buckets
|
||||||
func (f *FsS3) ListDir() FsDirChan {
|
func (f *FsS3) ListDir() fs.FsDirChan {
|
||||||
out := make(FsDirChan, *checkers)
|
out := make(fs.FsDirChan, fs.Config.Checkers)
|
||||||
go func() {
|
go func() {
|
||||||
defer close(out)
|
defer close(out)
|
||||||
buckets, err := f.c.ListBuckets()
|
buckets, err := f.c.ListBuckets()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
stats.Error()
|
fs.Stats.Error()
|
||||||
log.Printf("Couldn't list buckets: %s", err)
|
log.Printf("Couldn't list buckets: %s", err)
|
||||||
} else {
|
} else {
|
||||||
for _, bucket := range buckets {
|
for _, bucket := range buckets {
|
||||||
out <- &FsDir{
|
out <- &fs.FsDir{
|
||||||
Name: bucket.Name,
|
Name: bucket.Name,
|
||||||
When: bucket.CreationDate,
|
When: bucket.CreationDate,
|
||||||
Bytes: -1,
|
Bytes: -1,
|
||||||
@ -215,7 +221,7 @@ func (f *FsS3) ListDir() FsDirChan {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Put the FsObject into the bucket
|
// Put the FsObject into the bucket
|
||||||
func (f *FsS3) Put(in io.Reader, remote string, modTime time.Time, size int64) (FsObject, error) {
|
func (f *FsS3) Put(in io.Reader, remote string, modTime time.Time, size int64) (fs.FsObject, error) {
|
||||||
// Temporary FsObject under construction
|
// Temporary FsObject under construction
|
||||||
fs := &FsObjectS3{s3: f, remote: remote}
|
fs := &FsObjectS3{s3: f, remote: remote}
|
||||||
|
|
||||||
@ -253,51 +259,51 @@ func (f *FsS3) Rmdir() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Return the precision
|
// Return the precision
|
||||||
func (fs *FsS3) Precision() time.Duration {
|
func (f *FsS3) Precision() time.Duration {
|
||||||
return time.Nanosecond
|
return time.Nanosecond
|
||||||
}
|
}
|
||||||
|
|
||||||
// ------------------------------------------------------------
|
// ------------------------------------------------------------
|
||||||
|
|
||||||
// Return the remote path
|
// Return the remote path
|
||||||
func (fs *FsObjectS3) Remote() string {
|
func (o *FsObjectS3) Remote() string {
|
||||||
return fs.remote
|
return o.remote
|
||||||
}
|
}
|
||||||
|
|
||||||
// Md5sum returns the Md5sum of an object returning a lowercase hex string
|
// Md5sum returns the Md5sum of an object returning a lowercase hex string
|
||||||
func (fs *FsObjectS3) Md5sum() (string, error) {
|
func (o *FsObjectS3) Md5sum() (string, error) {
|
||||||
return strings.Trim(strings.ToLower(fs.etag), `"`), nil
|
return strings.Trim(strings.ToLower(o.etag), `"`), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Size returns the size of an object in bytes
|
// Size returns the size of an object in bytes
|
||||||
func (fs *FsObjectS3) Size() int64 {
|
func (o *FsObjectS3) Size() int64 {
|
||||||
return fs.bytes
|
return o.bytes
|
||||||
}
|
}
|
||||||
|
|
||||||
// readMetaData gets the metadata if it hasn't already been fetched
|
// readMetaData gets the metadata if it hasn't already been fetched
|
||||||
//
|
//
|
||||||
// it also sets the info
|
// it also sets the info
|
||||||
func (fs *FsObjectS3) readMetaData() (err error) {
|
func (o *FsObjectS3) readMetaData() (err error) {
|
||||||
if fs.meta != nil {
|
if o.meta != nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
headers, err := fs.s3.b.Head(fs.remote, nil)
|
headers, err := o.s3.b.Head(o.remote, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
FsDebug(fs, "Failed to read info: %s", err)
|
fs.FsDebug(o, "Failed to read info: %s", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
size, err := strconv.ParseInt(headers["Content-Length"], 10, 64)
|
size, err := strconv.ParseInt(headers["Content-Length"], 10, 64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
FsDebug(fs, "Failed to read size from: %q", headers)
|
fs.FsDebug(o, "Failed to read size from: %q", headers)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
fs.etag = headers["Etag"]
|
o.etag = headers["Etag"]
|
||||||
fs.bytes = size
|
o.bytes = size
|
||||||
fs.meta = headers
|
o.meta = headers
|
||||||
if fs.lastModified, err = time.Parse(http.TimeFormat, headers["Last-Modified"]); err != nil {
|
if o.lastModified, err = time.Parse(http.TimeFormat, headers["Last-Modified"]); err != nil {
|
||||||
FsLog(fs, "Failed to read last modified from HEAD: %s", err)
|
fs.FsLog(o, "Failed to read last modified from HEAD: %s", err)
|
||||||
fs.lastModified = time.Now()
|
o.lastModified = time.Now()
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -306,58 +312,58 @@ func (fs *FsObjectS3) readMetaData() (err error) {
|
|||||||
//
|
//
|
||||||
// It attempts to read the objects mtime and if that isn't present the
|
// It attempts to read the objects mtime and if that isn't present the
|
||||||
// LastModified returned in the http headers
|
// LastModified returned in the http headers
|
||||||
func (fs *FsObjectS3) ModTime() time.Time {
|
func (o *FsObjectS3) ModTime() time.Time {
|
||||||
err := fs.readMetaData()
|
err := o.readMetaData()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
FsLog(fs, "Failed to read metadata: %s", err)
|
fs.FsLog(o, "Failed to read metadata: %s", err)
|
||||||
return time.Now()
|
return time.Now()
|
||||||
}
|
}
|
||||||
// read mtime out of metadata if available
|
// read mtime out of metadata if available
|
||||||
d, ok := fs.meta[metaMtime]
|
d, ok := o.meta[metaMtime]
|
||||||
if !ok {
|
if !ok {
|
||||||
// FsDebug(fs, "No metadata")
|
// fs.FsDebug(o, "No metadata")
|
||||||
return fs.lastModified
|
return o.lastModified
|
||||||
}
|
}
|
||||||
modTime, err := swift.FloatStringToTime(d)
|
modTime, err := swift.FloatStringToTime(d)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
FsLog(fs, "Failed to read mtime from object: %s", err)
|
fs.FsLog(o, "Failed to read mtime from object: %s", err)
|
||||||
return fs.lastModified
|
return o.lastModified
|
||||||
}
|
}
|
||||||
return modTime
|
return modTime
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sets the modification time of the local fs object
|
// Sets the modification time of the local fs object
|
||||||
func (fs *FsObjectS3) SetModTime(modTime time.Time) {
|
func (o *FsObjectS3) SetModTime(modTime time.Time) {
|
||||||
err := fs.readMetaData()
|
err := o.readMetaData()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
stats.Error()
|
fs.Stats.Error()
|
||||||
FsLog(fs, "Failed to read metadata: %s", err)
|
fs.FsLog(o, "Failed to read metadata: %s", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
fs.meta[metaMtime] = swift.TimeToFloatString(modTime)
|
o.meta[metaMtime] = swift.TimeToFloatString(modTime)
|
||||||
_, err = fs.s3.b.Update(fs.remote, fs.s3.perm, fs.meta)
|
_, err = o.s3.b.Update(o.remote, o.s3.perm, o.meta)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
stats.Error()
|
fs.Stats.Error()
|
||||||
FsLog(fs, "Failed to update remote mtime: %s", err)
|
fs.FsLog(o, "Failed to update remote mtime: %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Is this object storable
|
// Is this object storable
|
||||||
func (fs *FsObjectS3) Storable() bool {
|
func (o *FsObjectS3) Storable() bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open an object for read
|
// Open an object for read
|
||||||
func (fs *FsObjectS3) Open() (in io.ReadCloser, err error) {
|
func (o *FsObjectS3) Open() (in io.ReadCloser, err error) {
|
||||||
in, err = fs.s3.b.GetReader(fs.remote)
|
in, err = o.s3.b.GetReader(o.remote)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove an object
|
// Remove an object
|
||||||
func (fs *FsObjectS3) Remove() error {
|
func (o *FsObjectS3) Remove() error {
|
||||||
return fs.s3.b.Del(fs.remote)
|
return o.s3.b.Del(o.remote)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check the interfaces are satisfied
|
// Check the interfaces are satisfied
|
||||||
var _ Fs = &FsS3{}
|
var _ fs.Fs = &FsS3{}
|
||||||
var _ FsObject = &FsObjectS3{}
|
var _ fs.FsObject = &FsObjectS3{}
|
@ -1,9 +1,10 @@
|
|||||||
// Swift interface
|
// Swift interface
|
||||||
package main
|
package swift
|
||||||
|
|
||||||
// FIXME need to prevent anything but ListDir working for swift://
|
// FIXME need to prevent anything but ListDir working for swift://
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"../fs"
|
||||||
"errors"
|
"errors"
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
@ -16,6 +17,14 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Pattern to match a swift url
|
||||||
|
var Match = regexp.MustCompile(`^swift://([^/]*)(.*)$`)
|
||||||
|
|
||||||
|
// Register with Fs
|
||||||
|
func init() {
|
||||||
|
fs.Register(Match, NewFs)
|
||||||
|
}
|
||||||
|
|
||||||
// FsSwift represents a remote swift server
|
// FsSwift represents a remote swift server
|
||||||
type FsSwift struct {
|
type FsSwift struct {
|
||||||
c swift.Connection // the connection to the swift server
|
c swift.Connection // the connection to the swift server
|
||||||
@ -50,12 +59,9 @@ func (f *FsSwift) String() string {
|
|||||||
return fmt.Sprintf("Swift container %s", f.container)
|
return fmt.Sprintf("Swift container %s", f.container)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pattern to match a swift url
|
|
||||||
var swiftMatch = regexp.MustCompile(`^swift://([^/]*)(.*)$`)
|
|
||||||
|
|
||||||
// parseParse parses a swift 'url'
|
// parseParse parses a swift 'url'
|
||||||
func parsePath(path string) (container, directory string, err error) {
|
func parsePath(path string) (container, directory string, err error) {
|
||||||
parts := swiftMatch.FindAllStringSubmatch(path, -1)
|
parts := Match.FindAllStringSubmatch(path, -1)
|
||||||
if len(parts) != 1 || len(parts[0]) != 3 {
|
if len(parts) != 1 || len(parts[0]) != 3 {
|
||||||
err = fmt.Errorf("Couldn't parse swift url %q", path)
|
err = fmt.Errorf("Couldn't parse swift url %q", path)
|
||||||
} else {
|
} else {
|
||||||
@ -88,8 +94,8 @@ func swiftConnection() (*swift.Connection, error) {
|
|||||||
return c, nil
|
return c, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewFsSwift contstructs an FsSwift from the path, container:path
|
// NewFs contstructs an FsSwift from the path, container:path
|
||||||
func NewFsSwift(path string) (*FsSwift, error) {
|
func NewFs(path string) (fs.Fs, error) {
|
||||||
container, directory, err := parsePath(path)
|
container, directory, err := parsePath(path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -108,7 +114,7 @@ func NewFsSwift(path string) (*FsSwift, error) {
|
|||||||
// Return an FsObject from a path
|
// Return an FsObject from a path
|
||||||
//
|
//
|
||||||
// May return nil if an error occurred
|
// May return nil if an error occurred
|
||||||
func (f *FsSwift) NewFsObjectWithInfo(remote string, info *swift.Object) FsObject {
|
func (f *FsSwift) NewFsObjectWithInfo(remote string, info *swift.Object) fs.FsObject {
|
||||||
fs := &FsObjectSwift{
|
fs := &FsObjectSwift{
|
||||||
swift: f,
|
swift: f,
|
||||||
remote: remote,
|
remote: remote,
|
||||||
@ -129,13 +135,13 @@ func (f *FsSwift) NewFsObjectWithInfo(remote string, info *swift.Object) FsObjec
|
|||||||
// Return an FsObject from a path
|
// Return an FsObject from a path
|
||||||
//
|
//
|
||||||
// May return nil if an error occurred
|
// May return nil if an error occurred
|
||||||
func (f *FsSwift) NewFsObject(remote string) FsObject {
|
func (f *FsSwift) NewFsObject(remote string) fs.FsObject {
|
||||||
return f.NewFsObjectWithInfo(remote, nil)
|
return f.NewFsObjectWithInfo(remote, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Walk the path returning a channel of FsObjects
|
// Walk the path returning a channel of FsObjects
|
||||||
func (f *FsSwift) List() FsObjectsChan {
|
func (f *FsSwift) List() fs.FsObjectsChan {
|
||||||
out := make(FsObjectsChan, *checkers)
|
out := make(fs.FsObjectsChan, fs.Config.Checkers)
|
||||||
go func() {
|
go func() {
|
||||||
// FIXME use a smaller limit?
|
// FIXME use a smaller limit?
|
||||||
err := f.c.ObjectsWalk(f.container, nil, func(opts *swift.ObjectsOpts) (interface{}, error) {
|
err := f.c.ObjectsWalk(f.container, nil, func(opts *swift.ObjectsOpts) (interface{}, error) {
|
||||||
@ -151,7 +157,7 @@ func (f *FsSwift) List() FsObjectsChan {
|
|||||||
return objects, err
|
return objects, err
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
stats.Error()
|
fs.Stats.Error()
|
||||||
log.Printf("Couldn't read container %q: %s", f.container, err)
|
log.Printf("Couldn't read container %q: %s", f.container, err)
|
||||||
}
|
}
|
||||||
close(out)
|
close(out)
|
||||||
@ -160,17 +166,17 @@ func (f *FsSwift) List() FsObjectsChan {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Lists the containers
|
// Lists the containers
|
||||||
func (f *FsSwift) ListDir() FsDirChan {
|
func (f *FsSwift) ListDir() fs.FsDirChan {
|
||||||
out := make(FsDirChan, *checkers)
|
out := make(fs.FsDirChan, fs.Config.Checkers)
|
||||||
go func() {
|
go func() {
|
||||||
defer close(out)
|
defer close(out)
|
||||||
containers, err := f.c.ContainersAll(nil)
|
containers, err := f.c.ContainersAll(nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
stats.Error()
|
fs.Stats.Error()
|
||||||
log.Printf("Couldn't list containers: %s", err)
|
log.Printf("Couldn't list containers: %s", err)
|
||||||
} else {
|
} else {
|
||||||
for _, container := range containers {
|
for _, container := range containers {
|
||||||
out <- &FsDir{
|
out <- &fs.FsDir{
|
||||||
Name: container.Name,
|
Name: container.Name,
|
||||||
Bytes: container.Bytes,
|
Bytes: container.Bytes,
|
||||||
Count: container.Count,
|
Count: container.Count,
|
||||||
@ -186,7 +192,7 @@ func (f *FsSwift) ListDir() FsDirChan {
|
|||||||
// Copy the reader in to the new object which is returned
|
// Copy the reader in to the new object which is returned
|
||||||
//
|
//
|
||||||
// The new object may have been created
|
// The new object may have been created
|
||||||
func (f *FsSwift) Put(in io.Reader, remote string, modTime time.Time, size int64) (FsObject, error) {
|
func (f *FsSwift) Put(in io.Reader, remote string, modTime time.Time, size int64) (fs.FsObject, error) {
|
||||||
// Temporary FsObject under construction
|
// Temporary FsObject under construction
|
||||||
fs := &FsObjectSwift{swift: f, remote: remote}
|
fs := &FsObjectSwift{swift: f, remote: remote}
|
||||||
|
|
||||||
@ -217,35 +223,35 @@ func (fs *FsSwift) Precision() time.Duration {
|
|||||||
// ------------------------------------------------------------
|
// ------------------------------------------------------------
|
||||||
|
|
||||||
// Return the remote path
|
// Return the remote path
|
||||||
func (fs *FsObjectSwift) Remote() string {
|
func (o *FsObjectSwift) Remote() string {
|
||||||
return fs.remote
|
return o.remote
|
||||||
}
|
}
|
||||||
|
|
||||||
// Md5sum returns the Md5sum of an object returning a lowercase hex string
|
// Md5sum returns the Md5sum of an object returning a lowercase hex string
|
||||||
func (fs *FsObjectSwift) Md5sum() (string, error) {
|
func (o *FsObjectSwift) Md5sum() (string, error) {
|
||||||
return strings.ToLower(fs.info.Hash), nil
|
return strings.ToLower(o.info.Hash), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Size returns the size of an object in bytes
|
// Size returns the size of an object in bytes
|
||||||
func (fs *FsObjectSwift) Size() int64 {
|
func (o *FsObjectSwift) Size() int64 {
|
||||||
return fs.info.Bytes
|
return o.info.Bytes
|
||||||
}
|
}
|
||||||
|
|
||||||
// readMetaData gets the metadata if it hasn't already been fetched
|
// readMetaData gets the metadata if it hasn't already been fetched
|
||||||
//
|
//
|
||||||
// it also sets the info
|
// it also sets the info
|
||||||
func (fs *FsObjectSwift) readMetaData() (err error) {
|
func (o *FsObjectSwift) readMetaData() (err error) {
|
||||||
if fs.meta != nil {
|
if o.meta != nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
info, h, err := fs.swift.c.Object(fs.swift.container, fs.remote)
|
info, h, err := o.swift.c.Object(o.swift.container, o.remote)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
FsDebug(fs, "Failed to read info: %s", err)
|
fs.FsDebug(o, "Failed to read info: %s", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
meta := h.ObjectMetadata()
|
meta := h.ObjectMetadata()
|
||||||
fs.info = info
|
o.info = info
|
||||||
fs.meta = &meta
|
o.meta = &meta
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -254,52 +260,52 @@ func (fs *FsObjectSwift) readMetaData() (err error) {
|
|||||||
//
|
//
|
||||||
// It attempts to read the objects mtime and if that isn't present the
|
// It attempts to read the objects mtime and if that isn't present the
|
||||||
// LastModified returned in the http headers
|
// LastModified returned in the http headers
|
||||||
func (fs *FsObjectSwift) ModTime() time.Time {
|
func (o *FsObjectSwift) ModTime() time.Time {
|
||||||
err := fs.readMetaData()
|
err := o.readMetaData()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// FsLog(fs, "Failed to read metadata: %s", err)
|
// fs.FsLog(o, "Failed to read metadata: %s", err)
|
||||||
return fs.info.LastModified
|
return o.info.LastModified
|
||||||
}
|
}
|
||||||
modTime, err := fs.meta.GetModTime()
|
modTime, err := o.meta.GetModTime()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// FsLog(fs, "Failed to read mtime from object: %s", err)
|
// fs.FsLog(o, "Failed to read mtime from object: %s", err)
|
||||||
return fs.info.LastModified
|
return o.info.LastModified
|
||||||
}
|
}
|
||||||
return modTime
|
return modTime
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sets the modification time of the local fs object
|
// Sets the modification time of the local fs object
|
||||||
func (fs *FsObjectSwift) SetModTime(modTime time.Time) {
|
func (o *FsObjectSwift) SetModTime(modTime time.Time) {
|
||||||
err := fs.readMetaData()
|
err := o.readMetaData()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
stats.Error()
|
fs.Stats.Error()
|
||||||
FsLog(fs, "Failed to read metadata: %s", err)
|
fs.FsLog(o, "Failed to read metadata: %s", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
fs.meta.SetModTime(modTime)
|
o.meta.SetModTime(modTime)
|
||||||
err = fs.swift.c.ObjectUpdate(fs.swift.container, fs.remote, fs.meta.ObjectHeaders())
|
err = o.swift.c.ObjectUpdate(o.swift.container, o.remote, o.meta.ObjectHeaders())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
stats.Error()
|
fs.Stats.Error()
|
||||||
FsLog(fs, "Failed to update remote mtime: %s", err)
|
fs.FsLog(o, "Failed to update remote mtime: %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Is this object storable
|
// Is this object storable
|
||||||
func (fs *FsObjectSwift) Storable() bool {
|
func (o *FsObjectSwift) Storable() bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open an object for read
|
// Open an object for read
|
||||||
func (fs *FsObjectSwift) Open() (in io.ReadCloser, err error) {
|
func (o *FsObjectSwift) Open() (in io.ReadCloser, err error) {
|
||||||
in, _, err = fs.swift.c.ObjectOpen(fs.swift.container, fs.remote, true, nil)
|
in, _, err = o.swift.c.ObjectOpen(o.swift.container, o.remote, true, nil)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove an object
|
// Remove an object
|
||||||
func (fs *FsObjectSwift) Remove() error {
|
func (o *FsObjectSwift) Remove() error {
|
||||||
return fs.swift.c.ObjectDelete(fs.swift.container, fs.remote)
|
return o.swift.c.ObjectDelete(o.swift.container, o.remote)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check the interfaces are satisfied
|
// Check the interfaces are satisfied
|
||||||
var _ Fs = &FsSwift{}
|
var _ fs.Fs = &FsSwift{}
|
||||||
var _ FsObject = &FsObjectSwift{}
|
var _ fs.FsObject = &FsObjectSwift{}
|
Loading…
Reference in New Issue
Block a user