diff --git a/notes.txt b/notes.txt index 39fefe32e..e8b78b9e5 100644 --- a/notes.txt +++ b/notes.txt @@ -16,6 +16,10 @@ Need an iterate all objects routine... Could use a channel FIXME progress meter would be nice! Do this by wrapping the Reader with a progress bar Do bandwidth limit by wrapping the Reader too + Maybe using https://jra-go.googlecode.com/hg/linkio/ which will work for multiple + uploads or downloads. + * code.google.com/p/mxk/go1/flowcontrol - only does one flow at once +Or maybe put into swift library. Could have an integrity check mode where we check the MD5sums of the local vs the remote @@ -23,4 +27,15 @@ Some stats would be nice! Windows paths? Do we need to translate / and \? -Make swift timeouts be settable +Make swift timeouts be settable with command line parameters + +Add bandwidth limit? + +Make a wrapper in connection which + * measures bandwidth and reports it + * limits bandwidth using Reader and Writer + * for a pool of all individual connectinos + * does timeouts by setting a limit, seeing whether io has happened + and resetting it if it has + +Check the locking in swift module! diff --git a/swiftsync.go b/swiftsync.go index 41402cf74..9f29fd4a3 100644 --- a/swiftsync.go +++ b/swiftsync.go @@ -12,8 +12,10 @@ import ( "log" "os" "path/filepath" + "runtime" "runtime/pprof" "strings" + "sync" ) // Globals @@ -29,9 +31,11 @@ var ( verbose = flag.Bool("verbose", false, "Print lots more stuff") quiet = flag.Bool("quiet", false, "Print as little stuff as possible") // FIXME make these part of swift so we get a standard set of flags? - authUrl = flag.String("auth", os.Getenv("ST_AUTH"), "Auth URL for server. Defaults to environment var ST_AUTH.") - userName = flag.String("user", os.Getenv("ST_USER"), "User name. Defaults to environment var ST_USER.") - apiKey = flag.String("key", os.Getenv("ST_KEY"), "API key (password). Defaults to environment var ST_KEY.") + authUrl = flag.String("auth", os.Getenv("ST_AUTH"), "Auth URL for server. Defaults to environment var ST_AUTH.") + userName = flag.String("user", os.Getenv("ST_USER"), "User name. Defaults to environment var ST_USER.") + apiKey = flag.String("key", os.Getenv("ST_KEY"), "API key (password). Defaults to environment var ST_KEY.") + checkers = flag.Int("checkers", 8, "Number of checkers to run in parallel.") + uploaders = flag.Int("uploaders", 4, "Number of uploaders to run in parallel.") ) type FsObject struct { @@ -40,6 +44,8 @@ type FsObject struct { info os.FileInfo } +type FsObjectsChan chan *FsObject + type FsObjects []FsObject // Write debuging output for this FsObject @@ -128,70 +134,73 @@ func (fs *FsObject) changed(c *swift.Connection, container string) bool { return false } -// Puts the FsObject into the container -func (fs *FsObject) put(c *swift.Connection, container string) { +// Is this object storable +func (fs *FsObject) storable(c *swift.Connection, container string) bool { mode := fs.info.Mode() if mode&(os.ModeSymlink|os.ModeNamedPipe|os.ModeSocket|os.ModeDevice) != 0 { fs.Debugf("Can't transfer non file/directory") + return false } else if mode&os.ModeDir != 0 { // Debug? fs.Debugf("FIXME Skipping directory") - } else { - // Check to see if changed or not - if !fs.changed(c, container) { - fs.Debugf("Unchanged skipping") - return - } - // FIXME content type - in, err := os.Open(fs.path) - if err != nil { - fs.Debugf("Failed to open: %s", err) - return - } - defer in.Close() - m := swift.Metadata{} - m.SetModTime(fs.info.ModTime()) - _, err = c.ObjectPut(container, fs.rel, in, true, "", "", m.ObjectHeaders()) - if err != nil { - fs.Debugf("Failed to upload: %s", err) - return - } - fs.Debugf("Uploaded") + return false } - + return true } -// Walk the path +// Puts the FsObject into the container +func (fs *FsObject) put(c *swift.Connection, container string) { + // FIXME content type + in, err := os.Open(fs.path) + if err != nil { + fs.Debugf("Failed to open: %s", err) + return + } + defer in.Close() + m := swift.Metadata{} + m.SetModTime(fs.info.ModTime()) + _, err = c.ObjectPut(container, fs.rel, in, true, "", "", m.ObjectHeaders()) + if err != nil { + fs.Debugf("Failed to upload: %s", err) + return + } + fs.Debugf("Uploaded") +} + +// Walk the path returning a channel of FsObjects // // FIXME ignore symlinks? // FIXME what about hardlinks / etc -func walk(root string) FsObjects { - files := make(FsObjects, 0, 1024) - err := filepath.Walk(root, func(path string, f os.FileInfo, err error) error { +func walk(root string) FsObjectsChan { + out := make(FsObjectsChan, *checkers) + go func() { + err := filepath.Walk(root, func(path string, f os.FileInfo, err error) error { + if err != nil { + log.Printf("Failed to open directory: %s: %s", path, err) + } else { + info, err := os.Lstat(path) + if err != nil { + log.Printf("Failed to stat %s: %s", path, err) + return nil + } + rel, err := filepath.Rel(root, path) + if err != nil { + log.Printf("Failed to get relative path %s: %s", path, err) + return nil + } + if rel == "." { + rel = "" + } + out <- &FsObject{rel: rel, path: path, info: info} + } + return nil + }) if err != nil { - log.Printf("Failed to open directory: %s: %s", path, err) - } else { - info, err := os.Lstat(path) - if err != nil { - log.Printf("Failed to stat %s: %s", path, err) - return nil - } - rel, err := filepath.Rel(root, path) - if err != nil { - log.Printf("Failed to get relative path %s: %s", path, err) - return nil - } - if rel == "." { - rel = "" - } - files = append(files, FsObject{rel: rel, path: path, info: info}) + log.Printf("Failed to open directory: %s: %s", root, err) } - return nil - }) - if err != nil { - log.Printf("Failed to open directory: %s: %s", root, err) - } - return files + close(out) + }() + return out } // syntaxError prints the syntax @@ -221,14 +230,57 @@ func checkArgs(args []string, n int, message string) { } } -// uploads a file into a container -func upload(c *swift.Connection, root, container string) { - files := walk(root) - for _, fs := range files { +// Read FsObjects on in and write them to out if they need uploading +// +// FIXME potentially doing lots of MD5SUMS at once +func checker(c *swift.Connection, container string, in, out FsObjectsChan, wg *sync.WaitGroup) { + defer wg.Done() + for fs := range in { + // Check to see if can store this + if !fs.storable(c, container) { + continue + } + // Check to see if changed or not + if !fs.changed(c, container) { + fs.Debugf("Unchanged skipping") + continue + } + out <- fs + } +} + +// Read FsObjects on in and upload them +func uploader(c *swift.Connection, container string, in FsObjectsChan, wg *sync.WaitGroup) { + defer wg.Done() + for fs := range in { fs.put(c, container) } } +// Syncs a directory into a container +func upload(c *swift.Connection, root, container string) { + to_be_checked := walk(root) + to_be_uploaded := make(FsObjectsChan, *uploaders) + + var checkerWg sync.WaitGroup + checkerWg.Add(*checkers) + for i := 0; i < *checkers; i++ { + go checker(c, container, to_be_checked, to_be_uploaded, &checkerWg) + } + + var uploaderWg sync.WaitGroup + uploaderWg.Add(*uploaders) + for i := 0; i < *uploaders; i++ { + go uploader(c, container, to_be_uploaded, &uploaderWg) + } + + log.Printf("Waiting for checks to finish") + checkerWg.Wait() + close(to_be_uploaded) + log.Printf("Waiting for uploads to finish") + uploaderWg.Wait() +} + // Lists the containers func listContainers(c *swift.Connection) { containers, err := c.ContainersAll(nil) @@ -276,7 +328,7 @@ func main() { flag.Usage = syntaxError flag.Parse() args := flag.Args() - //runtime.GOMAXPROCS(3) + runtime.GOMAXPROCS(runtime.NumCPU()) // Setup profiling if desired if *cpuprofile != "" {