Remove background tasks

This commit is contained in:
Gervasio Marchand 2023-07-05 09:11:42 -03:00
parent 032e03c823
commit e05d91d93d
No known key found for this signature in database
GPG Key ID: B7736CB188DD0A38
3 changed files with 3 additions and 111 deletions

View File

@ -1,7 +1,5 @@
using System.Threading.Tasks;
using FakeRelay.Core; using FakeRelay.Core;
using FakeRelay.Core.Helpers; using FakeRelay.Core.Helpers;
using FakeRelay.Web.Services;
using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.Mvc;
namespace FakeRelay.Web.Controllers; namespace FakeRelay.Web.Controllers;
@ -10,15 +8,15 @@ public class AcitivityPubController : Controller
{ {
[Route("actor")] [Route("actor")]
public async Task<ActionResult> Actor([FromServices] IBackgroundTaskQueue taskQueue) => public async Task<ActionResult> Actor() =>
Content(MastodonHelper.GetActorStaticContent(), "application/activity+json; charset=utf-8"); Content(MastodonHelper.GetActorStaticContent(), "application/activity+json; charset=utf-8");
[Route("inbox"), HttpPost] [Route("inbox"), HttpPost]
public async Task<ActionResult> Inbox([FromBody] ActivityPubModel model, [FromServices] IBackgroundTaskQueue taskQueue) public async Task<ActionResult> Inbox([FromBody] ActivityPubModel model)
{ {
if (model.Type == "Follow") if (model.Type == "Follow")
{ {
await taskQueue.QueueBackgroundWorkItemAsync(_ => MastodonHelper.ProcessInstanceFollowAsync(model)); await MastodonHelper.ProcessInstanceFollowAsync(model);
} }
return Content("{}", "application/activity+json"); return Content("{}", "application/activity+json");

View File

@ -1,5 +1,4 @@
using FakeRelay.Core; using FakeRelay.Core;
using FakeRelay.Web.Services;
using Microsoft.AspNetCore.HttpOverrides; using Microsoft.AspNetCore.HttpOverrides;
using Prometheus; using Prometheus;
@ -26,9 +25,6 @@ builder.Services.Configure<ForwardedHeadersOptions>(options =>
options.ForwardLimit = 1; options.ForwardLimit = 1;
}); });
builder.Services.AddHostedService<QueuedHostedService>();
builder.Services.AddSingleton<IBackgroundTaskQueue>(_ => new BackgroundTaskQueue(30));
var app = builder.Build(); var app = builder.Build();
// Configure the HTTP request pipeline. // Configure the HTTP request pipeline.

View File

@ -1,102 +0,0 @@
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace FakeRelay.Web.Services;
public interface IBackgroundTaskQueue
{
ValueTask QueueBackgroundWorkItemAsync(Func<CancellationToken, Task> workItem);
ValueTask<Func<CancellationToken, Task>> DequeueAsync(
CancellationToken cancellationToken);
}
public class BackgroundTaskQueue : IBackgroundTaskQueue
{
private readonly Channel<Func<CancellationToken, Task>> _queue;
public BackgroundTaskQueue(int capacity)
{
// Capacity should be set based on the expected application load and
// number of concurrent threads accessing the queue.
// BoundedChannelFullMode.Wait will cause calls to WriteAsync() to return a task,
// which completes only when space became available. This leads to backpressure,
// in case too many publishers/calls start accumulating.
var options = new BoundedChannelOptions(capacity)
{
FullMode = BoundedChannelFullMode.Wait
};
_queue = Channel.CreateBounded<Func<CancellationToken, Task>>(options);
}
public async ValueTask QueueBackgroundWorkItemAsync(
Func<CancellationToken, Task> workItem)
{
if (workItem == null)
{
throw new ArgumentNullException(nameof(workItem));
}
await _queue.Writer.WriteAsync(workItem);
}
public async ValueTask<Func<CancellationToken, Task>> DequeueAsync(
CancellationToken cancellationToken)
{
var workItem = await _queue.Reader.ReadAsync(cancellationToken);
return workItem;
}
}
public class QueuedHostedService : BackgroundService
{
private readonly ILogger<QueuedHostedService> _logger;
public QueuedHostedService(IBackgroundTaskQueue taskQueue,
ILogger<QueuedHostedService> logger)
{
TaskQueue = taskQueue;
_logger = logger;
}
public IBackgroundTaskQueue TaskQueue { get; }
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation(
$"Queued Hosted Service is running.");
await BackgroundProcessing(stoppingToken);
}
private async Task BackgroundProcessing(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
var workItem =
await TaskQueue.DequeueAsync(stoppingToken);
try
{
await workItem(stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex,
"Error occurred executing {WorkItem}.", nameof(workItem));
}
}
}
public override async Task StopAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Queued Hosted Service is stopping.");
await base.StopAsync(stoppingToken);
}
}