Consider the following pipeline:
public class Program
{
private static int firstBlockactualDOP = 0;
private static int lastBlockactualDOP = 0;
private static ActionBlock<int> lastBlock;
private static TransformBlock<int, int> firstBlock;
private static System.Timers.Timer timer;
public static void Main()
{
Do(-1, 32, -1, 100).GetAwaiter().GetResult();
}
public static async Task Do(int firstBlockBc, int firstBlockDOP, int secondBlockBc, int secondBlockDOP)
{
timer = new System.Timers.Timer
{
AutoReset = true,
Enabled = true,
Interval = 1000
};
firstBlock = new TransformBlock<int, int>(s =>
{
Interlocked.Increment(ref firstBlockactualDOP);
Thread.Sleep(50);
Interlocked.Decrement(ref firstBlockactualDOP);
return s;
},
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = firstBlockDOP, BoundedCapacity = firstBlockBc });
lastBlock = new ActionBlock<int>(s =>
{
Interlocked.Increment(ref lastBlockactualDOP);
Thread.Sleep(2);
Interlocked.Decrement(ref lastBlockactualDOP);
},
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = secondBlockDOP, BoundedCapacity = secondBlockBc });
timer.Elapsed += MonitorEvent;
firstBlock.LinkTo(lastBlock, new DataflowLinkOptions { PropagateCompletion = true });
var stopwatch = Stopwatch.StartNew();
for (int i = 0; i < 10000; i++)
{
if (i % 32 == 0)
{
// Thread.Sleep(200);
}
await firstBlock.SendAsync(i);
}
firstBlock.Complete();
await lastBlock.Completion;
stopwatch.Stop();
Console.WriteLine($"Total seconds: {stopwatch.Elapsed.TotalSeconds}.");
}
private static void MonitorEvent(object _, System.Timers.ElapsedEventArgs __)
{
Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} firstBlock: {{Input: {firstBlock.InputCount}, DOP: {firstBlockactualDOP}, Output: {firstBlock.OutputCount}}}" +
$" lastBlock: {{Input: {lastBlock.InputCount}, DOP: {lastBlockactualDOP}}}");
}
}
I was expected to see "lastBlock" running as soon as "firstBlock" finished processing his first element.
This is the observed output I got:
45:39.707 firstBlock: {Input: 9728, DOP: 16, Output: 256} lastBlock: {Input: 0, DOP: 0}
45:40.718 firstBlock: {Input: 9455, DOP: 17, Output: 528} lastBlock: {Input: 0, DOP: 0}
45:42.716 firstBlock: {Input: 8861, DOP: 19, Output: 1120} lastBlock: {Input: 0, DOP: 0}
45:44.705 firstBlock: {Input: 8203, DOP: 21, Output: 1776} lastBlock: {Input: 0, DOP: 0}
45:46.717 firstBlock: {Input: 7481, DOP: 23, Output: 2496} lastBlock: {Input: 0, DOP: 0}
45:48.714 firstBlock: {Input: 6695, DOP: 25, Output: 3280} lastBlock: {Input: 0, DOP: 0}
45:50.713 firstBlock: {Input: 5845, DOP: 27, Output: 4128} lastBlock: {Input: 0, DOP: 0}
45:52.714 firstBlock: {Input: 4936, DOP: 29, Output: 5035} lastBlock: {Input: 0, DOP: 0}
45:53.718 firstBlock: {Input: 4455, DOP: 30, Output: 5515} lastBlock: {Input: 0, DOP: 0}
45:55.715 firstBlock: {Input: 3454, DOP: 32, Output: 6514} lastBlock: {Input: 0, DOP: 0}
45:57.716 firstBlock: {Input: 2448, DOP: 32, Output: 512} lastBlock: {Input: 6815, DOP: 2}
45:59.708 firstBlock: {Input: 1424, DOP: 32, Output: 1024} lastBlock: {Input: 6880, DOP: 4}
46:00.715 firstBlock: {Input: 912, DOP: 32, Output: 512} lastBlock: {Input: 7583, DOP: 5}
46:02.514 firstBlock: {Input: 0, DOP: 30, Output: 407} lastBlock: {Input: 7865, DOP: 7}
46:04.206 firstBlock: {Input: 0, DOP: 0, Output: 0} lastBlock: {Input: 4076, DOP: 40}
46:05.211 firstBlock: {Input: 0, DOP: 0, Output: 0} lastBlock: {Input: 1451, DOP: 41}
Total seconds: 27.0824484.
Uncommenting //Thread.Sleep(200); "released" the lastBlock and it got it to work. Also, changing the method call to Do(64,32, -1, 100); Made it also play together much better
My question is how I can really balance the work between the 2 blocks I have