From b510c70c1ef2bd41eee962dc04fbbe0880edebd6 Mon Sep 17 00:00:00 2001 From: Stefan Breunig Date: Sat, 12 Aug 2017 12:57:34 +0200 Subject: [PATCH] =?UTF-8?q?b2:=20calculate=20missing=20hashes=20on=20the?= =?UTF-8?q?=20fly=20instead=20of=20spooling=20=E2=80=93=20fixes=20#1288?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- b2/b2.go | 41 +++++-------------------------------- b2/upload.go | 57 +++++++++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 57 insertions(+), 41 deletions(-) diff --git a/b2/b2.go b/b2/b2.go index 0ee1cd233..bc93ac7ae 100644 --- a/b2/b2.go +++ b/b2/b2.go @@ -10,9 +10,7 @@ import ( "fmt" "hash" "io" - "io/ioutil" "net/http" - "os" "path" "regexp" "strconv" @@ -1249,42 +1247,13 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio } modTime := src.ModTime() + calculatedSha1, _ := src.Hash(fs.HashSHA1) - - // If source cannot provide the hash, copy to a temporary file - // and calculate the hash while doing so. - // Then we serve the temporary file. 
if calculatedSha1 == "" { - // Open a temp file to copy the input - fd, err := ioutil.TempFile("", "rclone-b2-") - if err != nil { - return err - } - _ = os.Remove(fd.Name()) // Delete the file - may not work on Windows - defer func() { - _ = fd.Close() // Ignore error may have been closed already - _ = os.Remove(fd.Name()) // Delete the file - may have been deleted already - }() - - // Copy the input while calculating the sha1 - hash := sha1.New() - teed := io.TeeReader(in, hash) - n, err := io.Copy(fd, teed) - if err != nil { - return err - } - if n != size { - return errors.Errorf("read %d bytes expecting %d", n, size) - } - calculatedSha1 = fmt.Sprintf("%x", hash.Sum(nil)) - - // Rewind the temporary file - _, err = fd.Seek(0, 0) - if err != nil { - return err - } - // Set input to temporary file - in = fd + calculatedSha1 = "hex_digits_at_end" + har := newHashAppendingReader(in, sha1.New()) + size += int64(har.AdditionalLength()) + in = har } // Get upload URL diff --git a/b2/upload.go b/b2/upload.go index 214b23e1c..a803991b3 100644 --- a/b2/upload.go +++ b/b2/upload.go @@ -7,8 +7,11 @@ package b2 import ( "bytes" "crypto/sha1" + "encoding/hex" "fmt" + "hash" "io" + "strings" "sync" "github.com/ncw/rclone/b2/api" @@ -17,6 +20,49 @@ import ( "github.com/pkg/errors" ) +type hashAppendingReader struct { + h hash.Hash + in io.Reader + hexSum string + hexReader io.Reader +} + +// Read returns all bytes from the original reader, then the hex sum +// of what was read so far, then EOF. +func (har *hashAppendingReader) Read(b []byte) (int, error) { + if har.hexReader == nil { + n, err := har.in.Read(b) + if err == io.EOF { + har.in = nil // allow GC + err = nil // allow reading hexSum before EOF + + har.hexSum = hex.EncodeToString(har.h.Sum(nil)) + har.hexReader = strings.NewReader(har.hexSum) + } + return n, err + } + return har.hexReader.Read(b) +} + +// AdditionalLength returns how many bytes the appended hex sum will take up. 
+func (har *hashAppendingReader) AdditionalLength() int { + return hex.EncodedLen(har.h.Size()) +} + +// HexSum returns the hash sum as hex. It's only available after the original +// reader has EOF'd. It's an empty string before that. +func (har *hashAppendingReader) HexSum() string { + return har.hexSum +} + +// newHashAppendingReader takes a Reader and a Hash and will append the hex sum +// after the original reader reaches EOF. The increased size depends on the +// given hash, which may be queried through AdditionalLength() +func newHashAppendingReader(in io.Reader, h hash.Hash) *hashAppendingReader { + withHash := io.TeeReader(in, h) + return &hashAppendingReader{h: h, in: withHash} +} + // largeUpload is used to control the upload of large files which need chunking type largeUpload struct { f *Fs // parent Fs @@ -128,9 +174,9 @@ func (up *largeUpload) clearUploadURL() { // Transfer a chunk func (up *largeUpload) transferChunk(part int64, body []byte) error { - calculatedSHA1 := fmt.Sprintf("%x", sha1.Sum(body)) - up.sha1s[part-1] = calculatedSHA1 - size := int64(len(body)) + in := newHashAppendingReader(bytes.NewReader(body), sha1.New()) + size := int64(len(body)) + int64(in.AdditionalLength()) + err := up.f.pacer.Call(func() (bool, error) { fs.Debugf(up.o, "Sending chunk %d length %d", part, len(body)) @@ -165,11 +211,11 @@ func (up *largeUpload) transferChunk(part int64, body []byte) error { opts := rest.Opts{ Method: "POST", RootURL: upload.UploadURL, - Body: fs.AccountPart(up.o, bytes.NewBuffer(body)), + Body: fs.AccountPart(up.o, in), ExtraHeaders: map[string]string{ "Authorization": upload.AuthorizationToken, "X-Bz-Part-Number": fmt.Sprintf("%d", part), - sha1Header: calculatedSHA1, + sha1Header: "hex_digits_at_end", }, ContentLength: &size, } @@ -191,6 +237,7 @@ func (up *largeUpload) transferChunk(part int64, body []byte) error { } else { fs.Debugf(up.o, "Done sending chunk %d", part) } + up.sha1s[part-1] = in.HexSum() return err }