My use case is this: send 100,000+ web requests to our application server and wait for the results. Here, most of the delay is IO-bound, not CPU-bound, so I understand the Dataflow libraries may not be the best tool for this. I've managed to use it with a lot of success and have set the MaxDegreeOfParallelism to the number of requests that I trust the server to be able to handle. However, since this is the maximum number of tasks, there's no guarantee that this will actually be the number of tasks running at any time.

The only bit of information I could find in the documentation is this:

Because the MaxDegreeOfParallelism property represents the maximum degree of parallelism, the dataflow block might execute with a lesser degree of parallelism than you specify. The dataflow block can use a lesser degree of parallelism to meet its functional requirements or to account for a lack of available system resources. A dataflow block never chooses a greater degree of parallelism than you specify.

This explanation is quite vague on how it actually determines when to spin up a new task. My hope was that it would recognize that the task is blocked on IO, not on any system resource, and would basically stay at the maximum degree of parallelism for the entire duration of the operation.

However, after monitoring a network capture, it seems to be MUCH quicker in the beginning and slower near the end. I can see from the capture that at the beginning it does reach the maximum as specified. The TPL doesn't have any built-in way to monitor the current number of active threads, so I'm not really sure of the best way to investigate further on that end.

My implementation:

    internal static ExecutionDataflowBlockOptions GetDefaultBlockOptions(int maxDegreeOfParallelism,
        CancellationToken token) => new()
        {
            MaxDegreeOfParallelism = maxDegreeOfParallelism,
            CancellationToken = token,
            SingleProducerConstrained = true,
            EnsureOrdered = false
        };


    private static async ValueTask<T?> ReceiveAsync<T>(this ISourceBlock<T?> block, bool configureAwait, CancellationToken token)
    {
        try
        {
            return await block.ReceiveAsync(token).ConfigureAwait(configureAwait);
        } catch (InvalidOperationException)
        {
            return default;
        }
    }

    internal static async IAsyncEnumerable<T> YieldResults<T>(this ISourceBlock<T?> block, bool configureAwait,
        [EnumeratorCancellation]CancellationToken token)
    {
        while (await block.OutputAvailableAsync(token).ConfigureAwait(configureAwait))
            if (await block.ReceiveAsync(configureAwait, token).ConfigureAwait(configureAwait) is T result)
                yield return result;

        // By the time OutputAvailableAsync returns false, the block is guaranteed to be complete.
        // We don't simply await the completed task, because that would rethrow only one of the
        // exceptions rather than the whole AggregateException, so we throw the aggregate directly
        // to propagate every exception to the consumer.
        if (block.Completion.Exception != null)
            throw block.Completion.Exception;
    }

    public static IAsyncEnumerable<TResult> ParallelSelectAsync<T, TResult>(this IEnumerable<T> source, Func<T, Task<TResult?>> body,
        int maxDegreeOfParallelism = DataflowBlockOptions.Unbounded, TaskScheduler? scheduler = null, CancellationToken token = default)
    {
        var options = GetDefaultBlockOptions(maxDegreeOfParallelism, token);

        if (scheduler != null)
            options.TaskScheduler = scheduler;

        var block = new TransformBlock<T, TResult?>(body, options);

        foreach (var item in source)
            block.Post(item);

        block.Complete();

        return block.YieldResults(scheduler != null && scheduler != TaskScheduler.Default, token);
    }

So, basically, my question is this: when an IO-bound action is executed in a TPL Dataflow block, how can I ensure the block stays at the MaxDegreeOfParallelism that is set?

Theodor Zoulias
thomasrea0113
  • What's the actual problem? You don't need any of this code with Dataflow. Dataflow is *great* at IO bound tasks as all blocks can use asynchronous lambdas. Increasing the DOP from 1 to N means that only N concurrent operations will execute at any time – Panagiotis Kanavos Dec 16 '22 at 14:49
  • The explanations aren't vague - if you post only 5 items to a block with a DOP of 100, why would it use more than 5 tasks to process them? – Panagiotis Kanavos Dec 16 '22 at 14:51
  • Are you trying to use Dataflow as a job queue perhaps? It's not. It's meant to create pipelines similar to Bash or PowerShell pipelines, with each block acting as a separate command, reading messages from its input queue and passing them to the next block through its output queue. That's why the default DOP is 1 - parallelism and concurrency come from using multiple commands/blocks, not a fat block with a high DOP – Panagiotis Kanavos Dec 16 '22 at 14:55
  • @PanagiotisKanavos right, that's not what I was suggesting. In my case, I know for a fact my input far exceeds the limit set. So, I would expect to see the max number of threads being used consistently. But from what I've been able to gather, near the end of the operation it is using far fewer threads than the maximum being set. Which doesn't make sense since the work is mostly IO-bound. – thomasrea0113 Dec 16 '22 at 14:56
  • What is the *actual* problem? Because Dataflow works perfectly in this scenario with just a couple of lines. None of this code is needed. I use it to do far more heavy stuff than only 100K requests - download reports, parse them then download individual items and finally import everything into a database. The remote services are finicky so I have to use a different DOP per block to avoid crashing them or getting throttled – Panagiotis Kanavos Dec 16 '22 at 14:58
  • @thomasrea0113: You mean it's using fewer HTTP requests? Fewer threads are expected, since this is asynchronous. – Stephen Cleary Dec 16 '22 at 14:59
  • For context, my MDOP is only 20. My understanding is that it's implemented using Tasks and a TaskScheduler, so I don't think I am limited to the actual number of CPU cores, especially since the work is IO-bound. – thomasrea0113 Dec 16 '22 at 15:00
  • Are you reusing the socket connections? If not, you may be running out of ephemeral ports. – Stephen Cleary Dec 16 '22 at 15:00
  • Yeah, so, I have one HttpClient instance that I'm reusing across all the dataflow actions. I didn't consider this being a potential issue since the client is thread safe. – thomasrea0113 Dec 16 '22 at 15:02
  • It's not. HttpClient *is* thread-safe – Panagiotis Kanavos Dec 16 '22 at 15:02
  • A few suggestions: 1. Add the line `ThreadPool.SetMinThreads(100, 100);` at the start of the program, to make sure that the limiting factor is not the `ThreadPool` availability. Then report your observations. This is suggested for debugging purposes, not as a fix. 2. Try to reproduce the gradually reduced parallelism issue after replacing your actual I/O work with something trivial, like an asynchronous delay (`await Task.Delay(1000)`). 3. Include in the question the code that monitors the current degree of parallelism, so that we can verify that you are monitoring it correctly. – Theodor Zoulias Dec 16 '22 at 15:52
  • As a side note the `YieldResults` method is not implemented efficiently, and it's not propagating the cancellation that is caused by the `ExecutionDataflowBlockOptions.CancellationToken` configuration. For better implementations you can look [here](https://stackoverflow.com/questions/49389273/for-a-tpl-dataflow-how-do-i-get-my-hands-on-all-the-output-produced-by-a-transf "For a TPL Dataflow: How do I get my hands on all the output produced by a TransformBlock while blocking until all inputs have been processed?"). – Theodor Zoulias Dec 16 '22 at 15:58
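
The second and third suggestions above could be combined into something like the following. This is only a minimal sketch, not the asker's monitoring code: the real web request is replaced by `Task.Delay`, and the names `current`, `peak`, `maxDop` and `itemCount` are made up for illustration. It counts how many delegate invocations are in flight, which is the quantity that matters for asynchronous work (the thread count is expected to be much lower).

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

const int maxDop = 20;      // the limit mentioned in the question
const int itemCount = 200;  // assumed: enough items to keep the block saturated

int current = 0;  // operations in flight right now
int peak = 0;     // highest value of 'current' observed so far

var block = new ActionBlock<int>(async _ =>
{
    int now = Interlocked.Increment(ref current);

    // Lock-free "store the maximum": retry until peak >= now.
    int seen;
    while (now > (seen = Volatile.Read(ref peak)))
        Interlocked.CompareExchange(ref peak, now, seen);

    try
    {
        await Task.Delay(50);  // stand-in for the real IO-bound request
    }
    finally
    {
        Interlocked.Decrement(ref current);
    }
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxDop });

// Periodically print the number of in-flight operations until the block finishes.
var sampler = Task.Run(async () =>
{
    while (!block.Completion.IsCompleted)
    {
        Console.WriteLine($"in flight: {Volatile.Read(ref current)}");
        await Task.Delay(100);
    }
});

foreach (var i in Enumerable.Range(0, itemCount))
    block.Post(i);

block.Complete();
await block.Completion;
await sampler;

Console.WriteLine($"Peak concurrency: {peak} (limit {maxDop})");
```

If the sampled value stays at the limit with a plain delay but drops with the real requests, the slowdown is more likely on the server or network side than in the block itself.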

1 Answer

On the contrary, Dataflow is great at IO work and perfect for this scenario. Dataflow architectures work by creating pipelines similar to Bash or PowerShell pipelines. Each block acts as a separate command, reading messages from its input queue and passing them to the next block through its output queue. That's why the default DOP is 1: parallelism and concurrency come from using multiple commands/blocks, not a fat block with a high DOP.

This is a simplified example of what I use at work to request daily sales reports from about a hundred airlines (BSPs for those that know about air tickets), parse the reports and then download individual ticket records, before importing everything into the database.

In this case the head block downloads content with a DOP=10, then the parser block parses the responses one at a time. The downloader is IO-bound so it can make a lot more requests than there are cores, as many as the services allow, or the application wants to handle.

The parser, on the other hand, is CPU-bound. A high DOP would tie up a lot of cores, which would harm not just the application but other processes as well.

// Create the blocks
var dlOptions = new ExecutionDataflowBlockOptions {
    MaxDegreeOfParallelism = 10
};
// Downloader: IO-bound, so up to 10 requests run concurrently
var downloader = new TransformBlock<string, string>(
    url => _client.GetStringAsync(url, cancellationToken),
    dlOptions);
// Parser and importer keep the default DOP of 1
var parser = new TransformBlock<string, Something>(ParseIntoSomething);
var importer = new ActionBlock<Something>(ImportInDb);

// Link the blocks; PropagateCompletion passes completion down the pipeline
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
downloader.LinkTo(parser, linkOptions);
parser.LinkTo(importer, linkOptions);

After building this three-step pipeline, I post URLs at the front and wait for the tail block to complete:

foreach(var url in urls)
{
    downloader.Post(url);
}

downloader.Complete();
await importer.Completion;

There is a lot of room for improvement here. Right now, if the downloader is faster than the parser, all the content will be buffered in memory. In a long-running pipeline this can easily take up all available memory.

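A simple way to avoid this is to add BoundedCapacity=N to the parser block options. If the parser's input buffer is full, it stops accepting messages from upstream blocks until a slot becomes available. Note that the downloader only pauses if it has a BoundedCapacity of its own; otherwise its results accumulate in its own output buffer while it waits for the parser: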

var parserOptions = new ExecutionDataflowBlockOptions {
    BoundedCapacity=2,
    MaxDegreeOfParallelism=2,
};
var parser=new TransformBlock<string,Something>(ParseIntoSomething, parserOptions);
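
To see the backpressure end to end, a self-contained sketch along these lines may help. It is not the pipeline above: the download and parse steps are simulated with `Task.Delay`, the parser is an `ActionBlock` stand-in, and the capacities, delays and the names `parsed` and `maxBuffered` are arbitrary assumptions. The downloader is bounded as well, and items are fed with `SendAsync`, which waits for room, rather than `Post`, which returns false when a bounded block is full.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

const int itemCount = 50;  // assumed workload size

// Simulated downloader: bounded, so it cannot buffer results without limit.
var downloader = new TransformBlock<int, string>(async id =>
{
    await Task.Delay(5);  // stand-in for the HTTP request
    return $"payload {id}";
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10, BoundedCapacity = 20 });

// Simulated slow consumer with a small input buffer.
int parsed = 0;
var parser = new ActionBlock<string>(async payload =>
{
    await Task.Delay(10);  // stand-in for the parsing work
    Interlocked.Increment(ref parsed);
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });

downloader.LinkTo(parser, new DataflowLinkOptions { PropagateCompletion = true });

int maxBuffered = 0;  // largest downloader output buffer observed while feeding
for (int id = 0; id < itemCount; id++)
{
    // SendAsync completes only once the block has accepted (or declined) the item.
    bool accepted = await downloader.SendAsync(id);
    if (!accepted)
        throw new InvalidOperationException("The downloader declined the item.");

    maxBuffered = Math.Max(maxBuffered, downloader.OutputCount);
}

downloader.Complete();
await parser.Completion;

Console.WriteLine($"Parsed {parsed} items; largest output buffer seen: {maxBuffered}");
```

With both blocks bounded, the feeding loop itself slows down to the pace of the parser, so memory use stays roughly proportional to the configured capacities instead of the total input size.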
Panagiotis Kanavos
  • I think there's a key distinction with this use case and mine: here, you have relatively few URLs (a few hundred, I'd suppose) that you need to perform a sequence of actions on, which is not the same as what I'm doing. In this case, I would imagine each URL produces potentially hundreds of thousands of results, which is why a `TransformBlock` is used. This work would be predominantly CPU-bound, wouldn't it? In my case, I have one action I need to perform: the download. On 100,000 plus web requests. – thomasrea0113 Dec 16 '22 at 15:12
  • Wrong, I have a stream of over 100K individual air tickets. I work for a large online travel agency. The first step downloads daily sales reports, the second parses the 2MB responses to retrieve individual ticket numbers, the third requests the ticket record for each ticket, the batch step groups them into 5K batches, and the final block uses SqlBulkImport to insert everything into the database – Panagiotis Kanavos Dec 16 '22 at 15:13
  • @thomasrea0113 have you tried using blocks like blocks in a pipeline? Or are you trying to proactively "fix" something ? – Panagiotis Kanavos Dec 16 '22 at 15:15
  • Okay, that is very similar to my use case then. I'll assume the slowdown I'm seeing is not actually related to Dataflow after all. It's possible that it's our server slowing down as it tries to process all those requests. I'll do some more investigating – thomasrea0113 Dec 16 '22 at 15:16
  • Try *reducing* the DOP. Opening 100K sockets is unhealthy for any machine. It's also unhealthy for the remote services, the network card and any proxies. If the remote service is using shared resources like a database or disk, more requests will cause increasing delays. – Panagiotis Kanavos Dec 16 '22 at 15:18
  • My DOP is only 20, each thread used the same HttpClient. Ultimately, I'm trying to create a set of general-use enumerable extensions so I can batch web requests (and other parallel operations) on the fly across many different applications. Meaning, I don't want the extension to actually expose any Dataflow blocks. It takes an IEnumerable or IAsyncEnumerable, and then returns an IAsyncEnumerable that will asynchronously yield results as they come in. This will be used by many different developers, and I don't want them to have to learn the Dataflow library since it's not important to them. – thomasrea0113 Dec 16 '22 at 15:20
  • Dataflow blocks already work on infinite streams of messages. They don't need enumerable extensions. Dataflow is a *higher* level abstraction than item enumeration. It's an entirely different *architecture* – Panagiotis Kanavos Dec 16 '22 at 15:23
  • Dataflow is a [pipes-and-filters architecture](https://www.enterpriseintegrationpatterns.com/PipesAndFilters.html), with each block acting as a filter. – Panagiotis Kanavos Dec 16 '22 at 15:31
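
For the enumerable-style wrapper discussed in these comments, one possible shape is sketched below. It is an illustration under assumptions, not the code from the question or from the linked answers: `SelectParallelAsync` is a hypothetical helper written as a local function, and the workload is a simulated delay. It relies on the library's `ReceiveAllAsync` extension and awaits `Completion` afterwards so that a fault or cancellation of the block reaches the consumer.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Usage: double 50 numbers with at most 20 operations in flight.
var results = new List<int>();
await foreach (var r in SelectParallelAsync(
    Enumerable.Range(1, 50),
    async n => { await Task.Delay(10); return n * 2; },  // stand-in for a web request
    maxDop: 20))
{
    results.Add(r);
}
Console.WriteLine($"Received {results.Count} results");

// Hypothetical helper: hides the block and exposes results as an async stream.
static async IAsyncEnumerable<TResult> SelectParallelAsync<T, TResult>(
    IEnumerable<T> source,
    Func<T, Task<TResult>> body,
    int maxDop,
    [EnumeratorCancellation] CancellationToken token = default)
{
    var block = new TransformBlock<T, TResult>(body, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxDop,
        EnsureOrdered = false,       // yield results as they finish
        CancellationToken = token
    });

    // The block is unbounded here, so Post always accepts the item.
    foreach (var item in source)
        block.Post(item);
    block.Complete();

    await foreach (var result in block.ReceiveAllAsync(token))
        yield return result;

    // The stream above simply ends when the block faults or is canceled;
    // awaiting Completion surfaces that (only the first exception is rethrown,
    // block.Completion.Exception holds all of them).
    await block.Completion;
}
```

Callers only see an `IEnumerable<T>` going in and an `IAsyncEnumerable<TResult>` coming out, which matches the goal of not exposing Dataflow types to other developers.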