diff --git a/backend/chunker/chunker.go b/backend/chunker/chunker.go index 76bed2e6c..2bd8a10cb 100644 --- a/backend/chunker/chunker.go +++ b/backend/chunker/chunker.go @@ -6,6 +6,8 @@ import ( "context" "crypto/md5" "crypto/sha1" + "encoding" + "encoding/base64" "encoding/hex" "encoding/json" "errors" @@ -13,6 +15,7 @@ import ( gohash "hash" "io" "io/ioutil" + "log" "math/rand" "path" "regexp" @@ -379,6 +382,8 @@ type Fs struct { features *fs.Features // optional features dirSort bool // reserved for future, ignored useNoRename bool // can be set with the transactions option + hashState string // set in resume(), used to restore hash state + resumeXactID string // set in resume(), allows reuse of xactID upon resume } // configure sets up chunker for given name format, meta format and hash type. @@ -1152,7 +1157,41 @@ func (f *Fs) put( // Prepare to upload c := f.newChunkingReader(src) - wrapIn := c.wrapStream(ctx, in, src) + // Prepare for resume if resumable + var resumeOpt *fs.OptionResume + // partialHashState will be used in wrapStream to restore hash state + var partialHashState []byte + for _, option := range options { + switch option.(type) { + case *fs.OptionResume: + resumeOpt = option.(*fs.OptionResume) + if resumeOpt.Pos != 0 { + numChunksOnRemote := resumeOpt.Pos / int64(f.opt.ChunkSize) + // Checks for existing chunks on the remote + for i := 0; i < int(numChunksOnRemote); i++ { + existingChunkName := f.makeChunkName(remote, i, "", f.resumeXactID) + existingChunk, err := f.base.NewObject(ctx, existingChunkName) + // If NewObject returns an error the chunk likely doesn't exist on the remote and we cannot resume + if err != nil { + resumeOpt.Pos = 0 + c.chunks = nil + break + } + c.chunks = append(c.chunks, existingChunk) + } + fs.Debugf(f, "Resuming at chunk number: %d", numChunksOnRemote) + partialHashState, _ = base64.StdEncoding.DecodeString(f.hashState) + // Discard bytes that already exist on remote + written, err := io.CopyN(ioutil.Discard, in, resumeOpt.Pos) + if err != nil { + return nil, err + } + c.accountBytes(written) + c.sizeLeft = c.sizeTotal - c.readCount + } + } + } + wrapIn := c.wrapStream(ctx, in, src, partialHashState) var metaObject fs.Object defer func() { @@ -1162,13 +1201,22 @@ func (f *Fs) put( }() baseRemote := remote - xactID, errXact := f.newXactID(ctx, baseRemote) - if errXact != nil { - return nil, errXact + var xactID string + if resumeOpt != nil && resumeOpt.Pos != 0 { + xactID = f.resumeXactID + } else { + xactID, err = f.newXactID(ctx, baseRemote) + if err != nil { + return nil, err + } } // Transfer chunks data for c.chunkNo = 0; !c.done; c.chunkNo++ { + // skip to chunk we can resume from if resumeOpt is set + if c.chunkNo == 0 && resumeOpt != nil && resumeOpt.Pos != 0 { + c.chunkNo = int(resumeOpt.Pos) / int(f.opt.ChunkSize) + } if c.chunkNo > maxSafeChunkNumber { return nil, ErrChunkOverflow } @@ -1230,6 +1278,41 @@ func (f *Fs) put( c.chunkLimit = c.chunkSize c.chunks = append(c.chunks, chunk) + + // If an OptionResume was passed than we should call SetID so a resume can be attempted in event of a failure + // ID keeps track of the first chunk that should be uploaded if a resume is attempted + if resumeOpt != nil { + // Publish hash state to control chunk + marshaler, ok := c.hasher.(encoding.BinaryMarshaler) + if !ok { + return nil, fmt.Errorf("The hash type does not implement encoding.BinaryMarshaler") + } + state, err := marshaler.MarshalBinary() + if err != nil { + return nil, err + } + hashType := f.opt.HashType + data, err := marshalPartialHashJSON(ctx, hashType, base64.StdEncoding.EncodeToString(state)) + if err != nil { + return nil, err + } + controlChunkName := f.makeChunkName(remote, -1, "phash", xactID) + controlInfo := f.wrapInfo(src, controlChunkName, int64(len(data))) + controlChunk, err := basePut(ctx, bytes.NewReader(data), controlInfo) + defer func() { + _ = controlChunk.Remove(ctx) + }() + if err != nil { + return nil, err + } + positionStr := strconv.Itoa(c.chunkNo + 1) // stores the number of chunks uploaded + chunkSizeStr := strconv.FormatInt(c.chunkSize, 10) + startFromStr := strconv.FormatInt(int64(f.opt.StartFrom), 10) + err = resumeOpt.SetID(ctx, chunkSizeStr+","+startFromStr+","+positionStr+","+xactID, f.opt.HashType, base64.StdEncoding.EncodeToString(state)) + if err != nil { + return nil, err + } + } } // Validate uploaded size @@ -1356,7 +1439,7 @@ func (f *Fs) newChunkingReader(src fs.ObjectInfo) *chunkingReader { return c } -func (c *chunkingReader) wrapStream(ctx context.Context, in io.Reader, src fs.ObjectInfo) io.Reader { +func (c *chunkingReader) wrapStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, partialHashState []byte) io.Reader { baseIn, wrapBack := accounting.UnWrap(in) switch { @@ -1391,6 +1474,15 @@ func (c *chunkingReader) wrapStream(ctx context.Context, in io.Reader, src fs.Ob } if c.hasher != nil { + // Restores hash state during a resume + if partialHashState != nil { + unmarshaler, ok := c.hasher.(encoding.BinaryUnmarshaler) + if ok { + if err := unmarshaler.UnmarshalBinary(partialHashState); err != nil { + log.Fatal("unable to unmarshal hash:", err) + } + } + } baseIn = io.TeeReader(baseIn, c.hasher) } c.baseReader = baseIn @@ -2510,6 +2602,34 @@ func unmarshalSimpleJSON(ctx context.Context, metaObject fs.Object, data []byte) return info, true, nil } +// Format for partial hash control chunks +type partialHashJSON struct { + HashType string `json:"htype"` + PartialHash string `json:"phash"` +} + +// marshalPartialHashJSON +// +// Creates a JSON containing the hashType being used and the partial hash state. This will be stored in +// a control chunk and used for resume functionality. +// +func marshalPartialHashJSON(ctx context.Context, hashType, partialHash string) ([]byte, error) { + controlData := partialHashJSON{ + HashType: hashType, + PartialHash: partialHash, + } + data, err := json.Marshal(&controlData) + return data, err +} + +// unmarshalPartialHashJSON parses partial hash control chunk. +// +func unmarshalPartialHashJSON(ctx context.Context, data []byte) (hashType, partialHashState string, err error) { + var partialHashData partialHashJSON + err = json.Unmarshal(data, &partialHashData) + return partialHashData.HashType, partialHashData.PartialHash, err +} + func silentlyRemove(ctx context.Context, o fs.Object) { _ = o.Remove(ctx) // ignore error } @@ -2544,9 +2664,58 @@ func (f *Fs) CanQuickRename() bool { return f.base.Features().Move != nil } +// Resume checks whether the (remote, ID) pair is valid and returns +// the point the file should be resumed from or an error. +func (f *Fs) Resume(ctx context.Context, remote, ID, hashName, hashState string) (Pos int64, err error) { + idSlice := strings.Split(ID, ",") + cachedChunkSize, err := strconv.ParseInt(idSlice[0], 10, 64) + cachedStartFrom, err := strconv.ParseInt(idSlice[1], 10, 64) + cachedChunkNo, err := strconv.ParseInt(idSlice[2], 10, 64) + cachedXactID := idSlice[3] + if err != nil { + return 0, err + } + if cachedChunkSize != int64(f.opt.ChunkSize) { + return 0, errors.New("ChunkSize doesn't match for file we are trying to resume") + } + if f.opt.StartFrom != int(cachedStartFrom) { + return 0, errors.New("StartFrom doesn't match for file we are trying to resume") + } + // Check partial hash control chunk + controlChunkName := f.makeChunkName(remote, -1, "phash", cachedXactID) + hashControlChunk, err := f.base.NewObject(ctx, controlChunkName) + if err != nil { + return 0, err + } + reader, err := hashControlChunk.Open(ctx) + data, err := ioutil.ReadAll(reader) + _ = reader.Close() // ensure file handle is freed on windows + if err != nil { + return 0, err + } + remoteHashType, remoteHashState, err := unmarshalPartialHashJSON(ctx, data) + if remoteHashType == hashName && remoteHashState == hashState { + if f.opt.HashType != remoteHashType { + fs.Debugf(f, "Resume skipped, mismatch hash types. prev: %s, curr: %s", remoteHashType, f.opt.HashType) + return 0, nil + } + pos := cachedChunkNo * cachedChunkSize + if err != nil { + return 0, err + } + f.hashState = hashState + f.resumeXactID = cachedXactID + return pos, nil + } + + // No valid control chunks found, rewind from start + return 0, nil +} + // Check the interfaces are satisfied var ( _ fs.Fs = (*Fs)(nil) + _ fs.Resumer = (*Fs)(nil) _ fs.Purger = (*Fs)(nil) _ fs.Copier = (*Fs)(nil) _ fs.Mover = (*Fs)(nil) diff --git a/backend/chunker/chunker_test.go b/backend/chunker/chunker_test.go index 376a9c5ac..4acdf5b5a 100644 --- a/backend/chunker/chunker_test.go +++ b/backend/chunker/chunker_test.go @@ -43,7 +43,6 @@ func TestIntegration(t *testing.T) { "DirCacheFlush", "UserInfo", "Disconnect", - "Resume", }, } if *fstest.RemoteName == "" {