
I'm currently using an `ActionBlock` to process asynchronous jobs that are started one after another. It works well for processing each item posted to it, but it gives me no way to collect the results of the jobs.

What can I use to collect the results of my jobs in a thread-safe manner?

My code is currently something like this:

```csharp
var actionBlock = new ActionBlock<int>(async i => await Process(i));
for (int i = 0; i < 100; i++)
{
    actionBlock.Post(i);
}
actionBlock.Complete();
await actionBlock.Completion;
```

I've tried using a `TransformBlock` instead, but it hangs indefinitely when I await its `Completion`. The completion task's status stays at `WaitingForActivation`.

My code with the TransformBlock is something like this:

```csharp
var transformBlock = new TransformBlock<int, string>(async i => await Process(i));
for (int i = 0; i < 100; i++)
{
    transformBlock.Post(i);
}
transformBlock.Complete();
await transformBlock.Completion; // never finishes; status stays WaitingForActivation
transformBlock.TryReceiveAll(out IList<string> strings);
```
JSteward
kiml42
  • Your `TransformBlock` simply couldn't unload its output buffer, so it will hang: the block can't complete while unreceived results are still sitting in that buffer. An easy solution is to have your `ActionBlock` add items to a concurrent collection like a `ConcurrentBag`. – JSteward Mar 20 '18 at 17:23
  • Thanks, that's pretty much what I've ended up with, but I rambled around a bit first, so I'm using a `Parallel.ForEach`, which made the code much more succinct. – kiml42 Mar 22 '18 at 10:01
  • To collect all the results of a `TransformBlock` as a single `Task` that returns a list, you could use the `ToListAsync` method found [here](https://stackoverflow.com/questions/58714155/tpl-how-do-i-split-and-merge-the-dataflow/58751948#58751948). – Theodor Zoulias Jun 15 '20 at 16:33
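Following the first comment, the `TransformBlock` version can work if its output buffer is drained *before* awaiting `Completion`. The sketch below shows one way to do that, assuming `Process(i)` returns `Task<string>`; the `Process` body is a hypothetical stand-in, and the drain loop uses the Dataflow extension methods `OutputAvailableAsync` and `TryReceive`. It is written as a .NET 6+ top-level program, not as the asker's actual code.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-in for the question's Process method.
static async Task<string> Process(int i)
{
    await Task.Delay(1);
    return $"result {i}";
}

var transformBlock = new TransformBlock<int, string>(i => Process(i));
for (int i = 0; i < 100; i++)
{
    transformBlock.Post(i);
}
transformBlock.Complete();

// Drain the output buffer. OutputAvailableAsync returns false once the block
// has completed and every output has been received, which ends the loop.
var results = new List<string>();
while (await transformBlock.OutputAvailableAsync())
{
    while (transformBlock.TryReceive(out string item))
    {
        results.Add(item);
    }
}

// Completion can finish now because nothing is left in the output buffer.
await transformBlock.Completion;
Console.WriteLine(results.Count);
```

Because `TransformBlock` preserves input order by default, `results` should come out in the same order the items were posted.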

1 Answer


It turns out a `ConcurrentBag<T>` is the answer:

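```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks.Dataflow;

var bag = new ConcurrentBag<string>();
// Each job adds its own result; ConcurrentBag is safe to add to from any thread.
var actionBlock = new ActionBlock<int>(async i =>
    bag.Add(await Process(i))
);
for (int i = 0; i < 100; i++)
{
    actionBlock.Post(i);
}
actionBlock.Complete();
await actionBlock.Completion;
```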

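Once `Completion` has finished, `bag` contains all the results and can be enumerated as an `IEnumerable<string>`. Bear in mind that `ConcurrentBag<T>` is unordered, so the results won't necessarily come back in the order the inputs were posted.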
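The thread safety of the bag matters most once the block runs several jobs at once. As a hedged sketch (not the code from this answer), the variation below raises `MaxDegreeOfParallelism` from its default of 1 and stores each result alongside its input so the original order can be restored afterwards. `Process` is a hypothetical stand-in returning `Task<string>`, and the degree of 4 is an arbitrary example value.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-in for the question's Process method.
static async Task<string> Process(int i)
{
    await Task.Delay(10);
    return $"result {i}";
}

// Pair each result with its input, since ConcurrentBag<T> does not keep order.
var bag = new ConcurrentBag<(int Input, string Result)>();

var actionBlock = new ActionBlock<int>(
    async i => bag.Add((i, await Process(i))),
    // Allow up to 4 jobs at once; the default of 1 processes items sequentially.
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

for (int i = 0; i < 100; i++)
{
    actionBlock.Post(i);
}
actionBlock.Complete();
await actionBlock.Completion;

// Restore input order before using the results.
var ordered = bag.OrderBy(x => x.Input).Select(x => x.Result).ToList();
Console.WriteLine(ordered.Count);
```

If order doesn't matter, the plain `ConcurrentBag<string>` from the answer is enough.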

The code I actually ended up with uses a `Parallel.ForEach` instead of the `ActionBlock`:

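```csharp
// Parallel.ForEach takes a synchronous delegate, so `await` can't be used in it
// (and an async lambda here would become async void and never be awaited).
// Instead, wait for each Process task to finish on the worker thread.
Parallel.ForEach(
    inputData,
    i => bag.Add(Process(i).GetAwaiter().GetResult())
);
```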

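This is quite a lot simpler and seems to perform about as well. It also still lets you limit the degree of parallelism (through `ParallelOptions.MaxDegreeOfParallelism`), although each worker thread is blocked while it waits for its `Process` call to finish.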
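On .NET 6 or later, `Parallel.ForEachAsync` is an alternative that keeps the `Parallel` style but accepts an async body, so threads aren't blocked while `Process` runs. This is a swapped-in technique (it did not exist when this answer was written), shown as a minimal sketch: `Process` is a hypothetical stand-in returning `Task<string>`, `inputData` is an assumed range of ints, and `MaxDegreeOfParallelism = 4` is an arbitrary example value.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for the question's Process method.
static async Task<string> Process(int i)
{
    await Task.Delay(10);
    return $"result {i}";
}

var inputData = Enumerable.Range(0, 100); // assumed input
var bag = new ConcurrentBag<string>();

// Concurrency is limited the same way as with Parallel.ForEach.
var options = new ParallelOptions { MaxDegreeOfParallelism = 4 };

// The body returns a ValueTask, so each Process call is awaited
// rather than blocking a thread-pool thread.
await Parallel.ForEachAsync(inputData, options, async (i, cancellationToken) =>
{
    bag.Add(await Process(i));
});

Console.WriteLine(bag.Count);
```

As with the `ActionBlock` version, the bag's contents are unordered.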

kiml42