
I'm investigating whether TPL Dataflow can relieve us from writing boilerplate code with locks and monitors for our highly concurrent applications.

So I'm simulating a simple scenario with a single producer and multiple consumers, where each consumer is expected to receive all produced messages. If some consumers are slower than others, that should not stall the rest of the system.

Here's the code:

using NLog;
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

namespace ConsoleApp10
{
    internal sealed class Program
    {
        private static readonly Logger m_logger = LogManager.GetCurrentClassLogger();

        static void Main(string[] args)
        {
            BroadcastBlock<int> root = new BroadcastBlock<int>(d => d);

            // Each consumer buffers at most 3 pending messages.
            ExecutionDataflowBlockOptions consumerOptions = new ExecutionDataflowBlockOptions() {
                BoundedCapacity = 3
            };

            for (int consumerIndex = 0; consumerIndex < 5; ++consumerIndex)
            {
                int c = consumerIndex; // capture a copy of the loop variable for the closure
                ActionBlock<int> consumer = new ActionBlock<int>(
                    (int d) => {
                        m_logger.Trace($"[#{c}] Starting consuming {d}");
                        Thread.Sleep(c * 100); // consumer #c takes c * 100 ms per message
                        m_logger.Trace($"[#{c}] Ended consuming {d}");
                    },
                    consumerOptions
                );
                root.LinkTo(consumer);
            }

            Producer(10, root);

            Console.ReadLine();
        }


        private static void Producer(int n, ITargetBlock<int> target)
        {
            for (int i = 0; i < n; ++i)
            {
                m_logger.Trace($"Starting producing {i}");
                if (!target.Post(i))
                {
                    throw new Exception($"Failed to post message #{i}");
                }
                m_logger.Trace($"Ending producing {i}");
                Thread.Sleep(50); // small delay between produced messages
            }
        }
    }
}

As you can see, I'm limiting the size of each consumer's buffer to 3 (to prevent unbounded buffer growth for slow consumers).

Each consumer is slower than the previous one; consumer #0 is the fastest, with no delay at all. The producer also has a small delay between messages.

I expected that at least consumer #0 would consume all the messages, and that consumer #4 would miss some messages because its buffer would overflow.

Here are the results:

2021-04-15 22:44:15.4905 [T1] Starting producing 0 
2021-04-15 22:44:15.5049 [T1] Ending producing 0 
2021-04-15 22:44:15.5166 [T4] [#4] Starting consuming 0 
2021-04-15 22:44:15.5285 [T7] [#0] Starting consuming 0 
2021-04-15 22:44:15.5285 [T7] [#0] Ended consuming 0 
2021-04-15 22:44:15.5285 [T7] [#1] Starting consuming 0 
2021-04-15 22:44:15.5573 [T1] Starting producing 1 
2021-04-15 22:44:15.5573 [T1] Ending producing 1 
2021-04-15 22:44:15.5573 [T5] [#0] Starting consuming 1 
2021-04-15 22:44:15.5573 [T5] [#0] Ended consuming 1 
2021-04-15 22:44:15.5573 [T5] [#2] Starting consuming 0 
2021-04-15 22:44:15.5573 [T6] [#3] Starting consuming 0 
2021-04-15 22:44:15.6081 [T1] Starting producing 2 
2021-04-15 22:44:15.6081 [T1] Ending producing 2 
2021-04-15 22:44:15.6352 [T7] [#1] Ended consuming 0 
2021-04-15 22:44:15.6352 [T7] [#1] Starting consuming 1 
2021-04-15 22:44:15.6592 [T1] Starting producing 3 
2021-04-15 22:44:15.6592 [T1] Ending producing 3 
2021-04-15 22:44:15.7102 [T1] Starting producing 4 
2021-04-15 22:44:15.7102 [T1] Ending producing 4 
2021-04-15 22:44:15.7353 [T7] [#1] Ended consuming 1 
2021-04-15 22:44:15.7353 [T7] [#1] Starting consuming 2 
2021-04-15 22:44:15.7612 [T5] [#2] Ended consuming 0 
2021-04-15 22:44:15.7612 [T5] [#2] Starting consuming 1 
2021-04-15 22:44:15.7612 [T1] Starting producing 5 
2021-04-15 22:44:15.7612 [T1] Ending producing 5 
2021-04-15 22:44:15.8132 [T1] Starting producing 6 
2021-04-15 22:44:15.8132 [T1] Ending producing 6 
2021-04-15 22:44:15.8420 [T7] [#1] Ended consuming 2 
2021-04-15 22:44:15.8420 [T7] [#1] Starting consuming 3 
2021-04-15 22:44:15.8603 [T6] [#3] Ended consuming 0 
2021-04-15 22:44:15.8603 [T6] [#3] Starting consuming 1 
2021-04-15 22:44:15.8764 [T1] Starting producing 7 
2021-04-15 22:44:15.8764 [T1] Ending producing 7 
2021-04-15 22:44:15.9174 [T4] [#4] Ended consuming 0 
2021-04-15 22:44:15.9174 [T4] [#4] Starting consuming 1 
2021-04-15 22:44:15.9369 [T1] Starting producing 8 
2021-04-15 22:44:15.9369 [T1] Ending producing 8 
2021-04-15 22:44:15.9509 [T7] [#1] Ended consuming 3 
2021-04-15 22:44:15.9509 [T7] [#1] Starting consuming 4 
2021-04-15 22:44:15.9639 [T5] [#2] Ended consuming 1 
2021-04-15 22:44:15.9639 [T5] [#2] Starting consuming 2 
2021-04-15 22:44:15.9874 [T1] Starting producing 9 
2021-04-15 22:44:15.9874 [T1] Ending producing 9 
2021-04-15 22:44:16.0515 [T7] [#1] Ended consuming 4 
2021-04-15 22:44:16.0515 [T7] [#0] Starting consuming 2 
2021-04-15 22:44:16.0515 [T7] [#0] Ended consuming 2 
2021-04-15 22:44:16.0515 [T7] [#0] Starting consuming 3 
2021-04-15 22:44:16.0515 [T7] [#0] Ended consuming 3 
2021-04-15 22:44:16.0515 [T7] [#0] Starting consuming 4 
2021-04-15 22:44:16.0515 [T7] [#0] Ended consuming 4 
2021-04-15 22:44:16.0515 [T7] [#1] Starting consuming 5 
2021-04-15 22:44:16.1525 [T7] [#1] Ended consuming 5 
2021-04-15 22:44:16.1525 [T7] [#1] Starting consuming 6 
2021-04-15 22:44:16.1525 [T6] [#3] Ended consuming 1 
2021-04-15 22:44:16.1525 [T6] [#3] Starting consuming 2 
2021-04-15 22:44:16.1645 [T5] [#2] Ended consuming 2 
2021-04-15 22:44:16.1645 [T5] [#2] Starting consuming 4 
2021-04-15 22:44:16.2526 [T7] [#1] Ended consuming 6 
2021-04-15 22:44:16.2526 [T7] [#1] Starting consuming 7 
2021-04-15 22:44:16.3177 [T4] [#4] Ended consuming 1 
2021-04-15 22:44:16.3177 [T4] [#4] Starting consuming 2 
2021-04-15 22:44:16.3537 [T7] [#1] Ended consuming 7 
2021-04-15 22:44:16.3537 [T7] [#1] Starting consuming 9 
2021-04-15 22:44:16.3537 [T5] [#2] Ended consuming 4 
2021-04-15 22:44:16.3537 [T5] [#2] Starting consuming 5 
2021-04-15 22:44:16.4547 [T7] [#1] Ended consuming 9 
2021-04-15 22:44:16.4547 [T9] [#0] Starting consuming 5 
2021-04-15 22:44:16.4547 [T9] [#0] Ended consuming 5 
2021-04-15 22:44:16.4547 [T9] [#0] Starting consuming 6 
2021-04-15 22:44:16.4547 [T9] [#0] Ended consuming 6 
2021-04-15 22:44:16.4547 [T9] [#0] Starting consuming 7 
2021-04-15 22:44:16.4547 [T9] [#0] Ended consuming 7 
2021-04-15 22:44:16.4547 [T9] [#0] Starting consuming 9 
2021-04-15 22:44:16.4547 [T9] [#0] Ended consuming 9 
2021-04-15 22:44:16.4607 [T6] [#3] Ended consuming 2 
2021-04-15 22:44:16.4607 [T6] [#3] Starting consuming 4 
2021-04-15 22:44:16.5648 [T5] [#2] Ended consuming 5 
2021-04-15 22:44:16.5648 [T5] [#2] Starting consuming 9 
2021-04-15 22:44:16.7179 [T4] [#4] Ended consuming 2 
2021-04-15 22:44:16.7179 [T4] [#4] Starting consuming 4 
2021-04-15 22:44:16.7610 [T6] [#3] Ended consuming 4 
2021-04-15 22:44:16.7610 [T6] [#3] Starting consuming 9 
2021-04-15 22:44:16.7610 [T5] [#2] Ended consuming 9 
2021-04-15 22:44:17.0611 [T6] [#3] Ended consuming 9 
2021-04-15 22:44:17.1182 [T4] [#4] Ended consuming 4 
2021-04-15 22:44:17.1182 [T4] [#4] Starting consuming 9 
2021-04-15 22:44:17.5185 [T4] [#4] Ended consuming 9 

What puzzles me is that consumer #0 never gets message 8, and in fact no other consumer gets this message either. Why is that? Is this expected behavior for Dataflow?

In case you want to check it, my NLog.config looks like the following (I'm using an AsyncWrapper target to prevent file access from affecting my experiment results):

<?xml version="1.0" encoding="utf-8" ?>
<nlog xmlns="http://www.nlog-project.org/schemas/NLog.xsd"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.nlog-project.org/schemas/NLog.xsd NLog.xsd"
        internalLogFile="nlog.log"
        internalLogLevel="Warn"
        throwExceptions="false" 
        parseMessageTemplates="false"
    >

  <variable name="varExceptionMsg" value="${exception:format=Message}"/>
  <variable name="varMessageWithException" value="${message}${onexception:inner= ${varExceptionMsg}}"/>
  <variable name="msg4File" value="${longdate} [T${threadid}${threadname}] ${varMessageWithException} ${onexception:inner=${newline}${exception:format=tostring:maxInnerExceptionLevel=2:innerFormat=tostring}}" />

  <targets>

    <target name="file" xsi:type="AsyncWrapper" queueLimit="5000" overflowAction="Discard">
      <target xsi:type="File"
              layout="${msg4File}"
              fileName="${basedir}/logs/${processname}.${shortdate}.log"
              keepFileOpen="true"
              encoding="utf-8"
            />
    </target>
  </targets>

  <rules>
    <logger name="*" minlevel="Trace" writeTo="file" />
  </rules>
</nlog>

Mikhail
  • Could you try adding this line at the start of the program: `ThreadPool.SetMinThreads(100, 100);`, and see if it makes any difference? This is not suggested as a fix, but as a way to troubleshoot the issue you are observing. – Theodor Zoulias Apr 15 '21 at 20:05
  • Exit the program by hitting return and then look at the logger file. – jdweng Apr 15 '21 at 20:46
  • @TheodorZoulias it seems that it does help, yes. So you suppose that the source of the issue is that I'm facing worker thread pool starvation? – Mikhail Apr 16 '21 at 11:47
  • @jdweng, I'm not sure what you mean there, but yes, I do exit by hitting return, and the file does get flushed. As you can see, the log is complete; at least there's evidence that worker #0 did consume message 9, which was produced after message 8, so no reordering could have occurred. – Mikhail Apr 16 '21 at 11:50
  • So isn't the issue a flush issue? Wrapping the code with a using block will automatically flush. – jdweng Apr 16 '21 at 11:57
  • Yep, thread pool saturation is my guess. Btw the question in the title is way broader than the question in the body, and that's why I think your question has been downvoted. Whatever the explanation is for the unconsumed messages in this specific configuration, it's unlikely to be an answer about whether the TPL-Dataflow library is applicable to highly concurrent applications in general! – Theodor Zoulias Apr 16 '21 at 12:09
  • IMHO a more interesting question would be how to implement a custom `BroadcastBlock` with specific behavior. For example a `BroadcastBlock` that guarantees that each message it receives will be delivered (offered and accepted) by at least one of its linked targets. You may get some ideas about how to create such a block [here](https://stackoverflow.com/questions/22127660/broadcastblock-with-guaranteed-delivery-in-tpl-dataflow "BroadcastBlock with guaranteed delivery in TPL Dataflow"). – Theodor Zoulias Apr 16 '21 at 12:37
  • @Mikhail DataFlow was built for *very* high concurrency applications, where multiple blocks are combined to form a processing pipeline. That's the CSP paradigm that Go channels implement. It's not just a pub/sub class. The problem is the code itself - those `Thread.Sleep` block the worker tasks that process messages. There's no pipeline at all – Panagiotis Kanavos Apr 20 '21 at 16:13
  • @Mikhail TPL DataFlow grew out of the [robotics-inspired Concurrency and Coordination Runtime](https://www.infoq.com/news/2011/01/TPL-Dataflow/) which in turn uses the [Communicating Sequential Processes model](https://en.wikipedia.org/wiki/Communicating_sequential_processes) instead of the common-memory, lock-based model used by typical programs. Blocks/processes run on their own "thread", share no state, and only communicate with each other through messages. – Panagiotis Kanavos Apr 20 '21 at 16:26
  • Long story short, the example you used is inappropriate. You don't need a custom broadcast block. You need a different example and/or program structure. – Panagiotis Kanavos Apr 20 '21 at 16:27
  • @PanagiotisKanavos thanks for the answers, but I'm still in doubt. The `Thread.Sleep`s just emulate some actual workload that could take varying amounts of time. Regarding a different program structure — could you please suggest a proper one for the task of a single producer providing items to multiple consumers, where each consumer should have a chance to get all messages if it's quick enough? Think of a single log source and multiple async log sinks, for example – Mikhail Apr 21 '21 at 05:27
  • @TheodorZoulias yes, I do understand that the title is not perfect, but I'm having a hard time inventing a better one. Actually I'm really trying to learn about the pros and cons of TPL Dataflow before switching from manually implementing queues via locks and monitors. I like the general design idea, but I'm afraid of possible hidden pitfalls in the implementation. This is the first mock-up I did to try implementing some real-life cases from our prod. And I'm kinda worried that I've faced this kind of trouble on the first try. So now I'm wondering whether this is really because I'm not using TPL properly? – Mikhail Apr 21 '21 at 06:10
  • @Mikhail `Thread.Sleep` doesn't just emulate a workload. It forces a threadpool thread that could serve many tasks to be suspended, effectively taking it out of the thread pool. The runtime has to create a new thread to serve other tasks. That's what the Dataflow paradigm was built to **avoid**. Your car uses a dataflow architecture - cars have several dozens of single-thread microcontrollers communicating through messages. – Panagiotis Kanavos Apr 21 '21 at 06:28
  • @Mikhail `the task of single producer providing items for multiple consumers` pub/sub is to Dataflow what wheels are to a car, bike or truck. An important part of the thing, not the thing itself. And while bikes and trucks have wheels, they're not the same at all. `Pub/sub` is a tool. Dataflow is the entire building plan to solve a problem, using that tool among others. And `one publisher many subscribers` describes only a detail, not the problem itself – Panagiotis Kanavos Apr 21 '21 at 06:32
  • @Mikhail what is the *actual* process/architecture-level problem you want to solve? It's not `pub/sub`. `I want to download, parse and use the data in 1000 URLs, combining it with outer sources` is something that can be broken into a dataflow pipeline. That's why ETL tools like SSIS use dataflows. `I want to process a stream of events to detect anomalies/fraud and block malicious IPs` is another thing that can use a dataflow. In general operating on an "infinite" stream of events can use dataflows – Panagiotis Kanavos Apr 21 '21 at 06:38
  • @Mikhail if all you want is an asynchronous queue on the other hand, use [Channel](https://www.stevejgordon.co.uk/an-introduction-to-system-threading-channels). That's a lower-level construct. Channels can be used to create dataflows. In fact, if Channel had been available in 2012, TPL DataFlow would have used it. To create a pipeline you'd have to add your own asynchronous code to read and process messages. In simple scenarios though, this can be as simple as `await foreach(var message in channelReader.ReadAllAsync())` (see the sketch after these comments). – Panagiotis Kanavos Apr 21 '21 at 06:46
  • Mikhail if you are looking to TPL Dataflow as a tool that can solve all imaginable problems involving concurrency, then TPL Dataflow is probably not that tool. Some problems can be solved easily and efficiently with Dataflow, while others can be solved only with great difficulty, or not at all. The library is not easily extensible. This is what you'll have to struggle with if you try to solve problems that are too unusual/idiosyncratic/localized. Regarding the specific example you provided, it's unclear whether Dataflow is a good fit, because you haven't specified the desired behavior precisely. – Theodor Zoulias Apr 21 '21 at 08:17
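
To make the Channel suggestion above concrete, here is a minimal sketch, assuming .NET Core 3.0+ (System.Threading.Channels plus await foreach); the class and method names are just for illustration. Note that a single channel is a queue rather than a broadcast: each item is received by exactly one reader, so reproducing the fan-out scenario would require one channel per consumer.

using System;
using System.Threading.Channels;
using System.Threading.Tasks;

internal static class ChannelSketch
{
    public static async Task RunAsync()
    {
        // A bounded channel gives back-pressure: the producer awaits when the
        // buffer is full instead of messages being dropped silently.
        Channel<int> channel = Channel.CreateBounded<int>(new BoundedChannelOptions(3)
        {
            FullMode = BoundedChannelFullMode.Wait
        });

        Task consumer = Task.Run(async () =>
        {
            await foreach (int d in channel.Reader.ReadAllAsync())
            {
                Console.WriteLine($"Consuming {d}");
                await Task.Delay(100); // simulated work, without blocking a thread
            }
        });

        for (int i = 0; i < 10; ++i)
        {
            await channel.Writer.WriteAsync(i); // waits while the buffer is full
            await Task.Delay(50);
        }
        channel.Writer.Complete(); // no more items; ReadAllAsync will then finish

        await consumer;
    }
}

Unlike BroadcastBlock, nothing is dropped here: the bounded writer simply awaits until the consumer catches up.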

1 Answer


You need to call .Complete() on root, and then Main() needs to wait for all the consumers to finish chewing their food before it ends.

At the top of Main:

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

Inside your for loop:

  • Add each consumer's Completion task to a List<Task> (I called it "comps", short for Completions).
  • Include linkOptions as part of your LinkTo call.

Add this before your ReadLine():

root.Complete();
Task.WaitAll(comps.ToArray());
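
Putting those pieces together, a sketch of the revised Main could look roughly like this (it keeps the structure of the original program; besides the existing usings it also needs System.Collections.Generic and System.Threading.Tasks):

static void Main(string[] args)
{
    BroadcastBlock<int> root = new BroadcastBlock<int>(d => d);

    DataflowLinkOptions linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

    ExecutionDataflowBlockOptions consumerOptions = new ExecutionDataflowBlockOptions() {
        BoundedCapacity = 3
    };

    List<Task> comps = new List<Task>(); // completion tasks of all consumers

    for (int consumerIndex = 0; consumerIndex < 5; ++consumerIndex)
    {
        int c = consumerIndex;
        ActionBlock<int> consumer = new ActionBlock<int>(
            (int d) => {
                m_logger.Trace($"[#{c}] Starting consuming {d}");
                Thread.Sleep(c * 100);
                m_logger.Trace($"[#{c}] Ended consuming {d}");
            },
            consumerOptions
        );
        root.LinkTo(consumer, linkOptions); // completion now propagates from root
        comps.Add(consumer.Completion);
    }

    Producer(10, root);

    root.Complete();               // signal that no more messages are coming
    Task.WaitAll(comps.ToArray()); // wait until every consumer has drained its buffer
}

With PropagateCompletion set, root.Complete() eventually completes each ActionBlock once its input buffer is drained, so Task.WaitAll returns only after every consumer has finished.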
amonroejj
  • Even with these updates, though, I got some weird effects where consumer #4 consumed fewer items overall, and value 6 was never picked up by ANY consumer, because BroadcastBlock (counter to intuition) goes on its merry way even if the consumers are all slowpokes. I don't know of many good use cases for this. This answer may have a workaround: https://stackoverflow.com/questions/22127660/broadcastblock-with-guaranteed-delivery-in-tpl-dataflow – amonroejj May 07 '21 at 15:11
  • Would the members of your real consumer group each have different behavior? Maybe a bucket brigade of consumers would be better than trying to force BroadcastBlock to do something that's a bad fit. – amonroejj May 07 '21 at 15:20
  • Another alternative, memory permitting, is to link the BroadcastBlock to five side-by-side BufferBlocks, and let the consumers read from the buffers (a sketch of that layout follows after these comments). – amonroejj May 07 '21 at 19:32
  • Thanks for the answer. It's the proper way to wait for all the jobs to finish, instead of Console.ReadLine(), yes. Still, it does not answer the original question, which is why some items go missing even in the fastest consumer, and how to prevent that reliably. – Mikhail May 08 '21 at 08:09
  • I tried it. The one-buffer-per-consumer approach worked: all 5 consumers got all 10 inputs. – amonroejj May 10 '21 at 17:19
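
A sketch of that one-buffer-per-consumer layout, assuming unbounded BufferBlocks (the "memory permitting" caveat above) and otherwise mirroring the code from the question:

// Inside Main, replacing the original linking loop.
DataflowLinkOptions linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
List<Task> comps = new List<Task>();

for (int consumerIndex = 0; consumerIndex < 5; ++consumerIndex)
{
    int c = consumerIndex;

    // An unbounded buffer accepts every broadcast message immediately,
    // so a slow consumer no longer causes messages to be skipped.
    BufferBlock<int> buffer = new BufferBlock<int>();

    ActionBlock<int> consumer = new ActionBlock<int>(
        (int d) => {
            m_logger.Trace($"[#{c}] Starting consuming {d}");
            Thread.Sleep(c * 100);
            m_logger.Trace($"[#{c}] Ended consuming {d}");
        }
    );

    root.LinkTo(buffer, linkOptions);
    buffer.LinkTo(consumer, linkOptions);
    comps.Add(consumer.Completion);
}

Producer(10, root);

root.Complete();
Task.WaitAll(comps.ToArray());

This works because an unbounded BufferBlock accepts every offered message right away, whereas a BroadcastBlock holds only its most recent message and never re-offers older ones to targets that were full at the time of the broadcast.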