From 15883a53aa160586f249fbd44217151631ce2aa0 Mon Sep 17 00:00:00 2001 From: Marcel Hellkamp Date: Mon, 24 Jul 2023 14:18:33 +0200 Subject: [PATCH] Refactored parallel-fetch logic. The wall fetches all servers in parallel, but tasks per server run in sequence to not overwhelm individual servers. --- src/App.vue | 192 ++++++++++++++++++++++++---------------------------- 1 file changed, 89 insertions(+), 103 deletions(-) diff --git a/src/App.vue b/src/App.vue index 7cb7fce..0db7134 100644 --- a/src/App.vue +++ b/src/App.vue @@ -39,7 +39,7 @@ watchDebounced(windowSize.width, () => { fixLayout() }, { debounce: 500, maxWait const isDartPrefered = usePreferredDark() const actualTheme = computed(() => { var theme = config.value?.theme - if(!theme || theme === "auto") + if (!theme || theme === "auto") theme = isDartPrefered.value ? "dark" : "light" return theme }) @@ -59,52 +59,6 @@ watch(visibilityState, () => { restartUpdates() }) -// Souces grouped by server -type SourceConfig = { - domain: string, - tags: string[], - accounts: string[], -} - -// Source configurations grouped by server domain -const groupedSources = computed>(() => { - const cfg = config.value - if (!cfg) return []; - - const sources: Record = {} - - const forServer = (domain: string) => { - if (!sources.hasOwnProperty(domain)) - sources[domain] = { domain, tags: [], accounts: [] } - return sources[domain] - } - - // Tags are searched on all servers - cfg.servers.forEach(domain => { - const source = forServer(domain) - source.tags = [...cfg.tags] - }) - - // Accounts are searched on the server they belong to. - // Non-qualified accounts are searched on all servers. 
- cfg.accounts.forEach(account => { - var [user, domain] = account.split('@', 2) - if (domain) { - forServer(domain).accounts.push(user) - } else { - cfg.servers.forEach(domain => { - forServer(domain).accounts.push(user) - }) - } - }) - - return Object.values(sources).map(src => { - src.accounts = src.accounts.sort().filter((v, i, a) => a.indexOf(v) == i) - src.tags = src.tags.sort().filter((v, i, a) => a.indexOf(v) == i) - return src - }); -}) - /** * Fetch a json resources from a given URL. * Automaticaly detect mastodon rate limits and wait and retry up to 3 times. @@ -145,12 +99,12 @@ async function fetchJson(url: string) { * Returns the instance-local account ID for a given user name. * Results are cached. Returns null if not found, or undefined on errors. */ -async function getLocalUserId(user: string, domain: string) { +async function getLocalUser(user: string, domain: string): Promise { const key = `${user}@${domain}` if (!accountToLocalId.hasOwnProperty(key)) { try { - accountToLocalId[key] = (await fetchJson(`https://${domain}/api/v1/accounts/lookup?acct=${encodeURIComponent(user)}`)).id + accountToLocalId[key] = (await fetchJson(`https://${domain}/api/v1/accounts/lookup?acct=${encodeURIComponent(user)}`)) } catch (e) { if ((e as any).status === 404) accountToLocalId[key] = null; @@ -209,41 +163,78 @@ const statusToWallPost = (status: any): Post => { } } -/** - * Fetch all new statuses from a given source. 
- */ -const fetchSource = async (source: SourceConfig) => { - const cfg = config.value - if (!cfg) return [] - const posts = [] - - for (let tag of source.tags) { - if(tag.startsWith("!")) continue; - if(tag.startsWith("#")) tag = tag.substring(1) - const items = await fetchJson(`https://${source.domain}/api/v1/timelines/tag/${encodeURIComponent(tag)}?limit=${cfg.limit}`) - posts.push(...items) - } - - for (let account of source.accounts) { - const localUserId = await getLocalUserId(account, source.domain) - if (!localUserId) continue; - const items = await fetchJson(`https://${source.domain}/api/v1/accounts/${encodeURIComponent(localUserId)}/statuses?limit=${cfg.limit}&exclude_replies=True`) - posts.push(...items) - } - - return posts -} - /** * Fetch Posts from all sources. */ async function fetchAllPosts() { const cfg = config.value if (!cfg) return [] - const posts: Post[] = [] - const addOrReplace = (post?: Post) => { - if (!post) return + type Task = () => Promise; + + // Group tasks by domain (see below) + const domainTasks: Record> = {} + const addTask = (domain: string, task: Task) => { + (domainTasks[domain] ??= []).push(task) + } + + // Load tags from all servers + for (const domain of cfg.servers) { + for (const tag of cfg.tags) { + addTask(domain, () => { + return fetchJson(`https://${domain}/api/v1/timelines/tag/${encodeURIComponent(tag)}?limit=${cfg.limit}`) + }) + } + } + + // Load account timelines from the home server of the account, or all servers + // if the account is not fully qualified (missing domain part). + for (const account of cfg.accounts) { + const [user, domain] = account.split('@', 2) + const domains = domain ? 
[domain] : [...cfg.servers] + for (const domain of domains) { + addTask(domain, async () => { + const localUser = await getLocalUser(user, domain) + if (!localUser || !localUser.id) return []; + if (localUser.bot && cfg.hideBots && cfg.hideBoosts) return []; + + let url = `https://${domain}/api/v1/accounts/${encodeURIComponent(localUser.id)}/statuses?limit=${cfg.limit}` + if (cfg.hideReplies) url += "&exclude_replies=True" + if (cfg.hideBoosts) url += "&exclude_reblogs=True" + return await fetchJson(url) + }) + } + } + + // Load trends from all servers + if (cfg.loadTrends) { + for (const domain of cfg.servers) { + addTask(domain, async () => { + return await fetchJson(`https://${domain}/api/v1/trends/statuses?limit=${cfg.limit}`) + }) + } + } + + // Load public timeline from all servers, optionally limited to just local + // or just federated posts. + if (cfg.loadPublic || cfg.loadFederated) { + for (const domain of cfg.servers) { + let url = `https://${domain}/api/v1/timelines/public` + if (!cfg.loadPublic) + url += "?remote=True" + if (!cfg.loadFederated) + url += "?local=True" + addTask(domain, async () => { + return await fetchJson(url) + }) + } + } + + // Collect results + const posts: Post[] = [] + const addOrRepaceStatus = (status: any) => { + if(!status || !filterStatus(status)) return; + const post = statusToWallPost(status) const i = posts.findIndex(p => p.url === post.url) if (i >= 0) posts[i] = post @@ -251,33 +242,27 @@ async function fetchAllPosts() { posts.unshift(post) } - type Job = ()=>void; - const jobsPerServer: Record> = {} + // Be nice and not overwhelm servers with parallel requests. + // Run tasks for the same domain in sequence instead. 
+ const groupedTasks = Object.entries(domainTasks) + .map(([domain, tasks]) => { + return async () => { + for (const task of tasks) { + try { + (await task()).forEach(addOrRepaceStatus) + } catch (err) { + console.warn(`Update task failed for domain ${domain}`, err) + } + } + } + }) - const addJob = (domain:string, job: Job) => { - (jobsPerServer[domain] ??= []).push(job) - } - - for(const domain of cfg.servers) { - for(const tag of cfg.tags) { - if(tag.startsWith("!")) continue - if(!tag.startsWith){} - } - } - - - // Start all sources in parallel - const tasks = groupedSources.value.map(source => fetchSource(source)); - const results = await Promise.allSettled(tasks); - - for (const result of results) { - if (result.status === "fulfilled") { - result.value.filter(filterStatus).map(statusToWallPost).forEach(addOrReplace) - } else { - const err = result.reason; - } - } + // Start all the domain-grouped tasks in parallel, so each server can be + // processed as fast as its rate-limit allows. + // TODO: Add a timeout + await Promise.allSettled(groupedTasks.map(task => task())) + // Done. Return collected posts return posts } @@ -443,7 +428,8 @@ const privacyLink = computed(() => { }} mode]