I am trying to implement a classic map-reduce problem using System.Threading.Tasks.Dataflow, and although I can get something (sort of) working, I'm struggling to see how to generalise this functionality.
Given a simple problem:
- Produce a stream of integers and, in parallel, for each number:
  - Square the number
  - Add 5
  - Divide by 2
- Take the sum of all numbers
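As a reference point for what the pipeline should produce, the same steps can be sketched sequentially with LINQ. This is only a baseline for checking results, not the Dataflow setup itself, and the name `expected` is purely illustrative:

```csharp
using System;
using System.Linq;

var input = 10;

// Sequential baseline: square, add 5, divide by 2, then sum.
var expected = Enumerable.Range(1, input)
    .Select(x => x * x)     // square the number
    .Select(x => x + 5)     // add 5
    .Select(x => x / 2.0)   // divide by 2 (floating point)
    .Sum();                 // reduce step

Console.WriteLine(expected); // value is 217.5 for input = 10
```

Whatever the degree of parallelism, the Dataflow version should arrive at the same sum.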
The problem I'm having is that I can get this working using a BatchBlock, but I have to specify the batch size up front, i.e. how many results the parallel tasks will produce. This is fine for the test code (below), as I know in advance how many items I'm going to queue, but say I didn't know... how would I set this pipeline up?
Test code used (Note I added a short delay into the first of the "Parallel" blocks just to see some processing time difference depending on degrees of parallelism):
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
var input = 10;
// Fan out: turn the single input n into the stream 1..n.
var fanOutBlock = new TransformManyBlock<int, int>(x =>
{
    return Enumerable.Range(1, x);
});
var squareBlock = new TransformBlock<int, int>(async x =>
{
    await Task.Delay(100);
    return x * x;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });
var addFiveBlock = new TransformBlock<int, int>(x =>
{
    return x + 5;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });
var divTwoBlock = new TransformBlock<int, double>(x =>
{
    return x / 2.0;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });
var batchBlock = new BatchBlock<double>(input);
var sumBlock = new TransformBlock<IList<double>, double>(x =>
{
    return x.Sum();
});
var options = new DataflowLinkOptions { PropagateCompletion = true };
fanOutBlock.LinkTo(squareBlock, options);
squareBlock.LinkTo(addFiveBlock, options);
addFiveBlock.LinkTo(divTwoBlock, options);
divTwoBlock.LinkTo(batchBlock, options);
batchBlock.LinkTo(sumBlock, options);
var sw = Stopwatch.StartNew();
fanOutBlock.Post(input);
fanOutBlock.Complete();
var result = sumBlock.Receive();
Console.WriteLine(result);
sw.Stop();
Console.WriteLine($"{sw.ElapsedMilliseconds}ms");
await sumBlock.Completion;