From 1fb6ad700fff570ce221d61c4388046cb3550d2b Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Thu, 5 Nov 2020 16:59:59 +0000 Subject: [PATCH] accounting: add context.Context #3257 #4685 --- backend/alias/alias_internal_test.go | 2 +- backend/b2/b2.go | 4 +- backend/http/http_internal_test.go | 2 +- cmd/serve/dlna/dlna_test.go | 2 +- cmd/serve/ftp/ftp.go | 4 +- cmd/serve/http/http.go | 2 +- cmd/serve/http/http_test.go | 2 +- cmd/serve/httplib/serve/dir.go | 2 +- cmd/serve/httplib/serve/serve.go | 2 +- fs/accounting/accounting_test.go | 56 +++++++++++++++---------- fs/accounting/inprogress.go | 3 +- fs/accounting/prometheus.go | 8 +++- fs/accounting/stats.go | 11 +++-- fs/accounting/stats_groups.go | 28 ++++++------- fs/accounting/stats_groups_test.go | 32 +++++++------- fs/accounting/stats_test.go | 16 ++++--- fs/accounting/token_bucket.go | 4 +- fs/accounting/transfer.go | 4 +- fs/accounting/transfermap.go | 3 +- fs/config/config.go | 9 ++-- fs/config/config_test.go | 4 +- fs/operations/check.go | 6 +-- fs/operations/multithread_test.go | 2 +- fs/operations/operations.go | 24 +++++------ fs/rc/rcserver/rcserver.go | 2 +- fs/sync/sync.go | 4 +- fstest/fstest.go | 3 +- vfs/read.go | 4 +- vfs/vfscache/downloaders/downloaders.go | 2 +- 29 files changed, 138 insertions(+), 109 deletions(-) diff --git a/backend/alias/alias_internal_test.go b/backend/alias/alias_internal_test.go index 46542c5f4..830dbcb1a 100644 --- a/backend/alias/alias_internal_test.go +++ b/backend/alias/alias_internal_test.go @@ -19,7 +19,7 @@ var ( ) func prepare(t *testing.T, root string) { - config.LoadConfig() + config.LoadConfig(context.Background()) // Configure the remote config.FileSet(remoteName, "type", "alias") diff --git a/backend/b2/b2.go b/backend/b2/b2.go index 7f2f83752..e3dc38d44 100644 --- a/backend/b2/b2.go +++ b/backend/b2/b2.go @@ -1182,7 +1182,7 @@ func (f *Fs) purge(ctx context.Context, dir string, oldOnly bool) error { tr := accounting.Stats(ctx).NewCheckingTransfer(oi) err = f.deleteByID(ctx, object.ID, object.Name) checkErr(err) - tr.Done(err) + tr.Done(ctx, err) } }() } @@ -1210,7 +1210,7 @@ func (f *Fs) purge(ctx context.Context, dir string, oldOnly bool) error { toBeDeleted <- object } last = remote - tr.Done(nil) + tr.Done(ctx, nil) } return nil })) diff --git a/backend/http/http_internal_test.go b/backend/http/http_internal_test.go index 92017a389..55d1bbc70 100644 --- a/backend/http/http_internal_test.go +++ b/backend/http/http_internal_test.go @@ -47,7 +47,7 @@ func prepareServer(t *testing.T) (configmap.Simple, func()) { ts := httptest.NewServer(handler) // Configure the remote - config.LoadConfig() + config.LoadConfig(context.Background()) // fs.Config.LogLevel = fs.LogLevelDebug // fs.Config.DumpHeaders = true // fs.Config.DumpBodies = true diff --git a/cmd/serve/dlna/dlna_test.go b/cmd/serve/dlna/dlna_test.go index 193a76684..883255e19 100644 --- a/cmd/serve/dlna/dlna_test.go +++ b/cmd/serve/dlna/dlna_test.go @@ -41,7 +41,7 @@ func startServer(t *testing.T, f fs.Fs) { } func TestInit(t *testing.T) { - config.LoadConfig() + config.LoadConfig(context.Background()) f, err := fs.NewFs(context.Background(), "testdata/files") l, _ := f.List(context.Background(), "") diff --git a/cmd/serve/ftp/ftp.go b/cmd/serve/ftp/ftp.go index 195b0ef76..86f72ec5e 100644 --- a/cmd/serve/ftp/ftp.go +++ b/cmd/serve/ftp/ftp.go @@ -294,7 +294,7 @@ func (d *Driver) ListDir(path string, callback func(ftp.FileInfo) error) (err er // Account the transfer tr := accounting.GlobalStats().NewTransferRemoteSize(path, node.Size()) defer func() { - tr.Done(err) + tr.Done(d.s.ctx, err) }() for _, file := range dirEntries { @@ -392,7 +392,7 @@ func (d *Driver) GetFile(path string, offset int64) (size int64, fr io.ReadClose // Account the transfer tr := accounting.GlobalStats().NewTransferRemoteSize(path, node.Size()) - defer tr.Done(nil) + defer tr.Done(d.s.ctx, nil) return node.Size(), handle, nil } diff --git a/cmd/serve/http/http.go b/cmd/serve/http/http.go index 0c430a7ad..84c0a3205 100644 --- a/cmd/serve/http/http.go +++ b/cmd/serve/http/http.go @@ -206,7 +206,7 @@ func (s *server) serveFile(w http.ResponseWriter, r *http.Request, remote string // Account the transfer tr := accounting.Stats(r.Context()).NewTransfer(obj) - defer tr.Done(nil) + defer tr.Done(r.Context(), nil) // FIXME in = fs.NewAccount(in, obj).WithBuffer() // account the transfer // Serve the file diff --git a/cmd/serve/http/http_test.go b/cmd/serve/http/http_test.go index b4d234265..4ace4ec2c 100644 --- a/cmd/serve/http/http_test.go +++ b/cmd/serve/http/http_test.go @@ -60,7 +60,7 @@ var ( func TestInit(t *testing.T) { // Configure the remote - config.LoadConfig() + config.LoadConfig(context.Background()) // fs.Config.LogLevel = fs.LogLevelDebug // fs.Config.DumpHeaders = true // fs.Config.DumpBodies = true diff --git a/cmd/serve/httplib/serve/dir.go b/cmd/serve/httplib/serve/dir.go index 04cb6572e..00d4579c5 100644 --- a/cmd/serve/httplib/serve/dir.go +++ b/cmd/serve/httplib/serve/dir.go @@ -225,7 +225,7 @@ const ( func (d *Directory) Serve(w http.ResponseWriter, r *http.Request) { // Account the transfer tr := accounting.Stats(r.Context()).NewTransferRemoteSize(d.DirRemote, -1) - defer tr.Done(nil) + defer tr.Done(r.Context(), nil) fs.Infof(d.DirRemote, "%s: Serving directory", r.RemoteAddr) diff --git a/cmd/serve/httplib/serve/serve.go b/cmd/serve/httplib/serve/serve.go index fd0cdb5b8..c4b70f1fd 100644 --- a/cmd/serve/httplib/serve/serve.go +++ b/cmd/serve/httplib/serve/serve.go @@ -77,7 +77,7 @@ func Object(w http.ResponseWriter, r *http.Request, o fs.Object) { } tr := accounting.Stats(r.Context()).NewTransfer(o) defer func() { - tr.Done(err) + tr.Done(r.Context(), err) }() in := tr.Account(r.Context(), file) // account the transfer (no buffering) diff --git a/fs/accounting/accounting_test.go b/fs/accounting/accounting_test.go index 67b7e1d49..f826d7cd0 100644 --- a/fs/accounting/accounting_test.go +++ b/fs/accounting/accounting_test.go @@ -28,8 +28,9 @@ var ( ) func TestNewAccountSizeName(t *testing.T) { + ctx := context.Background() in := ioutil.NopCloser(bytes.NewBuffer([]byte{1})) - stats := NewStats() + stats := NewStats(ctx) acc := newAccountSizeName(context.Background(), stats, in, 1, "test") assert.Equal(t, in, acc.in) assert.Equal(t, acc, stats.inProgress.get("test")) @@ -42,10 +43,11 @@ func TestNewAccountSizeName(t *testing.T) { } func TestAccountWithBuffer(t *testing.T) { + ctx := context.Background() in := ioutil.NopCloser(bytes.NewBuffer([]byte{1})) - stats := NewStats() - acc := newAccountSizeName(context.Background(), stats, in, -1, "test") + stats := NewStats(ctx) + acc := newAccountSizeName(ctx, stats, in, -1, "test") assert.False(t, acc.HasBuffer()) acc.WithBuffer() assert.True(t, acc.HasBuffer()) @@ -54,7 +56,7 @@ func TestAccountWithBuffer(t *testing.T) { require.True(t, ok) assert.NoError(t, acc.Close()) - acc = newAccountSizeName(context.Background(), stats, in, 1, "test") + acc = newAccountSizeName(ctx, stats, in, 1, "test") acc.WithBuffer() // should not have a buffer for a small size _, ok = acc.in.(*asyncreader.AsyncReader) @@ -63,11 +65,12 @@ func TestAccountWithBuffer(t *testing.T) { } func TestAccountGetUpdateReader(t *testing.T) { + ctx := context.Background() test := func(doClose bool) func(t *testing.T) { return func(t *testing.T) { in := ioutil.NopCloser(bytes.NewBuffer([]byte{1})) - stats := NewStats() - acc := newAccountSizeName(context.Background(), stats, in, 1, "test") + stats := NewStats(ctx) + acc := newAccountSizeName(ctx, stats, in, 1, "test") assert.Equal(t, in, acc.GetReader()) assert.Equal(t, acc, stats.inProgress.get("test")) @@ -78,7 +81,7 @@ func TestAccountGetUpdateReader(t *testing.T) { } in2 := ioutil.NopCloser(bytes.NewBuffer([]byte{1})) - acc.UpdateReader(context.Background(), in2) + acc.UpdateReader(ctx, in2) assert.Equal(t, in2, acc.GetReader()) assert.Equal(t, acc, stats.inProgress.get("test")) @@ -91,9 +94,10 @@ func TestAccountGetUpdateReader(t *testing.T) { } func TestAccountRead(t *testing.T) { + ctx := context.Background() in := ioutil.NopCloser(bytes.NewBuffer([]byte{1, 2, 3})) - stats := NewStats() - acc := newAccountSizeName(context.Background(), stats, in, 1, "test") + stats := NewStats(ctx) + acc := newAccountSizeName(ctx, stats, in, 1, "test") assert.True(t, acc.values.start.IsZero()) acc.values.mu.Lock() @@ -128,13 +132,14 @@ func TestAccountRead(t *testing.T) { } func testAccountWriteTo(t *testing.T, withBuffer bool) { + ctx := context.Background() buf := make([]byte, 2*asyncreader.BufferSize+1) for i := range buf { buf[i] = byte(i % 251) } in := ioutil.NopCloser(bytes.NewBuffer(buf)) - stats := NewStats() - acc := newAccountSizeName(context.Background(), stats, in, int64(len(buf)), "test") + stats := NewStats(ctx) + acc := newAccountSizeName(ctx, stats, in, int64(len(buf)), "test") if withBuffer { acc = acc.WithBuffer() } @@ -172,9 +177,10 @@ func TestAccountWriteToWithBuffer(t *testing.T) { } func TestAccountString(t *testing.T) { + ctx := context.Background() in := ioutil.NopCloser(bytes.NewBuffer([]byte{1, 2, 3})) - stats := NewStats() - acc := newAccountSizeName(context.Background(), stats, in, 3, "test") + stats := NewStats(ctx) + acc := newAccountSizeName(ctx, stats, in, 3, "test") // FIXME not an exhaustive test! @@ -192,9 +198,10 @@ func TestAccountString(t *testing.T) { // Test the Accounter interface methods on Account and accountStream func TestAccountAccounter(t *testing.T) { + ctx := context.Background() in := ioutil.NopCloser(bytes.NewBuffer([]byte{1, 2, 3})) - stats := NewStats() - acc := newAccountSizeName(context.Background(), stats, in, 3, "test") + stats := NewStats(ctx) + acc := newAccountSizeName(ctx, stats, in, 3, "test") assert.True(t, in == acc.OldStream()) @@ -250,6 +257,7 @@ func TestAccountAccounter(t *testing.T) { } func TestAccountMaxTransfer(t *testing.T) { + ctx := context.Background() old := fs.Config.MaxTransfer oldMode := fs.Config.CutoffMode @@ -260,8 +268,8 @@ func TestAccountMaxTransfer(t *testing.T) { }() in := ioutil.NopCloser(bytes.NewBuffer(make([]byte, 100))) - stats := NewStats() - acc := newAccountSizeName(context.Background(), stats, in, 1, "test") + stats := NewStats(ctx) + acc := newAccountSizeName(ctx, stats, in, 1, "test") var b = make([]byte, 10) @@ -277,8 +285,8 @@ func TestAccountMaxTransfer(t *testing.T) { assert.True(t, fserrors.IsFatalError(err)) fs.Config.CutoffMode = fs.CutoffModeSoft - stats = NewStats() - acc = newAccountSizeName(context.Background(), stats, in, 1, "test") + stats = NewStats(ctx) + acc = newAccountSizeName(ctx, stats, in, 1, "test") n, err = acc.Read(b) assert.Equal(t, 10, n) @@ -292,6 +300,7 @@ func TestAccountMaxTransfer(t *testing.T) { } func TestAccountMaxTransferWriteTo(t *testing.T) { + ctx := context.Background() old := fs.Config.MaxTransfer oldMode := fs.Config.CutoffMode @@ -302,8 +311,8 @@ func TestAccountMaxTransferWriteTo(t *testing.T) { }() in := ioutil.NopCloser(readers.NewPatternReader(1024)) - stats := NewStats() - acc := newAccountSizeName(context.Background(), stats, in, 1, "test") + stats := NewStats(ctx) + acc := newAccountSizeName(ctx, stats, in, 1, "test") var b bytes.Buffer @@ -313,9 +322,10 @@ func TestAccountMaxTransferWriteTo(t *testing.T) { } func TestAccountReadCtx(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) in := ioutil.NopCloser(bytes.NewBuffer(make([]byte, 100))) - stats := NewStats() + stats := NewStats(ctx) acc := newAccountSizeName(ctx, stats, in, 1, "test") var b = make([]byte, 10) diff --git a/fs/accounting/inprogress.go b/fs/accounting/inprogress.go index ffc362c83..edd21a616 100644 --- a/fs/accounting/inprogress.go +++ b/fs/accounting/inprogress.go @@ -1,6 +1,7 @@ package accounting import ( + "context" "sync" "github.com/rclone/rclone/fs" @@ -13,7 +14,7 @@ type inProgress struct { } // newInProgress makes a new inProgress object -func newInProgress() *inProgress { +func newInProgress(ctx context.Context) *inProgress { return &inProgress{ m: make(map[string]*Account, fs.Config.Transfers), } diff --git a/fs/accounting/prometheus.go b/fs/accounting/prometheus.go index c9d165c95..a4c2a625e 100644 --- a/fs/accounting/prometheus.go +++ b/fs/accounting/prometheus.go @@ -1,6 +1,8 @@ package accounting import ( + "context" + "github.com/prometheus/client_golang/prometheus" ) @@ -8,6 +10,7 @@ var namespace = "rclone_" // RcloneCollector is a Prometheus collector for Rclone type RcloneCollector struct { + ctx context.Context bytesTransferred *prometheus.Desc transferSpeed *prometheus.Desc numOfErrors *prometheus.Desc @@ -21,8 +24,9 @@ type RcloneCollector struct { } // NewRcloneCollector make a new RcloneCollector -func NewRcloneCollector() *RcloneCollector { +func NewRcloneCollector(ctx context.Context) *RcloneCollector { return &RcloneCollector{ + ctx: ctx, bytesTransferred: prometheus.NewDesc(namespace+"bytes_transferred_total", "Total transferred bytes since the start of the Rclone process", nil, nil, @@ -82,7 +86,7 @@ func (c *RcloneCollector) Describe(ch chan<- *prometheus.Desc) { // Collect is part of the Collector interface: https://godoc.org/github.com/prometheus/client_golang/prometheus#Collector func (c *RcloneCollector) Collect(ch chan<- prometheus.Metric) { - s := groups.sum() + s := groups.sum(c.ctx) s.mu.RLock() ch <- prometheus.MustNewConstMetric(c.bytesTransferred, prometheus.CounterValue, float64(s.bytes)) diff --git a/fs/accounting/stats.go b/fs/accounting/stats.go index 1f4992897..c9502d2aa 100644 --- a/fs/accounting/stats.go +++ b/fs/accounting/stats.go @@ -2,6 +2,7 @@ package accounting import ( "bytes" + "context" "fmt" "sort" "strings" @@ -22,6 +23,7 @@ var startTime = time.Now() // StatsInfo accounts all transfers type StatsInfo struct { mu sync.RWMutex + ctx context.Context bytes int64 errors int64 lastError error @@ -49,11 +51,12 @@ type StatsInfo struct { } // NewStats creates an initialised StatsInfo -func NewStats() *StatsInfo { +func NewStats(ctx context.Context) *StatsInfo { return &StatsInfo{ + ctx: ctx, checking: newTransferMap(fs.Config.Checkers, "checking"), transferring: newTransferMap(fs.Config.Transfers, "transferring"), - inProgress: newInProgress(), + inProgress: newInProgress(ctx), } } @@ -332,10 +335,10 @@ func (s *StatsInfo) String() string { // Add per transfer stats if required if !fs.Config.StatsOneLine { if !s.checking.empty() { - _, _ = fmt.Fprintf(buf, "Checking:\n%s\n", s.checking.String(s.inProgress, s.transferring)) + _, _ = fmt.Fprintf(buf, "Checking:\n%s\n", s.checking.String(s.ctx, s.inProgress, s.transferring)) } if !s.transferring.empty() { - _, _ = fmt.Fprintf(buf, "Transferring:\n%s\n", s.transferring.String(s.inProgress, nil)) + _, _ = fmt.Fprintf(buf, "Transferring:\n%s\n", s.transferring.String(s.ctx, s.inProgress, nil)) } } diff --git a/fs/accounting/stats_groups.go b/fs/accounting/stats_groups.go index c1b2379cc..4767eb5e3 100644 --- a/fs/accounting/stats_groups.go +++ b/fs/accounting/stats_groups.go @@ -59,10 +59,10 @@ func rcRemoteStats(ctx context.Context, in rc.Params) (rc.Params, error) { return rc.Params{}, err } if group != "" { - return StatsGroup(group).RemoteStats() + return StatsGroup(ctx, group).RemoteStats() } - return groups.sum().RemoteStats() + return groups.sum(ctx).RemoteStats() } func init() { @@ -129,9 +129,9 @@ func rcTransferredStats(ctx context.Context, in rc.Params) (rc.Params, error) { out := make(rc.Params) if group != "" { - out["transferred"] = StatsGroup(group).Transferred() + out["transferred"] = StatsGroup(ctx, group).Transferred() } else { - out["transferred"] = groups.sum().Transferred() + out["transferred"] = groups.sum(ctx).Transferred() } return out, nil @@ -265,28 +265,28 @@ func Stats(ctx context.Context) *StatsInfo { if !ok { return GlobalStats() } - return StatsGroup(group) + return StatsGroup(ctx, group) } // StatsGroup gets stats by group name. -func StatsGroup(group string) *StatsInfo { +func StatsGroup(ctx context.Context, group string) *StatsInfo { stats := groups.get(group) if stats == nil { - return NewStatsGroup(group) + return NewStatsGroup(ctx, group) } return stats } // GlobalStats returns special stats used for global accounting. func GlobalStats() *StatsInfo { - return StatsGroup(globalStats) + return StatsGroup(context.Background(), globalStats) } // NewStatsGroup creates new stats under named group. -func NewStatsGroup(group string) *StatsInfo { - stats := NewStats() +func NewStatsGroup(ctx context.Context, group string) *StatsInfo { + stats := NewStats(ctx) stats.group = group - groups.set(group, stats) + groups.set(ctx, group, stats) return stats } @@ -305,7 +305,7 @@ func newStatsGroups() *statsGroups { } // set marks the stats as belonging to a group -func (sg *statsGroups) set(group string, stats *StatsInfo) { +func (sg *statsGroups) set(ctx context.Context, group string, stats *StatsInfo) { sg.mu.Lock() defer sg.mu.Unlock() @@ -343,11 +343,11 @@ func (sg *statsGroups) names() []string { } // sum returns aggregate stats that contains summation of all groups. -func (sg *statsGroups) sum() *StatsInfo { +func (sg *statsGroups) sum(ctx context.Context) *StatsInfo { sg.mu.Lock() defer sg.mu.Unlock() - sum := NewStats() + sum := NewStats(ctx) for _, stats := range sg.m { stats.mu.RLock() { diff --git a/fs/accounting/stats_groups_test.go b/fs/accounting/stats_groups_test.go index 224dff9ef..8d3673a56 100644 --- a/fs/accounting/stats_groups_test.go +++ b/fs/accounting/stats_groups_test.go @@ -1,6 +1,7 @@ package accounting import ( + "context" "fmt" "runtime" "testing" @@ -11,6 +12,7 @@ import ( ) func TestStatsGroupOperations(t *testing.T) { + ctx := context.Background() t.Run("empty group returns nil", func(t *testing.T) { t.Parallel() @@ -20,10 +22,10 @@ func TestStatsGroupOperations(t *testing.T) { t.Run("set assigns stats to group", func(t *testing.T) { t.Parallel() - stats := NewStats() + stats := NewStats(ctx) sg := newStatsGroups() - sg.set("test", stats) - sg.set("test1", stats) + sg.set(ctx, "test", stats) + sg.set(ctx, "test1", stats) if len(sg.m) != len(sg.names()) || len(sg.m) != 2 { t.Fatalf("Expected two stats got %d, %d", len(sg.m), len(sg.order)) } @@ -31,10 +33,10 @@ func TestStatsGroupOperations(t *testing.T) { t.Run("get returns correct group", func(t *testing.T) { t.Parallel() - stats := NewStats() + stats := NewStats(ctx) sg := newStatsGroups() - sg.set("test", stats) - sg.set("test1", stats) + sg.set(ctx, "test", stats) + sg.set(ctx, "test1", stats) got := sg.get("test") if got != stats { t.Fatal("get returns incorrect stats") @@ -43,20 +45,20 @@ func TestStatsGroupOperations(t *testing.T) { t.Run("sum returns correct values", func(t *testing.T) { t.Parallel() - stats1 := NewStats() + stats1 := NewStats(ctx) stats1.bytes = 5 stats1.errors = 6 stats1.oldDuration = time.Second stats1.oldTimeRanges = []timeRange{{time.Now(), time.Now().Add(time.Second)}} - stats2 := NewStats() + stats2 := NewStats(ctx) stats2.bytes = 10 stats2.errors = 12 stats2.oldDuration = 2 * time.Second stats2.oldTimeRanges = []timeRange{{time.Now(), time.Now().Add(2 * time.Second)}} sg := newStatsGroups() - sg.set("test1", stats1) - sg.set("test2", stats2) - sum := sg.sum() + sg.set(ctx, "test1", stats1) + sg.set(ctx, "test2", stats2) + sum := sg.sum(ctx) assert.Equal(t, stats1.bytes+stats2.bytes, sum.bytes) assert.Equal(t, stats1.errors+stats2.errors, sum.errors) assert.Equal(t, stats1.oldDuration+stats2.oldDuration, sum.oldDuration) @@ -70,10 +72,10 @@ func TestStatsGroupOperations(t *testing.T) { t.Run("delete removes stats", func(t *testing.T) { t.Parallel() - stats := NewStats() + stats := NewStats(ctx) sg := newStatsGroups() - sg.set("test", stats) - sg.set("test1", stats) + sg.set(ctx, "test", stats) + sg.set(ctx, "test1", stats) sg.delete("test1") if sg.get("test1") != nil { t.Fatal("stats not deleted") @@ -95,7 +97,7 @@ func TestStatsGroupOperations(t *testing.T) { runtime.ReadMemStats(&start) for i := 0; i < count; i++ { - sg.set(fmt.Sprintf("test-%d", i), NewStats()) + sg.set(ctx, fmt.Sprintf("test-%d", i), NewStats(ctx)) } for i := 0; i < count; i++ { diff --git a/fs/accounting/stats_test.go b/fs/accounting/stats_test.go index 2e462cb74..1b1814b2b 100644 --- a/fs/accounting/stats_test.go +++ b/fs/accounting/stats_test.go @@ -1,6 +1,7 @@ package accounting import ( + "context" "fmt" "io" "testing" @@ -67,7 +68,8 @@ func TestPercentage(t *testing.T) { } func TestStatsError(t *testing.T) { - s := NewStats() + ctx := context.Background() + s := NewStats(ctx) assert.Equal(t, int64(0), s.GetErrors()) assert.False(t, s.HadFatalError()) assert.False(t, s.HadRetryError()) @@ -132,6 +134,7 @@ func TestStatsError(t *testing.T) { } func TestStatsTotalDuration(t *testing.T) { + ctx := context.Background() startTime := time.Now() time1 := startTime.Add(-40 * time.Second) time2 := time1.Add(10 * time.Second) @@ -139,7 +142,7 @@ func TestStatsTotalDuration(t *testing.T) { time4 := time3.Add(10 * time.Second) t.Run("Single completed transfer", func(t *testing.T) { - s := NewStats() + s := NewStats(ctx) tr1 := &Transfer{ startedAt: time1, completedAt: time2, @@ -158,7 +161,7 @@ func TestStatsTotalDuration(t *testing.T) { }) t.Run("Single uncompleted transfer", func(t *testing.T) { - s := NewStats() + s := NewStats(ctx) tr1 := &Transfer{ startedAt: time1, } @@ -174,7 +177,7 @@ func TestStatsTotalDuration(t *testing.T) { }) t.Run("Overlapping without ending", func(t *testing.T) { - s := NewStats() + s := NewStats(ctx) tr1 := &Transfer{ startedAt: time2, completedAt: time3, @@ -218,7 +221,7 @@ func TestStatsTotalDuration(t *testing.T) { }) t.Run("Mixed completed and uncompleted transfers", func(t *testing.T) { - s := NewStats() + s := NewStats(ctx) s.AddTransfer(&Transfer{ startedAt: time1, completedAt: time2, @@ -382,6 +385,7 @@ func TestTimeRangeDuration(t *testing.T) { } func TestPruneTransfers(t *testing.T) { + ctx := context.Background() for _, test := range []struct { Name string Transfers int @@ -406,7 +410,7 @@ func TestPruneTransfers(t *testing.T) { MaxCompletedTransfers = test.Limit defer func() { MaxCompletedTransfers = prevLimit }() - s := NewStats() + s := NewStats(ctx) for i := int64(1); i <= int64(test.Transfers); i++ { s.AddTransfer(&Transfer{ startedAt: time.Unix(i, 0), diff --git a/fs/accounting/token_bucket.go b/fs/accounting/token_bucket.go index e920b2f19..b4a72f1d6 100644 --- a/fs/accounting/token_bucket.go +++ b/fs/accounting/token_bucket.go @@ -35,7 +35,7 @@ func newTokenBucket(bandwidth fs.SizeSuffix) *rate.Limiter { } // StartTokenBucket starts the token bucket if necessary -func StartTokenBucket() { +func StartTokenBucket(ctx context.Context) { currLimitMu.Lock() currLimit := fs.Config.BwLimit.LimitAt(time.Now()) currLimitMu.Unlock() @@ -51,7 +51,7 @@ func StartTokenBucket() { } // StartTokenTicker creates a ticker to update the bandwidth limiter every minute. -func StartTokenTicker() { +func StartTokenTicker(ctx context.Context) { // If the timetable has a single entry or was not specified, we don't need // a ticker to update the bandwidth. if len(fs.Config.BwLimit) <= 1 { diff --git a/fs/accounting/transfer.go b/fs/accounting/transfer.go index e51d97c2d..8172bb727 100644 --- a/fs/accounting/transfer.go +++ b/fs/accounting/transfer.go @@ -86,7 +86,7 @@ func newTransferRemoteSize(stats *StatsInfo, remote string, size int64, checking // Done ends the transfer. // Must be called after transfer is finished to run proper cleanups. -func (tr *Transfer) Done(err error) { +func (tr *Transfer) Done(ctx context.Context, err error) { if err != nil { err = tr.stats.Error(err) @@ -123,7 +123,7 @@ func (tr *Transfer) Done(err error) { } // Reset allows to switch the Account to another transfer method. -func (tr *Transfer) Reset() { +func (tr *Transfer) Reset(ctx context.Context) { tr.mu.RLock() acc := tr.acc tr.acc = nil diff --git a/fs/accounting/transfermap.go b/fs/accounting/transfermap.go index ef3801b15..22a5f8c1c 100644 --- a/fs/accounting/transfermap.go +++ b/fs/accounting/transfermap.go @@ -1,6 +1,7 @@ package accounting import ( + "context" "fmt" "sort" "strings" @@ -88,7 +89,7 @@ func (tm *transferMap) _sortedSlice() []*Transfer { // String returns string representation of map items excluding any in // exclude (if set). -func (tm *transferMap) String(progress *inProgress, exclude *transferMap) string { +func (tm *transferMap) String(ctx context.Context, progress *inProgress, exclude *transferMap) string { tm.mu.RLock() defer tm.mu.RUnlock() stringList := make([]string, 0, len(tm.items)) diff --git a/fs/config/config.go b/fs/config/config.go index f7066ad83..fb3d90189 100644 --- a/fs/config/config.go +++ b/fs/config/config.go @@ -4,6 +4,7 @@ package config import ( "bufio" "bytes" + "context" "crypto/rand" "crypto/sha256" "encoding/base64" @@ -113,7 +114,7 @@ func init() { func getConfigData() *goconfig.ConfigFile { if configFile == nil { - LoadConfig() + LoadConfig(context.Background()) } return configFile } @@ -212,7 +213,7 @@ func makeConfigPath() string { } // LoadConfig loads the config file -func LoadConfig() { +func LoadConfig(ctx context.Context) { // Set RCLONE_CONFIG_DIR for backend config and subprocesses _ = os.Setenv("RCLONE_CONFIG_DIR", filepath.Dir(ConfigPath)) @@ -229,10 +230,10 @@ func LoadConfig() { } // Start the token bucket limiter - accounting.StartTokenBucket() + accounting.StartTokenBucket(ctx) // Start the bandwidth update ticker - accounting.StartTokenTicker() + accounting.StartTokenTicker(ctx) // Start the transactions per second limiter fshttp.StartHTTPTokenBucket() diff --git a/fs/config/config_test.go b/fs/config/config_test.go index 7bea45f5e..eb05e9242 100644 --- a/fs/config/config_test.go +++ b/fs/config/config_test.go @@ -2,6 +2,7 @@ package config import ( "bytes" + "context" "fmt" "io/ioutil" "os" @@ -15,6 +16,7 @@ import ( ) func testConfigFile(t *testing.T, configFileName string) func() { + ctx := context.Background() configKey = nil // reset password _ = os.Unsetenv("_RCLONE_CONFIG_KEY_FILE") _ = os.Unsetenv("RCLONE_CONFIG_PASS") @@ -36,7 +38,7 @@ func testConfigFile(t *testing.T, configFileName string) func() { fs.Config = &fs.ConfigInfo{} configFile = nil - LoadConfig() + LoadConfig(ctx) assert.Equal(t, []string{}, getConfigData().GetSectionList()) // Fake a remote diff --git a/fs/operations/check.go b/fs/operations/check.go index a1edc0f3d..619b73249 100644 --- a/fs/operations/check.go +++ b/fs/operations/check.go @@ -116,7 +116,7 @@ func (c *checkMarch) SrcOnly(src fs.DirEntry) (recurse bool) { func (c *checkMarch) checkIdentical(ctx context.Context, dst, src fs.Object) (differ bool, noHash bool, err error) { tr := accounting.Stats(ctx).NewCheckingTransfer(src) defer func() { - tr.Done(err) + tr.Done(ctx, err) }() if sizeDiffers(src, dst) { err = errors.Errorf("Sizes differ") @@ -323,7 +323,7 @@ func checkIdenticalDownload(ctx context.Context, dst, src fs.Object) (differ boo } tr1 := accounting.Stats(ctx).NewTransfer(dst) defer func() { - tr1.Done(nil) // error handling is done by the caller + tr1.Done(ctx, nil) // error handling is done by the caller }() in1 = tr1.Account(ctx, in1).WithBuffer() // account and buffer the transfer @@ -333,7 +333,7 @@ func checkIdenticalDownload(ctx context.Context, dst, src fs.Object) (differ boo } tr2 := accounting.Stats(ctx).NewTransfer(dst) defer func() { - tr2.Done(nil) // error handling is done by the caller + tr2.Done(ctx, nil) // error handling is done by the caller }() in2 = tr2.Account(ctx, in2).WithBuffer() // account and buffer the transfer diff --git a/fs/operations/multithread_test.go b/fs/operations/multithread_test.go index 988c3bce0..849af6bd7 100644 --- a/fs/operations/multithread_test.go +++ b/fs/operations/multithread_test.go @@ -132,7 +132,7 @@ func TestMultithreadCopy(t *testing.T) { tr := accounting.GlobalStats().NewTransfer(src) defer func() { - tr.Done(err) + tr.Done(ctx, err) }() dst, err := multiThreadCopy(ctx, r.Flocal, "file1", src, 2, tr) require.NoError(t, err) diff --git a/fs/operations/operations.go b/fs/operations/operations.go index 43725343f..d967d11de 100644 --- a/fs/operations/operations.go +++ b/fs/operations/operations.go @@ -359,7 +359,7 @@ func CommonHash(fa, fb fs.Info) (hash.Type, *fs.HashesOption) { func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Object) (newDst fs.Object, err error) { tr := accounting.Stats(ctx).NewTransfer(src) defer func() { - tr.Done(err) + tr.Done(ctx, err) }() newDst = dst if SkipDestructive(ctx, src, "copy") { @@ -401,7 +401,7 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj _ = in.Close() } if err == fs.ErrorCantCopy { - tr.Reset() // skip incomplete accounting - will be overwritten by the manual copy below + tr.Reset(ctx) // skip incomplete accounting - will be overwritten by the manual copy below } } else { err = fs.ErrorCantCopy @@ -478,7 +478,7 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj // Retry if err returned a retry error if fserrors.IsRetryError(err) || fserrors.ShouldRetry(err) { fs.Debugf(src, "Received error: %v - low level retry %d/%d", err, tries, maxTries) - tr.Reset() // skip incomplete accounting - will be overwritten by retry + tr.Reset(ctx) // skip incomplete accounting - will be overwritten by retry continue } // otherwise finish @@ -550,7 +550,7 @@ func Move(ctx context.Context, fdst fs.Fs, dst fs.Object, remote string, src fs. if err == nil { accounting.Stats(ctx).Renames(1) } - tr.Done(err) + tr.Done(ctx, err) }() newDst = dst if SkipDestructive(ctx, src, "move") { @@ -627,7 +627,7 @@ func SuffixName(remote string) string { func DeleteFileWithBackupDir(ctx context.Context, dst fs.Object, backupDir fs.Fs) (err error) { tr := accounting.Stats(ctx).NewCheckingTransfer(dst) defer func() { - tr.Done(err) + tr.Done(ctx, err) }() numDeletes := accounting.Stats(ctx).Deletes(1) if fs.Config.MaxDelete != -1 && numDeletes > fs.Config.MaxDelete { @@ -817,7 +817,7 @@ func ListLong(ctx context.Context, f fs.Fs, w io.Writer) error { return ListFn(ctx, f, func(o fs.Object) { tr := accounting.Stats(ctx).NewCheckingTransfer(o) defer func() { - tr.Done(nil) + tr.Done(ctx, nil) }() modTime := o.ModTime(ctx) syncFprintf(w, "%9d %s %s\n", o.Size(), modTime.Local().Format("2006-01-02 15:04:05.000000000"), o.Remote()) @@ -850,7 +850,7 @@ func hashSum(ctx context.Context, ht hash.Type, o fs.Object) (string, error) { var err error tr := accounting.Stats(ctx).NewCheckingTransfer(o) defer func() { - tr.Done(err) + tr.Done(ctx, err) }() sum, err := o.Hash(ctx, ht) if err == hash.ErrUnsupported { @@ -1058,7 +1058,7 @@ func Cat(ctx context.Context, f fs.Fs, w io.Writer, offset, count int64) error { var err error tr := accounting.Stats(ctx).NewTransfer(o) defer func() { - tr.Done(err) + tr.Done(ctx, err) }() opt := fs.RangeOption{Start: offset, End: -1} size := o.Size() @@ -1100,7 +1100,7 @@ func Cat(ctx context.Context, f fs.Fs, w io.Writer, offset, count int64) error { func Rcat(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadCloser, modTime time.Time) (dst fs.Object, err error) { tr := accounting.Stats(ctx).NewTransferRemoteSize(dstFileName, -1) defer func() { - tr.Done(err) + tr.Done(ctx, err) }() in = tr.Account(ctx, in).WithBuffer() @@ -1447,7 +1447,7 @@ func RcatSize(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadClo // Size known use Put tr := accounting.Stats(ctx).NewTransferRemoteSize(dstFileName, size) defer func() { - tr.Done(err) + tr.Done(ctx, err) }() body := ioutil.NopCloser(in) // we let the server close the body in := tr.Account(ctx, body) // account the transfer (no buffering) @@ -1624,7 +1624,7 @@ func moveOrCopyFile(ctx context.Context, fdst fs.Fs, fsrc fs.Fs, dstFileName str } tr := accounting.Stats(ctx).NewTransfer(srcObj) defer func() { - tr.Done(err) + tr.Done(ctx, err) }() tmpObj, err := Op(ctx, fdst, nil, tmpObjName, srcObj) if err != nil { @@ -1673,7 +1673,7 @@ func moveOrCopyFile(ctx context.Context, fdst fs.Fs, fsrc fs.Fs, dstFileName str if !cp { err = DeleteFile(ctx, srcObj) } - tr.Done(err) + tr.Done(ctx, err) } return err } diff --git a/fs/rc/rcserver/rcserver.go b/fs/rc/rcserver/rcserver.go index 4b904d7fa..1c5d98d10 100644 --- a/fs/rc/rcserver/rcserver.go +++ b/fs/rc/rcserver/rcserver.go @@ -42,7 +42,7 @@ var promHandler http.Handler var onlyOnceWarningAllowOrigin sync.Once func init() { - rcloneCollector := accounting.NewRcloneCollector() + rcloneCollector := accounting.NewRcloneCollector(context.Background()) prometheus.MustRegister(rcloneCollector) promHandler = promhttp.Handler() } diff --git a/fs/sync/sync.go b/fs/sync/sync.go index 62bae0820..165527dae 100644 --- a/fs/sync/sync.go +++ b/fs/sync/sync.go @@ -344,7 +344,7 @@ func (s *syncCopyMove) pairChecker(in *pipe, out *pipe, fraction int, wg *sync.W } } } - tr.Done(err) + tr.Done(s.ctx, err) } } @@ -749,7 +749,7 @@ func (s *syncCopyMove) makeRenameMap() { s.pushRenameMap(hash, obj) } - tr.Done(nil) + tr.Done(s.ctx, nil) } } }() diff --git a/fstest/fstest.go b/fstest/fstest.go index 0f91cf85e..82a22cc53 100644 --- a/fstest/fstest.go +++ b/fstest/fstest.go @@ -58,6 +58,7 @@ func init() { // Initialise rclone for testing func Initialise() { + ctx := context.Background() // Never ask for passwords, fail instead. // If your local config is encrypted set environment variable // "RCLONE_CONFIG_PASS=hunter2" (or your password) @@ -68,7 +69,7 @@ func Initialise() { if envConfig := os.Getenv("RCLONE_CONFIG"); envConfig != "" { config.ConfigPath = envConfig } - config.LoadConfig() + config.LoadConfig(ctx) if *Verbose { fs.Config.LogLevel = fs.LogLevelDebug } diff --git a/vfs/read.go b/vfs/read.go index 45437f5b4..865b922c9 100644 --- a/vfs/read.go +++ b/vfs/read.go @@ -17,7 +17,7 @@ import ( // ReadFileHandle is an open for read file handle on a File type ReadFileHandle struct { baseHandle - done func(err error) + done func(ctx context.Context, err error) mu sync.Mutex cond *sync.Cond // cond lock for out of sequence reads closed bool // set if handle has been closed @@ -414,7 +414,7 @@ func (fh *ReadFileHandle) close() error { if fh.opened { var err error defer func() { - fh.done(err) + fh.done(context.TODO(), err) }() // Close first so that we have hashes err = fh.r.Close() diff --git a/vfs/vfscache/downloaders/downloaders.go b/vfs/vfscache/downloaders/downloaders.go index 399fcd3dd..a82f2a364 100644 --- a/vfs/vfscache/downloaders/downloaders.go +++ b/vfs/vfscache/downloaders/downloaders.go @@ -553,7 +553,7 @@ func (dl *downloader) close(inErr error) (err error) { dl.in = nil } if dl.tr != nil { - dl.tr.Done(inErr) + dl.tr.Done(dl.dls.ctx, inErr) dl.tr = nil } dl._closed = true