rclone/s3/s3.go

815 lines
22 KiB
Go

// Package s3 provides an interface to Amazon S3 oject storage
package s3
// FIXME need to prevent anything but ListDir working for s3://
/*
Progress of port to aws-sdk
* Don't really need o.meta at all?
What happens if you CTRL-C a multipart upload
* get an incomplete upload
* disappears when you delete the bucket
*/
import (
"errors"
"fmt"
"io"
"net/http"
"net/url"
"path"
"regexp"
"strings"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/corehandlers"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/credentials/ec2rolecreds"
"github.com/aws/aws-sdk-go/aws/ec2metadata"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/ncw/rclone/fs"
"github.com/ncw/swift"
)
// Register with Fs
func init() {
fs.Register(&fs.RegInfo{
Name: "s3",
Description: "Amazon S3 (also Dreamhost, Ceph)",
NewFs: NewFs,
// AWS endpoints: http://docs.amazonwebservices.com/general/latest/gr/rande.html#s3_region
Options: []fs.Option{{
Name: "env_auth",
Help: "Get AWS credentials from runtime (environment variables or EC2 meta data if no env vars). Only applies if access_key_id and secret_access_key is blank.",
Examples: []fs.OptionExample{
{
Value: "false",
Help: "Enter AWS credentials in the next step",
}, {
Value: "true",
Help: "Get AWS credentials from the environment (env vars or IAM)",
},
},
}, {
Name: "access_key_id",
Help: "AWS Access Key ID - leave blank for anonymous access or runtime credentials.",
}, {
Name: "secret_access_key",
Help: "AWS Secret Access Key (password) - leave blank for anonymous access or runtime credentials.",
}, {
Name: "region",
Help: "Region to connect to.",
Examples: []fs.OptionExample{{
Value: "us-east-1",
Help: "The default endpoint - a good choice if you are unsure.\nUS Region, Northern Virginia or Pacific Northwest.\nLeave location constraint empty.",
}, {
Value: "us-west-2",
Help: "US West (Oregon) Region\nNeeds location constraint us-west-2.",
}, {
Value: "us-west-1",
Help: "US West (Northern California) Region\nNeeds location constraint us-west-1.",
}, {
Value: "eu-west-1",
Help: "EU (Ireland) Region Region\nNeeds location constraint EU or eu-west-1.",
}, {
Value: "eu-central-1",
Help: "EU (Frankfurt) Region\nNeeds location constraint eu-central-1.",
}, {
Value: "ap-southeast-1",
Help: "Asia Pacific (Singapore) Region\nNeeds location constraint ap-southeast-1.",
}, {
Value: "ap-southeast-2",
Help: "Asia Pacific (Sydney) Region\nNeeds location constraint ap-southeast-2.",
}, {
Value: "ap-northeast-1",
Help: "Asia Pacific (Tokyo) Region\nNeeds location constraint ap-northeast-1.",
}, {
Value: "sa-east-1",
Help: "South America (Sao Paulo) Region\nNeeds location constraint sa-east-1.",
}, {
Value: "other-v2-signature",
Help: "If using an S3 clone that only understands v2 signatures\neg Ceph/Dreamhost\nset this and make sure you set the endpoint.",
}, {
Value: "other-v4-signature",
Help: "If using an S3 clone that understands v4 signatures set this\nand make sure you set the endpoint.",
}},
}, {
Name: "endpoint",
Help: "Endpoint for S3 API.\nLeave blank if using AWS to use the default endpoint for the region.\nSpecify if using an S3 clone such as Ceph.",
}, {
Name: "location_constraint",
Help: "Location constraint - must be set to match the Region. Used when creating buckets only.",
Examples: []fs.OptionExample{{
Value: "",
Help: "Empty for US Region, Northern Virginia or Pacific Northwest.",
}, {
Value: "us-west-2",
Help: "US West (Oregon) Region.",
}, {
Value: "us-west-1",
Help: "US West (Northern California) Region.",
}, {
Value: "eu-west-1",
Help: "EU (Ireland) Region.",
}, {
Value: "EU",
Help: "EU Region.",
}, {
Value: "ap-southeast-1",
Help: "Asia Pacific (Singapore) Region.",
}, {
Value: "ap-southeast-2",
Help: "Asia Pacific (Sydney) Region.",
}, {
Value: "ap-northeast-1",
Help: "Asia Pacific (Tokyo) Region.",
}, {
Value: "sa-east-1",
Help: "South America (Sao Paulo) Region.",
}},
}},
})
}
// Constants
const (
metaMtime = "Mtime" // the meta key to store mtime in - eg X-Amz-Meta-Mtime
listChunkSize = 1024 // number of items to read at once
maxRetries = 10 // number of retries to make of operations
)
// Fs represents a remote s3 server
type Fs struct {
name string // the name of the remote
c *s3.S3 // the connection to the s3 server
ses *session.Session // the s3 session
bucket string // the bucket we are working on
perm string // permissions for new buckets / objects
root string // root of the bucket - ignore all objects above this
locationConstraint string // location constraint of new buckets
}
// Object describes a s3 object
type Object struct {
// Will definitely have everything but meta which may be nil
//
// List will read everything but meta - to fill that in need to call
// readMetaData
fs *Fs // what this object is part of
remote string // The remote path
etag string // md5sum of the object
bytes int64 // size of the object
lastModified time.Time // Last modified
meta map[string]*string // The object metadata if known - may be nil
}
// ------------------------------------------------------------
// Name of the remote (as passed into NewFs)
func (f *Fs) Name() string {
return f.name
}
// Root of the remote (as passed into NewFs)
func (f *Fs) Root() string {
if f.root == "" {
return f.bucket
}
return f.bucket + "/" + f.root
}
// String converts this Fs to a string
func (f *Fs) String() string {
if f.root == "" {
return fmt.Sprintf("S3 bucket %s", f.bucket)
}
return fmt.Sprintf("S3 bucket %s path %s", f.bucket, f.root)
}
// Pattern to match a s3 path
var matcher = regexp.MustCompile(`^([^/]*)(.*)$`)
// parseParse parses a s3 'url'
func s3ParsePath(path string) (bucket, directory string, err error) {
parts := matcher.FindStringSubmatch(path)
if parts == nil {
err = fmt.Errorf("Couldn't parse bucket out of s3 path %q", path)
} else {
bucket, directory = parts[1], parts[2]
directory = strings.Trim(directory, "/")
}
return
}
// s3Connection makes a connection to s3
func s3Connection(name string) (*s3.S3, *session.Session, error) {
// Make the auth
v := credentials.Value{
AccessKeyID: fs.ConfigFile.MustValue(name, "access_key_id"),
SecretAccessKey: fs.ConfigFile.MustValue(name, "secret_access_key"),
}
// first provider to supply a credential set "wins"
providers := []credentials.Provider{
// use static credentials if they're present (checked by provider)
&credentials.StaticProvider{Value: v},
// * Access Key ID: AWS_ACCESS_KEY_ID or AWS_ACCESS_KEY
// * Secret Access Key: AWS_SECRET_ACCESS_KEY or AWS_SECRET_KEY
&credentials.EnvProvider{},
// Pick up IAM role in case we're on EC2
&ec2rolecreds.EC2RoleProvider{
Client: ec2metadata.New(session.New(), &aws.Config{
HTTPClient: &http.Client{Timeout: 1 * time.Second}, // low timeout to ec2 metadata service
}),
ExpiryWindow: 3,
},
}
cred := credentials.NewChainCredentials(providers)
switch {
case fs.ConfigFile.MustBool(name, "env_auth", false):
// No need for empty checks if "env_auth" is true
case v.AccessKeyID == "" && v.SecretAccessKey == "":
// if no access key/secret and iam is explicitly disabled then fall back to anon interaction
cred = credentials.AnonymousCredentials
case v.AccessKeyID == "":
return nil, nil, errors.New("access_key_id not found")
case v.SecretAccessKey == "":
return nil, nil, errors.New("secret_access_key not found")
}
endpoint := fs.ConfigFile.MustValue(name, "endpoint")
region := fs.ConfigFile.MustValue(name, "region")
if region == "" && endpoint == "" {
endpoint = "https://s3.amazonaws.com/"
}
if region == "" {
region = "us-east-1"
}
awsConfig := aws.NewConfig().
WithRegion(region).
WithMaxRetries(maxRetries).
WithCredentials(cred).
WithEndpoint(endpoint).
WithHTTPClient(fs.Config.Client()).
WithS3ForcePathStyle(true)
// awsConfig.WithLogLevel(aws.LogDebugWithSigning)
ses := session.New()
c := s3.New(ses, awsConfig)
if region == "other-v2-signature" {
fs.Debug(name, "Using v2 auth")
signer := func(req *request.Request) {
// Ignore AnonymousCredentials object
if req.Config.Credentials == credentials.AnonymousCredentials {
return
}
sign(v.AccessKeyID, v.SecretAccessKey, req.HTTPRequest)
}
c.Handlers.Sign.Clear()
c.Handlers.Sign.PushBackNamed(corehandlers.BuildContentLengthHandler)
c.Handlers.Sign.PushBack(signer)
}
// Add user agent
c.Handlers.Build.PushBack(func(r *request.Request) {
r.HTTPRequest.Header.Set("User-Agent", fs.UserAgent)
})
return c, ses, nil
}
// NewFs contstructs an Fs from the path, bucket:path
func NewFs(name, root string) (fs.Fs, error) {
bucket, directory, err := s3ParsePath(root)
if err != nil {
return nil, err
}
c, ses, err := s3Connection(name)
if err != nil {
return nil, err
}
f := &Fs{
name: name,
c: c,
bucket: bucket,
ses: ses,
// FIXME perm: s3.Private, // FIXME need user to specify
root: directory,
locationConstraint: fs.ConfigFile.MustValue(name, "location_constraint"),
}
if f.root != "" {
f.root += "/"
// Check to see if the object exists
req := s3.HeadObjectInput{
Bucket: &f.bucket,
Key: &directory,
}
_, err = f.c.HeadObject(&req)
if err == nil {
remote := path.Base(directory)
f.root = path.Dir(directory)
if f.root == "." {
f.root = ""
} else {
f.root += "/"
}
obj := f.NewFsObject(remote)
// return a Fs Limited to this object
return fs.NewLimited(f, obj), nil
}
}
// f.listMultipartUploads()
return f, nil
}
// Return an FsObject from a path
//
// May return nil if an error occurred
func (f *Fs) newFsObjectWithInfo(remote string, info *s3.Object) fs.Object {
o := &Object{
fs: f,
remote: remote,
}
if info != nil {
// Set info but not meta
if info.LastModified == nil {
fs.Log(o, "Failed to read last modified")
o.lastModified = time.Now()
} else {
o.lastModified = *info.LastModified
}
o.etag = aws.StringValue(info.ETag)
o.bytes = aws.Int64Value(info.Size)
} else {
err := o.readMetaData() // reads info and meta, returning an error
if err != nil {
// logged already FsDebug("Failed to read info: %s", err)
return nil
}
}
return o
}
// NewFsObject returns an FsObject from a path
//
// May return nil if an error occurred
func (f *Fs) NewFsObject(remote string) fs.Object {
return f.newFsObjectWithInfo(remote, nil)
}
// list the objects into the function supplied
//
// If directories is set it only sends directories
func (f *Fs) list(directories bool, fn func(string, *s3.Object)) {
maxKeys := int64(listChunkSize)
delimiter := ""
if directories {
delimiter = "/"
}
var marker *string
for {
// FIXME need to implement ALL loop
req := s3.ListObjectsInput{
Bucket: &f.bucket,
Delimiter: &delimiter,
Prefix: &f.root,
MaxKeys: &maxKeys,
Marker: marker,
}
resp, err := f.c.ListObjects(&req)
if err != nil {
fs.Stats.Error()
fs.ErrorLog(f, "Couldn't read bucket %q: %s", f.bucket, err)
break
} else {
rootLength := len(f.root)
if directories {
for _, commonPrefix := range resp.CommonPrefixes {
if commonPrefix.Prefix == nil {
fs.Log(f, "Nil common prefix received")
continue
}
remote := *commonPrefix.Prefix
if !strings.HasPrefix(remote, f.root) {
fs.Log(f, "Odd name received %q", remote)
continue
}
remote = remote[rootLength:]
if strings.HasSuffix(remote, "/") {
remote = remote[:len(remote)-1]
}
fn(remote, &s3.Object{Key: &remote})
}
} else {
for _, object := range resp.Contents {
key := aws.StringValue(object.Key)
if !strings.HasPrefix(key, f.root) {
fs.Log(f, "Odd name received %q", key)
continue
}
remote := key[rootLength:]
fn(remote, object)
}
}
if !aws.BoolValue(resp.IsTruncated) {
break
}
// Use NextMarker if set, otherwise use last Key
if resp.NextMarker == nil || *resp.NextMarker == "" {
marker = resp.Contents[len(resp.Contents)-1].Key
} else {
marker = resp.NextMarker
}
}
}
}
// List walks the path returning a channel of FsObjects
func (f *Fs) List() fs.ObjectsChan {
out := make(fs.ObjectsChan, fs.Config.Checkers)
if f.bucket == "" {
// Return no objects at top level list
close(out)
fs.Stats.Error()
fs.ErrorLog(f, "Can't list objects at root - choose a bucket using lsd")
} else {
go func() {
defer close(out)
f.list(false, func(remote string, object *s3.Object) {
if fs := f.newFsObjectWithInfo(remote, object); fs != nil {
out <- fs
}
})
}()
}
return out
}
// ListDir lists the buckets
func (f *Fs) ListDir() fs.DirChan {
out := make(fs.DirChan, fs.Config.Checkers)
if f.bucket == "" {
// List the buckets
go func() {
defer close(out)
req := s3.ListBucketsInput{}
resp, err := f.c.ListBuckets(&req)
if err != nil {
fs.Stats.Error()
fs.ErrorLog(f, "Couldn't list buckets: %s", err)
} else {
for _, bucket := range resp.Buckets {
out <- &fs.Dir{
Name: aws.StringValue(bucket.Name),
When: aws.TimeValue(bucket.CreationDate),
Bytes: -1,
Count: -1,
}
}
}
}()
} else {
// List the directories in the path in the bucket
go func() {
defer close(out)
f.list(true, func(remote string, object *s3.Object) {
size := int64(0)
if object.Size != nil {
size = *object.Size
}
out <- &fs.Dir{
Name: remote,
Bytes: size,
Count: 0,
}
})
}()
}
return out
}
// Put the FsObject into the bucket
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
// Temporary Object under construction
fs := &Object{
fs: f,
remote: src.Remote(),
}
return fs, fs.Update(in, src)
}
// Check if the bucket exists
func (f *Fs) dirExists() (bool, error) {
req := s3.HeadBucketInput{
Bucket: &f.bucket,
}
_, err := f.c.HeadBucket(&req)
if err == nil {
return true, nil
}
if err, ok := err.(awserr.RequestFailure); ok {
if err.StatusCode() == http.StatusNotFound {
return false, nil
}
}
return false, err
}
// Mkdir creates the bucket if it doesn't exist
func (f *Fs) Mkdir() error {
exists, err := f.dirExists()
if err != nil || exists {
return err
}
req := s3.CreateBucketInput{
Bucket: &f.bucket,
ACL: &f.perm,
}
if f.locationConstraint != "" {
req.CreateBucketConfiguration = &s3.CreateBucketConfiguration{
LocationConstraint: &f.locationConstraint,
}
}
_, err = f.c.CreateBucket(&req)
if err, ok := err.(awserr.Error); ok {
if err.Code() == "BucketAlreadyOwnedByYou" {
return nil
}
}
return err
}
// Rmdir deletes the bucket if the fs is at the root
//
// Returns an error if it isn't empty
func (f *Fs) Rmdir() error {
if f.root != "" {
return nil
}
req := s3.DeleteBucketInput{
Bucket: &f.bucket,
}
_, err := f.c.DeleteBucket(&req)
return err
}
// Precision of the remote
func (f *Fs) Precision() time.Duration {
return time.Nanosecond
}
// Copy src to this remote using server side copy operations.
//
// This is stored with the remote path given
//
// It returns the destination Object and a possible error
//
// Will only be called if src.Fs().Name() == f.Name()
//
// If it isn't possible then return fs.ErrorCantCopy
func (f *Fs) Copy(src fs.Object, remote string) (fs.Object, error) {
srcObj, ok := src.(*Object)
if !ok {
fs.Debug(src, "Can't copy - not same remote type")
return nil, fs.ErrorCantCopy
}
srcFs := srcObj.fs
key := f.root + remote
source := url.QueryEscape(srcFs.bucket + "/" + srcFs.root + srcObj.remote)
req := s3.CopyObjectInput{
Bucket: &f.bucket,
Key: &key,
CopySource: &source,
MetadataDirective: aws.String(s3.MetadataDirectiveCopy),
}
_, err := f.c.CopyObject(&req)
if err != nil {
return nil, err
}
return f.NewFsObject(remote), err
}
// Hashes returns the supported hash sets.
func (f *Fs) Hashes() fs.HashSet {
return fs.HashSet(fs.HashMD5)
}
// ------------------------------------------------------------
// Fs returns the parent Fs
func (o *Object) Fs() fs.Info {
return o.fs
}
// Return a string version
func (o *Object) String() string {
if o == nil {
return "<nil>"
}
return o.remote
}
// Remote returns the remote path
func (o *Object) Remote() string {
return o.remote
}
var matchMd5 = regexp.MustCompile(`^[0-9a-f]{32}$`)
// Hash returns the Md5sum of an object returning a lowercase hex string
func (o *Object) Hash(t fs.HashType) (string, error) {
if t != fs.HashMD5 {
return "", fs.ErrHashUnsupported
}
etag := strings.Trim(strings.ToLower(o.etag), `"`)
// Check the etag is a valid md5sum
if !matchMd5.MatchString(etag) {
// fs.Debug(o, "Invalid md5sum (probably multipart uploaded) - ignoring: %q", etag)
return "", nil
}
return etag, nil
}
// Size returns the size of an object in bytes
func (o *Object) Size() int64 {
return o.bytes
}
// readMetaData gets the metadata if it hasn't already been fetched
//
// it also sets the info
func (o *Object) readMetaData() (err error) {
if o.meta != nil {
return nil
}
key := o.fs.root + o.remote
req := s3.HeadObjectInput{
Bucket: &o.fs.bucket,
Key: &key,
}
resp, err := o.fs.c.HeadObject(&req)
if err != nil {
fs.Debug(o, "Failed to read info: %s", err)
return err
}
var size int64
// Ignore missing Content-Length assuming it is 0
// Some versions of ceph do this due their apache proxies
if resp.ContentLength != nil {
size = *resp.ContentLength
}
o.etag = aws.StringValue(resp.ETag)
o.bytes = size
o.meta = resp.Metadata
if resp.LastModified == nil {
fs.Log(o, "Failed to read last modified from HEAD: %s", err)
o.lastModified = time.Now()
} else {
o.lastModified = *resp.LastModified
}
return nil
}
// ModTime returns the modification time of the object
//
// It attempts to read the objects mtime and if that isn't present the
// LastModified returned in the http headers
func (o *Object) ModTime() time.Time {
err := o.readMetaData()
if err != nil {
fs.Log(o, "Failed to read metadata: %s", err)
return time.Now()
}
// read mtime out of metadata if available
d, ok := o.meta[metaMtime]
if !ok || d == nil {
// fs.Debug(o, "No metadata")
return o.lastModified
}
modTime, err := swift.FloatStringToTime(*d)
if err != nil {
fs.Log(o, "Failed to read mtime from object: %s", err)
return o.lastModified
}
return modTime
}
// SetModTime sets the modification time of the local fs object
func (o *Object) SetModTime(modTime time.Time) error {
err := o.readMetaData()
if err != nil {
return err
}
o.meta[metaMtime] = aws.String(swift.TimeToFloatString(modTime))
// Guess the content type
contentType := fs.MimeType(o)
// Copy the object to itself to update the metadata
key := o.fs.root + o.remote
sourceKey := o.fs.bucket + "/" + key
directive := s3.MetadataDirectiveReplace // replace metadata with that passed in
req := s3.CopyObjectInput{
Bucket: &o.fs.bucket,
ACL: &o.fs.perm,
Key: &key,
ContentType: &contentType,
CopySource: aws.String(url.QueryEscape(sourceKey)),
Metadata: o.meta,
MetadataDirective: &directive,
}
_, err = o.fs.c.CopyObject(&req)
return err
}
// Storable raturns a boolean indicating if this object is storable
func (o *Object) Storable() bool {
return true
}
// Open an object for read
func (o *Object) Open() (in io.ReadCloser, err error) {
key := o.fs.root + o.remote
req := s3.GetObjectInput{
Bucket: &o.fs.bucket,
Key: &key,
}
resp, err := o.fs.c.GetObject(&req)
if err != nil {
return nil, err
}
return resp.Body, nil
}
// Update the Object from in with modTime and size
func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error {
modTime := src.ModTime()
uploader := s3manager.NewUploader(o.fs.ses, func(u *s3manager.Uploader) {
u.Concurrency = 2
u.LeavePartsOnError = false
u.S3 = o.fs.c
u.PartSize = s3manager.MinUploadPartSize
size := src.Size()
// Adjust PartSize until the number of parts is small enough.
if size/u.PartSize >= s3manager.MaxUploadParts {
// Calculate partition size rounded up to the nearest MB
u.PartSize = (((size / s3manager.MaxUploadParts) >> 20) + 1) << 20
}
})
// Set the mtime in the meta data
metadata := map[string]*string{
metaMtime: aws.String(swift.TimeToFloatString(modTime)),
}
// Guess the content type
contentType := fs.MimeType(o)
key := o.fs.root + o.remote
req := s3manager.UploadInput{
Bucket: &o.fs.bucket,
ACL: &o.fs.perm,
Key: &key,
Body: in,
ContentType: &contentType,
Metadata: metadata,
//ContentLength: &size,
}
_, err := uploader.Upload(&req)
if err != nil {
return err
}
// Read the metadata from the newly created object
o.meta = nil // wipe old metadata
err = o.readMetaData()
return err
}
// Remove an object
func (o *Object) Remove() error {
key := o.fs.root + o.remote
req := s3.DeleteObjectInput{
Bucket: &o.fs.bucket,
Key: &key,
}
_, err := o.fs.c.DeleteObject(&req)
return err
}
// Check the interfaces are satisfied
var (
_ fs.Fs = &Fs{}
_ fs.Copier = &Fs{}
_ fs.Object = &Object{}
)