serve: Add rc control for serve commands #4505

This adds the framework for serving. The individual servers will be
added in separate commits.
This commit is contained in:
Nick Craig-Wood 2025-03-21 16:30:15 +00:00
parent 21e5fa192a
commit 4d9a165e56
3 changed files with 612 additions and 0 deletions

355
cmd/serve/rc.go Normal file
View File

@ -0,0 +1,355 @@
package serve
import (
"cmp"
"context"
"errors"
"fmt"
"math/rand/v2"
"net"
"slices"
"sort"
"strings"
"sync"
"time"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/filter"
"github.com/rclone/rclone/fs/rc"
"github.com/rclone/rclone/lib/errcount"
)
// Handle describes what a server can do
type Handle interface {
// Addr returns the listening address of the server
Addr() net.Addr
// Shutdown stops the server
Shutdown() error
// Serve starts the server - doesn't return until Shutdown is called.
Serve() (err error)
}
// Describes a running server
type server struct {
ID string `json:"id"` // id of the server
Addr string `json:"addr"` // address of the server
Params rc.Params `json:"params"` // Parameters used to start the server
h Handle `json:"-"` // control the server
errChan chan error `json:"-"` // receive errors from the server process
}
// Fn starts an rclone serve command
type Fn func(ctx context.Context, f fs.Fs, in rc.Params) (Handle, error)
// Globals
var (
// mutex to protect all the variables in this block
serveMu sync.Mutex
// Serve functions available
serveFns = map[string]Fn{}
// Running servers
servers = map[string]*server{}
)
// AddRc adds the named serve function to the rc
func AddRc(name string, serveFunction Fn) {
serveMu.Lock()
defer serveMu.Unlock()
serveFns[name] = serveFunction
}
// unquote `
func q(s string) string {
return strings.ReplaceAll(s, "|", "`")
}
func init() {
rc.Add(rc.Call{
Path: "serve/start",
AuthRequired: true,
Fn: startRc,
Title: "Create a new server",
Help: q(`Create a new server with the specified parameters.
This takes the following parameters:
- |type| - type of server: |http|, |webdav|, |ftp|, |sftp|, |nfs|, etc.
- |fs| - remote storage path to serve
- |addr| - the ip:port to run the server on, eg ":1234" or "localhost:1234"
Other parameters are as described in the documentation for the
relevant [rclone serve](/commands/rclone_serve/) command line options.
To translate a command line option to an rc parameter, remove the
leading |--| and replace |-| with |_|, so |--vfs-cache-mode| becomes
|vfs_cache_mode|. Note that global parameters must be set with
|_config| and |_filter| as described above.
Examples:
rclone rc serve/start type=nfs fs=remote: addr=:4321 vfs_cache_mode=full
rclone rc serve/start --json '{"type":"nfs","fs":"remote:","addr":":1234","vfs_cache_mode":"full"}'
This will give the reply
|||json
{
"addr": "[::]:4321", // Address the server was started on
"id": "nfs-ecfc6852" // Unique identifier for the server instance
}
|||
Or an error if it failed to start.
Stop the server with |serve/stop| and list the running servers with |serve/list|.
`),
})
}
// startRc allows the serve command to be run from rc
func startRc(ctx context.Context, in rc.Params) (out rc.Params, err error) {
serveType, err := in.GetString("type")
serveMu.Lock()
defer serveMu.Unlock()
serveFn := serveFns[serveType]
if serveFn == nil {
return nil, fmt.Errorf("could not find serve type=%q", serveType)
}
// Get Fs.fs to be served from fs parameter in the params
f, err := rc.GetFs(ctx, in)
if err != nil {
return nil, err
}
// Make a background context and copy the config back.
newCtx := context.Background()
newCtx = fs.CopyConfig(newCtx, ctx)
newCtx = filter.CopyConfig(newCtx, ctx)
// Start the server
h, err := serveFn(newCtx, f, in)
if err != nil {
return nil, fmt.Errorf("could not start serve %q: %w", serveType, err)
}
// Start the server running in the background
errChan := make(chan error, 1)
go func() {
errChan <- h.Serve()
close(errChan)
}()
// Wait for a short length of time to see if an error occurred
select {
case err = <-errChan:
if err == nil {
err = errors.New("server stopped immediately")
}
case <-time.After(100 * time.Millisecond):
err = nil
}
if err != nil {
return nil, fmt.Errorf("error when starting serve %q: %w", serveType, err)
}
// Store it for later
runningServer := server{
ID: fmt.Sprintf("%s-%08x", serveType, rand.Uint32()),
Params: in,
Addr: h.Addr().String(),
h: h,
errChan: errChan,
}
servers[runningServer.ID] = &runningServer
out = rc.Params{
"id": runningServer.ID,
"addr": runningServer.Addr,
}
fs.Debugf(f, "Started serve %s on %s", serveType, runningServer.Addr)
return out, nil
}
func init() {
rc.Add(rc.Call{
Path: "serve/stop",
AuthRequired: true,
Fn: stopRc,
Title: "Unserve selected active serve",
Help: q(`Stops a running |serve| instance by ID.
This takes the following parameters:
- id: as returned by serve/start
This will give an empty response if successful or an error if not.
Example:
rclone rc serve/stop id=12345
`),
})
}
// stopRc stops the server process
func stopRc(_ context.Context, in rc.Params) (out rc.Params, err error) {
id, err := in.GetString("id")
if err != nil {
return nil, err
}
serveMu.Lock()
defer serveMu.Unlock()
s := servers[id]
if s == nil {
return nil, fmt.Errorf("server with id=%q not found", id)
}
err = s.h.Shutdown()
<-s.errChan // ignore server return error - likely is "use of closed network connection"
delete(servers, id)
return nil, err
}
func init() {
rc.Add(rc.Call{
Path: "serve/types",
AuthRequired: true,
Fn: serveTypesRc,
Title: "Show all possible serve types",
Help: q(`This shows all possible serve types and returns them as a list.
This takes no parameters and returns
- types: list of serve types, eg "nfs", "sftp", etc
The serve types are strings like "serve", "serve2", "cserve" and can
be passed to serve/start as the serveType parameter.
Eg
rclone rc serve/types
Returns
|||json
{
"types": [
"http",
"sftp",
"nfs"
]
}
|||
`),
})
}
// serveTypesRc returns a list of available serve types.
func serveTypesRc(_ context.Context, in rc.Params) (out rc.Params, err error) {
var serveTypes = []string{}
serveMu.Lock()
defer serveMu.Unlock()
for serveType := range serveFns {
serveTypes = append(serveTypes, serveType)
}
sort.Strings(serveTypes)
return rc.Params{
"types": serveTypes,
}, nil
}
func init() {
rc.Add(rc.Call{
Path: "serve/list",
AuthRequired: true,
Fn: listRc,
Title: "Show running servers",
Help: q(`Show running servers with IDs.
This takes no parameters and returns
- list: list of running serve commands
Each list element will have
- id: ID of the server
- addr: address the server is running on
- params: parameters used to start the server
Eg
rclone rc serve/list
Returns
|||json
{
"list": [
{
"addr": "[::]:4321",
"id": "nfs-ffc2a4e5",
"params": {
"fs": "remote:",
"opt": {
"ListenAddr": ":4321"
},
"type": "nfs",
"vfsOpt": {
"CacheMode": "full"
}
}
}
]
}
|||
`),
})
}
// listRc returns a list of current serves sorted by serve path
func listRc(_ context.Context, in rc.Params) (out rc.Params, err error) {
serveMu.Lock()
defer serveMu.Unlock()
list := []*server{}
for _, item := range servers {
list = append(list, item)
}
slices.SortFunc(list, func(a, b *server) int {
return cmp.Compare(a.ID, b.ID)
})
return rc.Params{
"list": list,
}, nil
}
func init() {
rc.Add(rc.Call{
Path: "serve/stopall",
AuthRequired: true,
Fn: stopAll,
Title: "Stop all active servers",
Help: q(`Stop all active servers.
This will stop all active servers.
rclone rc serve/stopall
`),
})
}
// stopAll shuts all the servers down
func stopAll(_ context.Context, in rc.Params) (out rc.Params, err error) {
serveMu.Lock()
defer serveMu.Unlock()
ec := errcount.New()
for id, s := range servers {
ec.Add(s.h.Shutdown())
<-s.errChan // ignore server return error - likely is "use of closed network connection"
delete(servers, id)
}
return nil, ec.Err("error when stopping server")
}

