union: Add fast path for single upstream upload

This commit is contained in:
Max Sum 2019-12-02 00:01:27 +08:00 committed by Nick Craig-Wood
parent cd26142705
commit 1b1e156908

View File

@ -153,7 +153,11 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
if err != nil {
return err
}
// Get mutliple reader
if len(entries) == 1 {
obj := entries[0].(*upstream.Object)
return obj.Update(ctx, in, src, options...)
}
// Get multiple reader
readers := make([]io.Reader, len(entries))
writers := make([]io.Writer, len(entries))
errs := make([]error, len(entries) + 1)
@ -161,9 +165,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
r, w := io.Pipe()
bw := bufio.NewWriter(w)
readers[i], writers[i] = r, bw
defer func () {
w.Close()
}()
defer w.Close()
}
go func() {
mw := io.MultiWriter(writers...)
@ -515,8 +517,7 @@ func (f *Fs) ChangeNotify(ctx context.Context, fn func(string, fs.EntryType), ch
// as an optional interface
func (f *Fs) DirCacheFlush() {
multithread(len(f.upstreams), func(i int){
do := f.upstreams[i].Features().DirCacheFlush;
if do != nil {
if do := f.upstreams[i].Features().DirCacheFlush; do != nil {
do()
}
})
@ -524,75 +525,63 @@ func (f *Fs) DirCacheFlush() {
func (f *Fs) put(ctx context.Context, in io.Reader, src fs.ObjectInfo, stream bool, options ...fs.OpenOption) (fs.Object, error) {
srcPath := src.Remote()
u, err := f.search(ctx, srcPath)
upstreams := []*upstream.Fs{u}
if err != nil || len(upstreams) == 0 {
upstreams, err = f.create(ctx, parentDir(srcPath))
upstreams, err := f.create(ctx, parentDir(srcPath))
if err != nil {
return nil, err
}
if len(upstreams) == 1 {
u := upstreams[0]
var o fs.Object
var err error
if stream {
o, err = u.Features().PutStream(ctx, in, src, options...)
} else {
o, err = u.Put(ctx, in, src, options...)
}
if err != nil {
return nil, err
}
e, err := f.wrapEntries(u.WrapObject(o))
return e.(*Object), err
}
// Get mutliple reader
readers := make([]*io.PipeReader, len(upstreams))
writers := make([]*io.PipeWriter, len(upstreams))
errs := make([]error, len(upstreams) + 1)
for i := range upstreams {
// Get multiple reader
readers := make([]io.Reader, len(upstreams))
writers := make([]io.Writer, len(upstreams))
for i := range writers {
r, w := io.Pipe()
readers[i], writers[i] = r, w
bw := bufio.NewWriter(w)
readers[i], writers[i] = r, bw
defer w.Close()
}
go func() {
bufw := make([]io.Writer, len(writers))
for i, w := range writers {
bufw[i] = bufio.NewWriter(w)
}
mw := io.MultiWriter(bufw...)
_, err := io.Copy(mw, in)
if err != nil {
errs[len(upstreams)] = err
}
for _, bw := range bufw {
mw := io.MultiWriter(writers...)
io.Copy(mw, in)
for _, bw := range writers {
bw.(*bufio.Writer).Flush()
}
}()
// Multi-threading
var wg sync.WaitGroup
objs := make([]*upstream.Object, len(upstreams))
for i, u := range upstreams {
wg.Add(1)
i, u := i, u // Closure
go func() {
defer wg.Done()
var o fs.Object
var err error
if stream {
o, err = u.Features().PutStream(ctx, readers[i], src, options...)
} else {
o, err = u.Put(ctx, readers[i], src, options...)
}
if err != nil {
errs[i] = err
return
}
objs[i] = u.WrapObject(o)
}()
}
wg.Wait()
for _, w := range writers {
w.Close()
}
var entries []upstream.Entry
for i, o := range objs {
if errs[i] != nil {
err = errs[i]
continue
objs := make([]upstream.Entry, len(upstreams))
errs := make([]error, len(upstreams))
multithread(len(upstreams), func(i int) {
u := upstreams[i]
var o fs.Object
if stream {
o, errs[i] = u.Features().PutStream(ctx, readers[i], src, options...)
} else {
o, errs[i] = u.Put(ctx, readers[i], src, options...)
}
if errs[i] != nil {
return
}
objs[i] = u.WrapObject(o)
})
for _, err := range errs {
if err != nil {
return nil, err
}
entries = append(entries, o)
}
if len(entries) == 0 {
return nil, err
}
// Get an object for future operation
e, err := f.wrapEntries(entries...)
e, err := f.wrapEntries(objs...)
return e.(*Object), err
}
@ -602,7 +591,15 @@ func (f *Fs) put(ctx context.Context, in io.Reader, src fs.ObjectInfo, stream bo
// will return the object and the error, otherwise will return
// nil and the error
func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
return f.put(ctx, in, src, false, options...)
o, err := f.NewObject(ctx, src.Remote())
switch err {
case nil:
return o, o.Update(ctx, in, src, options...)
case fs.ErrorObjectNotFound:
return f.put(ctx, in, src, false, options...)
default:
return nil, err
}
}
// PutStream uploads to the remote path with the modTime given of indeterminate size
@ -611,7 +608,15 @@ func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options .
// will return the object and the error, otherwise will return
// nil and the error
func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
return f.put(ctx, in, src, true, options...)
o, err := f.NewObject(ctx, src.Remote())
switch err {
case nil:
return o, o.Update(ctx, in, src, options...)
case fs.ErrorObjectNotFound:
return f.put(ctx, in, src, true, options...)
default:
return nil, err
}
}
// About gets quota information from the Fs