// Upload large files for b2 // // Docs - https://www.backblaze.com/b2/docs/large_files.html package b2 import ( "bytes" "crypto/sha1" "encoding/hex" "fmt" gohash "hash" "io" "strings" "sync" "github.com/ncw/rclone/backend/b2/api" "github.com/ncw/rclone/fs" "github.com/ncw/rclone/fs/accounting" "github.com/ncw/rclone/fs/hash" "github.com/ncw/rclone/lib/rest" "github.com/pkg/errors" ) type hashAppendingReader struct { h gohash.Hash in io.Reader hexSum string hexReader io.Reader } // Read returns bytes all bytes from the original reader, then the hex sum // of what was read so far, then EOF. func (har *hashAppendingReader) Read(b []byte) (int, error) { if har.hexReader == nil { n, err := har.in.Read(b) if err == io.EOF { har.in = nil // allow GC err = nil // allow reading hexSum before EOF har.hexSum = hex.EncodeToString(har.h.Sum(nil)) har.hexReader = strings.NewReader(har.hexSum) } return n, err } return har.hexReader.Read(b) } // AdditionalLength returns how many bytes the appended hex sum will take up. func (har *hashAppendingReader) AdditionalLength() int { return hex.EncodedLen(har.h.Size()) } // HexSum returns the hash sum as hex. It's only available after the original // reader has EOF'd. It's an empty string before that. func (har *hashAppendingReader) HexSum() string { return har.hexSum } // newHashAppendingReader takes a Reader and a Hash and will append the hex sum // after the original reader reaches EOF. The increased size depends on the // given hash, which may be queried through AdditionalLength() func newHashAppendingReader(in io.Reader, h gohash.Hash) *hashAppendingReader { withHash := io.TeeReader(in, h) return &hashAppendingReader{h: h, in: withHash} } // largeUpload is used to control the upload of large files which need chunking type largeUpload struct { f *Fs // parent Fs o *Object // object being uploaded in io.Reader // read the data from here id string // ID of the file being uploaded size int64 // total size parts int64 // calculated number of parts, if known sha1s []string // slice of SHA1s for each part uploadMu sync.Mutex // lock for upload variable uploads []*api.GetUploadPartURLResponse // result of get upload URL calls } // newLargeUpload starts an upload of object o from in with metadata in src func (f *Fs) newLargeUpload(o *Object, in io.Reader, src fs.ObjectInfo) (up *largeUpload, err error) { remote := o.remote size := src.Size() parts := int64(0) sha1SliceSize := int64(maxParts) if size == -1 { fs.Debugf(o, "Streaming upload with --b2-chunk-size %s allows uploads of up to %s and will fail only when that limit is reached.", fs.SizeSuffix(chunkSize), fs.SizeSuffix(maxParts*chunkSize)) } else { parts = size / int64(chunkSize) if size%int64(chunkSize) != 0 { parts++ } if parts > maxParts { return nil, errors.Errorf("%q too big (%d bytes) makes too many parts %d > %d - increase --b2-chunk-size", remote, size, parts, maxParts) } sha1SliceSize = parts } modTime := src.ModTime() opts := rest.Opts{ Method: "POST", Path: "/b2_start_large_file", } bucketID, err := f.getBucketID() if err != nil { return nil, err } var request = api.StartLargeFileRequest{ BucketID: bucketID, Name: o.fs.root + remote, ContentType: fs.MimeType(src), Info: map[string]string{ timeKey: timeString(modTime), }, } // Set the SHA1 if known if calculatedSha1, err := src.Hash(hash.SHA1); err == nil && calculatedSha1 != "" { request.Info[sha1Key] = calculatedSha1 } var response api.StartLargeFileResponse err = f.pacer.Call(func() (bool, error) { resp, err := f.srv.CallJSON(&opts, &request, &response) return f.shouldRetry(resp, err) }) if err != nil { return nil, err } up = &largeUpload{ f: f, o: o, in: in, id: response.ID, size: size, parts: parts, sha1s: make([]string, sha1SliceSize), } return up, nil } // getUploadURL returns the upload info with the UploadURL and the AuthorizationToken // // This should be returned with returnUploadURL when finished func (up *largeUpload) getUploadURL() (upload *api.GetUploadPartURLResponse, err error) { up.uploadMu.Lock() defer up.uploadMu.Unlock() if len(up.uploads) == 0 { opts := rest.Opts{ Method: "POST", Path: "/b2_get_upload_part_url", } var request = api.GetUploadPartURLRequest{ ID: up.id, } err := up.f.pacer.Call(func() (bool, error) { resp, err := up.f.srv.CallJSON(&opts, &request, &upload) return up.f.shouldRetry(resp, err) }) if err != nil { return nil, errors.Wrap(err, "failed to get upload URL") } } else { upload, up.uploads = up.uploads[0], up.uploads[1:] } return upload, nil } // returnUploadURL returns the UploadURL to the cache func (up *largeUpload) returnUploadURL(upload *api.GetUploadPartURLResponse) { if upload == nil { return } up.uploadMu.Lock() up.uploads = append(up.uploads, upload) up.uploadMu.Unlock() } // clearUploadURL clears the current UploadURL and the AuthorizationToken func (up *largeUpload) clearUploadURL() { up.uploadMu.Lock() up.uploads = nil up.uploadMu.Unlock() } // Transfer a chunk func (up *largeUpload) transferChunk(part int64, body []byte) error { err := up.f.pacer.Call(func() (bool, error) { fs.Debugf(up.o, "Sending chunk %d length %d", part, len(body)) // Get upload URL upload, err := up.getUploadURL() if err != nil { return false, err } in := newHashAppendingReader(bytes.NewReader(body), sha1.New()) size := int64(len(body)) + int64(in.AdditionalLength()) // Authorization // // An upload authorization token, from b2_get_upload_part_url. // // X-Bz-Part-Number // // A number from 1 to 10000. The parts uploaded for one file // must have contiguous numbers, starting with 1. // // Content-Length // // The number of bytes in the file being uploaded. Note that // this header is required; you cannot leave it out and just // use chunked encoding. The minimum size of every part but // the last one is 100MB. // // X-Bz-Content-Sha1 // // The SHA1 checksum of the this part of the file. B2 will // check this when the part is uploaded, to make sure that the // data arrived correctly. The same SHA1 checksum must be // passed to b2_finish_large_file. opts := rest.Opts{ Method: "POST", RootURL: upload.UploadURL, Body: accounting.AccountPart(up.o, in), ExtraHeaders: map[string]string{ "Authorization": upload.AuthorizationToken, "X-Bz-Part-Number": fmt.Sprintf("%d", part), sha1Header: "hex_digits_at_end", }, ContentLength: &size, } var response api.UploadPartResponse resp, err := up.f.srv.CallJSON(&opts, nil, &response) retry, err := up.f.shouldRetry(resp, err) // On retryable error clear PartUploadURL if retry { fs.Debugf(up.o, "Clearing part upload URL because of error: %v", err) upload = nil } up.returnUploadURL(upload) up.sha1s[part-1] = in.HexSum() return retry, err }) if err != nil { fs.Debugf(up.o, "Error sending chunk %d: %v", part, err) } else { fs.Debugf(up.o, "Done sending chunk %d", part) } return err } // finish closes off the large upload func (up *largeUpload) finish() error { fs.Debugf(up.o, "Finishing large file upload with %d parts", up.parts) opts := rest.Opts{ Method: "POST", Path: "/b2_finish_large_file", } var request = api.FinishLargeFileRequest{ ID: up.id, SHA1s: up.sha1s, } var response api.FileInfo err := up.f.pacer.Call(func() (bool, error) { resp, err := up.f.srv.CallJSON(&opts, &request, &response) return up.f.shouldRetry(resp, err) }) if err != nil { return err } return up.o.decodeMetaDataFileInfo(&response) } // cancel aborts the large upload func (up *largeUpload) cancel() error { opts := rest.Opts{ Method: "POST", Path: "/b2_cancel_large_file", } var request = api.CancelLargeFileRequest{ ID: up.id, } var response api.CancelLargeFileResponse err := up.f.pacer.Call(func() (bool, error) { resp, err := up.f.srv.CallJSON(&opts, &request, &response) return up.f.shouldRetry(resp, err) }) return err } func (up *largeUpload) managedTransferChunk(wg *sync.WaitGroup, errs chan error, part int64, buf []byte) { wg.Add(1) go func(part int64, buf []byte) { defer wg.Done() defer up.f.putUploadBlock(buf) err := up.transferChunk(part, buf) if err != nil { select { case errs <- err: default: } } }(part, buf) } func (up *largeUpload) finishOrCancelOnError(err error, errs chan error) error { if err == nil { select { case err = <-errs: default: } } if err != nil { fs.Debugf(up.o, "Cancelling large file upload due to error: %v", err) cancelErr := up.cancel() if cancelErr != nil { fs.Errorf(up.o, "Failed to cancel large file upload: %v", cancelErr) } return err } return up.finish() } // Stream uploads the chunks from the input, starting with a required initial // chunk. Assumes the file size is unknown and will upload until the input // reaches EOF. func (up *largeUpload) Stream(initialUploadBlock []byte) (err error) { fs.Debugf(up.o, "Starting streaming of large file (id %q)", up.id) errs := make(chan error, 1) hasMoreParts := true var wg sync.WaitGroup accounting.AccountByPart(up.o) // Cancel whole file accounting before reading // Transfer initial chunk up.size = int64(len(initialUploadBlock)) up.managedTransferChunk(&wg, errs, 1, initialUploadBlock) outer: for part := int64(2); hasMoreParts; part++ { // Check any errors select { case err = <-errs: break outer default: } // Get a block of memory buf := up.f.getUploadBlock() // Read the chunk n, err := io.ReadFull(up.in, buf) if err == io.ErrUnexpectedEOF { fs.Debugf(up.o, "Read less than a full chunk, making this the last one.") buf = buf[:n] hasMoreParts = false err = nil } else if err == io.EOF { fs.Debugf(up.o, "Could not read any more bytes, previous chunk was the last.") up.f.putUploadBlock(buf) hasMoreParts = false err = nil break outer } else if err != nil { // other kinds of errors indicate failure up.f.putUploadBlock(buf) break outer } // Keep stats up to date up.parts = part up.size += int64(n) if part > maxParts { err = errors.Errorf("%q too big (%d bytes so far) makes too many parts %d > %d - increase --b2-chunk-size", up.o, up.size, up.parts, maxParts) break outer } // Transfer the chunk up.managedTransferChunk(&wg, errs, part, buf) } wg.Wait() up.sha1s = up.sha1s[:up.parts] return up.finishOrCancelOnError(err, errs) } // Upload uploads the chunks from the input func (up *largeUpload) Upload() error { fs.Debugf(up.o, "Starting upload of large file in %d chunks (id %q)", up.parts, up.id) remaining := up.size errs := make(chan error, 1) var wg sync.WaitGroup var err error accounting.AccountByPart(up.o) // Cancel whole file accounting before reading outer: for part := int64(1); part <= up.parts; part++ { // Check any errors select { case err = <-errs: break outer default: } reqSize := remaining if reqSize >= int64(chunkSize) { reqSize = int64(chunkSize) } // Get a block of memory buf := up.f.getUploadBlock()[:reqSize] // Read the chunk _, err = io.ReadFull(up.in, buf) if err != nil { up.f.putUploadBlock(buf) break outer } // Transfer the chunk up.managedTransferChunk(&wg, errs, part, buf) remaining -= reqSize } wg.Wait() return up.finishOrCancelOnError(err, errs) }