sync: implement --order-by flag to order transfers - fixes #1205

This commit is contained in:
Nick Craig-Wood 2019-11-28 17:01:21 +00:00
parent 0e57335396
commit 375d25f158
6 changed files with 301 additions and 16 deletions

View File

@ -671,8 +671,8 @@ queue is in use. Note that it will use in the order of N kB of memory
when the backlog is in use. when the backlog is in use.
Setting this large allows rclone to calculate how many files are Setting this large allows rclone to calculate how many files are
pending more accurately and give a more accurate estimated finish pending more accurately, give a more accurate estimated finish
time. time and make `--order-by` work more accurately.
Setting this small will make rclone more synchronous to the listings Setting this small will make rclone more synchronous to the listings
of the remote which may be desirable. of the remote which may be desirable.
@ -823,6 +823,48 @@ files if they are incorrect as it would normally.
This can be used if the remote is being synced with another tool also This can be used if the remote is being synced with another tool also
(eg the Google Drive client). (eg the Google Drive client).
### --order-by string ###
The `--order-by` flag controls the order in which files in the backlog
are processed in `rclone sync`, `rclone copy` and `rclone move`.
The order by string is constructed like this. The first part
describes what aspect is being measured:
- `size` - order by the size of the files
- `name` - order by the full path of the files
- `modtime` - order by the modification date of the files
This can have a modifier appended with a comma:
- `ascending` or `asc` - order so that the smallest (or oldest) is processed first
- `descending` or `desc` - order so that the largest (or newest) is processed first
If no modifier is supplied then the order is `ascending`.
For example
- `--order-by size,desc` - send the largest files first
- `--order-by modtime,ascending` - send the oldest files first
- `--order-by name` - send the files with alphabetically by path first
If the `--order-by` flag is not supplied or it is supplied with an
empty string then the default ordering will be used which is as
scanned. With `--checkers 1` this is mostly alphabetical, however
with the default `--checkers 8` it is somewhat random.
#### Limitations
The `--order-by` flag does not do a separate pass over the data. This
means that is may transfer some files out of the order specified if
- there are no files in the backlog or the source has not been fully scanned yet
- there are more than [--max-backlog](#max-backlog-n) files in the backlog
Rclone will do its best to transfer the best file it has so in
practice this should not cause a problem. Think of `--order-by` as
being more of a best efforts flag rather than a perfect ordering.
### -P, --progress ### ### -P, --progress ###
This flag makes rclone update the stats in a static block in the This flag makes rclone update the stats in a static block in the

View File

@ -104,6 +104,7 @@ type ConfigInfo struct {
MultiThreadCutoff SizeSuffix MultiThreadCutoff SizeSuffix
MultiThreadStreams int MultiThreadStreams int
MultiThreadSet bool // whether MultiThreadStreams was set (set in fs/config/configflags) MultiThreadSet bool // whether MultiThreadStreams was set (set in fs/config/configflags)
OrderBy string // instructions on how to order the transfer
} }
// NewConfig creates a new config with everything set to the default // NewConfig creates a new config with everything set to the default

View File

@ -105,6 +105,7 @@ func AddFlags(flagSet *pflag.FlagSet) {
flags.FVarP(flagSet, &fs.Config.MultiThreadCutoff, "multi-thread-cutoff", "", "Use multi-thread downloads for files above this size.") flags.FVarP(flagSet, &fs.Config.MultiThreadCutoff, "multi-thread-cutoff", "", "Use multi-thread downloads for files above this size.")
flags.IntVarP(flagSet, &fs.Config.MultiThreadStreams, "multi-thread-streams", "", fs.Config.MultiThreadStreams, "Max number of streams to use for multi-thread downloads.") flags.IntVarP(flagSet, &fs.Config.MultiThreadStreams, "multi-thread-streams", "", fs.Config.MultiThreadStreams, "Max number of streams to use for multi-thread downloads.")
flags.BoolVarP(flagSet, &fs.Config.UseJSONLog, "use-json-log", "", fs.Config.UseJSONLog, "Use json log format.") flags.BoolVarP(flagSet, &fs.Config.UseJSONLog, "use-json-log", "", fs.Config.UseJSONLog, "Use json log format.")
flags.StringVarP(flagSet, &fs.Config.OrderBy, "order-by", "", fs.Config.OrderBy, "Instructions on how to order the transfers, eg 'size,descending'")
} }
// SetFlags converts any flags into config which weren't straight forward // SetFlags converts any flags into config which weren't straight forward

