
I'm following this tutorial to create a hosted service. The program runs as expected. However, I want to process the queued items concurrently.

In my app, there are 4 clients, each of these clients can process 4 items at a time. So at any given time, 16 items should be processed in parallel.

So based on these requirements, I've modified the code a bit:

In the MonitorLoop class:

private int count = 0;
private async ValueTask MonitorAsync()
{
    while (!_cancellationToken.IsCancellationRequested)
    {
        await _taskQueue.QueueAsync(BuildWorkItem);
        Interlocked.Increment(ref count);
        Console.WriteLine($"Count: {count}");
    }
}

and in the same class:

    if (delayLoop == 3)
    {
        _logger.LogInformation("Queued Background Task {Guid} is complete.", guid);
        Interlocked.Decrement(ref count);
    }

This shows that if I set the Capacity to 4, the count never exceeds 5. In other words, if the queue is full, QueueAsync waits until there's room for one more item.

The problem is that the items are processed one at a time.

Here's the code for the BackgroundProcessing method on the QueuedHostedService class:

private async Task BackgroundProcessing(CancellationToken stoppingToken)
{
    while (!stoppingToken.IsCancellationRequested)
    {
        var workItem = await TaskQueue.DequeueAsync(stoppingToken);

        try
        {
            //instead of getting a single item from the queue, somehow, here
            //we should be able to process them in parallel for 4 clients
            //with a limit for maximum items each client can process
            await workItem(stoppingToken);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error occurred executing {WorkItem}.", nameof(workItem));
        }
    }
}

I want to process them in parallel. I'm not sure whether using a Channel as the queue is the best solution; maybe I should use a ConcurrentQueue instead. Either way, I'm not sure how to achieve a robust implementation that supports 4 clients processing 4 items each.
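For context, the queue in the tutorial is backed by a bounded Channel. A rough sketch of that queue follows (the interface and member names here mirror the `QueueAsync`/`DequeueAsync` calls above and are otherwise assumptions, not the tutorial's exact code). Note that a `Channel` is safe for multiple concurrent readers, so it can stay in place even when items are consumed in parallel:

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public interface IBackgroundTaskQueue
{
    ValueTask QueueAsync(Func<CancellationToken, ValueTask> workItem);
    ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(CancellationToken cancellationToken);
}

public class DefaultBackgroundTaskQueue : IBackgroundTaskQueue
{
    private readonly Channel<Func<CancellationToken, ValueTask>> _queue;

    public DefaultBackgroundTaskQueue(int capacity)
    {
        // BoundedChannelFullMode.Wait makes QueueAsync wait for room
        // when the channel is full -- the behavior observed above.
        var options = new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait
        };
        _queue = Channel.CreateBounded<Func<CancellationToken, ValueTask>>(options);
    }

    public async ValueTask QueueAsync(Func<CancellationToken, ValueTask> workItem)
        => await _queue.Writer.WriteAsync(workItem);

    public async ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(
        CancellationToken cancellationToken)
        => await _queue.Reader.ReadAsync(cancellationToken);
}
```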

Alireza Noori
  • I guess you want to simulate this situation: there are four clients and each client can handle 4 items at a time. So why not create four background services, have each background service take four items at a time, and use Parallel.For to handle them simultaneously? – miemengniao Dec 23 '22 at 00:42
  • @灭蒙鸟 Do you mean creating 4 instances of `QueuedHostedService` or `MonitorLoop`? Please keep in mind that the items are shared between the clients. So suppose there's 1000 URLs for the files, there are 4 HttpClients and each should download 4 URLs at a time. – Alireza Noori Dec 23 '22 at 01:13

1 Answer


If you want four processors, then you can refactor the code to use four instances of your main loop, and use Task.WhenAll to (asynchronously) wait for all of them to complete:

private async Task BackgroundProcessing(CancellationToken stoppingToken)
{
  var task1 = ProcessAsync(stoppingToken);
  var task2 = ProcessAsync(stoppingToken);
  var task3 = ProcessAsync(stoppingToken);
  var task4 = ProcessAsync(stoppingToken);
  await Task.WhenAll(task1, task2, task3, task4);

  async Task ProcessAsync(CancellationToken token)
  {
    while (!token.IsCancellationRequested)
    {
      var workItem = await TaskQueue.DequeueAsync(token);

      try
      {
        await workItem(token);
      }
      catch (Exception ex)
      {
        _logger.LogError(ex, "Error occurred executing {WorkItem}.", nameof(workItem));
      }
    }
  }
}
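If the degree of parallelism should match the question's requirement (4 clients × 4 items = 16 concurrent work items) or be configurable, the four explicit tasks can be generalized. A sketch under the assumption that the same `TaskQueue` and `_logger` members are available:

```csharp
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

private async Task BackgroundProcessing(CancellationToken stoppingToken)
{
    const int clientCount = 4;     // number of clients
    const int itemsPerClient = 4;  // concurrent items per client

    // Start clientCount * itemsPerClient independent processing loops,
    // all reading from the same queue, and wait for them all to finish.
    var tasks = Enumerable
        .Range(0, clientCount * itemsPerClient)
        .Select(_ => ProcessAsync(stoppingToken))
        .ToArray();
    await Task.WhenAll(tasks);

    async Task ProcessAsync(CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            var workItem = await TaskQueue.DequeueAsync(token);
            try
            {
                await workItem(token);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error occurred executing {WorkItem}.", nameof(workItem));
            }
        }
    }
}
```

This treats the 16 slots as one flat pool. If each item must be bound to a specific client (e.g. a particular HttpClient), each client could instead run its own group of four loops, passing the client into `ProcessAsync`.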

I'm not sure how to achieve a robust implementation

If you want a robust implementation, then you can't use that tutorial, sorry. The primary problem with that kind of background work is that it will be lost on any app restart. And app restarts are normal: the server can lose power or crash, OS or runtime patches can be installed, IIS will recycle your app periodically, and whenever you deploy your code, the app will restart. And whenever any of these things happen, all in-memory queues like channels will lose all their work.

A production-quality implementation requires a durable queue at the very least. I also recommend a separate background processor. I have a blog series on the subject that may help you get started.

Stephen Cleary
  • Thank you for the response. I think I asked the wrong question. Could you please take a look at my new question and hopefully provide some guidance? https://stackoverflow.com/questions/74902687/distribute-concurrentqueue-amongst-workers-equally – Alireza Noori Dec 23 '22 at 18:01