rclone/rclone.go

620 lines
13 KiB
Go
Raw Normal View History

2013-06-27 21:00:01 +02:00
// Sync files and directories to and from local and remote object stores
2013-06-27 20:51:03 +02:00
//
// Nick Craig-Wood <nick@craig-wood.com>
package main
import (
2013-06-27 21:13:07 +02:00
"./fs"
"flag"
"fmt"
"log"
"os"
"runtime"
"runtime/pprof"
"strings"
"sync"
"time"
2013-06-27 21:13:07 +02:00
// Active file systems
_ "./drive"
_ "./local"
_ "./s3"
_ "./swift"
)
// Globals
var (
	// Command line flags.

	// cpuprofile names a file to write a CPU profile to; empty disables it.
	cpuprofile = flag.String("cpuprofile", "", "Write cpu profile to file")

	// verbose and quiet select how chatty the output is.
	verbose = flag.Bool("verbose", false, "Print lots more stuff")
	quiet   = flag.Bool("quiet", false, "Print as little stuff as possible")

	// dry_run asks for a trial run that makes no permanent changes.
	dry_run = flag.Bool("dry-run", false, "Do a trial run with no permanent changes")

	// checkers and transfers size the two worker pools.
	checkers  = flag.Int("checkers", 8, "Number of checkers to run in parallel.")
	transfers = flag.Int("transfers", 4, "Number of file transfers to run in parallel.")

	// statsInterval is how often progress stats are logged.
	statsInterval = flag.Duration("stats", time.Minute, "Interval to print stats")

	// modifyWindow is the largest modification time difference for which
	// two objects are still considered the same.
	modifyWindow = flag.Duration("modify-window", time.Nanosecond, "Max time diff to be considered the same")
)
2013-06-28 09:57:32 +02:00
// A pair of fs.Objects
type PairFsObjects struct {
2013-06-28 09:57:32 +02:00
src, dst fs.Object
}
type PairFsObjectsChan chan PairFsObjects
// Check to see if src needs to be copied to dst and if so puts it in out
2013-06-28 09:57:32 +02:00
func checkOne(src, dst fs.Object, out fs.ObjectsChan) {
if dst == nil {
2013-06-28 09:57:32 +02:00
fs.Debug(src, "Couldn't find local file - download")
out <- src
return
}
// Check to see if can store this
if !src.Storable() {
return
}
// Check to see if changed or not
2013-06-27 21:13:07 +02:00
if fs.Equal(src, dst) {
2013-06-28 09:57:32 +02:00
fs.Debug(src, "Unchanged skipping")
return
}
out <- src
}
// Read FsObjects~s on in send to out if they need uploading
//
// FIXME potentially doing lots of MD5SUMS at once
2013-06-28 09:57:32 +02:00
func PairChecker(in PairFsObjectsChan, out fs.ObjectsChan, wg *sync.WaitGroup) {
defer wg.Done()
for pair := range in {
src := pair.src
2013-06-27 21:13:07 +02:00
fs.Stats.Checking(src)
checkOne(src, pair.dst, out)
2013-06-27 21:13:07 +02:00
fs.Stats.DoneChecking(src)
}
}
// Read FsObjects~s on in send to out if they need uploading
//
// FIXME potentially doing lots of MD5SUMS at once
2013-06-28 09:57:32 +02:00
func Checker(in, out fs.ObjectsChan, fdst fs.Fs, wg *sync.WaitGroup) {
defer wg.Done()
for src := range in {
2013-06-27 21:13:07 +02:00
fs.Stats.Checking(src)
dst := fdst.NewFsObject(src.Remote())
checkOne(src, dst, out)
2013-06-27 21:13:07 +02:00
fs.Stats.DoneChecking(src)
}
}
// Read FsObjects on in and copy them
2013-06-28 09:57:32 +02:00
func Copier(in fs.ObjectsChan, fdst fs.Fs, wg *sync.WaitGroup) {
defer wg.Done()
for src := range in {
2013-06-27 21:13:07 +02:00
fs.Stats.Transferring(src)
fs.Copy(fdst, src)
fs.Stats.DoneTransferring(src)
}
}
// Copies fsrc into fdst
2013-06-27 21:13:07 +02:00
func CopyFs(fdst, fsrc fs.Fs) {
err := fdst.Mkdir()
if err != nil {
2013-06-27 21:13:07 +02:00
fs.Stats.Error()
log.Fatal("Failed to make destination")
}
to_be_checked := fsrc.List()
2013-06-28 09:57:32 +02:00
to_be_uploaded := make(fs.ObjectsChan, *transfers)
var checkerWg sync.WaitGroup
checkerWg.Add(*checkers)
for i := 0; i < *checkers; i++ {
go Checker(to_be_checked, to_be_uploaded, fdst, &checkerWg)
}
var copierWg sync.WaitGroup
copierWg.Add(*transfers)
2012-12-23 10:32:33 +01:00
for i := 0; i < *transfers; i++ {
go Copier(to_be_uploaded, fdst, &copierWg)
}
log.Printf("Waiting for checks to finish")
checkerWg.Wait()
close(to_be_uploaded)
log.Printf("Waiting for transfers to finish")
copierWg.Wait()
}
// Delete all the files passed in the channel
2013-06-28 09:57:32 +02:00
func DeleteFiles(to_be_deleted fs.ObjectsChan) {
var wg sync.WaitGroup
wg.Add(*transfers)
for i := 0; i < *transfers; i++ {
go func() {
defer wg.Done()
for dst := range to_be_deleted {
if *dry_run {
2013-06-28 09:57:32 +02:00
fs.Debug(dst, "Not deleting as -dry-run")
} else {
2013-06-27 21:13:07 +02:00
fs.Stats.Checking(dst)
err := dst.Remove()
2013-06-27 21:13:07 +02:00
fs.Stats.DoneChecking(dst)
if err != nil {
2013-06-27 21:13:07 +02:00
fs.Stats.Error()
2013-06-28 09:57:32 +02:00
fs.Log(dst, "Couldn't delete: %s", err)
} else {
2013-06-28 09:57:32 +02:00
fs.Debug(dst, "Deleted")
}
}
}
}()
}
log.Printf("Waiting for deletions to finish")
wg.Wait()
}
// Syncs fsrc into fdst
2013-06-27 21:13:07 +02:00
func Sync(fdst, fsrc fs.Fs) {
err := fdst.Mkdir()
if err != nil {
2013-06-27 21:13:07 +02:00
fs.Stats.Error()
log.Fatal("Failed to make destination")
}
log.Printf("Building file list")
// Read the destination files first
// FIXME could do this in parallel and make it use less memory
2013-06-28 09:57:32 +02:00
delFiles := make(map[string]fs.Object)
2012-12-31 18:31:19 +01:00
for dst := range fdst.List() {
delFiles[dst.Remote()] = dst
}
// Read source files checking them off against dest files
to_be_checked := make(PairFsObjectsChan, *transfers)
2013-06-28 09:57:32 +02:00
to_be_uploaded := make(fs.ObjectsChan, *transfers)
var checkerWg sync.WaitGroup
checkerWg.Add(*checkers)
for i := 0; i < *checkers; i++ {
go PairChecker(to_be_checked, to_be_uploaded, &checkerWg)
}
var copierWg sync.WaitGroup
copierWg.Add(*transfers)
for i := 0; i < *transfers; i++ {
go Copier(to_be_uploaded, fdst, &copierWg)
}
go func() {
for src := range fsrc.List() {
remote := src.Remote()
dst, found := delFiles[remote]
if found {
delete(delFiles, remote)
to_be_checked <- PairFsObjects{src, dst}
} else {
// No need to check doesn't exist
to_be_uploaded <- src
}
}
close(to_be_checked)
}()
log.Printf("Waiting for checks to finish")
checkerWg.Wait()
close(to_be_uploaded)
log.Printf("Waiting for transfers to finish")
copierWg.Wait()
2013-06-27 21:13:07 +02:00
if fs.Stats.Errored() {
log.Printf("Not deleting files as there were IO errors")
return
}
// Delete the spare files
2013-06-28 09:57:32 +02:00
toDelete := make(fs.ObjectsChan, *transfers)
go func() {
for _, fs := range delFiles {
toDelete <- fs
}
close(toDelete)
}()
DeleteFiles(toDelete)
}
2012-12-31 18:31:19 +01:00
// Checks the files in fsrc and fdst according to Size and MD5SUM
2013-06-27 21:13:07 +02:00
func Check(fdst, fsrc fs.Fs) {
log.Printf("Building file list")
2012-12-31 18:31:19 +01:00
// Read the destination files first
// FIXME could do this in parallel and make it use less memory
2013-06-28 09:57:32 +02:00
dstFiles := make(map[string]fs.Object)
2012-12-31 18:31:19 +01:00
for dst := range fdst.List() {
dstFiles[dst.Remote()] = dst
}
// Read the source files checking them against dstFiles
// FIXME could do this in parallel and make it use less memory
2013-06-28 09:57:32 +02:00
srcFiles := make(map[string]fs.Object)
commonFiles := make(map[string][]fs.Object)
2012-12-31 18:31:19 +01:00
for src := range fsrc.List() {
remote := src.Remote()
if dst, ok := dstFiles[remote]; ok {
2013-06-28 09:57:32 +02:00
commonFiles[remote] = []fs.Object{dst, src}
2012-12-31 18:31:19 +01:00
delete(dstFiles, remote)
} else {
srcFiles[remote] = src
}
}
log.Printf("Files in %s but not in %s", fdst, fsrc)
for remote := range dstFiles {
2013-06-27 21:13:07 +02:00
fs.Stats.Error()
2012-12-31 18:31:19 +01:00
log.Printf(remote)
}
log.Printf("Files in %s but not in %s", fsrc, fdst)
for remote := range srcFiles {
2013-06-27 21:13:07 +02:00
fs.Stats.Error()
2012-12-31 18:31:19 +01:00
log.Printf(remote)
}
2013-06-28 09:57:32 +02:00
checks := make(chan []fs.Object, *transfers)
2012-12-31 18:31:19 +01:00
go func() {
for _, check := range commonFiles {
checks <- check
}
close(checks)
}()
var checkerWg sync.WaitGroup
checkerWg.Add(*checkers)
for i := 0; i < *checkers; i++ {
go func() {
defer checkerWg.Done()
for check := range checks {
dst, src := check[0], check[1]
2013-06-27 21:13:07 +02:00
fs.Stats.Checking(src)
2012-12-31 18:31:19 +01:00
if src.Size() != dst.Size() {
2013-06-27 21:13:07 +02:00
fs.Stats.DoneChecking(src)
fs.Stats.Error()
2013-06-28 09:57:32 +02:00
fs.Log(src, "Sizes differ")
2012-12-31 18:31:19 +01:00
continue
}
2013-06-27 21:13:07 +02:00
same, err := fs.CheckMd5sums(src, dst)
fs.Stats.DoneChecking(src)
2012-12-31 18:31:19 +01:00
if err != nil {
continue
}
if !same {
2013-06-27 21:13:07 +02:00
fs.Stats.Error()
2013-06-28 09:57:32 +02:00
fs.Log(src, "Md5sums differ")
2012-12-31 18:31:19 +01:00
}
2013-06-28 09:57:32 +02:00
fs.Debug(src, "OK")
2012-12-31 18:31:19 +01:00
}
}()
}
log.Printf("Waiting for checks to finish")
checkerWg.Wait()
2013-06-27 21:13:07 +02:00
log.Printf("%d differences found", fs.Stats.Errors)
2012-12-31 18:31:19 +01:00
}
// List the Fs to stdout
//
// Lists in parallel which may get them out of order
2013-06-27 21:13:07 +02:00
func List(f, _ fs.Fs) {
in := f.List()
var wg sync.WaitGroup
wg.Add(*checkers)
for i := 0; i < *checkers; i++ {
go func() {
defer wg.Done()
2013-06-27 21:13:07 +02:00
for o := range in {
fs.Stats.Checking(o)
modTime := o.ModTime()
fs.Stats.DoneChecking(o)
fmt.Printf("%9d %19s %s\n", o.Size(), modTime.Format("2006-01-02 15:04:05.00000000"), o.Remote())
}
}()
}
wg.Wait()
}
2013-01-23 23:43:20 +01:00
// List the directories/buckets/containers in the Fs to stdout
2013-06-27 21:13:07 +02:00
func ListDir(f, _ fs.Fs) {
2013-01-23 23:43:20 +01:00
for dir := range f.ListDir() {
fmt.Printf("%12d %13s %9d %s\n", dir.Bytes, dir.When.Format("2006-01-02 15:04:05"), dir.Count, dir.Name)
2012-12-04 00:58:17 +01:00
}
}
// Makes a destination directory or container
2013-06-27 21:13:07 +02:00
func mkdir(fdst, fsrc fs.Fs) {
err := fdst.Mkdir()
if err != nil {
2013-06-27 21:13:07 +02:00
fs.Stats.Error()
log.Fatalf("Mkdir failed: %s", err)
}
}
// Removes a container but not if not empty
2013-06-27 21:13:07 +02:00
func rmdir(fdst, fsrc fs.Fs) {
if *dry_run {
log.Printf("Not deleting %s as -dry-run", fdst)
} else {
err := fdst.Rmdir()
if err != nil {
2013-06-27 21:13:07 +02:00
fs.Stats.Error()
log.Fatalf("Rmdir failed: %s", err)
}
}
}
// Removes a container and all of its contents
//
// FIXME doesn't delete local directories
2013-06-27 21:13:07 +02:00
func purge(fdst, fsrc fs.Fs) {
if f, ok := fdst.(fs.Purger); ok {
err := f.Purge()
if err != nil {
2013-06-27 21:13:07 +02:00
fs.Stats.Error()
log.Fatalf("Purge failed: %s", err)
}
} else {
DeleteFiles(fdst.List())
log.Printf("Deleting path")
rmdir(fdst, fsrc)
}
}
2012-12-04 00:58:17 +01:00
type Command struct {
name string
help string
2013-06-27 21:13:07 +02:00
run func(fdst, fsrc fs.Fs)
2012-12-04 00:58:17 +01:00
minArgs, maxArgs int
}
// checkArgs checks that args holds between cmd.minArgs and cmd.maxArgs
// entries inclusive.  If it doesn't, checkArgs prints the syntax and an
// explanation to stderr and exits with status 1.
func (cmd *Command) checkArgs(args []string) {
	switch {
	case len(args) < cmd.minArgs:
		syntaxError()
		fmt.Fprintf(os.Stderr, "Command %s needs %d arguments minimum\n", cmd.name, cmd.minArgs)
		os.Exit(1)
	case len(args) > cmd.maxArgs:
		syntaxError()
		fmt.Fprintf(os.Stderr, "Command %s needs %d arguments maximum\n", cmd.name, cmd.maxArgs)
		os.Exit(1)
	}
}
var Commands = []Command{
{
"copy",
`<source> <destination>
Copy the source to the destination. Doesn't transfer
unchanged files, testing first by modification time then by
MD5SUM. Doesn't delete files from the destination.
2012-12-04 00:58:17 +01:00
`,
CopyFs,
2, 2,
},
{
"sync",
`<source> <destination>
Sync the source to the destination. Doesn't transfer
unchanged files, testing first by modification time then by
MD5SUM. Deletes any files that exist in source that don't
exist in destination. Since this can cause data loss, test
2012-12-31 18:31:19 +01:00
first with the -dry-run flag.`,
Sync,
2012-12-04 00:58:17 +01:00
2, 2,
},
{
"ls",
`[<path>]
2013-01-23 23:43:20 +01:00
List all the objects in the the path.`,
List,
1, 1,
},
{
"lsd",
`[<path>]
List all directoryes/objects/buckets in the the path.`,
2013-01-23 23:43:20 +01:00
ListDir,
1, 1,
2012-12-04 00:58:17 +01:00
},
{
"mkdir",
`<path>
2012-12-31 18:31:19 +01:00
Make the path if it doesn't already exist`,
2012-12-04 00:58:17 +01:00
mkdir,
1, 1,
},
{
"rmdir",
`<path>
Remove the path. Note that you can't remove a path with
2012-12-31 18:31:19 +01:00
objects in it, use purge for that.`,
2012-12-04 00:58:17 +01:00
rmdir,
1, 1,
},
{
"purge",
`<path>
2012-12-31 18:31:19 +01:00
Remove the path and all of its contents.`,
purge,
1, 1,
},
2012-12-31 18:31:19 +01:00
{
"check",
`<source> <destination>
Checks the files in the source and destination match. It
compares sizes and MD5SUMs and prints a report of files which
don't match. It doesn't alter the source or destination.`,
Check,
2, 2,
},
{
"help",
`
This help.`,
nil,
0, 0,
},
2012-12-04 00:58:17 +01:00
}
// syntaxError prints the syntax
func syntaxError() {
2013-06-27 21:00:01 +02:00
fmt.Fprintf(os.Stderr, `Sync files and directories to and from local and remote object stores
2012-12-04 00:58:17 +01:00
Syntax: [options] subcommand <parameters> <parameters...>
Subcommands:
2012-12-04 00:58:17 +01:00
`)
for i := range Commands {
cmd := &Commands[i]
2012-12-31 18:31:19 +01:00
fmt.Fprintf(os.Stderr, " %s: %s\n\n", cmd.name, cmd.help)
2012-12-04 00:58:17 +01:00
}
fmt.Fprintf(os.Stderr, "Options:\n")
flag.PrintDefaults()
fmt.Fprintf(os.Stderr, `
It is only necessary to use a unique prefix of the subcommand, eg 'up' for 'upload'.
`)
}
// fatal prints the syntax, then message formatted with args in
// fmt.Printf style, to stderr and exits with status 1.
func fatal(message string, args ...interface{}) {
	syntaxError()
	text := fmt.Sprintf(message, args...)
	fmt.Fprint(os.Stderr, text)
	os.Exit(1)
}
func main() {
flag.Usage = syntaxError
flag.Parse()
args := flag.Args()
runtime.GOMAXPROCS(runtime.NumCPU())
// Setup profiling if desired
if *cpuprofile != "" {
f, err := os.Create(*cpuprofile)
if err != nil {
2013-06-27 21:13:07 +02:00
fs.Stats.Error()
log.Fatal(err)
}
pprof.StartCPUProfile(f)
defer pprof.StopCPUProfile()
}
if len(args) < 1 {
fatal("No command supplied\n")
}
2012-12-04 00:58:17 +01:00
cmd := strings.ToLower(args[0])
args = args[1:]
2012-12-04 00:58:17 +01:00
// Find the command doing a prefix match
var found *Command
for i := range Commands {
command := &Commands[i]
// exact command name found - use that
if command.name == cmd {
found = command
break
} else if strings.HasPrefix(command.name, cmd) {
if found != nil {
2013-06-27 21:13:07 +02:00
fs.Stats.Error()
2012-12-04 00:58:17 +01:00
log.Fatalf("Not unique - matches multiple commands %q", cmd)
}
found = command
}
}
2012-12-04 00:58:17 +01:00
if found == nil {
2013-06-27 21:13:07 +02:00
fs.Stats.Error()
2012-12-04 00:58:17 +01:00
log.Fatalf("Unknown command %q", cmd)
}
found.checkArgs(args)
// Make source and destination fs
2013-06-27 21:13:07 +02:00
var fdst, fsrc fs.Fs
var err error
if len(args) >= 1 {
2013-06-27 21:13:07 +02:00
fdst, err = fs.NewFs(args[0])
if err != nil {
2013-06-27 21:13:07 +02:00
fs.Stats.Error()
log.Fatal("Failed to create file system: ", err)
}
}
if len(args) >= 2 {
2013-06-27 21:13:07 +02:00
fsrc, err = fs.NewFs(args[1])
if err != nil {
2013-06-27 21:13:07 +02:00
fs.Stats.Error()
log.Fatal("Failed to create destination file system: ", err)
}
fsrc, fdst = fdst, fsrc
}
// Work out modify window
if fsrc != nil {
precision := fsrc.Precision()
log.Printf("Source precision %s\n", precision)
2013-06-27 21:13:07 +02:00
if precision > fs.Config.ModifyWindow {
fs.Config.ModifyWindow = precision
}
}
if fdst != nil {
precision := fdst.Precision()
log.Printf("Destination precision %s\n", precision)
2013-06-27 21:13:07 +02:00
if precision > fs.Config.ModifyWindow {
fs.Config.ModifyWindow = precision
}
}
2013-06-27 21:13:07 +02:00
log.Printf("Modify window is %s\n", fs.Config.ModifyWindow)
// Print the stats every statsInterval
go func() {
ch := time.Tick(*statsInterval)
for {
<-ch
2013-06-27 21:13:07 +02:00
fs.Stats.Log()
}
}()
// Run the actual command
2012-12-31 18:31:19 +01:00
if found.run != nil {
found.run(fdst, fsrc)
2013-06-27 21:13:07 +02:00
fmt.Println(fs.Stats)
log.Printf("*** Go routines at exit %d\n", runtime.NumGoroutine())
2013-06-27 21:13:07 +02:00
if fs.Stats.Errored() {
os.Exit(1)
}
os.Exit(0)
2012-12-31 18:31:19 +01:00
} else {
syntaxError()
}
}