View File

@ -1,12 +1,19 @@
package sync package sync
import ( import (
"container/heap"
"context" "context"
"strings"
"sync" "sync"
"github.com/pkg/errors"
"github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/fserrors"
) )
// compare two items for order by
type lessFn func(a, b fs.ObjectPair) bool
// pipe provides an unbounded channel like experience // pipe provides an unbounded channel like experience
// //
// Note unlike channels these aren't strictly ordered. // Note unlike channels these aren't strictly ordered.
@ -17,14 +24,57 @@ type pipe struct {
closed bool closed bool
totalSize int64 totalSize int64
stats func(items int, totalSize int64) stats func(items int, totalSize int64)
less lessFn
} }
func newPipe(stats func(items int, totalSize int64), maxBacklog int) *pipe { func newPipe(orderBy string, stats func(items int, totalSize int64), maxBacklog int) (*pipe, error) {
return &pipe{ less, err := newLess(orderBy)
if err != nil {
return nil, fserrors.FatalError(err)
}
p := &pipe{
c: make(chan struct{}, maxBacklog), c: make(chan struct{}, maxBacklog),
stats: stats, stats: stats,
less: less,
} }
if p.less != nil {
heap.Init(p)
} }
return p, nil
}
// Len satisfy heap.Interface - must be called with lock held
func (p *pipe) Len() int {
return len(p.queue)
}
// Len satisfy heap.Interface - must be called with lock held
func (p *pipe) Less(i, j int) bool {
return p.less(p.queue[i], p.queue[j])
}
// Swap satisfy heap.Interface - must be called with lock held
func (p *pipe) Swap(i, j int) {
p.queue[i], p.queue[j] = p.queue[j], p.queue[i]
}
// Push satisfy heap.Interface - must be called with lock held
func (p *pipe) Push(item interface{}) {
p.queue = append(p.queue, item.(fs.ObjectPair))
}
// Pop satisfy heap.Interface - must be called with lock held
func (p *pipe) Pop() interface{} {
old := p.queue
n := len(old)
item := old[n-1]
old[n-1] = fs.ObjectPair{} // avoid memory leak
p.queue = old[0 : n-1]
return item
}
// Check interface satisfied
var _ heap.Interface = (*pipe)(nil)
// Put an pair into the pipe // Put an pair into the pipe
// //
@ -36,7 +86,12 @@ func (p *pipe) Put(ctx context.Context, pair fs.ObjectPair) (ok bool) {
return false return false
} }
p.mu.Lock() p.mu.Lock()
if p.less == nil {
// no order-by
p.queue = append(p.queue, pair) p.queue = append(p.queue, pair)
} else {
heap.Push(p, pair)
}
size := pair.Src.Size() size := pair.Src.Size()
if size > 0 { if size > 0 {
p.totalSize += size p.totalSize += size
@ -68,10 +123,14 @@ func (p *pipe) Get(ctx context.Context) (pair fs.ObjectPair, ok bool) {
} }
} }
p.mu.Lock() p.mu.Lock()
if p.less == nil {
// no order-by
pair = p.queue[0] pair = p.queue[0]
p.queue[0].Src = nil p.queue[0] = fs.ObjectPair{} // avoid memory leak
p.queue[0].Dst = nil
p.queue = p.queue[1:] p.queue = p.queue[1:]
} else {
pair = heap.Pop(p).(fs.ObjectPair)
}
size := pair.Src.Size() size := pair.Src.Size()
if size > 0 { if size > 0 {
p.totalSize -= size p.totalSize -= size
@ -101,3 +160,49 @@ func (p *pipe) Close() {
p.closed = true p.closed = true
p.mu.Unlock() p.mu.Unlock()
} }
// newLess returns a less function for the heap comparison or nil if
// one is not required
func newLess(orderBy string) (less lessFn, err error) {
if orderBy == "" {
return nil, nil
}
parts := strings.Split(strings.ToLower(orderBy), ",")
if len(parts) > 2 {
return nil, errors.Errorf("bad --order-by string %q", orderBy)
}
switch parts[0] {
case "name":
less = func(a, b fs.ObjectPair) bool {
return a.Src.Remote() < b.Src.Remote()
}
case "size":
less = func(a, b fs.ObjectPair) bool {
return a.Src.Size() < b.Src.Size()
}
case "modtime":
less = func(a, b fs.ObjectPair) bool {
ctx := context.Background()
return a.Src.ModTime(ctx).Before(b.Src.ModTime(ctx))
}
default:
return nil, errors.Errorf("unknown --order-by comparison %q", parts[0])
}
descending := false
if len(parts) > 1 {
switch parts[1] {
case "ascending", "asc":
case "descending", "desc":
descending = true
default:
return nil, errors.Errorf("unknown --order-by sort direction %q", parts[1])
}
}
if descending {
oldLess := less
less = func(a, b fs.ObjectPair) bool {
return !oldLess(a, b)
}
}
return less, nil
}

View File

@ -9,6 +9,7 @@ import (
"github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fstest/mockobject" "github.com/rclone/rclone/fstest/mockobject"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
) )
func TestPipe(t *testing.T) { func TestPipe(t *testing.T) {
@ -19,7 +20,8 @@ func TestPipe(t *testing.T) {
} }
// Make a new pipe // Make a new pipe
p := newPipe(stats, 10) p, err := newPipe("", stats, 10)
require.NoError(t, err)
checkStats := func(expectedN int, expectedSize int64) { checkStats := func(expectedN int, expectedSize int64) {
n, size := p.Stats() n, size := p.Stats()
@ -60,7 +62,8 @@ func TestPipe(t *testing.T) {
assert.Panics(t, func() { p.Put(ctx, pair1) }) assert.Panics(t, func() { p.Put(ctx, pair1) })
// Make a new pipe // Make a new pipe
p = newPipe(stats, 10) p, err = newPipe("", stats, 10)
require.NoError(t, err)
ctx2, cancel := context.WithCancel(ctx) ctx2, cancel := context.WithCancel(ctx)
// cancel it in the background - check read ceases // cancel it in the background - check read ceases
@ -86,7 +89,8 @@ func TestPipeConcurrent(t *testing.T) {
stats := func(n int, size int64) {} stats := func(n int, size int64) {}
// Make a new pipe // Make a new pipe
p := newPipe(stats, 10) p, err := newPipe("", stats, 10)
require.NoError(t, err)
var wg sync.WaitGroup var wg sync.WaitGroup
obj1 := mockobject.New("potato").WithContent([]byte("hello"), mockobject.SeekModeNone) obj1 := mockobject.New("potato").WithContent([]byte("hello"), mockobject.SeekModeNone)
@ -120,3 +124,125 @@ func TestPipeConcurrent(t *testing.T) {
assert.Equal(t, int64(0), count) assert.Equal(t, int64(0), count)
} }
func TestPipeOrderBy(t *testing.T) {
var (
stats = func(n int, size int64) {}
ctx = context.Background()
obj1 = mockobject.New("b").WithContent([]byte("1"), mockobject.SeekModeNone)
obj2 = mockobject.New("a").WithContent([]byte("22"), mockobject.SeekModeNone)
pair1 = fs.ObjectPair{Src: obj1}
pair2 = fs.ObjectPair{Src: obj2}
)
for _, test := range []struct {
orderBy string
swapped1 bool
swapped2 bool
}{
{"", false, true},
{"size", false, false},
{"name", true, true},
{"modtime", false, true},
{"size,ascending", false, false},
{"name,asc", true, true},
{"modtime,ascending", false, true},
{"size,descending", true, true},
{"name,desc", false, false},
{"modtime,descending", true, false},
} {
t.Run(test.orderBy, func(t *testing.T) {
p, err := newPipe(test.orderBy, stats, 10)
require.NoError(t, err)
ok := p.Put(ctx, pair1)
assert.True(t, ok)
ok = p.Put(ctx, pair2)
assert.True(t, ok)
readAndCheck := func(swapped bool) {
readFirst, ok := p.Get(ctx)
assert.True(t, ok)
readSecond, ok := p.Get(ctx)
assert.True(t, ok)
if swapped {
assert.True(t, readFirst == pair2 && readSecond == pair1)
} else {
assert.True(t, readFirst == pair1 && readSecond == pair2)
}
}
readAndCheck(test.swapped1)
// insert other way round
ok = p.Put(ctx, pair2)
assert.True(t, ok)
ok = p.Put(ctx, pair1)
assert.True(t, ok)
readAndCheck(test.swapped2)
})
}
}
func TestNewLess(t *testing.T) {
t.Run("blankOK", func(t *testing.T) {
less, err := newLess("")
require.NoError(t, err)
assert.Nil(t, less)
})
t.Run("tooManyParts", func(t *testing.T) {
_, err := newLess("too,many,parts")
require.Error(t, err)
assert.Contains(t, err.Error(), "bad --order-by string")
})
t.Run("unknownComparison", func(t *testing.T) {
_, err := newLess("potato")
require.Error(t, err)
assert.Contains(t, err.Error(), "unknown --order-by comparison")
})
t.Run("unknownSortDirection", func(t *testing.T) {
_, err := newLess("name,sideways")
require.Error(t, err)
assert.Contains(t, err.Error(), "unknown --order-by sort direction")
})
var (
obj1 = mockobject.New("b").WithContent([]byte("1"), mockobject.SeekModeNone)
obj2 = mockobject.New("a").WithContent([]byte("22"), mockobject.SeekModeNone)
pair1 = fs.ObjectPair{Src: obj1}
pair2 = fs.ObjectPair{Src: obj2}
)
for _, test := range []struct {
orderBy string
pair1LessPair2 bool
pair2LessPair1 bool
}{
{"size", true, false},
{"name", false, true},
{"modtime", false, false},
{"size,ascending", true, false},
{"name,asc", false, true},
{"modtime,ascending", false, false},
{"size,descending", false, true},
{"name,desc", true, false},
{"modtime,descending", true, true},
} {
t.Run(test.orderBy, func(t *testing.T) {
less, err := newLess(test.orderBy)
require.NoError(t, err)
require.NotNil(t, less)
pair1LessPair2 := less(pair1, pair2)
assert.Equal(t, test.pair1LessPair2, pair1LessPair2)
pair2LessPair1 := less(pair2, pair1)
assert.Equal(t, test.pair2LessPair1, pair2LessPair1)
})
}
}

View File

@ -84,14 +84,24 @@ func newSyncCopyMove(ctx context.Context, fdst, fsrc fs.Fs, deleteMode fs.Delete
srcEmptyDirs: make(map[string]fs.DirEntry), srcEmptyDirs: make(map[string]fs.DirEntry),
noTraverse: fs.Config.NoTraverse, noTraverse: fs.Config.NoTraverse,
noCheckDest: fs.Config.NoCheckDest, noCheckDest: fs.Config.NoCheckDest,
toBeChecked: newPipe(accounting.Stats(ctx).SetCheckQueue, fs.Config.MaxBacklog),
toBeUploaded: newPipe(accounting.Stats(ctx).SetTransferQueue, fs.Config.MaxBacklog),
deleteFilesCh: make(chan fs.Object, fs.Config.Checkers), deleteFilesCh: make(chan fs.Object, fs.Config.Checkers),
trackRenames: fs.Config.TrackRenames, trackRenames: fs.Config.TrackRenames,
commonHash: fsrc.Hashes().Overlap(fdst.Hashes()).GetOne(), commonHash: fsrc.Hashes().Overlap(fdst.Hashes()).GetOne(),
toBeRenamed: newPipe(accounting.Stats(ctx).SetRenameQueue, fs.Config.MaxBacklog),
trackRenamesCh: make(chan fs.Object, fs.Config.Checkers), trackRenamesCh: make(chan fs.Object, fs.Config.Checkers),
} }
var err error
s.toBeChecked, err = newPipe(fs.Config.OrderBy, accounting.Stats(ctx).SetCheckQueue, fs.Config.MaxBacklog)
if err != nil {
return nil, err
}
s.toBeUploaded, err = newPipe(fs.Config.OrderBy, accounting.Stats(ctx).SetTransferQueue, fs.Config.MaxBacklog)
if err != nil {
return nil, err
}
s.toBeRenamed, err = newPipe(fs.Config.OrderBy, accounting.Stats(ctx).SetRenameQueue, fs.Config.MaxBacklog)
if err != nil {
return nil, err
}
s.ctx, s.cancel = context.WithCancel(ctx) s.ctx, s.cancel = context.WithCancel(ctx)
if s.noTraverse && s.deleteMode != fs.DeleteModeOff { if s.noTraverse && s.deleteMode != fs.DeleteModeOff {
fs.Errorf(nil, "Ignoring --no-traverse with sync") fs.Errorf(nil, "Ignoring --no-traverse with sync")