diff --git a/fs/operations/check.go b/fs/operations/check.go new file mode 100644 index 000000000..eded6f825 --- /dev/null +++ b/fs/operations/check.go @@ -0,0 +1,350 @@ +package operations + +import ( + "bytes" + "context" + "fmt" + "io" + "sync" + "sync/atomic" + + "github.com/pkg/errors" + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/accounting" + "github.com/rclone/rclone/fs/hash" + "github.com/rclone/rclone/fs/march" + "github.com/rclone/rclone/lib/readers" +) + +// checkFn is the type of the checking function used in CheckFn() +// +// It should check the two objects (a, b) and return if they differ +// and whether the hash was used. +// +// If there are differences then this should Errorf the difference and +// the reason but return with err = nil. It should not CountError in +// this case. +type checkFn func(ctx context.Context, a, b fs.Object) (differ bool, noHash bool, err error) + +// CheckOpt contains options for the Check functions +type CheckOpt struct { + Fdst, Fsrc fs.Fs // fses to check + Check checkFn // function to use for checking + OneWay bool // one way only? + Combined io.Writer // a file with file names with leading sigils + MissingOnSrc io.Writer // files only in the destination + MissingOnDst io.Writer // files only in the source + Match io.Writer // matching files + Differ io.Writer // differing files + Error io.Writer // files with errors of some kind +} + +// checkMarch is used to march over two Fses in the same way as +// sync/copy +type checkMarch struct { + ioMu sync.Mutex + wg sync.WaitGroup + tokens chan struct{} + differences int32 + noHashes int32 + srcFilesMissing int32 + dstFilesMissing int32 + matches int32 + opt CheckOpt +} + +// report outputs the fileName to out if required and to the combined log +func (c *checkMarch) report(o fs.DirEntry, out io.Writer, sigil rune) { + if out != nil { + c.ioMu.Lock() + _, _ = fmt.Fprintf(out, "%v\n", o) + c.ioMu.Unlock() + } + if c.opt.Combined != nil { + c.ioMu.Lock() + _, _ = fmt.Fprintf(c.opt.Combined, "%c %v\n", sigil, o) + c.ioMu.Unlock() + } +} + +// DstOnly have an object which is in the destination only +func (c *checkMarch) DstOnly(dst fs.DirEntry) (recurse bool) { + switch dst.(type) { + case fs.Object: + if c.opt.OneWay { + return false + } + err := errors.Errorf("File not in %v", c.opt.Fsrc) + fs.Errorf(dst, "%v", err) + _ = fs.CountError(err) + atomic.AddInt32(&c.differences, 1) + atomic.AddInt32(&c.srcFilesMissing, 1) + c.report(dst, c.opt.MissingOnSrc, '-') + case fs.Directory: + // Do the same thing to the entire contents of the directory + if c.opt.OneWay { + return false + } + return true + default: + panic("Bad object in DirEntries") + } + return false +} + +// SrcOnly have an object which is in the source only +func (c *checkMarch) SrcOnly(src fs.DirEntry) (recurse bool) { + switch src.(type) { + case fs.Object: + err := errors.Errorf("File not in %v", c.opt.Fdst) + fs.Errorf(src, "%v", err) + _ = fs.CountError(err) + atomic.AddInt32(&c.differences, 1) + atomic.AddInt32(&c.dstFilesMissing, 1) + c.report(src, c.opt.MissingOnDst, '+') + case fs.Directory: + // Do the same thing to the entire contents of the directory + return true + default: + panic("Bad object in DirEntries") + } + return false +} + +// check to see if two objects are identical using the check function +func (c *checkMarch) checkIdentical(ctx context.Context, dst, src fs.Object) (differ bool, noHash bool, err error) { + tr := accounting.Stats(ctx).NewCheckingTransfer(src) + defer func() { + tr.Done(err) + }() + if sizeDiffers(src, dst) { + err = errors.Errorf("Sizes differ") + fs.Errorf(src, "%v", err) + return true, false, nil + } + if fs.Config.SizeOnly { + return false, false, nil + } + return c.opt.Check(ctx, dst, src) +} + +// Match is called when src and dst are present, so sync src to dst +func (c *checkMarch) Match(ctx context.Context, dst, src fs.DirEntry) (recurse bool) { + switch srcX := src.(type) { + case fs.Object: + dstX, ok := dst.(fs.Object) + if ok { + if SkipDestructive(ctx, src, "check") { + return false + } + c.wg.Add(1) + c.tokens <- struct{}{} // put a token to limit concurrency + go func() { + defer func() { + <-c.tokens // get the token back to free up a slot + c.wg.Done() + }() + differ, noHash, err := c.checkIdentical(ctx, dstX, srcX) + if err != nil { + fs.Errorf(src, "%v", err) + _ = fs.CountError(err) + c.report(src, c.opt.Error, '!') + } else if differ { + atomic.AddInt32(&c.differences, 1) + err := errors.New("files differ") + // the checkFn has already logged the reason + _ = fs.CountError(err) + c.report(src, c.opt.Differ, '*') + } else { + atomic.AddInt32(&c.matches, 1) + c.report(src, c.opt.Match, '=') + if noHash { + atomic.AddInt32(&c.noHashes, 1) + fs.Debugf(dstX, "OK - could not check hash") + } else { + fs.Debugf(dstX, "OK") + } + } + }() + } else { + err := errors.Errorf("is file on %v but directory on %v", c.opt.Fsrc, c.opt.Fdst) + fs.Errorf(src, "%v", err) + _ = fs.CountError(err) + atomic.AddInt32(&c.differences, 1) + atomic.AddInt32(&c.dstFilesMissing, 1) + c.report(src, c.opt.MissingOnDst, '+') + } + case fs.Directory: + // Do the same thing to the entire contents of the directory + _, ok := dst.(fs.Directory) + if ok { + return true + } + err := errors.Errorf("is file on %v but directory on %v", c.opt.Fdst, c.opt.Fsrc) + fs.Errorf(dst, "%v", err) + _ = fs.CountError(err) + atomic.AddInt32(&c.differences, 1) + atomic.AddInt32(&c.srcFilesMissing, 1) + c.report(dst, c.opt.MissingOnSrc, '-') + + default: + panic("Bad object in DirEntries") + } + return false +} + +// CheckFn checks the files in fsrc and fdst according to Size and +// hash using checkFunction on each file to check the hashes. +// +// checkFunction sees if dst and src are identical +// +// it returns true if differences were found +// it also returns whether it couldn't be hashed +func CheckFn(ctx context.Context, opt *CheckOpt) error { + if opt.Check == nil { + return errors.New("internal error: nil check function") + } + c := &checkMarch{ + tokens: make(chan struct{}, fs.Config.Checkers), + opt: *opt, + } + + // set up a march over fdst and fsrc + m := &march.March{ + Ctx: ctx, + Fdst: c.opt.Fdst, + Fsrc: c.opt.Fsrc, + Dir: "", + Callback: c, + } + fs.Debugf(c.opt.Fdst, "Waiting for checks to finish") + err := m.Run() + c.wg.Wait() // wait for background go-routines + + if c.dstFilesMissing > 0 { + fs.Logf(c.opt.Fdst, "%d files missing", c.dstFilesMissing) + } + if c.srcFilesMissing > 0 { + fs.Logf(c.opt.Fsrc, "%d files missing", c.srcFilesMissing) + } + + fs.Logf(c.opt.Fdst, "%d differences found", accounting.Stats(ctx).GetErrors()) + if errs := accounting.Stats(ctx).GetErrors(); errs > 0 { + fs.Logf(c.opt.Fdst, "%d errors while checking", errs) + } + if c.noHashes > 0 { + fs.Logf(c.opt.Fdst, "%d hashes could not be checked", c.noHashes) + } + if c.matches > 0 { + fs.Logf(c.opt.Fdst, "%d matching files", c.matches) + } + if c.differences > 0 { + return errors.Errorf("%d differences found", c.differences) + } + return err +} + +// Check the files in fsrc and fdst according to Size and hash +func Check(ctx context.Context, opt *CheckOpt) error { + optCopy := *opt + optCopy.Check = func(ctx context.Context, dst, src fs.Object) (differ bool, noHash bool, err error) { + same, ht, err := CheckHashes(ctx, src, dst) + if err != nil { + return true, false, err + } + if ht == hash.None { + return false, true, nil + } + if !same { + err = errors.Errorf("%v differ", ht) + fs.Errorf(src, "%v", err) + return true, false, nil + } + return false, false, nil + } + + return CheckFn(ctx, &optCopy) +} + +// CheckEqualReaders checks to see if in1 and in2 have the same +// content when read. +// +// it returns true if differences were found +func CheckEqualReaders(in1, in2 io.Reader) (differ bool, err error) { + const bufSize = 64 * 1024 + buf1 := make([]byte, bufSize) + buf2 := make([]byte, bufSize) + for { + n1, err1 := readers.ReadFill(in1, buf1) + n2, err2 := readers.ReadFill(in2, buf2) + // check errors + if err1 != nil && err1 != io.EOF { + return true, err1 + } else if err2 != nil && err2 != io.EOF { + return true, err2 + } + // err1 && err2 are nil or io.EOF here + // process the data + if n1 != n2 || !bytes.Equal(buf1[:n1], buf2[:n2]) { + return true, nil + } + // if both streams finished the we have finished + if err1 == io.EOF && err2 == io.EOF { + break + } + } + return false, nil +} + +// CheckIdenticalDownload checks to see if dst and src are identical +// by reading all their bytes if necessary. +// +// it returns true if differences were found +func CheckIdenticalDownload(ctx context.Context, dst, src fs.Object) (differ bool, err error) { + err = Retry(src, fs.Config.LowLevelRetries, func() error { + differ, err = checkIdenticalDownload(ctx, dst, src) + return err + }) + return differ, err +} + +// Does the work for CheckIdenticalDownload +func checkIdenticalDownload(ctx context.Context, dst, src fs.Object) (differ bool, err error) { + in1, err := dst.Open(ctx) + if err != nil { + return true, errors.Wrapf(err, "failed to open %q", dst) + } + tr1 := accounting.Stats(ctx).NewTransfer(dst) + defer func() { + tr1.Done(nil) // error handling is done by the caller + }() + in1 = tr1.Account(in1).WithBuffer() // account and buffer the transfer + + in2, err := src.Open(ctx) + if err != nil { + return true, errors.Wrapf(err, "failed to open %q", src) + } + tr2 := accounting.Stats(ctx).NewTransfer(dst) + defer func() { + tr2.Done(nil) // error handling is done by the caller + }() + in2 = tr2.Account(in2).WithBuffer() // account and buffer the transfer + + // To assign err variable before defer. + differ, err = CheckEqualReaders(in1, in2) + return +} + +// CheckDownload checks the files in fsrc and fdst according to Size +// and the actual contents of the files. +func CheckDownload(ctx context.Context, opt *CheckOpt) error { + optCopy := *opt + optCopy.Check = func(ctx context.Context, a, b fs.Object) (differ bool, noHash bool, err error) { + differ, err = CheckIdenticalDownload(ctx, a, b) + if err != nil { + return true, true, errors.Wrap(err, "failed to download") + } + return differ, false, nil + } + return CheckFn(ctx, &optCopy) +} diff --git a/fs/operations/check_test.go b/fs/operations/check_test.go new file mode 100644 index 000000000..ed9dc8dc8 --- /dev/null +++ b/fs/operations/check_test.go @@ -0,0 +1,272 @@ +package operations_test + +import ( + "bytes" + "context" + "fmt" + "io" + "log" + "os" + "sort" + "strings" + "testing" + + "github.com/pkg/errors" + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/accounting" + "github.com/rclone/rclone/fs/operations" + "github.com/rclone/rclone/fstest" + "github.com/rclone/rclone/lib/readers" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func testCheck(t *testing.T, checkFunction func(ctx context.Context, opt *operations.CheckOpt) error) { + r := fstest.NewRun(t) + defer r.Finalise() + + addBuffers := func(opt *operations.CheckOpt) { + opt.Combined = new(bytes.Buffer) + opt.MissingOnSrc = new(bytes.Buffer) + opt.MissingOnDst = new(bytes.Buffer) + opt.Match = new(bytes.Buffer) + opt.Differ = new(bytes.Buffer) + opt.Error = new(bytes.Buffer) + } + + sortLines := func(in string) []string { + if in == "" { + return []string{} + } + lines := strings.Split(in, "\n") + sort.Strings(lines) + return lines + } + + checkBuffer := func(name string, want map[string]string, out io.Writer) { + expected := want[name] + buf, ok := out.(*bytes.Buffer) + require.True(t, ok) + assert.Equal(t, sortLines(expected), sortLines(buf.String()), name) + } + + checkBuffers := func(opt *operations.CheckOpt, want map[string]string) { + checkBuffer("combined", want, opt.Combined) + checkBuffer("missingonsrc", want, opt.MissingOnSrc) + checkBuffer("missingondst", want, opt.MissingOnDst) + checkBuffer("match", want, opt.Match) + checkBuffer("differ", want, opt.Differ) + checkBuffer("error", want, opt.Error) + } + + check := func(i int, wantErrors int64, wantChecks int64, oneway bool, wantOutput map[string]string) { + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + accounting.GlobalStats().ResetCounters() + var buf bytes.Buffer + log.SetOutput(&buf) + defer func() { + log.SetOutput(os.Stderr) + }() + opt := operations.CheckOpt{ + Fdst: r.Fremote, + Fsrc: r.Flocal, + OneWay: oneway, + } + addBuffers(&opt) + err := checkFunction(context.Background(), &opt) + gotErrors := accounting.GlobalStats().GetErrors() + gotChecks := accounting.GlobalStats().GetChecks() + if wantErrors == 0 && err != nil { + t.Errorf("%d: Got error when not expecting one: %v", i, err) + } + if wantErrors != 0 && err == nil { + t.Errorf("%d: No error when expecting one", i) + } + if wantErrors != gotErrors { + t.Errorf("%d: Expecting %d errors but got %d", i, wantErrors, gotErrors) + } + if gotChecks > 0 && !strings.Contains(buf.String(), "matching files") { + t.Errorf("%d: Total files matching line missing", i) + } + if wantChecks != gotChecks { + t.Errorf("%d: Expecting %d total matching files but got %d", i, wantChecks, gotChecks) + } + checkBuffers(&opt, wantOutput) + }) + } + + file1 := r.WriteBoth(context.Background(), "rutabaga", "is tasty", t3) + fstest.CheckItems(t, r.Fremote, file1) + fstest.CheckItems(t, r.Flocal, file1) + check(1, 0, 1, false, map[string]string{ + "combined": "= rutabaga\n", + "missingonsrc": "", + "missingondst": "", + "match": "rutabaga\n", + "differ": "", + "error": "", + }) + + file2 := r.WriteFile("potato2", "------------------------------------------------------------", t1) + fstest.CheckItems(t, r.Flocal, file1, file2) + check(2, 1, 1, false, map[string]string{ + "combined": "+ potato2\n= rutabaga\n", + "missingonsrc": "", + "missingondst": "potato2\n", + "match": "rutabaga\n", + "differ": "", + "error": "", + }) + + file3 := r.WriteObject(context.Background(), "empty space", "-", t2) + fstest.CheckItems(t, r.Fremote, file1, file3) + check(3, 2, 1, false, map[string]string{ + "combined": "- empty space\n+ potato2\n= rutabaga\n", + "missingonsrc": "empty space\n", + "missingondst": "potato2\n", + "match": "rutabaga\n", + "differ": "", + "error": "", + }) + + file2r := file2 + if fs.Config.SizeOnly { + file2r = r.WriteObject(context.Background(), "potato2", "--Some-Differences-But-Size-Only-Is-Enabled-----------------", t1) + } else { + r.WriteObject(context.Background(), "potato2", "------------------------------------------------------------", t1) + } + fstest.CheckItems(t, r.Fremote, file1, file2r, file3) + check(4, 1, 2, false, map[string]string{ + "combined": "- empty space\n= potato2\n= rutabaga\n", + "missingonsrc": "empty space\n", + "missingondst": "", + "match": "rutabaga\npotato2\n", + "differ": "", + "error": "", + }) + + file3r := file3 + file3l := r.WriteFile("empty space", "DIFFER", t2) + fstest.CheckItems(t, r.Flocal, file1, file2, file3l) + check(5, 1, 3, false, map[string]string{ + "combined": "* empty space\n= potato2\n= rutabaga\n", + "missingonsrc": "", + "missingondst": "", + "match": "potato2\nrutabaga\n", + "differ": "empty space\n", + "error": "", + }) + + file4 := r.WriteObject(context.Background(), "remotepotato", "------------------------------------------------------------", t1) + fstest.CheckItems(t, r.Fremote, file1, file2r, file3r, file4) + check(6, 2, 3, false, map[string]string{ + "combined": "* empty space\n= potato2\n= rutabaga\n- remotepotato\n", + "missingonsrc": "remotepotato\n", + "missingondst": "", + "match": "potato2\nrutabaga\n", + "differ": "empty space\n", + "error": "", + }) + check(7, 1, 3, true, map[string]string{ + "combined": "* empty space\n= potato2\n= rutabaga\n", + "missingonsrc": "", + "missingondst": "", + "match": "potato2\nrutabaga\n", + "differ": "empty space\n", + "error": "", + }) +} + +func TestCheck(t *testing.T) { + testCheck(t, operations.Check) +} + +func TestCheckFsError(t *testing.T) { + dstFs, err := fs.NewFs("non-existent") + if err != nil { + t.Fatal(err) + } + srcFs, err := fs.NewFs("non-existent") + if err != nil { + t.Fatal(err) + } + opt := operations.CheckOpt{ + Fdst: dstFs, + Fsrc: srcFs, + OneWay: false, + } + err = operations.Check(context.Background(), &opt) + require.Error(t, err) +} + +func TestCheckDownload(t *testing.T) { + testCheck(t, operations.CheckDownload) +} + +func TestCheckSizeOnly(t *testing.T) { + fs.Config.SizeOnly = true + defer func() { fs.Config.SizeOnly = false }() + TestCheck(t) +} + +func TestCheckEqualReaders(t *testing.T) { + b65a := make([]byte, 65*1024) + b65b := make([]byte, 65*1024) + b65b[len(b65b)-1] = 1 + b66 := make([]byte, 66*1024) + + differ, err := operations.CheckEqualReaders(bytes.NewBuffer(b65a), bytes.NewBuffer(b65a)) + assert.NoError(t, err) + assert.Equal(t, differ, false) + + differ, err = operations.CheckEqualReaders(bytes.NewBuffer(b65a), bytes.NewBuffer(b65b)) + assert.NoError(t, err) + assert.Equal(t, differ, true) + + differ, err = operations.CheckEqualReaders(bytes.NewBuffer(b65a), bytes.NewBuffer(b66)) + assert.NoError(t, err) + assert.Equal(t, differ, true) + + differ, err = operations.CheckEqualReaders(bytes.NewBuffer(b66), bytes.NewBuffer(b65a)) + assert.NoError(t, err) + assert.Equal(t, differ, true) + + myErr := errors.New("sentinel") + wrap := func(b []byte) io.Reader { + r := bytes.NewBuffer(b) + e := readers.ErrorReader{Err: myErr} + return io.MultiReader(r, e) + } + + differ, err = operations.CheckEqualReaders(wrap(b65a), bytes.NewBuffer(b65a)) + assert.Equal(t, myErr, err) + assert.Equal(t, differ, true) + + differ, err = operations.CheckEqualReaders(wrap(b65a), bytes.NewBuffer(b65b)) + assert.Equal(t, myErr, err) + assert.Equal(t, differ, true) + + differ, err = operations.CheckEqualReaders(wrap(b65a), bytes.NewBuffer(b66)) + assert.Equal(t, myErr, err) + assert.Equal(t, differ, true) + + differ, err = operations.CheckEqualReaders(wrap(b66), bytes.NewBuffer(b65a)) + assert.Equal(t, myErr, err) + assert.Equal(t, differ, true) + + differ, err = operations.CheckEqualReaders(bytes.NewBuffer(b65a), wrap(b65a)) + assert.Equal(t, myErr, err) + assert.Equal(t, differ, true) + + differ, err = operations.CheckEqualReaders(bytes.NewBuffer(b65a), wrap(b65b)) + assert.Equal(t, myErr, err) + assert.Equal(t, differ, true) + + differ, err = operations.CheckEqualReaders(bytes.NewBuffer(b65a), wrap(b66)) + assert.Equal(t, myErr, err) + assert.Equal(t, differ, true) + + differ, err = operations.CheckEqualReaders(bytes.NewBuffer(b66), wrap(b65a)) + assert.Equal(t, myErr, err) + assert.Equal(t, differ, true) +} diff --git a/fs/operations/operations.go b/fs/operations/operations.go index a5a75aabe..e785198de 100644 --- a/fs/operations/operations.go +++ b/fs/operations/operations.go @@ -29,7 +29,6 @@ import ( "github.com/rclone/rclone/fs/fserrors" "github.com/rclone/rclone/fs/fshttp" "github.com/rclone/rclone/fs/hash" - "github.com/rclone/rclone/fs/march" "github.com/rclone/rclone/fs/object" "github.com/rclone/rclone/fs/walk" "github.com/rclone/rclone/lib/atexit" @@ -726,288 +725,6 @@ func SameDir(fdst, fsrc fs.Info) bool { return fdstRoot == fsrcRoot } -// checkIdentical checks to see if dst and src are identical -// -// it returns true if differences were found -// it also returns whether it couldn't be hashed -func checkIdentical(ctx context.Context, dst, src fs.Object) (differ bool, noHash bool, err error) { - same, ht, err := CheckHashes(ctx, src, dst) - if err != nil { - return true, false, err - } - if ht == hash.None { - return false, true, nil - } - if !same { - err = errors.Errorf("%v differ", ht) - fs.Errorf(src, "%v", err) - _ = fs.CountError(err) - return true, false, nil - } - return false, false, nil -} - -// checkFn is the type of the checking function used in CheckFn() -// -// It should check the two objects (a, b) and return if they differ -// and whether the hash was used. -type checkFn func(ctx context.Context, a, b fs.Object) (differ bool, noHash bool, err error) - -// checkMarch is used to march over two Fses in the same way as -// sync/copy -type checkMarch struct { - ioMu sync.Mutex - wg sync.WaitGroup - tokens chan struct{} - differences int32 - noHashes int32 - srcFilesMissing int32 - dstFilesMissing int32 - matches int32 - opt CheckOpt -} - -// CheckOpt contains options for the Check functions -type CheckOpt struct { - Fdst, Fsrc fs.Fs // fses to check - Check checkFn // function to use for checking - OneWay bool // one way only? - Combined io.Writer // a file with file names with leading sigils - MissingOnSrc io.Writer // files only in the destination - MissingOnDst io.Writer // files only in the source - Match io.Writer // matching files - Differ io.Writer // differing files - Error io.Writer // files with errors of some kind -} - -// report outputs the fileName to out if required and to the combined log -func (c *checkMarch) report(o fs.DirEntry, out io.Writer, sigil rune) { - if out != nil { - c.ioMu.Lock() - _, _ = fmt.Fprintf(out, "%v\n", o) - c.ioMu.Unlock() - } - if c.opt.Combined != nil { - c.ioMu.Lock() - _, _ = fmt.Fprintf(c.opt.Combined, "%c %v\n", sigil, o) - c.ioMu.Unlock() - } -} - -// DstOnly have an object which is in the destination only -func (c *checkMarch) DstOnly(dst fs.DirEntry) (recurse bool) { - switch dst.(type) { - case fs.Object: - if c.opt.OneWay { - return false - } - err := errors.Errorf("File not in %v", c.opt.Fsrc) - fs.Errorf(dst, "%v", err) - _ = fs.CountError(err) - atomic.AddInt32(&c.differences, 1) - atomic.AddInt32(&c.srcFilesMissing, 1) - c.report(dst, c.opt.MissingOnSrc, '-') - case fs.Directory: - // Do the same thing to the entire contents of the directory - if c.opt.OneWay { - return false - } - return true - default: - panic("Bad object in DirEntries") - } - return false -} - -// SrcOnly have an object which is in the source only -func (c *checkMarch) SrcOnly(src fs.DirEntry) (recurse bool) { - switch src.(type) { - case fs.Object: - err := errors.Errorf("File not in %v", c.opt.Fdst) - fs.Errorf(src, "%v", err) - _ = fs.CountError(err) - atomic.AddInt32(&c.differences, 1) - atomic.AddInt32(&c.dstFilesMissing, 1) - c.report(src, c.opt.MissingOnDst, '+') - case fs.Directory: - // Do the same thing to the entire contents of the directory - return true - default: - panic("Bad object in DirEntries") - } - return false -} - -// check to see if two objects are identical using the check function -func (c *checkMarch) checkIdentical(ctx context.Context, dst, src fs.Object) (differ bool, noHash bool, err error) { - tr := accounting.Stats(ctx).NewCheckingTransfer(src) - defer func() { - tr.Done(err) - }() - if sizeDiffers(src, dst) { - err = errors.Errorf("Sizes differ") - fs.Errorf(src, "%v", err) - return true, false, nil - } - if fs.Config.SizeOnly { - return false, false, nil - } - return c.opt.Check(ctx, dst, src) -} - -// Match is called when src and dst are present, so sync src to dst -func (c *checkMarch) Match(ctx context.Context, dst, src fs.DirEntry) (recurse bool) { - switch srcX := src.(type) { - case fs.Object: - dstX, ok := dst.(fs.Object) - if ok { - if SkipDestructive(ctx, src, "check") { - return false - } - c.wg.Add(1) - c.tokens <- struct{}{} // put a token to limit concurrency - go func() { - defer func() { - <-c.tokens // get the token back to free up a slot - c.wg.Done() - }() - differ, noHash, err := c.checkIdentical(ctx, dstX, srcX) - if err != nil { - fs.Errorf(src, "%v", err) - _ = fs.CountError(err) - c.report(src, c.opt.Error, '!') - } else if differ { - atomic.AddInt32(&c.differences, 1) - err := errors.New("files differ") - fs.Errorf(src, "%v", err) - _ = fs.CountError(err) - c.report(src, c.opt.Differ, '*') - } else { - atomic.AddInt32(&c.matches, 1) - c.report(src, c.opt.Match, '=') - if noHash { - atomic.AddInt32(&c.noHashes, 1) - fs.Debugf(dstX, "OK - could not check hash") - } else { - fs.Debugf(dstX, "OK") - } - } - }() - } else { - err := errors.Errorf("is file on %v but directory on %v", c.opt.Fsrc, c.opt.Fdst) - fs.Errorf(src, "%v", err) - _ = fs.CountError(err) - atomic.AddInt32(&c.differences, 1) - atomic.AddInt32(&c.dstFilesMissing, 1) - c.report(src, c.opt.MissingOnDst, '+') - } - case fs.Directory: - // Do the same thing to the entire contents of the directory - _, ok := dst.(fs.Directory) - if ok { - return true - } - err := errors.Errorf("is file on %v but directory on %v", c.opt.Fdst, c.opt.Fsrc) - fs.Errorf(dst, "%v", err) - _ = fs.CountError(err) - atomic.AddInt32(&c.differences, 1) - atomic.AddInt32(&c.srcFilesMissing, 1) - c.report(dst, c.opt.MissingOnSrc, '-') - - default: - panic("Bad object in DirEntries") - } - return false -} - -// CheckFn checks the files in fsrc and fdst according to Size and -// hash using checkFunction on each file to check the hashes. -// -// checkFunction sees if dst and src are identical -// -// it returns true if differences were found -// it also returns whether it couldn't be hashed -func CheckFn(ctx context.Context, opt *CheckOpt) error { - if opt.Check == nil { - return errors.New("internal error: nil check function") - } - c := &checkMarch{ - tokens: make(chan struct{}, fs.Config.Checkers), - opt: *opt, - } - - // set up a march over fdst and fsrc - m := &march.March{ - Ctx: ctx, - Fdst: c.opt.Fdst, - Fsrc: c.opt.Fsrc, - Dir: "", - Callback: c, - } - fs.Debugf(c.opt.Fdst, "Waiting for checks to finish") - err := m.Run() - c.wg.Wait() // wait for background go-routines - - if c.dstFilesMissing > 0 { - fs.Logf(c.opt.Fdst, "%d files missing", c.dstFilesMissing) - } - if c.srcFilesMissing > 0 { - fs.Logf(c.opt.Fsrc, "%d files missing", c.srcFilesMissing) - } - - fs.Logf(c.opt.Fdst, "%d differences found", accounting.Stats(ctx).GetErrors()) - if errs := accounting.Stats(ctx).GetErrors(); errs > 0 { - fs.Logf(c.opt.Fdst, "%d errors while checking", errs) - } - if c.noHashes > 0 { - fs.Logf(c.opt.Fdst, "%d hashes could not be checked", c.noHashes) - } - if c.matches > 0 { - fs.Logf(c.opt.Fdst, "%d matching files", c.matches) - } - if c.differences > 0 { - return errors.Errorf("%d differences found", c.differences) - } - return err -} - -// Check the files in fsrc and fdst according to Size and hash -func Check(ctx context.Context, opt *CheckOpt) error { - optCopy := *opt - optCopy.Check = checkIdentical - return CheckFn(ctx, &optCopy) -} - -// CheckEqualReaders checks to see if in1 and in2 have the same -// content when read. -// -// it returns true if differences were found -func CheckEqualReaders(in1, in2 io.Reader) (differ bool, err error) { - const bufSize = 64 * 1024 - buf1 := make([]byte, bufSize) - buf2 := make([]byte, bufSize) - for { - n1, err1 := readers.ReadFill(in1, buf1) - n2, err2 := readers.ReadFill(in2, buf2) - // check errors - if err1 != nil && err1 != io.EOF { - return true, err1 - } else if err2 != nil && err2 != io.EOF { - return true, err2 - } - // err1 && err2 are nil or io.EOF here - // process the data - if n1 != n2 || !bytes.Equal(buf1[:n1], buf2[:n2]) { - return true, nil - } - // if both streams finished the we have finished - if err1 == io.EOF && err2 == io.EOF { - break - } - } - return false, nil -} - // Retry runs fn up to maxTries times if it returns a retriable error func Retry(o interface{}, maxTries int, fn func() error) (err error) { for tries := 1; tries <= maxTries; tries++ { @@ -1026,59 +743,6 @@ func Retry(o interface{}, maxTries int, fn func() error) (err error) { return err } -// CheckIdenticalDownload checks to see if dst and src are identical -// by reading all their bytes if necessary. -// -// it returns true if differences were found -func CheckIdenticalDownload(ctx context.Context, dst, src fs.Object) (differ bool, err error) { - err = Retry(src, fs.Config.LowLevelRetries, func() error { - differ, err = checkIdenticalDownload(ctx, dst, src) - return err - }) - return differ, err -} - -// Does the work for CheckIdenticalDownload -func checkIdenticalDownload(ctx context.Context, dst, src fs.Object) (differ bool, err error) { - in1, err := dst.Open(ctx) - if err != nil { - return true, errors.Wrapf(err, "failed to open %q", dst) - } - tr1 := accounting.Stats(ctx).NewTransfer(dst) - defer func() { - tr1.Done(nil) // error handling is done by the caller - }() - in1 = tr1.Account(in1).WithBuffer() // account and buffer the transfer - - in2, err := src.Open(ctx) - if err != nil { - return true, errors.Wrapf(err, "failed to open %q", src) - } - tr2 := accounting.Stats(ctx).NewTransfer(dst) - defer func() { - tr2.Done(nil) // error handling is done by the caller - }() - in2 = tr2.Account(in2).WithBuffer() // account and buffer the transfer - - // To assign err variable before defer. - differ, err = CheckEqualReaders(in1, in2) - return -} - -// CheckDownload checks the files in fsrc and fdst according to Size -// and the actual contents of the files. -func CheckDownload(ctx context.Context, opt *CheckOpt) error { - optCopy := *opt - optCopy.Check = func(ctx context.Context, a, b fs.Object) (differ bool, noHash bool, err error) { - differ, err = CheckIdenticalDownload(ctx, a, b) - if err != nil { - return true, true, errors.Wrap(err, "failed to download") - } - return differ, false, nil - } - return CheckFn(ctx, &optCopy) -} - // ListFn lists the Fs to the supplied function // // Lists in parallel which may get them out of order diff --git a/fs/operations/operations_test.go b/fs/operations/operations_test.go index 7f7f3367c..f432c9dd3 100644 --- a/fs/operations/operations_test.go +++ b/fs/operations/operations_test.go @@ -22,16 +22,13 @@ package operations_test import ( "bytes" "context" - "errors" "fmt" "io" "io/ioutil" - "log" "net/http" "net/http/httptest" "os" "regexp" - "sort" "strings" "testing" "time" @@ -46,7 +43,6 @@ import ( "github.com/rclone/rclone/fs/operations" "github.com/rclone/rclone/fstest" "github.com/rclone/rclone/lib/random" - "github.com/rclone/rclone/lib/readers" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -355,194 +351,6 @@ func TestRetry(t *testing.T) { } -func testCheck(t *testing.T, checkFunction func(ctx context.Context, opt *operations.CheckOpt) error) { - r := fstest.NewRun(t) - defer r.Finalise() - - addBuffers := func(opt *operations.CheckOpt) { - opt.Combined = new(bytes.Buffer) - opt.MissingOnSrc = new(bytes.Buffer) - opt.MissingOnDst = new(bytes.Buffer) - opt.Match = new(bytes.Buffer) - opt.Differ = new(bytes.Buffer) - opt.Error = new(bytes.Buffer) - } - - sortLines := func(in string) []string { - if in == "" { - return []string{} - } - lines := strings.Split(in, "\n") - sort.Strings(lines) - return lines - } - - checkBuffer := func(name string, want map[string]string, out io.Writer) { - expected := want[name] - buf, ok := out.(*bytes.Buffer) - require.True(t, ok) - assert.Equal(t, sortLines(expected), sortLines(buf.String()), name) - } - - checkBuffers := func(opt *operations.CheckOpt, want map[string]string) { - checkBuffer("combined", want, opt.Combined) - checkBuffer("missingonsrc", want, opt.MissingOnSrc) - checkBuffer("missingondst", want, opt.MissingOnDst) - checkBuffer("match", want, opt.Match) - checkBuffer("differ", want, opt.Differ) - checkBuffer("error", want, opt.Error) - } - - check := func(i int, wantErrors int64, wantChecks int64, oneway bool, wantOutput map[string]string) { - t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { - accounting.GlobalStats().ResetCounters() - var buf bytes.Buffer - log.SetOutput(&buf) - defer func() { - log.SetOutput(os.Stderr) - }() - opt := operations.CheckOpt{ - Fdst: r.Fremote, - Fsrc: r.Flocal, - OneWay: oneway, - } - addBuffers(&opt) - err := checkFunction(context.Background(), &opt) - gotErrors := accounting.GlobalStats().GetErrors() - gotChecks := accounting.GlobalStats().GetChecks() - if wantErrors == 0 && err != nil { - t.Errorf("%d: Got error when not expecting one: %v", i, err) - } - if wantErrors != 0 && err == nil { - t.Errorf("%d: No error when expecting one", i) - } - if wantErrors != gotErrors { - t.Errorf("%d: Expecting %d errors but got %d", i, wantErrors, gotErrors) - } - if gotChecks > 0 && !strings.Contains(buf.String(), "matching files") { - t.Errorf("%d: Total files matching line missing", i) - } - if wantChecks != gotChecks { - t.Errorf("%d: Expecting %d total matching files but got %d", i, wantChecks, gotChecks) - } - checkBuffers(&opt, wantOutput) - }) - } - - file1 := r.WriteBoth(context.Background(), "rutabaga", "is tasty", t3) - fstest.CheckItems(t, r.Fremote, file1) - fstest.CheckItems(t, r.Flocal, file1) - check(1, 0, 1, false, map[string]string{ - "combined": "= rutabaga\n", - "missingonsrc": "", - "missingondst": "", - "match": "rutabaga\n", - "differ": "", - "error": "", - }) - - file2 := r.WriteFile("potato2", "------------------------------------------------------------", t1) - fstest.CheckItems(t, r.Flocal, file1, file2) - check(2, 1, 1, false, map[string]string{ - "combined": "+ potato2\n= rutabaga\n", - "missingonsrc": "", - "missingondst": "potato2\n", - "match": "rutabaga\n", - "differ": "", - "error": "", - }) - - file3 := r.WriteObject(context.Background(), "empty space", "-", t2) - fstest.CheckItems(t, r.Fremote, file1, file3) - check(3, 2, 1, false, map[string]string{ - "combined": "- empty space\n+ potato2\n= rutabaga\n", - "missingonsrc": "empty space\n", - "missingondst": "potato2\n", - "match": "rutabaga\n", - "differ": "", - "error": "", - }) - - file2r := file2 - if fs.Config.SizeOnly { - file2r = r.WriteObject(context.Background(), "potato2", "--Some-Differences-But-Size-Only-Is-Enabled-----------------", t1) - } else { - r.WriteObject(context.Background(), "potato2", "------------------------------------------------------------", t1) - } - fstest.CheckItems(t, r.Fremote, file1, file2r, file3) - check(4, 1, 2, false, map[string]string{ - "combined": "- empty space\n= potato2\n= rutabaga\n", - "missingonsrc": "empty space\n", - "missingondst": "", - "match": "rutabaga\npotato2\n", - "differ": "", - "error": "", - }) - - file3r := file3 - file3l := r.WriteFile("empty space", "DIFFER", t2) - fstest.CheckItems(t, r.Flocal, file1, file2, file3l) - check(5, 1, 3, false, map[string]string{ - "combined": "* empty space\n= potato2\n= rutabaga\n", - "missingonsrc": "", - "missingondst": "", - "match": "potato2\nrutabaga\n", - "differ": "empty space\n", - "error": "", - }) - - file4 := r.WriteObject(context.Background(), "remotepotato", "------------------------------------------------------------", t1) - fstest.CheckItems(t, r.Fremote, file1, file2r, file3r, file4) - check(6, 2, 3, false, map[string]string{ - "combined": "* empty space\n= potato2\n= rutabaga\n- remotepotato\n", - "missingonsrc": "remotepotato\n", - "missingondst": "", - "match": "potato2\nrutabaga\n", - "differ": "empty space\n", - "error": "", - }) - check(7, 1, 3, true, map[string]string{ - "combined": "* empty space\n= potato2\n= rutabaga\n", - "missingonsrc": "", - "missingondst": "", - "match": "potato2\nrutabaga\n", - "differ": "empty space\n", - "error": "", - }) -} - -func TestCheck(t *testing.T) { - testCheck(t, operations.Check) -} - -func TestCheckFsError(t *testing.T) { - dstFs, err := fs.NewFs("non-existent") - if err != nil { - t.Fatal(err) - } - srcFs, err := fs.NewFs("non-existent") - if err != nil { - t.Fatal(err) - } - opt := operations.CheckOpt{ - Fdst: dstFs, - Fsrc: srcFs, - OneWay: false, - } - err = operations.Check(context.Background(), &opt) - require.Error(t, err) -} - -func TestCheckDownload(t *testing.T) { - testCheck(t, operations.CheckDownload) -} - -func TestCheckSizeOnly(t *testing.T) { - fs.Config.SizeOnly = true - defer func() { fs.Config.SizeOnly = false }() - TestCheck(t) -} - func TestCat(t *testing.T) { r := fstest.NewRun(t) defer r.Finalise() @@ -1276,68 +1084,6 @@ func TestOverlapping(t *testing.T) { } } -func TestCheckEqualReaders(t *testing.T) { - b65a := make([]byte, 65*1024) - b65b := make([]byte, 65*1024) - b65b[len(b65b)-1] = 1 - b66 := make([]byte, 66*1024) - - differ, err := operations.CheckEqualReaders(bytes.NewBuffer(b65a), bytes.NewBuffer(b65a)) - assert.NoError(t, err) - assert.Equal(t, differ, false) - - differ, err = operations.CheckEqualReaders(bytes.NewBuffer(b65a), bytes.NewBuffer(b65b)) - assert.NoError(t, err) - assert.Equal(t, differ, true) - - differ, err = operations.CheckEqualReaders(bytes.NewBuffer(b65a), bytes.NewBuffer(b66)) - assert.NoError(t, err) - assert.Equal(t, differ, true) - - differ, err = operations.CheckEqualReaders(bytes.NewBuffer(b66), bytes.NewBuffer(b65a)) - assert.NoError(t, err) - assert.Equal(t, differ, true) - - myErr := errors.New("sentinel") - wrap := func(b []byte) io.Reader { - r := bytes.NewBuffer(b) - e := readers.ErrorReader{Err: myErr} - return io.MultiReader(r, e) - } - - differ, err = operations.CheckEqualReaders(wrap(b65a), bytes.NewBuffer(b65a)) - assert.Equal(t, myErr, err) - assert.Equal(t, differ, true) - - differ, err = operations.CheckEqualReaders(wrap(b65a), bytes.NewBuffer(b65b)) - assert.Equal(t, myErr, err) - assert.Equal(t, differ, true) - - differ, err = operations.CheckEqualReaders(wrap(b65a), bytes.NewBuffer(b66)) - assert.Equal(t, myErr, err) - assert.Equal(t, differ, true) - - differ, err = operations.CheckEqualReaders(wrap(b66), bytes.NewBuffer(b65a)) - assert.Equal(t, myErr, err) - assert.Equal(t, differ, true) - - differ, err = operations.CheckEqualReaders(bytes.NewBuffer(b65a), wrap(b65a)) - assert.Equal(t, myErr, err) - assert.Equal(t, differ, true) - - differ, err = operations.CheckEqualReaders(bytes.NewBuffer(b65a), wrap(b65b)) - assert.Equal(t, myErr, err) - assert.Equal(t, differ, true) - - differ, err = operations.CheckEqualReaders(bytes.NewBuffer(b65a), wrap(b66)) - assert.Equal(t, myErr, err) - assert.Equal(t, differ, true) - - differ, err = operations.CheckEqualReaders(bytes.NewBuffer(b66), wrap(b65a)) - assert.Equal(t, myErr, err) - assert.Equal(t, differ, true) -} - func TestListFormat(t *testing.T) { item0 := &operations.ListJSONItem{ Path: "a",