serve s3: add serve rc interface

This commit is contained in:
Nick Craig-Wood 2025-04-01 13:02:04 +01:00
parent 2a42d95385
commit aef9c2117e
3 changed files with 63 additions and 22 deletions

View File

@ -10,9 +10,12 @@ import (
"github.com/rclone/rclone/cmd/serve/proxy" "github.com/rclone/rclone/cmd/serve/proxy"
"github.com/rclone/rclone/cmd/serve/proxy/proxyflags" "github.com/rclone/rclone/cmd/serve/proxy/proxyflags"
"github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/config/configstruct"
"github.com/rclone/rclone/fs/config/flags" "github.com/rclone/rclone/fs/config/flags"
"github.com/rclone/rclone/fs/rc"
httplib "github.com/rclone/rclone/lib/http" httplib "github.com/rclone/rclone/lib/http"
"github.com/rclone/rclone/vfs" "github.com/rclone/rclone/vfs"
"github.com/rclone/rclone/vfs/vfscommon"
"github.com/rclone/rclone/vfs/vfsflags" "github.com/rclone/rclone/vfs/vfsflags"
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
@ -61,6 +64,28 @@ func init() {
vfsflags.AddFlags(flagSet) vfsflags.AddFlags(flagSet)
proxyflags.AddFlags(flagSet) proxyflags.AddFlags(flagSet)
serve.Command.AddCommand(Command) serve.Command.AddCommand(Command)
serve.AddRc("s3", func(ctx context.Context, f fs.Fs, in rc.Params) (serve.Handle, error) {
// Read VFS Opts
var vfsOpt = vfscommon.Opt // set default opts
err := configstruct.SetAny(in, &vfsOpt)
if err != nil {
return nil, err
}
// Read Proxy Opts
var proxyOpt = proxy.Opt // set default opts
err = configstruct.SetAny(in, &proxyOpt)
if err != nil {
return nil, err
}
// Read opts
var opt = Opt // set default opts
err = configstruct.SetAny(in, &opt)
if err != nil {
return nil, err
}
// Create server
return newServer(ctx, f, &opt, &vfsOpt, &proxyOpt)
})
} }
//go:embed serve_s3.md //go:embed serve_s3.md
@ -91,18 +116,11 @@ var Command = &cobra.Command{
} }
cmd.Run(false, false, command, func() error { cmd.Run(false, false, command, func() error {
s, err := newServer(context.Background(), f, &Opt) s, err := newServer(context.Background(), f, &Opt, &vfscommon.Opt, &proxy.Opt)
if err != nil { if err != nil {
return err return err
} }
router := s.server.Router() return s.Serve()
s.Bind(router)
err = s.Serve()
if err != nil {
return err
}
s.server.Wait()
return nil
}) })
return nil return nil
}, },

View File

