swift: Support sub container paths

This commit is contained in:
Nick Craig-Wood 2014-05-05 17:30:55 +01:00
parent 580fa3a5a7
commit d0ca58bbb1

View File

@ -1,13 +1,10 @@
// Swift interface // Swift interface
package swift package swift
// FIXME need to prevent anything but ListDir working for swift://
import ( import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"log"
"regexp" "regexp"
"strings" "strings"
"time" "time"
@ -73,7 +70,10 @@ type FsObjectSwift struct {
// String converts this FsSwift to a string // String converts this FsSwift to a string
func (f *FsSwift) String() string { func (f *FsSwift) String() string {
return fmt.Sprintf("Swift container %s", f.container) if f.root == "" {
return fmt.Sprintf("Swift container %s", f.container)
}
return fmt.Sprintf("Swift container %s path %s", f.container, f.root)
} }
// Pattern to match a swift path // Pattern to match a swift path
@ -123,14 +123,19 @@ func NewFs(name, path string) (fs.Fs, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
if directory != "" {
return nil, fmt.Errorf("Directories not supported yet in %q", path)
}
c, err := swiftConnection(name) c, err := swiftConnection(name)
if err != nil { if err != nil {
return nil, err return nil, err
} }
f := &FsSwift{c: *c, container: container, root: directory} // FIXME - check if it is a file before doing this and make a limited fs
if directory != "" {
directory += "/"
}
f := &FsSwift{
c: *c,
container: container,
root: directory,
}
return f, nil return f, nil
} }
@ -162,51 +167,100 @@ func (f *FsSwift) NewFsObject(remote string) fs.Object {
return f.NewFsObjectWithInfo(remote, nil) return f.NewFsObjectWithInfo(remote, nil)
} }
// list the objects into the function supplied
//
// If directories is set it only sends directories
func (f *FsSwift) list(directories bool, fn func(string, *swift.Object)) {
// Options for ObjectsWalk
opts := swift.ObjectsOpts{
Prefix: f.root,
Limit: 256,
}
if directories {
opts.Delimiter = '/'
}
rootLength := len(f.root)
err := f.c.ObjectsWalk(f.container, &opts, func(opts *swift.ObjectsOpts) (interface{}, error) {
objects, err := f.c.Objects(f.container, opts)
if err == nil {
for i := range objects {
object := &objects[i]
// FIXME if there are no directories, swift gives back the files for some reason!
if directories && !strings.HasSuffix(object.Name, "/") {
continue
}
if !strings.HasPrefix(object.Name, f.root) {
fs.Log(f, "Odd name received %q", object.Name)
continue
}
remote := object.Name[rootLength:]
fn(remote, object)
}
}
return objects, err
})
if err != nil {
fs.Stats.Error()
fs.Log(f, "Couldn't read container %q: %s", f.container, err)
}
}
// Walk the path returning a channel of FsObjects // Walk the path returning a channel of FsObjects
func (f *FsSwift) List() fs.ObjectsChan { func (f *FsSwift) List() fs.ObjectsChan {
out := make(fs.ObjectsChan, fs.Config.Checkers) out := make(fs.ObjectsChan, fs.Config.Checkers)
go func() { if f.container == "" {
// FIXME use a smaller limit? // Return no objects at top level list
err := f.c.ObjectsWalk(f.container, nil, func(opts *swift.ObjectsOpts) (interface{}, error) {
objects, err := f.c.Objects(f.container, opts)
if err == nil {
for i := range objects {
object := &objects[i]
if fs := f.NewFsObjectWithInfo(object.Name, object); fs != nil {
out <- fs
}
}
}
return objects, err
})
if err != nil {
fs.Stats.Error()
log.Printf("Couldn't read container %q: %s", f.container, err)
}
close(out) close(out)
}() fs.Stats.Error()
fs.Log(f, "Can't list objects at root - choose a container using lsd")
} else {
// List the objects
go func() {
defer close(out)
f.list(false, func(remote string, object *swift.Object) {
if fs := f.NewFsObjectWithInfo(remote, object); fs != nil {
out <- fs
}
})
}()
}
return out return out
} }
// Lists the containers // Lists the containers
func (f *FsSwift) ListDir() fs.DirChan { func (f *FsSwift) ListDir() fs.DirChan {
out := make(fs.DirChan, fs.Config.Checkers) out := make(fs.DirChan, fs.Config.Checkers)
go func() { if f.container == "" {
defer close(out) // List the containers
containers, err := f.c.ContainersAll(nil) go func() {
if err != nil { defer close(out)
fs.Stats.Error() containers, err := f.c.ContainersAll(nil)
log.Printf("Couldn't list containers: %s", err) if err != nil {
} else { fs.Stats.Error()
for _, container := range containers { fs.Log(f, "Couldn't list containers: %v", err)
out <- &fs.Dir{ } else {
Name: container.Name, for _, container := range containers {
Bytes: container.Bytes, out <- &fs.Dir{
Count: container.Count, Name: container.Name,
Bytes: container.Bytes,
Count: container.Count,
}
} }
} }
} }()
}() } else {
// List the directories in the path in the container
go func() {
defer close(out)
f.list(true, func(remote string, object *swift.Object) {
out <- &fs.Dir{
Name: remote,
Bytes: object.Bytes,
Count: 0,
}
})
}()
}
return out return out
} }
@ -275,7 +329,7 @@ func (o *FsObjectSwift) readMetaData() (err error) {
if o.meta != nil { if o.meta != nil {
return nil return nil
} }
info, h, err := o.swift.c.Object(o.swift.container, o.remote) info, h, err := o.swift.c.Object(o.swift.container, o.swift.root+o.remote)
if err != nil { if err != nil {
fs.Debug(o, "Failed to read info: %s", err) fs.Debug(o, "Failed to read info: %s", err)
return err return err
@ -314,7 +368,7 @@ func (o *FsObjectSwift) SetModTime(modTime time.Time) {
return return
} }
o.meta.SetModTime(modTime) o.meta.SetModTime(modTime)
err = o.swift.c.ObjectUpdate(o.swift.container, o.remote, o.meta.ObjectHeaders()) err = o.swift.c.ObjectUpdate(o.swift.container, o.swift.root+o.remote, o.meta.ObjectHeaders())
if err != nil { if err != nil {
fs.Stats.Error() fs.Stats.Error()
fs.Log(o, "Failed to update remote mtime: %s", err) fs.Log(o, "Failed to update remote mtime: %s", err)
@ -328,7 +382,7 @@ func (o *FsObjectSwift) Storable() bool {
// Open an object for read // Open an object for read
func (o *FsObjectSwift) Open() (in io.ReadCloser, err error) { func (o *FsObjectSwift) Open() (in io.ReadCloser, err error) {
in, _, err = o.swift.c.ObjectOpen(o.swift.container, o.remote, true, nil) in, _, err = o.swift.c.ObjectOpen(o.swift.container, o.swift.root+o.remote, true, nil)
return return
} }
@ -339,13 +393,13 @@ func (o *FsObjectSwift) Update(in io.Reader, modTime time.Time, size int64) erro
// Set the mtime // Set the mtime
m := swift.Metadata{} m := swift.Metadata{}
m.SetModTime(modTime) m.SetModTime(modTime)
_, err := o.swift.c.ObjectPut(o.swift.container, o.remote, in, true, "", "", m.ObjectHeaders()) _, err := o.swift.c.ObjectPut(o.swift.container, o.swift.root+o.remote, in, true, "", "", m.ObjectHeaders())
return err return err
} }
// Remove an object // Remove an object
func (o *FsObjectSwift) Remove() error { func (o *FsObjectSwift) Remove() error {
return o.swift.c.ObjectDelete(o.swift.container, o.remote) return o.swift.c.ObjectDelete(o.swift.container, o.swift.root+o.remote)
} }
// Check the interfaces are satisfied // Check the interfaces are satisfied