diff --git a/backend/internetarchive/internetarchive.go b/backend/internetarchive/internetarchive.go
index ce994c23c..ce021e35f 100644
--- a/backend/internetarchive/internetarchive.go
+++ b/backend/internetarchive/internetarchive.go
@@ -28,6 +28,7 @@ import (
 	"github.com/rclone/rclone/lib/bucket"
 	"github.com/rclone/rclone/lib/encoder"
 	"github.com/rclone/rclone/lib/pacer"
+	"github.com/rclone/rclone/lib/random"
 	"github.com/rclone/rclone/lib/rest"
 )
 
@@ -129,6 +130,7 @@ type IAFile struct {
 	//   Source string `json:"source"`
 	Mtime       string          `json:"mtime"`
 	RcloneMtime json.RawMessage `json:"rclone-mtime"`
+	UpdateTrack json.RawMessage `json:"rclone-update-track"`
 	Size        string          `json:"size"`
 	Md5         string          `json:"md5"`
 	Crc32       string          `json:"crc32"`
@@ -294,7 +296,7 @@ func (o *Object) Storable() bool {
 	return true
 }
 
-// SetModTime is not supported
+// SetModTime sets modTime on a particular file
 func (o *Object) SetModTime(ctx context.Context, t time.Time) (err error) {
 	bucket, reqDir := o.split()
 	if bucket == "" {
@@ -483,6 +485,7 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (_ fs.Objec
 		return nil, fs.ErrorCantCopy
 	}
 
+	updateTracker := random.String(32)
 	headers := map[string]string{
 		"x-archive-auto-make-bucket": "1",
 		"x-archive-queue-derive":     "0",
@@ -494,7 +497,8 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (_ fs.Objec
 		"x-archive-filemeta-crc32":  srcObj.crc32,
 		"x-archive-filemeta-size":   fmt.Sprint(srcObj.size),
 		// add this too for sure
-		"x-archive-filemeta-rclone-mtime": srcObj.modTime.Format(time.RFC3339Nano),
+		"x-archive-filemeta-rclone-mtime":        srcObj.modTime.Format(time.RFC3339Nano),
+		"x-archive-filemeta-rclone-update-track": updateTracker,
 	}
 
 	// make a PUT request at (IAS3)/:item/:path without body
@@ -515,7 +519,7 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (_ fs.Objec
 
 	// we can't update/find metadata here as IA will also
 	// queue server-side copy as well as upload/delete.
-	return f.waitFileUpload(ctx, trimPathPrefix(path.Join(dstBucket, dstPath), f.root, f.opt.Enc), f.getHashes(ctx, src), srcObj.size)
+	return f.waitFileUpload(ctx, trimPathPrefix(path.Join(dstBucket, dstPath), f.root, f.opt.Enc), updateTracker, srcObj.size)
 }
 
 // ListR lists the objects and directories of the Fs starting
@@ -660,12 +664,14 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
 	bucket, bucketPath := o.split()
 	modTime := src.ModTime(ctx)
 	size := src.Size()
+	updateTracker := random.String(32)
 
 	// Set the mtime in the metadata
 	// internetarchive backend builds at header level as IAS3 has extension outside X-Amz-
 	headers := map[string]string{
 		// https://github.com/jjjake/internetarchive/blob/2456376533251df9d05e0a14d796ec1ced4959f5/internetarchive/iarequest.py#L158
-		"x-amz-filemeta-rclone-mtime": modTime.Format(time.RFC3339Nano),
+		"x-amz-filemeta-rclone-mtime":        modTime.Format(time.RFC3339Nano),
+		"x-amz-filemeta-rclone-update-track": updateTracker,
 
 		// we add some more headers for intuitive actions
 		"x-amz-auto-make-bucket":   "1", // create an item if does not exist, do nothing if already exists
@@ -712,7 +718,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
 	// or we have to wait for finish? (needs polling (frontend)/metadata/:item or scraping (frontend)/history/:item)
 	var newObj *Object
 	if err == nil {
-		newObj, err = o.fs.waitFileUpload(ctx, o.remote, o.fs.getHashes(ctx, src), size)
+		newObj, err = o.fs.waitFileUpload(ctx, o.remote, updateTracker, size)
 	} else {
 		newObj = &Object{}
 	}
@@ -782,18 +788,6 @@ func (o *Object) split() (bucket, bucketPath string) {
 	return o.fs.split(o.remote)
 }
 
-func (f *Fs) getHashes(ctx context.Context, src fs.ObjectInfo) map[hash.Type]string {
-	hashMap := map[hash.Type]string{}
-	for _, ty := range f.Hashes().Array() {
-		sum, err := src.Hash(ctx, ty)
-		if err != nil || sum == "" {
-			continue
-		}
-		hashMap[ty] = sum
-	}
-	return hashMap
-}
-
 func (f *Fs) requestMetadata(ctx context.Context, bucket string) (result MetadataResponse, err error) {
 	var resp *http.Response
 	// make a GET request to (frontend)/metadata/:item/
@@ -852,7 +846,7 @@ func (f *Fs) listAllUnconstrained(ctx context.Context, bucket string) (entries f
 	return entries, nil
 }
 
-func (f *Fs) waitFileUpload(ctx context.Context, reqPath string, newHashes map[hash.Type]string, newSize int64) (ret *Object, err error) {
+func (f *Fs) waitFileUpload(ctx context.Context, reqPath, tracker string, newSize int64) (ret *Object, err error) {
 	bucket, bucketPath := f.split(reqPath)
 
 	ret = &Object{
@@ -869,6 +863,10 @@ func (f *Fs) waitFileUpload(ctx context.Context, reqPath string, newHashes map[h
 		ret2, ok := ret2.(*Object)
 		if ok {
 			ret = ret2
+			ret.crc32 = ""
+			ret.md5 = ""
+			ret.sha1 = ""
+			ret.size = -1
 		}
 	}
 	return ret, nil
@@ -881,9 +879,6 @@ func (f *Fs) waitFileUpload(ctx context.Context, reqPath string, newHashes map[h
 	go func() {
 		isFirstTime := true
 		existed := false
-		oldMtime := ""
-		oldCrc32 := ""
-		unreliablePassCount := 0
 		for {
 			if !isFirstTime {
 				// depending on the queue, it takes time
@@ -908,10 +903,6 @@ func (f *Fs) waitFileUpload(ctx context.Context, reqPath string, newHashes map[h
 			if isFirstTime {
 				isFirstTime = false
 				existed = iaFile != nil
-				if iaFile != nil {
-					oldMtime = iaFile.Mtime
-					oldCrc32 = iaFile.Crc32
-				}
 			}
 			if iaFile == nil {
 				continue
@@ -925,38 +916,20 @@ func (f *Fs) waitFileUpload(ctx context.Context, reqPath string, newHashes map[h
 				return
 			}
 
-			hashMatched := true
-			for tt, sum := range newHashes {
-				if tt == hash.MD5 && !hash.Equals(iaFile.Md5, sum) {
-					hashMatched = false
-					break
-				}
-				if tt == hash.SHA1 && !hash.Equals(iaFile.Sha1, sum) {
-					hashMatched = false
-					break
-				}
-				if tt == hash.CRC32 && !hash.Equals(iaFile.Crc32, sum) {
-					hashMatched = false
+			fileTrackers, _ := listOrString(iaFile.UpdateTrack)
+			trackerMatch := false
+			for _, v := range fileTrackers {
+				if v == tracker {
+					trackerMatch = true
 					break
 				}
 			}
-			if !hashMatched {
+			if !trackerMatch {
				continue
 			}
 			if !compareSize(parseSize(iaFile.Size), newSize) {
 				continue
 			}
-			if hash.Equals(oldCrc32, iaFile.Crc32) && unreliablePassCount < 60 {
-				// the following two are based on a sort of "bad" assumption;
-				// what if the file is updated immediately, before polling?
-				// by limiting hits of these tests, avoid infinite loop
-				unreliablePassCount++
-				continue
-			}
-			if hash.Equals(iaFile.Mtime, oldMtime) && unreliablePassCount < 60 {
-				unreliablePassCount++
-				continue
-			}
 
 			// voila!
 			retC <- struct {
@@ -1036,20 +1009,24 @@ func makeValidObject2(f *Fs, file IAFile, bucket string) *Object {
 	return makeValidObject(f, trimPathPrefix(path.Join(bucket, file.Name), f.root, f.opt.Enc), file, mtimeTime, size)
 }
 
-func (file IAFile) parseMtime() (mtime time.Time) {
-	// method 1: use metadata added by rclone
-	var rmArray []string
+func listOrString(jm json.RawMessage) (rmArray []string, err error) {
 	// rclone-metadata can be an array or string
 	// try to deserialize it as array first
-	err := json.Unmarshal(file.RcloneMtime, &rmArray)
+	err = json.Unmarshal(jm, &rmArray)
 	if err != nil {
 		// if not, it's a string
 		dst := new(string)
-		err = json.Unmarshal(file.RcloneMtime, dst)
+		err = json.Unmarshal(jm, dst)
 		if err == nil {
 			rmArray = []string{*dst}
 		}
 	}
+	return
+}
+
+func (file IAFile) parseMtime() (mtime time.Time) {
+	// method 1: use metadata added by rclone
+	rmArray, err := listOrString(file.RcloneMtime)
 	// let's take the first value we can deserialize
 	for _, value := range rmArray {
 		mtime, err = time.Parse(time.RFC3339Nano, value)
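
Note on the mechanism (commentary, not part of the patch): the old wait loop inferred completion by comparing hashes, mtime, and the previous CRC32, which cannot distinguish "IA has applied this write" from "the metadata still shows an identical earlier version" — hence the unreliablePassCount escape hatch. The patch instead stamps every Update/Copy with a random 32-character token in the per-file metadata (rclone-update-track) and polls until that exact token is echoed back; the returned object's hashes and size are then cleared, since IA recomputes them server-side, and the size is still cross-checked via compareSize as a cheap sanity test. The standalone sketch below shows that round-trip in isolation. It is illustrative only: waitForTracker, randomToken, iaFile, and the item/file names are invented stand-ins, not rclone APIs; the real backend uses lib/random, lib/rest, and a pacer.

// tracker_sketch.go - hypothetical illustration of the update-track round-trip
package main

import (
	"encoding/json"
	"fmt"
	"math/rand"
	"net/http"
	"time"
)

// iaFile is a pared-down IAFile: only the fields the wait loop needs.
type iaFile struct {
	Name        string          `json:"name"`
	UpdateTrack json.RawMessage `json:"rclone-update-track"`
}

// listOrString mirrors the helper factored out in the patch: IA serializes
// repeated metadata values as a JSON array and single values as a JSON string.
func listOrString(jm json.RawMessage) (vals []string, err error) {
	if err = json.Unmarshal(jm, &vals); err == nil {
		return vals, nil
	}
	var s string
	if err = json.Unmarshal(jm, &s); err == nil {
		vals = []string{s}
	}
	return vals, err
}

// randomToken stands in for random.String(32) from rclone's lib/random.
func randomToken(n int) string {
	const chars = "abcdefghijklmnopqrstuvwxyz0123456789"
	b := make([]byte, n)
	for i := range b {
		b[i] = chars[rand.Intn(len(chars))]
	}
	return string(b)
}

// waitForTracker polls the metadata endpoint until the named file's
// rclone-update-track values contain our token, i.e. until IA's task queue
// has applied this specific write. It is a simplified stand-in for the
// patched waitFileUpload, without the context handling and pacer.
func waitForTracker(metadataURL, name, tracker string, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		resp, err := http.Get(metadataURL)
		if err == nil {
			var body struct {
				Files []iaFile `json:"files"`
			}
			err = json.NewDecoder(resp.Body).Decode(&body)
			resp.Body.Close()
			if err == nil {
				for _, f := range body.Files {
					if f.Name != name {
						continue
					}
					trackers, _ := listOrString(f.UpdateTrack)
					for _, v := range trackers {
						if v == tracker {
							return nil // this exact update has landed
						}
					}
				}
			}
		}
		time.Sleep(10 * time.Second) // the IA task queue can lag by minutes
	}
	return fmt.Errorf("timed out waiting for tracker %q on %q", tracker, name)
}

func main() {
	tracker := randomToken(32)
	// On upload the patch sends the token as file metadata:
	//   x-amz-filemeta-rclone-update-track     (Update, IAS3 PUT)
	//   x-archive-filemeta-rclone-update-track (server-side Copy)
	fmt.Println("tracker:", tracker)

	// Item and file names here are placeholders.
	err := waitForTracker("https://archive.org/metadata/some-item", "some/file.bin", tracker, 5*time.Minute)
	fmt.Println("wait result:", err)
}

Matching on a write-specific token also removes any need for source hashes in the wait path, which is why getHashes could be deleted outright rather than reworked.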