mirror of
https://github.com/rclone/rclone.git
synced 2025-02-16 18:41:54 +01:00
Factor local filesystem and remote swift into Fs and FsObject interfaces
This will enable * local -> local and remote -> remote copies * a much more uniform style * could do s3 as well
This commit is contained in:
parent
12658efef2
commit
3bf6348f57
112
fs.go
Normal file
112
fs.go
Normal file
@ -0,0 +1,112 @@
|
|||||||
|
// File system interface
|
||||||
|
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// A Filesystem, describes the local filesystem and the remote object store
|
||||||
|
type Fs interface {
|
||||||
|
List() FsObjectsChan
|
||||||
|
NewFsObject(remote string) FsObject
|
||||||
|
Put(src FsObject)
|
||||||
|
Mkdir() error
|
||||||
|
Rmdir() error
|
||||||
|
}
|
||||||
|
|
||||||
|
// FIXME make f.Debugf...
|
||||||
|
|
||||||
|
// A filesystem like object which can either be a remote object or a
|
||||||
|
// local file/directory
|
||||||
|
type FsObject interface {
|
||||||
|
Remote() string
|
||||||
|
Debugf(string, ...interface{})
|
||||||
|
Md5sum() (string, error)
|
||||||
|
ModTime() (time.Time, error)
|
||||||
|
SetModTime(time.Time)
|
||||||
|
Size() int64
|
||||||
|
Open() (io.ReadCloser, error)
|
||||||
|
Storable() bool
|
||||||
|
// Exists() bool
|
||||||
|
Remove() error
|
||||||
|
}
|
||||||
|
|
||||||
|
type FsObjectsChan chan FsObject
|
||||||
|
|
||||||
|
type FsObjects []FsObject
|
||||||
|
|
||||||
|
// checkClose is a utility function used to check the return from
|
||||||
|
// Close in a defer statement.
|
||||||
|
func checkClose(c io.Closer, err *error) {
|
||||||
|
cerr := c.Close()
|
||||||
|
if *err == nil {
|
||||||
|
*err = cerr
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Checks to see if the src and dst objects are equal by looking at
|
||||||
|
// size, mtime and MD5SUM
|
||||||
|
//
|
||||||
|
// If the src and dst size are different then it is considered to be
|
||||||
|
// not equal.
|
||||||
|
//
|
||||||
|
// If the size is the same and the mtime is the same then it is
|
||||||
|
// considered to be equal. This is the heuristic rsync uses when
|
||||||
|
// not using --checksum.
|
||||||
|
//
|
||||||
|
// If the size is the same and and mtime is different or unreadable
|
||||||
|
// and the MD5SUM is the same then the file is considered to be equal.
|
||||||
|
// In this case the mtime on the dst is updated.
|
||||||
|
//
|
||||||
|
// Otherwise the file is considered to be not equal including if there
|
||||||
|
// were errors reading info.
|
||||||
|
func Equal(src, dst FsObject) bool {
|
||||||
|
if src.Size() != dst.Size() {
|
||||||
|
src.Debugf("Sizes differ")
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Size the same so check the mtime
|
||||||
|
srcModTime, err := src.ModTime()
|
||||||
|
if err != nil {
|
||||||
|
src.Debugf("Failed to read src mtime: %s", err)
|
||||||
|
} else {
|
||||||
|
dstModTime, err := dst.ModTime()
|
||||||
|
if err != nil {
|
||||||
|
dst.Debugf("Failed to read dst mtime: %s", err)
|
||||||
|
} else if !dstModTime.Equal(srcModTime) {
|
||||||
|
src.Debugf("Modification times differ")
|
||||||
|
} else {
|
||||||
|
src.Debugf("Size and modification time the same")
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// mtime is unreadable or different but size is the same so
|
||||||
|
// check the MD5SUM
|
||||||
|
srcMd5, err := src.Md5sum()
|
||||||
|
if err != nil {
|
||||||
|
src.Debugf("Failed to calculate src md5: %s", err)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
dstMd5, err := dst.Md5sum()
|
||||||
|
if err != nil {
|
||||||
|
dst.Debugf("Failed to calculate dst md5: %s", err)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
// fs.Debugf("Src MD5 %s", srcMd5)
|
||||||
|
// fs.Debugf("Dst MD5 %s", obj.Hash)
|
||||||
|
if srcMd5 != dstMd5 {
|
||||||
|
src.Debugf("Md5sums differ")
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Size and MD5 the same but mtime different so update the
|
||||||
|
// mtime of the dst object here
|
||||||
|
dst.SetModTime(srcModTime)
|
||||||
|
|
||||||
|
src.Debugf("Size and MD5SUM of src and dst objects identical")
|
||||||
|
return true
|
||||||
|
}
|
244
fs_local.go
Normal file
244
fs_local.go
Normal file
@ -0,0 +1,244 @@
|
|||||||
|
// Local filesystem interface
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/md5"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"path"
|
||||||
|
"path/filepath"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// FsLocal represents a local filesystem rooted at root
|
||||||
|
type FsLocal struct {
|
||||||
|
root string
|
||||||
|
}
|
||||||
|
|
||||||
|
// FsObjectLocal represents a local filesystem object
|
||||||
|
type FsObjectLocal struct {
|
||||||
|
remote string // The remote path
|
||||||
|
path string // The local path
|
||||||
|
info os.FileInfo // Interface for file info
|
||||||
|
}
|
||||||
|
|
||||||
|
// ------------------------------------------------------------
|
||||||
|
|
||||||
|
// Return an FsObject from a path
|
||||||
|
//
|
||||||
|
// May return nil if an error occurred
|
||||||
|
func (f *FsLocal) NewFsObjectWithInfo(remote string, info os.FileInfo) FsObject {
|
||||||
|
path := filepath.Join(f.root, remote)
|
||||||
|
fs := &FsObjectLocal{remote: remote, path: path}
|
||||||
|
if info != nil {
|
||||||
|
fs.info = info
|
||||||
|
} else {
|
||||||
|
err := fs.lstat()
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Failed to stat %s: %s", path, err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return fs
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return an FsObject from a path
|
||||||
|
//
|
||||||
|
// May return nil if an error occurred
|
||||||
|
func (f *FsLocal) NewFsObject(remote string) FsObject {
|
||||||
|
return f.NewFsObjectWithInfo(remote, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Walk the path returning a channel of FsObjects
|
||||||
|
//
|
||||||
|
// FIXME ignore symlinks?
|
||||||
|
// FIXME what about hardlinks / etc
|
||||||
|
func (f *FsLocal) List() FsObjectsChan {
|
||||||
|
out := make(FsObjectsChan, *checkers)
|
||||||
|
go func() {
|
||||||
|
err := filepath.Walk(f.root, func(path string, fi os.FileInfo, err error) error {
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Failed to open directory: %s: %s", path, err)
|
||||||
|
} else {
|
||||||
|
remote, err := filepath.Rel(f.root, path)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Failed to get relative path %s: %s", path, err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if remote == "." {
|
||||||
|
return nil
|
||||||
|
// remote = ""
|
||||||
|
}
|
||||||
|
if fs := f.NewFsObjectWithInfo(remote, fi); fs != nil {
|
||||||
|
if fs.Storable() {
|
||||||
|
out <- fs
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Failed to open directory: %s: %s", f.root, err)
|
||||||
|
}
|
||||||
|
close(out)
|
||||||
|
}()
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
// FIXME most of this is generic
|
||||||
|
// could make it into Copy(dst, src FsObject)
|
||||||
|
|
||||||
|
// Puts the FsObject to the local filesystem
|
||||||
|
//
|
||||||
|
// FIXME return the object?
|
||||||
|
func (f *FsLocal) Put(src FsObject) {
|
||||||
|
dstRemote := src.Remote()
|
||||||
|
dstPath := filepath.Join(f.root, dstRemote)
|
||||||
|
log.Printf("Download %s to %s", dstRemote, dstPath)
|
||||||
|
// Temporary FsObject under construction
|
||||||
|
fs := &FsObjectLocal{remote: dstRemote, path: dstPath}
|
||||||
|
|
||||||
|
dir := path.Dir(dstPath)
|
||||||
|
err := os.MkdirAll(dir, 0770)
|
||||||
|
if err != nil {
|
||||||
|
fs.Debugf("Couldn't make directory: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
out, err := os.Create(dstPath)
|
||||||
|
if err != nil {
|
||||||
|
fs.Debugf("Failed to open: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close and remove file on error at the end
|
||||||
|
defer func() {
|
||||||
|
checkClose(out, &err)
|
||||||
|
if err != nil {
|
||||||
|
fs.Debugf("Removing failed download")
|
||||||
|
removeErr := os.Remove(dstPath)
|
||||||
|
if removeErr != nil {
|
||||||
|
fs.Debugf("Failed to remove failed download: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
in, err := src.Open()
|
||||||
|
if err != nil {
|
||||||
|
fs.Debugf("Failed to open: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer checkClose(in, &err)
|
||||||
|
|
||||||
|
_, err = io.Copy(out, in)
|
||||||
|
if err != nil {
|
||||||
|
fs.Debugf("Failed to download: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set the mtime
|
||||||
|
modTime, err := src.ModTime()
|
||||||
|
if err != nil {
|
||||||
|
fs.Debugf("Failed to read mtime from object: %s", err)
|
||||||
|
} else {
|
||||||
|
fs.SetModTime(modTime)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mkdir creates the directory if it doesn't exist
|
||||||
|
func (f *FsLocal) Mkdir() error {
|
||||||
|
return os.MkdirAll(f.root, 0770)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Rmdir removes the directory
|
||||||
|
//
|
||||||
|
// If it isn't empty it will return an error
|
||||||
|
func (f *FsLocal) Rmdir() error {
|
||||||
|
return os.Remove(f.root)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ------------------------------------------------------------
|
||||||
|
|
||||||
|
// Return the remote path
|
||||||
|
func (fs *FsObjectLocal) Remote() string {
|
||||||
|
return fs.remote
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write debuging output for this FsObject
|
||||||
|
func (fs *FsObjectLocal) Debugf(text string, args ...interface{}) {
|
||||||
|
out := fmt.Sprintf(text, args...)
|
||||||
|
log.Printf("%s: %s", fs.remote, out)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Md5sum calculates the Md5sum of a file returning a lowercase hex string
|
||||||
|
func (fs *FsObjectLocal) Md5sum() (string, error) {
|
||||||
|
in, err := os.Open(fs.path)
|
||||||
|
if err != nil {
|
||||||
|
fs.Debugf("Failed to open: %s", err)
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
defer in.Close() // FIXME ignoring error
|
||||||
|
hash := md5.New()
|
||||||
|
_, err = io.Copy(hash, in)
|
||||||
|
if err != nil {
|
||||||
|
fs.Debugf("Failed to read: %s", err)
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
return fmt.Sprintf("%x", hash.Sum(nil)), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Size returns the size of an object in bytes
|
||||||
|
func (fs *FsObjectLocal) Size() int64 {
|
||||||
|
return fs.info.Size()
|
||||||
|
}
|
||||||
|
|
||||||
|
// ModTime returns the modification time of the object
|
||||||
|
func (fs *FsObjectLocal) ModTime() (modTime time.Time, err error) {
|
||||||
|
return fs.info.ModTime(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sets the modification time of the local fs object
|
||||||
|
func (fs *FsObjectLocal) SetModTime(modTime time.Time) {
|
||||||
|
err := Chtimes(fs.path, modTime, modTime)
|
||||||
|
if err != nil {
|
||||||
|
fs.Debugf("Failed to set mtime on file: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Is this object storable
|
||||||
|
func (fs *FsObjectLocal) Storable() bool {
|
||||||
|
mode := fs.info.Mode()
|
||||||
|
if mode&(os.ModeSymlink|os.ModeNamedPipe|os.ModeSocket|os.ModeDevice) != 0 {
|
||||||
|
fs.Debugf("Can't transfer non file/directory")
|
||||||
|
return false
|
||||||
|
} else if mode&os.ModeDir != 0 {
|
||||||
|
// Debug?
|
||||||
|
fs.Debugf("FIXME Skipping directory")
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Open an object for read
|
||||||
|
func (fs *FsObjectLocal) Open() (in io.ReadCloser, err error) {
|
||||||
|
in, err = os.Open(fs.path)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stat a FsObject into info
|
||||||
|
func (fs *FsObjectLocal) lstat() error {
|
||||||
|
info, err := os.Lstat(fs.path)
|
||||||
|
fs.info = info
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove an object
|
||||||
|
func (fs *FsObjectLocal) Remove() error {
|
||||||
|
return os.Remove(fs.path)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check the interfaces are satisfied
|
||||||
|
var _ Fs = &FsLocal{}
|
||||||
|
var _ FsObject = &FsObjectLocal{}
|
215
fs_swift.go
Normal file
215
fs_swift.go
Normal file
@ -0,0 +1,215 @@
|
|||||||
|
// Swift interface
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"github.com/ncw/swift"
|
||||||
|
"io"
|
||||||
|
"log"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// FsSwift represents a remote swift server
|
||||||
|
type FsSwift struct {
|
||||||
|
c swift.Connection // the connection to the swift server
|
||||||
|
container string // the container we are working on
|
||||||
|
}
|
||||||
|
|
||||||
|
// FsObjectSwift describes a swift object
|
||||||
|
//
|
||||||
|
// Will definitely have info but maybe not meta
|
||||||
|
type FsObjectSwift struct {
|
||||||
|
swift *FsSwift // what this object is part of
|
||||||
|
remote string // The remote path
|
||||||
|
info swift.Object // Info from the swift object if known
|
||||||
|
meta *swift.Metadata // The object metadata if known
|
||||||
|
}
|
||||||
|
|
||||||
|
// ------------------------------------------------------------
|
||||||
|
|
||||||
|
// Return an FsObject from a path
|
||||||
|
//
|
||||||
|
// May return nil if an error occurred
|
||||||
|
func (f *FsSwift) NewFsObjectWithInfo(remote string, info *swift.Object) FsObject {
|
||||||
|
fs := &FsObjectSwift{
|
||||||
|
swift: f,
|
||||||
|
remote: remote,
|
||||||
|
}
|
||||||
|
if info != nil {
|
||||||
|
// Set info but not meta
|
||||||
|
fs.info = *info
|
||||||
|
} else {
|
||||||
|
err := fs.readMetaData() // reads info and meta, returning an error
|
||||||
|
if err != nil {
|
||||||
|
// logged already fs.Debugf("Failed to read info: %s", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return fs
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return an FsObject from a path
|
||||||
|
//
|
||||||
|
// May return nil if an error occurred
|
||||||
|
func (f *FsSwift) NewFsObject(remote string) FsObject {
|
||||||
|
return f.NewFsObjectWithInfo(remote, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Walk the path returning a channel of FsObjects
|
||||||
|
//
|
||||||
|
// FIXME ignore symlinks?
|
||||||
|
// FIXME what about hardlinks / etc
|
||||||
|
//
|
||||||
|
// FIXME not returning error if dir not found?
|
||||||
|
func (f *FsSwift) List() FsObjectsChan {
|
||||||
|
out := make(FsObjectsChan, *checkers)
|
||||||
|
go func() {
|
||||||
|
// FIXME use a smaller limit?
|
||||||
|
err := f.c.ObjectsAllFn(f.container, nil, func(objects []swift.Object) bool {
|
||||||
|
for i := range objects {
|
||||||
|
object := &objects[i]
|
||||||
|
if fs := f.NewFsObjectWithInfo(object.Name, object); fs != nil {
|
||||||
|
out <- fs
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Couldn't read container %q: %s", f.container, err)
|
||||||
|
}
|
||||||
|
close(out)
|
||||||
|
}()
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
// Put the FsObject into the container
|
||||||
|
func (f *FsSwift) Put(src FsObject) {
|
||||||
|
// Temporary FsObject under construction
|
||||||
|
fs := &FsObjectSwift{swift: f, remote: src.Remote()}
|
||||||
|
// FIXME content type
|
||||||
|
in, err := src.Open()
|
||||||
|
if err != nil {
|
||||||
|
fs.Debugf("Failed to open: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer in.Close()
|
||||||
|
|
||||||
|
// Set the mtime
|
||||||
|
m := swift.Metadata{}
|
||||||
|
modTime, err := src.ModTime()
|
||||||
|
if err != nil {
|
||||||
|
fs.Debugf("Failed to read mtime from object: %s", err)
|
||||||
|
} else {
|
||||||
|
m.SetModTime(modTime)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = fs.swift.c.ObjectPut(fs.swift.container, fs.remote, in, true, "", "", m.ObjectHeaders())
|
||||||
|
if err != nil {
|
||||||
|
fs.Debugf("Failed to upload: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
fs.Debugf("Uploaded")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mkdir creates the container if it doesn't exist
|
||||||
|
func (f *FsSwift) Mkdir() error {
|
||||||
|
return f.c.ContainerCreate(f.container, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Rmdir deletes the container
|
||||||
|
//
|
||||||
|
// Returns an error if it isn't empty
|
||||||
|
func (f *FsSwift) Rmdir() error {
|
||||||
|
return f.c.ContainerDelete(f.container)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ------------------------------------------------------------
|
||||||
|
|
||||||
|
// Return the remote path
|
||||||
|
func (fs *FsObjectSwift) Remote() string {
|
||||||
|
return fs.remote
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write debuging output for this FsObject
|
||||||
|
func (fs *FsObjectSwift) Debugf(text string, args ...interface{}) {
|
||||||
|
out := fmt.Sprintf(text, args...)
|
||||||
|
log.Printf("%s: %s", fs.remote, out)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Md5sum returns the Md5sum of an object returning a lowercase hex string
|
||||||
|
func (fs *FsObjectSwift) Md5sum() (string, error) {
|
||||||
|
return strings.ToLower(fs.info.Hash), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Size returns the size of an object in bytes
|
||||||
|
func (fs *FsObjectSwift) Size() int64 {
|
||||||
|
return fs.info.Bytes
|
||||||
|
}
|
||||||
|
|
||||||
|
// readMetaData gets the metadata if it hasn't already been fetched
|
||||||
|
//
|
||||||
|
// it also sets the info
|
||||||
|
func (fs *FsObjectSwift) readMetaData() (err error) {
|
||||||
|
if fs.meta != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
info, h, err := fs.swift.c.Object(fs.swift.container, fs.remote)
|
||||||
|
if err != nil {
|
||||||
|
fs.Debugf("Failed to read info: %s", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
meta := h.ObjectMetadata()
|
||||||
|
fs.info = info
|
||||||
|
fs.meta = &meta
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ModTime returns the modification time of the object
|
||||||
|
func (fs *FsObjectSwift) ModTime() (modTime time.Time, err error) {
|
||||||
|
err = fs.readMetaData()
|
||||||
|
if err != nil {
|
||||||
|
fs.Debugf("Failed to read metadata: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
modTime, err = fs.meta.GetModTime()
|
||||||
|
if err != nil {
|
||||||
|
fs.Debugf("Failed to read mtime from object: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sets the modification time of the local fs object
|
||||||
|
func (fs *FsObjectSwift) SetModTime(modTime time.Time) {
|
||||||
|
err := fs.readMetaData()
|
||||||
|
if err != nil {
|
||||||
|
fs.Debugf("Failed to read metadata: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
fs.meta.SetModTime(modTime)
|
||||||
|
err = fs.swift.c.ObjectUpdate(fs.swift.container, fs.remote, fs.meta.ObjectHeaders())
|
||||||
|
if err != nil {
|
||||||
|
fs.Debugf("Failed to update remote mtime: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Is this object storable
|
||||||
|
func (fs *FsObjectSwift) Storable() bool {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Open an object for read
|
||||||
|
func (fs *FsObjectSwift) Open() (in io.ReadCloser, err error) {
|
||||||
|
in, _, err = fs.swift.c.ObjectOpen(fs.swift.container, fs.info.Name, true, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove an object
|
||||||
|
func (fs *FsObjectSwift) Remove() error {
|
||||||
|
return fs.swift.c.ObjectDelete(fs.swift.container, fs.remote)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check the interfaces are satisfied
|
||||||
|
var _ Fs = &FsSwift{}
|
||||||
|
var _ FsObject = &FsObjectSwift{}
|
445
swiftsync.go
445
swiftsync.go
@ -4,20 +4,15 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"crypto/md5"
|
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/ncw/swift"
|
"github.com/ncw/swift"
|
||||||
"io"
|
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
|
||||||
"path/filepath"
|
|
||||||
"runtime"
|
"runtime"
|
||||||
"runtime/pprof"
|
"runtime/pprof"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Globals
|
// Globals
|
||||||
@ -35,325 +30,77 @@ var (
|
|||||||
transfers = flag.Int("transfers", 4, "Number of file transfers to run in parallel.")
|
transfers = flag.Int("transfers", 4, "Number of file transfers to run in parallel.")
|
||||||
)
|
)
|
||||||
|
|
||||||
// A filesystem like object which can either be a remote object or a
|
// Read FsObjects~s on in send to out if they need uploading
|
||||||
// local file/directory or both
|
|
||||||
type FsObject struct {
|
|
||||||
remote string // The remote path
|
|
||||||
path string // The local path
|
|
||||||
info os.FileInfo // Interface for file info
|
|
||||||
}
|
|
||||||
|
|
||||||
type FsObjectsChan chan *FsObject
|
|
||||||
|
|
||||||
type FsObjects []FsObject
|
|
||||||
|
|
||||||
// Write debuging output for this FsObject
|
|
||||||
func (fs *FsObject) Debugf(text string, args ...interface{}) {
|
|
||||||
out := fmt.Sprintf(text, args...)
|
|
||||||
log.Printf("%s: %s", fs.remote, out)
|
|
||||||
}
|
|
||||||
|
|
||||||
// md5sum calculates the md5sum of a file returning a lowercase hex string
|
|
||||||
func (fs *FsObject) md5sum() (string, error) {
|
|
||||||
in, err := os.Open(fs.path)
|
|
||||||
if err != nil {
|
|
||||||
fs.Debugf("Failed to open: %s", err)
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
defer in.Close() // FIXME ignoring error
|
|
||||||
hash := md5.New()
|
|
||||||
_, err = io.Copy(hash, in)
|
|
||||||
if err != nil {
|
|
||||||
fs.Debugf("Failed to read: %s", err)
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
return fmt.Sprintf("%x", hash.Sum(nil)), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Sets the modification time of the local fs object
|
|
||||||
func (fs *FsObject) SetModTime(modTime time.Time) {
|
|
||||||
err := Chtimes(fs.path, modTime, modTime)
|
|
||||||
if err != nil {
|
|
||||||
fs.Debugf("Failed to set mtime on file: %s", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Checks to see if the remote and local objects are equal by looking
|
|
||||||
// at size, mtime and MD5SUM
|
|
||||||
//
|
|
||||||
// If the remote and local size are different then it is considered to
|
|
||||||
// be not equal.
|
|
||||||
//
|
|
||||||
// If the size is the same and the mtime is the same then it is
|
|
||||||
// considered to be equal. This is the heuristic rsync uses when
|
|
||||||
// not using --checksum.
|
|
||||||
//
|
|
||||||
// If the size is the same and and mtime is different or unreadable
|
|
||||||
// and the MD5SUM is the same then the file is considered to be
|
|
||||||
// equal. In this case the mtime on the object is updated. If
|
|
||||||
// upload is set then the remote object is changed otherwise the local
|
|
||||||
// object.
|
|
||||||
//
|
|
||||||
// Otherwise the file is considered to be not equal including if there
|
|
||||||
// were errors reading info.
|
|
||||||
func (fs *FsObject) Equal(c *swift.Connection, container string, upload bool) bool {
|
|
||||||
// FIXME could pass in an Object here if we have one which
|
|
||||||
// will mean we could do the Size and Hash checks without a
|
|
||||||
// remote call if we wanted
|
|
||||||
obj, h, err := c.Object(container, fs.remote)
|
|
||||||
if err != nil {
|
|
||||||
fs.Debugf("Failed to read info: %s", err)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
if obj.Bytes != fs.info.Size() {
|
|
||||||
fs.Debugf("Sizes differ")
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// Size the same so check the mtime
|
|
||||||
m := h.ObjectMetadata()
|
|
||||||
remoteModTime, err := m.GetModTime()
|
|
||||||
if err != nil {
|
|
||||||
fs.Debugf("Failed to read mtime: %s", err)
|
|
||||||
} else if !remoteModTime.Equal(fs.info.ModTime()) {
|
|
||||||
fs.Debugf("Modification times differ")
|
|
||||||
} else {
|
|
||||||
fs.Debugf("Size and modification time the same")
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
// mtime is unreadable or different but size is the same so
|
|
||||||
// check the MD5SUM
|
|
||||||
localMd5, err := fs.md5sum()
|
|
||||||
if err != nil {
|
|
||||||
fs.Debugf("Failed to calculate md5: %s", err)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
// fs.Debugf("Local MD5 %s", localMd5)
|
|
||||||
// fs.Debugf("Remote MD5 %s", obj.Hash)
|
|
||||||
if localMd5 != strings.ToLower(obj.Hash) {
|
|
||||||
fs.Debugf("Md5sums differ")
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// Size and MD5 the same but mtime different so update the
|
|
||||||
// mtime of the local or remote object here
|
|
||||||
if upload {
|
|
||||||
m.SetModTime(fs.info.ModTime())
|
|
||||||
err = c.ObjectUpdate(container, fs.remote, m.ObjectHeaders())
|
|
||||||
if err != nil {
|
|
||||||
fs.Debugf("Failed to update remote mtime: %s", err)
|
|
||||||
}
|
|
||||||
fs.Debugf("Updated mtime of remote object")
|
|
||||||
} else {
|
|
||||||
fmt.Printf("metadata %q, remoteModTime = %s\n", m, remoteModTime)
|
|
||||||
fs.SetModTime(remoteModTime)
|
|
||||||
fs.Debugf("Updated mtime of local object")
|
|
||||||
}
|
|
||||||
|
|
||||||
fs.Debugf("Size and MD5SUM of local and remote objects identical")
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
// Is this object storable
|
|
||||||
func (fs *FsObject) storable() bool {
|
|
||||||
mode := fs.info.Mode()
|
|
||||||
if mode&(os.ModeSymlink|os.ModeNamedPipe|os.ModeSocket|os.ModeDevice) != 0 {
|
|
||||||
fs.Debugf("Can't transfer non file/directory")
|
|
||||||
return false
|
|
||||||
} else if mode&os.ModeDir != 0 {
|
|
||||||
// Debug?
|
|
||||||
fs.Debugf("FIXME Skipping directory")
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
// Puts the FsObject into the container
|
|
||||||
func (fs *FsObject) put(c *swift.Connection, container string) {
|
|
||||||
// FIXME content type
|
|
||||||
in, err := os.Open(fs.path)
|
|
||||||
if err != nil {
|
|
||||||
fs.Debugf("Failed to open: %s", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer in.Close()
|
|
||||||
m := swift.Metadata{}
|
|
||||||
m.SetModTime(fs.info.ModTime())
|
|
||||||
_, err = c.ObjectPut(container, fs.remote, in, true, "", "", m.ObjectHeaders())
|
|
||||||
if err != nil {
|
|
||||||
fs.Debugf("Failed to upload: %s", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
fs.Debugf("Uploaded")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stat a FsObject into info
|
|
||||||
func (fs *FsObject) lstat() error {
|
|
||||||
info, err := os.Lstat(fs.path)
|
|
||||||
fs.info = info
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Return an FsObject from a path
|
|
||||||
//
|
|
||||||
// May return nil if an error occurred
|
|
||||||
func NewFsObject(root, path string) *FsObject {
|
|
||||||
remote, err := filepath.Rel(root, path)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("Failed to get relative path %s: %s", path, err)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if remote == "." {
|
|
||||||
remote = ""
|
|
||||||
}
|
|
||||||
fs := &FsObject{remote: remote, path: path}
|
|
||||||
err = fs.lstat()
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("Failed to stat %s: %s", path, err)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return fs
|
|
||||||
}
|
|
||||||
|
|
||||||
// Walk the path returning a channel of FsObjects
|
|
||||||
//
|
|
||||||
// FIXME ignore symlinks?
|
|
||||||
// FIXME what about hardlinks / etc
|
|
||||||
func walk(root string) FsObjectsChan {
|
|
||||||
out := make(FsObjectsChan, *checkers)
|
|
||||||
go func() {
|
|
||||||
err := filepath.Walk(root, func(path string, f os.FileInfo, err error) error {
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("Failed to open directory: %s: %s", path, err)
|
|
||||||
} else {
|
|
||||||
if fs := NewFsObject(root, path); fs != nil {
|
|
||||||
out <- fs
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("Failed to open directory: %s: %s", root, err)
|
|
||||||
}
|
|
||||||
close(out)
|
|
||||||
}()
|
|
||||||
return out
|
|
||||||
}
|
|
||||||
|
|
||||||
// Read FsObjects on in and write them to out if they need uploading
|
|
||||||
//
|
//
|
||||||
// FIXME potentially doing lots of MD5SUMS at once
|
// FIXME potentially doing lots of MD5SUMS at once
|
||||||
func checker(c *swift.Connection, container string, in, out FsObjectsChan, upload bool, wg *sync.WaitGroup) {
|
func Checker(in, out FsObjectsChan, fdst Fs, wg *sync.WaitGroup) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
for fs := range in {
|
for src := range in {
|
||||||
if !upload {
|
dst := fdst.NewFsObject(src.Remote())
|
||||||
_ = fs.lstat()
|
if dst == nil {
|
||||||
if fs.info == nil {
|
src.Debugf("Couldn't find local file - download")
|
||||||
fs.Debugf("Couldn't find local file - download")
|
out <- src
|
||||||
out <- fs
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// Check to see if can store this
|
// Check to see if can store this
|
||||||
if !fs.storable() {
|
if !src.Storable() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// Check to see if changed or not
|
// Check to see if changed or not
|
||||||
if fs.Equal(c, container, upload) {
|
if Equal(src, dst) {
|
||||||
fs.Debugf("Unchanged skipping")
|
src.Debugf("Unchanged skipping")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
out <- fs
|
out <- src
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read FsObjects on in and upload them
|
// Read FsObjects on in and copy them
|
||||||
func uploader(c *swift.Connection, container string, in FsObjectsChan, wg *sync.WaitGroup) {
|
func Copier(in FsObjectsChan, fdst Fs, wg *sync.WaitGroup) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
for fs := range in {
|
for src := range in {
|
||||||
fs.put(c, container)
|
fdst.Put(src)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Syncs a directory into a container
|
// Copies fsrc into fdst
|
||||||
func upload(c *swift.Connection, args []string) {
|
func Copy(fsrc, fdst Fs) {
|
||||||
root, container := args[0], args[1]
|
err := fdst.Mkdir()
|
||||||
mkdir(c, []string{container})
|
if err != nil {
|
||||||
to_be_checked := walk(root)
|
log.Fatal("Failed to make destination")
|
||||||
|
}
|
||||||
|
|
||||||
|
to_be_checked := fsrc.List()
|
||||||
to_be_uploaded := make(FsObjectsChan, *transfers)
|
to_be_uploaded := make(FsObjectsChan, *transfers)
|
||||||
|
|
||||||
var checkerWg sync.WaitGroup
|
var checkerWg sync.WaitGroup
|
||||||
checkerWg.Add(*checkers)
|
checkerWg.Add(*checkers)
|
||||||
for i := 0; i < *checkers; i++ {
|
for i := 0; i < *checkers; i++ {
|
||||||
go checker(c, container, to_be_checked, to_be_uploaded, true, &checkerWg)
|
go Checker(to_be_checked, to_be_uploaded, fdst, &checkerWg)
|
||||||
}
|
}
|
||||||
|
|
||||||
var uploaderWg sync.WaitGroup
|
var copierWg sync.WaitGroup
|
||||||
uploaderWg.Add(*transfers)
|
copierWg.Add(*transfers)
|
||||||
for i := 0; i < *transfers; i++ {
|
for i := 0; i < *transfers; i++ {
|
||||||
go uploader(c, container, to_be_uploaded, &uploaderWg)
|
go Copier(to_be_uploaded, fdst, &copierWg)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("Waiting for checks to finish")
|
log.Printf("Waiting for checks to finish")
|
||||||
checkerWg.Wait()
|
checkerWg.Wait()
|
||||||
close(to_be_uploaded)
|
close(to_be_uploaded)
|
||||||
log.Printf("Waiting for uploads to finish")
|
log.Printf("Waiting for transfers to finish")
|
||||||
uploaderWg.Wait()
|
copierWg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get an object to the filepath making directories if necessary
|
// Syncs a directory into a container
|
||||||
func (fs *FsObject) get(c *swift.Connection, container string) {
|
func upload(c *swift.Connection, args []string) {
|
||||||
log.Printf("Download %s to %s", fs.remote, fs.path)
|
root, container := args[0], args[1]
|
||||||
|
// FIXME
|
||||||
|
fsrc := &FsLocal{root: root}
|
||||||
|
fdst := &FsSwift{c: *c, container: container}
|
||||||
|
|
||||||
dir := path.Dir(fs.path)
|
Copy(fsrc, fdst)
|
||||||
err := os.MkdirAll(dir, 0770)
|
|
||||||
if err != nil {
|
|
||||||
fs.Debugf("Couldn't make directory: %s", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
out, err := os.Create(fs.path)
|
|
||||||
if err != nil {
|
|
||||||
fs.Debugf("Failed to open: %s", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
h, getErr := c.ObjectGet(container, fs.remote, out, true, nil)
|
|
||||||
if getErr != nil {
|
|
||||||
fs.Debugf("Failed to download: %s", getErr)
|
|
||||||
}
|
|
||||||
|
|
||||||
closeErr := out.Close()
|
|
||||||
if closeErr != nil {
|
|
||||||
fs.Debugf("Error closing: %s", closeErr)
|
|
||||||
}
|
|
||||||
|
|
||||||
if getErr != nil || closeErr != nil {
|
|
||||||
fs.Debugf("Removing failed download")
|
|
||||||
err = os.Remove(fs.path)
|
|
||||||
if err != nil {
|
|
||||||
fs.Debugf("Failed to remove failed download: %s", err)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Set the mtime
|
|
||||||
modTime, err := h.ObjectMetadata().GetModTime()
|
|
||||||
if err != nil {
|
|
||||||
fs.Debugf("Failed to read mtime from object: %s", err)
|
|
||||||
} else {
|
|
||||||
fs.SetModTime(modTime)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Read FsObjects on in and download them
|
|
||||||
func downloader(c *swift.Connection, container string, in FsObjectsChan, wg *sync.WaitGroup) {
|
|
||||||
defer wg.Done()
|
|
||||||
for fs := range in {
|
|
||||||
fs.get(c, container)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Syncs a container into a directory
|
// Syncs a container into a directory
|
||||||
@ -363,45 +110,12 @@ func downloader(c *swift.Connection, container string, in FsObjectsChan, wg *syn
|
|||||||
// FIXME should download and stat many at once
|
// FIXME should download and stat many at once
|
||||||
func download(c *swift.Connection, args []string) {
|
func download(c *swift.Connection, args []string) {
|
||||||
container, root := args[0], args[1]
|
container, root := args[0], args[1]
|
||||||
// FIXME this would be nice running into a channel!
|
|
||||||
objects, err := c.ObjectsAll(container, nil)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("Couldn't read container %q: %s", container, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = os.MkdirAll(root, 0770)
|
// FIXME
|
||||||
if err != nil {
|
fsrc := &FsSwift{c: *c, container: container}
|
||||||
log.Fatalf("Couldn't make directory %q: %s", root, err)
|
fdst := &FsLocal{root: root}
|
||||||
}
|
|
||||||
|
|
||||||
to_be_checked := make(FsObjectsChan, *checkers)
|
|
||||||
to_be_downloaded := make(FsObjectsChan, *transfers)
|
|
||||||
|
|
||||||
var checkerWg sync.WaitGroup
|
|
||||||
checkerWg.Add(*checkers)
|
|
||||||
for i := 0; i < *checkers; i++ {
|
|
||||||
go checker(c, container, to_be_checked, to_be_downloaded, false, &checkerWg)
|
|
||||||
}
|
|
||||||
|
|
||||||
var downloaderWg sync.WaitGroup
|
|
||||||
downloaderWg.Add(*transfers)
|
|
||||||
for i := 0; i < *transfers; i++ {
|
|
||||||
go downloader(c, container, to_be_downloaded, &downloaderWg)
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := range objects {
|
|
||||||
object := &objects[i]
|
|
||||||
filepath := path.Join(root, object.Name)
|
|
||||||
to_be_checked <- &FsObject{remote: object.Name, path: filepath}
|
|
||||||
}
|
|
||||||
close(to_be_checked)
|
|
||||||
|
|
||||||
log.Printf("Waiting for checks to finish")
|
|
||||||
checkerWg.Wait()
|
|
||||||
close(to_be_downloaded)
|
|
||||||
log.Printf("Waiting for downloads to finish")
|
|
||||||
downloaderWg.Wait()
|
|
||||||
|
|
||||||
|
Copy(fsrc, fdst)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Lists the containers
|
// Lists the containers
|
||||||
@ -415,6 +129,23 @@ func listContainers(c *swift.Connection) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// List the Fs to stdout
|
||||||
|
func List(f Fs) {
|
||||||
|
// FIXME error?
|
||||||
|
in := f.List()
|
||||||
|
for fs := range in {
|
||||||
|
// FIXME
|
||||||
|
// if object.PseudoDirectory {
|
||||||
|
// fmt.Printf("%9s %19s %s\n", "Directory", "-", fs.Remote())
|
||||||
|
// } else {
|
||||||
|
// FIXME ModTime is expensive?
|
||||||
|
modTime, _ := fs.ModTime()
|
||||||
|
fmt.Printf("%9d %19s %s\n", fs.Size(), modTime.Format("2006-01-02 15:04:05"), fs.Remote())
|
||||||
|
// fmt.Printf("%9d %19s %s\n", fs.Size(), object.LastModified.Format("2006-01-02 15:04:05"), fs.Remote())
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Lists files in a container
|
// Lists files in a container
|
||||||
func list(c *swift.Connection, args []string) {
|
func list(c *swift.Connection, args []string) {
|
||||||
if len(args) == 0 {
|
if len(args) == 0 {
|
||||||
@ -422,24 +153,26 @@ func list(c *swift.Connection, args []string) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
container := args[0]
|
container := args[0]
|
||||||
//objects, err := c.ObjectsAll(container, &swift.ObjectsOpts{Prefix: "", Delimiter: '/'})
|
// FIXME
|
||||||
objects, err := c.ObjectsAll(container, nil)
|
f := &FsSwift{c: *c, container: container}
|
||||||
if err != nil {
|
List(f)
|
||||||
log.Fatalf("Couldn't read container %q: %s", container, err)
|
}
|
||||||
}
|
|
||||||
for _, object := range objects {
|
// Local lists files
|
||||||
if object.PseudoDirectory {
|
func llist(c *swift.Connection, args []string) {
|
||||||
fmt.Printf("%9s %19s %s\n", "Directory", "-", object.Name)
|
root := args[0]
|
||||||
} else {
|
// FIXME
|
||||||
fmt.Printf("%9d %19s %s\n", object.Bytes, object.LastModified.Format("2006-01-02 15:04:05"), object.Name)
|
f := &FsLocal{root: root}
|
||||||
}
|
List(f)
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Makes a container
|
// Makes a container
|
||||||
func mkdir(c *swift.Connection, args []string) {
|
func mkdir(c *swift.Connection, args []string) {
|
||||||
container := args[0]
|
container := args[0]
|
||||||
err := c.ContainerCreate(container, nil)
|
// FIXME
|
||||||
|
fdst := &FsSwift{c: *c, container: container}
|
||||||
|
|
||||||
|
err := fdst.Mkdir()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Couldn't create container %q: %s", container, err)
|
log.Fatalf("Couldn't create container %q: %s", container, err)
|
||||||
}
|
}
|
||||||
@ -448,7 +181,10 @@ func mkdir(c *swift.Connection, args []string) {
|
|||||||
// Removes a container
|
// Removes a container
|
||||||
func rmdir(c *swift.Connection, args []string) {
|
func rmdir(c *swift.Connection, args []string) {
|
||||||
container := args[0]
|
container := args[0]
|
||||||
err := c.ContainerDelete(container)
|
// FIXME
|
||||||
|
fdst := &FsSwift{c: *c, container: container}
|
||||||
|
|
||||||
|
err := fdst.Rmdir()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Couldn't delete container %q: %s", container, err)
|
log.Fatalf("Couldn't delete container %q: %s", container, err)
|
||||||
}
|
}
|
||||||
@ -459,34 +195,27 @@ func rmdir(c *swift.Connection, args []string) {
|
|||||||
// FIXME should make FsObjects and use debugging
|
// FIXME should make FsObjects and use debugging
|
||||||
func purge(c *swift.Connection, args []string) {
|
func purge(c *swift.Connection, args []string) {
|
||||||
container := args[0]
|
container := args[0]
|
||||||
objects, err := c.ObjectsAll(container, nil)
|
// FIXME
|
||||||
if err != nil {
|
fdst := &FsSwift{c: *c, container: container}
|
||||||
log.Fatalf("Couldn't read container %q: %s", container, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
to_be_deleted := make(chan *swift.Object, *transfers)
|
to_be_deleted := fdst.List()
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
wg.Add(*transfers)
|
wg.Add(*transfers)
|
||||||
for i := 0; i < *transfers; i++ {
|
for i := 0; i < *transfers; i++ {
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
for object := range to_be_deleted {
|
for dst := range to_be_deleted {
|
||||||
err := c.ObjectDelete(container, object.Name)
|
err := dst.Remove()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("%s: Couldn't delete: %s\n", object.Name, err)
|
log.Printf("%s: Couldn't delete: %s\n", dst.Remote(), err)
|
||||||
} else {
|
} else {
|
||||||
log.Printf("%s: Deleted\n", object.Name)
|
log.Printf("%s: Deleted\n", dst.Remote())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := range objects {
|
|
||||||
to_be_deleted <- &objects[i]
|
|
||||||
}
|
|
||||||
close(to_be_deleted)
|
|
||||||
|
|
||||||
log.Printf("Waiting for deletions to finish")
|
log.Printf("Waiting for deletions to finish")
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
@ -543,6 +272,14 @@ var Commands = []Command{
|
|||||||
list,
|
list,
|
||||||
0, 1,
|
0, 1,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"lls",
|
||||||
|
`[<directory>]
|
||||||
|
List the directory
|
||||||
|
`,
|
||||||
|
llist,
|
||||||
|
1, 1,
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"mkdir",
|
"mkdir",
|
||||||
`<container>
|
`<container>
|
||||||
|
Loading…
Reference in New Issue
Block a user