3

My task is to write a known number of values to an external system through an (async) interface. I have to limit the maximum number of writes that are executed concurrently. Additionally, I need load balancing because some values may take longer than others to be written by that external system.

I know how to solve each of these problems on its own:

Degree of parallelism:

new ParallelOptions {MaxDegreeOfParallelism = maxNrParallelWrites}

I also stumbled over this article: http://msdn.microsoft.com/en-us/library/ee789351(v=vs.110).aspx

Load balancing:

var partitioner = Partitioner.Create(values.ToList(), true);

Task from async interface:

var writeTask = Task<AccessResult>.Factory.FromAsync(BeginWriteValue, EndWriteValue, value.SystemId, value.Xml, priority, null);



But how do I correctly combine all these techniques? I created the following code:

  int maxNrParallelWrites = GetMaxNrParallelWrites();
  var partitioner = Partitioner.Create(values.ToList(), true);
  Parallel.ForEach(partitioner, new ParallelOptions {MaxDegreeOfParallelism = maxNrParallelWrites},
    (val) =>
    {
      var writeValueTask = GetWriteValueTask(val, priority);
      Task.WaitAny(writeValueTask);
    });

I'm especially unsure about the last part of the previous code: the action that executes the workload. Instead of creating a WriteValueTask, would it be better to use the synchronous interface directly, like this:

(val) =>
    {
      var accessResult = externalSystem.WriteValue(....);
    }

Or is it okay to create a task and then directly wait for it (Task.WaitAny(...))?

i3arnon
samwise
  • Load balancing is tricky when it comes to parallel async work, so you'd be wise to set that aside until you've tackled the rest of your design (i3arnon's answer is certainly a step in the right direction), and only get into it if you conclusively determine that long-running calls (outliers) slow things down to an unacceptable degree. – Kirill Shlenskiy Sep 24 '14 at 06:09
  • possible duplicate of [Throttling asynchronous tasks](http://stackoverflow.com/questions/22492383/throttling-asynchronous-tasks) – Yuval Itzchakov Sep 24 '14 at 06:55

2 Answers

2

You should use TPL Dataflow's ActionBlock, which encapsulates all of that for you. It's an actor-based framework that is part of the TPL:

var block = new ActionBlock<Value>(
    value => GetWriteValueTask(value, priority),
    new ExecutionDataflowBlockOptions
    {
        // At most this many writes are in flight at any time.
        MaxDegreeOfParallelism = GetMaxNrParallelWrites()
    });

foreach (var value in values)
{
    block.Post(value);
}

You can set MaxDegreeOfParallelism and BoundedCapacity, and load balancing is built in: the block handles at most MaxDegreeOfParallelism items at a time and picks up the next item as soon as one completes (as opposed to a Partitioner that partitions the collection in advance).
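To make the throttling behaviour easier to see, here is a minimal, self-contained sketch. The names `ThrottledWriter`, `WriteAllAsync` and the `writeAsync` delegate are made up for illustration and stand in for the real write call; the `BoundedCapacity` value is an arbitrary choice. It assumes the System.Threading.Tasks.Dataflow package is referenced. The method feeds the block with `SendAsync` (which waits when the bounded buffer is full), signals completion, and returns the highest number of simultaneous writes it observed:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // from the System.Threading.Tasks.Dataflow package

public static class ThrottledWriter
{
    // Hypothetical helper: runs writeAsync for every value with at most
    // maxParallel writes in flight and returns the observed peak concurrency.
    public static async Task<int> WriteAllAsync<T>(
        IEnumerable<T> values, int maxParallel, Func<T, Task> writeAsync)
    {
        var gate = new object();
        int running = 0, peak = 0;

        var block = new ActionBlock<T>(
            async value =>
            {
                lock (gate) { running++; peak = Math.Max(peak, running); }
                try { await writeAsync(value); }
                finally { lock (gate) { running--; } }
            },
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = maxParallel,
                // Assumption: a small buffer is enough; tune as needed.
                BoundedCapacity = maxParallel * 2
            });

        foreach (var value in values)
        {
            // SendAsync waits while the bounded buffer is full;
            // Post would return false instead.
            await block.SendAsync(value);
        }

        block.Complete();        // no more items will be sent
        await block.Completion;  // wait until every write has finished
        return peak;
    }
}
```

With a fake write such as `_ => Task.Delay(20)` and `maxParallel = 3`, the returned peak should never exceed 3.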

Note: When you take an async task and wait for it to complete synchronously (e.g. with Task.WaitAny), the calling thread is blocked and nothing is actually asynchronous. In such cases you should await Task.WhenAny (or simply await the task itself) instead.
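The difference can be sketched as follows. `WaitVersusAwait` and the `write` delegate are hypothetical names used only for this illustration; both methods return the same result, but only the second one releases the calling thread while the write is in flight:

```csharp
using System;
using System.Threading.Tasks;

public static class WaitVersusAwait
{
    // Blocks the calling thread until the write has finished.
    public static int BlockingWrite(Func<Task<int>> write)
    {
        Task<int> task = write();
        Task.WaitAny(task);   // synchronous wait: the thread sits idle
        return task.Result;
    }

    // Returns to the caller at the await; the rest runs when the write completes.
    public static async Task<int> AwaitedWrite(Func<Task<int>> write)
    {
        Task<int> task = write();
        await Task.WhenAny(task);  // asynchronous wait: no thread is blocked
        return await task;         // unwraps the result (or rethrows the exception)
    }
}
```

Inside Parallel.ForEach the delegate is synchronous, so only the blocking form fits there, which is one reason to prefer a construct that accepts async delegates.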

i3arnon
  • Thank you. I've tested it and I think it perfectly fits my needs. I'm very interested in how the ActionBlock class works. Is there a collection holding the posted values, from which they are removed once processed? I didn't find an MSDN article that describes this in detail. – samwise Sep 24 '14 at 07:23
  • @samwise In the case of ActionBlock there's an internal queue. You can get its size by using the `InputCount` property, but you don't have any access to the queue itself. Here's more about Dataflow: http://msdn.microsoft.com/en-us/library/hh228603(v=vs.110).aspx – i3arnon Sep 24 '14 at 07:34
1

There is a good example of how to create a load-balancing ForEachAsync method in this article. I've taken out the Task.Run to avoid queuing extra work to the thread pool, and then the extension method becomes this:

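using System;
using System.Collections.Concurrent; // Partitioner
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class Extensions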
{
    public static async Task ExecuteInPartition<T>(IEnumerator<T> partition, Func<T, Task> body)
    {
        using (partition)
            while (partition.MoveNext())
                await body(partition.Current);
    }

    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select ExecuteInPartition(partition, body));
    }
}

Usage

This example asynchronously processes a maximum of 100 emails at a time:

 // Process 100 emails at a time
 return emailsToProcess.ForEachAsync(100, ProcessSingleEmail);
NeddySpaghetti
  • Isn't the problem with partitions that if a bunch of long-running tasks randomly end up in the same partition, this isn't efficient at all? Because then all the long-running tasks have to be processed sequentially while the fast tasks finished long ago? – samwise Sep 24 '14 at 07:19
  • @samwise the 100 tasks are processed concurrently and as soon as the fast ones are finished, more tasks can be started as long as the total number is less than or equal to 100. – NeddySpaghetti Sep 24 '14 at 07:28
  • @NedStoyanov ah, now I understand. A partition is created for each of the tasks. Is this efficient, or just an abuse of partitions to reach the goal? But thank you anyway, I have never used TPL before and learned a lot from you guys. – samwise Sep 24 '14 at 07:45
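
On the partitioning question raised in these comments: `GetPartitions(dop)` creates `dop` partitions (not one per item), and each partition pulls items from the shared source as it goes. The default partitioner may hand out items in chunks, though. One possible variation, shown here only as a sketch, is to pass `EnumerablePartitionerOptions.NoBuffering` so that each partition takes a single item at a time. `BalancedExtensions` and `ForEachBalancedAsync` are hypothetical names, not part of the answer above:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class BalancedExtensions
{
    // Hypothetical variant of ForEachAsync: dop workers share one source and
    // each worker fetches its next item only after finishing the previous one.
    public static Task ForEachBalancedAsync<T>(
        this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        var partitions = Partitioner
            .Create(source, EnumerablePartitionerOptions.NoBuffering)
            .GetPartitions(dop);

        return Task.WhenAll(partitions.Select(async partition =>
        {
            using (partition)
            {
                while (partition.MoveNext())
                {
                    await body(partition.Current);
                }
            }
        }));
    }
}
```

With this option a slow item holds up only the worker that picked it, while the other workers keep draining the source.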