0

Consider the following pipeline:

public class Program
{
    private static int firstBlockactualDOP = 0;
    private static int lastBlockactualDOP = 0;
    private static ActionBlock<int> lastBlock;
    private static TransformBlock<int, int> firstBlock;
    private static System.Timers.Timer timer;
    public static void Main()
    {
        Do(-1, 32, -1, 100).GetAwaiter().GetResult();
    }

    public static async Task Do(int firstBlockBc, int firstBlockDOP, int secondBlockBc, int secondBlockDOP)
    {
        timer = new System.Timers.Timer
        {
            AutoReset = true,
            Enabled = true,
            Interval = 1000
        };

        firstBlock = new TransformBlock<int, int>(s => 
        {
            Interlocked.Increment(ref firstBlockactualDOP);
            Thread.Sleep(50);
            Interlocked.Decrement(ref firstBlockactualDOP);
            return s;
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = firstBlockDOP, BoundedCapacity = firstBlockBc });
        
        lastBlock = new ActionBlock<int>(s =>
        {
            Interlocked.Increment(ref lastBlockactualDOP);
            Thread.Sleep(2);
            Interlocked.Decrement(ref lastBlockactualDOP);
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = secondBlockDOP, BoundedCapacity = secondBlockBc });
        
        timer.Elapsed += MonitorEvent;

        firstBlock.LinkTo(lastBlock, new DataflowLinkOptions { PropagateCompletion = true });
        var stopwatch = Stopwatch.StartNew();
        for (int i = 0; i < 10000; i++)
        {
            if (i % 32 == 0)
            {
             //   Thread.Sleep(200);
            }
            await firstBlock.SendAsync(i);
        }   
        

        firstBlock.Complete();
        await lastBlock.Completion;
        stopwatch.Stop();
        Console.WriteLine($"Total seconds: {stopwatch.Elapsed.TotalSeconds}.");
    }

    private static void MonitorEvent(object _, System.Timers.ElapsedEventArgs __)
    {
        Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} firstBlock: {{Input: {firstBlock.InputCount}, DOP: {firstBlockactualDOP}, Output: {firstBlock.OutputCount}}}" +
            $" lastBlock: {{Input: {lastBlock.InputCount}, DOP: {lastBlockactualDOP}}}");
    }
}

I was expected to see "lastBlock" running as soon as "firstBlock" finished processing his first element.

This is the observed output I got:

45:39.707 firstBlock: {Input: 9728, DOP: 16, Output: 256} lastBlock: {Input: 0, DOP: 0}
45:40.718 firstBlock: {Input: 9455, DOP: 17, Output: 528} lastBlock: {Input: 0, DOP: 0}
45:42.716 firstBlock: {Input: 8861, DOP: 19, Output: 1120} lastBlock: {Input: 0, DOP: 0}
45:44.705 firstBlock: {Input: 8203, DOP: 21, Output: 1776} lastBlock: {Input: 0, DOP: 0}
45:46.717 firstBlock: {Input: 7481, DOP: 23, Output: 2496} lastBlock: {Input: 0, DOP: 0}
45:48.714 firstBlock: {Input: 6695, DOP: 25, Output: 3280} lastBlock: {Input: 0, DOP: 0}
45:50.713 firstBlock: {Input: 5845, DOP: 27, Output: 4128} lastBlock: {Input: 0, DOP: 0}
45:52.714 firstBlock: {Input: 4936, DOP: 29, Output: 5035} lastBlock: {Input: 0, DOP: 0}
45:53.718 firstBlock: {Input: 4455, DOP: 30, Output: 5515} lastBlock: {Input: 0, DOP: 0}
45:55.715 firstBlock: {Input: 3454, DOP: 32, Output: 6514} lastBlock: {Input: 0, DOP: 0}
45:57.716 firstBlock: {Input: 2448, DOP: 32, Output: 512} lastBlock: {Input: 6815, DOP: 2}
45:59.708 firstBlock: {Input: 1424, DOP: 32, Output: 1024} lastBlock: {Input: 6880, DOP: 4}
46:00.715 firstBlock: {Input: 912, DOP: 32, Output: 512} lastBlock: {Input: 7583, DOP: 5}
46:02.514 firstBlock: {Input: 0, DOP: 30, Output: 407} lastBlock: {Input: 7865, DOP: 7}
46:04.206 firstBlock: {Input: 0, DOP: 0, Output: 0} lastBlock: {Input: 4076, DOP: 40}
46:05.211 firstBlock: {Input: 0, DOP: 0, Output: 0} lastBlock: {Input: 1451, DOP: 41}
Total seconds: 27.0824484.

Uncommenting //Thread.Sleep(200); "released" the lastBlock and it got it to work. Also, changing the method call to Do(64,32, -1, 100); Made it also play together much better

My question is how I can really balance the work between the 2 blocks I have

boger
  • 553
  • 1
  • 5
  • 12
  • Could you try adding `ThreadPool.SetMinThreads(1000, 1000);` at the start of the program, and see if it makes any difference? This is not suggested as a fix, but as a way to troubleshoot the issue that you are observing. – Theodor Zoulias Feb 24 '22 at 19:06
  • It doesn't make any change – boger Feb 24 '22 at 19:26
  • 2
    Currently you are logging the "Input" and the "DOP" of each block. Both of them being zero simply means that the block has done all the work that has been assigned to it so far. Since the second block has to do a much lighter job than the first block, and since it is configured with a larger `MaxDegreeOfParallelism`, it is logical that most of the time will be idle, waiting for elements to come from the heavily working first block. Is your expectation different? And why do you consider this to be a problem? – Theodor Zoulias Feb 24 '22 at 19:52
  • 1
    Seems like you are absolutely correct. Thank you – boger Feb 25 '22 at 13:22

0 Answers0