diff --git a/cmd/gotosocial/action/server/server.go b/cmd/gotosocial/action/server/server.go
index 828b9c875..475804218 100644
--- a/cmd/gotosocial/action/server/server.go
+++ b/cmd/gotosocial/action/server/server.go
@@ -290,6 +290,11 @@ func(context.Context, time.Time) {
return fmt.Errorf("error initializing metrics: %w", err)
}
+ // Run advanced migrations.
+ if err := processor.AdvancedMigrations().Migrate(ctx); err != nil {
+ return err
+ }
+
/*
HTTP router initialization
*/
diff --git a/cmd/gotosocial/action/testrig/testrig.go b/cmd/gotosocial/action/testrig/testrig.go
index 99f366fbe..95982fc72 100644
--- a/cmd/gotosocial/action/testrig/testrig.go
+++ b/cmd/gotosocial/action/testrig/testrig.go
@@ -204,6 +204,11 @@
return fmt.Errorf("error initializing metrics: %w", err)
}
+ // Run advanced migrations.
+ if err := processor.AdvancedMigrations().Migrate(ctx); err != nil {
+ return err
+ }
+
/*
HTTP router initialization
*/
diff --git a/docs/api/swagger.yaml b/docs/api/swagger.yaml
index 9fcc8bab3..22ce536fd 100644
--- a/docs/api/swagger.yaml
+++ b/docs/api/swagger.yaml
@@ -6208,11 +6208,43 @@ paths:
- read:bookmarks
tags:
- bookmarks
+ /api/v1/conversation/{id}/read:
+ post:
+ operationId: conversationRead
+ parameters:
+ - description: ID of the conversation.
+ in: path
+ name: id
+ required: true
+ type: string
+ produces:
+ - application/json
+ responses:
+ "200":
+ description: Updated conversation.
+ schema:
+ $ref: '#/definitions/conversation'
+ "400":
+ description: bad request
+ "401":
+ description: unauthorized
+ "404":
+ description: not found
+ "406":
+ description: not acceptable
+ "422":
+ description: unprocessable content
+ "500":
+ description: internal server error
+ security:
+ - OAuth2 Bearer:
+ - write:conversations
+ summary: Mark a conversation with the given ID as read.
+ tags:
+ - conversations
/api/v1/conversations:
get:
description: |-
- NOT IMPLEMENTED YET: Will currently always return an array of length 0.
-
The next and previous queries can be parsed from the returned Link header.
Example:
@@ -6221,15 +6253,15 @@ paths:
````
operationId: conversationsGet
parameters:
- - description: 'Return only conversations *OLDER* than the given max ID. The conversation with the specified ID will not be included in the response. NOTE: the ID is of the internal conversation, use the Link header for pagination.'
+ - description: 'Return only conversations with last statuses *OLDER* than the given max ID. The conversation with the specified ID will not be included in the response. NOTE: The ID is a status ID. Use the Link header for pagination.'
in: query
name: max_id
type: string
- - description: 'Return only conversations *NEWER* than the given since ID. The conversation with the specified ID will not be included in the response. NOTE: the ID is of the internal conversation, use the Link header for pagination.'
+ - description: 'Return only conversations with last statuses *NEWER* than the given since ID. The conversation with the specified ID will not be included in the response. NOTE: The ID is a status ID. Use the Link header for pagination.'
in: query
name: since_id
type: string
- - description: 'Return only conversations *IMMEDIATELY NEWER* than the given min ID. The conversation with the specified ID will not be included in the response. NOTE: the ID is of the internal conversation, use the Link header for pagination.'
+ - description: 'Return only conversations with last statuses *IMMEDIATELY NEWER* than the given min ID. The conversation with the specified ID will not be included in the response. NOTE: The ID is a status ID. Use the Link header for pagination.'
in: query
name: min_id
type: string
@@ -6269,6 +6301,39 @@ paths:
summary: Get an array of (direct message) conversations that requesting account is involved in.
tags:
- conversations
+ /api/v1/conversations/{id}:
+ delete:
+ description: |-
+ This doesn't delete the actual statuses in the conversation,
+ nor does it prevent a new conversation from being created later from the same thread and participants.
+ operationId: conversationDelete
+ parameters:
+ - description: ID of the conversation
+ in: path
+ name: id
+ required: true
+ type: string
+ produces:
+ - application/json
+ responses:
+ "200":
+ description: conversation deleted
+ "400":
+ description: bad request
+ "401":
+ description: unauthorized
+ "404":
+ description: not found
+ "406":
+ description: not acceptable
+ "500":
+ description: internal server error
+ security:
+ - OAuth2 Bearer:
+ - write:conversations
+ summary: Delete a single conversation with the given ID.
+ tags:
+ - conversations
/api/v1/custom_emojis:
get:
operationId: customEmojisGet
diff --git a/internal/api/client/conversations/conversationdelete.go b/internal/api/client/conversations/conversationdelete.go
new file mode 100644
index 000000000..6f8f43a94
--- /dev/null
+++ b/internal/api/client/conversations/conversationdelete.go
@@ -0,0 +1,93 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package conversations
+
+import (
+ "net/http"
+
+ "github.com/gin-gonic/gin"
+ apiutil "github.com/superseriousbusiness/gotosocial/internal/api/util"
+ "github.com/superseriousbusiness/gotosocial/internal/gtserror"
+ "github.com/superseriousbusiness/gotosocial/internal/oauth"
+)
+
+// ConversationDELETEHandler swagger:operation DELETE /api/v1/conversations/{id} conversationDelete
+//
+// Delete a single conversation with the given ID.
+//
+// This doesn't delete the actual statuses in the conversation,
+// nor does it prevent a new conversation from being created later from the same thread and participants.
+//
+// ---
+// tags:
+// - conversations
+//
+// produces:
+// - application/json
+//
+// parameters:
+// -
+// name: id
+// type: string
+// description: ID of the conversation
+// in: path
+// required: true
+//
+// security:
+// - OAuth2 Bearer:
+// - write:conversations
+//
+// responses:
+// '200':
+// description: conversation deleted
+// '400':
+// description: bad request
+// '401':
+// description: unauthorized
+// '404':
+// description: not found
+// '406':
+// description: not acceptable
+// '500':
+// description: internal server error
+func (m *Module) ConversationDELETEHandler(c *gin.Context) {
+ authed, err := oauth.Authed(c, true, true, true, true)
+ if err != nil {
+ apiutil.ErrorHandler(c, gtserror.NewErrorUnauthorized(err, err.Error()), m.processor.InstanceGetV1)
+ return
+ }
+
+ if _, err := apiutil.NegotiateAccept(c, apiutil.JSONAcceptHeaders...); err != nil {
+ apiutil.ErrorHandler(c, gtserror.NewErrorNotAcceptable(err, err.Error()), m.processor.InstanceGetV1)
+ return
+ }
+
+ id, errWithCode := apiutil.ParseID(c.Param(apiutil.IDKey))
+ if errWithCode != nil {
+ apiutil.ErrorHandler(c, errWithCode, m.processor.InstanceGetV1)
+ return
+ }
+
+ errWithCode = m.processor.Conversations().Delete(c.Request.Context(), authed.Account, id)
+ if errWithCode != nil {
+ apiutil.ErrorHandler(c, errWithCode, m.processor.InstanceGetV1)
+ return
+ }
+
+ c.JSON(http.StatusOK, apiutil.EmptyJSONObject)
+}
diff --git a/internal/api/client/conversations/conversationread.go b/internal/api/client/conversations/conversationread.go
new file mode 100644
index 000000000..7f68a2a33
--- /dev/null
+++ b/internal/api/client/conversations/conversationread.go
@@ -0,0 +1,95 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package conversations
+
+import (
+ "net/http"
+
+ "github.com/gin-gonic/gin"
+ apiutil "github.com/superseriousbusiness/gotosocial/internal/api/util"
+ "github.com/superseriousbusiness/gotosocial/internal/gtserror"
+ "github.com/superseriousbusiness/gotosocial/internal/oauth"
+)
+
+// ConversationReadPOSTHandler swagger:operation POST /api/v1/conversation/{id}/read conversationRead
+//
+// Mark a conversation with the given ID as read.
+//
+// ---
+// tags:
+// - conversations
+//
+// produces:
+// - application/json
+//
+// parameters:
+// -
+// name: id
+// in: path
+// type: string
+// required: true
+// description: ID of the conversation.
+//
+// security:
+// - OAuth2 Bearer:
+// - write:conversations
+//
+// responses:
+// '200':
+// name: conversation
+// description: Updated conversation.
+// schema:
+// "$ref": "#/definitions/conversation"
+// '400':
+// description: bad request
+// '401':
+// description: unauthorized
+// '404':
+// description: not found
+// '406':
+// description: not acceptable
+// '422':
+// description: unprocessable content
+// '500':
+// description: internal server error
+func (m *Module) ConversationReadPOSTHandler(c *gin.Context) {
+ authed, err := oauth.Authed(c, true, true, true, true)
+ if err != nil {
+ apiutil.ErrorHandler(c, gtserror.NewErrorUnauthorized(err, err.Error()), m.processor.InstanceGetV1)
+ return
+ }
+
+ if _, err := apiutil.NegotiateAccept(c, apiutil.JSONAcceptHeaders...); err != nil {
+ apiutil.ErrorHandler(c, gtserror.NewErrorNotAcceptable(err, err.Error()), m.processor.InstanceGetV1)
+ return
+ }
+
+ id, errWithCode := apiutil.ParseID(c.Param(apiutil.IDKey))
+ if errWithCode != nil {
+ apiutil.ErrorHandler(c, errWithCode, m.processor.InstanceGetV1)
+ return
+ }
+
+ apiConversation, errWithCode := m.processor.Conversations().Read(c.Request.Context(), authed.Account, id)
+ if errWithCode != nil {
+ apiutil.ErrorHandler(c, errWithCode, m.processor.InstanceGetV1)
+ return
+ }
+
+ apiutil.JSON(c, http.StatusOK, apiConversation)
+}
diff --git a/internal/api/client/conversations/conversations.go b/internal/api/client/conversations/conversations.go
index be19a9cdc..e742c8d3d 100644
--- a/internal/api/client/conversations/conversations.go
+++ b/internal/api/client/conversations/conversations.go
@@ -21,13 +21,17 @@
"net/http"
"github.com/gin-gonic/gin"
+ apiutil "github.com/superseriousbusiness/gotosocial/internal/api/util"
"github.com/superseriousbusiness/gotosocial/internal/processing"
)
const (
- // BasePath is the base URI path for serving
- // conversations, minus the api prefix.
+ // BasePath is the base path for serving the conversations API, minus the 'api' prefix.
BasePath = "/v1/conversations"
+ // BasePathWithID is the base path with the ID key in it, for operations on an existing conversation.
+ BasePathWithID = BasePath + "/:" + apiutil.IDKey
+ // ReadPathWithID is the path for marking an existing conversation as read.
+ ReadPathWithID = BasePathWithID + "/read"
)
type Module struct {
@@ -42,4 +46,6 @@ func New(processor *processing.Processor) *Module {
func (m *Module) Route(attachHandler func(method string, path string, f ...gin.HandlerFunc) gin.IRoutes) {
attachHandler(http.MethodGet, BasePath, m.ConversationsGETHandler)
+ attachHandler(http.MethodDelete, BasePathWithID, m.ConversationDELETEHandler)
+ attachHandler(http.MethodPost, ReadPathWithID, m.ConversationReadPOSTHandler)
}
diff --git a/internal/api/client/conversations/conversationsget.go b/internal/api/client/conversations/conversationsget.go
index 11bddb1ce..663b9a707 100644
--- a/internal/api/client/conversations/conversationsget.go
+++ b/internal/api/client/conversations/conversationsget.go
@@ -24,14 +24,13 @@
apiutil "github.com/superseriousbusiness/gotosocial/internal/api/util"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/oauth"
+ "github.com/superseriousbusiness/gotosocial/internal/paging"
)
// ConversationsGETHandler swagger:operation GET /api/v1/conversations conversationsGet
//
// Get an array of (direct message) conversations that requesting account is involved in.
//
-// NOT IMPLEMENTED YET: Will currently always return an array of length 0.
-//
// The next and previous queries can be parsed from the returned Link header.
// Example:
//
@@ -51,26 +50,26 @@
// name: max_id
// type: string
// description: >-
-// Return only conversations *OLDER* than the given max ID.
+// Return only conversations with last statuses *OLDER* than the given max ID.
// The conversation with the specified ID will not be included in the response.
-// NOTE: the ID is of the internal conversation, use the Link header for pagination.
+// NOTE: The ID is a status ID. Use the Link header for pagination.
// in: query
// required: false
// -
// name: since_id
// type: string
// description: >-
-// Return only conversations *NEWER* than the given since ID.
+// Return only conversations with last statuses *NEWER* than the given since ID.
// The conversation with the specified ID will not be included in the response.
-// NOTE: the ID is of the internal conversation, use the Link header for pagination.
+// NOTE: The ID is a status ID. Use the Link header for pagination.
// in: query
// -
// name: min_id
// type: string
// description: >-
-// Return only conversations *IMMEDIATELY NEWER* than the given min ID.
+// Return only conversations with last statuses *IMMEDIATELY NEWER* than the given min ID.
// The conversation with the specified ID will not be included in the response.
-// NOTE: the ID is of the internal conversation, use the Link header for pagination.
+// NOTE: The ID is a status ID. Use the Link header for pagination.
// in: query
// required: false
// -
@@ -108,7 +107,8 @@
// '500':
// description: internal server error
func (m *Module) ConversationsGETHandler(c *gin.Context) {
- if _, err := oauth.Authed(c, true, true, true, true); err != nil {
+ authed, err := oauth.Authed(c, true, true, true, true)
+ if err != nil {
apiutil.ErrorHandler(c, gtserror.NewErrorUnauthorized(err, err.Error()), m.processor.InstanceGetV1)
return
}
@@ -118,5 +118,29 @@ func (m *Module) ConversationsGETHandler(c *gin.Context) {
return
}
- apiutil.Data(c, http.StatusOK, apiutil.AppJSON, apiutil.EmptyJSONArray)
+ page, errWithCode := paging.ParseIDPage(c,
+ 1, // min limit
+ 80, // max limit
+ 40, // default limit
+ )
+ if errWithCode != nil {
+ apiutil.ErrorHandler(c, errWithCode, m.processor.InstanceGetV1)
+ return
+ }
+
+ resp, errWithCode := m.processor.Conversations().GetAll(
+ c.Request.Context(),
+ authed.Account,
+ page,
+ )
+ if errWithCode != nil {
+ apiutil.ErrorHandler(c, errWithCode, m.processor.InstanceGetV1)
+ return
+ }
+
+ if resp.LinkHeader != "" {
+ c.Header("Link", resp.LinkHeader)
+ }
+
+ apiutil.JSON(c, http.StatusOK, resp.Items)
}
diff --git a/internal/cache/cache.go b/internal/cache/cache.go
index 5a8a92ca3..8b0c04ea4 100644
--- a/internal/cache/cache.go
+++ b/internal/cache/cache.go
@@ -60,6 +60,8 @@ func (c *Caches) Init() {
c.initBlockIDs()
c.initBoostOfIDs()
c.initClient()
+ c.initConversation()
+ c.initConversationLastStatusIDs()
c.initDomainAllow()
c.initDomainBlock()
c.initEmoji()
diff --git a/internal/cache/db.go b/internal/cache/db.go
index 50acf00d1..4c063b06d 100644
--- a/internal/cache/db.go
+++ b/internal/cache/db.go
@@ -56,6 +56,12 @@ type GTSCaches struct {
// Client provides access to the gtsmodel Client database cache.
Client StructCache[*gtsmodel.Client]
+ // Conversation provides access to the gtsmodel Conversation database cache.
+ Conversation StructCache[*gtsmodel.Conversation]
+
+ // ConversationLastStatusIDs provides access to the conversation last status IDs database cache.
+ ConversationLastStatusIDs SliceCache[string]
+
// DomainAllow provides access to the domain allow database cache.
DomainAllow *domain.Cache
@@ -426,6 +432,52 @@ func (c *Caches) initClient() {
})
}
+func (c *Caches) initConversation() {
+ cap := calculateResultCacheMax(
+ sizeofConversation(), // model in-mem size.
+ config.GetCacheConversationMemRatio(),
+ )
+
+ log.Infof(nil, "cache size = %d", cap)
+
+ copyF := func(c1 *gtsmodel.Conversation) *gtsmodel.Conversation {
+ c2 := new(gtsmodel.Conversation)
+ *c2 = *c1
+
+ // Don't include ptr fields that
+ // will be populated separately.
+ // See internal/db/bundb/conversation.go.
+ c2.Account = nil
+ c2.OtherAccounts = nil
+ c2.LastStatus = nil
+
+ return c2
+ }
+
+ c.GTS.Conversation.Init(structr.CacheConfig[*gtsmodel.Conversation]{
+ Indices: []structr.IndexConfig{
+ {Fields: "ID"},
+ {Fields: "ThreadID,AccountID,OtherAccountsKey"},
+ {Fields: "AccountID,LastStatusID"},
+ {Fields: "AccountID", Multiple: true},
+ },
+ MaxSize: cap,
+ IgnoreErr: ignoreErrors,
+ Copy: copyF,
+ Invalidate: c.OnInvalidateConversation,
+ })
+}
+
+func (c *Caches) initConversationLastStatusIDs() {
+ cap := calculateSliceCacheMax(
+ config.GetCacheConversationLastStatusIDsMemRatio(),
+ )
+
+ log.Infof(nil, "cache size = %d", cap)
+
+ c.GTS.ConversationLastStatusIDs.Init(0, cap)
+}
+
func (c *Caches) initDomainAllow() {
c.GTS.DomainAllow = new(domain.Cache)
}
diff --git a/internal/cache/invalidate.go b/internal/cache/invalidate.go
index 088e7f91f..987a6eb64 100644
--- a/internal/cache/invalidate.go
+++ b/internal/cache/invalidate.go
@@ -83,6 +83,11 @@ func (c *Caches) OnInvalidateClient(client *gtsmodel.Client) {
c.GTS.Token.Invalidate("ClientID", client.ID)
}
+func (c *Caches) OnInvalidateConversation(conversation *gtsmodel.Conversation) {
+ // Invalidate owning account's conversation list.
+ c.GTS.ConversationLastStatusIDs.Invalidate(conversation.AccountID)
+}
+
func (c *Caches) OnInvalidateEmojiCategory(category *gtsmodel.EmojiCategory) {
// Invalidate any emoji in this category.
c.GTS.Emoji.Invalidate("CategoryID", category.ID)
diff --git a/internal/cache/size.go b/internal/cache/size.go
index 4ec30fbb7..4c474fa28 100644
--- a/internal/cache/size.go
+++ b/internal/cache/size.go
@@ -19,6 +19,7 @@
import (
"crypto/rsa"
+ "strings"
"time"
"unsafe"
@@ -320,6 +321,20 @@ func sizeofClient() uintptr {
}))
}
+func sizeofConversation() uintptr {
+ return uintptr(size.Of(>smodel.Conversation{
+ ID: exampleID,
+ CreatedAt: exampleTime,
+ UpdatedAt: exampleTime,
+ AccountID: exampleID,
+ OtherAccountIDs: []string{exampleID, exampleID, exampleID},
+ OtherAccountsKey: strings.Join([]string{exampleID, exampleID, exampleID}, ","),
+ ThreadID: exampleID,
+ LastStatusID: exampleID,
+ Read: util.Ptr(true),
+ }))
+}
+
func sizeofEmoji() uintptr {
return uintptr(size.Of(>smodel.Emoji{
ID: exampleID,
diff --git a/internal/cache/wrappers.go b/internal/cache/wrappers.go
index edeea9bcd..9cb4fca98 100644
--- a/internal/cache/wrappers.go
+++ b/internal/cache/wrappers.go
@@ -158,6 +158,34 @@ func (c *StructCache[T]) LoadIDs(index string, ids []string, load func([]string)
})
}
+// LoadIDs2Part works as LoadIDs, except using a two-part key,
+// where the first part is an ID shared by all the objects,
+// and the second part is a list of per-object IDs.
+func (c *StructCache[T]) LoadIDs2Part(index string, id1 string, id2s []string, load func(string, []string) ([]T, error)) ([]T, error) {
+ i := c.index[index]
+ if i == nil {
+ // we only perform this check here as
+ // we're going to use the index before
+ // passing it to cache in main .Load().
+ panic("missing index for cache type")
+ }
+
+ // Generate cache keys for two-part IDs.
+ keys := make([]structr.Key, len(id2s))
+ for x, id2 := range id2s {
+ keys[x] = i.Key(id1, id2)
+ }
+
+ // Pass loader callback with wrapper onto main cache load function.
+ return c.cache.Load(i, keys, func(uncached []structr.Key) ([]T, error) {
+ uncachedIDs := make([]string, len(uncached))
+ for i := range uncached {
+ uncachedIDs[i] = uncached[i].Values()[1].(string)
+ }
+ return load(id1, uncachedIDs)
+ })
+}
+
// Store: see structr.Cache{}.Store().
func (c *StructCache[T]) Store(value T, store func() error) error {
return c.cache.Store(value, store)
diff --git a/internal/config/config.go b/internal/config/config.go
index bffa5b455..1b8cf2759 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -191,53 +191,55 @@ type HTTPClientConfiguration struct {
}
type CacheConfiguration struct {
- MemoryTarget bytesize.Size `name:"memory-target"`
- AccountMemRatio float64 `name:"account-mem-ratio"`
- AccountNoteMemRatio float64 `name:"account-note-mem-ratio"`
- AccountSettingsMemRatio float64 `name:"account-settings-mem-ratio"`
- AccountStatsMemRatio float64 `name:"account-stats-mem-ratio"`
- ApplicationMemRatio float64 `name:"application-mem-ratio"`
- BlockMemRatio float64 `name:"block-mem-ratio"`
- BlockIDsMemRatio float64 `name:"block-ids-mem-ratio"`
- BoostOfIDsMemRatio float64 `name:"boost-of-ids-mem-ratio"`
- ClientMemRatio float64 `name:"client-mem-ratio"`
- EmojiMemRatio float64 `name:"emoji-mem-ratio"`
- EmojiCategoryMemRatio float64 `name:"emoji-category-mem-ratio"`
- FilterMemRatio float64 `name:"filter-mem-ratio"`
- FilterKeywordMemRatio float64 `name:"filter-keyword-mem-ratio"`
- FilterStatusMemRatio float64 `name:"filter-status-mem-ratio"`
- FollowMemRatio float64 `name:"follow-mem-ratio"`
- FollowIDsMemRatio float64 `name:"follow-ids-mem-ratio"`
- FollowRequestMemRatio float64 `name:"follow-request-mem-ratio"`
- FollowRequestIDsMemRatio float64 `name:"follow-request-ids-mem-ratio"`
- InReplyToIDsMemRatio float64 `name:"in-reply-to-ids-mem-ratio"`
- InstanceMemRatio float64 `name:"instance-mem-ratio"`
- InteractionApprovalMemRatio float64 `name:"interaction-approval-mem-ratio"`
- ListMemRatio float64 `name:"list-mem-ratio"`
- ListEntryMemRatio float64 `name:"list-entry-mem-ratio"`
- MarkerMemRatio float64 `name:"marker-mem-ratio"`
- MediaMemRatio float64 `name:"media-mem-ratio"`
- MentionMemRatio float64 `name:"mention-mem-ratio"`
- MoveMemRatio float64 `name:"move-mem-ratio"`
- NotificationMemRatio float64 `name:"notification-mem-ratio"`
- PollMemRatio float64 `name:"poll-mem-ratio"`
- PollVoteMemRatio float64 `name:"poll-vote-mem-ratio"`
- PollVoteIDsMemRatio float64 `name:"poll-vote-ids-mem-ratio"`
- ReportMemRatio float64 `name:"report-mem-ratio"`
- StatusMemRatio float64 `name:"status-mem-ratio"`
- StatusBookmarkMemRatio float64 `name:"status-bookmark-mem-ratio"`
- StatusBookmarkIDsMemRatio float64 `name:"status-bookmark-ids-mem-ratio"`
- StatusFaveMemRatio float64 `name:"status-fave-mem-ratio"`
- StatusFaveIDsMemRatio float64 `name:"status-fave-ids-mem-ratio"`
- TagMemRatio float64 `name:"tag-mem-ratio"`
- ThreadMuteMemRatio float64 `name:"thread-mute-mem-ratio"`
- TokenMemRatio float64 `name:"token-mem-ratio"`
- TombstoneMemRatio float64 `name:"tombstone-mem-ratio"`
- UserMemRatio float64 `name:"user-mem-ratio"`
- UserMuteMemRatio float64 `name:"user-mute-mem-ratio"`
- UserMuteIDsMemRatio float64 `name:"user-mute-ids-mem-ratio"`
- WebfingerMemRatio float64 `name:"webfinger-mem-ratio"`
- VisibilityMemRatio float64 `name:"visibility-mem-ratio"`
+ MemoryTarget bytesize.Size `name:"memory-target"`
+ AccountMemRatio float64 `name:"account-mem-ratio"`
+ AccountNoteMemRatio float64 `name:"account-note-mem-ratio"`
+ AccountSettingsMemRatio float64 `name:"account-settings-mem-ratio"`
+ AccountStatsMemRatio float64 `name:"account-stats-mem-ratio"`
+ ApplicationMemRatio float64 `name:"application-mem-ratio"`
+ BlockMemRatio float64 `name:"block-mem-ratio"`
+ BlockIDsMemRatio float64 `name:"block-ids-mem-ratio"`
+ BoostOfIDsMemRatio float64 `name:"boost-of-ids-mem-ratio"`
+ ClientMemRatio float64 `name:"client-mem-ratio"`
+ ConversationMemRatio float64 `name:"conversation-mem-ratio"`
+ ConversationLastStatusIDsMemRatio float64 `name:"conversation-last-status-ids-mem-ratio"`
+ EmojiMemRatio float64 `name:"emoji-mem-ratio"`
+ EmojiCategoryMemRatio float64 `name:"emoji-category-mem-ratio"`
+ FilterMemRatio float64 `name:"filter-mem-ratio"`
+ FilterKeywordMemRatio float64 `name:"filter-keyword-mem-ratio"`
+ FilterStatusMemRatio float64 `name:"filter-status-mem-ratio"`
+ FollowMemRatio float64 `name:"follow-mem-ratio"`
+ FollowIDsMemRatio float64 `name:"follow-ids-mem-ratio"`
+ FollowRequestMemRatio float64 `name:"follow-request-mem-ratio"`
+ FollowRequestIDsMemRatio float64 `name:"follow-request-ids-mem-ratio"`
+ InReplyToIDsMemRatio float64 `name:"in-reply-to-ids-mem-ratio"`
+ InstanceMemRatio float64 `name:"instance-mem-ratio"`
+ InteractionApprovalMemRatio float64 `name:"interaction-approval-mem-ratio"`
+ ListMemRatio float64 `name:"list-mem-ratio"`
+ ListEntryMemRatio float64 `name:"list-entry-mem-ratio"`
+ MarkerMemRatio float64 `name:"marker-mem-ratio"`
+ MediaMemRatio float64 `name:"media-mem-ratio"`
+ MentionMemRatio float64 `name:"mention-mem-ratio"`
+ MoveMemRatio float64 `name:"move-mem-ratio"`
+ NotificationMemRatio float64 `name:"notification-mem-ratio"`
+ PollMemRatio float64 `name:"poll-mem-ratio"`
+ PollVoteMemRatio float64 `name:"poll-vote-mem-ratio"`
+ PollVoteIDsMemRatio float64 `name:"poll-vote-ids-mem-ratio"`
+ ReportMemRatio float64 `name:"report-mem-ratio"`
+ StatusMemRatio float64 `name:"status-mem-ratio"`
+ StatusBookmarkMemRatio float64 `name:"status-bookmark-mem-ratio"`
+ StatusBookmarkIDsMemRatio float64 `name:"status-bookmark-ids-mem-ratio"`
+ StatusFaveMemRatio float64 `name:"status-fave-mem-ratio"`
+ StatusFaveIDsMemRatio float64 `name:"status-fave-ids-mem-ratio"`
+ TagMemRatio float64 `name:"tag-mem-ratio"`
+ ThreadMuteMemRatio float64 `name:"thread-mute-mem-ratio"`
+ TokenMemRatio float64 `name:"token-mem-ratio"`
+ TombstoneMemRatio float64 `name:"tombstone-mem-ratio"`
+ UserMemRatio float64 `name:"user-mem-ratio"`
+ UserMuteMemRatio float64 `name:"user-mute-mem-ratio"`
+ UserMuteIDsMemRatio float64 `name:"user-mute-ids-mem-ratio"`
+ WebfingerMemRatio float64 `name:"webfinger-mem-ratio"`
+ VisibilityMemRatio float64 `name:"visibility-mem-ratio"`
}
// MarshalMap will marshal current Configuration into a map structure (useful for JSON/TOML/YAML).
diff --git a/internal/config/defaults.go b/internal/config/defaults.go
index 267e7b4bc..82ea07e10 100644
--- a/internal/config/defaults.go
+++ b/internal/config/defaults.go
@@ -156,52 +156,54 @@
// when TODO items in the size.go source
// file have been addressed, these should
// be able to make some more sense :D
- AccountMemRatio: 5,
- AccountNoteMemRatio: 1,
- AccountSettingsMemRatio: 0.1,
- AccountStatsMemRatio: 2,
- ApplicationMemRatio: 0.1,
- BlockMemRatio: 2,
- BlockIDsMemRatio: 3,
- BoostOfIDsMemRatio: 3,
- ClientMemRatio: 0.1,
- EmojiMemRatio: 3,
- EmojiCategoryMemRatio: 0.1,
- FilterMemRatio: 0.5,
- FilterKeywordMemRatio: 0.5,
- FilterStatusMemRatio: 0.5,
- FollowMemRatio: 2,
- FollowIDsMemRatio: 4,
- FollowRequestMemRatio: 2,
- FollowRequestIDsMemRatio: 2,
- InReplyToIDsMemRatio: 3,
- InstanceMemRatio: 1,
- InteractionApprovalMemRatio: 1,
- ListMemRatio: 1,
- ListEntryMemRatio: 2,
- MarkerMemRatio: 0.5,
- MediaMemRatio: 4,
- MentionMemRatio: 2,
- MoveMemRatio: 0.1,
- NotificationMemRatio: 2,
- PollMemRatio: 1,
- PollVoteMemRatio: 2,
- PollVoteIDsMemRatio: 2,
- ReportMemRatio: 1,
- StatusMemRatio: 5,
- StatusBookmarkMemRatio: 0.5,
- StatusBookmarkIDsMemRatio: 2,
- StatusFaveMemRatio: 2,
- StatusFaveIDsMemRatio: 3,
- TagMemRatio: 2,
- ThreadMuteMemRatio: 0.2,
- TokenMemRatio: 0.75,
- TombstoneMemRatio: 0.5,
- UserMemRatio: 0.25,
- UserMuteMemRatio: 2,
- UserMuteIDsMemRatio: 3,
- WebfingerMemRatio: 0.1,
- VisibilityMemRatio: 2,
+ AccountMemRatio: 5,
+ AccountNoteMemRatio: 1,
+ AccountSettingsMemRatio: 0.1,
+ AccountStatsMemRatio: 2,
+ ApplicationMemRatio: 0.1,
+ BlockMemRatio: 2,
+ BlockIDsMemRatio: 3,
+ BoostOfIDsMemRatio: 3,
+ ClientMemRatio: 0.1,
+ ConversationMemRatio: 1,
+ ConversationLastStatusIDsMemRatio: 2,
+ EmojiMemRatio: 3,
+ EmojiCategoryMemRatio: 0.1,
+ FilterMemRatio: 0.5,
+ FilterKeywordMemRatio: 0.5,
+ FilterStatusMemRatio: 0.5,
+ FollowMemRatio: 2,
+ FollowIDsMemRatio: 4,
+ FollowRequestMemRatio: 2,
+ FollowRequestIDsMemRatio: 2,
+ InReplyToIDsMemRatio: 3,
+ InstanceMemRatio: 1,
+ InteractionApprovalMemRatio: 1,
+ ListMemRatio: 1,
+ ListEntryMemRatio: 2,
+ MarkerMemRatio: 0.5,
+ MediaMemRatio: 4,
+ MentionMemRatio: 2,
+ MoveMemRatio: 0.1,
+ NotificationMemRatio: 2,
+ PollMemRatio: 1,
+ PollVoteMemRatio: 2,
+ PollVoteIDsMemRatio: 2,
+ ReportMemRatio: 1,
+ StatusMemRatio: 5,
+ StatusBookmarkMemRatio: 0.5,
+ StatusBookmarkIDsMemRatio: 2,
+ StatusFaveMemRatio: 2,
+ StatusFaveIDsMemRatio: 3,
+ TagMemRatio: 2,
+ ThreadMuteMemRatio: 0.2,
+ TokenMemRatio: 0.75,
+ TombstoneMemRatio: 0.5,
+ UserMemRatio: 0.25,
+ UserMuteMemRatio: 2,
+ UserMuteIDsMemRatio: 3,
+ WebfingerMemRatio: 0.1,
+ VisibilityMemRatio: 2,
},
HTTPClient: HTTPClientConfiguration{
diff --git a/internal/config/helpers.gen.go b/internal/config/helpers.gen.go
index 8c27da439..932cb802d 100644
--- a/internal/config/helpers.gen.go
+++ b/internal/config/helpers.gen.go
@@ -2975,6 +2975,62 @@ func GetCacheClientMemRatio() float64 { return global.GetCacheClientMemRatio() }
// SetCacheClientMemRatio safely sets the value for global configuration 'Cache.ClientMemRatio' field
func SetCacheClientMemRatio(v float64) { global.SetCacheClientMemRatio(v) }
+// GetCacheConversationMemRatio safely fetches the Configuration value for state's 'Cache.ConversationMemRatio' field
+func (st *ConfigState) GetCacheConversationMemRatio() (v float64) {
+ st.mutex.RLock()
+ v = st.config.Cache.ConversationMemRatio
+ st.mutex.RUnlock()
+ return
+}
+
+// SetCacheConversationMemRatio safely sets the Configuration value for state's 'Cache.ConversationMemRatio' field
+func (st *ConfigState) SetCacheConversationMemRatio(v float64) {
+ st.mutex.Lock()
+ defer st.mutex.Unlock()
+ st.config.Cache.ConversationMemRatio = v
+ st.reloadToViper()
+}
+
+// CacheConversationMemRatioFlag returns the flag name for the 'Cache.ConversationMemRatio' field
+func CacheConversationMemRatioFlag() string { return "cache-conversation-mem-ratio" }
+
+// GetCacheConversationMemRatio safely fetches the value for global configuration 'Cache.ConversationMemRatio' field
+func GetCacheConversationMemRatio() float64 { return global.GetCacheConversationMemRatio() }
+
+// SetCacheConversationMemRatio safely sets the value for global configuration 'Cache.ConversationMemRatio' field
+func SetCacheConversationMemRatio(v float64) { global.SetCacheConversationMemRatio(v) }
+
+// GetCacheConversationLastStatusIDsMemRatio safely fetches the Configuration value for state's 'Cache.ConversationLastStatusIDsMemRatio' field
+func (st *ConfigState) GetCacheConversationLastStatusIDsMemRatio() (v float64) {
+ st.mutex.RLock()
+ v = st.config.Cache.ConversationLastStatusIDsMemRatio
+ st.mutex.RUnlock()
+ return
+}
+
+// SetCacheConversationLastStatusIDsMemRatio safely sets the Configuration value for state's 'Cache.ConversationLastStatusIDsMemRatio' field
+func (st *ConfigState) SetCacheConversationLastStatusIDsMemRatio(v float64) {
+ st.mutex.Lock()
+ defer st.mutex.Unlock()
+ st.config.Cache.ConversationLastStatusIDsMemRatio = v
+ st.reloadToViper()
+}
+
+// CacheConversationLastStatusIDsMemRatioFlag returns the flag name for the 'Cache.ConversationLastStatusIDsMemRatio' field
+func CacheConversationLastStatusIDsMemRatioFlag() string {
+ return "cache-conversation-last-status-ids-mem-ratio"
+}
+
+// GetCacheConversationLastStatusIDsMemRatio safely fetches the value for global configuration 'Cache.ConversationLastStatusIDsMemRatio' field
+func GetCacheConversationLastStatusIDsMemRatio() float64 {
+ return global.GetCacheConversationLastStatusIDsMemRatio()
+}
+
+// SetCacheConversationLastStatusIDsMemRatio safely sets the value for global configuration 'Cache.ConversationLastStatusIDsMemRatio' field
+func SetCacheConversationLastStatusIDsMemRatio(v float64) {
+ global.SetCacheConversationLastStatusIDsMemRatio(v)
+}
+
// GetCacheEmojiMemRatio safely fetches the Configuration value for state's 'Cache.EmojiMemRatio' field
func (st *ConfigState) GetCacheEmojiMemRatio() (v float64) {
st.mutex.RLock()
diff --git a/internal/db/advancedmigration.go b/internal/db/advancedmigration.go
new file mode 100644
index 000000000..2b4601bdb
--- /dev/null
+++ b/internal/db/advancedmigration.go
@@ -0,0 +1,29 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package db
+
+import (
+ "context"
+
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+)
+
+type AdvancedMigration interface {
+ GetAdvancedMigration(ctx context.Context, id string) (*gtsmodel.AdvancedMigration, error)
+ PutAdvancedMigration(ctx context.Context, advancedMigration *gtsmodel.AdvancedMigration) error
+}
diff --git a/internal/db/bundb/advancedmigration.go b/internal/db/bundb/advancedmigration.go
new file mode 100644
index 000000000..2a0ec93e6
--- /dev/null
+++ b/internal/db/bundb/advancedmigration.go
@@ -0,0 +1,52 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package bundb
+
+import (
+ "context"
+
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+ "github.com/superseriousbusiness/gotosocial/internal/state"
+ "github.com/uptrace/bun"
+)
+
+type advancedMigrationDB struct {
+ db *bun.DB
+ state *state.State
+}
+
+func (a *advancedMigrationDB) GetAdvancedMigration(ctx context.Context, id string) (*gtsmodel.AdvancedMigration, error) {
+ var advancedMigration gtsmodel.AdvancedMigration
+ err := a.db.NewSelect().
+ Model(&advancedMigration).
+ Where("? = ?", bun.Ident("id"), id).
+ Limit(1).
+ Scan(ctx)
+ if err != nil {
+ return nil, err
+ }
+ return &advancedMigration, nil
+}
+
+func (a *advancedMigrationDB) PutAdvancedMigration(ctx context.Context, advancedMigration *gtsmodel.AdvancedMigration) error {
+ _, err := NewUpsert(a.db).
+ Model(advancedMigration).
+ Constraint("id").
+ Exec(ctx)
+ return err
+}
diff --git a/internal/db/bundb/bundb.go b/internal/db/bundb/bundb.go
index 57fb661df..070d4eb91 100644
--- a/internal/db/bundb/bundb.go
+++ b/internal/db/bundb/bundb.go
@@ -54,8 +54,10 @@
type DBService struct {
db.Account
db.Admin
+ db.AdvancedMigration
db.Application
db.Basic
+ db.Conversation
db.Domain
db.Emoji
db.HeaderFilter
@@ -158,6 +160,7 @@ func NewBunDBService(ctx context.Context, state *state.State) (db.DB, error) {
// https://bun.uptrace.dev/orm/many-to-many-relation/
for _, t := range []interface{}{
>smodel.AccountToEmoji{},
+ >smodel.ConversationToStatus{},
>smodel.StatusToEmoji{},
>smodel.StatusToTag{},
>smodel.ThreadToStatus{},
@@ -181,6 +184,10 @@ func NewBunDBService(ctx context.Context, state *state.State) (db.DB, error) {
db: db,
state: state,
},
+ AdvancedMigration: &advancedMigrationDB{
+ db: db,
+ state: state,
+ },
Application: &applicationDB{
db: db,
state: state,
@@ -188,6 +195,10 @@ func NewBunDBService(ctx context.Context, state *state.State) (db.DB, error) {
Basic: &basicDB{
db: db,
},
+ Conversation: &conversationDB{
+ db: db,
+ state: state,
+ },
Domain: &domainDB{
db: db,
state: state,
diff --git a/internal/db/bundb/conversation.go b/internal/db/bundb/conversation.go
new file mode 100644
index 000000000..1a3958a79
--- /dev/null
+++ b/internal/db/bundb/conversation.go
@@ -0,0 +1,494 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package bundb
+
+import (
+ "context"
+ "errors"
+ "slices"
+
+ "github.com/superseriousbusiness/gotosocial/internal/db"
+ "github.com/superseriousbusiness/gotosocial/internal/gtscontext"
+ "github.com/superseriousbusiness/gotosocial/internal/gtserror"
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+ "github.com/superseriousbusiness/gotosocial/internal/id"
+ "github.com/superseriousbusiness/gotosocial/internal/log"
+ "github.com/superseriousbusiness/gotosocial/internal/paging"
+ "github.com/superseriousbusiness/gotosocial/internal/state"
+ "github.com/superseriousbusiness/gotosocial/internal/util"
+ "github.com/uptrace/bun"
+ "github.com/uptrace/bun/dialect"
+)
+
+type conversationDB struct {
+ db *bun.DB
+ state *state.State
+}
+
+func (c *conversationDB) GetConversationByID(ctx context.Context, id string) (*gtsmodel.Conversation, error) {
+ return c.getConversation(
+ ctx,
+ "ID",
+ func(conversation *gtsmodel.Conversation) error {
+ return c.db.
+ NewSelect().
+ Model(conversation).
+ Where("? = ?", bun.Ident("id"), id).
+ Scan(ctx)
+ },
+ id,
+ )
+}
+
+func (c *conversationDB) GetConversationByThreadAndAccountIDs(ctx context.Context, threadID string, accountID string, otherAccountIDs []string) (*gtsmodel.Conversation, error) {
+ otherAccountsKey := gtsmodel.ConversationOtherAccountsKey(otherAccountIDs)
+ return c.getConversation(
+ ctx,
+ "ThreadID,AccountID,OtherAccountsKey",
+ func(conversation *gtsmodel.Conversation) error {
+ return c.db.
+ NewSelect().
+ Model(conversation).
+ Where("? = ?", bun.Ident("thread_id"), threadID).
+ Where("? = ?", bun.Ident("account_id"), accountID).
+ Where("? = ?", bun.Ident("other_accounts_key"), otherAccountsKey).
+ Scan(ctx)
+ },
+ threadID,
+ accountID,
+ otherAccountsKey,
+ )
+}
+
+func (c *conversationDB) getConversation(
+ ctx context.Context,
+ lookup string,
+ dbQuery func(conversation *gtsmodel.Conversation) error,
+ keyParts ...any,
+) (*gtsmodel.Conversation, error) {
+ // Fetch conversation from cache with loader callback
+ conversation, err := c.state.Caches.GTS.Conversation.LoadOne(lookup, func() (*gtsmodel.Conversation, error) {
+ var conversation gtsmodel.Conversation
+
+ // Not cached! Perform database query
+ if err := dbQuery(&conversation); err != nil {
+ return nil, err
+ }
+
+ return &conversation, nil
+ }, keyParts...)
+ if err != nil {
+ // already processe
+ return nil, err
+ }
+
+ if gtscontext.Barebones(ctx) {
+ // Only a barebones model was requested.
+ return conversation, nil
+ }
+
+ if err := c.populateConversation(ctx, conversation); err != nil {
+ return nil, err
+ }
+
+ return conversation, nil
+}
+
+func (c *conversationDB) populateConversation(ctx context.Context, conversation *gtsmodel.Conversation) error {
+ var (
+ errs gtserror.MultiError
+ err error
+ )
+
+ if conversation.Account == nil {
+ conversation.Account, err = c.state.DB.GetAccountByID(
+ gtscontext.SetBarebones(ctx),
+ conversation.AccountID,
+ )
+ if err != nil {
+ errs.Appendf("error populating conversation owner account: %w", err)
+ }
+ }
+
+ if conversation.OtherAccounts == nil {
+ conversation.OtherAccounts, err = c.state.DB.GetAccountsByIDs(
+ gtscontext.SetBarebones(ctx),
+ conversation.OtherAccountIDs,
+ )
+ if err != nil {
+ errs.Appendf("error populating other conversation accounts: %w", err)
+ }
+ }
+
+ if conversation.LastStatus == nil && conversation.LastStatusID != "" {
+ conversation.LastStatus, err = c.state.DB.GetStatusByID(
+ gtscontext.SetBarebones(ctx),
+ conversation.LastStatusID,
+ )
+ if err != nil {
+ errs.Appendf("error populating conversation last status: %w", err)
+ }
+ }
+
+ return errs.Combine()
+}
+
+func (c *conversationDB) GetConversationsByOwnerAccountID(ctx context.Context, accountID string, page *paging.Page) ([]*gtsmodel.Conversation, error) {
+ conversationLastStatusIDs, err := c.getAccountConversationLastStatusIDs(ctx, accountID, page)
+ if err != nil {
+ return nil, err
+ }
+ return c.getConversationsByLastStatusIDs(ctx, accountID, conversationLastStatusIDs)
+}
+
+func (c *conversationDB) getAccountConversationLastStatusIDs(ctx context.Context, accountID string, page *paging.Page) ([]string, error) {
+ return loadPagedIDs(&c.state.Caches.GTS.ConversationLastStatusIDs, accountID, page, func() ([]string, error) {
+ var conversationLastStatusIDs []string
+
+ // Conversation last status IDs not in cache. Perform DB query.
+ if _, err := c.db.
+ NewSelect().
+ Model((*gtsmodel.Conversation)(nil)).
+ Column("last_status_id").
+ Where("? = ?", bun.Ident("account_id"), accountID).
+ OrderExpr("? DESC", bun.Ident("last_status_id")).
+ Exec(ctx, &conversationLastStatusIDs); // nocollapse
+ err != nil && !errors.Is(err, db.ErrNoEntries) {
+ return nil, err
+ }
+
+ return conversationLastStatusIDs, nil
+ })
+}
+
+func (c *conversationDB) getConversationsByLastStatusIDs(
+ ctx context.Context,
+ accountID string,
+ conversationLastStatusIDs []string,
+) ([]*gtsmodel.Conversation, error) {
+ // Load all conversation IDs via cache loader callbacks.
+ conversations, err := c.state.Caches.GTS.Conversation.LoadIDs2Part(
+ "AccountID,LastStatusID",
+ accountID,
+ conversationLastStatusIDs,
+ func(accountID string, uncached []string) ([]*gtsmodel.Conversation, error) {
+ // Preallocate expected length of uncached conversations.
+ conversations := make([]*gtsmodel.Conversation, 0, len(uncached))
+
+ // Perform database query scanning the remaining (uncached) IDs.
+ if err := c.db.NewSelect().
+ Model(&conversations).
+ Where("? = ?", bun.Ident("account_id"), accountID).
+ Where("? IN (?)", bun.Ident("last_status_id"), bun.In(uncached)).
+ Scan(ctx); err != nil {
+ return nil, err
+ }
+
+ return conversations, nil
+ },
+ )
+ if err != nil {
+ return nil, err
+ }
+
+ // Reorder the conversations by their last status IDs to ensure correct order.
+ getID := func(b *gtsmodel.Conversation) string { return b.ID }
+ util.OrderBy(conversations, conversationLastStatusIDs, getID)
+
+ if gtscontext.Barebones(ctx) {
+ // no need to fully populate.
+ return conversations, nil
+ }
+
+ // Populate all loaded conversations, removing those we fail to populate.
+ conversations = slices.DeleteFunc(conversations, func(conversation *gtsmodel.Conversation) bool {
+ if err := c.populateConversation(ctx, conversation); err != nil {
+ log.Errorf(ctx, "error populating conversation %s: %v", conversation.ID, err)
+ return true
+ }
+ return false
+ })
+
+ return conversations, nil
+}
+
+func (c *conversationDB) UpsertConversation(ctx context.Context, conversation *gtsmodel.Conversation, columns ...string) error {
+ // If we're updating by column, ensure "updated_at" is included.
+ if len(columns) > 0 {
+ columns = append(columns, "updated_at")
+ }
+
+ return c.state.Caches.GTS.Conversation.Store(conversation, func() error {
+ _, err := NewUpsert(c.db).
+ Model(conversation).
+ Constraint("id").
+ Column(columns...).
+ Exec(ctx)
+ return err
+ })
+}
+
+func (c *conversationDB) LinkConversationToStatus(ctx context.Context, conversationID string, statusID string) error {
+ conversationToStatus := >smodel.ConversationToStatus{
+ ConversationID: conversationID,
+ StatusID: statusID,
+ }
+
+ if _, err := c.db.NewInsert().
+ Model(conversationToStatus).
+ Exec(ctx); // nocollapse
+ err != nil {
+ return err
+ }
+ return nil
+}
+
+func (c *conversationDB) DeleteConversationByID(ctx context.Context, id string) error {
+ // Load conversation into cache before attempting a delete,
+ // as we need it cached in order to trigger the invalidate
+ // callback. This in turn invalidates others.
+ _, err := c.GetConversationByID(gtscontext.SetBarebones(ctx), id)
+ if err != nil {
+ if errors.Is(err, db.ErrNoEntries) {
+ // not an issue.
+ err = nil
+ }
+ return err
+ }
+
+ // Drop this now-cached conversation on return after delete.
+ defer c.state.Caches.GTS.Conversation.Invalidate("ID", id)
+
+ // Finally delete conversation from DB.
+ _, err = c.db.NewDelete().
+ Model((*gtsmodel.Conversation)(nil)).
+ Where("? = ?", bun.Ident("id"), id).
+ Exec(ctx)
+ return err
+}
+
+func (c *conversationDB) DeleteConversationsByOwnerAccountID(ctx context.Context, accountID string) error {
+ defer func() {
+ // Invalidate any cached conversations and conversation IDs owned by this account on return.
+ // Conversation invalidate hooks only invalidate the conversation ID cache,
+ // so we don't need to load all conversations into the cache to run invalidation hooks,
+ // as with some other object types (blocks, for example).
+ c.state.Caches.GTS.Conversation.Invalidate("AccountID", accountID)
+ // In case there were no cached conversations,
+ // explicitly invalidate the user's conversation last status ID cache.
+ c.state.Caches.GTS.ConversationLastStatusIDs.Invalidate(accountID)
+ }()
+
+ return c.db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
+ // Delete conversations matching the account ID.
+ deletedConversationIDs := []string{}
+ if err := tx.NewDelete().
+ Model((*gtsmodel.Conversation)(nil)).
+ Where("? = ?", bun.Ident("account_id"), accountID).
+ Returning("?", bun.Ident("id")).
+ Scan(ctx, &deletedConversationIDs); // nocollapse
+ err != nil {
+ return gtserror.Newf("error deleting conversations for account %s: %w", accountID, err)
+ }
+
+ // Delete any conversation-to-status links matching the deleted conversation IDs.
+ if _, err := tx.NewDelete().
+ Model((*gtsmodel.ConversationToStatus)(nil)).
+ Where("? IN (?)", bun.Ident("conversation_id"), bun.In(deletedConversationIDs)).
+ Exec(ctx); // nocollapse
+ err != nil {
+ return gtserror.Newf("error deleting conversation-to-status links for account %s: %w", accountID, err)
+ }
+
+ return nil
+ })
+}
+
+func (c *conversationDB) DeleteStatusFromConversations(ctx context.Context, statusID string) error {
+ // SQL returning the current time.
+ var nowSQL string
+ switch c.db.Dialect().Name() {
+ case dialect.SQLite:
+ nowSQL = "DATE('now')"
+ case dialect.PG:
+ nowSQL = "NOW()"
+ default:
+ log.Panicf(nil, "db conn %s was neither pg nor sqlite", c.db)
+ }
+
+ updatedConversationIDs := []string{}
+ deletedConversationIDs := []string{}
+
+ if err := c.db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
+ // Delete this status from conversation-to-status links.
+ if _, err := tx.NewDelete().
+ Model((*gtsmodel.ConversationToStatus)(nil)).
+ Where("? = ?", bun.Ident("status_id"), statusID).
+ Exec(ctx); // nocollapse
+ err != nil {
+ return gtserror.Newf("error deleting conversation-to-status links while deleting status %s: %w", statusID, err)
+ }
+
+ // Note: Bun doesn't currently support CREATE TABLE … AS SELECT … so we need to use raw queries here.
+
+ // Create a temporary table with all statuses other than the deleted status
+ // in each conversation for which the deleted status is the last status
+ // (if there are such statuses).
+ conversationStatusesTempTable := "conversation_statuses_" + id.NewULID()
+ if _, err := tx.NewRaw(
+ "CREATE TEMPORARY TABLE ? AS ?",
+ bun.Ident(conversationStatusesTempTable),
+ tx.NewSelect().
+ ColumnExpr(
+ "? AS ?",
+ bun.Ident("conversations.id"),
+ bun.Ident("conversation_id"),
+ ).
+ ColumnExpr(
+ "? AS ?",
+ bun.Ident("conversation_to_statuses.status_id"),
+ bun.Ident("id"),
+ ).
+ Column("statuses.created_at").
+ Table("conversations").
+ Join("LEFT JOIN ?", bun.Ident("conversation_to_statuses")).
+ JoinOn(
+ "? = ?",
+ bun.Ident("conversations.id"),
+ bun.Ident("conversation_to_statuses.conversation_id"),
+ ).
+ JoinOn(
+ "? != ?",
+ bun.Ident("conversation_to_statuses.status_id"),
+ statusID,
+ ).
+ Join("LEFT JOIN ?", bun.Ident("statuses")).
+ JoinOn(
+ "? = ?",
+ bun.Ident("conversation_to_statuses.status_id"),
+ bun.Ident("statuses.id"),
+ ).
+ Where(
+ "? = ?",
+ bun.Ident("conversations.last_status_id"),
+ statusID,
+ ),
+ ).
+ Exec(ctx); // nocollapse
+ err != nil {
+ return gtserror.Newf("error creating conversationStatusesTempTable while deleting status %s: %w", statusID, err)
+ }
+
+ // Create a temporary table with the most recently created status in each conversation
+ // for which the deleted status is the last status (if there is such a status).
+ latestConversationStatusesTempTable := "latest_conversation_statuses_" + id.NewULID()
+ if _, err := tx.NewRaw(
+ "CREATE TEMPORARY TABLE ? AS ?",
+ bun.Ident(latestConversationStatusesTempTable),
+ tx.NewSelect().
+ Column(
+ "conversation_statuses.conversation_id",
+ "conversation_statuses.id",
+ ).
+ TableExpr(
+ "? AS ?",
+ bun.Ident(conversationStatusesTempTable),
+ bun.Ident("conversation_statuses"),
+ ).
+ Join(
+ "LEFT JOIN ? AS ?",
+ bun.Ident(conversationStatusesTempTable),
+ bun.Ident("later_statuses"),
+ ).
+ JoinOn(
+ "? = ?",
+ bun.Ident("conversation_statuses.conversation_id"),
+ bun.Ident("later_statuses.conversation_id"),
+ ).
+ JoinOn(
+ "? > ?",
+ bun.Ident("later_statuses.created_at"),
+ bun.Ident("conversation_statuses.created_at"),
+ ).
+ Where("? IS NULL", bun.Ident("later_statuses.id")),
+ ).
+ Exec(ctx); // nocollapse
+ err != nil {
+ return gtserror.Newf("error creating latestConversationStatusesTempTable while deleting status %s: %w", statusID, err)
+ }
+
+ // For every conversation where the given status was the last one,
+ // reset its last status to the most recently created in the conversation other than that one,
+ // if there is such a status.
+ // Return conversation IDs for invalidation.
+ if err := tx.NewUpdate().
+ Model((*gtsmodel.Conversation)(nil)).
+ SetColumn("last_status_id", "?", bun.Ident("latest_conversation_statuses.id")).
+ SetColumn("updated_at", "?", bun.Safe(nowSQL)).
+ TableExpr("? AS ?", bun.Ident(latestConversationStatusesTempTable), bun.Ident("latest_conversation_statuses")).
+ Where("?TableAlias.? = ?", bun.Ident("id"), bun.Ident("latest_conversation_statuses.conversation_id")).
+ Where("? IS NOT NULL", bun.Ident("latest_conversation_statuses.id")).
+ Returning("?TableName.?", bun.Ident("id")).
+ Scan(ctx, &updatedConversationIDs); // nocollapse
+ err != nil {
+ return gtserror.Newf("error rolling back last status for conversation while deleting status %s: %w", statusID, err)
+ }
+
+ // If there is no such status, delete the conversation.
+ // Return conversation IDs for invalidation.
+ if err := tx.NewDelete().
+ Model((*gtsmodel.Conversation)(nil)).
+ Where(
+ "? IN (?)",
+ bun.Ident("id"),
+ tx.NewSelect().
+ Table(latestConversationStatusesTempTable).
+ Column("conversation_id").
+ Where("? IS NULL", bun.Ident("id")),
+ ).
+ Returning("?", bun.Ident("id")).
+ Scan(ctx, &deletedConversationIDs); // nocollapse
+ err != nil {
+ return gtserror.Newf("error deleting conversation while deleting status %s: %w", statusID, err)
+ }
+
+ // Clean up.
+ for _, tempTable := range []string{
+ conversationStatusesTempTable,
+ latestConversationStatusesTempTable,
+ } {
+ if _, err := tx.NewDropTable().Table(tempTable).Exec(ctx); err != nil {
+ return gtserror.Newf(
+ "error dropping temporary table %s after deleting status %s: %w",
+ tempTable,
+ statusID,
+ err,
+ )
+ }
+ }
+
+ return nil
+ }); err != nil {
+ return err
+ }
+
+ updatedConversationIDs = append(updatedConversationIDs, deletedConversationIDs...)
+ c.state.Caches.GTS.Conversation.InvalidateIDs("ID", updatedConversationIDs)
+
+ return nil
+}
diff --git a/internal/db/bundb/conversation_test.go b/internal/db/bundb/conversation_test.go
new file mode 100644
index 000000000..24d35d482
--- /dev/null
+++ b/internal/db/bundb/conversation_test.go
@@ -0,0 +1,115 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package bundb_test
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/suite"
+ "github.com/superseriousbusiness/gotosocial/internal/db"
+ "github.com/superseriousbusiness/gotosocial/internal/db/test"
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+)
+
+type ConversationTestSuite struct {
+ BunDBStandardTestSuite
+
+ cf test.ConversationFactory
+
+ // testAccount is the owner of statuses and conversations in these tests (must be local).
+ testAccount *gtsmodel.Account
+ // threadID is the thread used for statuses in any given test.
+ threadID string
+}
+
+func (suite *ConversationTestSuite) SetupSuite() {
+ suite.BunDBStandardTestSuite.SetupSuite()
+
+ suite.cf.SetupSuite(suite)
+
+ suite.testAccount = suite.testAccounts["local_account_1"]
+}
+
+func (suite *ConversationTestSuite) SetupTest() {
+ suite.BunDBStandardTestSuite.SetupTest()
+
+ suite.cf.SetupTest(suite.db)
+
+ suite.threadID = suite.cf.NewULID(0)
+}
+
+// deleteStatus deletes a status from conversations and ends the test if that fails.
+func (suite *ConversationTestSuite) deleteStatus(statusID string) {
+ err := suite.db.DeleteStatusFromConversations(context.Background(), statusID)
+ if err != nil {
+ suite.FailNow(err.Error())
+ }
+}
+
+// getConversation fetches a conversation by ID and ends the test if that fails.
+func (suite *ConversationTestSuite) getConversation(conversationID string) *gtsmodel.Conversation {
+ conversation, err := suite.db.GetConversationByID(context.Background(), conversationID)
+ if err != nil {
+ suite.FailNow(err.Error())
+ }
+ return conversation
+}
+
+// If we delete a status that is in a conversation but not the last status,
+// the conversation's last status should not change.
+func (suite *ConversationTestSuite) TestDeleteNonLastStatus() {
+ conversation := suite.cf.NewTestConversation(suite.testAccount, 0)
+ initial := conversation.LastStatus
+ reply := suite.cf.NewTestStatus(suite.testAccount, conversation.ThreadID, 1*time.Second, initial)
+ conversation = suite.cf.SetLastStatus(conversation, reply)
+
+ suite.deleteStatus(initial.ID)
+ conversation = suite.getConversation(conversation.ID)
+ suite.Equal(reply.ID, conversation.LastStatusID)
+}
+
+// If we delete the last status in a conversation that has other statuses,
+// a previous status should become the new last status.
+func (suite *ConversationTestSuite) TestDeleteLastStatus() {
+ conversation := suite.cf.NewTestConversation(suite.testAccount, 0)
+ initial := conversation.LastStatus
+ reply := suite.cf.NewTestStatus(suite.testAccount, conversation.ThreadID, 1*time.Second, initial)
+ conversation = suite.cf.SetLastStatus(conversation, reply)
+ conversation = suite.getConversation(conversation.ID)
+
+ suite.deleteStatus(reply.ID)
+ conversation = suite.getConversation(conversation.ID)
+ suite.Equal(initial.ID, conversation.LastStatusID)
+}
+
+// If we delete the only status in a conversation,
+// the conversation should be deleted as well.
+func (suite *ConversationTestSuite) TestDeleteOnlyStatus() {
+ conversation := suite.cf.NewTestConversation(suite.testAccount, 0)
+ initial := conversation.LastStatus
+
+ suite.deleteStatus(initial.ID)
+ _, err := suite.db.GetConversationByID(context.Background(), conversation.ID)
+ suite.ErrorIs(err, db.ErrNoEntries)
+}
+
+func TestConversationTestSuite(t *testing.T) {
+ suite.Run(t, new(ConversationTestSuite))
+}
diff --git a/internal/db/bundb/migrations/20240611190733_add_conversations.go b/internal/db/bundb/migrations/20240611190733_add_conversations.go
new file mode 100644
index 000000000..25b226aff
--- /dev/null
+++ b/internal/db/bundb/migrations/20240611190733_add_conversations.go
@@ -0,0 +1,78 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package migrations
+
+import (
+ "context"
+
+ gtsmodel "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+ "github.com/uptrace/bun"
+)
+
+// Note: this migration has an advanced migration followup.
+// See Conversations.MigrateDMs().
+func init() {
+ up := func(ctx context.Context, db *bun.DB) error {
+ return db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
+ for _, model := range []interface{}{
+ >smodel.Conversation{},
+ >smodel.ConversationToStatus{},
+ } {
+ if _, err := tx.
+ NewCreateTable().
+ Model(model).
+ IfNotExists().
+ Exec(ctx); err != nil {
+ return err
+ }
+ }
+
+ // Add indexes to the conversations table.
+ for index, columns := range map[string][]string{
+ "conversations_account_id_idx": {
+ "account_id",
+ },
+ "conversations_last_status_id_idx": {
+ "last_status_id",
+ },
+ } {
+ if _, err := tx.
+ NewCreateIndex().
+ Model(>smodel.Conversation{}).
+ Index(index).
+ Column(columns...).
+ IfNotExists().
+ Exec(ctx); err != nil {
+ return err
+ }
+ }
+
+ return nil
+ })
+ }
+
+ down := func(ctx context.Context, db *bun.DB) error {
+ return db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
+ return nil
+ })
+ }
+
+ if err := Migrations.Register(up, down); err != nil {
+ panic(err)
+ }
+}
diff --git a/internal/db/bundb/migrations/20240712005536_add_advanced_migrations.go b/internal/db/bundb/migrations/20240712005536_add_advanced_migrations.go
new file mode 100644
index 000000000..183065285
--- /dev/null
+++ b/internal/db/bundb/migrations/20240712005536_add_advanced_migrations.go
@@ -0,0 +1,49 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package migrations
+
+import (
+ "context"
+
+ gtsmodel "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+ "github.com/uptrace/bun"
+)
+
+// Create the advanced migrations table.
+func init() {
+ up := func(ctx context.Context, db *bun.DB) error {
+ return db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
+ _, err := tx.
+ NewCreateTable().
+ Model((*gtsmodel.AdvancedMigration)(nil)).
+ IfNotExists().
+ Exec(ctx)
+ return err
+ })
+ }
+
+ down := func(ctx context.Context, db *bun.DB) error {
+ return db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
+ return nil
+ })
+ }
+
+ if err := Migrations.Register(up, down); err != nil {
+ panic(err)
+ }
+}
diff --git a/internal/db/bundb/status.go b/internal/db/bundb/status.go
index dfb97cff1..b0ed32e0e 100644
--- a/internal/db/bundb/status.go
+++ b/internal/db/bundb/status.go
@@ -682,3 +682,35 @@ func (s *statusDB) getStatusBoostIDs(ctx context.Context, statusID string) ([]st
return statusIDs, nil
})
}
+
+func (s *statusDB) MaxDirectStatusID(ctx context.Context) (string, error) {
+ maxID := ""
+ if err := s.db.
+ NewSelect().
+ Model((*gtsmodel.Status)(nil)).
+ ColumnExpr("COALESCE(MAX(?), '')", bun.Ident("id")).
+ Where("? = ?", bun.Ident("visibility"), gtsmodel.VisibilityDirect).
+ Scan(ctx, &maxID); // nocollapse
+ err != nil {
+ return "", err
+ }
+ return maxID, nil
+}
+
+func (s *statusDB) GetDirectStatusIDsBatch(ctx context.Context, minID string, maxIDInclusive string, count int) ([]string, error) {
+ var statusIDs []string
+ if err := s.db.
+ NewSelect().
+ Model((*gtsmodel.Status)(nil)).
+ Column("id").
+ Where("? = ?", bun.Ident("visibility"), gtsmodel.VisibilityDirect).
+ Where("? > ?", bun.Ident("id"), minID).
+ Where("? <= ?", bun.Ident("id"), maxIDInclusive).
+ Order("id ASC").
+ Limit(count).
+ Scan(ctx, &statusIDs); // nocollapse
+ err != nil {
+ return nil, err
+ }
+ return statusIDs, nil
+}
diff --git a/internal/db/conversation.go b/internal/db/conversation.go
new file mode 100644
index 000000000..3d0b4213e
--- /dev/null
+++ b/internal/db/conversation.go
@@ -0,0 +1,52 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package db
+
+import (
+ "context"
+
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+ "github.com/superseriousbusiness/gotosocial/internal/paging"
+)
+
+type Conversation interface {
+ // GetConversationByID gets a single conversation by ID.
+ GetConversationByID(ctx context.Context, id string) (*gtsmodel.Conversation, error)
+
+ // GetConversationByThreadAndAccountIDs retrieves a conversation by thread ID and participant account IDs, if it exists.
+ GetConversationByThreadAndAccountIDs(ctx context.Context, threadID string, accountID string, otherAccountIDs []string) (*gtsmodel.Conversation, error)
+
+ // GetConversationsByOwnerAccountID gets all conversations owned by the given account,
+ // with optional paging based on last status ID.
+ GetConversationsByOwnerAccountID(ctx context.Context, accountID string, page *paging.Page) ([]*gtsmodel.Conversation, error)
+
+ // UpsertConversation creates or updates a conversation.
+ UpsertConversation(ctx context.Context, conversation *gtsmodel.Conversation, columns ...string) error
+
+ // LinkConversationToStatus creates a conversation-to-status link.
+ LinkConversationToStatus(ctx context.Context, statusID string, conversationID string) error
+
+ // DeleteConversationByID deletes a conversation, removing it from the owning account's conversation list.
+ DeleteConversationByID(ctx context.Context, id string) error
+
+ // DeleteConversationsByOwnerAccountID deletes all conversations owned by the given account.
+ DeleteConversationsByOwnerAccountID(ctx context.Context, accountID string) error
+
+ // DeleteStatusFromConversations handles when a status is deleted by updating or deleting conversations for which it was the last status.
+ DeleteStatusFromConversations(ctx context.Context, statusID string) error
+}
diff --git a/internal/db/db.go b/internal/db/db.go
index a148d778a..4b2152732 100644
--- a/internal/db/db.go
+++ b/internal/db/db.go
@@ -26,8 +26,10 @@
type DB interface {
Account
Admin
+ AdvancedMigration
Application
Basic
+ Conversation
Domain
Emoji
HeaderFilter
diff --git a/internal/db/status.go b/internal/db/status.go
index 88ae12a12..ade900728 100644
--- a/internal/db/status.go
+++ b/internal/db/status.go
@@ -78,4 +78,16 @@ type Status interface {
// GetStatusChildren gets the child statuses of a given status.
GetStatusChildren(ctx context.Context, statusID string) ([]*gtsmodel.Status, error)
+
+ // MaxDirectStatusID returns the newest ID across all DM statuses.
+ // Returns the empty string with no error if there are no DM statuses yet.
+ // It is used only by the conversation advanced migration.
+ MaxDirectStatusID(ctx context.Context) (string, error)
+
+ // GetDirectStatusIDsBatch returns up to count DM status IDs strictly greater than minID
+ // and less than or equal to maxIDInclusive. Note that this is different from most of our paging,
+ // which uses a maxID and returns IDs strictly less than that, because it's called with the result of
+ // MaxDirectStatusID, and expects to eventually return the status with that ID.
+ // It is used only by the conversation advanced migration.
+ GetDirectStatusIDsBatch(ctx context.Context, minID string, maxIDInclusive string, count int) ([]string, error)
}
diff --git a/internal/db/test/conversation.go b/internal/db/test/conversation.go
new file mode 100644
index 000000000..95713927e
--- /dev/null
+++ b/internal/db/test/conversation.go
@@ -0,0 +1,122 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package test
+
+import (
+ "context"
+ "crypto/rand"
+ "time"
+
+ "github.com/oklog/ulid"
+ "github.com/superseriousbusiness/gotosocial/internal/ap"
+ "github.com/superseriousbusiness/gotosocial/internal/db"
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+ "github.com/superseriousbusiness/gotosocial/internal/util"
+)
+
+type testSuite interface {
+ FailNow(string, ...interface{}) bool
+}
+
+// ConversationFactory can be embedded or included by test suites that want to generate statuses and conversations.
+type ConversationFactory struct {
+ // Test suite, or at least the methods from it that we care about.
+ suite testSuite
+ // Test DB.
+ db db.DB
+
+ // TestStart is the timestamp used as a base for timestamps and ULIDs in any given test.
+ TestStart time.Time
+}
+
+// SetupSuite should be called by the SetupSuite of test suites that use this mixin.
+func (f *ConversationFactory) SetupSuite(suite testSuite) {
+ f.suite = suite
+}
+
+// SetupTest should be called by the SetupTest of test suites that use this mixin.
+func (f *ConversationFactory) SetupTest(db db.DB) {
+ f.db = db
+ f.TestStart = time.Now()
+}
+
+// NewULID is a version of id.NewULID that uses the test start time and an offset instead of the real time.
+func (f *ConversationFactory) NewULID(offset time.Duration) string {
+ ulid, err := ulid.New(
+ ulid.Timestamp(f.TestStart.Add(offset)), rand.Reader,
+ )
+ if err != nil {
+ panic(err)
+ }
+ return ulid.String()
+}
+
+func (f *ConversationFactory) NewTestStatus(localAccount *gtsmodel.Account, threadID string, nowOffset time.Duration, inReplyToStatus *gtsmodel.Status) *gtsmodel.Status {
+ statusID := f.NewULID(nowOffset)
+ createdAt := f.TestStart.Add(nowOffset)
+ status := >smodel.Status{
+ ID: statusID,
+ CreatedAt: createdAt,
+ UpdatedAt: createdAt,
+ URI: "http://localhost:8080/users/" + localAccount.Username + "/statuses/" + statusID,
+ AccountID: localAccount.ID,
+ AccountURI: localAccount.URI,
+ Local: util.Ptr(true),
+ ThreadID: threadID,
+ Visibility: gtsmodel.VisibilityDirect,
+ ActivityStreamsType: ap.ObjectNote,
+ Federated: util.Ptr(true),
+ }
+ if inReplyToStatus != nil {
+ status.InReplyToID = inReplyToStatus.ID
+ status.InReplyToURI = inReplyToStatus.URI
+ status.InReplyToAccountID = inReplyToStatus.AccountID
+ }
+ if err := f.db.PutStatus(context.Background(), status); err != nil {
+ f.suite.FailNow(err.Error())
+ }
+ return status
+}
+
+// NewTestConversation creates a new status and adds it to a new unread conversation, returning the conversation.
+func (f *ConversationFactory) NewTestConversation(localAccount *gtsmodel.Account, nowOffset time.Duration) *gtsmodel.Conversation {
+ threadID := f.NewULID(nowOffset)
+ status := f.NewTestStatus(localAccount, threadID, nowOffset, nil)
+ conversation := >smodel.Conversation{
+ ID: f.NewULID(nowOffset),
+ AccountID: localAccount.ID,
+ ThreadID: status.ThreadID,
+ Read: util.Ptr(false),
+ }
+ f.SetLastStatus(conversation, status)
+ return conversation
+}
+
+// SetLastStatus sets an already stored status as the last status of a new or already stored conversation,
+// and returns the updated conversation.
+func (f *ConversationFactory) SetLastStatus(conversation *gtsmodel.Conversation, status *gtsmodel.Status) *gtsmodel.Conversation {
+ conversation.LastStatusID = status.ID
+ conversation.LastStatus = status
+ if err := f.db.UpsertConversation(context.Background(), conversation, "last_status_id"); err != nil {
+ f.suite.FailNow(err.Error())
+ }
+ if err := f.db.LinkConversationToStatus(context.Background(), conversation.ID, status.ID); err != nil {
+ f.suite.FailNow(err.Error())
+ }
+ return conversation
+}
diff --git a/internal/gtsmodel/advancedmigration.go b/internal/gtsmodel/advancedmigration.go
new file mode 100644
index 000000000..d9ce9d543
--- /dev/null
+++ b/internal/gtsmodel/advancedmigration.go
@@ -0,0 +1,32 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package gtsmodel
+
+import (
+ "time"
+)
+
+// AdvancedMigration stores state for an "advanced migration", which is a migration
+// that doesn't fit into the Bun migration framework.
+type AdvancedMigration struct {
+ ID string `bun:",pk,nullzero,notnull,unique"` // id of this migration (preassigned, not a ULID)
+ CreatedAt time.Time `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"` // when was item created
+ UpdatedAt time.Time `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"` // when was item last updated
+ StateJSON []byte `bun:",nullzero"` // JSON dump of the migration state
+ Finished *bool `bun:",nullzero,notnull,default:false"` // has this migration finished?
+}
diff --git a/internal/gtsmodel/conversation.go b/internal/gtsmodel/conversation.go
new file mode 100644
index 000000000..f03f27458
--- /dev/null
+++ b/internal/gtsmodel/conversation.go
@@ -0,0 +1,77 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package gtsmodel
+
+import (
+ "slices"
+ "strings"
+ "time"
+
+ "github.com/superseriousbusiness/gotosocial/internal/util"
+)
+
+// Conversation represents direct messages between the owner account and a set of other accounts.
+type Conversation struct {
+ // ID of this item in the database.
+ ID string `bun:"type:CHAR(26),pk,nullzero,notnull,unique"`
+
+ // When was this item created?
+ CreatedAt time.Time `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"`
+
+ // When was this item last updated?
+ UpdatedAt time.Time `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"`
+
+ // Account that owns the conversation.
+ AccountID string `bun:"type:CHAR(26),nullzero,notnull,unique:conversations_thread_id_account_id_other_accounts_key_uniq,unique:conversations_account_id_last_status_id_uniq"`
+ Account *Account `bun:"-"`
+
+ // Other accounts participating in the conversation.
+ // Doesn't include the owner. May be empty in the case of a DM to yourself.
+ OtherAccountIDs []string `bun:"other_account_ids,array"`
+ OtherAccounts []*Account `bun:"-"`
+
+ // Denormalized lookup key derived from unique OtherAccountIDs, sorted and concatenated with commas.
+ // May be empty in the case of a DM to yourself.
+ OtherAccountsKey string `bun:",notnull,unique:conversations_thread_id_account_id_other_accounts_key_uniq"`
+
+ // Thread that the conversation is part of.
+ ThreadID string `bun:"type:CHAR(26),nullzero,notnull,unique:conversations_thread_id_account_id_other_accounts_key_uniq"`
+
+ // ID of the last status in this conversation.
+ LastStatusID string `bun:"type:CHAR(26),nullzero,notnull,unique:conversations_account_id_last_status_id_uniq"`
+ LastStatus *Status `bun:"-"`
+
+ // Has the owner read all statuses in this conversation?
+ Read *bool `bun:",default:false"`
+}
+
+// ConversationOtherAccountsKey creates an OtherAccountsKey from a list of OtherAccountIDs.
+func ConversationOtherAccountsKey(otherAccountIDs []string) string {
+ otherAccountIDs = util.UniqueStrings(otherAccountIDs)
+ slices.Sort(otherAccountIDs)
+ return strings.Join(otherAccountIDs, ",")
+}
+
+// ConversationToStatus is an intermediate struct to facilitate the many2many relationship between a conversation and its statuses,
+// including but not limited to the last status. These are used only when deleting a status from a conversation.
+type ConversationToStatus struct {
+ ConversationID string `bun:"type:CHAR(26),unique:conversation_to_statuses_conversation_id_status_id_uniq,nullzero,notnull"`
+ Conversation *Conversation `bun:"rel:belongs-to"`
+ StatusID string `bun:"type:CHAR(26),unique:conversation_to_statuses_conversation_id_status_id_uniq,nullzero,notnull"`
+ Status *Status `bun:"rel:belongs-to"`
+}
diff --git a/internal/processing/account/delete.go b/internal/processing/account/delete.go
index 075e94544..702b46cda 100644
--- a/internal/processing/account/delete.go
+++ b/internal/processing/account/delete.go
@@ -460,6 +460,14 @@ func (p *Processor) deleteAccountPeripheral(ctx context.Context, account *gtsmod
// TODO: add status mutes here when they're implemented.
+ // Delete all conversations owned by given account.
+ // Conversations in which it has only participated will be retained;
+ // they can always be deleted by their owners.
+ if err := p.state.DB.DeleteConversationsByOwnerAccountID(ctx, account.ID); // nocollapse
+ err != nil && !errors.Is(err, db.ErrNoEntries) {
+ return gtserror.Newf("error deleting conversations owned by account: %w", err)
+ }
+
// Delete all poll votes owned by given account.
if err := p.state.DB.DeletePollVotesByAccountID(ctx, account.ID); // nocollapse
err != nil && !errors.Is(err, db.ErrNoEntries) {
diff --git a/internal/processing/advancedmigrations/advancedmigrations.go b/internal/processing/advancedmigrations/advancedmigrations.go
new file mode 100644
index 000000000..3f1876539
--- /dev/null
+++ b/internal/processing/advancedmigrations/advancedmigrations.go
@@ -0,0 +1,48 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package advancedmigrations
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/superseriousbusiness/gotosocial/internal/processing/conversations"
+)
+
+// Processor holds references to any other processor that has migrations to run.
+type Processor struct {
+ conversations *conversations.Processor
+}
+
+func New(
+ conversations *conversations.Processor,
+) Processor {
+ return Processor{
+ conversations: conversations,
+ }
+}
+
+// Migrate runs all advanced migrations.
+// Errors should be in the same format thrown by other server or testrig startup failures.
+func (p *Processor) Migrate(ctx context.Context) error {
+ if err := p.conversations.MigrateDMsToConversations(ctx); err != nil {
+ return fmt.Errorf("error running conversations advanced migration: %w", err)
+ }
+
+ return nil
+}
diff --git a/internal/processing/conversations/conversations.go b/internal/processing/conversations/conversations.go
new file mode 100644
index 000000000..d95740605
--- /dev/null
+++ b/internal/processing/conversations/conversations.go
@@ -0,0 +1,126 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package conversations
+
+import (
+ "context"
+ "errors"
+
+ "github.com/superseriousbusiness/gotosocial/internal/db"
+ "github.com/superseriousbusiness/gotosocial/internal/filter/usermute"
+ "github.com/superseriousbusiness/gotosocial/internal/filter/visibility"
+ "github.com/superseriousbusiness/gotosocial/internal/gtscontext"
+ "github.com/superseriousbusiness/gotosocial/internal/gtserror"
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+ "github.com/superseriousbusiness/gotosocial/internal/state"
+ "github.com/superseriousbusiness/gotosocial/internal/typeutils"
+)
+
+type Processor struct {
+ state *state.State
+ converter *typeutils.Converter
+ filter *visibility.Filter
+}
+
+func New(
+ state *state.State,
+ converter *typeutils.Converter,
+ filter *visibility.Filter,
+) Processor {
+ return Processor{
+ state: state,
+ converter: converter,
+ filter: filter,
+ }
+}
+
+const conversationNotFoundHelpText = "conversation not found"
+
+// getConversationOwnedBy gets a conversation by ID and checks that it is owned by the given account.
+func (p *Processor) getConversationOwnedBy(
+ ctx context.Context,
+ id string,
+ requestingAccount *gtsmodel.Account,
+) (*gtsmodel.Conversation, gtserror.WithCode) {
+ // Get the conversation so that we can check its owning account ID.
+ conversation, err := p.state.DB.GetConversationByID(ctx, id)
+ if err != nil && !errors.Is(err, db.ErrNoEntries) {
+ return nil, gtserror.NewErrorInternalError(
+ gtserror.Newf(
+ "DB error getting conversation %s for account %s: %w",
+ id,
+ requestingAccount.ID,
+ err,
+ ),
+ )
+ }
+ if conversation == nil {
+ return nil, gtserror.NewErrorNotFound(
+ gtserror.Newf(
+ "conversation %s not found: %w",
+ id,
+ err,
+ ),
+ conversationNotFoundHelpText,
+ )
+ }
+ if conversation.AccountID != requestingAccount.ID {
+ return nil, gtserror.NewErrorNotFound(
+ gtserror.Newf(
+ "conversation %s not owned by account %s: %w",
+ id,
+ requestingAccount.ID,
+ err,
+ ),
+ conversationNotFoundHelpText,
+ )
+ }
+
+ return conversation, nil
+}
+
+// getFiltersAndMutes gets the given account's filters and compiled mute list.
+func (p *Processor) getFiltersAndMutes(
+ ctx context.Context,
+ requestingAccount *gtsmodel.Account,
+) ([]*gtsmodel.Filter, *usermute.CompiledUserMuteList, gtserror.WithCode) {
+ filters, err := p.state.DB.GetFiltersForAccountID(ctx, requestingAccount.ID)
+ if err != nil {
+ return nil, nil, gtserror.NewErrorInternalError(
+ gtserror.Newf(
+ "DB error getting filters for account %s: %w",
+ requestingAccount.ID,
+ err,
+ ),
+ )
+ }
+
+ mutes, err := p.state.DB.GetAccountMutes(gtscontext.SetBarebones(ctx), requestingAccount.ID, nil)
+ if err != nil {
+ return nil, nil, gtserror.NewErrorInternalError(
+ gtserror.Newf(
+ "DB error getting mutes for account %s: %w",
+ requestingAccount.ID,
+ err,
+ ),
+ )
+ }
+ compiledMutes := usermute.NewCompiledUserMuteList(mutes)
+
+ return filters, compiledMutes, nil
+}
diff --git a/internal/processing/conversations/conversations_test.go b/internal/processing/conversations/conversations_test.go
new file mode 100644
index 000000000..cc7ec617e
--- /dev/null
+++ b/internal/processing/conversations/conversations_test.go
@@ -0,0 +1,151 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package conversations_test
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/suite"
+ "github.com/superseriousbusiness/gotosocial/internal/db"
+ dbtest "github.com/superseriousbusiness/gotosocial/internal/db/test"
+ "github.com/superseriousbusiness/gotosocial/internal/email"
+ "github.com/superseriousbusiness/gotosocial/internal/federation"
+ "github.com/superseriousbusiness/gotosocial/internal/filter/visibility"
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+ "github.com/superseriousbusiness/gotosocial/internal/log"
+ "github.com/superseriousbusiness/gotosocial/internal/media"
+ "github.com/superseriousbusiness/gotosocial/internal/messages"
+ "github.com/superseriousbusiness/gotosocial/internal/processing/conversations"
+ "github.com/superseriousbusiness/gotosocial/internal/state"
+ "github.com/superseriousbusiness/gotosocial/internal/storage"
+ "github.com/superseriousbusiness/gotosocial/internal/transport"
+ "github.com/superseriousbusiness/gotosocial/internal/typeutils"
+ "github.com/superseriousbusiness/gotosocial/testrig"
+)
+
+type ConversationsTestSuite struct {
+ // standard suite interfaces
+ suite.Suite
+ db db.DB
+ tc *typeutils.Converter
+ storage *storage.Driver
+ state state.State
+ mediaManager *media.Manager
+ transportController transport.Controller
+ federator *federation.Federator
+ emailSender email.Sender
+ sentEmails map[string]string
+ filter *visibility.Filter
+
+ // standard suite models
+ testTokens map[string]*gtsmodel.Token
+ testClients map[string]*gtsmodel.Client
+ testApplications map[string]*gtsmodel.Application
+ testUsers map[string]*gtsmodel.User
+ testAccounts map[string]*gtsmodel.Account
+ testFollows map[string]*gtsmodel.Follow
+ testAttachments map[string]*gtsmodel.MediaAttachment
+ testStatuses map[string]*gtsmodel.Status
+
+ // module being tested
+ conversationsProcessor conversations.Processor
+
+ // Owner of test conversations
+ testAccount *gtsmodel.Account
+
+ // Mixin for conversation tests
+ dbtest.ConversationFactory
+}
+
+func (suite *ConversationsTestSuite) getClientMsg(timeout time.Duration) (*messages.FromClientAPI, bool) {
+ ctx := context.Background()
+ ctx, cncl := context.WithTimeout(ctx, timeout)
+ defer cncl()
+ return suite.state.Workers.Client.Queue.PopCtx(ctx)
+}
+
+func (suite *ConversationsTestSuite) SetupSuite() {
+ suite.testTokens = testrig.NewTestTokens()
+ suite.testClients = testrig.NewTestClients()
+ suite.testApplications = testrig.NewTestApplications()
+ suite.testUsers = testrig.NewTestUsers()
+ suite.testAccounts = testrig.NewTestAccounts()
+ suite.testFollows = testrig.NewTestFollows()
+ suite.testAttachments = testrig.NewTestAttachments()
+ suite.testStatuses = testrig.NewTestStatuses()
+
+ suite.ConversationFactory.SetupSuite(suite)
+}
+
+func (suite *ConversationsTestSuite) SetupTest() {
+ suite.state.Caches.Init()
+ testrig.StartNoopWorkers(&suite.state)
+
+ testrig.InitTestConfig()
+ testrig.InitTestLog()
+
+ suite.db = testrig.NewTestDB(&suite.state)
+ suite.state.DB = suite.db
+ suite.tc = typeutils.NewConverter(&suite.state)
+ suite.filter = visibility.NewFilter(&suite.state)
+
+ testrig.StartTimelines(
+ &suite.state,
+ suite.filter,
+ suite.tc,
+ )
+
+ suite.storage = testrig.NewInMemoryStorage()
+ suite.state.Storage = suite.storage
+ suite.mediaManager = testrig.NewTestMediaManager(&suite.state)
+
+ suite.transportController = testrig.NewTestTransportController(&suite.state, testrig.NewMockHTTPClient(nil, "../../../testrig/media"))
+ suite.federator = testrig.NewTestFederator(&suite.state, suite.transportController, suite.mediaManager)
+ suite.sentEmails = make(map[string]string)
+ suite.emailSender = testrig.NewEmailSender("../../../web/template/", suite.sentEmails)
+
+ suite.conversationsProcessor = conversations.New(&suite.state, suite.tc, suite.filter)
+ testrig.StandardDBSetup(suite.db, nil)
+ testrig.StandardStorageSetup(suite.storage, "../../../testrig/media")
+
+ suite.ConversationFactory.SetupTest(suite.db)
+
+ suite.testAccount = suite.testAccounts["local_account_1"]
+}
+
+func (suite *ConversationsTestSuite) TearDownTest() {
+ conversationModels := []interface{}{
+ (*gtsmodel.Conversation)(nil),
+ (*gtsmodel.ConversationToStatus)(nil),
+ }
+ for _, model := range conversationModels {
+ if err := suite.db.DropTable(context.Background(), model); err != nil {
+ log.Error(context.Background(), err)
+ }
+ }
+
+ testrig.StandardDBTeardown(suite.db)
+ testrig.StandardStorageTeardown(suite.storage)
+ testrig.StopWorkers(&suite.state)
+}
+
+func TestConversationsTestSuite(t *testing.T) {
+ suite.Run(t, new(ConversationsTestSuite))
+}
diff --git a/internal/processing/conversations/delete.go b/internal/processing/conversations/delete.go
new file mode 100644
index 000000000..5cbdd00a5
--- /dev/null
+++ b/internal/processing/conversations/delete.go
@@ -0,0 +1,45 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package conversations
+
+import (
+ "context"
+
+ "github.com/superseriousbusiness/gotosocial/internal/gtscontext"
+ "github.com/superseriousbusiness/gotosocial/internal/gtserror"
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+)
+
+func (p *Processor) Delete(
+ ctx context.Context,
+ requestingAccount *gtsmodel.Account,
+ id string,
+) gtserror.WithCode {
+ // Get the conversation so that we can check its owning account ID.
+ conversation, errWithCode := p.getConversationOwnedBy(gtscontext.SetBarebones(ctx), id, requestingAccount)
+ if errWithCode != nil {
+ return errWithCode
+ }
+
+ // Delete the conversation.
+ if err := p.state.DB.DeleteConversationByID(ctx, conversation.ID); err != nil {
+ return gtserror.NewErrorInternalError(err)
+ }
+
+ return nil
+}
diff --git a/internal/processing/conversations/delete_test.go b/internal/processing/conversations/delete_test.go
new file mode 100644
index 000000000..23b4f1c1a
--- /dev/null
+++ b/internal/processing/conversations/delete_test.go
@@ -0,0 +1,27 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package conversations_test
+
+import "context"
+
+func (suite *ConversationsTestSuite) TestDelete() {
+ conversation := suite.NewTestConversation(suite.testAccount, 0)
+
+ err := suite.conversationsProcessor.Delete(context.Background(), suite.testAccount, conversation.ID)
+ suite.NoError(err)
+}
diff --git a/internal/processing/conversations/get.go b/internal/processing/conversations/get.go
new file mode 100644
index 000000000..0c7832cae
--- /dev/null
+++ b/internal/processing/conversations/get.go
@@ -0,0 +1,101 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package conversations
+
+import (
+ "context"
+ "errors"
+
+ apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
+ "github.com/superseriousbusiness/gotosocial/internal/db"
+ "github.com/superseriousbusiness/gotosocial/internal/gtserror"
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+ "github.com/superseriousbusiness/gotosocial/internal/log"
+ "github.com/superseriousbusiness/gotosocial/internal/paging"
+ "github.com/superseriousbusiness/gotosocial/internal/util"
+)
+
+// GetAll returns conversations owned by the given account.
+// The additional parameters can be used for paging.
+func (p *Processor) GetAll(
+ ctx context.Context,
+ requestingAccount *gtsmodel.Account,
+ page *paging.Page,
+) (*apimodel.PageableResponse, gtserror.WithCode) {
+ conversations, err := p.state.DB.GetConversationsByOwnerAccountID(
+ ctx,
+ requestingAccount.ID,
+ page,
+ )
+ if err != nil && !errors.Is(err, db.ErrNoEntries) {
+ return nil, gtserror.NewErrorInternalError(
+ gtserror.Newf(
+ "DB error getting conversations for account %s: %w",
+ requestingAccount.ID,
+ err,
+ ),
+ )
+ }
+
+ // Check for empty response.
+ count := len(conversations)
+ if len(conversations) == 0 {
+ return util.EmptyPageableResponse(), nil
+ }
+
+ // Get the lowest and highest last status ID values, used for paging.
+ lo := conversations[count-1].LastStatusID
+ hi := conversations[0].LastStatusID
+
+ items := make([]interface{}, 0, count)
+
+ filters, mutes, errWithCode := p.getFiltersAndMutes(ctx, requestingAccount)
+ if errWithCode != nil {
+ return nil, errWithCode
+ }
+
+ for _, conversation := range conversations {
+ // Convert conversation to frontend API model.
+ apiConversation, err := p.converter.ConversationToAPIConversation(
+ ctx,
+ conversation,
+ requestingAccount,
+ filters,
+ mutes,
+ )
+ if err != nil {
+ log.Errorf(
+ ctx,
+ "error converting conversation %s to API representation: %v",
+ conversation.ID,
+ err,
+ )
+ continue
+ }
+
+ // Append conversation to return items.
+ items = append(items, apiConversation)
+ }
+
+ return paging.PackageResponse(paging.ResponseParams{
+ Items: items,
+ Path: "/api/v1/conversations",
+ Next: page.Next(lo, hi),
+ Prev: page.Prev(lo, hi),
+ }), nil
+}
diff --git a/internal/processing/conversations/get_test.go b/internal/processing/conversations/get_test.go
new file mode 100644
index 000000000..7b3d60749
--- /dev/null
+++ b/internal/processing/conversations/get_test.go
@@ -0,0 +1,65 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package conversations_test
+
+import (
+ "context"
+ "time"
+
+ apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
+)
+
+func (suite *ConversationsTestSuite) TestGetAll() {
+ conversation := suite.NewTestConversation(suite.testAccount, 0)
+
+ resp, err := suite.conversationsProcessor.GetAll(context.Background(), suite.testAccount, nil)
+ if suite.NoError(err) && suite.Len(resp.Items, 1) && suite.IsType((*apimodel.Conversation)(nil), resp.Items[0]) {
+ apiConversation := resp.Items[0].(*apimodel.Conversation)
+ suite.Equal(conversation.ID, apiConversation.ID)
+ suite.True(apiConversation.Unread)
+ }
+}
+
+// Test that conversations with newer last status IDs are returned earlier.
+func (suite *ConversationsTestSuite) TestGetAllOrder() {
+ // Create a new conversation.
+ conversation1 := suite.NewTestConversation(suite.testAccount, 0)
+
+ // Create another new conversation with a last status newer than conversation1's.
+ conversation2 := suite.NewTestConversation(suite.testAccount, 1*time.Second)
+
+ // Add an even newer status than that to conversation1.
+ conversation1Status2 := suite.NewTestStatus(suite.testAccount, conversation1.LastStatus.ThreadID, 2*time.Second, conversation1.LastStatus)
+ conversation1.LastStatusID = conversation1Status2.ID
+ if err := suite.db.UpsertConversation(context.Background(), conversation1, "last_status_id"); err != nil {
+ suite.FailNow(err.Error())
+ }
+
+ resp, err := suite.conversationsProcessor.GetAll(context.Background(), suite.testAccount, nil)
+ if suite.NoError(err) && suite.Len(resp.Items, 2) {
+ // conversation1 should be the first conversation returned.
+ apiConversation1 := resp.Items[0].(*apimodel.Conversation)
+ suite.Equal(conversation1.ID, apiConversation1.ID)
+ // It should have the newest status added to it.
+ suite.Equal(conversation1.LastStatusID, conversation1Status2.ID)
+
+ // conversation2 should be the second conversation returned.
+ apiConversation2 := resp.Items[1].(*apimodel.Conversation)
+ suite.Equal(conversation2.ID, apiConversation2.ID)
+ }
+}
diff --git a/internal/processing/conversations/migrate.go b/internal/processing/conversations/migrate.go
new file mode 100644
index 000000000..959ffcca4
--- /dev/null
+++ b/internal/processing/conversations/migrate.go
@@ -0,0 +1,131 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package conversations
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+
+ "github.com/superseriousbusiness/gotosocial/internal/db"
+ "github.com/superseriousbusiness/gotosocial/internal/gtserror"
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+ "github.com/superseriousbusiness/gotosocial/internal/id"
+ "github.com/superseriousbusiness/gotosocial/internal/log"
+ "github.com/superseriousbusiness/gotosocial/internal/util"
+)
+
+const advancedMigrationID = "20240611190733_add_conversations"
+const statusBatchSize = 100
+
+type AdvancedMigrationState struct {
+ MinID string
+ MaxIDInclusive string
+}
+
+func (p *Processor) MigrateDMsToConversations(ctx context.Context) error {
+ advancedMigration, err := p.state.DB.GetAdvancedMigration(ctx, advancedMigrationID)
+ if err != nil && !errors.Is(err, db.ErrNoEntries) {
+ return gtserror.Newf("couldn't get advanced migration with ID %s: %w", advancedMigrationID, err)
+ }
+ state := AdvancedMigrationState{}
+ if advancedMigration != nil {
+ // There was a previous migration.
+ if *advancedMigration.Finished {
+ // This migration has already been run to completion; we don't need to run it again.
+ return nil
+ }
+ // Otherwise, pick up where we left off.
+ if err := json.Unmarshal(advancedMigration.StateJSON, &state); err != nil {
+ // This should never happen.
+ return gtserror.Newf("couldn't deserialize advanced migration state from JSON: %w", err)
+ }
+ } else {
+ // Start at the beginning.
+ state.MinID = id.Lowest
+
+ // Find the max ID of all existing statuses.
+ // This will be the last one we migrate;
+ // newer ones will be handled by the normal conversation flow.
+ state.MaxIDInclusive, err = p.state.DB.MaxDirectStatusID(ctx)
+ if err != nil {
+ return gtserror.Newf("couldn't get max DM status ID for migration: %w", err)
+ }
+
+ // Save a new advanced migration record.
+ advancedMigration = >smodel.AdvancedMigration{
+ ID: advancedMigrationID,
+ Finished: util.Ptr(false),
+ }
+ if advancedMigration.StateJSON, err = json.Marshal(state); err != nil {
+ // This should never happen.
+ return gtserror.Newf("couldn't serialize advanced migration state to JSON: %w", err)
+ }
+ if err := p.state.DB.PutAdvancedMigration(ctx, advancedMigration); err != nil {
+ return gtserror.Newf("couldn't save state for advanced migration with ID %s: %w", advancedMigrationID, err)
+ }
+ }
+
+ log.Info(ctx, "migrating DMs to conversations…")
+
+ // In batches, get all statuses up to and including the max ID,
+ // and update conversations for each in order.
+ for {
+ // Get status IDs for this batch.
+ statusIDs, err := p.state.DB.GetDirectStatusIDsBatch(ctx, state.MinID, state.MaxIDInclusive, statusBatchSize)
+ if err != nil {
+ return gtserror.Newf("couldn't get DM status ID batch for migration: %w", err)
+ }
+ if len(statusIDs) == 0 {
+ break
+ }
+ log.Infof(ctx, "migrating %d DMs starting after %s", len(statusIDs), state.MinID)
+
+ // Load the batch by IDs.
+ statuses, err := p.state.DB.GetStatusesByIDs(ctx, statusIDs)
+ if err != nil {
+ return gtserror.Newf("couldn't get DM statuses for migration: %w", err)
+ }
+
+ // Update conversations for each status. Don't generate notifications.
+ for _, status := range statuses {
+ if _, err := p.UpdateConversationsForStatus(ctx, status); err != nil {
+ return gtserror.Newf("couldn't update conversations for status %s during migration: %w", status.ID, err)
+ }
+ }
+
+ // Save the migration state with the new min ID.
+ state.MinID = statusIDs[len(statusIDs)-1]
+ if advancedMigration.StateJSON, err = json.Marshal(state); err != nil {
+ // This should never happen.
+ return gtserror.Newf("couldn't serialize advanced migration state to JSON: %w", err)
+ }
+ if err := p.state.DB.PutAdvancedMigration(ctx, advancedMigration); err != nil {
+ return gtserror.Newf("couldn't save state for advanced migration with ID %s: %w", advancedMigrationID, err)
+ }
+ }
+
+ // Mark the migration as finished.
+ advancedMigration.Finished = util.Ptr(true)
+ if err := p.state.DB.PutAdvancedMigration(ctx, advancedMigration); err != nil {
+ return gtserror.Newf("couldn't save state for advanced migration with ID %s: %w", advancedMigrationID, err)
+ }
+
+ log.Info(ctx, "finished migrating DMs to conversations.")
+ return nil
+}
diff --git a/internal/processing/conversations/migrate_test.go b/internal/processing/conversations/migrate_test.go
new file mode 100644
index 000000000..b625e59ba
--- /dev/null
+++ b/internal/processing/conversations/migrate_test.go
@@ -0,0 +1,85 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package conversations_test
+
+import (
+ "context"
+
+ "github.com/superseriousbusiness/gotosocial/internal/db"
+ "github.com/superseriousbusiness/gotosocial/internal/db/bundb"
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+)
+
+// Test that we can migrate DMs to conversations.
+// This test assumes that we're using the standard test fixtures, which contain some conversation-eligible DMs.
+func (suite *ConversationsTestSuite) TestMigrateDMsToConversations() {
+ advancedMigrationID := "20240611190733_add_conversations"
+ ctx := context.Background()
+ rawDB := (suite.db).(*bundb.DBService).DB()
+
+ // Precondition: we shouldn't have any conversations yet.
+ numConversations := 0
+ if err := rawDB.NewSelect().
+ Model((*gtsmodel.Conversation)(nil)).
+ ColumnExpr("COUNT(*)").
+ Scan(ctx, &numConversations); // nocollapse
+ err != nil {
+ suite.FailNow(err.Error())
+ }
+ suite.Zero(numConversations)
+
+ // Precondition: there is no record of the conversations advanced migration.
+ _, err := suite.db.GetAdvancedMigration(ctx, advancedMigrationID)
+ suite.ErrorIs(err, db.ErrNoEntries)
+
+ // Run the migration, which should not fail.
+ if err := suite.conversationsProcessor.MigrateDMsToConversations(ctx); err != nil {
+ suite.FailNow(err.Error())
+ }
+
+ // We should now have some conversations.
+ if err := rawDB.NewSelect().
+ Model((*gtsmodel.Conversation)(nil)).
+ ColumnExpr("COUNT(*)").
+ Scan(ctx, &numConversations); // nocollapse
+ err != nil {
+ suite.FailNow(err.Error())
+ }
+ suite.NotZero(numConversations)
+
+ // The advanced migration should now be marked as finished.
+ advancedMigration, err := suite.db.GetAdvancedMigration(ctx, advancedMigrationID)
+ if err != nil {
+ suite.FailNow(err.Error())
+ }
+ if suite.NotNil(advancedMigration) && suite.NotNil(advancedMigration.Finished) {
+ suite.True(*advancedMigration.Finished)
+ }
+
+ // Run the migration again, which should not fail.
+ if err := suite.conversationsProcessor.MigrateDMsToConversations(ctx); err != nil {
+ suite.FailNow(err.Error())
+ }
+
+ // However, it shouldn't have done anything, so the advanced migration should not have been updated.
+ advancedMigration2, err := suite.db.GetAdvancedMigration(ctx, advancedMigrationID)
+ if err != nil {
+ suite.FailNow(err.Error())
+ }
+ suite.Equal(advancedMigration.UpdatedAt, advancedMigration2.UpdatedAt)
+}
diff --git a/internal/processing/conversations/read.go b/internal/processing/conversations/read.go
new file mode 100644
index 000000000..512a004a3
--- /dev/null
+++ b/internal/processing/conversations/read.go
@@ -0,0 +1,65 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package conversations
+
+import (
+ "context"
+
+ apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
+ "github.com/superseriousbusiness/gotosocial/internal/gtserror"
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+ "github.com/superseriousbusiness/gotosocial/internal/util"
+)
+
+func (p *Processor) Read(
+ ctx context.Context,
+ requestingAccount *gtsmodel.Account,
+ id string,
+) (*apimodel.Conversation, gtserror.WithCode) {
+ // Get the conversation, including participating accounts and last status.
+ conversation, errWithCode := p.getConversationOwnedBy(ctx, id, requestingAccount)
+ if errWithCode != nil {
+ return nil, errWithCode
+ }
+
+ // Mark the conversation as read.
+ conversation.Read = util.Ptr(true)
+ if err := p.state.DB.UpsertConversation(ctx, conversation, "read"); err != nil {
+ err = gtserror.Newf("DB error updating conversation %s: %w", id, err)
+ return nil, gtserror.NewErrorInternalError(err)
+ }
+
+ filters, mutes, errWithCode := p.getFiltersAndMutes(ctx, requestingAccount)
+ if errWithCode != nil {
+ return nil, errWithCode
+ }
+
+ apiConversation, err := p.converter.ConversationToAPIConversation(
+ ctx,
+ conversation,
+ requestingAccount,
+ filters,
+ mutes,
+ )
+ if err != nil {
+ err = gtserror.Newf("error converting conversation %s to API representation: %w", id, err)
+ return nil, gtserror.NewErrorInternalError(err)
+ }
+
+ return apiConversation, nil
+}
diff --git a/internal/processing/conversations/read_test.go b/internal/processing/conversations/read_test.go
new file mode 100644
index 000000000..ebd8f7fe5
--- /dev/null
+++ b/internal/processing/conversations/read_test.go
@@ -0,0 +1,34 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package conversations_test
+
+import (
+ "context"
+
+ "github.com/superseriousbusiness/gotosocial/internal/util"
+)
+
+func (suite *ConversationsTestSuite) TestRead() {
+ conversation := suite.NewTestConversation(suite.testAccount, 0)
+
+ suite.False(util.PtrOrValue(conversation.Read, false))
+ apiConversation, err := suite.conversationsProcessor.Read(context.Background(), suite.testAccount, conversation.ID)
+ if suite.NoError(err) {
+ suite.False(apiConversation.Unread)
+ }
+}
diff --git a/internal/processing/conversations/update.go b/internal/processing/conversations/update.go
new file mode 100644
index 000000000..7445994ae
--- /dev/null
+++ b/internal/processing/conversations/update.go
@@ -0,0 +1,242 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package conversations
+
+import (
+ "context"
+ "errors"
+
+ apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
+ "github.com/superseriousbusiness/gotosocial/internal/db"
+ statusfilter "github.com/superseriousbusiness/gotosocial/internal/filter/status"
+ "github.com/superseriousbusiness/gotosocial/internal/gtserror"
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+ "github.com/superseriousbusiness/gotosocial/internal/id"
+ "github.com/superseriousbusiness/gotosocial/internal/log"
+ "github.com/superseriousbusiness/gotosocial/internal/util"
+)
+
+// ConversationNotification carries the arguments to processing/stream.Processor.Conversation.
+type ConversationNotification struct {
+ // AccountID of a local account to deliver the notification to.
+ AccountID string
+ // Conversation as the notification payload.
+ Conversation *apimodel.Conversation
+}
+
+// UpdateConversationsForStatus updates all conversations related to a status,
+// and returns a map from local account IDs to conversation notifications that should be sent to them.
+func (p *Processor) UpdateConversationsForStatus(ctx context.Context, status *gtsmodel.Status) ([]ConversationNotification, error) {
+ if status.Visibility != gtsmodel.VisibilityDirect {
+ // Only DMs are considered part of conversations.
+ return nil, nil
+ }
+ if status.BoostOfID != "" {
+ // Boosts can't be part of conversations.
+ // FUTURE: This may change if we ever implement quote posts.
+ return nil, nil
+ }
+ if status.ThreadID == "" {
+ // If the status doesn't have a thread ID, it didn't mention a local account,
+ // and thus can't be part of a conversation.
+ return nil, nil
+ }
+
+ // We need accounts to be populated for this.
+ if err := p.state.DB.PopulateStatus(ctx, status); err != nil {
+ return nil, gtserror.Newf("DB error populating status %s: %w", status.ID, err)
+ }
+
+ // The account which authored the status plus all mentioned accounts.
+ allParticipantsSet := make(map[string]*gtsmodel.Account, 1+len(status.Mentions))
+ allParticipantsSet[status.AccountID] = status.Account
+ for _, mention := range status.Mentions {
+ allParticipantsSet[mention.TargetAccountID] = mention.TargetAccount
+ }
+
+ // Create or update conversations for and send notifications to each local participant.
+ notifications := make([]ConversationNotification, 0, len(allParticipantsSet))
+ for _, participant := range allParticipantsSet {
+ if participant.IsRemote() {
+ continue
+ }
+ localAccount := participant
+
+ // If the status is not visible to this account, skip processing it for this account.
+ visible, err := p.filter.StatusVisible(ctx, localAccount, status)
+ if err != nil {
+ log.Errorf(
+ ctx,
+ "error checking status %s visibility for account %s: %v",
+ status.ID,
+ localAccount.ID,
+ err,
+ )
+ continue
+ } else if !visible {
+ continue
+ }
+
+ // Is the status filtered or muted for this user?
+ // Converting the status to an API status runs the filter/mute checks.
+ filters, mutes, errWithCode := p.getFiltersAndMutes(ctx, localAccount)
+ if errWithCode != nil {
+ log.Error(ctx, errWithCode)
+ continue
+ }
+ _, err = p.converter.StatusToAPIStatus(
+ ctx,
+ status,
+ localAccount,
+ statusfilter.FilterContextNotifications,
+ filters,
+ mutes,
+ )
+ if err != nil {
+ // If the status matched a hide filter, skip processing it for this account.
+ // If there was another kind of error, log that and skip it anyway.
+ if !errors.Is(err, statusfilter.ErrHideStatus) {
+ log.Errorf(
+ ctx,
+ "error checking status %s filtering/muting for account %s: %v",
+ status.ID,
+ localAccount.ID,
+ err,
+ )
+ }
+ continue
+ }
+
+ // Collect other accounts participating in the conversation.
+ otherAccounts := make([]*gtsmodel.Account, 0, len(allParticipantsSet)-1)
+ otherAccountIDs := make([]string, 0, len(allParticipantsSet)-1)
+ for accountID, account := range allParticipantsSet {
+ if accountID != localAccount.ID {
+ otherAccounts = append(otherAccounts, account)
+ otherAccountIDs = append(otherAccountIDs, accountID)
+ }
+ }
+
+ // Check for a previously existing conversation, if there is one.
+ conversation, err := p.state.DB.GetConversationByThreadAndAccountIDs(
+ ctx,
+ status.ThreadID,
+ localAccount.ID,
+ otherAccountIDs,
+ )
+ if err != nil && !errors.Is(err, db.ErrNoEntries) {
+ log.Errorf(
+ ctx,
+ "error trying to find a previous conversation for status %s and account %s: %v",
+ status.ID,
+ localAccount.ID,
+ err,
+ )
+ continue
+ }
+
+ if conversation == nil {
+ // Create a new conversation.
+ conversation = >smodel.Conversation{
+ ID: id.NewULID(),
+ AccountID: localAccount.ID,
+ OtherAccountIDs: otherAccountIDs,
+ OtherAccounts: otherAccounts,
+ OtherAccountsKey: gtsmodel.ConversationOtherAccountsKey(otherAccountIDs),
+ ThreadID: status.ThreadID,
+ Read: util.Ptr(true),
+ }
+ }
+
+ // Assume that if the conversation owner posted the status, they've already read it.
+ statusAuthoredByConversationOwner := status.AccountID == conversation.AccountID
+
+ // Update the conversation.
+ // If there is no previous last status or this one is more recently created, set it as the last status.
+ if conversation.LastStatus == nil || conversation.LastStatus.CreatedAt.Before(status.CreatedAt) {
+ conversation.LastStatusID = status.ID
+ conversation.LastStatus = status
+ }
+ // If the conversation is unread, leave it marked as unread.
+ // If the conversation is read but this status might not have been, mark the conversation as unread.
+ if !statusAuthoredByConversationOwner {
+ conversation.Read = util.Ptr(false)
+ }
+
+ // Create or update the conversation.
+ err = p.state.DB.UpsertConversation(ctx, conversation)
+ if err != nil {
+ log.Errorf(
+ ctx,
+ "error creating or updating conversation %s for status %s and account %s: %v",
+ conversation.ID,
+ status.ID,
+ localAccount.ID,
+ err,
+ )
+ continue
+ }
+
+ // Link the conversation to the status.
+ if err := p.state.DB.LinkConversationToStatus(ctx, conversation.ID, status.ID); err != nil {
+ log.Errorf(
+ ctx,
+ "error linking conversation %s to status %s: %v",
+ conversation.ID,
+ status.ID,
+ err,
+ )
+ continue
+ }
+
+ // Convert the conversation to API representation.
+ apiConversation, err := p.converter.ConversationToAPIConversation(
+ ctx,
+ conversation,
+ localAccount,
+ filters,
+ mutes,
+ )
+ if err != nil {
+ // If the conversation's last status matched a hide filter, skip it.
+ // If there was another kind of error, log that and skip it anyway.
+ if !errors.Is(err, statusfilter.ErrHideStatus) {
+ log.Errorf(
+ ctx,
+ "error converting conversation %s to API representation for account %s: %v",
+ status.ID,
+ localAccount.ID,
+ err,
+ )
+ }
+ continue
+ }
+
+ // Generate a notification,
+ // unless the status was authored by the user who would be notified,
+ // in which case they already know.
+ if status.AccountID != localAccount.ID {
+ notifications = append(notifications, ConversationNotification{
+ AccountID: localAccount.ID,
+ Conversation: apiConversation,
+ })
+ }
+ }
+
+ return notifications, nil
+}
diff --git a/internal/processing/conversations/update_test.go b/internal/processing/conversations/update_test.go
new file mode 100644
index 000000000..8ba2800fe
--- /dev/null
+++ b/internal/processing/conversations/update_test.go
@@ -0,0 +1,54 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package conversations_test
+
+import (
+ "context"
+)
+
+// Test that we can create conversations when a new status comes in.
+func (suite *ConversationsTestSuite) TestUpdateConversationsForStatus() {
+ ctx := context.Background()
+
+ // Precondition: the test user shouldn't have any conversations yet.
+ conversations, err := suite.db.GetConversationsByOwnerAccountID(ctx, suite.testAccount.ID, nil)
+ if err != nil {
+ suite.FailNow(err.Error())
+ }
+ suite.Empty(conversations)
+
+ // Create a status.
+ threadID := suite.NewULID(0)
+ status := suite.NewTestStatus(suite.testAccount, threadID, 0, nil)
+
+ // Update conversations for it.
+ notifications, err := suite.conversationsProcessor.UpdateConversationsForStatus(ctx, status)
+ if err != nil {
+ suite.FailNow(err.Error())
+ }
+
+ // In this test, the user is DMing themself, and should not receive a notification from that.
+ suite.Empty(notifications)
+
+ // The test user should have a conversation now.
+ conversations, err = suite.db.GetConversationsByOwnerAccountID(ctx, suite.testAccount.ID, nil)
+ if err != nil {
+ suite.FailNow(err.Error())
+ }
+ suite.NotEmpty(conversations)
+}
diff --git a/internal/processing/processor.go b/internal/processing/processor.go
index fb6b05d80..a07df76e1 100644
--- a/internal/processing/processor.go
+++ b/internal/processing/processor.go
@@ -27,7 +27,9 @@
"github.com/superseriousbusiness/gotosocial/internal/oauth"
"github.com/superseriousbusiness/gotosocial/internal/processing/account"
"github.com/superseriousbusiness/gotosocial/internal/processing/admin"
+ "github.com/superseriousbusiness/gotosocial/internal/processing/advancedmigrations"
"github.com/superseriousbusiness/gotosocial/internal/processing/common"
+ "github.com/superseriousbusiness/gotosocial/internal/processing/conversations"
"github.com/superseriousbusiness/gotosocial/internal/processing/fedi"
filtersv1 "github.com/superseriousbusiness/gotosocial/internal/processing/filters/v1"
filtersv2 "github.com/superseriousbusiness/gotosocial/internal/processing/filters/v2"
@@ -70,22 +72,24 @@ type Processor struct {
SUB-PROCESSORS
*/
- account account.Processor
- admin admin.Processor
- fedi fedi.Processor
- filtersv1 filtersv1.Processor
- filtersv2 filtersv2.Processor
- list list.Processor
- markers markers.Processor
- media media.Processor
- polls polls.Processor
- report report.Processor
- search search.Processor
- status status.Processor
- stream stream.Processor
- timeline timeline.Processor
- user user.Processor
- workers workers.Processor
+ account account.Processor
+ admin admin.Processor
+ advancedmigrations advancedmigrations.Processor
+ conversations conversations.Processor
+ fedi fedi.Processor
+ filtersv1 filtersv1.Processor
+ filtersv2 filtersv2.Processor
+ list list.Processor
+ markers markers.Processor
+ media media.Processor
+ polls polls.Processor
+ report report.Processor
+ search search.Processor
+ status status.Processor
+ stream stream.Processor
+ timeline timeline.Processor
+ user user.Processor
+ workers workers.Processor
}
func (p *Processor) Account() *account.Processor {
@@ -96,6 +100,14 @@ func (p *Processor) Admin() *admin.Processor {
return &p.admin
}
+func (p *Processor) AdvancedMigrations() *advancedmigrations.Processor {
+ return &p.advancedmigrations
+}
+
+func (p *Processor) Conversations() *conversations.Processor {
+ return &p.conversations
+}
+
func (p *Processor) Fedi() *fedi.Processor {
return &p.fedi
}
@@ -188,6 +200,7 @@ func NewProcessor(
// processors + pin them to this struct.
processor.account = account.New(&common, state, converter, mediaManager, federator, filter, parseMentionFunc)
processor.admin = admin.New(&common, state, cleaner, federator, converter, mediaManager, federator.TransportController(), emailSender)
+ processor.conversations = conversations.New(state, converter, filter)
processor.fedi = fedi.New(state, &common, converter, federator, filter)
processor.filtersv1 = filtersv1.New(state, converter, &processor.stream)
processor.filtersv2 = filtersv2.New(state, converter, &processor.stream)
@@ -200,6 +213,9 @@ func NewProcessor(
processor.status = status.New(state, &common, &processor.polls, federator, converter, filter, parseMentionFunc)
processor.user = user.New(state, converter, oauthServer, emailSender)
+ // The advanced migrations processor sequences advanced migrations from all other processors.
+ processor.advancedmigrations = advancedmigrations.New(&processor.conversations)
+
// Workers processor handles asynchronous
// worker jobs; instantiate it separately
// and pass subset of sub processors it needs.
@@ -212,6 +228,7 @@ func NewProcessor(
&processor.account,
&processor.media,
&processor.stream,
+ &processor.conversations,
)
return processor
diff --git a/internal/processing/stream/conversation.go b/internal/processing/stream/conversation.go
new file mode 100644
index 000000000..a0236c459
--- /dev/null
+++ b/internal/processing/stream/conversation.go
@@ -0,0 +1,44 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package stream
+
+import (
+ "context"
+ "encoding/json"
+
+ "codeberg.org/gruf/go-byteutil"
+ apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
+ "github.com/superseriousbusiness/gotosocial/internal/log"
+ "github.com/superseriousbusiness/gotosocial/internal/stream"
+)
+
+// Conversation streams the given conversation to any open, appropriate streams belonging to the given account.
+func (p *Processor) Conversation(ctx context.Context, accountID string, conversation *apimodel.Conversation) {
+ b, err := json.Marshal(conversation)
+ if err != nil {
+ log.Errorf(ctx, "error marshaling json: %v", err)
+ return
+ }
+ p.streams.Post(ctx, accountID, stream.Message{
+ Payload: byteutil.B2S(b),
+ Event: stream.EventTypeConversation,
+ Stream: []string{
+ stream.TimelineDirect,
+ },
+ })
+}
diff --git a/internal/processing/workers/fromclientapi_test.go b/internal/processing/workers/fromclientapi_test.go
index 49a68d27a..35c2c31b7 100644
--- a/internal/processing/workers/fromclientapi_test.go
+++ b/internal/processing/workers/fromclientapi_test.go
@@ -50,6 +50,8 @@ func (suite *FromClientAPITestSuite) newStatus(
visibility gtsmodel.Visibility,
replyToStatus *gtsmodel.Status,
boostOfStatus *gtsmodel.Status,
+ mentionedAccounts []*gtsmodel.Account,
+ createThread bool,
) *gtsmodel.Status {
var (
protocol = config.GetProtocol()
@@ -102,6 +104,39 @@ func (suite *FromClientAPITestSuite) newStatus(
newStatus.Visibility = boostOfStatus.Visibility
}
+ for _, mentionedAccount := range mentionedAccounts {
+ newMention := >smodel.Mention{
+ ID: id.NewULID(),
+ StatusID: newStatus.ID,
+ Status: newStatus,
+ OriginAccountID: account.ID,
+ OriginAccountURI: account.URI,
+ OriginAccount: account,
+ TargetAccountID: mentionedAccount.ID,
+ TargetAccount: mentionedAccount,
+ Silent: util.Ptr(false),
+ }
+
+ newStatus.Mentions = append(newStatus.Mentions, newMention)
+ newStatus.MentionIDs = append(newStatus.MentionIDs, newMention.ID)
+
+ if err := state.DB.PutMention(ctx, newMention); err != nil {
+ suite.FailNow(err.Error())
+ }
+ }
+
+ if createThread {
+ newThread := >smodel.Thread{
+ ID: id.NewULID(),
+ }
+
+ newStatus.ThreadID = newThread.ID
+
+ if err := state.DB.PutThread(ctx, newThread); err != nil {
+ suite.FailNow(err.Error())
+ }
+ }
+
// Put the status in the db, to mimic what would
// have already happened earlier up the flow.
if err := state.DB.PutStatus(ctx, newStatus); err != nil {
@@ -168,6 +203,31 @@ func (suite *FromClientAPITestSuite) statusJSON(
return string(statusJSON)
}
+func (suite *FromClientAPITestSuite) conversationJSON(
+ ctx context.Context,
+ typeConverter *typeutils.Converter,
+ conversation *gtsmodel.Conversation,
+ requestingAccount *gtsmodel.Account,
+) string {
+ apiConversation, err := typeConverter.ConversationToAPIConversation(
+ ctx,
+ conversation,
+ requestingAccount,
+ nil,
+ nil,
+ )
+ if err != nil {
+ suite.FailNow(err.Error())
+ }
+
+ conversationJSON, err := json.Marshal(apiConversation)
+ if err != nil {
+ suite.FailNow(err.Error())
+ }
+
+ return string(conversationJSON)
+}
+
func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithNotification() {
testStructs := suite.SetupTestStructs()
defer suite.TearDownTestStructs(testStructs)
@@ -194,6 +254,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithNotification() {
gtsmodel.VisibilityPublic,
nil,
nil,
+ nil,
+ false,
)
)
@@ -303,6 +365,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusReply() {
gtsmodel.VisibilityPublic,
suite.testStatuses["local_account_2_status_1"],
nil,
+ nil,
+ false,
)
)
@@ -362,6 +426,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusReplyMuted() {
gtsmodel.VisibilityPublic,
suite.testStatuses["local_account_1_status_1"],
nil,
+ nil,
+ false,
)
threadMute = >smodel.ThreadMute{
ID: "01HD3KRMBB1M85QRWHD912QWRE",
@@ -420,6 +486,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoostMuted() {
gtsmodel.VisibilityPublic,
nil,
suite.testStatuses["local_account_1_status_1"],
+ nil,
+ false,
)
threadMute = >smodel.ThreadMute{
ID: "01HD3KRMBB1M85QRWHD912QWRE",
@@ -483,6 +551,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusListRepliesPolicyLis
gtsmodel.VisibilityPublic,
suite.testStatuses["local_account_2_status_1"],
nil,
+ nil,
+ false,
)
)
@@ -556,6 +626,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusListRepliesPolicyLis
gtsmodel.VisibilityPublic,
suite.testStatuses["local_account_2_status_1"],
nil,
+ nil,
+ false,
)
)
@@ -634,6 +706,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusReplyListRepliesPoli
gtsmodel.VisibilityPublic,
suite.testStatuses["local_account_2_status_1"],
nil,
+ nil,
+ false,
)
)
@@ -704,6 +778,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoost() {
gtsmodel.VisibilityPublic,
nil,
suite.testStatuses["local_account_2_status_1"],
+ nil,
+ false,
)
)
@@ -765,6 +841,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoostNoReblogs() {
gtsmodel.VisibilityPublic,
nil,
suite.testStatuses["local_account_2_status_1"],
+ nil,
+ false,
)
)
@@ -807,6 +885,159 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoostNoReblogs() {
)
}
+// A DM to a local user should create a conversation and accompanying notification.
+func (suite *FromClientAPITestSuite) TestProcessCreateStatusWhichBeginsConversation() {
+ testStructs := suite.SetupTestStructs()
+ defer suite.TearDownTestStructs(testStructs)
+
+ var (
+ ctx = context.Background()
+ postingAccount = suite.testAccounts["local_account_2"]
+ receivingAccount = suite.testAccounts["local_account_1"]
+ streams = suite.openStreams(ctx,
+ testStructs.Processor,
+ receivingAccount,
+ nil,
+ )
+ homeStream = streams[stream.TimelineHome]
+ directStream = streams[stream.TimelineDirect]
+
+ // turtle posts a new top-level DM mentioning zork.
+ status = suite.newStatus(
+ ctx,
+ testStructs.State,
+ postingAccount,
+ gtsmodel.VisibilityDirect,
+ nil,
+ nil,
+ []*gtsmodel.Account{receivingAccount},
+ true,
+ )
+ )
+
+ // Process the new status.
+ if err := testStructs.Processor.Workers().ProcessFromClientAPI(
+ ctx,
+ &messages.FromClientAPI{
+ APObjectType: ap.ObjectNote,
+ APActivityType: ap.ActivityCreate,
+ GTSModel: status,
+ Origin: postingAccount,
+ },
+ ); err != nil {
+ suite.FailNow(err.Error())
+ }
+
+ // Locate the conversation which should now exist for zork.
+ conversation, err := testStructs.State.DB.GetConversationByThreadAndAccountIDs(
+ ctx,
+ status.ThreadID,
+ receivingAccount.ID,
+ []string{postingAccount.ID},
+ )
+ if err != nil {
+ suite.FailNow(err.Error())
+ }
+
+ // Check status in home stream.
+ suite.checkStreamed(
+ homeStream,
+ true,
+ "",
+ stream.EventTypeUpdate,
+ )
+
+ // Check mention notification in home stream.
+ suite.checkStreamed(
+ homeStream,
+ true,
+ "",
+ stream.EventTypeNotification,
+ )
+
+ // Check conversation in direct stream.
+ conversationJSON := suite.conversationJSON(
+ ctx,
+ testStructs.TypeConverter,
+ conversation,
+ receivingAccount,
+ )
+ suite.checkStreamed(
+ directStream,
+ true,
+ conversationJSON,
+ stream.EventTypeConversation,
+ )
+}
+
+// A public message to a local user should not result in a conversation notification.
+func (suite *FromClientAPITestSuite) TestProcessCreateStatusWhichShouldNotCreateConversation() {
+ testStructs := suite.SetupTestStructs()
+ defer suite.TearDownTestStructs(testStructs)
+
+ var (
+ ctx = context.Background()
+ postingAccount = suite.testAccounts["local_account_2"]
+ receivingAccount = suite.testAccounts["local_account_1"]
+ streams = suite.openStreams(ctx,
+ testStructs.Processor,
+ receivingAccount,
+ nil,
+ )
+ homeStream = streams[stream.TimelineHome]
+ directStream = streams[stream.TimelineDirect]
+
+ // turtle posts a new top-level public message mentioning zork.
+ status = suite.newStatus(
+ ctx,
+ testStructs.State,
+ postingAccount,
+ gtsmodel.VisibilityPublic,
+ nil,
+ nil,
+ []*gtsmodel.Account{receivingAccount},
+ true,
+ )
+ )
+
+ // Process the new status.
+ if err := testStructs.Processor.Workers().ProcessFromClientAPI(
+ ctx,
+ &messages.FromClientAPI{
+ APObjectType: ap.ObjectNote,
+ APActivityType: ap.ActivityCreate,
+ GTSModel: status,
+ Origin: postingAccount,
+ },
+ ); err != nil {
+ suite.FailNow(err.Error())
+ }
+
+ // Check status in home stream.
+ suite.checkStreamed(
+ homeStream,
+ true,
+ "",
+ stream.EventTypeUpdate,
+ )
+
+ // Check mention notification in home stream.
+ suite.checkStreamed(
+ homeStream,
+ true,
+ "",
+ stream.EventTypeNotification,
+ )
+
+ // Check for absence of conversation notification in direct stream.
+ suite.checkStreamed(
+ directStream,
+ false,
+ "",
+ "",
+ )
+}
+
func (suite *FromClientAPITestSuite) TestProcessStatusDelete() {
testStructs := suite.SetupTestStructs()
defer suite.TearDownTestStructs(testStructs)
diff --git a/internal/processing/workers/surface.go b/internal/processing/workers/surface.go
index 5ec905ae8..1a7dbbfe5 100644
--- a/internal/processing/workers/surface.go
+++ b/internal/processing/workers/surface.go
@@ -20,6 +20,7 @@
import (
"github.com/superseriousbusiness/gotosocial/internal/email"
"github.com/superseriousbusiness/gotosocial/internal/filter/visibility"
+ "github.com/superseriousbusiness/gotosocial/internal/processing/conversations"
"github.com/superseriousbusiness/gotosocial/internal/processing/stream"
"github.com/superseriousbusiness/gotosocial/internal/state"
"github.com/superseriousbusiness/gotosocial/internal/typeutils"
@@ -32,9 +33,10 @@
// - sending a notification to a user
// - sending an email
type Surface struct {
- State *state.State
- Converter *typeutils.Converter
- Stream *stream.Processor
- Filter *visibility.Filter
- EmailSender email.Sender
+ State *state.State
+ Converter *typeutils.Converter
+ Stream *stream.Processor
+ Filter *visibility.Filter
+ EmailSender email.Sender
+ Conversations *conversations.Processor
}
diff --git a/internal/processing/workers/surfacenotify_test.go b/internal/processing/workers/surfacenotify_test.go
index 18d0277ae..937ddeca2 100644
--- a/internal/processing/workers/surfacenotify_test.go
+++ b/internal/processing/workers/surfacenotify_test.go
@@ -39,11 +39,12 @@ func (suite *SurfaceNotifyTestSuite) TestSpamNotifs() {
defer suite.TearDownTestStructs(testStructs)
surface := &workers.Surface{
- State: testStructs.State,
- Converter: testStructs.TypeConverter,
- Stream: testStructs.Processor.Stream(),
- Filter: visibility.NewFilter(testStructs.State),
- EmailSender: testStructs.EmailSender,
+ State: testStructs.State,
+ Converter: testStructs.TypeConverter,
+ Stream: testStructs.Processor.Stream(),
+ Filter: visibility.NewFilter(testStructs.State),
+ EmailSender: testStructs.EmailSender,
+ Conversations: testStructs.Processor.Conversations(),
}
var (
diff --git a/internal/processing/workers/surfacetimeline.go b/internal/processing/workers/surfacetimeline.go
index 41d7f6f2a..8ac8293ed 100644
--- a/internal/processing/workers/surfacetimeline.go
+++ b/internal/processing/workers/surfacetimeline.go
@@ -36,8 +36,8 @@
// and LIST timelines of accounts that follow the status author.
//
// It will also handle notifications for any mentions attached to
-// the account, and notifications for any local accounts that want
-// to know when this account posts.
+// the account, notifications for any local accounts that want
+// to know when this account posts, and conversations containing the status.
func (s *Surface) timelineAndNotifyStatus(ctx context.Context, status *gtsmodel.Status) error {
// Ensure status fully populated; including account, mentions, etc.
if err := s.State.DB.PopulateStatus(ctx, status); err != nil {
@@ -73,6 +73,15 @@ func (s *Surface) timelineAndNotifyStatus(ctx context.Context, status *gtsmodel.
return gtserror.Newf("error notifying status mentions for status %s: %w", status.ID, err)
}
+ // Update any conversations containing this status, and send conversation notifications.
+ notifications, err := s.Conversations.UpdateConversationsForStatus(ctx, status)
+ if err != nil {
+ return gtserror.Newf("error updating conversations for status %s: %w", status.ID, err)
+ }
+ for _, notification := range notifications {
+ s.Stream.Conversation(ctx, notification.AccountID, notification.Conversation)
+ }
+
return nil
}
diff --git a/internal/processing/workers/util.go b/internal/processing/workers/util.go
index 780e5ca14..915370976 100644
--- a/internal/processing/workers/util.go
+++ b/internal/processing/workers/util.go
@@ -137,6 +137,11 @@ func (u *utils) wipeStatus(
errs.Appendf("error deleting status from timelines: %w", err)
}
+ // delete this status from any conversations that it's part of
+ if err := u.state.DB.DeleteStatusFromConversations(ctx, statusToDelete.ID); err != nil {
+ errs.Appendf("error deleting status from conversations: %w", err)
+ }
+
// finally, delete the status itself
if err := u.state.DB.DeleteStatusByID(ctx, statusToDelete.ID); err != nil {
errs.Appendf("error deleting status: %w", err)
diff --git a/internal/processing/workers/workers.go b/internal/processing/workers/workers.go
index 6b4cc07a6..c7f67b025 100644
--- a/internal/processing/workers/workers.go
+++ b/internal/processing/workers/workers.go
@@ -22,6 +22,7 @@
"github.com/superseriousbusiness/gotosocial/internal/federation"
"github.com/superseriousbusiness/gotosocial/internal/filter/visibility"
"github.com/superseriousbusiness/gotosocial/internal/processing/account"
+ "github.com/superseriousbusiness/gotosocial/internal/processing/conversations"
"github.com/superseriousbusiness/gotosocial/internal/processing/media"
"github.com/superseriousbusiness/gotosocial/internal/processing/stream"
"github.com/superseriousbusiness/gotosocial/internal/state"
@@ -44,6 +45,7 @@ func New(
account *account.Processor,
media *media.Processor,
stream *stream.Processor,
+ conversations *conversations.Processor,
) Processor {
// Init federate logic
// wrapper struct.
@@ -56,11 +58,12 @@ func New(
// Init surface logic
// wrapper struct.
surface := &Surface{
- State: state,
- Converter: converter,
- Stream: stream,
- Filter: filter,
- EmailSender: emailSender,
+ State: state,
+ Converter: converter,
+ Stream: stream,
+ Filter: filter,
+ EmailSender: emailSender,
+ Conversations: conversations,
}
// Init shared util funcs.
diff --git a/internal/processing/workers/workers_test.go b/internal/processing/workers/workers_test.go
index f66190d75..3093fd93a 100644
--- a/internal/processing/workers/workers_test.go
+++ b/internal/processing/workers/workers_test.go
@@ -108,6 +108,7 @@ func (suite *WorkersTestSuite) openStreams(ctx context.Context, processor *proce
stream.TimelineHome,
stream.TimelinePublic,
stream.TimelineNotifications,
+ stream.TimelineDirect,
} {
stream, err := processor.Stream().Open(ctx, account, streamType)
if err != nil {
diff --git a/internal/stream/stream.go b/internal/stream/stream.go
index e843a1b76..0a352133a 100644
--- a/internal/stream/stream.go
+++ b/internal/stream/stream.go
@@ -46,6 +46,10 @@
// EventTypeFiltersChanged -- the user's filters
// (including keywords and statuses) have changed.
EventTypeFiltersChanged = "filters_changed"
+
+ // EventTypeConversation -- a user
+ // should be shown an updated conversation.
+ EventTypeConversation = "conversation"
)
const (
diff --git a/internal/typeutils/internaltofrontend.go b/internal/typeutils/internaltofrontend.go
index 7d2889b05..a13304bd8 100644
--- a/internal/typeutils/internaltofrontend.go
+++ b/internal/typeutils/internaltofrontend.go
@@ -1739,6 +1739,67 @@ func (c *Converter) NotificationToAPINotification(
}, nil
}
+// ConversationToAPIConversation converts a conversation into its API representation.
+// The conversation status will be filtered using the notification filter context,
+// and may be nil if the status was hidden.
+func (c *Converter) ConversationToAPIConversation(
+ ctx context.Context,
+ conversation *gtsmodel.Conversation,
+ requestingAccount *gtsmodel.Account,
+ filters []*gtsmodel.Filter,
+ mutes *usermute.CompiledUserMuteList,
+) (*apimodel.Conversation, error) {
+ apiConversation := &apimodel.Conversation{
+ ID: conversation.ID,
+ Unread: !*conversation.Read,
+ }
+ for _, account := range conversation.OtherAccounts {
+ var apiAccount *apimodel.Account
+ blocked, err := c.state.DB.IsEitherBlocked(ctx, requestingAccount.ID, account.ID)
+ if err != nil {
+ return nil, gtserror.Newf(
+ "DB error checking blocks between accounts %s and %s: %w",
+ requestingAccount.ID,
+ account.ID,
+ err,
+ )
+ }
+ if blocked || account.IsSuspended() {
+ apiAccount, err = c.AccountToAPIAccountBlocked(ctx, account)
+ } else {
+ apiAccount, err = c.AccountToAPIAccountPublic(ctx, account)
+ }
+ if err != nil {
+ return nil, gtserror.Newf(
+ "error converting account %s to API representation: %w",
+ account.ID,
+ err,
+ )
+ }
+ apiConversation.Accounts = append(apiConversation.Accounts, *apiAccount)
+ }
+ if conversation.LastStatus != nil {
+ var err error
+ apiConversation.LastStatus, err = c.StatusToAPIStatus(
+ ctx,
+ conversation.LastStatus,
+ requestingAccount,
+ statusfilter.FilterContextNotifications,
+ filters,
+ mutes,
+ )
+ if err != nil && !errors.Is(err, statusfilter.ErrHideStatus) {
+ return nil, gtserror.Newf(
+ "error converting status %s to API representation: %w",
+ conversation.LastStatus.ID,
+ err,
+ )
+ }
+ }
+
+ return apiConversation, nil
+}
+
// DomainPermToAPIDomainPerm converts a gts model domin block or allow into an api domain permission.
func (c *Converter) DomainPermToAPIDomainPerm(
ctx context.Context,
diff --git a/test/envparsing.sh b/test/envparsing.sh
index 22abff48a..83dfb85fc 100755
--- a/test/envparsing.sh
+++ b/test/envparsing.sh
@@ -32,6 +32,8 @@ EXPECT=$(cat << "EOF"
"block-mem-ratio": 2,
"boost-of-ids-mem-ratio": 3,
"client-mem-ratio": 0.1,
+ "conversation-last-status-ids-mem-ratio": 2,
+ "conversation-mem-ratio": 1,
"emoji-category-mem-ratio": 0.1,
"emoji-mem-ratio": 3,
"filter-keyword-mem-ratio": 0.5,