180
cmd/serve/rc_test.go Normal file
View File

@ -0,0 +1,180 @@
package serve
import (
"context"
"errors"
"net"
"testing"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/rc"
"github.com/rclone/rclone/fstest/mockfs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type dummyServer struct {
addr *net.TCPAddr
shutdownCh chan struct{}
shutdownCalled bool
}
func (d *dummyServer) Addr() net.Addr {
return d.addr
}
func (d *dummyServer) Shutdown() error {
d.shutdownCalled = true
close(d.shutdownCh)
return nil
}
func (d *dummyServer) Serve() error {
<-d.shutdownCh
return nil
}
func newServer(ctx context.Context, f fs.Fs, in rc.Params) (Handle, error) {
return &dummyServer{
addr: &net.TCPAddr{
IP: net.IPv4(127, 0, 0, 1),
Port: 8080,
},
shutdownCh: make(chan struct{}),
}, nil
}
func newServerError(ctx context.Context, f fs.Fs, in rc.Params) (Handle, error) {
return nil, errors.New("serve error")
}
func newServerImmediateStop(ctx context.Context, f fs.Fs, in rc.Params) (Handle, error) {
h, _ := newServer(ctx, f, in)
close(h.(*dummyServer).shutdownCh)
return h, nil
}
func resetGlobals() {
serveMu.Lock()
defer serveMu.Unlock()
serveFns = make(map[string]Fn)
servers = make(map[string]*server)
}
func newTest(t *testing.T) {
_, err := fs.Find("mockfs")
if err != nil {
mockfs.Register()
}
resetGlobals()
t.Cleanup(resetGlobals)
}
func TestRcStartServeType(t *testing.T) {
newTest(t)
serveStart := rc.Calls.Get("serve/start")
in := rc.Params{"fs": ":mockfs:", "type": "nonexistent"}
_, err := serveStart.Fn(context.Background(), in)
assert.ErrorContains(t, err, "could not find serve type")
}
func TestRcStartServeFnError(t *testing.T) {
newTest(t)
serveStart := rc.Calls.Get("serve/start")
AddRc("error", newServerError)
in := rc.Params{"fs": ":mockfs:", "type": "error"}
_, err := serveStart.Fn(context.Background(), in)
assert.ErrorContains(t, err, "could not start serve")
}
func TestRcStartImmediateStop(t *testing.T) {
newTest(t)
serveStart := rc.Calls.Get("serve/start")
AddRc("immediate", newServerImmediateStop)
in := rc.Params{"fs": ":mockfs:", "type": "immediate"}
_, err := serveStart.Fn(context.Background(), in)
assert.ErrorContains(t, err, "server stopped immediately")
}
func TestRcStartAndStop(t *testing.T) {
newTest(t)
serveStart := rc.Calls.Get("serve/start")
serveStop := rc.Calls.Get("serve/stop")
AddRc("dummy", newServer)
in := rc.Params{"fs": ":mockfs:", "type": "dummy"}
out, err := serveStart.Fn(context.Background(), in)
require.NoError(t, err)
id := out["id"].(string)
assert.Contains(t, id, "dummy")
assert.Equal(t, 1, len(servers))
_, err = serveStop.Fn(context.Background(), rc.Params{"id": id})
require.NoError(t, err)
assert.Equal(t, 0, len(servers))
}
func TestRcStopNonexistent(t *testing.T) {
newTest(t)
serveStop := rc.Calls.Get("serve/stop")
_, err := serveStop.Fn(context.Background(), rc.Params{"id": "nonexistent"})
assert.ErrorContains(t, err, "not found")
}
func TestRcServeTypes(t *testing.T) {
newTest(t)
serveTypes := rc.Calls.Get("serve/types")
AddRc("a", newServer)
AddRc("c", newServer)
AddRc("b", newServer)
out, err := serveTypes.Fn(context.Background(), nil)
require.NoError(t, err)
types := out["types"].([]string)
assert.Equal(t, types, []string{"a", "b", "c"})
}
func TestRcList(t *testing.T) {
newTest(t)
serveStart := rc.Calls.Get("serve/start")
serveList := rc.Calls.Get("serve/list")
AddRc("dummy", newServer)
// Start two servers.
_, err := serveStart.Fn(context.Background(), rc.Params{"fs": ":mockfs:", "type": "dummy"})
require.NoError(t, err)
_, err = serveStart.Fn(context.Background(), rc.Params{"fs": ":mockfs:", "type": "dummy"})
require.NoError(t, err)
// Check list
out, err := serveList.Fn(context.Background(), nil)
require.NoError(t, err)
list := out["list"].([]*server)
assert.Equal(t, 2, len(list))
}
func TestRcStopAll(t *testing.T) {
newTest(t)
serveStart := rc.Calls.Get("serve/start")
serveStopAll := rc.Calls.Get("serve/stopall")
AddRc("dummy", newServer)
_, err := serveStart.Fn(context.Background(), rc.Params{"fs": ":mockfs:", "type": "dummy"})
require.NoError(t, err)
_, err = serveStart.Fn(context.Background(), rc.Params{"fs": ":mockfs:", "type": "dummy"})
require.NoError(t, err)
assert.Equal(t, 2, len(servers))
_, err = serveStopAll.Fn(context.Background(), nil)
require.NoError(t, err)
assert.Equal(t, 0, len(servers))
}

