From 2089405e1b64de27c3f6350144b0f1661edcab86 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Fri, 26 Oct 2018 14:48:22 +0100 Subject: [PATCH] fs/rc: add more infrastructure to help writing rc functions - Fs cache for rc commands - Helper functions for parsing the input - Reshape command for manipulating JSON blobs - Background Job starting, control, query and expiry --- docs/content/rc.md | 67 +++++++++++ fs/rc/cache.go | 60 ++++++++++ fs/rc/cache_test.go | 107 ++++++++++++++++++ fs/rc/config_test.go | 88 +++++++++++++++ fs/rc/internal_test.go | 76 +++++++++++++ fs/rc/job.go | 215 +++++++++++++++++++++++++++++++++++ fs/rc/job_test.go | 194 +++++++++++++++++++++++++++++++ fs/rc/params.go | 204 +++++++++++++++++++++++++++++++++ fs/rc/params_test.go | 251 +++++++++++++++++++++++++++++++++++++++++ fs/rc/rc.go | 28 ++++- fs/rc/rc_test.go | 23 ++++ fs/rc/registry.go | 3 - 12 files changed, 1308 insertions(+), 8 deletions(-) create mode 100644 fs/rc/cache.go create mode 100644 fs/rc/cache_test.go create mode 100644 fs/rc/config_test.go create mode 100644 fs/rc/internal_test.go create mode 100644 fs/rc/job.go create mode 100644 fs/rc/job_test.go create mode 100644 fs/rc/params.go create mode 100644 fs/rc/params_test.go create mode 100644 fs/rc/rc_test.go diff --git a/docs/content/rc.md b/docs/content/rc.md index 0997a1d0f..92cb99e14 100644 --- a/docs/content/rc.md +++ b/docs/content/rc.md @@ -86,6 +86,73 @@ $ rclone rc --json '{ "p1": [1,"2",null,4], "p2": { "a":1, "b":2 } }' rc/noop } ``` +## Special parameters + +The rc interface supports some special parameters which apply to +**all** commands. These start with `_` to show they are different. + +### Running asynchronous jobs with _async = true + +If `_async` has a true value when supplied to an rc call then it will +return immediately with a job id and the task will be run in the +background. The `job/status` call can be used to get information of +the background job. The job can be queried for up to 1 minute after +it has finished. + +It is recommended that potentially long running jobs, eg `sync/sync`, +`sync/copy`, `sync/move`, `operations/purge` are run with the `_async` +flag to avoid any potential problems with the HTTP request and +response timing out. + +Starting a job with the `_async` flag: + +``` +$ rclone rc --json '{ "p1": [1,"2",null,4], "p2": { "a":1, "b":2 }, "_async": true }' rc/noop +{ + "jobid": 2 +} +``` + +Query the status to see if the job has finished. For more information +on the meaning of these return parameters see the `job/status` call. + +``` +$ rclone rc --json '{ "jobid":2 }' job/status +{ + "duration": 0.000124163, + "endTime": "2018-10-27T11:38:07.911245881+01:00", + "error": "", + "finished": true, + "id": 2, + "output": { + "_async": true, + "p1": [ + 1, + "2", + null, + 4 + ], + "p2": { + "a": 1, + "b": 2 + } + }, + "startTime": "2018-10-27T11:38:07.911121728+01:00", + "success": true +} +``` + +`job/list` can be used to show the running or recently completed jobs + +``` +$ rclone rc job/list +{ + "jobids": [ + 2 + ] +} +``` + ## Supported commands ### cache/expire: Purge a remote from cache diff --git a/fs/rc/cache.go b/fs/rc/cache.go new file mode 100644 index 000000000..14158fe34 --- /dev/null +++ b/fs/rc/cache.go @@ -0,0 +1,60 @@ +// This implements the Fs cache + +package rc + +import ( + "sync" + + "github.com/ncw/rclone/fs" +) + +var ( + fsCacheMu sync.Mutex + fsCache = map[string]fs.Fs{} + fsNewFs = fs.NewFs // for tests +) + +// GetFsNamed gets a fs.Fs named fsName either from the cache or creates it afresh +func GetFsNamed(in Params, fsName string) (f fs.Fs, err error) { + fsCacheMu.Lock() + defer fsCacheMu.Unlock() + + fsString, err := in.GetString(fsName) + if err != nil { + return nil, err + } + + f = fsCache[fsString] + if f == nil { + f, err = fsNewFs(fsString) + if err == nil { + fsCache[fsString] = f + } + } + return f, err +} + +// GetFs gets a fs.Fs named "fs" either from the cache or creates it afresh +func GetFs(in Params) (f fs.Fs, err error) { + return GetFsNamed(in, "fs") +} + +// GetFsAndRemoteNamed gets the fsName parameter from in, makes a +// remote or fetches it from the cache then gets the remoteName +// parameter from in too. +func GetFsAndRemoteNamed(in Params, fsName, remoteName string) (f fs.Fs, remote string, err error) { + remote, err = in.GetString(remoteName) + if err != nil { + return + } + f, err = GetFsNamed(in, fsName) + return + +} + +// GetFsAndRemote gets the `fs` parameter from in, makes a remote or +// fetches it from the cache then gets the `remote` parameter from in +// too. +func GetFsAndRemote(in Params) (f fs.Fs, remote string, err error) { + return GetFsAndRemoteNamed(in, "fs", "remote") +} diff --git a/fs/rc/cache_test.go b/fs/rc/cache_test.go new file mode 100644 index 000000000..07c696bca --- /dev/null +++ b/fs/rc/cache_test.go @@ -0,0 +1,107 @@ +package rc + +import ( + "testing" + + "github.com/ncw/rclone/fs" + "github.com/ncw/rclone/fstest/mockfs" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var called = 0 + +func mockNewFs(t *testing.T) func() { + called = 0 + oldFsNewFs := fsNewFs + fsNewFs = func(path string) (fs.Fs, error) { + assert.Equal(t, 0, called) + called++ + assert.Equal(t, "/", path) + return mockfs.NewFs("mock", "mock"), nil + } + return func() { + fsNewFs = oldFsNewFs + fsCache = map[string]fs.Fs{} + } +} + +func TestGetCachedFs(t *testing.T) { + defer mockNewFs(t)() + + assert.Equal(t, 0, len(fsCache)) + + f, err := GetCachedFs("/") + require.NoError(t, err) + + assert.Equal(t, 1, len(fsCache)) + + f2, err := GetCachedFs("/") + require.NoError(t, err) + + assert.Equal(t, f, f2) +} + +func TestGetFsNamed(t *testing.T) { + defer mockNewFs(t)() + + in := Params{ + "potato": "/", + } + f, err := GetFsNamed(in, "potato") + require.NoError(t, err) + assert.NotNil(t, f) + + in = Params{ + "sausage": "/", + } + f, err = GetFsNamed(in, "potato") + require.Error(t, err) + assert.Nil(t, f) +} + +func TestGetFs(t *testing.T) { + defer mockNewFs(t)() + + in := Params{ + "fs": "/", + } + f, err := GetFs(in) + require.NoError(t, err) + assert.NotNil(t, f) +} + +func TestGetFsAndRemoteNamed(t *testing.T) { + defer mockNewFs(t)() + + in := Params{ + "fs": "/", + "remote": "hello", + } + f, remote, err := GetFsAndRemoteNamed(in, "fs", "remote") + require.NoError(t, err) + assert.NotNil(t, f) + assert.Equal(t, "hello", remote) + + f, remote, err = GetFsAndRemoteNamed(in, "fsX", "remote") + require.Error(t, err) + assert.Nil(t, f) + + f, remote, err = GetFsAndRemoteNamed(in, "fs", "remoteX") + require.Error(t, err) + assert.Nil(t, f) + +} + +func TestGetFsAndRemote(t *testing.T) { + defer mockNewFs(t)() + + in := Params{ + "fs": "/", + "remote": "hello", + } + f, remote, err := GetFsAndRemote(in) + require.NoError(t, err) + assert.NotNil(t, f) + assert.Equal(t, "hello", remote) +} diff --git a/fs/rc/config_test.go b/fs/rc/config_test.go new file mode 100644 index 000000000..ed954a203 --- /dev/null +++ b/fs/rc/config_test.go @@ -0,0 +1,88 @@ +package rc + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func clearOptionBlock() { + optionBlock = map[string]interface{}{} +} + +var testOptions = struct { + String string + Int int +}{ + String: "hello", + Int: 42, +} + +func TestAddOption(t *testing.T) { + defer clearOptionBlock() + assert.Equal(t, len(optionBlock), 0) + AddOption("potato", &testOptions) + assert.Equal(t, len(optionBlock), 1) + assert.Equal(t, &testOptions, optionBlock["potato"]) +} + +func TestOptionsBlocks(t *testing.T) { + defer clearOptionBlock() + AddOption("potato", &testOptions) + call := Calls.Get("options/blocks") + require.NotNil(t, call) + in := Params{} + out, err := call.Fn(in) + require.NoError(t, err) + require.NotNil(t, out) + assert.Equal(t, Params{"options": []string{"potato"}}, out) +} + +func TestOptionsGet(t *testing.T) { + defer clearOptionBlock() + AddOption("potato", &testOptions) + call := Calls.Get("options/get") + require.NotNil(t, call) + in := Params{} + out, err := call.Fn(in) + require.NoError(t, err) + require.NotNil(t, out) + assert.Equal(t, Params{"potato": &testOptions}, out) +} + +func TestOptionsSet(t *testing.T) { + defer clearOptionBlock() + AddOption("potato", &testOptions) + call := Calls.Get("options/set") + require.NotNil(t, call) + + in := Params{ + "potato": Params{ + "Int": 50, + }, + } + out, err := call.Fn(in) + require.NoError(t, err) + require.Nil(t, out) + assert.Equal(t, 50, testOptions.Int) + assert.Equal(t, "hello", testOptions.String) + + // unknown option block + in = Params{ + "sausage": Params{ + "Int": 50, + }, + } + out, err = call.Fn(in) + require.Error(t, err) + assert.Contains(t, err.Error(), "unknown option block") + + // bad shape + in = Params{ + "potato": []string{"a", "b"}, + } + out, err = call.Fn(in) + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to write options") +} diff --git a/fs/rc/internal_test.go b/fs/rc/internal_test.go new file mode 100644 index 000000000..cf8c39ff2 --- /dev/null +++ b/fs/rc/internal_test.go @@ -0,0 +1,76 @@ +package rc + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestInternalNoop(t *testing.T) { + call := Calls.Get("rc/noop") + assert.NotNil(t, call) + in := Params{ + "String": "hello", + "Int": 42, + } + out, err := call.Fn(in) + require.NoError(t, err) + require.NotNil(t, out) + assert.Equal(t, in, out) +} + +func TestInternalError(t *testing.T) { + call := Calls.Get("rc/error") + assert.NotNil(t, call) + in := Params{} + out, err := call.Fn(in) + require.Error(t, err) + require.Nil(t, out) +} + +func TestInternalList(t *testing.T) { + call := Calls.Get("rc/list") + assert.NotNil(t, call) + in := Params{} + out, err := call.Fn(in) + require.NoError(t, err) + require.NotNil(t, out) + assert.Equal(t, Params{"commands": Calls.List()}, out) +} + +func TestCorePid(t *testing.T) { + call := Calls.Get("core/pid") + assert.NotNil(t, call) + in := Params{} + out, err := call.Fn(in) + require.NoError(t, err) + require.NotNil(t, out) + pid := out["pid"] + assert.NotEqual(t, nil, pid) + _, ok := pid.(int) + assert.Equal(t, true, ok) +} + +func TestCoreMemstats(t *testing.T) { + call := Calls.Get("core/memstats") + assert.NotNil(t, call) + in := Params{} + out, err := call.Fn(in) + require.NoError(t, err) + require.NotNil(t, out) + sys := out["Sys"] + assert.NotEqual(t, nil, sys) + _, ok := sys.(uint64) + assert.Equal(t, true, ok) +} + +func TestCoreGC(t *testing.T) { + call := Calls.Get("core/gc") + assert.NotNil(t, call) + in := Params{} + out, err := call.Fn(in) + require.NoError(t, err) + require.Nil(t, out) + assert.Equal(t, Params(nil), out) +} diff --git a/fs/rc/job.go b/fs/rc/job.go new file mode 100644 index 000000000..daf41b418 --- /dev/null +++ b/fs/rc/job.go @@ -0,0 +1,215 @@ +// Manage background jobs that the rc is running + +package rc + +import ( + "sync" + "sync/atomic" + "time" + + "github.com/pkg/errors" +) + +const ( + // expire the job when it is finished and older than this + expireDuration = 60 * time.Second + // inteval to run the expire cache + expireInterval = 10 * time.Second +) + +// Job describes a asynchronous task started via the rc package +type Job struct { + mu sync.Mutex + ID int64 `json:"id"` + StartTime time.Time `json:"startTime"` + EndTime time.Time `json:"endTime"` + Error string `json:"error"` + Finished bool `json:"finished"` + Success bool `json:"success"` + Duration float64 `json:"duration"` + Output Params `json:"output"` +} + +// Jobs describes a collection of running tasks +type Jobs struct { + mu sync.RWMutex + jobs map[int64]*Job + expireInterval time.Duration + expireRunning bool +} + +var ( + running = newJobs() + jobID = int64(0) +) + +// newJobs makes a new Jobs structure +func newJobs() *Jobs { + return &Jobs{ + jobs: map[int64]*Job{}, + expireInterval: expireInterval, + } +} + +// kickExpire makes sure Expire is running +func (jobs *Jobs) kickExpire() { + jobs.mu.Lock() + defer jobs.mu.Unlock() + if !jobs.expireRunning { + time.AfterFunc(jobs.expireInterval, jobs.Expire) + jobs.expireRunning = true + } +} + +// Expire expires any jobs that haven't been collected +func (jobs *Jobs) Expire() { + jobs.mu.Lock() + defer jobs.mu.Unlock() + now := time.Now() + for ID, job := range jobs.jobs { + job.mu.Lock() + if job.Finished && now.Sub(job.EndTime) > expireDuration { + delete(jobs.jobs, ID) + } + job.mu.Unlock() + } + if len(jobs.jobs) != 0 { + time.AfterFunc(jobs.expireInterval, jobs.Expire) + jobs.expireRunning = true + } else { + jobs.expireRunning = false + } +} + +// IDs returns the IDs of the running jobs +func (jobs *Jobs) IDs() (IDs []int64) { + jobs.mu.RLock() + defer jobs.mu.RUnlock() + IDs = []int64{} + for ID := range jobs.jobs { + IDs = append(IDs, ID) + } + return IDs +} + +// Get a job with a given ID or nil if it doesn't exist +func (jobs *Jobs) Get(ID int64) *Job { + jobs.mu.RLock() + defer jobs.mu.RUnlock() + return jobs.jobs[ID] +} + +// mark the job as finished +func (job *Job) finish(out Params, err error) { + job.mu.Lock() + job.EndTime = time.Now() + if out == nil { + out = make(Params) + } + job.Output = out + job.Duration = job.EndTime.Sub(job.StartTime).Seconds() + if err != nil { + job.Error = err.Error() + job.Success = false + } else { + job.Error = "" + job.Success = true + } + job.Finished = true + job.mu.Unlock() + running.kickExpire() // make sure this job gets expired +} + +// run the job until completion writing the return status +func (job *Job) run(fn Func, in Params) { + defer func() { + if r := recover(); r != nil { + job.finish(nil, errors.Errorf("panic received: %v", r)) + } + }() + job.finish(fn(in)) +} + +// NewJob start a new Job off +func (jobs *Jobs) NewJob(fn Func, in Params) *Job { + job := &Job{ + ID: atomic.AddInt64(&jobID, 1), + StartTime: time.Now(), + } + go job.run(fn, in) + jobs.mu.Lock() + jobs.jobs[job.ID] = job + jobs.mu.Unlock() + return job + +} + +// StartJob starts a new job and returns a Param suitable for output +func StartJob(fn Func, in Params) (Params, error) { + job := running.NewJob(fn, in) + out := make(Params) + out["jobid"] = job.ID + return out, nil +} + +func init() { + Add(Call{ + Path: "job/status", + Fn: rcJobStatus, + Title: "Reads the status of the job ID", + Help: `Parameters +- jobid - id of the job (integer) + +Results +- finished - boolean +- duration - time in seconds that the job ran for +- endTime - time the job finished (eg "2018-10-26T18:50:20.528746884+01:00") +- error - error from the job or empty string for no error +- finished - boolean whether the job has finished or not +- id - as passed in above +- startTime - time the job started (eg "2018-10-26T18:50:20.528336039+01:00") +- success - boolean - true for success false otherwise +- output - output of the job as would have been returned if called synchronously +`, + }) +} + +// Returns the status of a job +func rcJobStatus(in Params) (out Params, err error) { + jobID, err := in.GetInt64("jobid") + if err != nil { + return nil, err + } + job := running.Get(jobID) + if job == nil { + return nil, errors.New("job not found") + } + job.mu.Lock() + defer job.mu.Unlock() + out = make(Params) + err = Reshape(&out, job) + if job == nil { + return nil, errors.New("Reshape failed in job status") + } + return out, nil +} + +func init() { + Add(Call{ + Path: "job/list", + Fn: rcJobList, + Title: "Lists the IDs of the running jobs", + Help: `Parameters - None + +Results +- jobids - array of integer job ids +`, + }) +} + +// Returns the status of a job +func rcJobList(in Params) (out Params, err error) { + out = make(Params) + out["jobids"] = running.IDs() + return out, nil +} diff --git a/fs/rc/job_test.go b/fs/rc/job_test.go new file mode 100644 index 000000000..5c9b56446 --- /dev/null +++ b/fs/rc/job_test.go @@ -0,0 +1,194 @@ +package rc + +import ( + "testing" + "time" + + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewJobs(t *testing.T) { + jobs := newJobs() + assert.Equal(t, 0, len(jobs.jobs)) +} + +func TestJobsKickExpire(t *testing.T) { + jobs := newJobs() + jobs.expireInterval = time.Millisecond + assert.Equal(t, false, jobs.expireRunning) + jobs.kickExpire() + assert.Equal(t, true, jobs.expireRunning) + time.Sleep(10 * time.Millisecond) + assert.Equal(t, false, jobs.expireRunning) +} + +func TestJobsExpire(t *testing.T) { + wait := make(chan struct{}) + jobs := newJobs() + jobs.expireInterval = time.Millisecond + assert.Equal(t, false, jobs.expireRunning) + job := jobs.NewJob(func(in Params) (Params, error) { + defer close(wait) + return in, nil + }, Params{}) + <-wait + assert.Equal(t, 1, len(jobs.jobs)) + jobs.Expire() + assert.Equal(t, 1, len(jobs.jobs)) + job.EndTime = time.Now().Add(-expireDuration - 60*time.Second) + assert.Equal(t, true, jobs.expireRunning) + time.Sleep(10 * time.Millisecond) + assert.Equal(t, false, jobs.expireRunning) + assert.Equal(t, 0, len(jobs.jobs)) +} + +var noopFn = func(in Params) (Params, error) { + return nil, nil +} + +func TestJobsIDs(t *testing.T) { + jobs := newJobs() + job1 := jobs.NewJob(noopFn, Params{}) + job2 := jobs.NewJob(noopFn, Params{}) + wantIDs := []int64{job1.ID, job2.ID} + gotIDs := jobs.IDs() + require.Equal(t, 2, len(gotIDs)) + if gotIDs[0] != wantIDs[0] { + gotIDs[0], gotIDs[1] = gotIDs[1], gotIDs[0] + } + assert.Equal(t, wantIDs, gotIDs) +} + +func TestJobsGet(t *testing.T) { + jobs := newJobs() + job := jobs.NewJob(noopFn, Params{}) + assert.Equal(t, job, jobs.Get(job.ID)) + assert.Nil(t, jobs.Get(123123123123)) +} + +var longFn = func(in Params) (Params, error) { + time.Sleep(1 * time.Hour) + return nil, nil +} + +func TestJobFinish(t *testing.T) { + jobs := newJobs() + job := jobs.NewJob(longFn, Params{}) + + assert.Equal(t, true, job.EndTime.IsZero()) + assert.Equal(t, Params(nil), job.Output) + assert.Equal(t, 0.0, job.Duration) + assert.Equal(t, "", job.Error) + assert.Equal(t, false, job.Success) + assert.Equal(t, false, job.Finished) + + wantOut := Params{"a": 1} + job.finish(wantOut, nil) + + assert.Equal(t, false, job.EndTime.IsZero()) + assert.Equal(t, wantOut, job.Output) + assert.NotEqual(t, 0.0, job.Duration) + assert.Equal(t, "", job.Error) + assert.Equal(t, true, job.Success) + assert.Equal(t, true, job.Finished) + + job = jobs.NewJob(longFn, Params{}) + job.finish(nil, nil) + + assert.Equal(t, false, job.EndTime.IsZero()) + assert.Equal(t, Params{}, job.Output) + assert.NotEqual(t, 0.0, job.Duration) + assert.Equal(t, "", job.Error) + assert.Equal(t, true, job.Success) + assert.Equal(t, true, job.Finished) + + job = jobs.NewJob(longFn, Params{}) + job.finish(wantOut, errors.New("potato")) + + assert.Equal(t, false, job.EndTime.IsZero()) + assert.Equal(t, wantOut, job.Output) + assert.NotEqual(t, 0.0, job.Duration) + assert.Equal(t, "potato", job.Error) + assert.Equal(t, false, job.Success) + assert.Equal(t, true, job.Finished) +} + +// We've tested the functionality of run() already as it is +// part of NewJob, now just test the panic catching +func TestJobRunPanic(t *testing.T) { + wait := make(chan struct{}) + boom := func(in Params) (Params, error) { + defer close(wait) + panic("boom") + } + + jobs := newJobs() + job := jobs.NewJob(boom, Params{}) + <-wait + + assert.Equal(t, false, job.EndTime.IsZero()) + assert.Equal(t, Params{}, job.Output) + assert.NotEqual(t, 0.0, job.Duration) + assert.Equal(t, "panic received: boom", job.Error) + assert.Equal(t, false, job.Success) + assert.Equal(t, true, job.Finished) +} + +func TestJobsNewJob(t *testing.T) { + jobID = 0 + jobs := newJobs() + job := jobs.NewJob(noopFn, Params{}) + assert.Equal(t, int64(1), job.ID) + assert.Equal(t, job, jobs.Get(1)) + +} + +func TestStartJob(t *testing.T) { + jobID = 0 + out, err := StartJob(longFn, Params{}) + assert.NoError(t, err) + assert.Equal(t, Params{"jobid": int64(1)}, out) +} + +func TestRcJobStatus(t *testing.T) { + jobID = 0 + _, err := StartJob(longFn, Params{}) + assert.NoError(t, err) + + call := Calls.Get("job/status") + assert.NotNil(t, call) + in := Params{"jobid": 1} + out, err := call.Fn(in) + require.NoError(t, err) + require.NotNil(t, out) + assert.Equal(t, float64(1), out["id"]) + assert.Equal(t, "", out["error"]) + assert.Equal(t, false, out["finished"]) + assert.Equal(t, false, out["success"]) + + in = Params{"jobid": 123123123} + _, err = call.Fn(in) + require.Error(t, err) + assert.Contains(t, err.Error(), "job not found") + + in = Params{"jobidx": 123123123} + _, err = call.Fn(in) + require.Error(t, err) + assert.Contains(t, err.Error(), "Didn't find key") +} + +func TestRcJobList(t *testing.T) { + jobID = 0 + _, err := StartJob(longFn, Params{}) + assert.NoError(t, err) + + call := Calls.Get("job/list") + assert.NotNil(t, call) + in := Params{} + out, err := call.Fn(in) + require.NoError(t, err) + require.NotNil(t, out) + assert.Equal(t, Params{"jobids": []int64{1}}, out) +} diff --git a/fs/rc/params.go b/fs/rc/params.go new file mode 100644 index 000000000..07855b16c --- /dev/null +++ b/fs/rc/params.go @@ -0,0 +1,204 @@ +// Parameter parsing + +package rc + +import ( + "encoding/json" + "fmt" + "math" + "strconv" + + "github.com/pkg/errors" +) + +// Params is the input and output type for the Func +type Params map[string]interface{} + +// ErrParamNotFound - this is returned from the Get* functions if the +// parameter isn't found along with a zero value of the requested +// item. +// +// Returning an error of this type from an rc.Func will cause the http +// method to return http.StatusBadRequest +type ErrParamNotFound string + +// Error turns this error into a string +func (e ErrParamNotFound) Error() string { + return fmt.Sprintf("Didn't find key %q in input", string(e)) +} + +// IsErrParamNotFound returns whether err is ErrParamNotFound +func IsErrParamNotFound(err error) bool { + _, isNotFound := err.(ErrParamNotFound) + return isNotFound +} + +// NotErrParamNotFound returns true if err != nil and +// !IsErrParamNotFound(err) +// +// This is for checking error returns of the Get* functions to ignore +// error not found returns and take the default value. +func NotErrParamNotFound(err error) bool { + return err != nil && !IsErrParamNotFound(err) +} + +// ErrParamInvalid - this is returned from the Get* functions if the +// parameter is invalid. +// +// +// Returning an error of this type from an rc.Func will cause the http +// method to return http.StatusBadRequest +type ErrParamInvalid struct { + error +} + +// IsErrParamInvalid returns whether err is ErrParamInvalid +func IsErrParamInvalid(err error) bool { + _, isInvalid := err.(ErrParamInvalid) + return isInvalid +} + +// Reshape reshapes one blob of data into another via json serialization +// +// out should be a pointer type +// +// This isn't a very efficient way of dealing with this! +func Reshape(out interface{}, in interface{}) error { + b, err := json.Marshal(in) + if err != nil { + return errors.Wrapf(err, "Reshape failed to Marshal") + } + err = json.Unmarshal(b, out) + if err != nil { + return errors.Wrapf(err, "Reshape failed to Unmarshal") + } + return nil +} + +// Get gets a parameter from the input +// +// If the parameter isn't found then error will be of type +// ErrParamNotFound and the returned value will be nil. +func (p Params) Get(key string) (interface{}, error) { + value, ok := p[key] + if !ok { + return nil, ErrParamNotFound(key) + } + return value, nil +} + +// GetString gets a string parameter from the input +// +// If the parameter isn't found then error will be of type +// ErrParamNotFound and the returned value will be "". +func (p Params) GetString(key string) (string, error) { + value, err := p.Get(key) + if err != nil { + return "", err + } + str, ok := value.(string) + if !ok { + return "", ErrParamInvalid{errors.Errorf("expecting string value for key %q (was %T)", key, value)} + } + return str, nil +} + +// GetInt64 gets a int64 parameter from the input +// +// If the parameter isn't found then error will be of type +// ErrParamNotFound and the returned value will be 0. +func (p Params) GetInt64(key string) (int64, error) { + value, err := p.Get(key) + if err != nil { + return 0, err + } + switch x := value.(type) { + case int: + return int64(x), nil + case int64: + return x, nil + case float64: + if x > math.MaxInt64 || x < math.MinInt64 { + return 0, ErrParamInvalid{errors.Errorf("key %q (%v) overflows int64 ", key, value)} + } + return int64(x), nil + case string: + i, err := strconv.ParseInt(x, 10, 0) + if err != nil { + return 0, ErrParamInvalid{errors.Wrapf(err, "couldn't parse key %q (%v) as int64", key, value)} + } + return i, nil + } + return 0, ErrParamInvalid{errors.Errorf("expecting int64 value for key %q (was %T)", key, value)} +} + +// GetFloat64 gets a float64 parameter from the input +// +// If the parameter isn't found then error will be of type +// ErrParamNotFound and the returned value will be 0. +func (p Params) GetFloat64(key string) (float64, error) { + value, err := p.Get(key) + if err != nil { + return 0, err + } + switch x := value.(type) { + case float64: + return x, nil + case int: + return float64(x), nil + case int64: + return float64(x), nil + case string: + f, err := strconv.ParseFloat(x, 64) + if err != nil { + return 0, ErrParamInvalid{errors.Wrapf(err, "couldn't parse key %q (%v) as float64", key, value)} + } + return f, nil + } + return 0, ErrParamInvalid{errors.Errorf("expecting float64 value for key %q (was %T)", key, value)} +} + +// GetBool gets a boolean parameter from the input +// +// If the parameter isn't found then error will be of type +// ErrParamNotFound and the returned value will be false. +func (p Params) GetBool(key string) (bool, error) { + value, err := p.Get(key) + if err != nil { + return false, err + } + switch x := value.(type) { + case int: + return x != 0, nil + case int64: + return x != 0, nil + case float64: + return x != 0, nil + case bool: + return x, nil + case string: + b, err := strconv.ParseBool(x) + if err != nil { + return false, ErrParamInvalid{errors.Wrapf(err, "couldn't parse key %q (%v) as bool", key, value)} + } + return b, nil + } + return false, ErrParamInvalid{errors.Errorf("expecting bool value for key %q (was %T)", key, value)} +} + +// GetStruct gets a struct from key from the input into the struct +// pointed to by out. out must be a pointer type. +// +// If the parameter isn't found then error will be of type +// ErrParamNotFound and out will be unchanged. +func (p Params) GetStruct(key string, out interface{}) error { + value, err := p.Get(key) + if err != nil { + return err + } + err = Reshape(out, value) + if err != nil { + return ErrParamInvalid{errors.Wrapf(err, "key %q", key)} + } + return nil +} diff --git a/fs/rc/params_test.go b/fs/rc/params_test.go new file mode 100644 index 000000000..67cd8ab10 --- /dev/null +++ b/fs/rc/params_test.go @@ -0,0 +1,251 @@ +package rc + +import ( + "fmt" + "testing" + + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestErrParamNotFoundError(t *testing.T) { + e := ErrParamNotFound("key") + assert.Equal(t, "Didn't find key \"key\" in input", e.Error()) +} + +func TestIsErrParamNotFound(t *testing.T) { + assert.Equal(t, true, IsErrParamNotFound(ErrParamNotFound("key"))) + assert.Equal(t, false, IsErrParamNotFound(nil)) + assert.Equal(t, false, IsErrParamNotFound(errors.New("potato"))) +} + +func TestNotErrParamNotFound(t *testing.T) { + assert.Equal(t, false, NotErrParamNotFound(ErrParamNotFound("key"))) + assert.Equal(t, false, NotErrParamNotFound(nil)) + assert.Equal(t, true, NotErrParamNotFound(errors.New("potato"))) +} + +func TestIsErrParamInvalid(t *testing.T) { + e := ErrParamInvalid{errors.New("potato")} + assert.Equal(t, true, IsErrParamInvalid(e)) + assert.Equal(t, false, IsErrParamInvalid(nil)) + assert.Equal(t, false, IsErrParamInvalid(errors.New("potato"))) +} + +func TestReshape(t *testing.T) { + in := Params{ + "String": "hello", + "Float": 4.2, + } + var out struct { + String string + Float float64 + } + require.NoError(t, Reshape(&out, in)) + assert.Equal(t, "hello", out.String) + assert.Equal(t, 4.2, out.Float) + var inCopy = Params{} + require.NoError(t, Reshape(&inCopy, out)) + assert.Equal(t, in, inCopy) + + // Now a failure to marshal + var in2 func() + require.Error(t, Reshape(&inCopy, in2)) + + // Now a failure to unmarshal + require.Error(t, Reshape(&out, "string")) + +} + +func TestParamsGet(t *testing.T) { + in := Params{ + "ok": 1, + } + v1, e1 := in.Get("ok") + assert.NoError(t, e1) + assert.Equal(t, 1, v1) + v2, e2 := in.Get("notOK") + assert.Error(t, e2) + assert.Equal(t, nil, v2) + assert.Equal(t, ErrParamNotFound("notOK"), e2) +} + +func TestParamsGetString(t *testing.T) { + in := Params{ + "string": "one", + "notString": 17, + } + v1, e1 := in.GetString("string") + assert.NoError(t, e1) + assert.Equal(t, "one", v1) + v2, e2 := in.GetString("notOK") + assert.Error(t, e2) + assert.Equal(t, "", v2) + assert.Equal(t, ErrParamNotFound("notOK"), e2) + v3, e3 := in.GetString("notString") + assert.Error(t, e3) + assert.Equal(t, "", v3) + assert.Equal(t, true, IsErrParamInvalid(e3), e3.Error()) +} + +func TestParamsGetInt64(t *testing.T) { + for _, test := range []struct { + value interface{} + result int64 + errString string + }{ + {"123", 123, ""}, + {"123x", 0, "couldn't parse"}, + {int(12), 12, ""}, + {int64(13), 13, ""}, + {float64(14), 14, ""}, + {float64(9.3E18), 0, "overflows int64"}, + {float64(-9.3E18), 0, "overflows int64"}, + } { + t.Run(fmt.Sprintf("%T=%v", test.value, test.value), func(t *testing.T) { + in := Params{ + "key": test.value, + } + v1, e1 := in.GetInt64("key") + if test.errString == "" { + require.NoError(t, e1) + assert.Equal(t, test.result, v1) + } else { + require.NotNil(t, e1) + require.Error(t, e1) + assert.Contains(t, e1.Error(), test.errString) + assert.Equal(t, int64(0), v1) + } + }) + } + in := Params{ + "notInt64": []string{"a", "b"}, + } + v2, e2 := in.GetInt64("notOK") + assert.Error(t, e2) + assert.Equal(t, int64(0), v2) + assert.Equal(t, ErrParamNotFound("notOK"), e2) + v3, e3 := in.GetInt64("notInt64") + assert.Error(t, e3) + assert.Equal(t, int64(0), v3) + assert.Equal(t, true, IsErrParamInvalid(e3), e3.Error()) +} + +func TestParamsGetFloat64(t *testing.T) { + for _, test := range []struct { + value interface{} + result float64 + errString string + }{ + {"123.1", 123.1, ""}, + {"123x1", 0, "couldn't parse"}, + {int(12), 12, ""}, + {int64(13), 13, ""}, + {float64(14), 14, ""}, + } { + t.Run(fmt.Sprintf("%T=%v", test.value, test.value), func(t *testing.T) { + in := Params{ + "key": test.value, + } + v1, e1 := in.GetFloat64("key") + if test.errString == "" { + require.NoError(t, e1) + assert.Equal(t, test.result, v1) + } else { + require.NotNil(t, e1) + require.Error(t, e1) + assert.Contains(t, e1.Error(), test.errString) + assert.Equal(t, float64(0), v1) + } + }) + } + in := Params{ + "notFloat64": []string{"a", "b"}, + } + v2, e2 := in.GetFloat64("notOK") + assert.Error(t, e2) + assert.Equal(t, float64(0), v2) + assert.Equal(t, ErrParamNotFound("notOK"), e2) + v3, e3 := in.GetFloat64("notFloat64") + assert.Error(t, e3) + assert.Equal(t, float64(0), v3) + assert.Equal(t, true, IsErrParamInvalid(e3), e3.Error()) +} + +func TestParamsGetBool(t *testing.T) { + for _, test := range []struct { + value interface{} + result bool + errString string + }{ + {true, true, ""}, + {false, false, ""}, + {"true", true, ""}, + {"false", false, ""}, + {"fasle", false, "couldn't parse"}, + {int(12), true, ""}, + {int(0), false, ""}, + {int64(13), true, ""}, + {int64(0), false, ""}, + {float64(14), true, ""}, + {float64(0), false, ""}, + } { + t.Run(fmt.Sprintf("%T=%v", test.value, test.value), func(t *testing.T) { + in := Params{ + "key": test.value, + } + v1, e1 := in.GetBool("key") + if test.errString == "" { + require.NoError(t, e1) + assert.Equal(t, test.result, v1) + } else { + require.NotNil(t, e1) + require.Error(t, e1) + assert.Contains(t, e1.Error(), test.errString) + assert.Equal(t, false, v1) + } + }) + } + in := Params{ + "notBool": []string{"a", "b"}, + } + v2, e2 := Params{}.GetBool("notOK") + assert.Error(t, e2) + assert.Equal(t, false, v2) + assert.Equal(t, ErrParamNotFound("notOK"), e2) + v3, e3 := in.GetBool("notBool") + assert.Error(t, e3) + assert.Equal(t, false, v3) + assert.Equal(t, true, IsErrParamInvalid(e3), e3.Error()) +} + +func TestParamsGetStruct(t *testing.T) { + in := Params{ + "struct": Params{ + "String": "one", + "Float": 4.2, + }, + } + var out struct { + String string + Float float64 + } + e1 := in.GetStruct("struct", &out) + assert.NoError(t, e1) + assert.Equal(t, "one", out.String) + assert.Equal(t, 4.2, out.Float) + + e2 := in.GetStruct("notOK", &out) + assert.Error(t, e2) + assert.Equal(t, "one", out.String) + assert.Equal(t, 4.2, out.Float) + assert.Equal(t, ErrParamNotFound("notOK"), e2) + + in["struct"] = "string" + e3 := in.GetStruct("struct", &out) + assert.Error(t, e3) + assert.Equal(t, "one", out.String) + assert.Equal(t, 4.2, out.Float) + assert.Equal(t, true, IsErrParamInvalid(e3), e3.Error()) +} diff --git a/fs/rc/rc.go b/fs/rc/rc.go index 26c34c6f2..0cb68aa2a 100644 --- a/fs/rc/rc.go +++ b/fs/rc/rc.go @@ -116,8 +116,6 @@ func (s *server) handler(w http.ResponseWriter, r *http.Request) { } } - fs.Debugf(nil, "form = %+v", r.Form) - w.Header().Add("Access-Control-Allow-Origin", "*") //echo back headers client needs reqAccessHeaders := r.Header.Get("Access-Control-Request-Headers") @@ -137,6 +135,11 @@ func (s *server) handler(w http.ResponseWriter, r *http.Request) { func (s *server) handlePost(w http.ResponseWriter, r *http.Request, path string, in Params) { writeError := func(err error, status int) { fs.Errorf(nil, "rc: %q: error: %v", path, err) + // Adjust the error return for some well known errors + switch errors.Cause(err) { + case fs.ErrorDirNotFound, fs.ErrorObjectNotFound: + status = http.StatusNotFound + } w.WriteHeader(status) err = WriteJSON(w, Params{ "error": err.Error(), @@ -155,13 +158,28 @@ func (s *server) handlePost(w http.ResponseWriter, r *http.Request, path string, return } - fs.Debugf(nil, "rc: %q: with parameters %+v", path, in) - out, err := call.Fn(in) + // Check to see if it is async or not + isAsync, err := in.GetBool("_async") if err != nil { - writeError(errors.Wrap(err, "remote control command failed"), http.StatusInternalServerError) + writeError(err, http.StatusBadRequest) return } + fs.Debugf(nil, "rc: %q: with parameters %+v", path, in) + var out Params + if isAsync { + out, err = StartJob(call.Fn, in) + } else { + out, err = call.Fn(in) + } + if err != nil { + writeError(err, http.StatusInternalServerError) + return + } + if out == nil { + out = make(Params) + } + fs.Debugf(nil, "rc: %q: reply %+v: %v", path, out, err) err = WriteJSON(w, out) if err != nil { diff --git a/fs/rc/rc_test.go b/fs/rc/rc_test.go new file mode 100644 index 000000000..c6c2ad30a --- /dev/null +++ b/fs/rc/rc_test.go @@ -0,0 +1,23 @@ +package rc + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestWriteJSON(t *testing.T) { + var buf bytes.Buffer + err := WriteJSON(&buf, Params{ + "String": "hello", + "Int": 42, + }) + require.NoError(t, err) + assert.Equal(t, `{ + "Int": 42, + "String": "hello" +} +`, buf.String()) +} diff --git a/fs/rc/registry.go b/fs/rc/registry.go index 54a4dc953..30e5bbd44 100644 --- a/fs/rc/registry.go +++ b/fs/rc/registry.go @@ -10,9 +10,6 @@ import ( "github.com/ncw/rclone/fs" ) -// Params is the input and output type for the Func -type Params map[string]interface{} - // Func defines a type for a remote control function type Func func(in Params) (out Params, err error)