From f7197c30d745214f331bfe48ca1ef63fc6592aad Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Thu, 3 Jan 2013 22:50:00 +0000 Subject: [PATCH] Implement stats counting and reporting and return errors in return code --- accounting.go | 177 ++++++++++++++++++++++++++++++++++++++++++++++++++ fs.go | 2 + fs_local.go | 19 ++++-- fs_swift.go | 18 ++++- notes.txt | 33 +++------- swiftsync.go | 56 ++++++++++++++-- 6 files changed, 268 insertions(+), 37 deletions(-) create mode 100644 accounting.go diff --git a/accounting.go b/accounting.go new file mode 100644 index 000000000..e50336427 --- /dev/null +++ b/accounting.go @@ -0,0 +1,177 @@ +// Accounting and limiting reader + +package main + +import ( + "bytes" + "fmt" + "io" + "log" + "strings" + "sync" + "time" +) + +// Globals +var ( + stats = NewStats() +) + +// Stringset holds some strings +type StringSet map[string]bool + +// Strings returns all the strings in the StringSet +func (ss StringSet) Strings() []string { + strings := make([]string, 0, len(ss)) + for k := range ss { + strings = append(strings, k) + } + return strings +} + +// String returns all the strings in the StringSet joined by comma +func (ss StringSet) String() string { + return strings.Join(ss.Strings(), ", ") +} + +// Stats limits and accounts all transfers +type Stats struct { + lock sync.RWMutex + bytes int64 + errors int64 + checks int64 + checking StringSet + transfers int64 + transferring StringSet + start time.Time +} + +// NewStats cretates an initialised Stats +func NewStats() *Stats { + return &Stats{ + checking: make(StringSet, *checkers), + transferring: make(StringSet, *transfers), + start: time.Now(), + } +} + +// String convert the Stats to a string for printing +func (s *Stats) String() string { + s.lock.RLock() + defer s.lock.RUnlock() + dt := time.Now().Sub(stats.start) + dt_seconds := dt.Seconds() + speed := 0.0 + if dt > 0 { + speed = float64(stats.bytes) / 1024 / dt_seconds + } + buf := &bytes.Buffer{} + fmt.Fprintf(buf, ` +Transferred: %10d Bytes (%7.2f kByte/s) +Errors: %10d +Checks: %10d +Transferred: %10d +Elapsed time: %v +`, + stats.bytes, speed, + stats.errors, + stats.checks, + stats.transfers, + dt) + if len(s.checking) > 0 { + fmt.Fprintf(buf, "Checking: %s\n", s.checking) + } + if len(s.transferring) > 0 { + fmt.Fprintf(buf, "Transferring: %s\n", s.transferring) + } + return buf.String() +} + +// Log outputs the Stats to the log +func (s *Stats) Log() { + log.Printf("%v\n", stats) +} + +// Bytes updates the stats for bytes bytes +func (s *Stats) Bytes(bytes int64) { + s.lock.Lock() + defer s.lock.Unlock() + s.bytes += bytes +} + +// Errors updates the stats for errors +func (s *Stats) Errors(errors int64) { + s.lock.Lock() + defer s.lock.Unlock() + s.errors += errors +} + +// Error adds a single error into the stats +func (s *Stats) Error() { + s.lock.Lock() + defer s.lock.Unlock() + s.errors += 1 +} + +// Checking adds a check into the stats +func (s *Stats) Checking(fs FsObject) { + s.lock.Lock() + defer s.lock.Unlock() + s.checking[fs.Remote()] = true +} + +// DoneChecking removes a check from the stats +func (s *Stats) DoneChecking(fs FsObject) { + s.lock.Lock() + defer s.lock.Unlock() + delete(s.checking, fs.Remote()) + s.checks += 1 +} + +// Transferring adds a transfer into the stats +func (s *Stats) Transferring(fs FsObject) { + s.lock.Lock() + defer s.lock.Unlock() + s.transferring[fs.Remote()] = true +} + +// DoneTransferring removes a transfer from the stats +func (s *Stats) DoneTransferring(fs FsObject) { + s.lock.Lock() + defer s.lock.Unlock() + delete(s.transferring, fs.Remote()) + s.transfers += 1 +} + +// Account limits and accounts for one transfer +type Account struct { + in io.ReadCloser + bytes int64 +} + +// NewAccount makes a Account reader +func NewAccount(in io.ReadCloser) *Account { + return &Account{ + in: in, + } +} + +// Read bytes from the object - see io.Reader +func (file *Account) Read(p []byte) (n int, err error) { + n, err = file.in.Read(p) + file.bytes += int64(n) + stats.Bytes(int64(n)) + if err == io.EOF { + // FIXME Do something? + } + return +} + +// Close the object +func (file *Account) Close() error { + // FIXME do something? + return file.in.Close() +} + +// Check it satisfies the interface +var _ io.ReadCloser = &Account{} diff --git a/fs.go b/fs.go index 32cd03991..e490542da 100644 --- a/fs.go +++ b/fs.go @@ -82,11 +82,13 @@ func checkClose(c io.Closer, err *error) { func CheckMd5sums(src, dst FsObject) (bool, error) { srcMd5, err := src.Md5sum() if err != nil { + stats.Error() FsLog(src, "Failed to calculate src md5: %s", err) return false, err } dstMd5, err := dst.Md5sum() if err != nil { + stats.Error() FsLog(dst, "Failed to calculate dst md5: %s", err) return false, err } diff --git a/fs_local.go b/fs_local.go index cf33b85cc..93da943a6 100644 --- a/fs_local.go +++ b/fs_local.go @@ -49,7 +49,7 @@ func (f *FsLocal) NewFsObjectWithInfo(remote string, info os.FileInfo) FsObject } else { err := fs.lstat() if err != nil { - log.Printf("Failed to stat %s: %s", path, err) + FsDebug(fs, "Failed to stat %s: %s", path, err) return nil } } @@ -71,10 +71,12 @@ func (f *FsLocal) List() FsObjectsChan { go func() { err := filepath.Walk(f.root, func(path string, fi os.FileInfo, err error) error { if err != nil { + stats.Error() log.Printf("Failed to open directory: %s: %s", path, err) } else { remote, err := filepath.Rel(f.root, path) if err != nil { + stats.Error() log.Printf("Failed to get relative path %s: %s", path, err) return nil } @@ -91,6 +93,7 @@ func (f *FsLocal) List() FsObjectsChan { return nil }) if err != nil { + stats.Error() log.Printf("Failed to open directory: %s: %s", f.root, err) } close(out) @@ -107,19 +110,21 @@ func (f *FsLocal) List() FsObjectsChan { func (f *FsLocal) Put(src FsObject) { dstRemote := src.Remote() dstPath := filepath.Join(f.root, dstRemote) - log.Printf("Download %s to %s", dstRemote, dstPath) // Temporary FsObject under construction fs := &FsObjectLocal{remote: dstRemote, path: dstPath} + FsDebug(fs, "Download %s to %s", dstRemote, dstPath) dir := path.Dir(dstPath) err := os.MkdirAll(dir, 0770) if err != nil { + stats.Error() FsLog(fs, "Couldn't make directory: %s", err) return } out, err := os.Create(dstPath) if err != nil { + stats.Error() FsLog(fs, "Failed to open: %s", err) return } @@ -131,20 +136,24 @@ func (f *FsLocal) Put(src FsObject) { FsDebug(fs, "Removing failed download") removeErr := os.Remove(dstPath) if removeErr != nil { - FsLog(fs, "Failed to remove failed download: %s", err) + stats.Error() + FsLog(fs, "Failed to remove failed download: %s", removeErr) } } }() - in, err := src.Open() + in0, err := src.Open() if err != nil { + stats.Error() FsLog(fs, "Failed to open: %s", err) return } + in := NewAccount(in0) // account the transfer defer checkClose(in, &err) _, err = io.Copy(out, in) if err != nil { + stats.Error() FsLog(fs, "Failed to download: %s", err) return } @@ -176,6 +185,7 @@ func (fs *FsObjectLocal) Remote() string { func (fs *FsObjectLocal) Md5sum() (string, error) { in, err := os.Open(fs.path) if err != nil { + stats.Error() FsLog(fs, "Failed to open: %s", err) return "", err } @@ -183,6 +193,7 @@ func (fs *FsObjectLocal) Md5sum() (string, error) { hash := md5.New() _, err = io.Copy(hash, in) if err != nil { + stats.Error() FsLog(fs, "Failed to read: %s", err) return "", err } diff --git a/fs_swift.go b/fs_swift.go index 8c50e85a7..ead17e404 100644 --- a/fs_swift.go +++ b/fs_swift.go @@ -105,10 +105,12 @@ func NewFsSwift(path string) (*FsSwift, error) { func SwiftContainers() { c, err := swiftConnection() if err != nil { + stats.Error() log.Fatalf("Couldn't connect: %s", err) } containers, err := c.ContainersAll(nil) if err != nil { + stats.Error() log.Fatalf("Couldn't list containers: %s", err) } for _, container := range containers { @@ -162,6 +164,7 @@ func (f *FsSwift) List() FsObjectsChan { return objects, err }) if err != nil { + stats.Error() log.Printf("Couldn't read container %q: %s", f.container, err) } close(out) @@ -174,11 +177,13 @@ func (f *FsSwift) Put(src FsObject) { // Temporary FsObject under construction fs := &FsObjectSwift{swift: f, remote: src.Remote()} // FIXME content type - in, err := src.Open() + in0, err := src.Open() if err != nil { + stats.Error() FsLog(fs, "Failed to open: %s", err) return } + in := NewAccount(in0) // account the transfer defer in.Close() // Set the mtime @@ -187,7 +192,14 @@ func (f *FsSwift) Put(src FsObject) { _, err = fs.swift.c.ObjectPut(fs.swift.container, fs.remote, in, true, "", "", m.ObjectHeaders()) if err != nil { + stats.Error() FsLog(fs, "Failed to upload: %s", err) + FsDebug(fs, "Removing failed upload") + removeErr := fs.Remove() + if removeErr != nil { + stats.Error() + FsLog(fs, "Failed to remove failed download: %s", removeErr) + } return } FsDebug(fs, "Uploaded") @@ -231,7 +243,7 @@ func (fs *FsObjectSwift) readMetaData() (err error) { } info, h, err := fs.swift.c.Object(fs.swift.container, fs.remote) if err != nil { - FsLog(fs, "Failed to read info: %s", err) + FsDebug(fs, "Failed to read info: %s", err) return err } meta := h.ObjectMetadata() @@ -263,12 +275,14 @@ func (fs *FsObjectSwift) ModTime() time.Time { func (fs *FsObjectSwift) SetModTime(modTime time.Time) { err := fs.readMetaData() if err != nil { + stats.Error() FsLog(fs, "Failed to read metadata: %s", err) return } fs.meta.SetModTime(modTime) err = fs.swift.c.ObjectUpdate(fs.swift.container, fs.remote, fs.meta.ObjectHeaders()) if err != nil { + stats.Error() FsLog(fs, "Failed to update remote mtime: %s", err) } } diff --git a/notes.txt b/notes.txt index 839416d17..7c3cce08c 100644 --- a/notes.txt +++ b/notes.txt @@ -3,14 +3,12 @@ Todo * Ignoring the pseudo directories * if object.PseudoDirectory { * fmt.Printf("%9s %19s %s\n", "Directory", "-", fs.Remote()) - * Check logging in various parts + * Make Account wrapper + * limit bandwidth for a pool of all individual connectinos + * do timeouts by setting a limit, seeing whether io has happened + and resetting it if it has + * make Account do progress meter * Make logging controllable with flags (mostly done) - * progress meter would be nice! Do this by wrapping the Reader with a progress bar - * Do bandwidth limit by wrapping the Reader too - * Maybe using https://jra-go.googlecode.com/hg/linkio/ which will work for multiple - uploads or downloads. - * code.google.com/p/mxk/go1/flowcontrol - only does one flow at once - * Or maybe put into swift library. * -timeout: Make all timeouts be settable with command line parameters * Check the locking in swift module! * Windows paths? Do we need to translate / and \? @@ -21,27 +19,14 @@ Ideas * optimise remote copy container to another container using remote copy if local is same as remote * Allow subpaths container:/sub/path - * stats * look at auth from env in s3 module - add to swift? -Make a wrapper in connection which - * measures bandwidth and reports it - * limits bandwidth using Reader and Writer - * for a pool of all individual connectinos - * does timeouts by setting a limit, seeing whether io has happened - and resetting it if it has - Need to make directory objects otherwise can't upload an empty directory * Or could upload empty directories only? * Can't purge a local filesystem because it leaves the directories behind s3 --- - -Can maybe set last modified? - -https://forums.aws.amazon.com/message.jspa?messageID=214062 - -Otherwise can set metadata - -Returns etag and last modified in bucket list + * Can maybe set last modified? + * https://forums.aws.amazon.com/message.jspa?messageID=214062 + * Otherwise can set metadata + * Returns etag and last modified in bucket list diff --git a/swiftsync.go b/swiftsync.go index 8e7cd9763..7fa800965 100644 --- a/swiftsync.go +++ b/swiftsync.go @@ -12,18 +12,20 @@ import ( "runtime/pprof" "strings" "sync" + "time" ) // Globals var ( // Flags - cpuprofile = flag.String("cpuprofile", "", "Write cpu profile to file") - snet = flag.Bool("snet", false, "Use internal service network") // FIXME not implemented - verbose = flag.Bool("verbose", false, "Print lots more stuff") - quiet = flag.Bool("quiet", false, "Print as little stuff as possible") - dry_run = flag.Bool("dry-run", false, "Do a trial run with no permanent changes") - checkers = flag.Int("checkers", 8, "Number of checkers to run in parallel.") - transfers = flag.Int("transfers", 4, "Number of file transfers to run in parallel.") + cpuprofile = flag.String("cpuprofile", "", "Write cpu profile to file") + snet = flag.Bool("snet", false, "Use internal service network") // FIXME not implemented + verbose = flag.Bool("verbose", false, "Print lots more stuff") + quiet = flag.Bool("quiet", false, "Print as little stuff as possible") + dry_run = flag.Bool("dry-run", false, "Do a trial run with no permanent changes") + checkers = flag.Int("checkers", 8, "Number of checkers to run in parallel.") + transfers = flag.Int("transfers", 4, "Number of file transfers to run in parallel.") + statsInterval = flag.Duration("stats", time.Minute*1, "Interval to print stats") ) // Read FsObjects~s on in send to out if they need uploading @@ -31,8 +33,11 @@ var ( // FIXME potentially doing lots of MD5SUMS at once func Checker(in, out FsObjectsChan, fdst Fs, wg *sync.WaitGroup) { defer wg.Done() + for src := range in { + stats.Checking(src) dst := fdst.NewFsObject(src.Remote()) + stats.DoneChecking(src) if dst == nil { FsDebug(src, "Couldn't find local file - download") out <- src @@ -56,7 +61,9 @@ func Checker(in, out FsObjectsChan, fdst Fs, wg *sync.WaitGroup) { func Copier(in FsObjectsChan, fdst Fs, wg *sync.WaitGroup) { defer wg.Done() for src := range in { + stats.Transferring(src) fdst.Put(src) + stats.DoneTransferring(src) } } @@ -64,6 +71,7 @@ func Copier(in FsObjectsChan, fdst Fs, wg *sync.WaitGroup) { func Copy(fdst, fsrc Fs) { err := fdst.Mkdir() if err != nil { + stats.Error() log.Fatal("Failed to make destination") } @@ -100,8 +108,11 @@ func DeleteFiles(to_be_deleted FsObjectsChan) { if *dry_run { FsDebug(dst, "Not deleting as -dry-run") } else { + stats.Checking(dst) err := dst.Remove() + stats.DoneChecking(dst) if err != nil { + stats.Error() FsLog(dst, "Couldn't delete: %s", err) } else { FsDebug(dst, "Deleted") @@ -119,9 +130,12 @@ func DeleteFiles(to_be_deleted FsObjectsChan) { func Sync(fdst, fsrc Fs) { err := fdst.Mkdir() if err != nil { + stats.Error() log.Fatal("Failed to make destination") } + log.Printf("Building file list") + // Read the destination files first // FIXME could do this in parallel and make it use less memory delFiles := make(map[string]FsObject) @@ -174,6 +188,8 @@ func Sync(fdst, fsrc Fs) { // Checks the files in fsrc and fdst according to Size and MD5SUM func Check(fdst, fsrc Fs) { + log.Printf("Building file list") + // Read the destination files first // FIXME could do this in parallel and make it use less memory dstFiles := make(map[string]FsObject) @@ -220,11 +236,14 @@ func Check(fdst, fsrc Fs) { defer checkerWg.Done() for check := range checks { dst, src := check[0], check[1] + stats.Checking(src) if src.Size() != dst.Size() { + stats.DoneChecking(src) FsLog(src, "Sizes differ") continue } same, err := CheckMd5sums(src, dst) + stats.DoneChecking(src) if err != nil { continue } @@ -251,7 +270,9 @@ func List(f Fs) { go func() { defer wg.Done() for fs := range in { + stats.Checking(fs) modTime := fs.ModTime() + stats.DoneChecking(fs) fmt.Printf("%9d %19s %s\n", fs.Size(), modTime.Format("2006-01-02 15:04:05"), fs.Remote()) } }() @@ -272,6 +293,7 @@ func list(fdst, fsrc Fs) { func mkdir(fdst, fsrc Fs) { err := fdst.Mkdir() if err != nil { + stats.Error() log.Fatalf("Mkdir failed: %s", err) } } @@ -283,6 +305,7 @@ func rmdir(fdst, fsrc Fs) { } else { err := fdst.Rmdir() if err != nil { + stats.Error() log.Fatalf("Rmdir failed: %s", err) } } @@ -441,6 +464,7 @@ func main() { if *cpuprofile != "" { f, err := os.Create(*cpuprofile) if err != nil { + stats.Error() log.Fatal(err) } pprof.StartCPUProfile(f) @@ -464,12 +488,14 @@ func main() { break } else if strings.HasPrefix(command.name, cmd) { if found != nil { + stats.Error() log.Fatalf("Not unique - matches multiple commands %q", cmd) } found = command } } if found == nil { + stats.Error() log.Fatalf("Unknown command %q", cmd) } found.checkArgs(args) @@ -480,20 +506,36 @@ func main() { if len(args) >= 1 { fdst, err = NewFs(args[0]) if err != nil { + stats.Error() log.Fatal("Failed to create file system: ", err) } } if len(args) >= 2 { fsrc, err = NewFs(args[1]) if err != nil { + stats.Error() log.Fatal("Failed to create destination file system: ", err) } fsrc, fdst = fdst, fsrc } + // Print the stats every statsInterval + go func() { + ch := time.Tick(*statsInterval) + for { + <-ch + stats.Log() + } + }() + // Run the actual command if found.run != nil { found.run(fdst, fsrc) + fmt.Println(stats) + if stats.errors > 0 { + os.Exit(1) + } + os.Exit(0) } else { syntaxError() }