
I'm just wondering if someone can point me in the right direction about the async/await framework and thread pools?

Basically, what I'm trying to do is have x number of operations executed in a separate thread/async, but across a maximum of y threads.

For example, let's say that I have 100 database operations: await _repository.WriteData(someData);

What I'd like to do is have some method of running 10 of those operations at a time (ideally in a separate thread each, so 10 threads), and as each one is finished, the next is kicked off on the thread that then becomes available. We then wait for all operations to complete and all threads to finish...

Is this something that is readily achievable without too much effort or adding huge amounts of complexity?

Juzzbott
  • why not just use await and let the framework handle the threads for you? Have you done any performance testing that suggests the framework doesn't suit your needs? – Gabriel Sadaka Mar 01 '16 at 21:42
  • @gabriel the framework has no idea about the optimal IO parallelism. How could it know? – usr Mar 01 '16 at 21:45
  • @Eser, I agree that it's a duplicate, but the answer given to this one is already of higher quality, so this looks promising – Kirill Shlenskiy Mar 01 '16 at 21:50
  • Wait, I am confused. If they are IO operations then why are there **any** threads? You wouldn't hire one worker per letter you receive to be responsible for waiting by the mailbox for that letter. Why would you hire a bunch of threads to sit around and wait for a database? – Eric Lippert Mar 01 '16 at 21:52
  • No, it doesn't, but in this case the example is database operations, and the database server would be responsible for handling the parallel IO operations. It really depends on the use case, which he hasn't specified. – Gabriel Sadaka Mar 01 '16 at 21:53

2 Answers


I think you're missing the point by focusing on threads, especially for asynchronous operations that don't require threads to execute.

.NET has a great ThreadPool you can use. You don't know how many threads are in it and you don't care. It just works (until it doesn't and you need to configure it yourself, but that's very advanced).

Running tasks on the ThreadPool is very simple. Either create a task for each operation and throttle them using a SemaphoreSlim or use the ready-made TPL Dataflow blocks. For example:

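using System.Threading.Tasks.Dataflow; // TPL Dataflow (System.Threading.Tasks.Dataflow package)

var block = new ActionBlock<SomeData>(
    item => _repository.WriteDataAsync(item), // What to do on each item
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 }); // How many items at the same time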

foreach (var item in items)
{
    block.Post(item); // Post all items to the block
}

block.Complete(); // Signal completion
await block.Completion; // Asynchronously wait for completion.
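The SemaphoreSlim alternative mentioned above can be sketched roughly like this. It is a minimal, self-contained console sketch, not a drop-in implementation: `FakeWriteDataAsync` is a hypothetical stand-in for the repository call (it just awaits `Task.Delay`), and the counters exist only to show that no more than 10 writes run at once.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

object gate = new object();
int inFlight = 0;     // writes currently running
int maxObserved = 0;  // highest number of writes seen running at once
int completed = 0;    // writes that have finished

var throttler = new SemaphoreSlim(10); // at most 10 writes at the same time

// Hypothetical stand-in for _repository.WriteDataAsync: simulates I/O with Task.Delay.
async Task FakeWriteDataAsync(int data)
{
    lock (gate) { inFlight++; maxObserved = Math.Max(maxObserved, inFlight); }
    await Task.Delay(10);
    lock (gate) { inFlight--; completed++; }
}

async Task ThrottledWriteAsync(int data)
{
    await throttler.WaitAsync(); // wait asynchronously for a free slot (no thread is blocked)
    try
    {
        await FakeWriteDataAsync(data);
    }
    finally
    {
        throttler.Release(); // hand the slot to the next waiting write
    }
}

// Create all 100 operations up front; only 10 at a time get past WaitAsync.
await Task.WhenAll(Enumerable.Range(0, 100).Select(i => ThrottledWriteAsync(i)));

Console.WriteLine($"completed: {completed}, max in flight: {maxObserved}");
```

Note that all 100 tasks are created immediately; the ones without a slot simply wait asynchronously on the semaphore, so no threads are tied up while they wait.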

If, however, you do plan on creating "dedicated" threads, you can use Task.Factory.StartNew with the LongRunning option, which creates a dedicated thread outside the ThreadPool. But keep in mind that an async operation doesn't keep the same thread throughout, because async operations don't need a thread. So starting on a dedicated thread may be pointless (more on that in my blog post: LongRunning Is Useless For Task.Run With async-await).
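A small console sketch of that point, assuming no SynchronizationContext is installed (as in a plain console app) and relying on the fact that the default scheduler in current .NET runs LongRunning tasks on a new, non-pool thread (the documentation describes LongRunning only as a hint). The dedicated thread is used just for the synchronous part before the first `await`; the rest of the async lambda resumes on a ThreadPool thread. Also note that StartNew with an async lambda returns a `Task<Task>`, which has to be unwrapped.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

bool startedOnPoolThread = true;
bool resumedOnPoolThread = false;

// StartNew with an async lambda returns Task<Task>; Unwrap() gives a task for the whole async body.
Task work = Task.Factory.StartNew(async () =>
{
    // Synchronous part: runs on the dedicated thread created for LongRunning.
    startedOnPoolThread = Thread.CurrentThread.IsThreadPoolThread;

    await Task.Delay(50); // stands in for the asynchronous database call

    // Continuation: runs on a ThreadPool thread, not back on the dedicated thread.
    resumedOnPoolThread = Thread.CurrentThread.IsThreadPoolThread;
}, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default).Unwrap();

await work;

Console.WriteLine($"started on pool thread: {startedOnPoolThread}, resumed on pool thread: {resumedOnPoolThread}");
```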

i3arnon
  • TPL Dataflow is a great fit for the stated problem. Not sure why anyone would downvote this. – Kirill Shlenskiy Mar 01 '16 at 21:53
  • TPL Dataflow is not "built-in". I have learned that from your blog, haha – hyankov Oct 31 '17 at 09:18
  • @i3arnon How do I add a new task to the queue when any one of the 10 tasks completes? My objective: even if we set a limit of 10 concurrent tasks with SemaphoreSlim or MaxDegreeOfParallelism, I don't want to create 100 tasks up front and then let SemaphoreSlim or MaxDegreeOfParallelism run 10 at a time. I only want to create a new task when one of the 10 running tasks completes, and this process will continue indefinitely. – virender Apr 20 '18 at 06:51
  • How can I get the results from an ActionBlock, in the order the items were added, once it's done? – dellos Oct 06 '22 at 07:24
  • @dellos Use a `TransformBlock` instead. – i3arnon Oct 11 '22 at 11:52

@i3arnon's answer is correct. Use TPL Dataflow.

The rest of this answer is for educational purposes and/or special use cases only.

I've recently bumped into a similar problem in a project where I could not introduce any external dependencies, so I had to roll my own load-balancing implementation, and it turned out to be surprisingly simple (until you start wiring in cancellation and ordered results, but that's outside the scope of this question).

I am disregarding the "10 dedicated threads" requirement since, as others have already explained, it does not make sense when dealing with async operations. Instead I will maintain up to N concurrent Task instances processing the workload.

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ParallelTasks
{
    public static async Task InvokeAsync(IEnumerable<Func<Task>> taskFactories, int maxDegreeOfParallelism)
    {
        Queue<Func<Task>> queue = new Queue<Func<Task>>(taskFactories);

        if (queue.Count == 0) {
            return;
        }

        List<Task> tasksInFlight = new List<Task>(maxDegreeOfParallelism);

        do
        {
            // Top up the in-flight set until it is full or the queue is empty.
            while (tasksInFlight.Count < maxDegreeOfParallelism && queue.Count != 0)
            {
                Func<Task> taskFactory = queue.Dequeue();

                tasksInFlight.Add(taskFactory());
            }

            // Wait for whichever in-flight task finishes first.
            Task completedTask = await Task.WhenAny(tasksInFlight).ConfigureAwait(false);

            // Propagate exceptions. In-flight tasks will be abandoned if this throws.
            await completedTask.ConfigureAwait(false);

            tasksInFlight.Remove(completedTask);
        }
        while (queue.Count != 0 || tasksInFlight.Count != 0);
    }
}

Usage:

Func<Task>[] taskFactories = {
    () => _repository.WriteData(someData1),
    () => _repository.WriteData(someData2),
    () => _repository.WriteData(someData3),
    () => _repository.WriteData(someData4)
};

await InvokeAsync(taskFactories, maxDegreeOfParallelism: 2);

... or

IEnumerable<SomeData> someDataCollection = ... // Get data.

await ParallelTasks.InvokeAsync(
    someDataCollection.Select(someData => new Func<Task>(() => _repository.WriteData(someData))),
    maxDegreeOfParallelism: 10
);

This solution does not suffer from the poor load balancing often seen in other trivial implementations in cases where tasks have varying durations and the input is pre-partitioned (such as this one).
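To make the load-balancing point concrete, here is a simulated worked example rather than real tasks. The durations are made up, and "pre-partitioned" is assumed to mean splitting the input into fixed contiguous chunks, one per worker, each processed sequentially. The dynamic strategy models what InvokeAsync does: every item goes to whichever slot frees up first.

```csharp
using System;
using System.Linq;

// Made-up task durations (ms): long tasks first, short tasks last.
int[] durations = { 100, 100, 100, 10, 10, 10 };
const int workers = 2;

// Pre-partitioned: contiguous chunks assigned up front; each worker runs its chunk sequentially.
int chunkSize = (durations.Length + workers - 1) / workers;
int prePartitioned = Enumerable.Range(0, workers)
    .Select(w => durations.Skip(w * chunkSize).Take(chunkSize).Sum())
    .Max();

// Dynamic: each item is started on the slot that becomes free earliest.
int[] freeAt = new int[workers];
foreach (int d in durations)
{
    int slot = Array.IndexOf(freeAt, freeAt.Min()); // earliest-free slot
    freeAt[slot] += d;
}
int dynamicMakespan = freeAt.Max();

// Prints "pre-partitioned: 300 ms, dynamic: 200 ms"
Console.WriteLine($"pre-partitioned: {prePartitioned} ms, dynamic: {dynamicMakespan} ms");
```

With fixed chunks one worker gets all three long tasks (300 ms) while the other idles after 30 ms; dynamic assignment spreads the long tasks and finishes at 200 ms.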

Version with perf optimizations and argument validation: Gist.

Kirill Shlenskiy
  • Nice solution, two suggestions though: 1. Use a `LinkedList` to track in-flight tasks, as its insertion/removal is O(1) without any memory moves involved under the hood. 2. If a task throws, catch it outside of the loop and then `WaitAll` on the final in-flight list, collecting any exceptions into one `AggregateException`. That way you account for all tasks and their end states instead of letting them potentially outlive your `InvokeAsync`. – Boris B. Jan 31 '22 at 01:17
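The second suggestion in the comment above (account for every task and aggregate all faults) could look roughly like the following. This is a hypothetical variant, not the answer's Gist: `InvokeAndAggregateAsync` is an illustrative name, it waits asynchronously instead of using `WaitAll`, it keeps a `List<Task>` rather than a `LinkedList`, and it assumes `maxDegreeOfParallelism >= 1`.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical variant of InvokeAsync: after the first failure it stops starting new work,
// but still awaits every in-flight task and reports all faults in one AggregateException.
async Task InvokeAndAggregateAsync(IEnumerable<Func<Task>> taskFactories, int maxDegreeOfParallelism)
{
    var queue = new Queue<Func<Task>>(taskFactories);
    var tasksInFlight = new List<Task>(maxDegreeOfParallelism);
    var failures = new List<Exception>();

    while (tasksInFlight.Count != 0 || (queue.Count != 0 && failures.Count == 0))
    {
        // Top up the in-flight set, but only while nothing has failed.
        while (failures.Count == 0 && tasksInFlight.Count < maxDegreeOfParallelism && queue.Count != 0)
        {
            Task task;
            try { task = queue.Dequeue()(); }
            catch (Exception ex) { task = Task.FromException(ex); } // factory threw synchronously
            tasksInFlight.Add(task);
        }

        Task completedTask = await Task.WhenAny(tasksInFlight).ConfigureAwait(false);
        tasksInFlight.Remove(completedTask);

        if (completedTask.IsFaulted)
            failures.AddRange(completedTask.Exception.InnerExceptions);
        else if (completedTask.IsCanceled)
            failures.Add(new TaskCanceledException(completedTask));
    }

    if (failures.Count != 0)
        throw new AggregateException(failures);
}

// Usage: two failing operations run concurrently; the third should never start.
bool thirdStarted = false;
var factories = new Func<Task>[]
{
    async () => { await Task.Delay(10); throw new InvalidOperationException("first"); },
    async () => { await Task.Delay(100); throw new ArgumentException("second"); },
    () => { thirdStarted = true; return Task.CompletedTask; },
};

AggregateException caught = null;
try
{
    await InvokeAndAggregateAsync(factories, maxDegreeOfParallelism: 2);
}
catch (AggregateException ex)
{
    caught = ex;
}

Console.WriteLine($"faults: {caught?.InnerExceptions.Count}, third started: {thirdStarted}");
```

Because the method throws the AggregateException itself, awaiting it surfaces that AggregateException (with both faults inside) rather than only the first failure.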