
I have a problem in a Windows service that uses TPL Dataflow to manage a queue (database) and redirect work to a grid computing service. At one point the BufferBlock stops releasing tasks, and I am not sure why. I think it's because exceptions happen during the execution of some tasks, but they get suppressed, and it's difficult to understand at which point the BufferBlock stops accepting new tasks.

I tried to simplify it in the working example below. It doesn't have any exception handling, and I am wondering how to properly handle exceptions in TPL Dataflow. I found something similar here: TPL Dataflow, guarantee completion only when ALL source data blocks completed. In this example I have 100 requests and process data in batches of 10 requests, emulating an exception that happens when ID % 9 == 0. If I don't catch this exception, it works for a while and then stops accepting new requests. If I handle it and return Result.Failure, I believe it works fine, but I'm not sure if that's the proper way to do it in a production environment.

I'm new to TPL, so forgive me if I haven't explained my question clearly. GitHub Project

[Image: console output showing the number of empty slots]

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Timers;
using CSharpFunctionalExtensions;

namespace TestTPL
{
    public class ServicePipeline
    {
        public const int batches = 100;
        private int currentBatch = 0;

        public ServicePipeline(int maxRequestsInParallel)
        {
            MaxRequestsInParallel = maxRequestsInParallel;
        }

        public int MaxRequestsInParallel { get; }
        public BufferBlock<MyData> QueueBlock { get; private set; }
        public List<TransformBlock<MyData, Result>> ExecutionBlocks
            { get; private set; }
        public ActionBlock<Result> ResultBlock { get; private set; }

        private void Init()
        {
            QueueBlock = new BufferBlock<MyData>(new DataflowBlockOptions()
                { BoundedCapacity = MaxRequestsInParallel });
            ExecutionBlocks = new List<TransformBlock<MyData, Result>>();
            ResultBlock = new ActionBlock<Result>(_ => _.OnFailure(
                () => Console.WriteLine($"Error: {_.Error}")));

            for (int blockIndex = 0; blockIndex < MaxRequestsInParallel;
                blockIndex++)
            {
                var executionBlock = new TransformBlock<MyData, Result>((d) =>
                {
                    return ExecuteAsync(d);
                }, new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 });
                executionBlock.LinkTo(ResultBlock, new DataflowLinkOptions()
                    { PropagateCompletion = true });
                QueueBlock.LinkTo(executionBlock, new DataflowLinkOptions()
                    { PropagateCompletion = true });
                ExecutionBlocks.Add(executionBlock);
            }
        }

        public static Result ExecuteAsync(MyData myData)
        {
            //try
            //{
            WebClient web = new WebClient();
            TaskCompletionSource<Result> res = new TaskCompletionSource<Result>();
            Task task = Task<Result>.Run(() => web.DownloadStringAsync(
                new Uri("http://localhost:49182/Slow.ashx")));
            task.Wait();
            Console.WriteLine($"Data = {myData}");
            if (myData != null && myData.Id % 9 == 0)
                throw new Exception("Test");
            return Result.Ok();
            //}
            //catch (Exception ex)
            //{
            //    return Result.Failure($"Exception: {ex.Message}");
            //}
        }

        public async void Start()
        {
            Init();
            while (currentBatch < batches)
            {
                Thread.Sleep(1000);
                await SubmitNextRequests();
            }
            Console.WriteLine($"Completed: {batches}");
        }

        private async Task<int> SubmitNextRequests()
        {
            var emptySlots = MaxRequestsInParallel - QueueBlock.Count;
            Console.WriteLine($"Empty slots: {emptySlots}" +
                $", left = {batches - currentBatch}");
            if (emptySlots > 0)
            {
                var dataRequests = await GetNextRequests(emptySlots);
                foreach (var data in dataRequests)
                {
                    await QueueBlock.SendAsync(data);
                }
            }
            return emptySlots;
        }

        private async Task<List<MyData>> GetNextRequests(int request)
        {
            MyData[] myDatas = new MyData[request];
            Task<List<MyData>> task = Task<List<MyData>>.Run(() =>
            {
                for (int i = 0; i < request; i++)
                {
                    myDatas[i++] = new MyData(currentBatch);
                    currentBatch++;
                }
                return new List<MyData>(myDatas);
            });
            return await task;
        }
    }

    public class MyData
    {
        public int Id { get; set; }
        public MyData(int id) => Id = id;
        public override string ToString() { return Id.ToString(); }
    }
}

EDIT 10/30/2019: It works as expected when the exception is handled and Result.Failure($"Exception: {ex.Message}") is returned explicitly:

    public static Result ExecuteAsync(MyData myData)
    {
        try
        {
            WebClient web = new WebClient();
            TaskCompletionSource<Result> res = new TaskCompletionSource<Result>();
            Task task = Task<Result>.Run(() => Thread.Sleep(2000));
            task.Wait();
            Console.WriteLine($"Data = {myData}");
            if (myData != null && myData.Id % 9 == 0)
                throw new Exception("Test");
            return Result.Ok();
        }
        catch (Exception ex)
        {
            return Result.Failure($"Exception: {ex.Message}");
        }
    }
Dima
  • The code you gave us cannot be compiled in a local test (we are missing some classes like `Result`, and we don't have access to http://localhost:49182/Slow.ashx). Since the question is still unclear to me and the code does not show the desired output nor demonstrate a problem, I am afraid you need to elaborate on your question. – Peter Bons Oct 28 '19 at 18:50
  • Hi Peter, I just shared this sample project on [GitHub - TestTPL](https://github.com/dmitriydas/TestTPL). It has the exception catching in ExecuteAsync (which is called by TransformBlock) commented out. In this case, when SubmitNextRequests is called, at one point it will not have empty slots to process the next requests – Dima Oct 28 '19 at 20:36
  • Why are you creating multiple `TransformBlock`s? Aren't you aware of the [`MaxDegreeOfParallelism`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.executiondataflowblockoptions.maxdegreeofparallelism) option? – Theodor Zoulias Oct 29 '19 at 08:08
  • Thanks @TheodorZoulias, I saw MaxDegreeOfParallelism but haven't dug deeply into it yet. I agree, multiple TransformBlocks look weird to me as well. I'm just trying to troubleshoot an existing production issue where a pipeline stops receiving new requests, and I replicated the main logic in my sample – Dima Oct 29 '19 at 13:32
  • Honestly I think it will be easier to solve the problem by refactoring the code to use a single `TransformBlock` with `MaxDegreeOfParallelism = MaxRequestsInParallel` than by trying to fix this needlessly complex implementation. This Gordian knot needs to be cut! – Theodor Zoulias Oct 29 '19 at 17:28
  • I tried to use this approach, but I had to change BoundedCapacity = 1 to BoundedCapacity = MaxRequestsInParallel to make it work for the TransformBlock – Dima Oct 30 '19 at 03:39
  • Yeap, this is logical. But why do you want to keep the `BoundedCapacity` so low? For latency reasons? Personally I am accustomed to configuring this setting at ten times the value of `MaxDegreeOfParallelism`, because this is usually the sweet spot that gives the best performance. – Theodor Zoulias Oct 30 '19 at 15:27
  • @TheodorZoulias I don't know the reasons for implementing it this way; I don't have the full picture yet (the person who implemented it is not with us anymore). The system is very complex: it includes ETL, a database, AWS, grid computing, and data storage (NAS/EFS). The actual TransformBlock job gets executed on a grid node (Windows C++ data processing logic). One real job can take 1-3 hours to complete end-to-end, and we have about a thousand jobs. Testing this can take several weeks, and 1-2 months to move it to production. After I gather more data about the process I can start enhancing it. Have to start somewhere – Dima Oct 30 '19 at 22:56
  • Wow, you are in a tough situation! But you'll get the hang of it eventually, it's not that complicated. I mean the Dataflow part. The `BoundedCapacity` determines how many jobs can be queued by the receiving block. Since your workload is so bulky, setting `BoundedCapacity` == `MaxDegreeOfParallelism` is OK. My personal experience is with very granular workloads, where the duration of each job is less than 1 μsec, and setting a low `BoundedCapacity` causes the block to run out of queued jobs too frequently. – Theodor Zoulias Oct 30 '19 at 23:43
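
The single-`TransformBlock` refactoring discussed in the comments above could be sketched roughly like this. This is a simplified stand-in, not the question's actual pipeline: `Task.Delay` replaces the real grid call, plain strings replace the `Result` type, and the numbers mirror the question's setup (100 requests, failure when `id % 9 == 0`).

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class SinglePipeline
{
    public static async Task RunAsync(int maxRequestsInParallel)
    {
        // One block does all the parallel work, instead of N one-slot blocks.
        var execution = new TransformBlock<int, string>(async id =>
        {
            await Task.Delay(100); // stand-in for the real grid call
            return id % 9 == 0 ? $"Failure: {id}" : $"Ok: {id}";
        }, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = maxRequestsInParallel,
            // As discussed in the comments: at least MaxDegreeOfParallelism,
            // often several times larger for granular workloads.
            BoundedCapacity = maxRequestsInParallel
        });

        var results = new ActionBlock<string>(Console.WriteLine);
        execution.LinkTo(results,
            new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < 100; i++)
            await execution.SendAsync(i); // BoundedCapacity provides backpressure

        execution.Complete();
        await results.Completion;
    }
}
```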

1 Answer


When linking two blocks, there is an option to propagate completion forward, but not backward. This becomes a problem when the BoundedCapacity option is used and an error occurs, because it can block the feeder of the pipeline and cause a deadlock. It is quite easy to propagate completion manually, though. Here is a method that you can use:

async void OnErrorComplete(IDataflowBlock block1, IDataflowBlock block2)
{
    await Task.WhenAny(block1.Completion); // Safe awaiting
    if (block1.Completion.IsFaulted) block2.Complete();
}

It waits asynchronously for block1 to complete, and if it has failed it immediately completes block2. Completing the upstream block is usually enough, but you can also propagate the specific exception if you want:

async void OnErrorPropagate(IDataflowBlock block1, IDataflowBlock block2)
{
    await Task.WhenAny(block1.Completion); // Safe awaiting
    if (block1.Completion.IsFaulted)
        block2.Fault(block1.Completion.Exception.InnerException);
}
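
To see the effect end-to-end, here is a minimal self-contained sketch using the `OnErrorComplete` idea above. It is not the question's full pipeline; the block names and the faulting condition are made up for illustration, and the helper returns `Task` instead of `void` so the demo can observe it. A bounded `BufferBlock` feeds a `TransformBlock` that faults on one item, and the backward link unblocks the feeder instead of letting it await `SendAsync` forever:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BackwardLinkDemo
{
    static async Task OnErrorComplete(IDataflowBlock block1, IDataflowBlock block2)
    {
        await Task.WhenAny(block1.Completion); // safe awaiting, never throws
        if (block1.Completion.IsFaulted) block2.Complete();
    }

    // Returns true if the feeder observed the fault (SendAsync returned false)
    // instead of blocking forever on the full bounded buffer.
    public static async Task<bool> RunAsync()
    {
        var buffer = new BufferBlock<int>(
            new DataflowBlockOptions { BoundedCapacity = 1 });
        var worker = new TransformBlock<int, int>(i =>
        {
            if (i == 3) throw new Exception("Test"); // emulated failure
            return i;
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
        var sink = new ActionBlock<int>(i => { });

        // Forward links propagate completion as usual.
        buffer.LinkTo(worker, new DataflowLinkOptions { PropagateCompletion = true });
        worker.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

        // Backward conditional link: when the worker faults, complete the buffer.
        _ = OnErrorComplete(worker, buffer);

        for (int i = 0; i < 100; i++)
        {
            if (!await buffer.SendAsync(i)) return true; // feeder unblocked
        }
        return false;
    }
}
```

Without the `OnErrorComplete` line, the loop above would hang: the buffer stays full because the faulted worker declines its items, and `SendAsync` never completes.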
Theodor Zoulias
  • Somehow, I couldn't make my code work with OnErrorComplete. Tried to use it like `QueueBlock.LinkTo(executionBlock, new DataflowLinkOptions() { PropagateCompletion = true }); ExecutionBlocks.Add(executionBlock); OnErrorComplete(QueueBlock, executionBlock); OnErrorComplete(executionBlock, ResultBlock); ` – Dima Oct 30 '19 at 03:33
  • The `OnErrorComplete` is intended to be used in the reverse direction. You link forward as usual: block1.LinkTo(block2), and then use the `OnErrorComplete` to create a backward conditional link: `OnErrorComplete(block2, block1)`. – Theodor Zoulias Oct 30 '19 at 15:41
  • After I changed the order, it completed all tasks, but stopped printing results. This is what I got in the console window: Empty slots: 10, left = 40 Empty slots: 10, left = 35 Empty slots: 10, left = 30 Empty slots: 10, left = 25 Empty slots: 10, left = 20 Empty slots: 10, left = 15 Empty slots: 10, left = 10 Empty slots: 10, left = 5 Completed: 100 – Dima Oct 30 '19 at 23:27
  • Are you logging the errors? It is trivial to write an `OnErrorLog(IDataflowBlock block)` method that awaits the `block.Completion` and then if `block.Completion.IsFaulted` it logs the `block.Completion.Exception` somewhere. Then attach it to all blocks so that you know what's going on. – Theodor Zoulias Oct 30 '19 at 23:51
  • Forgot to mention that I updated my initial question with the solution that works in my example, but I need to move it to QA to test it. Logging is minimal in the existing code, so I need to enhance it to log more successful operations and to improve exception logging as well. I analyzed logs before, but because of a lot of noise, I initially missed some exceptions with minimal information. I would like to try the solution I posted above and then gradually improve logging, such as logging exceptions to a separate log file, outside of the general log, so obvious problems can be identified faster – Dima Oct 31 '19 at 04:49
  • Yeap, having logs related to the specific problem you are facing right now should be helpful. After solving the problem you may integrate the important log messages to your general log, to avoid having 2 separate logging mechanisms in the production environment. – Theodor Zoulias Oct 31 '19 at 05:27
  • Yes, that's the plan :) Thanks man, really appreciate your help! – Dima Nov 01 '19 at 01:18
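
For reference, the `OnErrorLog` helper sketched in the comments might look like this. It is an assumption-laden sketch: the `name` parameter is made up for readability, and `Console.WriteLine` stands in for whatever logging mechanism the service actually uses.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowLogging
{
    // Awaits a block's Completion without throwing, and logs the exception
    // if the block ended up in the Faulted state. Attach one of these to
    // every block in the pipeline to see where failures actually occur.
    public static async Task OnErrorLog(IDataflowBlock block, string name)
    {
        await Task.WhenAny(block.Completion); // safe awaiting, never throws
        if (block.Completion.IsFaulted)
            Console.WriteLine($"Block '{name}' faulted: " +
                $"{block.Completion.Exception.Flatten().InnerException}");
    }
}
```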