
I have created a Dataflow pipeline using a BufferBlock, a TransformBlock and an ActionBlock. When an exception is thrown in the TransformBlock, the application deadlocks. I'm throttling the data using BoundedCapacity.

My code is like this:

public async Task PerformOperation()
{
    var bufferBlock = new BufferBlock<ObjA>(new DataflowBlockOptions { BoundedCapacity = 1 });
    var fetchApiResponse = new TransformBlock<ObjA, ObjA>((item) => {
        // Call an API to fetch the result.
        // For some items this call throws an exception.
        return item;
    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 2, MaxDegreeOfParallelism = 2, CancellationToken = cancellationToken });
    var finalBlock = new ActionBlock<ObjA>((item) => {
        if (item != null)
        {
            SaveToDB(item);
        }
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1, CancellationToken = cancellationToken });

    bufferBlock.LinkTo(fetchApiResponse, new DataflowLinkOptions { PropagateCompletion = true });
    fetchApiResponse.LinkTo(finalBlock, new DataflowLinkOptions { PropagateCompletion = true });

    await FetchData(bufferBlock);
    bufferBlock.Complete();
    await Task.WhenAll(fetchApiResponse.Completion, finalBlock.Completion);
}
public async Task FetchData(ITargetBlock<ObjA> bufferBlock)
{
    List<ObjA> dataToProcessList = GetFromDB();
    foreach (var item in dataToProcessList)
    {
        await bufferBlock.SendAsync(item);
    }
}

If an exception is thrown in the fetchApiResponse block, the data stops moving and the pipeline deadlocks. How do I handle exceptions in this pipeline?
Around 200,000 records are pushed to bufferBlock.
What is the best way to handle exceptions without causing this deadlock?

UPDATE 1: Added the FetchData method also.

Thanks Binil

Binil
  • Related: [TPL Dataflow exception in transform block with bounded capacity](https://stackoverflow.com/questions/21603428/tpl-dataflow-exception-in-transform-block-with-bounded-capacity), [TPL DataFlow proper way to handle exceptions](https://stackoverflow.com/questions/58593202/tpl-dataflow-proper-way-to-handle-exceptions) and [Handle exceptions with TPL Dataflow blocks](https://stackoverflow.com/questions/56949426/handle-exceptions-with-tpl-dataflow-blocks). – Theodor Zoulias Oct 07 '20 at 12:24
  • As a side note, the `BufferBlock` block is probably redundant. You could feed directly the `TransformBlock`, since it has its own input buffer internally (as well as an output buffer). – Theodor Zoulias Oct 07 '20 at 12:27
  • I actually didn't understand much from the links. I dont need a backward propagation. I need to stop the flow and go to completion. My doubt is why the exception is not causing complete for the transformblock. – Binil Oct 07 '20 at 12:40
  • My guess is that your problem involves the feeding code that you have not included in the question, and probably includes a line `await bufferBlock.SendAsync(something)`. This code gets stuck because the `BufferBlock` will indefinitely postpone accepting the `something`, because its buffer is full, and because the `TransformBlock` is now dead and has stopped emptying the buffer of the `BufferBlock`. This problem can be solved by propagating backwards the exception to the `BufferBlock`, in which case all `await bufferBlock.SendAsync` calls will complete immediately with a `false` result. – Theodor Zoulias Oct 07 '20 at 12:53
  • @TheodorZoulias Based on your suggestions I have made some changes to fix the deadlock issue. – Binil Oct 08 '20 at 13:32
  • @TheodorZoulias I reverted it and added as an answer. I was not sure about my implementation due to that I updated my question. I'll keep that in mind. – Binil Oct 08 '20 at 13:45

2 Answers


Rather than trying to make sense of what faulted or not, the blocks should not allow unhandled exceptions. This is a very common pattern, also used in [Go pipelines](https://blog.golang.org/pipelines).

The article Exception Handling in TPL Dataflow Networks explains how exceptions are handled.

  • When an unhandled exception is thrown, the block enters the faulted state, but only after all concurrent operations have finished.
  • That state is propagated to linked blocks that have PropagateCompletion set to true. That doesn't mean that downstream blocks will immediately fault though.

Awaiting a faulted block throws. The line :

await Task.WhenAll(fetchApiResponse.Completion, finalBlock.Completion);

should have thrown, unless those blocks were still busy.
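A minimal sketch of that behaviour, assuming a toy two-block pipeline of integers (the `FaultDemo` class and the failing item are made up for illustration, not taken from the question). Nothing here is bounded, so nothing can stall, and awaiting the completion tasks surfaces the exception:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class FaultDemo
{
    // Returns the message of the exception observed when awaiting the blocks,
    // or "completed" if nothing was thrown.
    public static async Task<string> RunAsync()
    {
        var transform = new TransformBlock<int, int>(n =>
        {
            // Hypothetical failure: item 2 cannot be processed.
            if (n == 2) throw new InvalidOperationException("bad item " + n);
            return n;
        });
        var sink = new ActionBlock<int>(n => { });
        transform.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

        for (var i = 0; i < 5; i++) transform.Post(i);
        transform.Complete();

        try
        {
            await Task.WhenAll(transform.Completion, sink.Completion);
            return "completed";
        }
        catch (Exception ex)
        {
            // A propagated fault may arrive wrapped in an AggregateException,
            // so unwrap down to the original exception.
            return ex.GetBaseException().Message;
        }
    }
}
```

In the question's code this await is never reached, because the feeding loop is still stuck in `SendAsync` on the bounded `BufferBlock`.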

The solution - don't allow unhandled exceptions

Return a Result object instead. When making, say, 1000 HTTP calls, it would be a bad idea to let one exception prevent the remaining calls anyway. This is broadly similar to Railway-oriented programming. A Dataflow pipeline is quite similar to a functional pipeline.

Each block should return a Result<T> class that wraps the actual result and indicates success or failure. Each block's delegate should catch any exceptions and return a faulted Result<T> item. The LinkTo method accepts a predicate, which allows redirecting failed results to, e.g., a logging block or a null target (DataflowBlock.NullTarget).

Let's say we have this simple Result<T> :

class Result<T>
{
    public T Value{get;}
    public Exception Exception{get;}
    public bool Ok {get;}

    public Result(){}

    public Result(T value)
    {
        Value=value;
        Ok=true;
    }

    public Result(Exception exc)
    {
        Exception=exc;
        Ok=false;
    }
}

fetchApiResponse could be :

    var fetchApiResponse = new TransformBlock<TA, Result<TA>>((item) => {
        try
        {
            ...
            // Wrap the successful value.
            return new Result<TA>(item);
        }
        catch(Exception exc)
        {
            // Wrap the failure instead of letting the block fault.
            return new Result<TA>(exc);
        }
    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 2, MaxDegreeOfParallelism = 2, CancellationToken = cancellationToken });

and the LinkTo code could be :

var propagate=new DataflowLinkOptions { PropagateCompletion = true };

var nullBlock=DataflowBlock.NullTarget<Result<TA>>();
fetchApiResponse.LinkTo(nullBlock,propagate,msg=>!msg.Ok);
fetchApiResponse.LinkTo(finalBlock,propagate,msg=>msg.Ok);

In this case, bad messages are simply dumped to a null block.

There's no reason to use another buffer block, or await all blocks. Both TransformBlock and ActionBlock have an input buffer controlled by the ExecutionDataflowBlockOptions options.

Posting the messages and awaiting completion can be:

await FetchData(fetchApiResponse);
fetchApiResponse.Complete();
await finalBlock.Completion;

The null check in finalBlock can be removed too, if fetchApiResponse returns an empty Result object when there's no valid result.

More complex scenarios can be handled by more complex Result objects.
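Put together, the approach could look like the sketch below. It is an illustration under assumptions rather than the question's real code: the items are plain integers, `CallApi` is a hypothetical stand-in that fails for multiples of 3, and failed results go to a logging block instead of a null target.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public sealed class Result<T>
{
    public T Value { get; }
    public Exception Exception { get; }
    public bool Ok { get; }

    public Result(T value) { Value = value; Ok = true; }
    public Result(Exception exc) { Exception = exc; Ok = false; }
}

public static class ResultPipeline
{
    // Hypothetical API call: fails for every multiple of 3.
    private static int CallApi(int item)
    {
        if (item % 3 == 0) throw new InvalidOperationException("API failed for " + item);
        return item * 10;
    }

    public static async Task<(List<int> Saved, List<string> Errors)> RunAsync(IEnumerable<int> items)
    {
        var saved = new List<int>();
        var errors = new List<string>();

        // The delegate never throws, so this block never faults.
        var fetch = new TransformBlock<int, Result<int>>(item =>
        {
            try { return new Result<int>(CallApi(item)); }
            catch (Exception exc) { return new Result<int>(exc); }
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 2, MaxDegreeOfParallelism = 2 });

        // Both sinks use the default MaxDegreeOfParallelism of 1,
        // so the plain lists are only touched by one thread at a time.
        var save = new ActionBlock<Result<int>>(r => saved.Add(r.Value),
            new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
        var log = new ActionBlock<Result<int>>(r => errors.Add(r.Exception.Message));

        var propagate = new DataflowLinkOptions { PropagateCompletion = true };
        fetch.LinkTo(save, propagate, r => r.Ok);
        fetch.LinkTo(log, propagate, r => !r.Ok);

        foreach (var item in items) await fetch.SendAsync(item);
        fetch.Complete();
        await Task.WhenAll(save.Completion, log.Completion);
        return (saved, errors);
    }
}
```

The two predicates together cover every message. That matters, because a message that matches no link stays in the output buffer and blocks everything behind it.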

Abrupt termination

Even when the pipeline needs to terminate immediately, there shouldn't be any unhandled exceptions. A fault may propagate downstream but won't affect the upstream blocks. They'll keep their messages in memory and keep accepting input even though the rest of the pipeline is broken.

That can definitely look like a deadlock.

The solution to this is to use a CancellationTokenSource, pass its token to all blocks, and signal it if the pipeline needs to be terminated.

This is common practice in Go, for example, where a channel is used like a CancellationTokenSource for precisely this reason, to cancel both downstream and upstream blocks. This is described in Go Concurrency Patterns: Pipelines and cancellation.

Early cancellation is useful whenever a block decides there's no reason to continue working, not just in case of error. In this case it can signal the CancellationTokenSource to stop the upstream blocks.
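A rough sketch of that shutdown path, assuming integer items and a made-up `failAt` parameter that marks the item treated as fatal. All blocks share one token, the failing delegate signals the source, and the feeding loop stops as soon as `SendAsync` returns `false`:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class CancellablePipeline
{
    public static async Task<string> RunAsync(int itemCount, int failAt)
    {
        using var cts = new CancellationTokenSource();
        var options = new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = 2,
            CancellationToken = cts.Token
        };
        var fatalMessage = "";

        var fetch = new TransformBlock<int, int>(item =>
        {
            if (item == failAt)
            {
                // Record why the pipeline stops, then cancel every block at once.
                fatalMessage = "fatal error at item " + item;
                cts.Cancel();
            }
            return item;
        }, options);
        var save = new ActionBlock<int>(item => { }, options);
        fetch.LinkTo(save, new DataflowLinkOptions { PropagateCompletion = true });

        for (var i = 0; i < itemCount; i++)
        {
            // A cancelled block declines new messages, so SendAsync returns false.
            if (!await fetch.SendAsync(i)) break;
        }
        fetch.Complete();

        try
        {
            await save.Completion;
            return "completed";
        }
        catch (OperationCanceledException)
        {
            return "cancelled: " + fatalMessage;
        }
    }
}
```

The completion task only reports that the pipeline was cancelled, so the sketch stores the reason separately in `fatalMessage`.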

Panagiotis Kanavos
  • This answer addresses the case where exceptions are to be expected and tolerated, but there are valid scenarios where exceptions are truly exceptional and are fatal for the whole operation. Like for example parsing a giant CSV file. If a single line is malformed, the whole file should be considered corrupted, and parsing the rest of the file would just be a waste of the user's time. This answer does not address this not uncommon scenario. – Theodor Zoulias Oct 08 '20 at 11:47
  • This addresses all cases. When you have a huge CSV file you *don't* want to discard all rows if only one has a bad value. Even if you do though, just return a bad Result in the TransformBlock or TransformManyBlock and exit the loop (if any) – Panagiotis Kanavos Oct 08 '20 at 11:49
  • BTW I have huge CSVs, downloaded from a list of ~100 sites stored in a Google sheet, parsed, and inserted in a database with a final step using SqlBulkCopy. Downloaded by a screen scraper using Selenium. If one of the user-edited URLs is wrong, I just discard it and keep working with the rest – Panagiotis Kanavos Oct 08 '20 at 11:52
  • A business partner sends you a CSV with 1000 new products, to insert them into the corporate database. The CSV should never be corrupted, but alas it is. Following the advice of this answer, you could end up with 500 products in the database, and 500 error messages in the logs. Nobody will be happy with this outcome. – Theodor Zoulias Oct 08 '20 at 12:15
  • Which is exactly my case. Call it airline, and call it arbitrarily deciding to use an invalid character for that format. Do you really want me to *not* refund your ticket because of that? If I discarded the file I'd end up with 50K irate customers, instead of just one. You *assume* there's no validation for individual rows, or that a single way to handle errors is valid for every case – Panagiotis Kanavos Oct 08 '20 at 12:19
  • If you use SSIS dataflows, what do you do if a few rows have problems? Fail the entire job? Or redirect the error ? If there was only one answer to this, why does SSIS dataflow offer so many options? – Panagiotis Kanavos Oct 08 '20 at 12:20
  • My argument is that there are real-life scenarios where exceptions are fatal. By providing counterexamples where exceptions are tolerable you do not invalidate my argument. – Theodor Zoulias Oct 08 '20 at 12:27
  • And all those exceptions are handled just fine using the railroad model. The only thing that changes is how fast the pipeline shuts down - immediately through the *CancellationTokenSource* or after all input has been processed or discarded? There's no case where allowing an unhandled exception would be appropriate. In fact, that's what caused the OP's problem – Panagiotis Kanavos Oct 08 '20 at 12:28
  • Hijacking cancellation to signify failure means that you won't be able to distinguish between cancellation and failure at the end. Why do that? I support the idea of canceling the *upstream* blocks of the pipeline if one block fails, in order to preserve resources and prevent deadlocks, but why suppress the exception of the faulted block, and prevent the exception from propagating downstream? What's the problem with a block completing in a faulted state? – Theodor Zoulias Oct 08 '20 at 13:00
  • I suggest you read the Go Patterns page. This is nothing new. The patterns exist for a long time, not just in Go or TPL Dataflow. Nothing is hijacked. if anything, .NET makes using the patterns easier – Panagiotis Kanavos Oct 08 '20 at 13:07
  • In fact, it's good to remember that Dataflow grew out of Microsoft Robotics and the Concurrency Runtime. You simply *can't* expect a moving robot's software to abort simply because of an error. Because aborting means the arm will keep hurtling towards a human or a wall – Panagiotis Kanavos Oct 08 '20 at 13:10
  • So I suppose the argument is: *"We should stop using the TPL Dataflow the way it was meant to be used, because the Go Pipelines are doing it differently"*. Not a strong argument IMHO. Regarding the robotic example, it's like the airline ticket service and the web scraper counterexamples: it doesn't invalidate my argument. BTW it's not a particularly good example either. A robot that is unable to parse correctly all the instructions it receives, and performs only the subset that can parse, is equally dangerous with a robot that shuts down when it receives the first invalid instruction. – Theodor Zoulias Oct 08 '20 at 13:29
  • In my scenario I'm processing 200,000 records and sometimes more. If the api is down or some error in most of the records, I should bring down the pipeline. If I pass exception to other target, i end up with lot of redundant errors. I need to end the process. So I'm checking the exception count and if its beyond a limit, I end the pipeline by calling the Fault method of the bufferBlock(in eg) or the source block. – Binil Oct 08 '20 at 17:20
  • @Binil signal the CTS instead, to cancel all blocks – Panagiotis Kanavos Oct 08 '20 at 17:29

I couldn't go through the post of @Panagiotis Kanavos. Meanwhile I have updated my code like this to handle the exception based on the comments.

public async Task PerformOperation()
{
  try
   {
    var bufferBlock = new BufferBlock<ObjA>(new DataflowBlockOptions { BoundedCapacity = 1 });
    var fetchApiResponse = new TransformBlock<ObjA, ObjA>(async (item) => {
        // Call an API to fetch the result.
        // For some items this call throws an exception.
        try
        {
          int apiResult = await apiCall();
        }
        catch(Exception ex)
        {
          // Fault the upstream block so that pending SendAsync calls return false.
          var dataflowBlock = (IDataflowBlock)bufferBlock;
          dataflowBlock.Fault(ex);
          // Rethrow without resetting the stack trace.
          throw;
        }
        return item;
    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 2, MaxDegreeOfParallelism = 2, CancellationToken = cancellationToken });
    var finalBlock = new ActionBlock<ObjA>((item) => {
        if (item != null)
        {
            SaveToDB(item);
        }
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1, CancellationToken = cancellationToken });

    bufferBlock.LinkTo(fetchApiResponse, new DataflowLinkOptions { PropagateCompletion = true });
    fetchApiResponse.LinkTo(finalBlock, new DataflowLinkOptions { PropagateCompletion = true });

    await FetchData(bufferBlock);
    bufferBlock.Complete();
    await Task.WhenAll(fetchApiResponse.Completion, finalBlock.Completion);
  }
  catch(AggregateException aex)
  {
      // Log the exceptions in aex.
  }
  catch(Exception ex)
  {
      // Log the exception.
  }
}
public async Task FetchData(ITargetBlock<ObjA> bufferBlock)
{
    List<ObjA> dataToProcessList = GetFromDB();
    foreach (var item in dataToProcessList)
    {
        if(!await bufferBlock.SendAsync(item))
        {
          break; //breaking the loop to stop pushing data.
        }
    }
}

This now stops the pipeline instead of deadlocking. Since I'm dealing with lots of data, I'm planning to add a counter for the exceptions and stop the pipeline only if it exceeds a certain limit. If a small network glitch causes one API call to fail, the call might still work for the next item.
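The counter idea could be sketched roughly as follows. The assumptions are mine: integer items, a hypothetical `CallApi` that fails for odd items, and no separate `BufferBlock`, so the feeder sends straight to the `TransformBlock`. Failures up to `maxFailures` are swallowed and emitted as `null`; the next one is rethrown, which faults the block and makes `SendAsync` return `false`.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ThresholdPipeline
{
    // Hypothetical API call: fails for every odd item.
    private static int CallApi(int item)
    {
        if (item % 2 == 1) throw new InvalidOperationException("API failed for " + item);
        return item;
    }

    public static async Task<(string Outcome, int Failures)> RunAsync(int itemCount, int maxFailures)
    {
        var failures = 0;

        var fetch = new TransformBlock<int, int?>(item =>
        {
            try
            {
                return CallApi(item);
            }
            catch (Exception)
            {
                // Past the limit, rethrow so the block faults and the pipeline stops.
                if (Interlocked.Increment(ref failures) > maxFailures) throw;
                return null; // tolerated failure, filtered out downstream
            }
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 2, MaxDegreeOfParallelism = 1 });

        var save = new ActionBlock<int?>(item =>
        {
            if (item != null) { /* SaveToDB(item.Value) would go here */ }
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        fetch.LinkTo(save, new DataflowLinkOptions { PropagateCompletion = true });

        for (var i = 0; i < itemCount; i++)
        {
            // A faulted block declines new messages, so the loop cannot hang here.
            if (!await fetch.SendAsync(i)) break;
        }
        fetch.Complete();

        try
        {
            await save.Completion;
            return ("completed", failures);
        }
        catch (Exception)
        {
            return ("faulted", failures);
        }
    }
}
```

`MaxDegreeOfParallelism` is set to 1 only to keep the count easy to reason about. With a higher value, operations already in flight still finish before the block faults, so the final count can overshoot the limit.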

I'll go through the new posts and update my code to make things better. Please provide inputs.

Thanks Binil

Binil
  • You may find this question interesting: [How to keep track of faulted items in TPL pipeline in (thread)safe way](https://stackoverflow.com/questions/62154993/how-to-keep-track-of-faulted-items-in-tpl-pipeline-in-threadsafe-way) – Theodor Zoulias Oct 10 '20 at 00:22