My use case is this: send 100,000+ web requests to our application server and await the results. Most of the delay is IO-bound rather than CPU-bound, so I understand the Dataflow libraries may not be the best tool for this. Still, I've used them with a lot of success, setting MaxDegreeOfParallelism to the number of concurrent requests I trust the server to handle. However, since this is only a maximum, there's no guarantee that this many tasks will actually be running at any given time.
The only bit of information I could find in the documentation is this:
Because the MaxDegreeOfParallelism property represents the maximum degree of parallelism, the dataflow block might execute with a lesser degree of parallelism than you specify. The dataflow block can use a lesser degree of parallelism to meet its functional requirements or to account for a lack of available system resources. A dataflow block never chooses a greater degree of parallelism than you specify.
This explanation is quite vague on how it actually determines when to spin up a new task. My hope was that it would recognize the task is blocked on IO rather than on system resources, and stay at the maximum degree of parallelism for the entire duration of the operation.
However, monitoring a network capture shows the block is MUCH quicker at the beginning and slower near the end. I can see from the capture that at the beginning it does reach the maximum as specified. TPL Dataflow doesn't have any built-in way to monitor the current number of active tasks, so I'm not sure of the best way to investigate further; the best idea I've had is to instrument the body myself, sketched below.
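This is only a sketch, not anything built into TPL Dataflow (the ConcurrencyProbe class and Instrument wrapper are names I made up for illustration): wrap the block's delegate so each in-flight invocation bumps a counter, and track the peak.

using System;
using System.Threading;
using System.Threading.Tasks;

internal static class ConcurrencyProbe
{
    private static int _active;
    private static int _peak;

    internal static int Active => Volatile.Read(ref _active);
    internal static int Peak => Volatile.Read(ref _peak);

    // Wraps the block's body so each invocation is counted while it is in
    // flight, including the time it spends awaiting IO.
    internal static Func<T, Task<TResult>> Instrument<T, TResult>(Func<T, Task<TResult>> body) =>
        async item =>
        {
            var now = Interlocked.Increment(ref _active);
            int oldPeak;
            // Record the highest concurrency seen so far (lock-free max).
            while (now > (oldPeak = Volatile.Read(ref _peak)))
                Interlocked.CompareExchange(ref _peak, now, oldPeak);
            try
            {
                return await body(item).ConfigureAwait(false);
            }
            finally
            {
                Interlocked.Decrement(ref _active);
            }
        };
}

Passing Instrument(body) into the TransformBlock and sampling Active on a timer would at least show whether the block really drops below the configured maximum, or whether the slowdown is on the server side.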
My implementation:
internal static ExecutionDataflowBlockOptions GetDefaultBlockOptions(int maxDegreeOfParallelism,
    CancellationToken token) => new()
{
    MaxDegreeOfParallelism = maxDegreeOfParallelism,
    CancellationToken = token,
    SingleProducerConstrained = true,
    EnsureOrdered = false
};
private static async ValueTask<T?> ReceiveAsync<T>(this ISourceBlock<T?> block, bool configureAwait,
    CancellationToken token)
{
    try
    {
        return await block.ReceiveAsync(token).ConfigureAwait(configureAwait);
    }
    catch (InvalidOperationException)
    {
        // ReceiveAsync throws InvalidOperationException if the block completes
        // between OutputAvailableAsync returning true and the receive; swallow
        // it and let the OutputAvailableAsync loop terminate naturally.
        return default;
    }
}
internal static async IAsyncEnumerable<T> YieldResults<T>(this ISourceBlock<T?> block, bool configureAwait,
    [EnumeratorCancellation] CancellationToken token)
{
    while (await block.OutputAvailableAsync(token).ConfigureAwait(configureAwait))
        if (await block.ReceiveAsync(configureAwait, token).ConfigureAwait(configureAwait) is T result)
            yield return result;
    // By the time OutputAvailableAsync returns false, the block is guaranteed to
    // be complete. We still check its Completion task, since this propagates any
    // exception thrown to the consumer. We throw the AggregateException directly
    // rather than awaiting Completion, because awaiting would unwrap it and
    // surface only a single inner exception instead of all of them.
    if (block.Completion.Exception != null)
        throw block.Completion.Exception;
}
public static IAsyncEnumerable<TResult> ParallelSelectAsync<T, TResult>(this IEnumerable<T> source,
    Func<T, Task<TResult?>> body, int maxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    TaskScheduler? scheduler = null, CancellationToken token = default)
{
    var options = GetDefaultBlockOptions(maxDegreeOfParallelism, token);
    if (scheduler != null)
        options.TaskScheduler = scheduler;
    var block = new TransformBlock<T, TResult?>(body, options);
    foreach (var item in source)
        block.Post(item);
    block.Complete();
    return block.YieldResults(scheduler != null && scheduler != TaskScheduler.Default, token);
}
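For context, a call site looks roughly like this (the URL, the concurrency cap of 256, and the use of HttpClient.GetStringAsync are placeholders for our real request logic):

using System;
using System.Linq;
using System.Net.Http;
using System.Threading;

using var client = new HttpClient();

// 100,000+ request URLs, capped at the concurrency the server should tolerate.
var urls = Enumerable.Range(0, 100_000)
    .Select(i => $"https://example.com/api/items/{i}");

await foreach (var response in urls.ParallelSelectAsync(
    url => client.GetStringAsync(url),
    maxDegreeOfParallelism: 256,
    token: CancellationToken.None))
{
    // Results arrive as they complete, since EnsureOrdered = false.
}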
So, basically, my question is this: when an IO-bound action is executed in a TPL Dataflow block, how can I ensure the block stays at the MaxDegreeOfParallelism
that is set?