@ -24,8 +24,10 @@ import (
"github.com/rclone/rclone/fs/config/configmap" "github.com/rclone/rclone/fs/config/configmap"
"github.com/rclone/rclone/fs/hash" "github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/fs/object" "github.com/rclone/rclone/fs/object"
"github.com/rclone/rclone/fs/rc"
"github.com/rclone/rclone/fstest" "github.com/rclone/rclone/fstest"
"github.com/rclone/rclone/lib/random" "github.com/rclone/rclone/lib/random"
"github.com/rclone/rclone/vfs/vfscommon"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -35,17 +37,16 @@ const (
) )
// Configure and serve the server // Configure and serve the server
func serveS3(f fs.Fs) (testURL string, keyid string, keysec string, w *Server) { func serveS3(t *testing.T, f fs.Fs) (testURL string, keyid string, keysec string, w *Server) {
keyid = random.String(16) keyid = random.String(16)
keysec = random.String(16) keysec = random.String(16)
opt := Opt // copy default options opt := Opt // copy default options
opt.AuthKey = []string{fmt.Sprintf("%s,%s", keyid, keysec)} opt.AuthKey = []string{fmt.Sprintf("%s,%s", keyid, keysec)}
opt.HTTP.ListenAddr = []string{endpoint} opt.HTTP.ListenAddr = []string{endpoint}
w, _ = newServer(context.Background(), f, &opt) w, _ = newServer(context.Background(), f, &opt, &vfscommon.Opt, &proxy.Opt)
router := w.server.Router() go func() {
require.NoError(t, w.Serve())
w.Bind(router) }()
_ = w.Serve()
testURL = w.server.URLs()[0] testURL = w.server.URLs()[0]
return return
@ -55,7 +56,7 @@ func serveS3(f fs.Fs) (testURL string, keyid string, keysec string, w *Server) {
// s3 remote against it. // s3 remote against it.
func TestS3(t *testing.T) { func TestS3(t *testing.T) {
start := func(f fs.Fs) (configmap.Simple, func()) { start := func(f fs.Fs) (configmap.Simple, func()) {
testURL, keyid, keysec, _ := serveS3(f) testURL, keyid, keysec, _ := serveS3(t, f)
// Config for the backend we'll use to connect to the server // Config for the backend we'll use to connect to the server
config := configmap.Simple{ config := configmap.Simple{
"type": "s3", "type": "s3",
@ -118,7 +119,7 @@ func TestEncodingWithMinioClient(t *testing.T) {
_, err = f.Put(context.Background(), in, obji) _, err = f.Put(context.Background(), in, obji)
assert.NoError(t, err) assert.NoError(t, err)
endpoint, keyid, keysec, _ := serveS3(f) endpoint, keyid, keysec, _ := serveS3(t, f)
testURL, _ := url.Parse(endpoint) testURL, _ := url.Parse(endpoint)
minioClient, err := minio.New(testURL.Host, &minio.Options{ minioClient, err := minio.New(testURL.Host, &minio.Options{
Creds: credentials.NewStaticV4(keyid, keysec, ""), Creds: credentials.NewStaticV4(keyid, keysec, ""),
@ -181,7 +182,7 @@ func testListBuckets(t *testing.T, cases []TestCase, useProxy bool) {
for _, tt := range cases { for _, tt := range cases {
t.Run(tt.description, func(t *testing.T) { t.Run(tt.description, func(t *testing.T) {
endpoint, keyid, keysec, s := serveS3(f) endpoint, keyid, keysec, s := serveS3(t, f)
defer func() { defer func() {
assert.NoError(t, s.server.Shutdown()) assert.NoError(t, s.server.Shutdown())
}() }()
@ -289,3 +290,10 @@ func TestListBucketsAuthProxy(t *testing.T) {
testListBuckets(t, cases, true) testListBuckets(t, cases, true)
} }
func TestRc(t *testing.T) {
servetest.TestRc(t, rc.Params{
"type": "s3",
"vfs_cache_mode": "off",
})
}

View File

@ -8,6 +8,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"math/rand" "math/rand"
"net"
"net/http" "net/http"
"strings" "strings"
@ -43,7 +44,7 @@ type Server struct {
} }
// Make a new S3 Server to serve the remote // Make a new S3 Server to serve the remote
func newServer(ctx context.Context, f fs.Fs, opt *Options) (s *Server, err error) { func newServer(ctx context.Context, f fs.Fs, opt *Options, vfsOpt *vfscommon.Options, proxyOpt *proxy.Options) (s *Server, err error) {
w := &Server{ w := &Server{
f: f, f: f,
ctx: ctx, ctx: ctx,
@ -84,12 +85,12 @@ func newServer(ctx context.Context, f fs.Fs, opt *Options) (s *Server, err error
w.handler = w.faker.Server() w.handler = w.faker.Server()
if proxy.Opt.AuthProxy != "" { if proxy.Opt.AuthProxy != "" {
w.proxy = proxy.New(ctx, &proxy.Opt, &vfscommon.Opt) w.proxy = proxy.New(ctx, proxyOpt, vfsOpt)
// proxy auth middleware // proxy auth middleware
w.handler = proxyAuthMiddleware(w.handler, w) w.handler = proxyAuthMiddleware(w.handler, w)
w.handler = authPairMiddleware(w.handler, w) w.handler = authPairMiddleware(w.handler, w)
} else { } else {
w._vfs = vfs.New(f, &vfscommon.Opt) w._vfs = vfs.New(f, vfsOpt)
if len(opt.AuthKey) > 0 { if len(opt.AuthKey) > 0 {
w.faker.AddAuthKeys(authlistResolver(opt.AuthKey)) w.faker.AddAuthKeys(authlistResolver(opt.AuthKey))
@ -104,6 +105,9 @@ func newServer(ctx context.Context, f fs.Fs, opt *Options) (s *Server, err error
return nil, fmt.Errorf("failed to init server: %w", err) return nil, fmt.Errorf("failed to init server: %w", err)
} }
router := w.server.Router()
w.Bind(router)
return w, nil return w, nil
} }
@ -138,13 +142,24 @@ func (w *Server) Bind(router chi.Router) {
router.Handle("/*", w.handler) router.Handle("/*", w.handler)
} }
// Serve serves the s3 server // Serve serves the s3 server until the server is shutdown
func (w *Server) Serve() error { func (w *Server) Serve() error {
w.server.Serve() w.server.Serve()
fs.Logf(w.f, "Starting s3 server on %s", w.server.URLs()) fs.Logf(w.f, "Starting s3 server on %s", w.server.URLs())
w.server.Wait()
return nil return nil
} }
// Addr returns the first address of the server
func (w *Server) Addr() net.Addr {
return w.server.Addr()
}
// Shutdown the server
func (w *Server) Shutdown() error {
return w.server.Shutdown()
}
func authPairMiddleware(next http.Handler, ws *Server) http.Handler { func authPairMiddleware(next http.Handler, ws *Server) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
accessKey, _ := parseAccessKeyID(r) accessKey, _ := parseAccessKeyID(r)