gotosocial/internal/stream/stream.go

390 lines
9.0 KiB
Go
Raw Normal View History

// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
2021-09-01 18:29:25 +02:00
package stream
import (
"context"
"maps"
"slices"
"sync"
"sync/atomic"
)
const (
// EventTypeNotification -- a user
// should be shown a notification.
EventTypeNotification = "notification"
// EventTypeUpdate -- a user should
// be shown an update in their timeline.
EventTypeUpdate = "update"
// EventTypeDelete -- something
// should be deleted from a user.
EventTypeDelete = "delete"
// EventTypeStatusUpdate -- something in the
// user's timeline has been edited (yes this
// is a confusing name, blame Mastodon ...).
EventTypeStatusUpdate = "status.update"
// EventTypeFiltersChanged -- the user's filters
// (including keywords and statuses) have changed.
EventTypeFiltersChanged = "filters_changed"
[feature] Conversations API (#3013) * Implement conversations API * Sort and page conversations by last status ID * Appease linter * Fix deleting conversations and statuses * Refactor to make migrations automatic * Lint * Update tests post-merge * Fixes from live-fire testing * Linter caught a format problem * Refactor tests, fix cache * Negative test for non-DMs * Run conversations advanced migration on testrig startup as well as regular server startup * Document (lack of) side effects of API method for deleting a conversation * Make not-found check less nested for readability * Rename PutConversation to UpsertConversation * Use util.Ptr instead of IIFE * Reduce cache used by conversations * Remove unnecessary TableExpr/ColumnExpr * Use struct tags for both unique constraints on Conversation * Make it clear how paging with GetDirectStatusIDsBatch should be used * Let conversation paging skip conversations it can't render * Use Bun NewDropTable * Convert delete raw query to Bun * Convert update raw query to Bun * Convert latestConversationStatusesTempTable raw query partially to Bun * Convert conversationStatusesTempTable raw query partially to Bun * Rename field used to store result of MaxDirectStatusID * Move advanced migrations to their own tiny processor * Catch up util function name with main * Remove json.… wrappers * Remove redundant check * Combine error checks * Replace map with slice of structs * Address processor/type converter comments - Add context info for errors - Extract some common processor code into shared methods - Move conversation eligibility check ahead of populating conversation * Add error context when dropping temp tables
2024-07-23 21:44:31 +02:00
// EventTypeConversation -- a user
// should be shown an updated conversation.
EventTypeConversation = "conversation"
)
const (
// TimelineLocal:
// All public posts originating from this
// server. Analogous to the local timeline.
TimelineLocal = "public:local"
// TimelinePublic:
// All public posts known to the server.
// Analogous to the federated timeline.
TimelinePublic = "public"
// TimelineHome:
// Events related to the current user, such
// as home feed updates and notifications.
TimelineHome = "user"
// TimelineNotifications:
// Notifications for the current user.
TimelineNotifications = "user:notification"
// TimelineDirect:
// Updates to direct conversations.
TimelineDirect = "direct"
// TimelineList:
// Updates to a specific list.
TimelineList = "list"
)
// AllStatusTimelines contains all Timelines
// that a status could conceivably be delivered
// to, useful for sending out status deletes.
var AllStatusTimelines = []string{
TimelineLocal,
TimelinePublic,
TimelineHome,
TimelineDirect,
TimelineList,
}
type Streams struct {
streams map[string][]*Stream
mutex sync.Mutex
}
// Open will open open a new Stream for given account ID and stream types, the given context will be passed to Stream.
func (s *Streams) Open(accountID string, streamTypes ...string) *Stream {
if len(streamTypes) == 0 {
panic("no stream types given")
}
// Prep new Stream.
str := new(Stream)
str.done = make(chan struct{})
str.msgCh = make(chan Message, 50) // TODO: make configurable
for _, streamType := range streamTypes {
str.Subscribe(streamType)
}
// TODO: add configurable
// max streams per account.
// Acquire lock.
s.mutex.Lock()
if s.streams == nil {
// Main stream-map needs allocating.
s.streams = make(map[string][]*Stream)
}
// Add new stream for account.
strs := s.streams[accountID]
strs = append(strs, str)
s.streams[accountID] = strs
// Register close callback
// to remove stream from our
// internal map for this account.
str.close = func() {
s.mutex.Lock()
strs := s.streams[accountID]
strs = slices.DeleteFunc(strs, func(s *Stream) bool {
return s == str // remove 'str' ptr
})
s.streams[accountID] = strs
s.mutex.Unlock()
}
// Done with lock.
s.mutex.Unlock()
return str
}
// Post will post the given message to all streams of given account ID matching type.
func (s *Streams) Post(ctx context.Context, accountID string, msg Message) bool {
var deferred []func() bool
// Acquire lock.
s.mutex.Lock()
// Iterate all streams stored for account.
for _, str := range s.streams[accountID] {
// Check whether stream supports any of our message targets.
if stype := str.getStreamType(msg.Stream...); stype != "" {
// Rescope var
// to prevent
// ptr reuse.
stream := str
// Use a message copy to *only*
// include the supported stream.
msgCopy := Message{
Stream: []string{stype},
Event: msg.Event,
Payload: msg.Payload,
}
// Send message to supported stream
// DEFERRED (i.e. OUTSIDE OF MAIN MUTEX).
// This prevents deadlocks between each
// msg channel and main Streams{} mutex.
deferred = append(deferred, func() bool {
return stream.send(ctx, msgCopy)
})
}
}
// Done with lock.
s.mutex.Unlock()
var ok bool
// Execute deferred outside lock.
for _, deferfn := range deferred {
v := deferfn()
ok = ok && v
}
return ok
}
// PostAll will post the given message to all streams with matching types.
func (s *Streams) PostAll(ctx context.Context, msg Message) bool {
var deferred []func() bool
// Acquire lock.
s.mutex.Lock()
// Iterate ALL stored streams.
for _, strs := range s.streams {
for _, str := range strs {
// Check whether stream supports any of our message targets.
if stype := str.getStreamType(msg.Stream...); stype != "" {
// Rescope var
// to prevent
// ptr reuse.
stream := str
// Use a message copy to *only*
// include the supported stream.
msgCopy := Message{
Stream: []string{stype},
Event: msg.Event,
Payload: msg.Payload,
}
// Send message to supported stream
// DEFERRED (i.e. OUTSIDE OF MAIN MUTEX).
// This prevents deadlocks between each
// msg channel and main Streams{} mutex.
deferred = append(deferred, func() bool {
return stream.send(ctx, msgCopy)
})
}
}
}
// Done with lock.
s.mutex.Unlock()
var ok bool
// Execute deferred outside lock.
for _, deferfn := range deferred {
v := deferfn()
ok = ok && v
}
return ok
}
// Stream represents one
// open stream for a client.
type Stream struct {
// atomically updated ptr to a read-only copy
// of supported stream types in a hashmap. this
// gets updated via CAS operations in .cas().
types atomic.Pointer[map[string]struct{}]
// protects stream close.
done chan struct{}
// inbound msg ch.
msgCh chan Message
// close hook to remove
// stream from Streams{}.
close func()
}
// Subscribe will add given type to given types this stream supports.
func (s *Stream) Subscribe(streamType string) {
s.cas(func(m map[string]struct{}) bool {
if _, ok := m[streamType]; ok {
return false
}
m[streamType] = struct{}{}
return true
})
}
// Unsubscribe will remove given type (if found) from types this stream supports.
func (s *Stream) Unsubscribe(streamType string) {
s.cas(func(m map[string]struct{}) bool {
if _, ok := m[streamType]; !ok {
return false
}
delete(m, streamType)
return true
})
}
// getStreamType returns the first stream type in given list that stream supports.
func (s *Stream) getStreamType(streamTypes ...string) string {
if ptr := s.types.Load(); ptr != nil {
for _, streamType := range streamTypes {
if _, ok := (*ptr)[streamType]; ok {
return streamType
}
}
}
return ""
}
// send will block on posting a new Message{}, returning early with
// a false value if provided context is canceled, or stream closed.
func (s *Stream) send(ctx context.Context, msg Message) bool {
select {
case <-s.done:
return false
case <-ctx.Done():
return false
case s.msgCh <- msg:
return true
}
}
// Recv will block on receiving Message{}, returning early with a
// false value if provided context is canceled, or stream closed.
func (s *Stream) Recv(ctx context.Context) (Message, bool) {
select {
case <-s.done:
return Message{}, false
case <-ctx.Done():
return Message{}, false
case msg := <-s.msgCh:
return msg, true
}
}
// Close will close the underlying context, finally
// removing it from the parent Streams per-account-map.
func (s *Stream) Close() {
select {
case <-s.done:
default:
close(s.done)
s.close()
}
}
// cas will perform a Compare And Swap operation on s.types using modifier func.
func (s *Stream) cas(fn func(map[string]struct{}) bool) {
if fn == nil {
panic("nil function")
}
for {
var m map[string]struct{}
// Get current value.
ptr := s.types.Load()
if ptr == nil {
// Allocate new types map.
m = make(map[string]struct{})
} else {
// Clone r-only map.
m = maps.Clone(*ptr)
}
// Apply
// changes.
if !fn(m) {
return
}
// Attempt to Compare And Swap ptr.
if s.types.CompareAndSwap(ptr, &m) {
return
}
}
}
// Message represents
// one streamed message.
type Message struct {
// All the stream types this
// message should be delivered to.
Stream []string `json:"stream"`
// The event type of the message
// (update/delete/notification etc)
Event string `json:"event"`
// The actual payload of the message. In case of an
// update or notification, this will be a JSON string.
Payload string `json:"payload"`
}