From 4d9a165e56a0b69ae3162c9677bbd10f99c9692d Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Fri, 21 Mar 2025 16:30:15 +0000 Subject: [PATCH] serve: Add rc control for serve commands #4505 This adds the framework for serving. The individual servers will be added in separate commits. --- cmd/serve/rc.go | 355 ++++++++++++++++++++++++++++++++++++++ cmd/serve/rc_test.go | 180 +++++++++++++++++++ cmd/serve/servetest/rc.go | 77 +++++++++ 3 files changed, 612 insertions(+) create mode 100644 cmd/serve/rc.go create mode 100644 cmd/serve/rc_test.go create mode 100644 cmd/serve/servetest/rc.go diff --git a/cmd/serve/rc.go b/cmd/serve/rc.go new file mode 100644 index 000000000..47c46590a --- /dev/null +++ b/cmd/serve/rc.go @@ -0,0 +1,355 @@ +package serve + +import ( + "cmp" + "context" + "errors" + "fmt" + "math/rand/v2" + "net" + "slices" + "sort" + "strings" + "sync" + "time" + + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/filter" + "github.com/rclone/rclone/fs/rc" + "github.com/rclone/rclone/lib/errcount" +) + +// Handle describes what a server can do +type Handle interface { + // Addr returns the listening address of the server + Addr() net.Addr + + // Shutdown stops the server + Shutdown() error + + // Serve starts the server - doesn't return until Shutdown is called. + Serve() (err error) +} + +// Describes a running server +type server struct { + ID string `json:"id"` // id of the server + Addr string `json:"addr"` // address of the server + Params rc.Params `json:"params"` // Parameters used to start the server + h Handle `json:"-"` // control the server + errChan chan error `json:"-"` // receive errors from the server process +} + +// Fn starts an rclone serve command +type Fn func(ctx context.Context, f fs.Fs, in rc.Params) (Handle, error) + +// Globals +var ( + // mutex to protect all the variables in this block + serveMu sync.Mutex + // Serve functions available + serveFns = map[string]Fn{} + // Running servers + servers = map[string]*server{} +) + +// AddRc adds the named serve function to the rc +func AddRc(name string, serveFunction Fn) { + serveMu.Lock() + defer serveMu.Unlock() + serveFns[name] = serveFunction +} + +// unquote ` +func q(s string) string { + return strings.ReplaceAll(s, "|", "`") +} + +func init() { + rc.Add(rc.Call{ + Path: "serve/start", + AuthRequired: true, + Fn: startRc, + Title: "Create a new server", + Help: q(`Create a new server with the specified parameters. + +This takes the following parameters: + +- |type| - type of server: |http|, |webdav|, |ftp|, |sftp|, |nfs|, etc. +- |fs| - remote storage path to serve +- |addr| - the ip:port to run the server on, eg ":1234" or "localhost:1234" + +Other parameters are as described in the documentation for the +relevant [rclone serve](/commands/rclone_serve/) command line options. +To translate a command line option to an rc parameter, remove the +leading |--| and replace |-| with |_|, so |--vfs-cache-mode| becomes +|vfs_cache_mode|. Note that global parameters must be set with +|_config| and |_filter| as described above. + +Examples: + + rclone rc serve/start type=nfs fs=remote: addr=:4321 vfs_cache_mode=full + rclone rc serve/start --json '{"type":"nfs","fs":"remote:","addr":":1234","vfs_cache_mode":"full"}' + +This will give the reply + +|||json +{ + "addr": "[::]:4321", // Address the server was started on + "id": "nfs-ecfc6852" // Unique identifier for the server instance +} +||| + +Or an error if it failed to start. + +Stop the server with |serve/stop| and list the running servers with |serve/list|. +`), + }) +} + +// startRc allows the serve command to be run from rc +func startRc(ctx context.Context, in rc.Params) (out rc.Params, err error) { + serveType, err := in.GetString("type") + + serveMu.Lock() + defer serveMu.Unlock() + + serveFn := serveFns[serveType] + if serveFn == nil { + return nil, fmt.Errorf("could not find serve type=%q", serveType) + } + + // Get Fs.fs to be served from fs parameter in the params + f, err := rc.GetFs(ctx, in) + if err != nil { + return nil, err + } + + // Make a background context and copy the config back. + newCtx := context.Background() + newCtx = fs.CopyConfig(newCtx, ctx) + newCtx = filter.CopyConfig(newCtx, ctx) + + // Start the server + h, err := serveFn(newCtx, f, in) + if err != nil { + return nil, fmt.Errorf("could not start serve %q: %w", serveType, err) + } + + // Start the server running in the background + errChan := make(chan error, 1) + go func() { + errChan <- h.Serve() + close(errChan) + }() + + // Wait for a short length of time to see if an error occurred + select { + case err = <-errChan: + if err == nil { + err = errors.New("server stopped immediately") + } + case <-time.After(100 * time.Millisecond): + err = nil + } + if err != nil { + return nil, fmt.Errorf("error when starting serve %q: %w", serveType, err) + } + + // Store it for later + runningServer := server{ + ID: fmt.Sprintf("%s-%08x", serveType, rand.Uint32()), + Params: in, + Addr: h.Addr().String(), + h: h, + errChan: errChan, + } + servers[runningServer.ID] = &runningServer + + out = rc.Params{ + "id": runningServer.ID, + "addr": runningServer.Addr, + } + + fs.Debugf(f, "Started serve %s on %s", serveType, runningServer.Addr) + return out, nil +} + +func init() { + rc.Add(rc.Call{ + Path: "serve/stop", + AuthRequired: true, + Fn: stopRc, + Title: "Unserve selected active serve", + Help: q(`Stops a running |serve| instance by ID. + +This takes the following parameters: + +- id: as returned by serve/start + +This will give an empty response if successful or an error if not. + +Example: + + rclone rc serve/stop id=12345 +`), + }) +} + +// stopRc stops the server process +func stopRc(_ context.Context, in rc.Params) (out rc.Params, err error) { + id, err := in.GetString("id") + if err != nil { + return nil, err + } + serveMu.Lock() + defer serveMu.Unlock() + s := servers[id] + if s == nil { + return nil, fmt.Errorf("server with id=%q not found", id) + } + err = s.h.Shutdown() + <-s.errChan // ignore server return error - likely is "use of closed network connection" + delete(servers, id) + return nil, err +} + +func init() { + rc.Add(rc.Call{ + Path: "serve/types", + AuthRequired: true, + Fn: serveTypesRc, + Title: "Show all possible serve types", + Help: q(`This shows all possible serve types and returns them as a list. + +This takes no parameters and returns + +- types: list of serve types, eg "nfs", "sftp", etc + +The serve types are strings like "serve", "serve2", "cserve" and can +be passed to serve/start as the serveType parameter. + +Eg + + rclone rc serve/types + +Returns + +|||json +{ + "types": [ + "http", + "sftp", + "nfs" + ] +} +||| +`), + }) +} + +// serveTypesRc returns a list of available serve types. +func serveTypesRc(_ context.Context, in rc.Params) (out rc.Params, err error) { + var serveTypes = []string{} + serveMu.Lock() + defer serveMu.Unlock() + for serveType := range serveFns { + serveTypes = append(serveTypes, serveType) + } + sort.Strings(serveTypes) + return rc.Params{ + "types": serveTypes, + }, nil +} + +func init() { + rc.Add(rc.Call{ + Path: "serve/list", + AuthRequired: true, + Fn: listRc, + Title: "Show running servers", + Help: q(`Show running servers with IDs. + +This takes no parameters and returns + +- list: list of running serve commands + +Each list element will have + +- id: ID of the server +- addr: address the server is running on +- params: parameters used to start the server + +Eg + + rclone rc serve/list + +Returns + +|||json +{ + "list": [ + { + "addr": "[::]:4321", + "id": "nfs-ffc2a4e5", + "params": { + "fs": "remote:", + "opt": { + "ListenAddr": ":4321" + }, + "type": "nfs", + "vfsOpt": { + "CacheMode": "full" + } + } + } + ] +} +||| +`), + }) +} + +// listRc returns a list of current serves sorted by serve path +func listRc(_ context.Context, in rc.Params) (out rc.Params, err error) { + serveMu.Lock() + defer serveMu.Unlock() + list := []*server{} + for _, item := range servers { + list = append(list, item) + } + slices.SortFunc(list, func(a, b *server) int { + return cmp.Compare(a.ID, b.ID) + }) + return rc.Params{ + "list": list, + }, nil +} + +func init() { + rc.Add(rc.Call{ + Path: "serve/stopall", + AuthRequired: true, + Fn: stopAll, + Title: "Stop all active servers", + Help: q(`Stop all active servers. + +This will stop all active servers. + + rclone rc serve/stopall +`), + }) +} + +// stopAll shuts all the servers down +func stopAll(_ context.Context, in rc.Params) (out rc.Params, err error) { + serveMu.Lock() + defer serveMu.Unlock() + ec := errcount.New() + for id, s := range servers { + ec.Add(s.h.Shutdown()) + <-s.errChan // ignore server return error - likely is "use of closed network connection" + delete(servers, id) + } + return nil, ec.Err("error when stopping server") +} diff --git a/cmd/serve/rc_test.go b/cmd/serve/rc_test.go new file mode 100644 index 000000000..e432ebfce --- /dev/null +++ b/cmd/serve/rc_test.go @@ -0,0 +1,180 @@ +package serve + +import ( + "context" + "errors" + "net" + "testing" + + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/rc" + "github.com/rclone/rclone/fstest/mockfs" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type dummyServer struct { + addr *net.TCPAddr + shutdownCh chan struct{} + shutdownCalled bool +} + +func (d *dummyServer) Addr() net.Addr { + return d.addr +} + +func (d *dummyServer) Shutdown() error { + d.shutdownCalled = true + close(d.shutdownCh) + return nil +} + +func (d *dummyServer) Serve() error { + <-d.shutdownCh + return nil +} + +func newServer(ctx context.Context, f fs.Fs, in rc.Params) (Handle, error) { + return &dummyServer{ + addr: &net.TCPAddr{ + IP: net.IPv4(127, 0, 0, 1), + Port: 8080, + }, + shutdownCh: make(chan struct{}), + }, nil +} + +func newServerError(ctx context.Context, f fs.Fs, in rc.Params) (Handle, error) { + return nil, errors.New("serve error") +} + +func newServerImmediateStop(ctx context.Context, f fs.Fs, in rc.Params) (Handle, error) { + h, _ := newServer(ctx, f, in) + close(h.(*dummyServer).shutdownCh) + return h, nil +} + +func resetGlobals() { + serveMu.Lock() + defer serveMu.Unlock() + serveFns = make(map[string]Fn) + servers = make(map[string]*server) +} + +func newTest(t *testing.T) { + _, err := fs.Find("mockfs") + if err != nil { + mockfs.Register() + } + resetGlobals() + t.Cleanup(resetGlobals) +} + +func TestRcStartServeType(t *testing.T) { + newTest(t) + serveStart := rc.Calls.Get("serve/start") + + in := rc.Params{"fs": ":mockfs:", "type": "nonexistent"} + _, err := serveStart.Fn(context.Background(), in) + assert.ErrorContains(t, err, "could not find serve type") +} + +func TestRcStartServeFnError(t *testing.T) { + newTest(t) + serveStart := rc.Calls.Get("serve/start") + + AddRc("error", newServerError) + in := rc.Params{"fs": ":mockfs:", "type": "error"} + _, err := serveStart.Fn(context.Background(), in) + assert.ErrorContains(t, err, "could not start serve") +} + +func TestRcStartImmediateStop(t *testing.T) { + newTest(t) + serveStart := rc.Calls.Get("serve/start") + + AddRc("immediate", newServerImmediateStop) + in := rc.Params{"fs": ":mockfs:", "type": "immediate"} + _, err := serveStart.Fn(context.Background(), in) + assert.ErrorContains(t, err, "server stopped immediately") +} + +func TestRcStartAndStop(t *testing.T) { + newTest(t) + serveStart := rc.Calls.Get("serve/start") + serveStop := rc.Calls.Get("serve/stop") + + AddRc("dummy", newServer) + in := rc.Params{"fs": ":mockfs:", "type": "dummy"} + + out, err := serveStart.Fn(context.Background(), in) + require.NoError(t, err) + id := out["id"].(string) + assert.Contains(t, id, "dummy") + assert.Equal(t, 1, len(servers)) + + _, err = serveStop.Fn(context.Background(), rc.Params{"id": id}) + require.NoError(t, err) + assert.Equal(t, 0, len(servers)) +} + +func TestRcStopNonexistent(t *testing.T) { + newTest(t) + serveStop := rc.Calls.Get("serve/stop") + + _, err := serveStop.Fn(context.Background(), rc.Params{"id": "nonexistent"}) + assert.ErrorContains(t, err, "not found") +} + +func TestRcServeTypes(t *testing.T) { + newTest(t) + serveTypes := rc.Calls.Get("serve/types") + + AddRc("a", newServer) + AddRc("c", newServer) + AddRc("b", newServer) + out, err := serveTypes.Fn(context.Background(), nil) + require.NoError(t, err) + types := out["types"].([]string) + assert.Equal(t, types, []string{"a", "b", "c"}) +} + +func TestRcList(t *testing.T) { + newTest(t) + serveStart := rc.Calls.Get("serve/start") + serveList := rc.Calls.Get("serve/list") + + AddRc("dummy", newServer) + + // Start two servers. + _, err := serveStart.Fn(context.Background(), rc.Params{"fs": ":mockfs:", "type": "dummy"}) + require.NoError(t, err) + + _, err = serveStart.Fn(context.Background(), rc.Params{"fs": ":mockfs:", "type": "dummy"}) + require.NoError(t, err) + + // Check list + out, err := serveList.Fn(context.Background(), nil) + require.NoError(t, err) + + list := out["list"].([]*server) + assert.Equal(t, 2, len(list)) +} + +func TestRcStopAll(t *testing.T) { + newTest(t) + serveStart := rc.Calls.Get("serve/start") + serveStopAll := rc.Calls.Get("serve/stopall") + + AddRc("dummy", newServer) + + _, err := serveStart.Fn(context.Background(), rc.Params{"fs": ":mockfs:", "type": "dummy"}) + require.NoError(t, err) + _, err = serveStart.Fn(context.Background(), rc.Params{"fs": ":mockfs:", "type": "dummy"}) + require.NoError(t, err) + assert.Equal(t, 2, len(servers)) + + _, err = serveStopAll.Fn(context.Background(), nil) + require.NoError(t, err) + assert.Equal(t, 0, len(servers)) +} diff --git a/cmd/serve/servetest/rc.go b/cmd/serve/servetest/rc.go new file mode 100644 index 000000000..bd88c173a --- /dev/null +++ b/cmd/serve/servetest/rc.go @@ -0,0 +1,77 @@ +package servetest + +import ( + "context" + "fmt" + "net" + "strings" + "testing" + "time" + + "github.com/rclone/rclone/fs/rc" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// GetEphemeralPort opens a listening port on localhost:0, closes it, +// and returns the address as "localhost:port". +func GetEphemeralPort(t *testing.T) string { + listener, err := net.Listen("tcp", "localhost:0") // Listen on any available port + require.NoError(t, err) + defer func() { + require.NoError(t, listener.Close()) + }() + return listener.Addr().String() +} + +// checkTCP attempts to establish a TCP connection to the given address, +// and closes it if successful. Returns an error if the connection fails. +func checkTCP(address string) error { + conn, err := net.DialTimeout("tcp", address, 5*time.Second) + if err != nil { + return fmt.Errorf("failed to connect to %s: %w", address, err) + } + + err = conn.Close() + if err != nil { + return fmt.Errorf("failed to close connection to %s: %w", address, err) + } + + return nil +} + +// TestRc tests the rc interface for the servers +// +// in should contain any options necessary however this code will add +// "fs", "addr". +func TestRc(t *testing.T, in rc.Params) { + ctx := context.Background() + dir := t.TempDir() + serveStart := rc.Calls.Get("serve/start") + serveStop := rc.Calls.Get("serve/stop") + name := in["type"].(string) + addr := GetEphemeralPort(t) + + // Start the server + in["fs"] = dir + in["addr"] = addr + out, err := serveStart.Fn(ctx, in) + require.NoError(t, err) + id := out["id"].(string) + assert.True(t, strings.HasPrefix(id, name+"-")) + gotAddr := out["addr"].(string) + assert.Equal(t, addr, gotAddr) + + // Check we can make a TCP connection to the server + t.Logf("Checking connection on %q", addr) + err = checkTCP(addr) + assert.NoError(t, err) + + // Stop the server + _, err = serveStop.Fn(ctx, rc.Params{"id": id}) + require.NoError(t, err) + + // Check we can make no longer make connections to the server + err = checkTCP(addr) + assert.Error(t, err) +}