From 632ecc5c819742fec397623e47387dd2ad5a1b04 Mon Sep 17 00:00:00 2001 From: David Dworken Date: Sun, 4 Feb 2024 21:03:39 -0800 Subject: [PATCH] Swap to using iterators for uploading to avoid storing all chunks in memory --- client/client_test.go | 71 +++++++++++++++++++++++++------------------ client/lib/lib.go | 3 +- shared/data.go | 17 +++++++++++ shared/utils.go | 15 +++------ 4 files changed, 66 insertions(+), 40 deletions(-) diff --git a/client/client_test.go b/client/client_test.go index 1daa008..72958da 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -2829,11 +2829,24 @@ func testMultipleUsers(t *testing.T, tester shellTester) { func createSyntheticImportEntries(t testing.TB, numSyntheticEntries int) { homedir, err := os.UserHomeDir() require.NoError(t, err) - f, err := os.OpenFile(path.Join(homedir, ".bash_history"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + filenames := []string{".bash_history", ".zsh_history", ".zhistory"} + numFiles := len(filenames) + 1 // The +1 accounts for the fish history file + for _, filename := range filenames { + f, err := os.OpenFile(path.Join(homedir, filename), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + require.NoError(t, err) + defer f.Close() + for i := 1; i <= numSyntheticEntries/numFiles; i++ { + _, err := f.WriteString(fmt.Sprintf("echo command-%s-%d\n", filename, i)) + require.NoError(t, err) + } + require.NoError(t, f.Close()) + } + // Write the file for fish too, in the special fish format + f, err := os.OpenFile(path.Join(homedir, ".local/share/fish/fish_history"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) require.NoError(t, err) defer f.Close() - for i := 1; i <= numSyntheticEntries; i++ { - _, err := f.WriteString(fmt.Sprintf("echo command-%d\n", i)) + for i := 1; i <= numSyntheticEntries/numFiles; i++ { + _, err := f.WriteString(fmt.Sprintf("- cmd: echo command-fish-%d\n", i)) require.NoError(t, err) } require.NoError(t, f.Close()) @@ -2868,32 +2881,6 @@ func TestImportHistory(t *testing.T) { testutils.CompareGoldens(t, out, "TestImportHistory-export") } -func BenchmarkImport(b *testing.B) { - b.StopTimer() - // Setup - tester := bashTester{} - defer testutils.BackupAndRestore(b)() - - // Benchmark it - for n := 0; n < b.N; n++ { - // Setup - testutils.ResetLocalState(b) - installHishtory(b, tester, "") - - // Create a large history in bash that we will pre-import - numSyntheticEntries := 100_000 - createSyntheticImportEntries(b, numSyntheticEntries) - - // Benchmarked code: - b.StartTimer() - ctx := hctx.MakeContext() - numImported, err := lib.ImportHistory(ctx, false, true) - require.NoError(b, err) - require.GreaterOrEqual(b, numImported, numSyntheticEntries) - b.StopTimer() - } -} - func TestAugmentedIsOfflineError(t *testing.T) { defer testutils.BackupAndRestore(t)() installHishtory(t, zshTester{}, "") @@ -2910,4 +2897,30 @@ func TestAugmentedIsOfflineError(t *testing.T) { require.True(t, lib.IsOfflineError(ctx, fmt.Errorf("unchecked error type"))) } +func BenchmarkImport(b *testing.B) { + b.StopTimer() + // Setup + tester := zshTester{} + defer testutils.BackupAndRestore(b)() + + // Benchmark it + for n := 0; n < b.N; n++ { + // Setup + testutils.ResetLocalState(b) + installHishtory(b, tester, "") + + // Create a large history in bash that we will pre-import + numSyntheticEntries := 1_000_000 + createSyntheticImportEntries(b, numSyntheticEntries) + + // Benchmarked code: + b.StartTimer() + ctx := hctx.MakeContext() + numImported, err := lib.ImportHistory(ctx, false, true) + require.NoError(b, err) + require.GreaterOrEqual(b, numImported, numSyntheticEntries) + b.StopTimer() + } +} + // TODO: somehow test/confirm that hishtory works even if only bash/only zsh is installed diff --git a/client/lib/lib.go b/client/lib/lib.go index 6c85e3f..1d746f0 100644 --- a/client/lib/lib.go +++ b/client/lib/lib.go @@ -246,6 +246,7 @@ func countLinesInFiles(filenames ...string) (int, error) { if err != nil { return 0, err } + hctx.GetLogger().Infof("Importing history entries, file=%#v contains %d lines", f, l) total += l } return total, nil @@ -680,7 +681,7 @@ func Reupload(ctx context.Context) error { defer bar.Finish() } chunkSize := 500 - chunks := shared.Chunks(entries, chunkSize) + chunks := shared.ChunksIter(entries, chunkSize) return shared.ForEach(chunks, 10, func(chunk []*data.HistoryEntry) error { jsonValue, err := EncryptAndMarshal(config, chunk) if err != nil { diff --git a/shared/data.go b/shared/data.go index 5cd5aef..292fa3c 100644 --- a/shared/data.go +++ b/shared/data.go @@ -122,3 +122,20 @@ func Chunks[k any](slice []k, chunkSize int) [][]k { } return chunks } + +type Seq1[K any] func(yield func(K) bool) bool + +func ChunksIter[k any](slice []k, chunkSize int) Seq1[[]k] { + return func(yield func([]k) bool) bool { + for i := 0; i < len(slice); i += chunkSize { + end := i + chunkSize + if end > len(slice) { + end = len(slice) + } + if !yield(slice[i:end]) { + return false + } + } + return true + } +} diff --git a/shared/utils.go b/shared/utils.go index 1c4b1f7..37c210c 100644 --- a/shared/utils.go +++ b/shared/utils.go @@ -2,14 +2,12 @@ package shared import "sync" -func ForEach[T any](arr []T, numThreads int, fn func(T) error) error { +func ForEach[T any](iter Seq1[T], numThreads int, fn func(T) error) error { wg := &sync.WaitGroup{} - wg.Add(len(arr)) - limiter := make(chan bool, numThreads) - var errors []error - for _, item := range arr { + iter(func(item T) bool { + wg.Add(1) limiter <- true go func(x T) { defer wg.Done() @@ -19,11 +17,8 @@ func ForEach[T any](arr []T, numThreads int, fn func(T) error) error { } <-limiter }(item) - if len(errors) > 0 { - return errors[0] - } - } - + return true + }) wg.Wait() if len(errors) > 0 { return errors[0]