77
cmd/serve/servetest/rc.go Normal file
View File

@ -0,0 +1,77 @@
package servetest
import (
"context"
"fmt"
"net"
"strings"
"testing"
"time"
"github.com/rclone/rclone/fs/rc"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// GetEphemeralPort opens a listening port on localhost:0, closes it,
// and returns the address as "localhost:port".
func GetEphemeralPort(t *testing.T) string {
listener, err := net.Listen("tcp", "localhost:0") // Listen on any available port
require.NoError(t, err)
defer func() {
require.NoError(t, listener.Close())
}()
return listener.Addr().String()
}
// checkTCP attempts to establish a TCP connection to the given address,
// and closes it if successful. Returns an error if the connection fails.
func checkTCP(address string) error {
conn, err := net.DialTimeout("tcp", address, 5*time.Second)
if err != nil {
return fmt.Errorf("failed to connect to %s: %w", address, err)
}
err = conn.Close()
if err != nil {
return fmt.Errorf("failed to close connection to %s: %w", address, err)
}
return nil
}
// TestRc tests the rc interface for the servers
//
// in should contain any options necessary however this code will add
// "fs", "addr".
func TestRc(t *testing.T, in rc.Params) {
ctx := context.Background()
dir := t.TempDir()
serveStart := rc.Calls.Get("serve/start")
serveStop := rc.Calls.Get("serve/stop")
name := in["type"].(string)
addr := GetEphemeralPort(t)
// Start the server
in["fs"] = dir
in["addr"] = addr
out, err := serveStart.Fn(ctx, in)
require.NoError(t, err)
id := out["id"].(string)
assert.True(t, strings.HasPrefix(id, name+"-"))
gotAddr := out["addr"].(string)
assert.Equal(t, addr, gotAddr)
// Check we can make a TCP connection to the server
t.Logf("Checking connection on %q", addr)
err = checkTCP(addr)
assert.NoError(t, err)
// Stop the server
_, err = serveStop.Fn(ctx, rc.Params{"id": id})
require.NoError(t, err)
// Check we can make no longer make connections to the server
err = checkTCP(addr)
assert.Error(t, err)
}