From 50dc179d332af4a3dc0e69e2c4e39bbbccd3fec5 Mon Sep 17 00:00:00 2001 From: tobi <31960611+tsmethurst@users.noreply.github.com> Date: Tue, 22 Nov 2022 19:38:10 +0100 Subject: [PATCH] [feature] Prune timelines once per hour to plug memory leak (#1117) * export highest/lowest ULIDs as proper const * add stop + start to timeline manager, other small fixes * unexport unused interface funcs + tidy up * add LastGot func * add timeline Prune function * test prune * update lastGot --- internal/db/bundb/notification_test.go | 8 +- internal/id/ulid.go | 24 +- internal/processing/processor.go | 11 + internal/timeline/get.go | 82 +++-- internal/timeline/get_test.go | 165 +--------- internal/timeline/index.go | 145 ++++----- internal/timeline/index_test.go | 68 ---- .../{itemindex.go => indexeditems.go} | 28 +- internal/timeline/manager.go | 59 +++- internal/timeline/prepare.go | 298 +++++++++--------- internal/timeline/prepareditems.go | 24 +- internal/timeline/prune.go | 68 ++++ internal/timeline/prune_test.go | 110 +++++++ internal/timeline/remove.go | 19 +- internal/timeline/timeline.go | 80 ++--- internal/typeutils/internaltoas.go | 7 - 16 files changed, 594 insertions(+), 602 deletions(-) rename internal/timeline/{itemindex.go => indexeditems.go} (70%) create mode 100644 internal/timeline/prune.go create mode 100644 internal/timeline/prune_test.go diff --git a/internal/db/bundb/notification_test.go b/internal/db/bundb/notification_test.go index cd5022951..fda85fe39 100644 --- a/internal/db/bundb/notification_test.go +++ b/internal/db/bundb/notification_test.go @@ -91,7 +91,7 @@ func (suite *NotificationTestSuite) TestGetNotificationsWithSpam() { suite.spamNotifs() testAccount := suite.testAccounts["local_account_1"] before := time.Now() - notifications, err := suite.db.GetNotifications(context.Background(), testAccount.ID, []string{}, 20, "ZZZZZZZZZZZZZZZZZZZZZZZZZZ", "00000000000000000000000000") + notifications, err := suite.db.GetNotifications(context.Background(), testAccount.ID, []string{}, 20, id.Highest, id.Lowest) suite.NoError(err) timeTaken := time.Since(before) fmt.Printf("\n\n\n withSpam: got %d notifications in %s\n\n\n", len(notifications), timeTaken) @@ -105,7 +105,7 @@ func (suite *NotificationTestSuite) TestGetNotificationsWithSpam() { func (suite *NotificationTestSuite) TestGetNotificationsWithoutSpam() { testAccount := suite.testAccounts["local_account_1"] before := time.Now() - notifications, err := suite.db.GetNotifications(context.Background(), testAccount.ID, []string{}, 20, "ZZZZZZZZZZZZZZZZZZZZZZZZZZ", "00000000000000000000000000") + notifications, err := suite.db.GetNotifications(context.Background(), testAccount.ID, []string{}, 20, id.Highest, id.Lowest) suite.NoError(err) timeTaken := time.Since(before) fmt.Printf("\n\n\n withoutSpam: got %d notifications in %s\n\n\n", len(notifications), timeTaken) @@ -122,7 +122,7 @@ func (suite *NotificationTestSuite) TestClearNotificationsWithSpam() { err := suite.db.ClearNotifications(context.Background(), testAccount.ID) suite.NoError(err) - notifications, err := suite.db.GetNotifications(context.Background(), testAccount.ID, []string{}, 20, "ZZZZZZZZZZZZZZZZZZZZZZZZZZ", "00000000000000000000000000") + notifications, err := suite.db.GetNotifications(context.Background(), testAccount.ID, []string{}, 20, id.Highest, id.Lowest) suite.NoError(err) suite.NotNil(notifications) suite.Empty(notifications) @@ -134,7 +134,7 @@ func (suite *NotificationTestSuite) TestClearNotificationsWithTwoAccounts() { err := suite.db.ClearNotifications(context.Background(), testAccount.ID) suite.NoError(err) - notifications, err := suite.db.GetNotifications(context.Background(), testAccount.ID, []string{}, 20, "ZZZZZZZZZZZZZZZZZZZZZZZZZZ", "00000000000000000000000000") + notifications, err := suite.db.GetNotifications(context.Background(), testAccount.ID, []string{}, 20, id.Highest, id.Lowest) suite.NoError(err) suite.NotNil(notifications) suite.Empty(notifications) diff --git a/internal/id/ulid.go b/internal/id/ulid.go index 1b0c2e537..d61d7fcec 100644 --- a/internal/id/ulid.go +++ b/internal/id/ulid.go @@ -1,3 +1,21 @@ +/* + GoToSocial + Copyright (C) 2021-2022 GoToSocial Authors admin@gotosocial.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . +*/ + package id import ( @@ -8,7 +26,11 @@ "github.com/oklog/ulid" ) -const randomRange = 631152381 // ~20 years in seconds +const ( + Highest = "ZZZZZZZZZZZZZZZZZZZZZZZZZZ" // Highest is the highest possible ULID + Lowest = "00000000000000000000000000" // Lowest is the lowest possible ULID + randomRange = 631152381 // ~20 years in seconds +) // ULID represents a Universally Unique Lexicographically Sortable Identifier of 26 characters. See https://github.com/oklog/ulid type ULID string diff --git a/internal/processing/processor.go b/internal/processing/processor.go index b7d42ffeb..f464a08b4 100644 --- a/internal/processing/processor.go +++ b/internal/processing/processor.go @@ -351,6 +351,11 @@ func (p *processor) Start() error { return err } + // Start status timelines + if err := p.statusTimelines.Start(); err != nil { + return err + } + return nil } @@ -359,8 +364,14 @@ func (p *processor) Stop() error { if err := p.clientWorker.Stop(); err != nil { return err } + if err := p.fedWorker.Stop(); err != nil { return err } + + if err := p.statusTimelines.Stop(); err != nil { + return err + } + return nil } diff --git a/internal/timeline/get.go b/internal/timeline/get.go index cb6399ec9..b286ab369 100644 --- a/internal/timeline/get.go +++ b/internal/timeline/get.go @@ -23,6 +23,7 @@ "context" "errors" "fmt" + "time" "codeberg.org/gruf/go-kv" "github.com/superseriousbusiness/gotosocial/internal/log" @@ -30,16 +31,27 @@ const retries = 5 +func (t *timeline) LastGot() time.Time { + t.Lock() + defer t.Unlock() + return t.lastGot +} + func (t *timeline) Get(ctx context.Context, amount int, maxID string, sinceID string, minID string, prepareNext bool) ([]Preparable, error) { l := log.WithFields(kv.Fields{ - {"accountID", t.accountID}, {"amount", amount}, {"maxID", maxID}, {"sinceID", sinceID}, {"minID", minID}, }...) - l.Debug("entering get") + l.Debug("entering get and updating t.lastGot") + + // regardless of what happens below, update the + // last time Get was called for this timeline + t.Lock() + t.lastGot = time.Now() + t.Unlock() var items []Preparable var err error @@ -47,7 +59,7 @@ func (t *timeline) Get(ctx context.Context, amount int, maxID string, sinceID st // no params are defined to just fetch from the top // this is equivalent to a user asking for the top x items from their timeline if maxID == "" && sinceID == "" && minID == "" { - items, err = t.GetXFromTop(ctx, amount) + items, err = t.getXFromTop(ctx, amount) // aysnchronously prepare the next predicted query so it's ready when the user asks for it if len(items) != 0 { nextMaxID := items[len(items)-1].GetID() @@ -67,7 +79,7 @@ func (t *timeline) Get(ctx context.Context, amount int, maxID string, sinceID st // this is equivalent to a user asking for the next x items from their timeline, starting from maxID if maxID != "" && sinceID == "" { attempts := 0 - items, err = t.GetXBehindID(ctx, amount, maxID, &attempts) + items, err = t.getXBehindID(ctx, amount, maxID, &attempts) // aysnchronously prepare the next predicted query so it's ready when the user asks for it if len(items) != 0 { nextMaxID := items[len(items)-1].GetID() @@ -86,25 +98,26 @@ func (t *timeline) Get(ctx context.Context, amount int, maxID string, sinceID st // maxID is defined and sinceID || minID are as well, so take a slice between them // this is equivalent to a user asking for items older than x but newer than y if maxID != "" && sinceID != "" { - items, err = t.GetXBetweenID(ctx, amount, maxID, minID) + items, err = t.getXBetweenID(ctx, amount, maxID, minID) } if maxID != "" && minID != "" { - items, err = t.GetXBetweenID(ctx, amount, maxID, minID) + items, err = t.getXBetweenID(ctx, amount, maxID, minID) } // maxID isn't defined, but sinceID || minID are, so take x before // this is equivalent to a user asking for items newer than x (eg., refreshing the top of their timeline) if maxID == "" && sinceID != "" { - items, err = t.GetXBeforeID(ctx, amount, sinceID, true) + items, err = t.getXBeforeID(ctx, amount, sinceID, true) } if maxID == "" && minID != "" { - items, err = t.GetXBeforeID(ctx, amount, minID, true) + items, err = t.getXBeforeID(ctx, amount, minID, true) } return items, err } -func (t *timeline) GetXFromTop(ctx context.Context, amount int) ([]Preparable, error) { +// getXFromTop returns x amount of items from the top of the timeline, from newest to oldest. +func (t *timeline) getXFromTop(ctx context.Context, amount int) ([]Preparable, error) { // make a slice of preparedItems with the length we need to return preparedItems := make([]Preparable, 0, amount) @@ -124,7 +137,7 @@ func (t *timeline) GetXFromTop(ctx context.Context, amount int) ([]Preparable, e for e := t.preparedItems.data.Front(); e != nil; e = e.Next() { entry, ok := e.Value.(*preparedItemsEntry) if !ok { - return nil, errors.New("GetXFromTop: could not parse e as a preparedItemsEntry") + return nil, errors.New("getXFromTop: could not parse e as a preparedItemsEntry") } preparedItems = append(preparedItems, entry.prepared) served++ @@ -136,9 +149,12 @@ func (t *timeline) GetXFromTop(ctx context.Context, amount int) ([]Preparable, e return preparedItems, nil } -func (t *timeline) GetXBehindID(ctx context.Context, amount int, behindID string, attempts *int) ([]Preparable, error) { +// getXBehindID returns x amount of items from the given id onwards, from newest to oldest. +// This will NOT include the item with the given ID. +// +// This corresponds to an api call to /timelines/home?max_id=WHATEVER +func (t *timeline) getXBehindID(ctx context.Context, amount int, behindID string, attempts *int) ([]Preparable, error) { l := log.WithFields(kv.Fields{ - {"amount", amount}, {"behindID", behindID}, {"attempts", attempts}, @@ -164,7 +180,7 @@ func (t *timeline) GetXBehindID(ctx context.Context, amount int, behindID string position++ entry, ok := e.Value.(*preparedItemsEntry) if !ok { - return nil, errors.New("GetXBehindID: could not parse e as a preparedPostsEntry") + return nil, errors.New("getXBehindID: could not parse e as a preparedPostsEntry") } if entry.itemID <= behindID { @@ -177,10 +193,10 @@ func (t *timeline) GetXBehindID(ctx context.Context, amount int, behindID string // we didn't find it, so we need to make sure it's indexed and prepared and then try again // this can happen when a user asks for really old items if behindIDMark == nil { - if err := t.PrepareBehind(ctx, behindID, amount); err != nil { - return nil, fmt.Errorf("GetXBehindID: error preparing behind and including ID %s", behindID) + if err := t.prepareBehind(ctx, behindID, amount); err != nil { + return nil, fmt.Errorf("getXBehindID: error preparing behind and including ID %s", behindID) } - oldestID, err := t.OldestPreparedItemID(ctx) + oldestID, err := t.oldestPreparedItemID(ctx) if err != nil { return nil, err } @@ -196,13 +212,13 @@ func (t *timeline) GetXBehindID(ctx context.Context, amount int, behindID string l.Tracef("exceeded retries looking for behindID %s", behindID) return items, nil } - l.Trace("trying GetXBehindID again") - return t.GetXBehindID(ctx, amount, behindID, attempts) + l.Trace("trying getXBehindID again") + return t.getXBehindID(ctx, amount, behindID, attempts) } // make sure we have enough items prepared behind it to return what we're being asked for if t.preparedItems.data.Len() < amount+position { - if err := t.PrepareBehind(ctx, behindID, amount); err != nil { + if err := t.prepareBehind(ctx, behindID, amount); err != nil { return nil, err } } @@ -213,7 +229,7 @@ func (t *timeline) GetXBehindID(ctx context.Context, amount int, behindID string for e := behindIDMark.Next(); e != nil; e = e.Next() { entry, ok := e.Value.(*preparedItemsEntry) if !ok { - return nil, errors.New("GetXBehindID: could not parse e as a preparedPostsEntry") + return nil, errors.New("getXBehindID: could not parse e as a preparedPostsEntry") } // serve up to the amount requested @@ -227,7 +243,11 @@ func (t *timeline) GetXBehindID(ctx context.Context, amount int, behindID string return items, nil } -func (t *timeline) GetXBeforeID(ctx context.Context, amount int, beforeID string, startFromTop bool) ([]Preparable, error) { +// getXBeforeID returns x amount of items up to the given id, from newest to oldest. +// This will NOT include the item with the given ID. +// +// This corresponds to an api call to /timelines/home?since_id=WHATEVER +func (t *timeline) getXBeforeID(ctx context.Context, amount int, beforeID string, startFromTop bool) ([]Preparable, error) { // make a slice of items with the length we need to return items := make([]Preparable, 0, amount) @@ -241,7 +261,7 @@ func (t *timeline) GetXBeforeID(ctx context.Context, amount int, beforeID string for e := t.preparedItems.data.Front(); e != nil; e = e.Next() { entry, ok := e.Value.(*preparedItemsEntry) if !ok { - return nil, errors.New("GetXBeforeID: could not parse e as a preparedPostsEntry") + return nil, errors.New("getXBeforeID: could not parse e as a preparedPostsEntry") } if entry.itemID >= beforeID { @@ -263,7 +283,7 @@ func (t *timeline) GetXBeforeID(ctx context.Context, amount int, beforeID string for e := t.preparedItems.data.Front(); e != nil; e = e.Next() { entry, ok := e.Value.(*preparedItemsEntry) if !ok { - return nil, errors.New("GetXBeforeID: could not parse e as a preparedPostsEntry") + return nil, errors.New("getXBeforeID: could not parse e as a preparedPostsEntry") } if entry.itemID == beforeID { @@ -283,7 +303,7 @@ func (t *timeline) GetXBeforeID(ctx context.Context, amount int, beforeID string for e := beforeIDMark.Prev(); e != nil; e = e.Prev() { entry, ok := e.Value.(*preparedItemsEntry) if !ok { - return nil, errors.New("GetXBeforeID: could not parse e as a preparedPostsEntry") + return nil, errors.New("getXBeforeID: could not parse e as a preparedPostsEntry") } // serve up to the amount requested @@ -298,7 +318,11 @@ func (t *timeline) GetXBeforeID(ctx context.Context, amount int, beforeID string return items, nil } -func (t *timeline) GetXBetweenID(ctx context.Context, amount int, behindID string, beforeID string) ([]Preparable, error) { +// getXBetweenID returns x amount of items from the given maxID, up to the given id, from newest to oldest. +// This will NOT include the item with the given IDs. +// +// This corresponds to an api call to /timelines/home?since_id=WHATEVER&max_id=WHATEVER_ELSE +func (t *timeline) getXBetweenID(ctx context.Context, amount int, behindID string, beforeID string) ([]Preparable, error) { // make a slice of items with the length we need to return items := make([]Preparable, 0, amount) @@ -314,7 +338,7 @@ func (t *timeline) GetXBetweenID(ctx context.Context, amount int, behindID strin position++ entry, ok := e.Value.(*preparedItemsEntry) if !ok { - return nil, errors.New("GetXBetweenID: could not parse e as a preparedPostsEntry") + return nil, errors.New("getXBetweenID: could not parse e as a preparedPostsEntry") } if entry.itemID == behindID { @@ -325,12 +349,12 @@ func (t *timeline) GetXBetweenID(ctx context.Context, amount int, behindID strin // we didn't find it if behindIDMark == nil { - return nil, fmt.Errorf("GetXBetweenID: couldn't find item with ID %s", behindID) + return nil, fmt.Errorf("getXBetweenID: couldn't find item with ID %s", behindID) } // make sure we have enough items prepared behind it to return what we're being asked for if t.preparedItems.data.Len() < amount+position { - if err := t.PrepareBehind(ctx, behindID, amount); err != nil { + if err := t.prepareBehind(ctx, behindID, amount); err != nil { return nil, err } } @@ -341,7 +365,7 @@ func (t *timeline) GetXBetweenID(ctx context.Context, amount int, behindID strin for e := behindIDMark.Next(); e != nil; e = e.Next() { entry, ok := e.Value.(*preparedItemsEntry) if !ok { - return nil, errors.New("GetXBetweenID: could not parse e as a preparedPostsEntry") + return nil, errors.New("getXBetweenID: could not parse e as a preparedPostsEntry") } if entry.itemID == beforeID { diff --git a/internal/timeline/get_test.go b/internal/timeline/get_test.go index 3af0ae2f6..03ef1dc9c 100644 --- a/internal/timeline/get_test.go +++ b/internal/timeline/get_test.go @@ -89,6 +89,9 @@ func (suite *GetTestSuite) TearDownTest() { } func (suite *GetTestSuite) TestGetDefault() { + // lastGot should be zero + suite.Zero(suite.timeline.LastGot()) + // get 10 20 the top and don't prepare the next query statuses, err := suite.timeline.Get(context.Background(), 20, "", "", "", false) if err != nil { @@ -108,6 +111,9 @@ func (suite *GetTestSuite) TestGetDefault() { highest = s.GetID() } } + + // lastGot should be up to date + suite.WithinDuration(time.Now(), suite.timeline.LastGot(), 1*time.Second) } func (suite *GetTestSuite) TestGetDefaultPrepareNext() { @@ -297,165 +303,6 @@ func (suite *GetTestSuite) TestGetBetweenIDPrepareNext() { time.Sleep(1 * time.Second) } -func (suite *GetTestSuite) TestGetXFromTop() { - // get 5 from the top - statuses, err := suite.timeline.GetXFromTop(context.Background(), 5) - if err != nil { - suite.FailNow(err.Error()) - } - - suite.Len(statuses, 5) - - // statuses should be sorted highest to lowest ID - var highest string - for i, s := range statuses { - if i == 0 { - highest = s.GetID() - } else { - suite.Less(s.GetID(), highest) - highest = s.GetID() - } - } -} - -func (suite *GetTestSuite) TestGetXBehindID() { - // get 3 behind the 'middle' id - var attempts *int - a := 0 - attempts = &a - statuses, err := suite.timeline.GetXBehindID(context.Background(), 3, "01F8MHBQCBTDKN6X5VHGMMN4MA", attempts) - if err != nil { - suite.FailNow(err.Error()) - } - - suite.Len(statuses, 3) - - // statuses should be sorted highest to lowest ID - // all status IDs should be less than the behindID - var highest string - for i, s := range statuses { - if i == 0 { - highest = s.GetID() - } else { - suite.Less(s.GetID(), highest) - highest = s.GetID() - } - suite.Less(s.GetID(), "01F8MHBQCBTDKN6X5VHGMMN4MA") - } -} - -func (suite *GetTestSuite) TestGetXBehindID0() { - // try to get behind 0, the lowest possible ID - var attempts *int - a := 0 - attempts = &a - statuses, err := suite.timeline.GetXBehindID(context.Background(), 3, "0", attempts) - if err != nil { - suite.FailNow(err.Error()) - } - - // there's nothing beyond it so len should be 0 - suite.Len(statuses, 0) -} - -func (suite *GetTestSuite) TestGetXBehindNonexistentReasonableID() { - // try to get behind an id that doesn't exist, but is close to one that does so we should still get statuses back - var attempts *int - a := 0 - attempts = &a - statuses, err := suite.timeline.GetXBehindID(context.Background(), 3, "01F8MHBQCBTDKN6X5VHGMMN4MB", attempts) // change the last A to a B - if err != nil { - suite.FailNow(err.Error()) - } - suite.Len(statuses, 3) - - // statuses should be sorted highest to lowest ID - // all status IDs should be less than the behindID - var highest string - for i, s := range statuses { - if i == 0 { - highest = s.GetID() - } else { - suite.Less(s.GetID(), highest) - highest = s.GetID() - } - suite.Less(s.GetID(), "01F8MHBCN8120SYH7D5S050MGK") - } -} - -func (suite *GetTestSuite) TestGetXBehindVeryHighID() { - // try to get behind an id that doesn't exist, and is higher than any other ID we could possibly have - var attempts *int - a := 0 - attempts = &a - statuses, err := suite.timeline.GetXBehindID(context.Background(), 7, "9998MHBQCBTDKN6X5VHGMMN4MA", attempts) - if err != nil { - suite.FailNow(err.Error()) - } - - // we should get all 7 statuses we asked for because they all have lower IDs than the very high ID given in the query - suite.Len(statuses, 7) - - // statuses should be sorted highest to lowest ID - // all status IDs should be less than the behindID - var highest string - for i, s := range statuses { - if i == 0 { - highest = s.GetID() - } else { - suite.Less(s.GetID(), highest) - highest = s.GetID() - } - suite.Less(s.GetID(), "9998MHBQCBTDKN6X5VHGMMN4MA") - } -} - -func (suite *GetTestSuite) TestGetXBeforeID() { - // get 3 before the 'middle' id - statuses, err := suite.timeline.GetXBeforeID(context.Background(), 3, "01F8MHBQCBTDKN6X5VHGMMN4MA", true) - if err != nil { - suite.FailNow(err.Error()) - } - - suite.Len(statuses, 3) - - // statuses should be sorted highest to lowest ID - // all status IDs should be greater than the beforeID - var highest string - for i, s := range statuses { - if i == 0 { - highest = s.GetID() - } else { - suite.Less(s.GetID(), highest) - highest = s.GetID() - } - suite.Greater(s.GetID(), "01F8MHBQCBTDKN6X5VHGMMN4MA") - } -} - -func (suite *GetTestSuite) TestGetXBeforeIDNoStartFromTop() { - // get 3 before the 'middle' id - statuses, err := suite.timeline.GetXBeforeID(context.Background(), 3, "01F8MHBQCBTDKN6X5VHGMMN4MA", false) - if err != nil { - suite.FailNow(err.Error()) - } - - suite.Len(statuses, 3) - - // statuses should be sorted lowest to highest ID - // all status IDs should be greater than the beforeID - var lowest string - for i, s := range statuses { - if i == 0 { - lowest = s.GetID() - } else { - suite.Greater(s.GetID(), lowest) - lowest = s.GetID() - } - suite.Greater(s.GetID(), "01F8MHBQCBTDKN6X5VHGMMN4MA") - } -} - func TestGetTestSuite(t *testing.T) { suite.Run(t, new(GetTestSuite)) } diff --git a/internal/timeline/index.go b/internal/timeline/index.go index 76c16977b..f56199530 100644 --- a/internal/timeline/index.go +++ b/internal/timeline/index.go @@ -28,79 +28,80 @@ "github.com/superseriousbusiness/gotosocial/internal/log" ) -func (t *timeline) IndexBefore(ctx context.Context, itemID string, amount int) error { - l := log.WithFields(kv.Fields{ - - {"amount", amount}, - }...) - - // lazily initialize index if it hasn't been done already - if t.itemIndex.data == nil { - t.itemIndex.data = &list.List{} - t.itemIndex.data.Init() +func (t *timeline) ItemIndexLength(ctx context.Context) int { + if t.indexedItems == nil || t.indexedItems.data == nil { + return 0 } - - toIndex := []Timelineable{} - offsetID := itemID - - l.Trace("entering grabloop") -grabloop: - for i := 0; len(toIndex) < amount && i < 5; i++ { // try the grabloop 5 times only - // first grab items using the caller-provided grab function - l.Trace("grabbing...") - items, stop, err := t.grabFunction(ctx, t.accountID, "", "", offsetID, amount) - if err != nil { - return err - } - if stop { - break grabloop - } - - l.Trace("filtering...") - // now filter each item using the caller-provided filter function - for _, item := range items { - shouldIndex, err := t.filterFunction(ctx, t.accountID, item) - if err != nil { - return err - } - if shouldIndex { - toIndex = append(toIndex, item) - } - offsetID = item.GetID() - } - } - l.Trace("left grabloop") - - // index the items we got - for _, s := range toIndex { - if _, err := t.IndexOne(ctx, s.GetID(), s.GetBoostOfID(), s.GetAccountID(), s.GetBoostOfAccountID()); err != nil { - return fmt.Errorf("IndexBehind: error indexing item with id %s: %s", s.GetID(), err) - } - } - - return nil + return t.indexedItems.data.Len() } -func (t *timeline) IndexBehind(ctx context.Context, itemID string, amount int) error { - l := log.WithFields(kv.Fields{ +// func (t *timeline) indexBefore(ctx context.Context, itemID string, amount int) error { +// l := log.WithFields(kv.Fields{{"amount", amount}}...) - {"amount", amount}, - }...) +// // lazily initialize index if it hasn't been done already +// if t.indexedItems.data == nil { +// t.indexedItems.data = &list.List{} +// t.indexedItems.data.Init() +// } + +// toIndex := []Timelineable{} +// offsetID := itemID + +// l.Trace("entering grabloop") +// grabloop: +// for i := 0; len(toIndex) < amount && i < 5; i++ { // try the grabloop 5 times only +// // first grab items using the caller-provided grab function +// l.Trace("grabbing...") +// items, stop, err := t.grabFunction(ctx, t.accountID, "", "", offsetID, amount) +// if err != nil { +// return err +// } +// if stop { +// break grabloop +// } + +// l.Trace("filtering...") +// // now filter each item using the caller-provided filter function +// for _, item := range items { +// shouldIndex, err := t.filterFunction(ctx, t.accountID, item) +// if err != nil { +// return err +// } +// if shouldIndex { +// toIndex = append(toIndex, item) +// } +// offsetID = item.GetID() +// } +// } +// l.Trace("left grabloop") + +// // index the items we got +// for _, s := range toIndex { +// if _, err := t.IndexOne(ctx, s.GetID(), s.GetBoostOfID(), s.GetAccountID(), s.GetBoostOfAccountID()); err != nil { +// return fmt.Errorf("indexBefore: error indexing item with id %s: %s", s.GetID(), err) +// } +// } + +// return nil +// } + +func (t *timeline) indexBehind(ctx context.Context, itemID string, amount int) error { + l := log.WithFields(kv.Fields{{"amount", amount}}...) // lazily initialize index if it hasn't been done already - if t.itemIndex.data == nil { - t.itemIndex.data = &list.List{} - t.itemIndex.data.Init() + if t.indexedItems.data == nil { + t.indexedItems.data = &list.List{} + t.indexedItems.data.Init() } // If we're already indexedBehind given itemID by the required amount, we can return nil. // First find position of itemID (or as near as possible). var position int positionLoop: - for e := t.itemIndex.data.Front(); e != nil; e = e.Next() { - entry, ok := e.Value.(*itemIndexEntry) + for e := t.indexedItems.data.Front(); e != nil; e = e.Next() { + entry, ok := e.Value.(*indexedItemsEntry) if !ok { - return errors.New("IndexBehind: could not parse e as an itemIndexEntry") + return errors.New("indexBehind: could not parse e as an itemIndexEntry") } if entry.itemID <= itemID { @@ -111,7 +112,7 @@ func (t *timeline) IndexBehind(ctx context.Context, itemID string, amount int) e } // now check if the length of indexed items exceeds the amount of items required (position of itemID, plus amount of posts requested after that) - if t.itemIndex.data.Len() > position+amount { + if t.indexedItems.data.Len() > position+amount { // we have enough indexed behind already to satisfy amount, so don't need to make db calls l.Trace("returning nil since we already have enough items indexed") return nil @@ -151,7 +152,7 @@ func (t *timeline) IndexBehind(ctx context.Context, itemID string, amount int) e // index the items we got for _, s := range toIndex { if _, err := t.IndexOne(ctx, s.GetID(), s.GetBoostOfID(), s.GetAccountID(), s.GetBoostOfAccountID()); err != nil { - return fmt.Errorf("IndexBehind: error indexing item with id %s: %s", s.GetID(), err) + return fmt.Errorf("indexBehind: error indexing item with id %s: %s", s.GetID(), err) } } @@ -162,28 +163,28 @@ func (t *timeline) IndexOne(ctx context.Context, itemID string, boostOfID string t.Lock() defer t.Unlock() - postIndexEntry := &itemIndexEntry{ + postIndexEntry := &indexedItemsEntry{ itemID: itemID, boostOfID: boostOfID, accountID: accountID, boostOfAccountID: boostOfAccountID, } - return t.itemIndex.insertIndexed(ctx, postIndexEntry) + return t.indexedItems.insertIndexed(ctx, postIndexEntry) } func (t *timeline) IndexAndPrepareOne(ctx context.Context, statusID string, boostOfID string, accountID string, boostOfAccountID string) (bool, error) { t.Lock() defer t.Unlock() - postIndexEntry := &itemIndexEntry{ + postIndexEntry := &indexedItemsEntry{ itemID: statusID, boostOfID: boostOfID, accountID: accountID, boostOfAccountID: boostOfAccountID, } - inserted, err := t.itemIndex.insertIndexed(ctx, postIndexEntry) + inserted, err := t.indexedItems.insertIndexed(ctx, postIndexEntry) if err != nil { return inserted, fmt.Errorf("IndexAndPrepareOne: error inserting indexed: %s", err) } @@ -199,13 +200,13 @@ func (t *timeline) IndexAndPrepareOne(ctx context.Context, statusID string, boos func (t *timeline) OldestIndexedItemID(ctx context.Context) (string, error) { var id string - if t.itemIndex == nil || t.itemIndex.data == nil || t.itemIndex.data.Back() == nil { + if t.indexedItems == nil || t.indexedItems.data == nil || t.indexedItems.data.Back() == nil { // return an empty string if postindex hasn't been initialized yet return id, nil } - e := t.itemIndex.data.Back() - entry, ok := e.Value.(*itemIndexEntry) + e := t.indexedItems.data.Back() + entry, ok := e.Value.(*indexedItemsEntry) if !ok { return id, errors.New("OldestIndexedItemID: could not parse e as itemIndexEntry") } @@ -214,13 +215,13 @@ func (t *timeline) OldestIndexedItemID(ctx context.Context) (string, error) { func (t *timeline) NewestIndexedItemID(ctx context.Context) (string, error) { var id string - if t.itemIndex == nil || t.itemIndex.data == nil || t.itemIndex.data.Front() == nil { + if t.indexedItems == nil || t.indexedItems.data == nil || t.indexedItems.data.Front() == nil { // return an empty string if postindex hasn't been initialized yet return id, nil } - e := t.itemIndex.data.Front() - entry, ok := e.Value.(*itemIndexEntry) + e := t.indexedItems.data.Front() + entry, ok := e.Value.(*indexedItemsEntry) if !ok { return id, errors.New("NewestIndexedItemID: could not parse e as itemIndexEntry") } diff --git a/internal/timeline/index_test.go b/internal/timeline/index_test.go index c89c85a09..5571bfb05 100644 --- a/internal/timeline/index_test.go +++ b/internal/timeline/index_test.go @@ -69,63 +69,6 @@ func (suite *IndexTestSuite) TearDownTest() { testrig.StandardDBTeardown(suite.db) } -func (suite *IndexTestSuite) TestIndexBeforeLowID() { - // index 10 before the lowest status ID possible - err := suite.timeline.IndexBefore(context.Background(), "00000000000000000000000000", 10) - suite.NoError(err) - - postID, err := suite.timeline.OldestIndexedItemID(context.Background()) - suite.NoError(err) - suite.Equal("01F8MHBQCBTDKN6X5VHGMMN4MA", postID) - - indexLength := suite.timeline.ItemIndexLength(context.Background()) - suite.Equal(10, indexLength) -} - -func (suite *IndexTestSuite) TestIndexBeforeHighID() { - // index 10 before the highest status ID possible - err := suite.timeline.IndexBefore(context.Background(), "ZZZZZZZZZZZZZZZZZZZZZZZZZZ", 10) - suite.NoError(err) - - // the oldest indexed post should be empty - postID, err := suite.timeline.OldestIndexedItemID(context.Background()) - suite.NoError(err) - suite.Empty(postID) - - // indexLength should be 0 - indexLength := suite.timeline.ItemIndexLength(context.Background()) - suite.Equal(0, indexLength) -} - -func (suite *IndexTestSuite) TestIndexBehindHighID() { - // index 10 behind the highest status ID possible - err := suite.timeline.IndexBehind(context.Background(), "ZZZZZZZZZZZZZZZZZZZZZZZZZZ", 10) - suite.NoError(err) - - // the newest indexed post should be the highest one we have in our testrig - postID, err := suite.timeline.NewestIndexedItemID(context.Background()) - suite.NoError(err) - suite.Equal("01G36SF3V6Y6V5BF9P4R7PQG7G", postID) - - indexLength := suite.timeline.ItemIndexLength(context.Background()) - suite.Equal(10, indexLength) -} - -func (suite *IndexTestSuite) TestIndexBehindLowID() { - // index 10 behind the lowest status ID possible - err := suite.timeline.IndexBehind(context.Background(), "00000000000000000000000000", 10) - suite.NoError(err) - - // the newest indexed post should be empty - postID, err := suite.timeline.NewestIndexedItemID(context.Background()) - suite.NoError(err) - suite.Empty(postID) - - // indexLength should be 0 - indexLength := suite.timeline.ItemIndexLength(context.Background()) - suite.Equal(0, indexLength) -} - func (suite *IndexTestSuite) TestOldestIndexedItemIDEmpty() { // the oldest indexed post should be an empty string since there's nothing indexed yet postID, err := suite.timeline.OldestIndexedItemID(context.Background()) @@ -137,17 +80,6 @@ func (suite *IndexTestSuite) TestOldestIndexedItemIDEmpty() { suite.Equal(0, indexLength) } -func (suite *IndexTestSuite) TestNewestIndexedItemIDEmpty() { - // the newest indexed post should be an empty string since there's nothing indexed yet - postID, err := suite.timeline.NewestIndexedItemID(context.Background()) - suite.NoError(err) - suite.Empty(postID) - - // indexLength should be 0 - indexLength := suite.timeline.ItemIndexLength(context.Background()) - suite.Equal(0, indexLength) -} - func (suite *IndexTestSuite) TestIndexAlreadyIndexed() { testStatus := suite.testStatuses["local_account_1_status_1"] diff --git a/internal/timeline/itemindex.go b/internal/timeline/indexeditems.go similarity index 70% rename from internal/timeline/itemindex.go rename to internal/timeline/indexeditems.go index 968650e07..9685611de 100644 --- a/internal/timeline/itemindex.go +++ b/internal/timeline/indexeditems.go @@ -24,26 +24,26 @@ "errors" ) -type itemIndex struct { +type indexedItems struct { data *list.List skipInsert SkipInsertFunction } -type itemIndexEntry struct { +type indexedItemsEntry struct { itemID string boostOfID string accountID string boostOfAccountID string } -func (p *itemIndex) insertIndexed(ctx context.Context, i *itemIndexEntry) (bool, error) { - if p.data == nil { - p.data = &list.List{} +func (i *indexedItems) insertIndexed(ctx context.Context, newEntry *indexedItemsEntry) (bool, error) { + if i.data == nil { + i.data = &list.List{} } // if we have no entries yet, this is both the newest and oldest entry, so just put it in the front - if p.data.Len() == 0 { - p.data.PushFront(i) + if i.data.Len() == 0 { + i.data.PushFront(newEntry) return true, nil } @@ -51,15 +51,15 @@ func (p *itemIndex) insertIndexed(ctx context.Context, i *itemIndexEntry) (bool, var position int // We need to iterate through the index to make sure we put this item in the appropriate place according to when it was created. // We also need to make sure we're not inserting a duplicate item -- this can happen sometimes and it's not nice UX (*shudder*). - for e := p.data.Front(); e != nil; e = e.Next() { + for e := i.data.Front(); e != nil; e = e.Next() { position++ - entry, ok := e.Value.(*itemIndexEntry) + entry, ok := e.Value.(*indexedItemsEntry) if !ok { - return false, errors.New("index: could not parse e as an itemIndexEntry") + return false, errors.New("insertIndexed: could not parse e as an indexedItemsEntry") } - skip, err := p.skipInsert(ctx, i.itemID, i.accountID, i.boostOfID, i.boostOfAccountID, entry.itemID, entry.accountID, entry.boostOfID, entry.boostOfAccountID, position) + skip, err := i.skipInsert(ctx, newEntry.itemID, newEntry.accountID, newEntry.boostOfID, newEntry.boostOfAccountID, entry.itemID, entry.accountID, entry.boostOfID, entry.boostOfAccountID, position) if err != nil { return false, err } @@ -69,18 +69,18 @@ func (p *itemIndex) insertIndexed(ctx context.Context, i *itemIndexEntry) (bool, // if the item to index is newer than e, insert it before e in the list if insertMark == nil { - if i.itemID > entry.itemID { + if newEntry.itemID > entry.itemID { insertMark = e } } } if insertMark != nil { - p.data.InsertBefore(i, insertMark) + i.data.InsertBefore(newEntry, insertMark) return true, nil } // if we reach this point it's the oldest item we've seen so put it at the back - p.data.PushBack(i) + i.data.PushBack(newEntry) return true, nil } diff --git a/internal/timeline/manager.go b/internal/timeline/manager.go index 51bd65fbf..ecf4c3dbf 100644 --- a/internal/timeline/manager.go +++ b/internal/timeline/manager.go @@ -23,15 +23,12 @@ "fmt" "strings" "sync" + "time" "codeberg.org/gruf/go-kv" "github.com/superseriousbusiness/gotosocial/internal/log" ) -const ( - desiredPostIndexLength = 400 -) - // Manager abstracts functions for creating timelines for multiple accounts, and adding, removing, and fetching entries from those timelines. // // By the time a timelineable hits the manager interface, it should already have been filtered and it should be established that the item indeed @@ -65,8 +62,6 @@ type Manager interface { GetTimeline(ctx context.Context, accountID string, maxID string, sinceID string, minID string, limit int, local bool) ([]Preparable, error) // GetIndexedLength returns the amount of items that have been *indexed* for the given account ID. GetIndexedLength(ctx context.Context, timelineAccountID string) int - // GetDesiredIndexLength returns the amount of items that we, ideally, index for each user. - GetDesiredIndexLength(ctx context.Context) int // GetOldestIndexedID returns the id ID for the oldest item that we have indexed for the given account. GetOldestIndexedID(ctx context.Context, timelineAccountID string) (string, error) // PrepareXFromTop prepares limit n amount of items, based on their indexed representations, from the top of the index. @@ -77,6 +72,10 @@ type Manager interface { WipeItemFromAllTimelines(ctx context.Context, itemID string) error // WipeStatusesFromAccountID removes all items by the given accountID from the timelineAccountID's timelines. WipeItemsFromAccountID(ctx context.Context, timelineAccountID string, accountID string) error + // Start starts hourly cleanup jobs for this timeline manager. + Start() error + // Stop stops the timeline manager (currently a stub, doesn't do anything). + Stop() error } // NewManager returns a new timeline manager. @@ -98,9 +97,44 @@ type manager struct { skipInsertFunction SkipInsertFunction } +func (m *manager) Start() error { + // range through all timelines in the sync map once per hour + prune as necessary + go func() { + for now := range time.NewTicker(1 * time.Hour).C { + m.accountTimelines.Range(func(key any, value any) bool { + timelineAccountID, ok := key.(string) + if !ok { + panic("couldn't parse timeline manager sync map key as string, this should never happen so panic") + } + + t, ok := value.(Timeline) + if !ok { + panic("couldn't parse timeline manager sync map value as Timeline, this should never happen so panic") + } + + anHourAgo := now.Add(-1 * time.Hour) + if lastGot := t.LastGot(); lastGot.Before(anHourAgo) { + amountPruned := t.Prune(defaultDesiredPreparedItemsLength, defaultDesiredIndexedItemsLength) + log.WithFields(kv.Fields{ + {"timelineAccountID", timelineAccountID}, + {"amountPruned", amountPruned}, + }...).Info("pruned indexed and prepared items from timeline") + } + + return true + }) + } + }() + + return nil +} + +func (m *manager) Stop() error { + return nil +} + func (m *manager) Ingest(ctx context.Context, item Timelineable, timelineAccountID string) (bool, error) { l := log.WithFields(kv.Fields{ - {"timelineAccountID", timelineAccountID}, {"itemID", item.GetID()}, }...) @@ -116,7 +150,6 @@ func (m *manager) Ingest(ctx context.Context, item Timelineable, timelineAccount func (m *manager) IngestAndPrepare(ctx context.Context, item Timelineable, timelineAccountID string) (bool, error) { l := log.WithFields(kv.Fields{ - {"timelineAccountID", timelineAccountID}, {"itemID", item.GetID()}, }...) @@ -132,7 +165,6 @@ func (m *manager) IngestAndPrepare(ctx context.Context, item Timelineable, timel func (m *manager) Remove(ctx context.Context, timelineAccountID string, itemID string) (int, error) { l := log.WithFields(kv.Fields{ - {"timelineAccountID", timelineAccountID}, {"itemID", itemID}, }...) @@ -147,10 +179,7 @@ func (m *manager) Remove(ctx context.Context, timelineAccountID string, itemID s } func (m *manager) GetTimeline(ctx context.Context, timelineAccountID string, maxID string, sinceID string, minID string, limit int, local bool) ([]Preparable, error) { - l := log.WithFields(kv.Fields{ - - {"timelineAccountID", timelineAccountID}, - }...) + l := log.WithFields(kv.Fields{{"timelineAccountID", timelineAccountID}}...) t, err := m.getOrCreateTimeline(ctx, timelineAccountID) if err != nil { @@ -173,10 +202,6 @@ func (m *manager) GetIndexedLength(ctx context.Context, timelineAccountID string return t.ItemIndexLength(ctx) } -func (m *manager) GetDesiredIndexLength(ctx context.Context) int { - return desiredPostIndexLength -} - func (m *manager) GetOldestIndexedID(ctx context.Context, timelineAccountID string) (string, error) { t, err := m.getOrCreateTimeline(ctx, timelineAccountID) if err != nil { diff --git a/internal/timeline/prepare.go b/internal/timeline/prepare.go index c643d1873..a3b0d06ce 100644 --- a/internal/timeline/prepare.go +++ b/internal/timeline/prepare.go @@ -26,154 +26,12 @@ "codeberg.org/gruf/go-kv" "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/id" "github.com/superseriousbusiness/gotosocial/internal/log" ) -func (t *timeline) prepareNextQuery(ctx context.Context, amount int, maxID string, sinceID string, minID string) error { - l := log.WithFields(kv.Fields{ - - {"amount", amount}, - {"maxID", maxID}, - {"sinceID", sinceID}, - {"minID", minID}, - }...) - - var err error - - // maxID is defined but sinceID isn't so take from behind - if maxID != "" && sinceID == "" { - l.Debug("preparing behind maxID") - err = t.PrepareBehind(ctx, maxID, amount) - } - - // maxID isn't defined, but sinceID || minID are, so take x before - if maxID == "" && sinceID != "" { - l.Debug("preparing before sinceID") - err = t.PrepareBefore(ctx, sinceID, false, amount) - } - if maxID == "" && minID != "" { - l.Debug("preparing before minID") - err = t.PrepareBefore(ctx, minID, false, amount) - } - - return err -} - -func (t *timeline) PrepareBehind(ctx context.Context, itemID string, amount int) error { - // lazily initialize prepared items if it hasn't been done already - if t.preparedItems.data == nil { - t.preparedItems.data = &list.List{} - t.preparedItems.data.Init() - } - - if err := t.IndexBehind(ctx, itemID, amount); err != nil { - return fmt.Errorf("PrepareBehind: error indexing behind id %s: %s", itemID, err) - } - - // if the itemindex is nil, nothing has been indexed yet so there's nothing to prepare - if t.itemIndex.data == nil { - return nil - } - - var prepared int - var preparing bool - t.Lock() - defer t.Unlock() -prepareloop: - for e := t.itemIndex.data.Front(); e != nil; e = e.Next() { - entry, ok := e.Value.(*itemIndexEntry) - if !ok { - return errors.New("PrepareBehind: could not parse e as itemIndexEntry") - } - - if !preparing { - // we haven't hit the position we need to prepare from yet - if entry.itemID == itemID { - preparing = true - } - } - - if preparing { - if err := t.prepare(ctx, entry.itemID); err != nil { - // there's been an error - if err != db.ErrNoEntries { - // it's a real error - return fmt.Errorf("PrepareBehind: error preparing item with id %s: %s", entry.itemID, err) - } - // the status just doesn't exist (anymore) so continue to the next one - continue - } - if prepared == amount { - // we're done - break prepareloop - } - prepared++ - } - } - - return nil -} - -func (t *timeline) PrepareBefore(ctx context.Context, statusID string, include bool, amount int) error { - t.Lock() - defer t.Unlock() - - // lazily initialize prepared posts if it hasn't been done already - if t.preparedItems.data == nil { - t.preparedItems.data = &list.List{} - t.preparedItems.data.Init() - } - - // if the postindex is nil, nothing has been indexed yet so there's nothing to prepare - if t.itemIndex.data == nil { - return nil - } - - var prepared int - var preparing bool -prepareloop: - for e := t.itemIndex.data.Back(); e != nil; e = e.Prev() { - entry, ok := e.Value.(*itemIndexEntry) - if !ok { - return errors.New("PrepareBefore: could not parse e as a postIndexEntry") - } - - if !preparing { - // we haven't hit the position we need to prepare from yet - if entry.itemID == statusID { - preparing = true - if !include { - continue - } - } - } - - if preparing { - if err := t.prepare(ctx, entry.itemID); err != nil { - // there's been an error - if err != db.ErrNoEntries { - // it's a real error - return fmt.Errorf("PrepareBefore: error preparing status with id %s: %s", entry.itemID, err) - } - // the status just doesn't exist (anymore) so continue to the next one - continue - } - if prepared == amount { - // we're done - break prepareloop - } - prepared++ - } - } - - return nil -} - func (t *timeline) PrepareFromTop(ctx context.Context, amount int) error { - l := log.WithFields(kv.Fields{ - - {"amount", amount}, - }...) + l := log.WithFields(kv.Fields{{"amount", amount}}...) // lazily initialize prepared posts if it hasn't been done already if t.preparedItems.data == nil { @@ -182,10 +40,10 @@ func (t *timeline) PrepareFromTop(ctx context.Context, amount int) error { } // if the postindex is nil, nothing has been indexed yet so index from the highest ID possible - if t.itemIndex.data == nil { + if t.indexedItems.data == nil { l.Debug("postindex.data was nil, indexing behind highest possible ID") - if err := t.IndexBehind(ctx, "ZZZZZZZZZZZZZZZZZZZZZZZZZZ", amount); err != nil { - return fmt.Errorf("PrepareFromTop: error indexing behind id %s: %s", "ZZZZZZZZZZZZZZZZZZZZZZZZZZ", err) + if err := t.indexBehind(ctx, id.Highest, amount); err != nil { + return fmt.Errorf("PrepareFromTop: error indexing behind id %s: %s", id.Highest, err) } } @@ -194,12 +52,12 @@ func (t *timeline) PrepareFromTop(ctx context.Context, amount int) error { defer t.Unlock() var prepared int prepareloop: - for e := t.itemIndex.data.Front(); e != nil; e = e.Next() { + for e := t.indexedItems.data.Front(); e != nil; e = e.Next() { if e == nil { continue } - entry, ok := e.Value.(*itemIndexEntry) + entry, ok := e.Value.(*indexedItemsEntry) if !ok { return errors.New("PrepareFromTop: could not parse e as a postIndexEntry") } @@ -226,6 +84,142 @@ func (t *timeline) PrepareFromTop(ctx context.Context, amount int) error { return nil } +func (t *timeline) prepareNextQuery(ctx context.Context, amount int, maxID string, sinceID string, minID string) error { + l := log.WithFields(kv.Fields{ + {"amount", amount}, + {"maxID", maxID}, + {"sinceID", sinceID}, + {"minID", minID}, + }...) + + var err error + switch { + case maxID != "" && sinceID == "": + l.Debug("preparing behind maxID") + err = t.prepareBehind(ctx, maxID, amount) + case maxID == "" && sinceID != "": + l.Debug("preparing before sinceID") + err = t.prepareBefore(ctx, sinceID, false, amount) + case maxID == "" && minID != "": + l.Debug("preparing before minID") + err = t.prepareBefore(ctx, minID, false, amount) + } + + return err +} + +// prepareBehind instructs the timeline to prepare the next amount of entries for serialization, from position onwards. +// If include is true, then the given item ID will also be prepared, otherwise only entries behind it will be prepared. +func (t *timeline) prepareBehind(ctx context.Context, itemID string, amount int) error { + // lazily initialize prepared items if it hasn't been done already + if t.preparedItems.data == nil { + t.preparedItems.data = &list.List{} + t.preparedItems.data.Init() + } + + if err := t.indexBehind(ctx, itemID, amount); err != nil { + return fmt.Errorf("prepareBehind: error indexing behind id %s: %s", itemID, err) + } + + // if the itemindex is nil, nothing has been indexed yet so there's nothing to prepare + if t.indexedItems.data == nil { + return nil + } + + var prepared int + var preparing bool + t.Lock() + defer t.Unlock() +prepareloop: + for e := t.indexedItems.data.Front(); e != nil; e = e.Next() { + entry, ok := e.Value.(*indexedItemsEntry) + if !ok { + return errors.New("prepareBehind: could not parse e as itemIndexEntry") + } + + if !preparing { + // we haven't hit the position we need to prepare from yet + if entry.itemID == itemID { + preparing = true + } + } + + if preparing { + if err := t.prepare(ctx, entry.itemID); err != nil { + // there's been an error + if err != db.ErrNoEntries { + // it's a real error + return fmt.Errorf("prepareBehind: error preparing item with id %s: %s", entry.itemID, err) + } + // the status just doesn't exist (anymore) so continue to the next one + continue + } + if prepared == amount { + // we're done + break prepareloop + } + prepared++ + } + } + + return nil +} + +func (t *timeline) prepareBefore(ctx context.Context, statusID string, include bool, amount int) error { + t.Lock() + defer t.Unlock() + + // lazily initialize prepared posts if it hasn't been done already + if t.preparedItems.data == nil { + t.preparedItems.data = &list.List{} + t.preparedItems.data.Init() + } + + // if the postindex is nil, nothing has been indexed yet so there's nothing to prepare + if t.indexedItems.data == nil { + return nil + } + + var prepared int + var preparing bool +prepareloop: + for e := t.indexedItems.data.Back(); e != nil; e = e.Prev() { + entry, ok := e.Value.(*indexedItemsEntry) + if !ok { + return errors.New("prepareBefore: could not parse e as a postIndexEntry") + } + + if !preparing { + // we haven't hit the position we need to prepare from yet + if entry.itemID == statusID { + preparing = true + if !include { + continue + } + } + } + + if preparing { + if err := t.prepare(ctx, entry.itemID); err != nil { + // there's been an error + if err != db.ErrNoEntries { + // it's a real error + return fmt.Errorf("prepareBefore: error preparing status with id %s: %s", entry.itemID, err) + } + // the status just doesn't exist (anymore) so continue to the next one + continue + } + if prepared == amount { + // we're done + break prepareloop + } + prepared++ + } + } + + return nil +} + func (t *timeline) prepare(ctx context.Context, itemID string) error { // trigger the caller-provided prepare function prepared, err := t.prepareFunction(ctx, t.accountID, itemID) @@ -245,7 +239,9 @@ func (t *timeline) prepare(ctx context.Context, itemID string) error { return t.preparedItems.insertPrepared(ctx, preparedItemsEntry) } -func (t *timeline) OldestPreparedItemID(ctx context.Context) (string, error) { +// oldestPreparedItemID returns the id of the rearmost (ie., the oldest) prepared item, or an error if something goes wrong. +// If nothing goes wrong but there's no oldest item, an empty string will be returned so make sure to check for this. +func (t *timeline) oldestPreparedItemID(ctx context.Context) (string, error) { var id string if t.preparedItems == nil || t.preparedItems.data == nil { // return an empty string if prepared items hasn't been initialized yet @@ -260,7 +256,7 @@ func (t *timeline) OldestPreparedItemID(ctx context.Context) (string, error) { entry, ok := e.Value.(*preparedItemsEntry) if !ok { - return id, errors.New("OldestPreparedItemID: could not parse e as a preparedItemsEntry") + return id, errors.New("oldestPreparedItemID: could not parse e as a preparedItemsEntry") } return entry.itemID, nil diff --git a/internal/timeline/prepareditems.go b/internal/timeline/prepareditems.go index 07a8c69ee..39433fbc7 100644 --- a/internal/timeline/prepareditems.go +++ b/internal/timeline/prepareditems.go @@ -37,30 +37,30 @@ type preparedItemsEntry struct { prepared Preparable } -func (p *preparedItems) insertPrepared(ctx context.Context, i *preparedItemsEntry) error { +func (p *preparedItems) insertPrepared(ctx context.Context, newEntry *preparedItemsEntry) error { if p.data == nil { p.data = &list.List{} } // if we have no entries yet, this is both the newest and oldest entry, so just put it in the front if p.data.Len() == 0 { - p.data.PushFront(i) + p.data.PushFront(newEntry) return nil } var insertMark *list.Element var position int - // We need to iterate through the index to make sure we put this post in the appropriate place according to when it was created. - // We also need to make sure we're not inserting a duplicate post -- this can happen sometimes and it's not nice UX (*shudder*). + // We need to iterate through the index to make sure we put this entry in the appropriate place according to when it was created. + // We also need to make sure we're not inserting a duplicate entry -- this can happen sometimes and it's not nice UX (*shudder*). for e := p.data.Front(); e != nil; e = e.Next() { position++ entry, ok := e.Value.(*preparedItemsEntry) if !ok { - return errors.New("index: could not parse e as a preparedPostsEntry") + return errors.New("insertPrepared: could not parse e as a preparedItemsEntry") } - skip, err := p.skipInsert(ctx, i.itemID, i.accountID, i.boostOfID, i.boostOfAccountID, entry.itemID, entry.accountID, entry.boostOfID, entry.boostOfAccountID, position) + skip, err := p.skipInsert(ctx, newEntry.itemID, newEntry.accountID, newEntry.boostOfID, newEntry.boostOfAccountID, entry.itemID, entry.accountID, entry.boostOfID, entry.boostOfAccountID, position) if err != nil { return err } @@ -68,25 +68,25 @@ func (p *preparedItems) insertPrepared(ctx context.Context, i *preparedItemsEntr return nil } - // if the post to index is newer than e, insert it before e in the list + // if the entry to index is newer than e, insert it before e in the list if insertMark == nil { - if i.itemID > entry.itemID { + if newEntry.itemID > entry.itemID { insertMark = e } } // make sure we don't insert a duplicate - if entry.itemID == i.itemID { + if entry.itemID == newEntry.itemID { return nil } } if insertMark != nil { - p.data.InsertBefore(i, insertMark) + p.data.InsertBefore(newEntry, insertMark) return nil } - // if we reach this point it's the oldest post we've seen so put it at the back - p.data.PushBack(i) + // if we reach this point it's the oldest entry we've seen so put it at the back + p.data.PushBack(newEntry) return nil } diff --git a/internal/timeline/prune.go b/internal/timeline/prune.go new file mode 100644 index 000000000..72dba20bf --- /dev/null +++ b/internal/timeline/prune.go @@ -0,0 +1,68 @@ +/* + GoToSocial + Copyright (C) 2021-2022 GoToSocial Authors admin@gotosocial.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . +*/ + +package timeline + +import ( + "container/list" +) + +const ( + defaultDesiredIndexedItemsLength = 400 + defaultDesiredPreparedItemsLength = 50 +) + +func (t *timeline) Prune(desiredPreparedItemsLength int, desiredIndexedItemsLength int) int { + t.Lock() + defer t.Unlock() + + pruneList := func(pruneTo int, listToPrune *list.List) int { + if listToPrune == nil { + // no need to prune + return 0 + } + + unprunedLength := listToPrune.Len() + if unprunedLength <= pruneTo { + // no need to prune + return 0 + } + + // work from the back + assemble a slice of entries that we will prune + amountStillToPrune := unprunedLength - pruneTo + itemsToPrune := make([]*list.Element, 0, amountStillToPrune) + for e := listToPrune.Back(); amountStillToPrune > 0; e = e.Prev() { + itemsToPrune = append(itemsToPrune, e) + amountStillToPrune-- + } + + // remove the entries we found + var totalPruned int + for _, e := range itemsToPrune { + listToPrune.Remove(e) + totalPruned++ + } + + return totalPruned + } + + prunedPrepared := pruneList(desiredPreparedItemsLength, t.preparedItems.data) + prunedIndexed := pruneList(desiredIndexedItemsLength, t.indexedItems.data) + + return prunedPrepared + prunedIndexed +} diff --git a/internal/timeline/prune_test.go b/internal/timeline/prune_test.go new file mode 100644 index 000000000..d96596c7d --- /dev/null +++ b/internal/timeline/prune_test.go @@ -0,0 +1,110 @@ +/* + GoToSocial + Copyright (C) 2021-2022 GoToSocial Authors admin@gotosocial.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . +*/ + +package timeline_test + +import ( + "context" + "sort" + "testing" + + "github.com/stretchr/testify/suite" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/processing" + "github.com/superseriousbusiness/gotosocial/internal/timeline" + "github.com/superseriousbusiness/gotosocial/internal/visibility" + "github.com/superseriousbusiness/gotosocial/testrig" +) + +type PruneTestSuite struct { + TimelineStandardTestSuite +} + +func (suite *PruneTestSuite) SetupSuite() { + suite.testAccounts = testrig.NewTestAccounts() + suite.testStatuses = testrig.NewTestStatuses() +} + +func (suite *PruneTestSuite) SetupTest() { + testrig.InitTestLog() + testrig.InitTestConfig() + + suite.db = testrig.NewTestDB() + suite.tc = testrig.NewTestTypeConverter(suite.db) + suite.filter = visibility.NewFilter(suite.db) + + testrig.StandardDBSetup(suite.db, nil) + + // let's take local_account_1 as the timeline owner + tl, err := timeline.NewTimeline( + context.Background(), + suite.testAccounts["local_account_1"].ID, + processing.StatusGrabFunction(suite.db), + processing.StatusFilterFunction(suite.db, suite.filter), + processing.StatusPrepareFunction(suite.db, suite.tc), + processing.StatusSkipInsertFunction(), + ) + if err != nil { + suite.FailNow(err.Error()) + } + + // put the status IDs in a determinate order since we can't trust a map to keep its order + statuses := []*gtsmodel.Status{} + for _, s := range suite.testStatuses { + statuses = append(statuses, s) + } + sort.Slice(statuses, func(i, j int) bool { + return statuses[i].ID > statuses[j].ID + }) + + // prepare the timeline by just shoving all test statuses in it -- let's not be fussy about who sees what + for _, s := range statuses { + _, err := tl.IndexAndPrepareOne(context.Background(), s.GetID(), s.BoostOfID, s.AccountID, s.BoostOfAccountID) + if err != nil { + suite.FailNow(err.Error()) + } + } + + suite.timeline = tl +} + +func (suite *PruneTestSuite) TearDownTest() { + testrig.StandardDBTeardown(suite.db) +} + +func (suite *PruneTestSuite) TestPrune() { + // prune down to 5 prepared + 5 indexed + suite.Equal(24, suite.timeline.Prune(5, 5)) + suite.Equal(5, suite.timeline.ItemIndexLength(context.Background())) +} + +func (suite *PruneTestSuite) TestPruneTo0() { + // prune down to 0 prepared + 0 indexed + suite.Equal(34, suite.timeline.Prune(0, 0)) + suite.Equal(0, suite.timeline.ItemIndexLength(context.Background())) +} + +func (suite *PruneTestSuite) TestPruneToInfinityAndBeyond() { + // prune to 99999, this should result in no entries being pruned + suite.Equal(0, suite.timeline.Prune(99999, 99999)) + suite.Equal(17, suite.timeline.ItemIndexLength(context.Background())) +} + +func TestPruneTestSuite(t *testing.T) { + suite.Run(t, new(PruneTestSuite)) +} diff --git a/internal/timeline/remove.go b/internal/timeline/remove.go index 1e70e28a7..4539a5232 100644 --- a/internal/timeline/remove.go +++ b/internal/timeline/remove.go @@ -29,7 +29,6 @@ func (t *timeline) Remove(ctx context.Context, statusID string) (int, error) { l := log.WithFields(kv.Fields{ - {"accountTimeline", t.accountID}, {"statusID", statusID}, }...) @@ -40,9 +39,9 @@ func (t *timeline) Remove(ctx context.Context, statusID string) (int, error) { // remove entr(ies) from the post index removeIndexes := []*list.Element{} - if t.itemIndex != nil && t.itemIndex.data != nil { - for e := t.itemIndex.data.Front(); e != nil; e = e.Next() { - entry, ok := e.Value.(*itemIndexEntry) + if t.indexedItems != nil && t.indexedItems.data != nil { + for e := t.indexedItems.data.Front(); e != nil; e = e.Next() { + entry, ok := e.Value.(*indexedItemsEntry) if !ok { return removed, errors.New("Remove: could not parse e as a postIndexEntry") } @@ -53,7 +52,7 @@ func (t *timeline) Remove(ctx context.Context, statusID string) (int, error) { } } for _, e := range removeIndexes { - t.itemIndex.data.Remove(e) + t.indexedItems.data.Remove(e) removed++ } @@ -82,19 +81,19 @@ func (t *timeline) Remove(ctx context.Context, statusID string) (int, error) { func (t *timeline) RemoveAllBy(ctx context.Context, accountID string) (int, error) { l := log.WithFields(kv.Fields{ - {"accountTimeline", t.accountID}, {"accountID", accountID}, }...) + t.Lock() defer t.Unlock() var removed int // remove entr(ies) from the post index removeIndexes := []*list.Element{} - if t.itemIndex != nil && t.itemIndex.data != nil { - for e := t.itemIndex.data.Front(); e != nil; e = e.Next() { - entry, ok := e.Value.(*itemIndexEntry) + if t.indexedItems != nil && t.indexedItems.data != nil { + for e := t.indexedItems.data.Front(); e != nil; e = e.Next() { + entry, ok := e.Value.(*indexedItemsEntry) if !ok { return removed, errors.New("Remove: could not parse e as a postIndexEntry") } @@ -105,7 +104,7 @@ func (t *timeline) RemoveAllBy(ctx context.Context, accountID string) (int, erro } } for _, e := range removeIndexes { - t.itemIndex.data.Remove(e) + t.indexedItems.data.Remove(e) removed++ } diff --git a/internal/timeline/timeline.go b/internal/timeline/timeline.go index 1076d05a6..fb7671819 100644 --- a/internal/timeline/timeline.go +++ b/internal/timeline/timeline.go @@ -21,6 +21,7 @@ import ( "context" "sync" + "time" ) // GrabFunction is used by a Timeline to grab more items to index. @@ -73,26 +74,9 @@ type Timeline interface { // If prepareNext is true, then the next predicted query will be prepared already in a goroutine, // to make the next call to Get faster. Get(ctx context.Context, amount int, maxID string, sinceID string, minID string, prepareNext bool) ([]Preparable, error) - // GetXFromTop returns x amount of items from the top of the timeline, from newest to oldest. - GetXFromTop(ctx context.Context, amount int) ([]Preparable, error) - // GetXBehindID returns x amount of items from the given id onwards, from newest to oldest. - // This will NOT include the item with the given ID. - // - // This corresponds to an api call to /timelines/home?max_id=WHATEVER - GetXBehindID(ctx context.Context, amount int, fromID string, attempts *int) ([]Preparable, error) - // GetXBeforeID returns x amount of items up to the given id, from newest to oldest. - // This will NOT include the item with the given ID. - // - // This corresponds to an api call to /timelines/home?since_id=WHATEVER - GetXBeforeID(ctx context.Context, amount int, sinceID string, startFromTop bool) ([]Preparable, error) - // GetXBetweenID returns x amount of items from the given maxID, up to the given id, from newest to oldest. - // This will NOT include the item with the given IDs. - // - // This corresponds to an api call to /timelines/home?since_id=WHATEVER&max_id=WHATEVER_ELSE - GetXBetweenID(ctx context.Context, amount int, maxID string, sinceID string) ([]Preparable, error) /* - INDEXING FUNCTIONS + INDEXING + PREPARATION FUNCTIONS */ // IndexOne puts a item into the timeline at the appropriate place according to its 'createdAt' property. @@ -100,35 +84,14 @@ type Timeline interface { // The returned bool indicates whether or not the item was actually inserted into the timeline. This will be false // if the item is a boost and the original item or another boost of it already exists < boostReinsertionDepth back in the timeline. IndexOne(ctx context.Context, itemID string, boostOfID string, accountID string, boostOfAccountID string) (bool, error) - - // OldestIndexedItemID returns the id of the rearmost (ie., the oldest) indexed item, or an error if something goes wrong. - // If nothing goes wrong but there's no oldest item, an empty string will be returned so make sure to check for this. - OldestIndexedItemID(ctx context.Context) (string, error) - // NewestIndexedItemID returns the id of the frontmost (ie., the newest) indexed item, or an error if something goes wrong. - // If nothing goes wrong but there's no newest item, an empty string will be returned so make sure to check for this. - NewestIndexedItemID(ctx context.Context) (string, error) - - IndexBefore(ctx context.Context, itemID string, amount int) error - IndexBehind(ctx context.Context, itemID string, amount int) error - - /* - PREPARATION FUNCTIONS - */ - - // PrepareXFromTop instructs the timeline to prepare x amount of items from the top of the timeline. - PrepareFromTop(ctx context.Context, amount int) error - // PrepareBehind instructs the timeline to prepare the next amount of entries for serialization, from position onwards. - // If include is true, then the given item ID will also be prepared, otherwise only entries behind it will be prepared. - PrepareBehind(ctx context.Context, itemID string, amount int) error - // IndexOne puts a item into the timeline at the appropriate place according to its 'createdAt' property, + // IndexAndPrepareOne puts a item into the timeline at the appropriate place according to its 'createdAt' property, // and then immediately prepares it. // // The returned bool indicates whether or not the item was actually inserted into the timeline. This will be false // if the item is a boost and the original item or another boost of it already exists < boostReinsertionDepth back in the timeline. IndexAndPrepareOne(ctx context.Context, itemID string, boostOfID string, accountID string, boostOfAccountID string) (bool, error) - // OldestPreparedItemID returns the id of the rearmost (ie., the oldest) prepared item, or an error if something goes wrong. - // If nothing goes wrong but there's no oldest item, an empty string will be returned so make sure to check for this. - OldestPreparedItemID(ctx context.Context) (string, error) + // PrepareXFromTop instructs the timeline to prepare x amount of items from the top of the timeline, useful during init. + PrepareFromTop(ctx context.Context, amount int) error /* INFO FUNCTIONS @@ -136,13 +99,24 @@ type Timeline interface { // ActualPostIndexLength returns the actual length of the item index at this point in time. ItemIndexLength(ctx context.Context) int + // OldestIndexedItemID returns the id of the rearmost (ie., the oldest) indexed item, or an error if something goes wrong. + // If nothing goes wrong but there's no oldest item, an empty string will be returned so make sure to check for this. + OldestIndexedItemID(ctx context.Context) (string, error) + // NewestIndexedItemID returns the id of the frontmost (ie., the newest) indexed item, or an error if something goes wrong. + // If nothing goes wrong but there's no newest item, an empty string will be returned so make sure to check for this. + NewestIndexedItemID(ctx context.Context) (string, error) /* UTILITY FUNCTIONS */ - // Reset instructs the timeline to reset to its base state -- cache only the minimum amount of items. - Reset() error + // LastGot returns the time that Get was last called. + LastGot() time.Time + // Prune prunes preparedItems and indexedItems in this timeline to the desired lengths. + // This will be a no-op if the lengths are already < the desired values. + // Prune acquires a lock on the timeline before pruning. + // The return value is the combined total of items pruned from preparedItems and indexedItems. + Prune(desiredPreparedItemsLength int, desiredIndexedItemsLength int) int // Remove removes a item from both the index and prepared items. // // If a item has multiple entries in a timeline, they will all be removed. @@ -157,12 +131,13 @@ type Timeline interface { // timeline fulfils the Timeline interface type timeline struct { - itemIndex *itemIndex + indexedItems *indexedItems preparedItems *preparedItems grabFunction GrabFunction filterFunction FilterFunction prepareFunction PrepareFunction accountID string + lastGot time.Time sync.Mutex } @@ -175,7 +150,7 @@ func NewTimeline( prepareFunction PrepareFunction, skipInsertFunction SkipInsertFunction) (Timeline, error) { return &timeline{ - itemIndex: &itemIndex{ + indexedItems: &indexedItems{ skipInsert: skipInsertFunction, }, preparedItems: &preparedItems{ @@ -185,17 +160,6 @@ func NewTimeline( filterFunction: filterFunction, prepareFunction: prepareFunction, accountID: timelineAccountID, + lastGot: time.Time{}, }, nil } - -func (t *timeline) Reset() error { - return nil -} - -func (t *timeline) ItemIndexLength(ctx context.Context) int { - if t.itemIndex == nil || t.itemIndex.data == nil { - return 0 - } - - return t.itemIndex.data.Len() -} diff --git a/internal/typeutils/internaltoas.go b/internal/typeutils/internaltoas.go index c84dd09f4..27f48f798 100644 --- a/internal/typeutils/internaltoas.go +++ b/internal/typeutils/internaltoas.go @@ -34,13 +34,6 @@ "github.com/superseriousbusiness/gotosocial/internal/log" ) -// const ( -// // highestID is the highest possible ULID -// highestID = "ZZZZZZZZZZZZZZZZZZZZZZZZZZ" -// // lowestID is the lowest possible ULID -// lowestID = "00000000000000000000000000" -// ) - // Converts a gts model account into an Activity Streams person type. func (c *converter) AccountToAS(ctx context.Context, a *gtsmodel.Account) (vocab.ActivityStreamsPerson, error) { person := streams.NewActivityStreamsPerson()