I'm investigating whether TPL Dataflow can relieve us of writing boilerplate code with locks and monitors in our highly concurrent applications.
So I'm simulating a simple scenario with a single producer and multiple consumers, each of which is expected to receive every produced message. If some consumers are slower than others, that should not stall the rest of the system.
Here's the code:
using NLog;
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

namespace ConsoleApp10
{
    internal sealed class Program
    {
        private static readonly Logger m_logger = LogManager.GetCurrentClassLogger();

        static void Main(string[] args)
        {
            BroadcastBlock<int> root = new BroadcastBlock<int>(d => d);
            ExecutionDataflowBlockOptions consumerOptions = new ExecutionDataflowBlockOptions() {
                BoundedCapacity = 3
            };
            for (int consumerIndex = 0; consumerIndex < 5; ++consumerIndex)
            {
                int c = consumerIndex;
                ActionBlock<int> consumer = new ActionBlock<int>(
                    (int d) => {
                        m_logger.Trace($"[#{c}] Starting consuming {d}");
                        Thread.Sleep(c * 100);
                        m_logger.Trace($"[#{c}] Ended consuming {d}");
                    },
                    consumerOptions
                );
                root.LinkTo(consumer);
            }
            Producer(10, root);
            Console.ReadLine();
        }

        private static void Producer(int n, ITargetBlock<int> target)
        {
            for (int i = 0; i < n; ++i)
            {
                m_logger.Trace($"Starting producing {i}");
                if (!target.Post(i))
                {
                    throw new Exception($"Failed to post message #{i}");
                }
                m_logger.Trace($"Ending producing {i}");
                Thread.Sleep(50);
            }
        }
    }
}
As you can see, I'm limiting the size of each consumer's buffer to 3 (to prevent unbounded buffer growth for slow consumers).
Each successive consumer is slower than the previous one: consumer #N sleeps for N * 100 ms per message, so consumer #0 is the fastest, with no delay at all. The producer pauses for 50 ms after each message.
I expect that at least consumer #0 will consume all the messages, and that consumer #4 will miss some messages because its buffer overflows.
Here are the results:
2021-04-15 22:44:15.4905 [T1] Starting producing 0
2021-04-15 22:44:15.5049 [T1] Ending producing 0
2021-04-15 22:44:15.5166 [T4] [#4] Starting consuming 0
2021-04-15 22:44:15.5285 [T7] [#0] Starting consuming 0
2021-04-15 22:44:15.5285 [T7] [#0] Ended consuming 0
2021-04-15 22:44:15.5285 [T7] [#1] Starting consuming 0
2021-04-15 22:44:15.5573 [T1] Starting producing 1
2021-04-15 22:44:15.5573 [T1] Ending producing 1
2021-04-15 22:44:15.5573 [T5] [#0] Starting consuming 1
2021-04-15 22:44:15.5573 [T5] [#0] Ended consuming 1
2021-04-15 22:44:15.5573 [T5] [#2] Starting consuming 0
2021-04-15 22:44:15.5573 [T6] [#3] Starting consuming 0
2021-04-15 22:44:15.6081 [T1] Starting producing 2
2021-04-15 22:44:15.6081 [T1] Ending producing 2
2021-04-15 22:44:15.6352 [T7] [#1] Ended consuming 0
2021-04-15 22:44:15.6352 [T7] [#1] Starting consuming 1
2021-04-15 22:44:15.6592 [T1] Starting producing 3
2021-04-15 22:44:15.6592 [T1] Ending producing 3
2021-04-15 22:44:15.7102 [T1] Starting producing 4
2021-04-15 22:44:15.7102 [T1] Ending producing 4
2021-04-15 22:44:15.7353 [T7] [#1] Ended consuming 1
2021-04-15 22:44:15.7353 [T7] [#1] Starting consuming 2
2021-04-15 22:44:15.7612 [T5] [#2] Ended consuming 0
2021-04-15 22:44:15.7612 [T5] [#2] Starting consuming 1
2021-04-15 22:44:15.7612 [T1] Starting producing 5
2021-04-15 22:44:15.7612 [T1] Ending producing 5
2021-04-15 22:44:15.8132 [T1] Starting producing 6
2021-04-15 22:44:15.8132 [T1] Ending producing 6
2021-04-15 22:44:15.8420 [T7] [#1] Ended consuming 2
2021-04-15 22:44:15.8420 [T7] [#1] Starting consuming 3
2021-04-15 22:44:15.8603 [T6] [#3] Ended consuming 0
2021-04-15 22:44:15.8603 [T6] [#3] Starting consuming 1
2021-04-15 22:44:15.8764 [T1] Starting producing 7
2021-04-15 22:44:15.8764 [T1] Ending producing 7
2021-04-15 22:44:15.9174 [T4] [#4] Ended consuming 0
2021-04-15 22:44:15.9174 [T4] [#4] Starting consuming 1
2021-04-15 22:44:15.9369 [T1] Starting producing 8
2021-04-15 22:44:15.9369 [T1] Ending producing 8
2021-04-15 22:44:15.9509 [T7] [#1] Ended consuming 3
2021-04-15 22:44:15.9509 [T7] [#1] Starting consuming 4
2021-04-15 22:44:15.9639 [T5] [#2] Ended consuming 1
2021-04-15 22:44:15.9639 [T5] [#2] Starting consuming 2
2021-04-15 22:44:15.9874 [T1] Starting producing 9
2021-04-15 22:44:15.9874 [T1] Ending producing 9
2021-04-15 22:44:16.0515 [T7] [#1] Ended consuming 4
2021-04-15 22:44:16.0515 [T7] [#0] Starting consuming 2
2021-04-15 22:44:16.0515 [T7] [#0] Ended consuming 2
2021-04-15 22:44:16.0515 [T7] [#0] Starting consuming 3
2021-04-15 22:44:16.0515 [T7] [#0] Ended consuming 3
2021-04-15 22:44:16.0515 [T7] [#0] Starting consuming 4
2021-04-15 22:44:16.0515 [T7] [#0] Ended consuming 4
2021-04-15 22:44:16.0515 [T7] [#1] Starting consuming 5
2021-04-15 22:44:16.1525 [T7] [#1] Ended consuming 5
2021-04-15 22:44:16.1525 [T7] [#1] Starting consuming 6
2021-04-15 22:44:16.1525 [T6] [#3] Ended consuming 1
2021-04-15 22:44:16.1525 [T6] [#3] Starting consuming 2
2021-04-15 22:44:16.1645 [T5] [#2] Ended consuming 2
2021-04-15 22:44:16.1645 [T5] [#2] Starting consuming 4
2021-04-15 22:44:16.2526 [T7] [#1] Ended consuming 6
2021-04-15 22:44:16.2526 [T7] [#1] Starting consuming 7
2021-04-15 22:44:16.3177 [T4] [#4] Ended consuming 1
2021-04-15 22:44:16.3177 [T4] [#4] Starting consuming 2
2021-04-15 22:44:16.3537 [T7] [#1] Ended consuming 7
2021-04-15 22:44:16.3537 [T7] [#1] Starting consuming 9
2021-04-15 22:44:16.3537 [T5] [#2] Ended consuming 4
2021-04-15 22:44:16.3537 [T5] [#2] Starting consuming 5
2021-04-15 22:44:16.4547 [T7] [#1] Ended consuming 9
2021-04-15 22:44:16.4547 [T9] [#0] Starting consuming 5
2021-04-15 22:44:16.4547 [T9] [#0] Ended consuming 5
2021-04-15 22:44:16.4547 [T9] [#0] Starting consuming 6
2021-04-15 22:44:16.4547 [T9] [#0] Ended consuming 6
2021-04-15 22:44:16.4547 [T9] [#0] Starting consuming 7
2021-04-15 22:44:16.4547 [T9] [#0] Ended consuming 7
2021-04-15 22:44:16.4547 [T9] [#0] Starting consuming 9
2021-04-15 22:44:16.4547 [T9] [#0] Ended consuming 9
2021-04-15 22:44:16.4607 [T6] [#3] Ended consuming 2
2021-04-15 22:44:16.4607 [T6] [#3] Starting consuming 4
2021-04-15 22:44:16.5648 [T5] [#2] Ended consuming 5
2021-04-15 22:44:16.5648 [T5] [#2] Starting consuming 9
2021-04-15 22:44:16.7179 [T4] [#4] Ended consuming 2
2021-04-15 22:44:16.7179 [T4] [#4] Starting consuming 4
2021-04-15 22:44:16.7610 [T6] [#3] Ended consuming 4
2021-04-15 22:44:16.7610 [T6] [#3] Starting consuming 9
2021-04-15 22:44:16.7610 [T5] [#2] Ended consuming 9
2021-04-15 22:44:17.0611 [T6] [#3] Ended consuming 9
2021-04-15 22:44:17.1182 [T4] [#4] Ended consuming 4
2021-04-15 22:44:17.1182 [T4] [#4] Starting consuming 9
2021-04-15 22:44:17.5185 [T4] [#4] Ended consuming 9
What puzzles me is that consumer #0 never gets message 8. In fact, no other consumer gets this message either. Why is that? Is this expected behavior for Dataflow?
In case you want to check, my NLog.config looks like this (I'm using an AsyncWrapper target to prevent file access from affecting my experiment results):
<?xml version="1.0" encoding="utf-8" ?>
<nlog xmlns="http://www.nlog-project.org/schemas/NLog.xsd"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.nlog-project.org/schemas/NLog.xsd NLog.xsd"
      internalLogFile="nlog.log"
      internalLogLevel="Warn"
      throwExceptions="false"
      parseMessageTemplates="false"
      >
  <variable name="varExceptionMsg" value="${exception:format=Message}"/>
  <variable name="varMessageWithException" value="${message}${onexception:inner= ${varExceptionMsg}}"/>
  <variable name="msg4File" value="${longdate} [T${threadid}${threadname}] ${varMessageWithException} ${onexception:inner=${newline}${exception:format=tostring:maxInnerExceptionLevel=2:innerFormat=tostring}}" />
  <targets>
    <target name="file" xsi:type="AsyncWrapper" queueLimit="5000" overflowAction="Discard">
      <target xsi:type="File"
              layout="${msg4File}"
              fileName="${basedir}/logs/${processname}.${shortdate}.log"
              keepFileOpen="true"
              encoding="utf-8"
              />
    </target>
  </targets>
  <rules>
    <logger name="*" minlevel="Trace" writeTo="file" />
  </rules>
</nlog>