From cc7b9af50ead0ed209afd3e70fd29975180eb8db Mon Sep 17 00:00:00 2001 From: Sergey Tolmachev Date: Wed, 23 Sep 2015 09:57:48 +0300 Subject: [PATCH] swift: support files > 5GB - fixes #46 * Write segments to ..._segments container * Choice of container and segment names compatible with swift tool * See http://docs.openstack.org/developer/swift/overview_large_objects.html * Controlled by command line flag --swift-chunk-size * Segments removed on delete --- swift/swift.go | 123 ++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 117 insertions(+), 6 deletions(-) diff --git a/swift/swift.go b/swift/swift.go index d87ca2963..f052fa2ad 100644 --- a/swift/swift.go +++ b/swift/swift.go @@ -2,6 +2,7 @@ package swift import ( + "bytes" "errors" "fmt" "io" @@ -12,6 +13,12 @@ import ( "github.com/ncw/rclone/fs" "github.com/ncw/swift" + "github.com/spf13/pflag" +) + +// Globals +var ( + chunkSize = fs.SizeSuffix(5 * 1024 * 1024 * 1024) ) // Register with Fs @@ -51,9 +58,10 @@ func init() { Name: "region", Help: "Region name - optional", }, - // snet = flag.Bool("swift-snet", false, "Use internal service network") // FIXME not implemented }, }) + // snet = flag.Bool("swift-snet", false, "Use internal service network") // FIXME not implemented + pflag.VarP(&chunkSize, "swift-chunk-size", "", "Above this size files will be chunked into a _segments container.") } // FsSwift represents a remote swift server @@ -381,9 +389,27 @@ func (o *FsObjectSwift) Remote() string { // Md5sum returns the Md5sum of an object returning a lowercase hex string func (o *FsObjectSwift) Md5sum() (string, error) { + isManifest, err := o.isManifestFile() + if err != nil { + return "", err + } + if isManifest { + fs.Debug(o, "Return empty md5 for swift manifest file. Md5 of manifest file calculate as md5 of md5 of it's parts, so it's not original md5") + return "", nil + } return strings.ToLower(o.info.Hash), nil } +// isManifestFile checks for manifest header +func (o *FsObjectSwift) isManifestFile() (bool, error) { + err := o.readMetaData() + if err != nil { + return false, err + } + _, isManifestFile := (*o.headers)["X-Object-Manifest"] + return isManifestFile, nil +} + // Size returns the size of an object in bytes func (o *FsObjectSwift) Size() int64 { return o.info.Bytes @@ -457,6 +483,30 @@ func (o *FsObjectSwift) Open() (in io.ReadCloser, err error) { return } +// min returns the smallest of x, y +func min(x, y int64) int64 { + if x < y { + return x + } + return y +} + +// nsToSwiftFloatString turns a number of ns into a floating point +// string in seconds the same way as the "swift" tool +func nsToSwiftFloatString(ns int64) string { + if ns < 0 { + return "-" + nsToSwiftFloatString(-ns) + } + result := fmt.Sprintf("%010d", ns) + split := len(result) - 9 + result, decimals := result[:split], result[split:split+2] + if decimals != "" { + result += "." + result += decimals + } + return result +} + // Update the object with the contents of the io.Reader, modTime and size // // The new object may have been created if an error is returned @@ -464,18 +514,79 @@ func (o *FsObjectSwift) Update(in io.Reader, modTime time.Time, size int64) erro // Set the mtime m := swift.Metadata{} m.SetModTime(modTime) - _, err := o.swift.c.ObjectPut(o.swift.container, o.swift.root+o.remote, in, true, "", "", m.ObjectHeaders()) - if err != nil { - return err + if size > int64(chunkSize) { + segmentsContainerName := o.swift.container + "_segments" + left := size + i := 0 + nowFloat := nsToSwiftFloatString(time.Now().UnixNano()) + for left > 0 { + n := min(left, int64(chunkSize)) + segmentReader := io.LimitReader(in, n) + segmentPath := fmt.Sprintf("%s%s/%s/%d/%08d", o.swift.root, o.remote, nowFloat, size, i) + _, err := o.swift.c.ObjectPut(segmentsContainerName, segmentPath, segmentReader, true, "", "", m.ObjectHeaders()) + if err != nil { + return err + } + left -= n + i++ + } + manifestHeaders := swift.Headers{"X-Object-Manifest": fmt.Sprintf("%s/%s%s/%s/%d", segmentsContainerName, o.swift.root, o.remote, nowFloat, size)} + for k, v := range m.ObjectHeaders() { + manifestHeaders[k] = v + } + emptyReader := bytes.NewReader(nil) + manifestName := o.swift.root + o.remote + _, err := o.swift.c.ObjectPut(o.swift.container, manifestName, emptyReader, true, "", "", manifestHeaders) + if err != nil { + return err + } + // remove old segments + segmentsPath := fmt.Sprintf("%s/%s%s/", segmentsContainerName, o.swift.root, o.remote) + segmentsFs, err := NewFs(o.swift.name, segmentsPath) + if err != nil { + return err + } + for o := range segmentsFs.List() { + if !strings.HasPrefix(o.Remote(), nowFloat) { + fs.Log(o, "Remove old file segment '%s'", o.Remote()) + err := o.Remove() + if err != nil { + return err + } + } + } + } else { + _, err := o.swift.c.ObjectPut(o.swift.container, o.swift.root+o.remote, in, true, "", "", m.ObjectHeaders()) + if err != nil { + return err + } } // Read the metadata from the newly created object o.headers = nil // wipe old metadata - err = o.readMetaData() - return err + return o.readMetaData() } // Remove an object func (o *FsObjectSwift) Remove() error { + isManifestFile, err := o.isManifestFile() + if err != nil { + return err + } + if isManifestFile { + // remove segments + segmentsContainerName := o.swift.container + "_segments" + segmentsPath := fmt.Sprintf("%s/%s%s/", segmentsContainerName, o.swift.root, o.remote) + segmentsFs, err := NewFs(o.swift.name, segmentsPath) + if err != nil { + return err + } + for o := range segmentsFs.List() { + err := o.Remove() + if err != nil { + return err + } + } + } return o.swift.c.ObjectDelete(o.swift.container, o.swift.root+o.remote) }