union: Add multiple error handler

Max Sum 2019-12-05 01:26:28 +08:00 committed by Nick Craig-Wood
parent 540bd61305
commit d7bb2d1d89
3 changed files with 145 additions and 103 deletions


@@ -7,6 +7,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/pkg/errors"
 	"github.com/rclone/rclone/backend/union/upstream"
 	"github.com/rclone/rclone/fs"
 )
@@ -70,7 +71,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
 	// Get multiple reader
 	readers := make([]io.Reader, len(entries))
 	writers := make([]io.Writer, len(entries))
-	errs := make([]error, len(entries)+1)
+	errs := Errors(make([]error, len(entries)+1))
 	for i := range entries {
 		r, w := io.Pipe()
 		bw := bufio.NewWriter(w)
@@ -79,26 +80,23 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
 	}
 	go func() {
 		mw := io.MultiWriter(writers...)
-		_, errs[len(entries)] = io.Copy(mw, in)
-		for _, bw := range writers {
-			bw.(*bufio.Writer).Flush()
+		es := make([]error, len(writers)+1)
+		_, es[len(es)-1] = io.Copy(mw, in)
+		for i, bw := range writers {
+			es[i] = bw.(*bufio.Writer).Flush()
 		}
+		errs[len(entries)] = Errors(es).Err()
 	}()
 	// Multi-threading
 	multithread(len(entries), func(i int) {
 		if o, ok := entries[i].(*upstream.Object); ok {
-			errs[i] = o.Update(ctx, readers[i], src, options...)
+			err := o.Update(ctx, readers[i], src, options...)
+			errs[i] = errors.Wrap(err, o.UpstreamFs().Name())
 		} else {
 			errs[i] = fs.ErrorNotAFile
 		}
 	})
-	// Get an object for future operation
-	for _, err := range errs {
-		if err != nil {
-			return err
-		}
-	}
-	return nil
+	return errs.Err()
 }
 
 // Remove candidate objects selected by ACTION policy
@@ -108,20 +106,16 @@ func (o *Object) Remove(ctx context.Context) error {
 	if err != nil {
 		return err
 	}
-	errs := make([]error, len(entries))
+	errs := Errors(make([]error, len(entries)))
 	multithread(len(entries), func(i int) {
 		if o, ok := entries[i].(*upstream.Object); ok {
-			errs[i] = o.Remove(ctx)
+			err := o.Remove(ctx)
+			errs[i] = errors.Wrap(err, o.UpstreamFs().Name())
 		} else {
 			errs[i] = fs.ErrorNotAFile
 		}
 	})
-	for _, err := range errs {
-		if err != nil {
-			return err
-		}
-	}
-	return nil
+	return errs.Err()
 }
 
 // SetModTime sets the metadata on the object to set the modification date
@@ -132,21 +126,17 @@ func (o *Object) SetModTime(ctx context.Context, t time.Time) error {
 		return err
 	}
 	var wg sync.WaitGroup
-	errs := make([]error, len(entries))
+	errs := Errors(make([]error, len(entries)))
 	multithread(len(entries), func(i int) {
 		if o, ok := entries[i].(*upstream.Object); ok {
-			errs[i] = o.SetModTime(ctx, t)
+			err := o.SetModTime(ctx, t)
+			errs[i] = errors.Wrap(err, o.UpstreamFs().Name())
 		} else {
 			errs[i] = fs.ErrorNotAFile
 		}
 	})
 	wg.Wait()
-	for _, err := range errs {
-		if err != nil {
-			return err
-		}
-	}
-	return nil
+	return errs.Err()
 }
 
 // ModTime returns the modification date of the directory

backend/union/errors.go (new file, 71 additions)

@@ -0,0 +1,71 @@
package union

import (
	"bytes"
	"fmt"
)

// The Errors type wraps a slice of errors
type Errors []error

var (
	// FilterNil returns the error directly
	FilterNil = func(err error) error {
		return err
	}
)

// Map returns a copy of the error slice with all its errors modified
// according to the mapping function. If mapping returns nil,
// the error is dropped from the error slice with no replacement.
func (e Errors) Map(mapping func(error) error) Errors {
	s := make([]error, len(e))
	i := 0
	for _, err := range e {
		nerr := mapping(err)
		if nerr == nil {
			continue
		}
		s[i] = nerr
		i++
	}
	return Errors(s[:i])
}

// Err returns a MultiError struct containing this Errors instance, or nil
// if there are zero errors contained.
func (e Errors) Err() error {
	e = e.Map(FilterNil)
	if len(e) == 0 {
		return nil
	}
	return &MultiError{Errors: e}
}

// MultiError type implements the error interface, and contains the
// Errors used to construct it.
type MultiError struct {
	Errors Errors
}

// Error returns a concatenated string of the contained errors
func (m *MultiError) Error() string {
	var buf bytes.Buffer
	if len(m.Errors) == 1 {
		buf.WriteString("1 error: ")
	} else {
		fmt.Fprintf(&buf, "%d errors: ", len(m.Errors))
	}
	for i, err := range m.Errors {
		if i != 0 {
			buf.WriteString("; ")
		}
		buf.WriteString(err.Error())
	}
	return buf.String()
}
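
Not part of the diff: a minimal usage sketch of the new type, assuming the errors.go shown above and the github.com/pkg/errors import added elsewhere in this commit. The helper name and the sample error strings are invented for illustration; only Errors, FilterNil, Err and MultiError come from this file.

// exampleErrs is a hypothetical helper showing the intended flow.
func exampleErrs() error {
	errs := Errors(make([]error, 3))
	errs[0] = nil // this upstream succeeded
	errs[1] = errors.Wrap(errors.New("quota exceeded"), "upstream1")
	errs[2] = nil
	// Err drops the nil slots via Map(FilterNil); one error remains,
	// so it returns a *MultiError whose Error() reads
	// "1 error: upstream1: quota exceeded". With no errors it returns nil.
	return errs.Err()
}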


@@ -126,16 +126,12 @@ func (f *Fs) Rmdir(ctx context.Context, dir string) error {
 	if err != nil {
 		return err
 	}
-	errs := make([]error, len(upstreams))
+	errs := Errors(make([]error, len(upstreams)))
 	multithread(len(upstreams), func(i int) {
-		errs[i] = upstreams[i].Rmdir(ctx, dir)
+		err := upstreams[i].Rmdir(ctx, dir)
+		errs[i] = errors.Wrap(err, upstreams[i].Name())
 	})
-	for _, err := range errs {
-		if err != nil {
-			return err
-		}
-	}
-	return nil
+	return errs.Err()
 }
 
 // Hashes returns hash.HashNone to indicate remote hashing is unavailable
@@ -150,16 +146,12 @@ func (f *Fs) Mkdir(ctx context.Context, dir string) error {
 	if err != nil {
 		return err
 	}
-	errs := make([]error, len(upstreams))
+	errs := Errors(make([]error, len(upstreams)))
 	multithread(len(upstreams), func(i int) {
-		errs[i] = upstreams[i].Mkdir(ctx, dir)
+		err := upstreams[i].Mkdir(ctx, dir)
+		errs[i] = errors.Wrap(err, upstreams[i].Name())
 	})
-	for _, err := range errs {
-		if err != nil {
-			return err
-		}
-	}
-	return nil
+	return errs.Err()
 }
 
 // Purge all files in the root and the root directory
@@ -178,16 +170,12 @@ func (f *Fs) Purge(ctx context.Context) error {
 	if err != nil {
 		return err
 	}
-	errs := make([]error, len(upstreams))
+	errs := Errors(make([]error, len(upstreams)))
 	multithread(len(upstreams), func(i int) {
-		errs[i] = upstreams[i].Features().Purge(ctx)
+		err := upstreams[i].Features().Purge(ctx)
+		errs[i] = errors.Wrap(err, upstreams[i].Name())
 	})
-	for _, err := range errs {
-		if err != nil {
-			return err
-		}
-	}
-	return nil
+	return errs.Err()
 }
 
 // Copy src to this remote using server side copy operations.
@@ -247,17 +235,17 @@ func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object,
 		}
 	}
 	objs := make([]*upstream.Object, len(entries))
-	errs := make([]error, len(entries))
+	errs := Errors(make([]error, len(entries)))
 	multithread(len(entries), func(i int) {
 		u := entries[i].UpstreamFs()
 		o, ok := entries[i].(*upstream.Object)
 		if !ok {
-			errs[i] = fs.ErrorNotAFile
+			errs[i] = errors.Wrap(fs.ErrorNotAFile, u.Name())
 			return
 		}
 		mo, err := u.Features().Move(ctx, o, remote)
 		if err != nil || mo == nil {
-			errs[i] = err
+			errs[i] = errors.Wrap(err, u.Name())
 			return
 		}
 		objs[i] = u.WrapObject(mo)
@@ -272,12 +260,7 @@ func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object,
 	if err != nil {
 		return nil, err
 	}
-	for _, err := range errs {
-		if err != nil {
-			return e.(*Object), err
-		}
-	}
-	return e.(*Object), nil
+	return e.(*Object), errs.Err()
 }
 
 // DirMove moves src, srcRemote to this remote at dstRemote
@@ -303,17 +286,13 @@ func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string
 			return fs.ErrorCantDirMove
 		}
 	}
-	errs := make([]error, len(upstreams))
+	errs := Errors(make([]error, len(upstreams)))
 	multithread(len(upstreams), func(i int) {
 		u := upstreams[i]
-		errs[i] = u.Features().DirMove(ctx, u, srcRemote, dstRemote)
+		err := u.Features().DirMove(ctx, u, srcRemote, dstRemote)
+		errs[i] = errors.Wrap(err, u.Name())
 	})
-	for _, err := range errs {
-		if err != nil {
-			return err
-		}
-	}
-	return nil
+	return errs.Err()
 }
 
 // ChangeNotify calls the passed function with a path
@@ -379,6 +358,7 @@ func (f *Fs) put(ctx context.Context, in io.Reader, src fs.ObjectInfo, stream bo
 		e, err := f.wrapEntries(u.WrapObject(o))
 		return e.(*Object), err
 	}
+	errs := Errors(make([]error, len(upstreams)+1))
 	// Get multiple reader
 	readers := make([]io.Reader, len(upstreams))
 	writers := make([]io.Writer, len(upstreams))
@@ -386,35 +366,42 @@ func (f *Fs) put(ctx context.Context, in io.Reader, src fs.ObjectInfo, stream bo
 		r, w := io.Pipe()
 		bw := bufio.NewWriter(w)
 		readers[i], writers[i] = r, bw
-		defer w.Close()
+		defer func() {
+			err := w.Close()
+			if err != nil {
+				panic(err)
+			}
+		}()
 	}
 	go func() {
 		mw := io.MultiWriter(writers...)
-		io.Copy(mw, in)
-		for _, bw := range writers {
-			bw.(*bufio.Writer).Flush()
+		es := make([]error, len(writers)+1)
+		_, es[len(es)-1] = io.Copy(mw, in)
+		for i, bw := range writers {
+			es[i] = bw.(*bufio.Writer).Flush()
 		}
+		errs[len(upstreams)] = Errors(es).Err()
 	}()
 	// Multi-threading
 	objs := make([]upstream.Entry, len(upstreams))
-	errs := make([]error, len(upstreams))
 	multithread(len(upstreams), func(i int) {
 		u := upstreams[i]
 		var o fs.Object
+		var err error
 		if stream {
-			o, errs[i] = u.Features().PutStream(ctx, readers[i], src, options...)
+			o, err = u.Features().PutStream(ctx, readers[i], src, options...)
 		} else {
-			o, errs[i] = u.Put(ctx, readers[i], src, options...)
+			o, err = u.Put(ctx, readers[i], src, options...)
 		}
-		if errs[i] != nil {
+		if err != nil {
+			errs[i] = errors.Wrap(err, u.Name())
 			return
 		}
 		objs[i] = u.WrapObject(o)
 	})
-	for _, err := range errs {
-		if err != nil {
-			return nil, err
-		}
+	err = errs.Err()
+	if err != nil {
+		return nil, err
 	}
 	e, err := f.wrapEntries(objs...)
 	return e.(*Object), err
@@ -514,12 +501,12 @@ func (f *Fs) About(ctx context.Context) (*fs.Usage, error) {
 // found.
 func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
 	entriess := make([][]upstream.Entry, len(f.upstreams))
-	errs := make([]error, len(f.upstreams))
+	errs := Errors(make([]error, len(f.upstreams)))
 	multithread(len(f.upstreams), func(i int) {
 		u := f.upstreams[i]
 		entries, err := u.List(ctx, dir)
 		if err != nil {
-			errs[i] = err
+			errs[i] = errors.Wrap(err, u.Name())
 			return
 		}
 		uEntries := make([]upstream.Entry, len(entries))
@@ -528,18 +515,17 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e
 		}
 		entriess[i] = uEntries
 	})
-	found := false
-	for _, err := range errs {
-		if err == fs.ErrorDirNotFound {
-			continue
+	if len(errs) == len(errs.Map(FilterNil)) {
+		errs = errs.Map(func(e error) error {
+			if errors.Cause(e) == fs.ErrorDirNotFound {
+				return nil
+			}
+			return e
+		})
+		if len(errs) == 0 {
+			return nil, fs.ErrorDirNotFound
 		}
-		if err != nil {
-			return nil, err
-		}
-		found = true
-	}
-	if !found {
-		return nil, fs.ErrorDirNotFound
+		return nil, errs.Err()
 	}
 	return f.mergeDirEntries(entriess)
 }
@@ -547,12 +533,12 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e
 // NewObject creates a new remote union file object
 func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
 	objs := make([]*upstream.Object, len(f.upstreams))
-	errs := make([]error, len(f.upstreams))
+	errs := Errors(make([]error, len(f.upstreams)))
 	multithread(len(f.upstreams), func(i int) {
 		u := f.upstreams[i]
 		o, err := u.NewObject(ctx, remote)
 		if err != nil && err != fs.ErrorObjectNotFound {
-			errs[i] = err
+			errs[i] = errors.Wrap(err, u.Name())
 			return
 		}
 		objs[i] = u.WrapObject(o)
@@ -570,12 +556,7 @@ func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
 	if err != nil {
 		return nil, err
 	}
-	for _, err := range errs {
-		if err != nil {
-			return e.(*Object), err
-		}
-	}
-	return e.(*Object), nil
+	return e.(*Object), errs.Err()
 }
 
 // Precision is the greatest Precision of all upstreams
@@ -661,7 +642,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
 	}
 	upstreams := make([]*upstream.Fs, len(opt.Upstreams))
-	errs := make([]error, len(opt.Upstreams))
+	errs := Errors(make([]error, len(opt.Upstreams)))
 	multithread(len(opt.Upstreams), func(i int) {
 		u := opt.Upstreams[i]
 		upstreams[i], errs[i] = upstream.New(u, root, time.Duration(opt.CacheTime)*time.Second)
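
Not part of the diff: the shape that the changes above repeat for Rmdir, Mkdir, Purge and DirMove, written out once as a sketch. fanOut and doSomething are invented names for illustration; multithread, upstream.Fs, Name(), Errors and errors.Wrap are the pieces actually used in this commit.

// fanOut is a hypothetical helper illustrating the aggregation pattern.
func fanOut(upstreams []*upstream.Fs, doSomething func(*upstream.Fs) error) error {
	errs := Errors(make([]error, len(upstreams)))
	multithread(len(upstreams), func(i int) {
		err := doSomething(upstreams[i])
		// errors.Wrap returns nil when err is nil, so successful
		// upstreams leave nil slots for Err to filter out.
		errs[i] = errors.Wrap(err, upstreams[i].Name())
	})
	// nil if every upstream succeeded, otherwise a *MultiError
	// naming each upstream that failed.
	return errs.Err()
}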