Revert "Swap to using iterators for uploading to avoid storing all chunks in memory"

This reverts commit 632ecc5c81.
This commit is contained in:
David Dworken 2024-02-04 22:19:11 -08:00
parent ed583c36a3
commit 7c07236dc0
No known key found for this signature in database
4 changed files with 40 additions and 66 deletions

View File

@ -2829,24 +2829,11 @@ func testMultipleUsers(t *testing.T, tester shellTester) {
func createSyntheticImportEntries(t testing.TB, numSyntheticEntries int) {
homedir, err := os.UserHomeDir()
require.NoError(t, err)
filenames := []string{".bash_history", ".zsh_history", ".zhistory"}
numFiles := len(filenames) + 1 // The +1 accounts for the fish history file
for _, filename := range filenames {
f, err := os.OpenFile(path.Join(homedir, filename), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
require.NoError(t, err)
defer f.Close()
for i := 1; i <= numSyntheticEntries/numFiles; i++ {
_, err := f.WriteString(fmt.Sprintf("echo command-%s-%d\n", filename, i))
require.NoError(t, err)
}
require.NoError(t, f.Close())
}
// Write the file for fish too, in the special fish format
f, err := os.OpenFile(path.Join(homedir, ".local/share/fish/fish_history"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
f, err := os.OpenFile(path.Join(homedir, ".bash_history"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
require.NoError(t, err)
defer f.Close()
for i := 1; i <= numSyntheticEntries/numFiles; i++ {
_, err := f.WriteString(fmt.Sprintf("- cmd: echo command-fish-%d\n", i))
for i := 1; i <= numSyntheticEntries; i++ {
_, err := f.WriteString(fmt.Sprintf("echo command-%d\n", i))
require.NoError(t, err)
}
require.NoError(t, f.Close())
@ -2881,6 +2868,32 @@ func TestImportHistory(t *testing.T) {
testutils.CompareGoldens(t, out, "TestImportHistory-export")
}
// BenchmarkImport measures lib.ImportHistory against a large synthetic shell
// history. The timer is running only around the import call and the
// assertions on its result; resetting local state, installing hishtory and
// generating the synthetic entries all happen while the timer is stopped.
func BenchmarkImport(b *testing.B) {
	// Pause the timer up front; it is resumed only for the measured section.
	b.StopTimer()
	shell := bashTester{}
	restore := testutils.BackupAndRestore(b)
	defer restore()

	for i := 0; i < b.N; i++ {
		// Untimed per-iteration setup.
		testutils.ResetLocalState(b)
		installHishtory(b, shell, "")
		// Generate the synthetic history entries that the import will pick up.
		entryCount := 100_000
		createSyntheticImportEntries(b, entryCount)

		// Timed section: the import itself plus the checks on its result.
		b.StartTimer()
		imported, err := lib.ImportHistory(hctx.MakeContext(), false, true)
		require.NoError(b, err)
		// At least every synthetic entry must have been imported.
		require.GreaterOrEqual(b, imported, entryCount)
		b.StopTimer()
	}
}
func TestAugmentedIsOfflineError(t *testing.T) {
defer testutils.BackupAndRestore(t)()
installHishtory(t, zshTester{}, "")
@ -2897,30 +2910,4 @@ func TestAugmentedIsOfflineError(t *testing.T) {
require.True(t, lib.IsOfflineError(ctx, fmt.Errorf("unchecked error type")))
}
// BenchmarkImport measures lib.ImportHistory against a large synthetic shell
// history. The timer is running only around the import call and the
// assertions on its result; resetting local state, installing hishtory and
// generating the synthetic entries all happen while the timer is stopped.
func BenchmarkImport(b *testing.B) {
	// Pause the timer up front; it is resumed only for the measured section.
	b.StopTimer()
	shell := zshTester{}
	restore := testutils.BackupAndRestore(b)
	defer restore()

	for i := 0; i < b.N; i++ {
		// Untimed per-iteration setup.
		testutils.ResetLocalState(b)
		installHishtory(b, shell, "")
		// Generate the synthetic history entries that the import will pick up.
		entryCount := 1_000_000
		createSyntheticImportEntries(b, entryCount)

		// Timed section: the import itself plus the checks on its result.
		b.StartTimer()
		imported, err := lib.ImportHistory(hctx.MakeContext(), false, true)
		require.NoError(b, err)
		// At least every synthetic entry must have been imported.
		require.GreaterOrEqual(b, imported, entryCount)
		b.StopTimer()
	}
}
// TODO: somehow test/confirm that hishtory works even if only bash/only zsh is installed

View File

@ -246,7 +246,6 @@ func countLinesInFiles(filenames ...string) (int, error) {
if err != nil {
return 0, err
}
hctx.GetLogger().Infof("Importing history entries, file=%#v contains %d lines", f, l)
total += l
}
return total, nil
@ -704,7 +703,7 @@ func Reupload(ctx context.Context) error {
// in a single request (since large individual requests are extremely slow). From benchmarking,
// it is apparent that this value seems to work quite well.
uploadChunkSize := 500
chunks := shared.ChunksIter(entries, uploadChunkSize)
chunks := shared.Chunks(entries, uploadChunkSize)
err = shared.ForEach(chunks, 10, func(chunk []*data.HistoryEntry) error {
jsonValue, err := EncryptAndMarshal(config, chunk)
if err != nil {

View File

@ -122,20 +122,3 @@ func Chunks[k any](slice []k, chunkSize int) [][]k {
}
return chunks
}
// Seq1 is a push-style iterator over values of type K. Calling it feeds each
// element to yield in turn; iteration stops as soon as yield returns false.
// The iterator's own result reports whether it ran to completion (true) or
// was stopped early by yield (false).
type Seq1[K any] func(yield func(K) bool) bool

// ChunksIter returns an iterator over consecutive sub-slices of slice, each
// holding at most chunkSize elements. Only the final chunk may be shorter.
// Chunks are views into slice (no copying), produced one at a time rather
// than collected into a slice of slices.
//
// chunkSize is expected to be positive: with zero the start offset never
// advances, and with a negative value the slice expression is out of range
// for a non-empty slice.
func ChunksIter[k any](slice []k, chunkSize int) Seq1[[]k] {
	return func(yield func([]k) bool) bool {
		total := len(slice)
		start := 0
		for start < total {
			// Clamp the upper bound so the last chunk stops at the end of slice.
			stop := start + chunkSize
			if total < stop {
				stop = total
			}
			if keepGoing := yield(slice[start:stop]); !keepGoing {
				// The consumer asked to stop; report the early exit.
				return false
			}
			start += chunkSize
		}
		return true
	}
}

View File

@ -2,12 +2,14 @@ package shared
import "sync"
func ForEach[T any](iter Seq1[T], numThreads int, fn func(T) error) error {
func ForEach[T any](arr []T, numThreads int, fn func(T) error) error {
wg := &sync.WaitGroup{}
wg.Add(len(arr))
limiter := make(chan bool, numThreads)
var errors []error
iter(func(item T) bool {
wg.Add(1)
for _, item := range arr {
limiter <- true
go func(x T) {
defer wg.Done()
@ -17,8 +19,11 @@ func ForEach[T any](iter Seq1[T], numThreads int, fn func(T) error) error {
}
<-limiter
}(item)
return true
})
if len(errors) > 0 {
return errors[0]
}
}
wg.Wait()
if len(errors) > 0 {
return errors[0]