Swap to using iterators for uploading to avoid storing all chunks in memory

This commit is contained in:
David Dworken 2024-02-04 21:03:39 -08:00
parent 08598f4954
commit 632ecc5c81
No known key found for this signature in database
4 changed files with 66 additions and 40 deletions

View File

@ -2829,11 +2829,24 @@ func testMultipleUsers(t *testing.T, tester shellTester) {
func createSyntheticImportEntries(t testing.TB, numSyntheticEntries int) {
homedir, err := os.UserHomeDir()
require.NoError(t, err)
f, err := os.OpenFile(path.Join(homedir, ".bash_history"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
filenames := []string{".bash_history", ".zsh_history", ".zhistory"}
numFiles := len(filenames) + 1 // The +1 accounts for the fish history file
for _, filename := range filenames {
f, err := os.OpenFile(path.Join(homedir, filename), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
require.NoError(t, err)
defer f.Close()
for i := 1; i <= numSyntheticEntries; i++ {
_, err := f.WriteString(fmt.Sprintf("echo command-%d\n", i))
for i := 1; i <= numSyntheticEntries/numFiles; i++ {
_, err := f.WriteString(fmt.Sprintf("echo command-%s-%d\n", filename, i))
require.NoError(t, err)
}
require.NoError(t, f.Close())
}
// Write the file for fish too, in the special fish format
f, err := os.OpenFile(path.Join(homedir, ".local/share/fish/fish_history"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
require.NoError(t, err)
defer f.Close()
for i := 1; i <= numSyntheticEntries/numFiles; i++ {
_, err := f.WriteString(fmt.Sprintf("- cmd: echo command-fish-%d\n", i))
require.NoError(t, err)
}
require.NoError(t, f.Close())
@ -2868,32 +2881,6 @@ func TestImportHistory(t *testing.T) {
testutils.CompareGoldens(t, out, "TestImportHistory-export")
}
func BenchmarkImport(b *testing.B) {
b.StopTimer()
// Setup
tester := bashTester{}
defer testutils.BackupAndRestore(b)()
// Benchmark it
for n := 0; n < b.N; n++ {
// Setup
testutils.ResetLocalState(b)
installHishtory(b, tester, "")
// Create a large history in bash that we will pre-import
numSyntheticEntries := 100_000
createSyntheticImportEntries(b, numSyntheticEntries)
// Benchmarked code:
b.StartTimer()
ctx := hctx.MakeContext()
numImported, err := lib.ImportHistory(ctx, false, true)
require.NoError(b, err)
require.GreaterOrEqual(b, numImported, numSyntheticEntries)
b.StopTimer()
}
}
func TestAugmentedIsOfflineError(t *testing.T) {
defer testutils.BackupAndRestore(t)()
installHishtory(t, zshTester{}, "")
@ -2910,4 +2897,30 @@ func TestAugmentedIsOfflineError(t *testing.T) {
require.True(t, lib.IsOfflineError(ctx, fmt.Errorf("unchecked error type")))
}
func BenchmarkImport(b *testing.B) {
b.StopTimer()
// Setup
tester := zshTester{}
defer testutils.BackupAndRestore(b)()
// Benchmark it
for n := 0; n < b.N; n++ {
// Setup
testutils.ResetLocalState(b)
installHishtory(b, tester, "")
// Create a large history in bash that we will pre-import
numSyntheticEntries := 1_000_000
createSyntheticImportEntries(b, numSyntheticEntries)
// Benchmarked code:
b.StartTimer()
ctx := hctx.MakeContext()
numImported, err := lib.ImportHistory(ctx, false, true)
require.NoError(b, err)
require.GreaterOrEqual(b, numImported, numSyntheticEntries)
b.StopTimer()
}
}
// TODO: somehow test/confirm that hishtory works even if only bash/only zsh is installed

View File

@ -246,6 +246,7 @@ func countLinesInFiles(filenames ...string) (int, error) {
if err != nil {
return 0, err
}
hctx.GetLogger().Infof("Importing history entries, file=%#v contains %d lines", f, l)
total += l
}
return total, nil
@ -680,7 +681,7 @@ func Reupload(ctx context.Context) error {
defer bar.Finish()
}
chunkSize := 500
chunks := shared.Chunks(entries, chunkSize)
chunks := shared.ChunksIter(entries, chunkSize)
return shared.ForEach(chunks, 10, func(chunk []*data.HistoryEntry) error {
jsonValue, err := EncryptAndMarshal(config, chunk)
if err != nil {

View File

@ -122,3 +122,20 @@ func Chunks[k any](slice []k, chunkSize int) [][]k {
}
return chunks
}
type Seq1[K any] func(yield func(K) bool) bool
func ChunksIter[k any](slice []k, chunkSize int) Seq1[[]k] {
return func(yield func([]k) bool) bool {
for i := 0; i < len(slice); i += chunkSize {
end := i + chunkSize
if end > len(slice) {
end = len(slice)
}
if !yield(slice[i:end]) {
return false
}
}
return true
}
}

View File

@ -2,14 +2,12 @@ package shared
import "sync"
func ForEach[T any](arr []T, numThreads int, fn func(T) error) error {
func ForEach[T any](iter Seq1[T], numThreads int, fn func(T) error) error {
wg := &sync.WaitGroup{}
wg.Add(len(arr))
limiter := make(chan bool, numThreads)
var errors []error
for _, item := range arr {
iter(func(item T) bool {
wg.Add(1)
limiter <- true
go func(x T) {
defer wg.Done()
@ -19,11 +17,8 @@ func ForEach[T any](arr []T, numThreads int, fn func(T) error) error {
}
<-limiter
}(item)
if len(errors) > 0 {
return errors[0]
}
}
return true
})
wg.Wait()
if len(errors) > 0 {
return errors[0]