s3: support sub-bucket paths

This commit is contained in:
Nick Craig-Wood 2014-05-05 18:25:32 +01:00
parent d0ca58bbb1
commit ca3752f824

View File

@ -7,7 +7,6 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"log"
"mime" "mime"
"net/http" "net/http"
"path" "path"
@ -111,6 +110,7 @@ type FsS3 struct {
b *s3.Bucket // the connection to the bucket b *s3.Bucket // the connection to the bucket
bucket string // the bucket we are working on bucket string // the bucket we are working on
perm s3.ACL // permissions for new buckets / objects perm s3.ACL // permissions for new buckets / objects
root string // root of the bucket - ignore all objects above this
} }
// FsObjectS3 describes a s3 object // FsObjectS3 describes a s3 object
@ -131,8 +131,11 @@ type FsObjectS3 struct {
// String converts this FsS3 to a string // String converts this FsS3 to a string
func (f *FsS3) String() string { func (f *FsS3) String() string {
if f.root == "" {
return fmt.Sprintf("S3 bucket %s", f.bucket) return fmt.Sprintf("S3 bucket %s", f.bucket)
} }
return fmt.Sprintf("S3 bucket %s path %s", f.bucket, f.root)
}
// Pattern to match a s3 path // Pattern to match a s3 path
var matcher = regexp.MustCompile(`^([^/]*)(.*)$`) var matcher = regexp.MustCompile(`^([^/]*)(.*)$`)
@ -190,18 +193,20 @@ func NewFs(name, path string) (fs.Fs, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
if directory != "" {
return nil, fmt.Errorf("Directories not supported yet in %q: %q", path, directory)
}
c, err := s3Connection(name) c, err := s3Connection(name)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// FIXME - check if it is a file before doing this and make a limited fs
if directory != "" {
directory += "/"
}
f := &FsS3{ f := &FsS3{
c: c, c: c,
bucket: bucket, bucket: bucket,
b: c.Bucket(bucket), b: c.Bucket(bucket),
perm: s3.Private, // FIXME need user to specify perm: s3.Private, // FIXME need user to specify
root: directory,
} }
return f, nil return f, nil
} }
@ -241,37 +246,76 @@ func (f *FsS3) NewFsObject(remote string) fs.Object {
return f.NewFsObjectWithInfo(remote, nil) return f.NewFsObjectWithInfo(remote, nil)
} }
// Walk the path returning a channel of FsObjects // list the objects into the function supplied
func (f *FsS3) List() fs.ObjectsChan { //
out := make(fs.ObjectsChan, fs.Config.Checkers) // If directories is set it only sends directories
go func() { func (f *FsS3) list(directories bool, fn func(string, *s3.Key)) {
delimiter := ""
if directories {
delimiter = "/"
}
// FIXME need to implement ALL loop // FIXME need to implement ALL loop
objects, err := f.b.List("", "", "", 10000) objects, err := f.b.List(f.root, delimiter, "", 10000)
if err != nil { if err != nil {
fs.Stats.Error() fs.Stats.Error()
log.Printf("Couldn't read bucket %q: %s", f.bucket, err) fs.Log(f, "Couldn't read bucket %q: %s", f.bucket, err)
} else {
rootLength := len(f.root)
if directories {
for _, remote := range objects.CommonPrefixes {
if !strings.HasPrefix(remote, f.root) {
fs.Log(f, "Odd name received %q", remote)
continue
}
remote := remote[rootLength:]
fn(remote, &s3.Key{Key: remote})
}
} else { } else {
for i := range objects.Contents { for i := range objects.Contents {
object := &objects.Contents[i] object := &objects.Contents[i]
if fs := f.NewFsObjectWithInfo(object.Key, object); fs != nil { if !strings.HasPrefix(object.Key, f.root) {
fs.Log(f, "Odd name received %q", object.Key)
continue
}
remote := object.Key[rootLength:]
fn(remote, object)
}
}
}
}
// Walk the path returning a channel of FsObjects
func (f *FsS3) List() fs.ObjectsChan {
out := make(fs.ObjectsChan, fs.Config.Checkers)
if f.bucket == "" {
// Return no objects at top level list
close(out)
fs.Stats.Error()
fs.Log(f, "Can't list objects at root - choose a bucket using lsd")
} else {
go func() {
defer close(out)
f.list(false, func(remote string, object *s3.Key) {
if fs := f.NewFsObjectWithInfo(remote, object); fs != nil {
out <- fs out <- fs
} }
} })
}
close(out)
}() }()
}
return out return out
} }
// Lists the buckets // Lists the buckets
func (f *FsS3) ListDir() fs.DirChan { func (f *FsS3) ListDir() fs.DirChan {
out := make(fs.DirChan, fs.Config.Checkers) out := make(fs.DirChan, fs.Config.Checkers)
if f.bucket == "" {
// List the buckets
go func() { go func() {
defer close(out) defer close(out)
buckets, err := f.c.ListBuckets() buckets, err := f.c.ListBuckets()
if err != nil { if err != nil {
fs.Stats.Error() fs.Stats.Error()
log.Printf("Couldn't list buckets: %s", err) fs.Log(f, "Couldn't list buckets: %s", err)
} else { } else {
for _, bucket := range buckets { for _, bucket := range buckets {
out <- &fs.Dir{ out <- &fs.Dir{
@ -283,6 +327,19 @@ func (f *FsS3) ListDir() fs.DirChan {
} }
} }
}() }()
} else {
// List the directories in the path in the container
go func() {
defer close(out)
f.list(true, func(remote string, object *s3.Key) {
out <- &fs.Dir{
Name: remote,
Bytes: object.Size,
Count: 0,
}
})
}()
}
return out return out
} }
@ -354,7 +411,7 @@ func (o *FsObjectS3) readMetaData() (err error) {
return nil return nil
} }
headers, err := o.s3.b.Head(o.remote, nil) headers, err := o.s3.b.Head(o.s3.root+o.remote, nil)
if err != nil { if err != nil {
fs.Debug(o, "Failed to read info: %s", err) fs.Debug(o, "Failed to read info: %s", err)
return err return err
@ -407,7 +464,7 @@ func (o *FsObjectS3) SetModTime(modTime time.Time) {
return return
} }
o.meta[metaMtime] = swift.TimeToFloatString(modTime) o.meta[metaMtime] = swift.TimeToFloatString(modTime)
_, err = o.s3.b.Update(o.remote, o.s3.perm, o.meta) _, err = o.s3.b.Update(o.s3.root+o.remote, o.s3.perm, o.meta)
if err != nil { if err != nil {
fs.Stats.Error() fs.Stats.Error()
fs.Log(o, "Failed to update remote mtime: %s", err) fs.Log(o, "Failed to update remote mtime: %s", err)
@ -421,7 +478,7 @@ func (o *FsObjectS3) Storable() bool {
// Open an object for read // Open an object for read
func (o *FsObjectS3) Open() (in io.ReadCloser, err error) { func (o *FsObjectS3) Open() (in io.ReadCloser, err error) {
in, err = o.s3.b.GetReader(o.remote) in, err = o.s3.b.GetReader(o.s3.root + o.remote)
return return
} }
@ -438,13 +495,13 @@ func (o *FsObjectS3) Update(in io.Reader, modTime time.Time, size int64) error {
contentType = "application/octet-stream" contentType = "application/octet-stream"
} }
_, err := o.s3.b.PutReaderHeaders(o.remote, in, size, contentType, o.s3.perm, headers) _, err := o.s3.b.PutReaderHeaders(o.s3.root+o.remote, in, size, contentType, o.s3.perm, headers)
return err return err
} }
// Remove an object // Remove an object
func (o *FsObjectS3) Remove() error { func (o *FsObjectS3) Remove() error {
return o.s3.b.Del(o.remote) return o.s3.b.Del(o.s3.root + o.remote)
} }
// Check the interfaces are satisfied // Check the interfaces are satisfied