So you have a Channel<Task<T>> that acts as a conveyor belt: the producer adds tasks with channel.Writer.TryWrite(Task.Run(() => Parse(item))), and the consumer reads the tasks and awaits them one after another:
await foreach (Task<T> task in channel.Reader.ReadAllAsync())
{
T result = await task;
// Do something with the result
}
This is quite a good setup. One disadvantage is that you are not controlling the degree of parallelism, so at times you might have too many Task.Run actions running in parallel, resulting in ThreadPool starvation that could negatively affect other parts of your application. You can solve this problem by scheduling the work with the more advanced Task.Factory.StartNew instead of Task.Run, and configuring its scheduler argument with the ConcurrentScheduler property of a shared ConcurrentExclusiveSchedulerPair instance.
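As a rough sketch of that idea (the maxConcurrencyLevel value of 2 is just an example value, and Parse, item and channel are the names from your setup):

```csharp
// A shared pair whose ConcurrentScheduler limits how many tasks
// run concurrently on the ThreadPool. 2 is an arbitrary example.
ConcurrentExclusiveSchedulerPair schedulerPair = new(
    TaskScheduler.Default, maxConcurrencyLevel: 2);

// Producer side: schedule each Parse on the limited scheduler
// instead of the unthrottled Task.Run.
Task<T> task = Task.Factory.StartNew(() => Parse(item),
    CancellationToken.None,
    TaskCreationOptions.None,
    schedulerPair.ConcurrentScheduler);
channel.Writer.TryWrite(task);
```

The consumer side stays exactly the same; only the way the tasks are started changes.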
Another approach is to replace the channel with a TransformBlock<TInput,TOutput> from the TPL Dataflow library. This component combines an input buffer, an output buffer, and a processor that transforms the TInput to TOutput. It is equipped out of the box with parallelization and order preservation. Here is an example:
TransformBlock<Item, Result> block = new(item =>
{
return Parse(item);
}, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 2, // Configurable, the default is 1
EnsureOrdered = true, // This is the default
});
The producer feeds the block with block.Post(item), and the consumer enumerates the output buffer of the block with the ReceiveAllAsync method:
await foreach (var result in block.ReceiveAllAsync())
{
// Do something with the result
}
await block.Completion;
The await block.Completion; at the end is needed because the ReceiveAllAsync method currently has a bug and doesn't propagate possible exceptions as part of the enumeration.
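For completeness, the producer side of this setup might look like the sketch below (the items sequence is an assumed placeholder for wherever your items come from):

```csharp
// Producer side: post all the items, then signal that no more
// are coming, so that ReceiveAllAsync and block.Completion
// can eventually complete.
foreach (Item item in items)
{
    block.Post(item);
}
block.Complete();
```

Without the block.Complete() call the consumer's await foreach loop would never finish.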
My expectation is that the TransformBlock approach should have less overhead and consume less memory than your current setup. The TPL Dataflow library is advertised by Microsoft as suitable for "coarse-grained dataflow and pipelining tasks", which means that your Parse method should be chunky. In case it is feather-weight, like parsing a single number, the benefits of parallelization will most likely be negated by the synchronization overhead. In that case the solution might be to chunkify the work using a BatchBlock<T>.
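A sketch of that batching idea (the batch size of 100 is arbitrary, and Array.ConvertAll is just one possible way to parse a whole batch in a single task):

```csharp
// Group incoming items into arrays of 100, so that each parallel
// task processes a whole chunk and the per-item synchronization
// overhead is amortized.
BatchBlock<Item> batcher = new(batchSize: 100);

TransformBlock<Item[], Result[]> parser = new(batch =>
{
    return Array.ConvertAll(batch, Parse);
}, new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 2,
});

// Link the two blocks, so that completing the batcher
// eventually completes the parser as well.
batcher.LinkTo(parser, new DataflowLinkOptions()
{
    PropagateCompletion = true,
});
```

The producer then posts to the batcher instead of the parser, and the consumer receives Result[] arrays instead of single Result values.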
The TPL Dataflow library is not exactly cutting-edge technology. It predates ValueTasks and so doesn't take advantage of them. It also comes with some quirks, like swallowing OperationCanceledExceptions that might be thrown by the transform delegate, and it is very difficult to extend. Although it should be better than what you have already, it's not the absolutely optimal solution, but it might be good enough for your needs.