Refactored parallel-fetch logic.

The wall fetches all servers in parallel, but tasks per
server run sequence to not overwhelm individual servers.
This commit is contained in:
Marcel Hellkamp 2023-07-24 14:18:33 +02:00
parent de8d4f834e
commit 15883a53aa

View File

@ -39,7 +39,7 @@ watchDebounced(windowSize.width, () => { fixLayout() }, { debounce: 500, maxWait
const isDartPrefered = usePreferredDark()
const actualTheme = computed(() => {
var theme = config.value?.theme
if(!theme || theme === "auto")
if (!theme || theme === "auto")
theme = isDartPrefered.value ? "dark" : "light"
return theme
})
@ -59,52 +59,6 @@ watch(visibilityState, () => {
restartUpdates()
})
// Souces grouped by server
type SourceConfig = {
domain: string,
tags: string[],
accounts: string[],
}
// Source configurations grouped by server domain
const groupedSources = computed<Array<SourceConfig>>(() => {
const cfg = config.value
if (!cfg) return [];
const sources: Record<string, SourceConfig> = {}
const forServer = (domain: string) => {
if (!sources.hasOwnProperty(domain))
sources[domain] = { domain, tags: [], accounts: [] }
return sources[domain]
}
// Tags are searched on all servers
cfg.servers.forEach(domain => {
const source = forServer(domain)
source.tags = [...cfg.tags]
})
// Accounts are searched on the server they belong to.
// Non-qualified accounts are searched on all servers.
cfg.accounts.forEach(account => {
var [user, domain] = account.split('@', 2)
if (domain) {
forServer(domain).accounts.push(user)
} else {
cfg.servers.forEach(domain => {
forServer(domain).accounts.push(user)
})
}
})
return Object.values(sources).map(src => {
src.accounts = src.accounts.sort().filter((v, i, a) => a.indexOf(v) == i)
src.tags = src.tags.sort().filter((v, i, a) => a.indexOf(v) == i)
return src
});
})
/**
* Fetch a json resources from a given URL.
* Automaticaly detect mastodon rate limits and wait and retry up to 3 times.
@ -145,12 +99,12 @@ async function fetchJson(url: string) {
* Returns the instance-local account ID for a given user name.
* Results are cached. Returns null if not found, or undefined on errors.
*/
async function getLocalUserId(user: string, domain: string) {
async function getLocalUser(user: string, domain: string): Promise<any> {
const key = `${user}@${domain}`
if (!accountToLocalId.hasOwnProperty(key)) {
try {
accountToLocalId[key] = (await fetchJson(`https://${domain}/api/v1/accounts/lookup?acct=${encodeURIComponent(user)}`)).id
accountToLocalId[key] = (await fetchJson(`https://${domain}/api/v1/accounts/lookup?acct=${encodeURIComponent(user)}`))
} catch (e) {
if ((e as any).status === 404)
accountToLocalId[key] = null;
@ -209,41 +163,78 @@ const statusToWallPost = (status: any): Post => {
}
}
/**
* Fetch all new statuses from a given source.
*/
const fetchSource = async (source: SourceConfig) => {
const cfg = config.value
if (!cfg) return []
const posts = []
for (let tag of source.tags) {
if(tag.startsWith("!")) continue;
if(tag.startsWith("#")) tag = tag.substring(1)
const items = await fetchJson(`https://${source.domain}/api/v1/timelines/tag/${encodeURIComponent(tag)}?limit=${cfg.limit}`)
posts.push(...items)
}
for (let account of source.accounts) {
const localUserId = await getLocalUserId(account, source.domain)
if (!localUserId) continue;
const items = await fetchJson(`https://${source.domain}/api/v1/accounts/${encodeURIComponent(localUserId)}/statuses?limit=${cfg.limit}&exclude_replies=True`)
posts.push(...items)
}
return posts
}
/**
* Fetch Posts from all sources.
*/
async function fetchAllPosts() {
const cfg = config.value
if (!cfg) return []
const posts: Post[] = []
const addOrReplace = (post?: Post) => {
if (!post) return
type Task = () => Promise<any[]>;
// Group tasks by domain (see below)
const domainTasks: Record<string, Array<Task>> = {}
const addTask = (domain: string, task: Task) => {
(domainTasks[domain] ??= []).push(task)
}
// Load tags from all servers
for (const domain of cfg.servers) {
for (const tag of cfg.tags) {
addTask(domain, () => {
return fetchJson(`https://${domain}/api/v1/timelines/tag/${encodeURIComponent(tag)}?limit=${cfg.limit}`)
})
}
}
// Load account timelines from the home server of the account, or all servers
// if the account is not fully qualified (missing domain part).
for (const account of cfg.accounts) {
const [user, domain] = account.split('@', 2)
const domains = domain ? [domain] : [...cfg.servers]
for (const domain of domains) {
addTask(domain, async () => {
const localUser = await getLocalUser(user, domain)
if (!localUser || !localUser.id) return [];
if (localUser.bot && cfg.hideBots && cfg.hideBoosts) return [];
let url = `https://${domain}/api/v1/accounts/${encodeURIComponent(localUser.id)}/statuses?limit=${cfg.limit}`
if (cfg.hideReplies) url += "&exclude_replies=True"
if (cfg.hideBoosts) url += "&exclude_reblogs=True"
return await fetchJson(url)
})
}
}
// Load trends from all servers
if (cfg.loadTrends) {
for (const domain of cfg.servers) {
addTask(domain, async () => {
return await fetchJson(`https://${domain}/api/v1/trends/statuses?limit=${cfg.limit}`)
})
}
}
// Load public timeline from all servers, optionally limited to just local
// or just federated posts.
if (cfg.loadPublic || cfg.loadFederated) {
for (const domain of cfg.servers) {
let url = `https://${domain}/api/v1/timelines/public`
if (!cfg.loadPublic)
url += "?remote=True"
if (!cfg.loadFederated)
url += "?local=True"
addTask(domain, async () => {
return await fetchJson(url)
})
}
}
// Collect results
const posts: Post[] = []
const addOrRepaceStatus = (status: any) => {
if(!status || !filterStatus(status)) return;
const post = statusToWallPost(status)
const i = posts.findIndex(p => p.url === post.url)
if (i >= 0)
posts[i] = post
@ -251,33 +242,27 @@ async function fetchAllPosts() {
posts.unshift(post)
}
type Job = ()=>void;
const jobsPerServer: Record<string, Array<Job>> = {}
// Be nice and not overwhelm servers with parallel requests.
// Run tasks for the same domain in sequence instead.
const groupedTasks = Object.entries(domainTasks)
.map(([domain, tasks]) => {
return async () => {
for (const task of tasks) {
try {
(await task()).forEach(addOrRepaceStatus)
} catch (err) {
console.warn(`Update task failed for domain ${domain}`, err)
}
}
}
})
const addJob = (domain:string, job: Job) => {
(jobsPerServer[domain] ??= []).push(job)
}
for(const domain of cfg.servers) {
for(const tag of cfg.tags) {
if(tag.startsWith("!")) continue
if(!tag.startsWith){}
}
}
// Start all sources in parallel
const tasks = groupedSources.value.map(source => fetchSource(source));
const results = await Promise.allSettled(tasks);
for (const result of results) {
if (result.status === "fulfilled") {
result.value.filter(filterStatus).map(statusToWallPost).forEach(addOrReplace)
} else {
const err = result.reason;
}
}
// Start all the domain-grouped tasks in parallel, so reach server can be
// processed as fast as its rate-limit allows.
// TODO: Add a timeout
await Promise.allSettled(groupedTasks.map(task => task()))
// Done. Return collected posts
return posts
}
@ -443,7 +428,8 @@ const privacyLink = computed(() => {
}} mode]</button>
<button class="btn btn-link text-muted" data-bs-toggle="modal" data-bs-target="#configModal">[Customize]</button>
<div>
<a href="https://github.com/defnull/fediwall" target="_blank" class="mx-1 text-muted">Fediwall <span v-if="gitVersion">{{ gitVersion }}</span></a>
<a href="https://github.com/defnull/fediwall" target="_blank" class="mx-1 text-muted">Fediwall <span
v-if="gitVersion">{{ gitVersion }}</span></a>
- <a href="https://github.com/defnull/fediwall" target="_blank" class="mx-1">Github</a>
- <a href="https://github.com/defnull/fediwall#readme" target="_blank" class="mx-1">Documentation</a>
- <a :href="privacyLink" target="_blank" class="mx-1">Privacy policy</a>