b2: implement streaming upload of files with unknown length (see #1614) (closes #1686)

This commit is contained in:
Stefan 2017-09-16 22:43:48 +02:00 committed by GitHub
parent 4ac9a65049
commit 234bfae0d5
3 changed files with 156 additions and 47 deletions

View File

@ -5,6 +5,7 @@ package b2
// checking SHA1s? // checking SHA1s?
import ( import (
"bufio"
"bytes" "bytes"
"crypto/sha1" "crypto/sha1"
"fmt" "fmt"
@ -705,6 +706,11 @@ func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.
return fs, fs.Update(in, src, options...) return fs, fs.Update(in, src, options...)
} }
// PutStream uploads to the remote path with the modTime given of indeterminate size
func (f *Fs) PutStream(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
return f.Put(in, src, options...)
}
// Mkdir creates the bucket if it doesn't exist // Mkdir creates the bucket if it doesn't exist
func (f *Fs) Mkdir(dir string) error { func (f *Fs) Mkdir(dir string) error {
f.bucketOKMu.Lock() f.bucketOKMu.Lock()
@ -1237,8 +1243,33 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio
} }
size := src.Size() size := src.Size()
// If a large file upload in chunks - see upload.go if size == -1 {
if size >= int64(uploadCutoff) { // Check if the file is large enough for a chunked upload (needs to be at least two chunks)
buf := o.fs.getUploadBlock()
n, err := io.ReadFull(in, buf)
if err == nil {
bufReader := bufio.NewReader(in)
in = bufReader
_, err = bufReader.Peek(1)
}
if err == nil {
fs.Debugf(o, "File is big enough for chunked streaming")
up, err := o.fs.newLargeUpload(o, in, src)
if err != nil {
o.fs.putUploadBlock(buf)
return err
}
return up.Stream(buf)
} else if err == io.EOF || err == io.ErrUnexpectedEOF {
fs.Debugf(o, "File has %d bytes, which makes only one chunk. Using direct upload.", n)
defer o.fs.putUploadBlock(buf)
size = int64(n)
in = bytes.NewReader(buf[:n])
} else {
return err
}
} else if size > int64(uploadCutoff) {
up, err := o.fs.newLargeUpload(o, in, src) up, err := o.fs.newLargeUpload(o, in, src)
if err != nil { if err != nil {
return err return err
@ -1375,6 +1406,7 @@ func (o *Object) MimeType() string {
var ( var (
_ fs.Fs = &Fs{} _ fs.Fs = &Fs{}
_ fs.Purger = &Fs{} _ fs.Purger = &Fs{}
_ fs.PutStreamer = &Fs{}
_ fs.CleanUpper = &Fs{} _ fs.CleanUpper = &Fs{}
_ fs.ListRer = &Fs{} _ fs.ListRer = &Fs{}
_ fs.Object = &Object{} _ fs.Object = &Object{}

View File

@ -70,7 +70,7 @@ type largeUpload struct {
in io.Reader // read the data from here in io.Reader // read the data from here
id string // ID of the file being uploaded id string // ID of the file being uploaded
size int64 // total size size int64 // total size
parts int64 // calculated number of parts parts int64 // calculated number of parts, if known
sha1s []string // slice of SHA1s for each part sha1s []string // slice of SHA1s for each part
uploadMu sync.Mutex // lock for upload variable uploadMu sync.Mutex // lock for upload variable
uploads []*api.GetUploadPartURLResponse // result of get upload URL calls uploads []*api.GetUploadPartURLResponse // result of get upload URL calls
@ -80,13 +80,21 @@ type largeUpload struct {
func (f *Fs) newLargeUpload(o *Object, in io.Reader, src fs.ObjectInfo) (up *largeUpload, err error) { func (f *Fs) newLargeUpload(o *Object, in io.Reader, src fs.ObjectInfo) (up *largeUpload, err error) {
remote := o.remote remote := o.remote
size := src.Size() size := src.Size()
parts := size / int64(chunkSize) parts := int64(0)
sha1SliceSize := int64(maxParts)
if size == -1 {
fs.Debugf(o, "Streaming upload with --b2-chunk-size %s allows uploads of up to %s and will fail only when that limit is reached.", fs.SizeSuffix(chunkSize), fs.SizeSuffix(maxParts*chunkSize))
} else {
parts = size / int64(chunkSize)
if size%int64(chunkSize) != 0 { if size%int64(chunkSize) != 0 {
parts++ parts++
} }
if parts > maxParts { if parts > maxParts {
return nil, errors.Errorf("%q too big (%d bytes) makes too many parts %d > %d - increase --b2-chunk-size", remote, size, parts, maxParts) return nil, errors.Errorf("%q too big (%d bytes) makes too many parts %d > %d - increase --b2-chunk-size", remote, size, parts, maxParts)
} }
sha1SliceSize = parts
}
modTime := src.ModTime() modTime := src.ModTime()
opts := rest.Opts{ opts := rest.Opts{
Method: "POST", Method: "POST",
@ -123,7 +131,7 @@ func (f *Fs) newLargeUpload(o *Object, in io.Reader, src fs.ObjectInfo) (up *lar
id: response.ID, id: response.ID,
size: size, size: size,
parts: parts, parts: parts,
sha1s: make([]string, parts), sha1s: make([]string, sha1SliceSize),
} }
return up, nil return up, nil
} }
@ -243,6 +251,7 @@ func (up *largeUpload) transferChunk(part int64, body []byte) error {
// finish closes off the large upload // finish closes off the large upload
func (up *largeUpload) finish() error { func (up *largeUpload) finish() error {
fs.Debugf(up.o, "Finishing large file upload with %d parts", up.parts)
opts := rest.Opts{ opts := rest.Opts{
Method: "POST", Method: "POST",
Path: "/b2_finish_large_file", Path: "/b2_finish_large_file",
@ -279,6 +288,101 @@ func (up *largeUpload) cancel() error {
return err return err
} }
func (up *largeUpload) managedTransferChunk(wg *sync.WaitGroup, errs chan error, part int64, buf []byte) {
wg.Add(1)
go func(part int64, buf []byte) {
defer wg.Done()
defer up.f.putUploadBlock(buf)
err := up.transferChunk(part, buf)
if err != nil {
select {
case errs <- err:
default:
}
}
}(part, buf)
}
func (up *largeUpload) finishOrCancelOnError(err error, errs chan error) error {
if err == nil {
select {
case err = <-errs:
default:
}
}
if err != nil {
fs.Debugf(up.o, "Cancelling large file upload due to error: %v", err)
cancelErr := up.cancel()
if cancelErr != nil {
fs.Errorf(up.o, "Failed to cancel large file upload: %v", cancelErr)
}
return err
}
return up.finish()
}
// Stream uploads the chunks from the input, starting with a required initial
// chunk. Assumes the file size is unknown and will upload until the input
// reaches EOF.
func (up *largeUpload) Stream(initialUploadBlock []byte) (err error) {
fs.Debugf(up.o, "Starting streaming of large file (id %q)", up.id)
errs := make(chan error, 1)
hasMoreParts := true
var wg sync.WaitGroup
fs.AccountByPart(up.o) // Cancel whole file accounting before reading
// Transfer initial chunk
up.size = int64(len(initialUploadBlock))
up.managedTransferChunk(&wg, errs, 1, initialUploadBlock)
outer:
for part := int64(2); hasMoreParts; part++ {
// Check any errors
select {
case err = <-errs:
break outer
default:
}
// Get a block of memory
buf := up.f.getUploadBlock()
// Read the chunk
n, err := io.ReadFull(up.in, buf)
if err == io.ErrUnexpectedEOF {
fs.Debugf(up.o, "Read less than a full chunk, making this the last one.")
buf = buf[:n]
hasMoreParts = false
err = nil
} else if err == io.EOF {
fs.Debugf(up.o, "Could not read any more bytes, previous chunk was the last.")
up.f.putUploadBlock(buf)
hasMoreParts = false
err = nil
break outer
} else if err != nil {
// other kinds of errors indicate failure
up.f.putUploadBlock(buf)
break outer
}
// Keep stats up to date
up.parts = part
up.size += int64(n)
if part > maxParts {
err = errors.Errorf("%q too big (%d bytes so far) makes too many parts %d > %d - increase --b2-chunk-size", up.o, up.size, up.parts, maxParts)
break outer
}
// Transfer the chunk
up.managedTransferChunk(&wg, errs, part, buf)
}
wg.Wait()
up.sha1s = up.sha1s[:up.parts]
return up.finishOrCancelOnError(err, errs)
}
// Upload uploads the chunks from the input // Upload uploads the chunks from the input
func (up *largeUpload) Upload() error { func (up *largeUpload) Upload() error {
fs.Debugf(up.o, "Starting upload of large file in %d chunks (id %q)", up.parts, up.id) fs.Debugf(up.o, "Starting upload of large file in %d chunks (id %q)", up.parts, up.id)
@ -312,37 +416,10 @@ outer:
} }
// Transfer the chunk // Transfer the chunk
wg.Add(1) up.managedTransferChunk(&wg, errs, part, buf)
go func(part int64, buf []byte) {
defer wg.Done()
defer up.f.putUploadBlock(buf)
err := up.transferChunk(part, buf)
if err != nil {
select {
case errs <- err:
default:
}
}
}(part, buf)
remaining -= reqSize remaining -= reqSize
} }
wg.Wait() wg.Wait()
if err == nil {
select { return up.finishOrCancelOnError(err, errs)
case err = <-errs:
default:
}
}
if err != nil {
fs.Debugf(up.o, "Cancelling large file upload due to error: %v", err)
cancelErr := up.cancel()
if cancelErr != nil {
fs.Errorf(up.o, "Failed to cancel large file upload: %v", cancelErr)
}
return err
}
// Check any errors
fs.Debugf(up.o, "Finishing large file upload")
return up.finish()
} }

View File

@ -121,7 +121,7 @@ func Equal(src ObjectInfo, dst Object) bool {
func equal(src ObjectInfo, dst Object, sizeOnly, checkSum bool) bool { func equal(src ObjectInfo, dst Object, sizeOnly, checkSum bool) bool {
if !Config.IgnoreSize { if !Config.IgnoreSize {
if src.Size() != dst.Size() { if src.Size() != dst.Size() {
Debugf(src, "Sizes differ") Debugf(src, "Sizes differ (src %d vs dst %d)", src.Size(), dst.Size())
return false return false
} }
} }