
I need to build a TPL Dataflow pipeline that will process a lot of messages. Because there are so many messages, I cannot simply Post them into the unbounded queue of a BufferBlock, or I will run into memory issues. So I want to use the BoundedCapacity = 1 option to disable the queue, and MaxDegreeOfParallelism to process messages in parallel, since my TransformBlocks could take some time for each message. I also use PropagateCompletion so that completion and failures propagate down the pipeline.

But I'm facing an issue with error handling when an error happens right after the first message: calling await SendAsync simply makes my app wait forever.

I've simplified my case to a sample console app:

var data_buffer = new BufferBlock<int>(new DataflowBlockOptions
{
    BoundedCapacity = 1
});

var process_block = new ActionBlock<int>(x =>
{
    throw new InvalidOperationException();
}, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 2,
    BoundedCapacity = 1
});

data_buffer.LinkTo(process_block,
    new DataflowLinkOptions { PropagateCompletion = true });

for (var k = 1; k <= 5; k++)
{
    await data_buffer.SendAsync(k);
    Console.WriteLine("Send: {0}", k);
}

data_buffer.Complete();

await process_block.Completion;
Theodor Zoulias
Michael Logutov
  • As a side note, configuring a block with `BoundedCapacity` smaller than the `MaxDegreeOfParallelism` will reduce the degree of parallelism to the value of the capacity. In other words, the block cannot process 2 items simultaneously if it is allowed to buffer only one. I believe this happens because after processing the two items it should store the two results in its output buffer, and it has no space available for two results. – Theodor Zoulias Jun 08 '20 at 15:02
  • Could be, yeah. But it's not intuitive at least for me. I thought that "buffer" means everything that overflows. So if we have 2 workers and 1 buffer capacity it gets 2 items and gives them to every worker and got 1 more item "ahead". – Michael Logutov Jun 10 '20 at 16:28
  • Regarding an `ActionBlock` then yes, that would make sense, because this block has only an input queue with no output. But actually even `ActionBlock`s are governed by the same rule for some reason. Probably for consistency. – Theodor Zoulias Jun 10 '20 at 16:56

2 Answers

This is expected behavior. If there's a fault "downstream", the error does not propagate "backwards" up the mesh. The mesh is expecting you to detect that fault (e.g., via process_block.Completion) and resolve it.

If you want to propagate errors backwards, you could have an await or continuation on process_block.Completion that faults the upstream block(s) if the downstream block(s) fault.
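
A minimal sketch of that idea, assuming a console app with top-level statements. The helper name FaultUpstreamOnFailureAsync is hypothetical, and the delegate fails conditionally only so that it binds to the Action<int> overload. Fault may have no effect on a block that has already been marked as completed, so treat this as an illustration rather than a complete solution:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var data_buffer = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 1 });

var process_block = new ActionBlock<int>(
    x => { if (x > 0) throw new InvalidOperationException(); }, // always fails for 1..5
    new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

data_buffer.LinkTo(process_block, new DataflowLinkOptions { PropagateCompletion = true });

// Hypothetical helper: waits for the downstream block and, if it failed,
// faults the upstream block so that pending SendAsync calls return false.
async Task FaultUpstreamOnFailureAsync()
{
    try { await process_block.Completion; }
    catch (Exception ex) { ((IDataflowBlock)data_buffer).Fault(ex); }
}
var watcher = FaultUpstreamOnFailureAsync();

var sent = 0;
for (var k = 1; k <= 5; k++)
{
    // SendAsync returns false once the buffer has stopped accepting items.
    if (!await data_buffer.SendAsync(k)) break;
    sent++;
}

data_buffer.Complete(); // no effect if the buffer has already been faulted
await watcher;

try { await process_block.Completion; }
catch (InvalidOperationException) { Console.WriteLine("Pipeline failed after {0} sends", sent); }
```

The important detail on the producer side is checking the result of SendAsync: once the buffer is faulted, the call completes with false instead of waiting forever.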

Note that this is not the only possible solution; you may want to rebuild that part of the mesh or link the sources to an alternative target. The source block(s) have not faulted, so they can just continue processing with a repaired mesh.
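
Repairing the mesh could look roughly like the sketch below. It assumes the link to the failing block does not propagate completion, and the names faulty, replacement and RepairOnFailureAsync are made up for the example. Any item that the faulted block had already accepted is lost:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var source = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 1 });

// Stands in for a broken consumer: fails on the first item it receives.
var faulty = new ActionBlock<int>(
    x => { if (x > 0) throw new InvalidOperationException(); },
    new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

// Replacement consumer: records what it receives (one item at a time by default).
var received = new List<int>();
var replacement = new ActionBlock<int>(
    x => received.Add(x),
    new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

// LinkTo returns an IDisposable; disposing it removes the link.
IDisposable link = source.LinkTo(faulty);

// Hypothetical helper: swaps in the replacement once the first consumer has failed.
async Task RepairOnFailureAsync()
{
    try { await faulty.Completion; }
    catch (Exception)
    {
        link.Dispose(); // unlink the faulted block
        source.LinkTo(replacement, new DataflowLinkOptions { PropagateCompletion = true });
    }
}
var repair = RepairOnFailureAsync();

for (var k = 1; k <= 5; k++)
    await source.SendAsync(k); // resumes as soon as the replacement starts consuming

source.Complete();
await repair;
await replacement.Completion;
Console.WriteLine("Processed after repair: " + string.Join(", ", received));
```

The source block itself never faults here, which is why the producer loop can simply continue once a healthy target is linked.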

Stephen Cleary
  • So, how should I wait for `BufferBlock` to be available again for sending data? And what should I wait at the end of the data queueing? – Michael Logutov Feb 06 '14 at 17:52
  • I'm not sure if I understand your question. When the `ActionBlock` faults, the `BufferBlock` is still functional; it just doesn't have anywhere to send its data. If you hook up a separate `ActionBlock`, to the `BufferBlock` when the first one faults, then the `BufferBlock` will just continue executing. Or, you can just use a `try`/`catch` in your `ActionBlock` delegate. – Stephen Cleary Feb 06 '14 at 18:31
  • Makes sense. What I wanted is that if any of my blocks faults the whole pipeline stops and exception bubbles up to the main thread. – Michael Logutov Feb 07 '14 at 05:20
  • @MichaelLogutov: You'd have to make that link yourself; i.e., add a continuation to `ActionBlock.Completion` that faults the `BufferBlock`. – Stephen Cleary Feb 07 '14 at 12:55
  • I see. I wish that `PropagateCompletion` would propagate failure too. – Michael Logutov Feb 07 '14 at 16:34
  • @MichaelLogutov: It does. It will propagate any faults from `BufferBlock` to `ActionBlock`. But neither data nor completion/faults are propagated *backwards* across the link. – Stephen Cleary Feb 07 '14 at 16:44
  • @stephen, could you elaborate on how a mesh could be 'repaired'? I understood that once a block is faulted, it can't be recovered, and it's best to let the whole mesh die and restart. – pnewhook Jun 12 '14 at 18:29
  • @pnewhook: I prefer to tear down the entire mesh on a fault (and I imagine most people would), but we don't *have* to. If a block does not propagate completion and it faults, then you can unlink it from the other blocks and put in a replacement block (while the rest of the mesh is still running). – Stephen Cleary Jun 12 '14 at 19:39
  • Remember if you await not only process_block but also data_buffer block you may end with deadlock as the data_buffer block will run to completion only after it processes existing items. And because of bounded capacity of process_block it may never complete. – stil Mar 08 '20 at 23:50

The LinkTo method with the PropagateCompletion configuration propagates the completion of the source block to the target block. So if the source block fails, the failure will be propagated to the target block, and eventually both blocks will complete. The same is not true if the target block fails. In that case the source block will not get notified, and will continue accepting and processing messages. If we add the BoundedCapacity configuration into the mix, the internal output buffer of the source block will soon become full, preventing it from accepting more messages. And as you discovered, that can easily result in a deadlock.

To prevent a deadlock from happening, the simplest approach would be to ensure that an error in any block of the pipeline causes the timely completion of all its constituent blocks. Other approaches are also possible, as indicated by Stephen Cleary's answer, but in the majority of cases I expect the fail-fast approach to be the desirable behavior. Surprisingly, this simple behavior is not so easy to achieve. No built-in mechanism is readily available for this purpose, and implementing it manually is tricky.

As of .NET 6, the only reliable way to forcefully complete a block that is part of a dataflow pipeline is to Fault the block, and also discard its output buffer by linking it to a NullTarget. Faulting the block alone, or canceling it through the CancellationToken option, is not enough. There are scenarios where a faulted or canceled block will not complete. Here is a demonstration of the first case (faulted and not completed), and here is a demonstration of the second case (canceled and not completed). Both scenarios require that the block has previously been marked as completed, which can happen automatically and non-deterministically for any block that participates in a dataflow pipeline and is linked with the PropagateCompletion configuration. A GitHub issue reporting this problematic behavior exists: No way to cancel completing dataflow blocks. As of the time of this writing, no feedback has been provided by the devs.

Armed with this knowledge, we can implement a LinkTo-on-steroids method that can create fail-fast pipelines like this:

/// <summary>
/// Connects two blocks that belong in a simple, straightforward,
/// one-way dataflow pipeline.
/// Completion is propagated in both directions.
/// Failure of the target block causes purging of all buffered messages
/// in the source block, allowing the timely completion of both blocks.
/// </summary>
/// <remarks>
/// This method should be used only if the two blocks participate in an exclusive
/// producer-consumer relationship.
/// The source block should be the only producer for the target block, and
/// the target block should be the only consumer of the source block.
/// </remarks>
public static void ConnectTo<TOutput>(this ISourceBlock<TOutput> source,
    ITargetBlock<TOutput> target)
{
    source.LinkTo(target, new DataflowLinkOptions { PropagateCompletion = true });
    ThreadPool.QueueUserWorkItem(async _ =>
    {
        try { await target.Completion.ConfigureAwait(false); } catch { }
        if (!target.Completion.IsFaulted) return;
        if (source.Completion.IsCompleted) return;
        source.Fault(new Exception("Pipeline error."));
        source.LinkTo(DataflowBlock.NullTarget<TOutput>()); // Discard all output
    });
}

Usage example:

var data_buffer = new BufferBlock<int>(new() { BoundedCapacity = 1 });

var process_block = new ActionBlock<int>(
    x => throw new InvalidOperationException(),
    new() { BoundedCapacity = 2, MaxDegreeOfParallelism = 2 });

data_buffer.ConnectTo(process_block); // Instead of LinkTo

foreach (var k in Enumerable.Range(1, 5))
    if (!await data_buffer.SendAsync(k)) break;

data_buffer.Complete();
await process_block.Completion;

Optionally, you could also await all the constituent blocks of the pipeline before awaiting the last one (or afterwards, in a finally block). The advantage is that, in case of failure, you won't risk leaking fire-and-forget operations that keep running unobserved in the background before the next reincarnation of the pipeline:

try { await Task.WhenAll(data_buffer.Completion, process_block.Completion); } catch { }

You can ignore all the errors that might be thrown by the await Task.WhenAll operation, because awaiting the last block will convey most of the error-related information anyway. You may only miss additional errors that happened in blocks upstream after the failure of a block downstream. You can try to observe all errors if you want, but it will be tricky because of how the errors are propagated downstream: you may observe the same error multiple times. If you want to log diligently every single error, it is probably easier (and more accurate) to do the logging inside the lambdas of the processing blocks, instead of relying on their Completion property.
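
As a rough illustration of logging inside the lambda, here is a sketch in which errorLog is just a stand-in for whatever logger you actually use, and the failure condition (even numbers) is invented for the example:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Stand-in for a real logger (assumption for this sketch).
var errorLog = new ConcurrentQueue<string>();

var process_block = new ActionBlock<int>(x =>
{
    try
    {
        // Invented failure condition: even numbers cannot be processed.
        if (x % 2 == 0) throw new InvalidOperationException($"Cannot process {x}");
        Console.WriteLine("Processed: {0}", x);
    }
    catch (Exception ex)
    {
        // Log the error exactly once, where it happens...
        errorLog.Enqueue($"Item {x}: {ex.Message}");
        throw; // ...and rethrow, so that the block still faults.
    }
});

process_block.Post(1);
process_block.Post(2);
process_block.Complete();

// The error has already been logged, so here it only needs to be observed.
try { await process_block.Completion; }
catch (InvalidOperationException) { }

foreach (var entry in errorLog) Console.WriteLine("Logged: " + entry);
```

Each error is recorded once by the block that produced it, regardless of how many downstream Completion tasks later carry the same exception.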

Shortcomings: The ConnectTo implementation above propagates the failure backwards one block at a time. The propagation is not instantaneous, because a faulted block does not complete until it has finished the messages it is currently processing. This can be an issue if the pipeline is long (5-6 blocks or more) and the workload of each block is chunky. The additional latency is not only a waste of time, but also a waste of resources, spent on work that is going to be discarded anyway.

I've uploaded a more sophisticated version of the ConnectTo idea in this GitHub repository. It addresses the delayed-completion issue mentioned in the previous paragraph: a failure in any block is propagated instantaneously to all blocks. As a bonus it also propagates all the errors in the pipeline, as a flat AggregateException.


Note: This answer has been rewritten from scratch. The original answer (Revision 4) included some wrong ideas, and a flawed implementation of the ConnectTo method.

Theodor Zoulias
  • `Recently I realized that this pattern is wrong.` there's nothing wrong with that pattern, or using CancellationTokenSource. Dataflow isn't something new or limited to .NET. There are specific patterns that are repeated across languages and platforms, even though they aren't included in the namespace's documentation. Even TPL Dataflow isn't new, it's almost 10 years old. It never needed using exceptions-as-control-flow to propagate completion or cancellation – Panagiotis Kanavos Sep 16 '21 at 14:33
  • `async void` is just begging for problems. A simple `source.Completion.ContinueWith(_=>target.Complete());` would do the same job *without* risking an ObjectDisposedException or the cost of throwing an exception inside an async state machine for no reason – Panagiotis Kanavos Sep 16 '21 at 14:37
  • @PanagiotisKanavos I think that I explained quite extensively what's wrong with awaiting only the last block in a pipeline. The age of TPL Dataflow and what other languages and platforms are doing is irrelevant. A wrong pattern doesn't become correct by aging. Regarding the [primitive](https://blog.stephencleary.com/2013/10/continuewith-is-dangerous-too.html) `ContinueWith` method, I easily rejected it in favor of the responsible and non-leaky `async void` approach. *If* my `ConnectTo` method is buggy, I want this error to be surfaced in broad daylight, and not get swallowed in the dark. – Theodor Zoulias Sep 16 '21 at 14:57
  • `catch { }` ?? `.NullTarget()` ? That's the definition of `swallowed in the dark` – Panagiotis Kanavos Sep 16 '21 at 15:07
  • Using the empty `catch` block in order to swallow the errors there is intentional. It's not `ConnectTo`'s business to handle the errors of the source and target blocks, because this method doesn't own them. But **it is** the owner of the `target.Complete();` line. Since handling a failure of *this* line cannot be done in a reasonable way, it is allowed to propagate as an unhandled exception. Which is the responsible thing to do IMHO. Wrapping *this* line in an empty try-catch would be irresponsible. Which is what the `ContinueWith` is essentially doing (if you let the returned `Task` leak). – Theodor Zoulias Sep 16 '21 at 15:51