Chunk uploads for reuploading

David Dworken 2024-02-04 21:44:00 -08:00
parent 632ecc5c81
commit ed583c36a3
3 changed files with 63 additions and 29 deletions

View File

@@ -21,7 +21,6 @@ import (
     "github.com/ddworken/hishtory/shared"
     "github.com/google/uuid"
     "github.com/spf13/cobra"
-    "gorm.io/gorm"
 )

 var offlineInit *bool
@@ -41,7 +40,7 @@ var installCmd = &cobra.Command{
         if os.Getenv("HISHTORY_SKIP_INIT_IMPORT") == "" {
             db, err := hctx.OpenLocalSqliteDb()
             lib.CheckFatalError(err)
-            count, err := countStoredEntries(db)
+            count, err := lib.CountStoredEntries(db)
             lib.CheckFatalError(err)
             if count < 10 {
                 fmt.Println("Importing existing shell history...")
@@ -65,7 +64,7 @@ var initCmd = &cobra.Command{
     Run: func(cmd *cobra.Command, args []string) {
         db, err := hctx.OpenLocalSqliteDb()
         lib.CheckFatalError(err)
-        count, err := countStoredEntries(db)
+        count, err := lib.CountStoredEntries(db)
         lib.CheckFatalError(err)
         if count > 0 {
             fmt.Printf("Your current hishtory profile has saved history entries, are you sure you want to run `init` and reset?\nNote: This won't clear any imported history entries from your existing shell\n[y/N]")
@@ -128,13 +127,6 @@ var uninstallCmd = &cobra.Command{
     },
 }

-func countStoredEntries(db *gorm.DB) (int64, error) {
-    return lib.RetryingDbFunctionWithResult(func() (int64, error) {
-        var count int64
-        return count, db.Model(&data.HistoryEntry{}).Count(&count).Error
-    })
-}
-
 func warnIfUnsupportedBashVersion() error {
     _, err := exec.LookPath("bash")
     if err != nil {

View File

@@ -670,32 +670,59 @@ func Reupload(ctx context.Context) error {
     if config.IsOffline {
         return nil
     }
-    entries, err := Search(ctx, hctx.GetDb(ctx), "", 0)
+    numEntries, err := CountStoredEntries(hctx.GetDb(ctx))
     if err != nil {
-        return fmt.Errorf("failed to reupload due to failed search: %w", err)
+        return fmt.Errorf("failed to upload history entries due to error in counting entries: %v", err)
     }
     var bar *progressbar.ProgressBar
-    if len(entries) > NUM_IMPORTED_ENTRIES_SLOW {
+    if numEntries > int64(NUM_IMPORTED_ENTRIES_SLOW) {
         fmt.Println("Persisting history entries")
-        bar = progressbar.Default(int64(len(entries)))
+        bar = progressbar.Default(int64(numEntries))
         defer bar.Finish()
     }
-    chunkSize := 500
-    chunks := shared.ChunksIter(entries, chunkSize)
-    return shared.ForEach(chunks, 10, func(chunk []*data.HistoryEntry) error {
-        jsonValue, err := EncryptAndMarshal(config, chunk)
-        if err != nil {
-            return fmt.Errorf("failed to reupload due to failed encryption: %w", err)
-        }
-        _, err = ApiPost(ctx, "/api/v1/submit?source_device_id="+config.DeviceId, "application/json", jsonValue)
-        if err != nil {
-            return fmt.Errorf("failed to reupload due to failed POST: %w", err)
-        }
-        if bar != nil {
-            _ = bar.Add(chunkSize)
-        }
-        return nil
-    })
+    // This number is a balance between speed and memory usage. If we make it too high, then
+    // it will mean we use a ton of memory (since we retrieve all of those entries). But if
+    // we make it too low, then it will have to do repeated SQL queries with OFFSETs, which
+    // are inherently slow.
+    searchChunkSize := 300_000
+    currentOffset := 0
+    for {
+        entries, err := SearchWithOffset(ctx, hctx.GetDb(ctx), "", searchChunkSize, currentOffset)
+        if err != nil {
+            return fmt.Errorf("failed to reupload due to failed search: %w", err)
+        }
+        if len(entries) == 0 {
+            if currentOffset == 0 {
+                return fmt.Errorf("found no entries for reuploading, something went wrong")
+            } else {
+                return nil
+            }
+        }
+        currentOffset += searchChunkSize
+
+        // This number is a balance between speed, and ensuring that we don't send too much data
+        // in a single request (since large individual requests are extremely slow). From benchmarking,
+        // it is apparent that this value seems to work quite well.
+        uploadChunkSize := 500
+        chunks := shared.ChunksIter(entries, uploadChunkSize)
+        err = shared.ForEach(chunks, 10, func(chunk []*data.HistoryEntry) error {
+            jsonValue, err := EncryptAndMarshal(config, chunk)
+            if err != nil {
+                return fmt.Errorf("failed to reupload due to failed encryption: %w", err)
+            }
+            _, err = ApiPost(ctx, "/api/v1/submit?source_device_id="+config.DeviceId, "application/json", jsonValue)
+            if err != nil {
+                return fmt.Errorf("failed to reupload due to failed POST: %w", err)
+            }
+            if bar != nil {
+                _ = bar.Add(uploadChunkSize)
+            }
+            return nil
+        })
+        if err != nil {
+            return err
+        }
+    }
 }

 func RetrieveAdditionalEntriesFromRemote(ctx context.Context, queryReason string) error {
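
The new Reupload pages through the local database in chunks of 300,000 entries, and within each page uploads 500-entry chunks with a concurrency of 10, so a full search page produces up to 600 POST requests. The two helpers it relies on, shared.ChunksIter and shared.ForEach, are not part of this diff; the following is a minimal sketch of what they might look like, with signatures inferred from the call sites above. hishtory's actual implementations in the shared package may differ.

// Sketch of the two shared helpers, inferred from how Reupload calls them.
package shared

import "sync"

// ChunksIter splits a slice into consecutive chunks of at most chunkSize elements.
func ChunksIter[T any](items []T, chunkSize int) [][]T {
    var chunks [][]T
    for start := 0; start < len(items); start += chunkSize {
        end := start + chunkSize
        if end > len(items) {
            end = len(items)
        }
        chunks = append(chunks, items[start:end])
    }
    return chunks
}

// ForEach runs fn over every chunk with at most maxConcurrency goroutines in
// flight, returning the first error encountered (if any).
func ForEach[T any](chunks [][]T, maxConcurrency int, fn func(chunk []T) error) error {
    sem := make(chan struct{}, maxConcurrency)
    var wg sync.WaitGroup
    var mu sync.Mutex
    var firstErr error
    for _, chunk := range chunks {
        wg.Add(1)
        sem <- struct{}{} // acquire a concurrency slot
        go func(c []T) {
            defer wg.Done()
            defer func() { <-sem }() // release the slot
            if err := fn(c); err != nil {
                mu.Lock()
                if firstErr == nil {
                    firstErr = err
                }
                mu.Unlock()
            }
        }(chunk)
    }
    wg.Wait()
    return firstErr
}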
@@ -833,12 +860,16 @@ func MakeWhereQueryFromSearch(ctx context.Context, db *gorm.DB, query string) (*gorm.DB, error) {
 }

 func Search(ctx context.Context, db *gorm.DB, query string, limit int) ([]*data.HistoryEntry, error) {
-    return retryingSearch(ctx, db, query, limit, 0)
+    return SearchWithOffset(ctx, db, query, limit, 0)
+}
+
+func SearchWithOffset(ctx context.Context, db *gorm.DB, query string, limit, offset int) ([]*data.HistoryEntry, error) {
+    return retryingSearch(ctx, db, query, limit, offset, 0)
 }

 const SEARCH_RETRY_COUNT = 3

-func retryingSearch(ctx context.Context, db *gorm.DB, query string, limit int, currentRetryNum int) ([]*data.HistoryEntry, error) {
+func retryingSearch(ctx context.Context, db *gorm.DB, query string, limit, offset int, currentRetryNum int) ([]*data.HistoryEntry, error) {
     if ctx == nil && query != "" {
         return nil, fmt.Errorf("lib.Search called with a nil context and a non-empty query (this should never happen)")
     }
@@ -856,13 +887,16 @@ func retryingSearch(ctx context.Context, db *gorm.DB, query string, limit int, currentRetryNum int) ([]*data.HistoryEntry, error) {
     if limit > 0 {
         tx = tx.Limit(limit)
     }
+    if offset > 0 {
+        tx = tx.Offset(offset)
+    }
     var historyEntries []*data.HistoryEntry
     result := tx.Find(&historyEntries)
     if result.Error != nil {
         if strings.Contains(result.Error.Error(), SQLITE_LOCKED_ERR_MSG) && currentRetryNum < SEARCH_RETRY_COUNT {
             hctx.GetLogger().Infof("Ignoring err=%v and retrying search query, cnt=%d", result.Error, currentRetryNum)
             time.Sleep(time.Duration(currentRetryNum*rand.Intn(50)) * time.Millisecond)
-            return retryingSearch(ctx, db, query, limit, currentRetryNum+1)
+            return retryingSearch(ctx, db, query, limit, offset, currentRetryNum+1)
         }
         return nil, fmt.Errorf("DB query error: %w", result.Error)
     }
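
Under the hood, SearchWithOffset pagination is just GORM's Limit and Offset chained onto the query, as the hunk above shows. One caveat worth spelling out: SQLite executes OFFSET by scanning and discarding rows, so each successive page costs more to fetch, which is why Reupload picks such a large searchChunkSize. A self-contained sketch of the pattern follows; the Entry model and example.db file name are illustrative, not hishtory's actual schema.

package main

import (
    "fmt"

    "gorm.io/driver/sqlite"
    "gorm.io/gorm"
)

// Entry is an illustrative model, not hishtory's actual schema.
type Entry struct {
    ID      uint
    Command string
}

func main() {
    // example.db is a placeholder path for this sketch.
    db, err := gorm.Open(sqlite.Open("example.db"), &gorm.Config{})
    if err != nil {
        panic(err)
    }
    if err := db.AutoMigrate(&Entry{}); err != nil {
        panic(err)
    }

    // Page through every row the way Reupload does: fixed-size pages,
    // stopping at the first empty page. SQLite implements OFFSET by walking
    // and discarding rows, so later pages are slower to fetch; a larger page
    // size keeps the number of OFFSET queries low at the cost of memory.
    pageSize := 1000
    for offset := 0; ; offset += pageSize {
        var page []Entry
        if err := db.Limit(pageSize).Offset(offset).Find(&page).Error; err != nil {
            panic(err)
        }
        if len(page) == 0 {
            break
        }
        fmt.Printf("processing %d entries starting at offset %d\n", len(page), offset)
    }
}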
@@ -1070,3 +1104,10 @@ func SendDeletionRequest(ctx context.Context, deletionRequest shared.DeletionRequest) error {
     }
     return nil
 }
+
+func CountStoredEntries(db *gorm.DB) (int64, error) {
+    return RetryingDbFunctionWithResult(func() (int64, error) {
+        var count int64
+        return count, db.Model(&data.HistoryEntry{}).Count(&count).Error
+    })
+}
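
CountStoredEntries delegates to RetryingDbFunctionWithResult, whose implementation is not shown in this diff. Judging from the retry logic visible in retryingSearch above (retry on SQLITE_LOCKED_ERR_MSG up to a fixed count, with a short randomized sleep), a plausible sketch of such a wrapper looks like the following; the retry count, backoff, and matched error string are assumptions, not hishtory's exact values.

import (
    "fmt"
    "math/rand"
    "strings"
    "time"
)

// Hypothetical sketch of a retrying DB wrapper in the spirit of
// RetryingDbFunctionWithResult, mirroring the retry behavior of retryingSearch.
func RetryingDbFunctionWithResult[T any](dbFunc func() (T, error)) (T, error) {
    var result T
    var err error
    for attempt := 0; attempt < 3; attempt++ {
        result, err = dbFunc()
        // "database is locked" stands in for SQLITE_LOCKED_ERR_MSG here.
        if err == nil || !strings.Contains(err.Error(), "database is locked") {
            return result, err
        }
        // Short randomized sleep before retrying, like retryingSearch does.
        time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond)
    }
    return result, fmt.Errorf("DB error after retries: %w", err)
}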

View File

@@ -86,6 +86,7 @@ func BackupAndRestoreWithId(t testing.TB, id string) func() {
         path.Join(homedir, data.GetHishtoryPath(), "hishtory"),
         path.Join(homedir, ".bash_history"),
         path.Join(homedir, ".zsh_history"),
+        path.Join(homedir, ".zhistory"),
         path.Join(homedir, ".local/share/fish/fish_history"),
     }
     for _, file := range renameFiles {