I'm new to TPL Dataflow and I have it working but I am not sure if I'm using it properly. I have a list of inputs (strings) and I want to process them (all) with a max degree of parallelism and know when it's all complete. Right now I just foreach through the inputs and call Post on the ActionBlock, ignoring the return value. This seems incorrect since it could miss inputs.

My question is: how do I avoid missing items? Is there a built-in block to which I can just give my inputs and it will make sure they are all attempted? (Regardless of success/failure per input.)

The suggestions I've seen basically amount to:

await block.Completion;

Does this account for failed inputs (where Post or SendAsync would return false)? The strange thing for me is that it seems like this determination is made when I call Post and not after, so this Completion wouldn't even include those items.

I feel like I need basically a retry loop for the inputs that it wasn't able to handle the previous time around, something similar to:

while (items.Count > 0) {
  foreach (var item in items) {
    if (await block.SendAsync(item)) {
      items.Remove(item);
    }
  }

  await block.Completion;
}

block.Complete();

(Except with better loop handling/error checking.)

Is this additional level unnecessary? Or am I wrong conceptually somewhere?

Theodor Zoulias
Josh
  • Related: [TPL Dataflow, whats the functional difference between Post() and SendAsync()?](https://stackoverflow.com/questions/13599305/tpl-dataflow-whats-the-functional-difference-between-post-and-sendasync) – Theodor Zoulias Aug 14 '19 at 06:00

2 Answers


> This seems incorrect since it could miss inputs.

Assuming you're using the defaults, ignoring the return value is fine. Post only returns false if the block refuses the input. This can happen if the block has received a Complete signal, or if the block's input buffer is full. By default, each block's input buffer can grow indefinitely, so an ActionBlock with a default input buffer size will only return false from Post after Complete is called.

The most common use case for ActionBlock is with the default unbounded capacity (BoundedCapacity left at DataflowBlockOptions.Unbounded) and where the code only calls Complete after all items have been added. In that case, Post will never return false and you can safely ignore the return value.
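As a minimal sketch of that common case: the helper name `ProcessAllAsync`, the sample inputs, and the `Task.Delay` stand-in for real work are all assumptions made for illustration. The try/catch inside the delegate is one way to make sure every input is attempted regardless of individual failures, since an unhandled exception would fault the whole block.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class Demo
{
    // Hypothetical helper: attempts every input and returns how many succeeded.
    public static async Task<int> ProcessAllAsync(IEnumerable<string> inputs, int maxParallelism)
    {
        int succeeded = 0;

        var block = new ActionBlock<string>(async item =>
        {
            try
            {
                await Task.Delay(10); // stand-in for the real work (assumption)
                if (item.Length == 0) throw new ArgumentException("empty input");
                Interlocked.Increment(ref succeeded);
            }
            catch (Exception ex)
            {
                // Handle failures per item; an unhandled exception would fault the block.
                Console.Error.WriteLine($"'{item}' failed: {ex.Message}");
            }
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxParallelism });

        foreach (var item in inputs)
        {
            // Capacity is unbounded and Complete() has not been called yet,
            // so the block accepts every item here.
            block.Post(item);
        }

        block.Complete();       // signal that no more input is coming
        await block.Completion; // finishes once every queued item has been processed
        return succeeded;
    }

    public static async Task Main()
    {
        int ok = await ProcessAllAsync(new[] { "a", "b", "", "d" }, maxParallelism: 2);
        Console.WriteLine($"{ok} inputs succeeded");
    }
}
```

Note that `Complete` is called exactly once, after the loop, and `Completion` is awaited only after that; awaiting `Completion` before calling `Complete` would never finish.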

Stephen Cleary

The Post method returns false if the block has completed, or if the block's input buffer is full. BoundedCapacity is not an exotic setting, and it may well become necessary at a later stage of a project to address high memory usage, so I don't think it's safe to call Post and simply ignore the result. To protect against nasty bugs involving missing messages (which could be orders or invoices), you could do something like this:

foreach (var item in items)
{
    var accepted = block.Post(item);
    if (!accepted) throw new InvalidOperationException("Item was not accepted");
}

This way you'll at least be notified that something is broken, and you won't let buggy behavior creep in.

Awaiting SendAsync and ignoring the result, on the other hand, is much safer. When the input buffer is full, SendAsync waits asynchronously for space instead of returning false. It normally returns false only when the block has stopped accepting input because an exception or a cancellation has occurred, in which case you'll be notified when you await the block's Completion. So there is no need to throw exceptions in this case.

foreach (var item in items)
{
    await block.SendAsync(item).ConfigureAwait(false);
}
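To illustrate how SendAsync behaves against a bounded block, here is a small sketch. The class name `BoundedDemo`, the capacity of 2, the item count, and the delay are arbitrary values chosen for the example, not anything from the original question.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BoundedDemo
{
    // Hypothetical demo: sends 10 items into a block that holds at most 2 at a time.
    public static async Task<(int Accepted, int Processed, bool LateOffer)> RunAsync()
    {
        int processed = 0;

        var block = new ActionBlock<int>(async n =>
        {
            await Task.Delay(20); // stand-in for slow work (assumption)
            Interlocked.Increment(ref processed);
        }, new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = 2,        // deliberately small so the buffer fills quickly
            MaxDegreeOfParallelism = 1
        });

        int accepted = 0;
        foreach (var n in Enumerable.Range(0, 10))
        {
            // Waits asynchronously while the buffer is full;
            // returns false only if the block declines the item for good.
            if (await block.SendAsync(n)) accepted++;
        }

        block.Complete();
        await block.Completion;

        // Once the block has completed, any further offer is declined.
        bool lateOffer = block.Post(99);

        return (accepted, processed, lateOffer);
    }

    public static async Task Main()
    {
        var result = await RunAsync();
        Console.WriteLine(
            $"accepted={result.Accepted}, processed={result.Processed}, lateOffer={result.LateOffer}");
    }
}
```

With a bounded block, the loop is throttled by the consumer: each `SendAsync` call resumes only when the block has room, so memory use stays bounded without dropping items.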

For performance reasons you could combine the two: try Post first, and fall back to SendAsync only when Post returns false. This makes a difference only if you have tens of millions of items to process.

foreach (var item in items)
{
    if (!block.Post(item))
    {
        await block.SendAsync(item).ConfigureAwait(false);
    }
}
Theodor Zoulias