2

I am trying to implement a classic map-reduce problem using System.Threading.Tasks.Dataflow, and although I can get something (sort of) working I'm struggling to see how to generalise this functionality.

Given a simple problem

  • Produce a stream of integers; and in parallel for each number
    • Square the number
    • add 5
    • divide by 2
  • Take the sum of all numbers

The problem im having is that I can get this working using a BufferBlock, but I have to specify the initial size of the set of parallel tasks. This is fine for the test code (below) as I know upfront how many items im going to queue, but say I didnt know... how would I set this pipeline up?

Test code used (Note I added a short delay into the first of the "Parallel" blocks just to see some processing time difference depending on degrees of parallelism):

using System.Diagnostics;
using System.Threading.Tasks.Dataflow;

var input = 10;

var fanOutBlock = new TransformManyBlock<int, int>(x =>
{
    return Enumerable.Range(1, x).Select(x => x);
});

var squareBlock = new TransformBlock<int, int>(async x =>
 {
     await Task.Delay(100);
     return x * x;
 }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

var addFiveBlock = new TransformBlock<int, int>(x =>
{
    return x + 5;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

var divTwoBlock = new TransformBlock<int, double>(x =>
{
    return x/2.0;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

var batchBlock = new BatchBlock<double>(input);

var sumBlock = new TransformBlock<IList<double>,double>(x =>
{
    return x.Sum();
});

var options = new DataflowLinkOptions { PropagateCompletion = true };

fanOutBlock.LinkTo(squareBlock, options);
squareBlock.LinkTo(addFiveBlock, options);
addFiveBlock.LinkTo(divTwoBlock, options);
divTwoBlock.LinkTo(batchBlock, options);
batchBlock.LinkTo(sumBlock, options);


var sw = Stopwatch.StartNew();
fanOutBlock.Post(input);
fanOutBlock.Complete();


var result = sumBlock.Receive();
Console.WriteLine(result);
sw.Stop();
Console.WriteLine($"{sw.ElapsedMilliseconds}ms");

await sumBlock.Completion;
Jamiec
  • 133,658
  • 13
  • 134
  • 193
  • I think I would have an ActionBlock instead of the last TransFormBlock that sums input to an integer class field via Interlocked... or something like that. The result can be set valid after completion of the pipeline. – Fildor May 18 '22 at 14:45
  • @Fildor Im not sure I follow (But that might be as im still pretty new to this dataflow library!). The intention is that this is part of a much longer pipeline where I sometimes want to fan part of it out to parallel processing. So after that final aggregate step the value is then passed on to the next block. All the examples terminate in an `ActionBlock` as they typically just `Console.WriteLine` some value. I have more to do. – Jamiec May 18 '22 at 14:48
  • Perhaps my example should have an `ActionBlock` at the end that just writes to console to make this clearer. – Jamiec May 18 '22 at 14:51
  • 1
    At this moment, I am trying to fiddle something. Here it is: https://dotnetfiddle.net/Ms5OOV – Fildor May 18 '22 at 14:58
  • And I think the whole thing, that's in the fiddle could then be embedded into a TransformBlock if it needs to be part of a bigger pipeline ... – Fildor May 18 '22 at 15:02
  • That was just my first idea, that came to mind. Maybe it makes sense? – Fildor May 18 '22 at 15:03
  • https://dotnetfiddle.net/XO9KMM => Variation that maybe better fits your case. – Fildor May 18 '22 at 15:05
  • @Fildor I think you're onto something, where I could actually turn this code into a custom block `MapReduceBlock : IPropogatorBlock` which propogates the result out to the next block in a wider pipeline – Jamiec May 18 '22 at 15:08
  • Sure. This was just a naive "first approach". – Fildor May 18 '22 at 15:08
  • Maybe this can even be "Task-Safe" with an AsyncLocal for the result field. – Fildor May 18 '22 at 15:13

1 Answers1

1

One idea is to configure the BatchBlock<T> with the maximum batchSize:

var batchBlock = new BatchBlock<double>(Int32.MaxValue);

When the batchBlock is completed (when its Complete method is invoked), it will emit a batch with all the messages it contains. The disadvantage is that by buffering every message, you might run out of memory in case the number of messages is huge. Or, if the number of messages is larger than Int32.MaxValue and miraculously you don't run out of memory, you'll get more than one batches, which regarding the logic that you are trying to implement will be a bug.

A better idea is to implement a custom Dataflow block that aggregates the messages it receives on the fly. Something similar to the Aggregate LINQ operator:

public static TResult Aggregate<TSource, TAccumulate, TResult>(
    this IEnumerable<TSource> source,
    TAccumulate seed,
    Func<TAccumulate, TSource, TAccumulate> function,
    Func<TAccumulate, TResult> resultSelector);

Here is an implementation, that is composed by two native blocks, that are encapsulated with the DataflowBlock.Encapsulate method:

public static IPropagatorBlock<TSource, TResult>
    CreateAggregateBlock<TSource, TAccumulate, TResult>(
    TAccumulate seed,
    Func<TAccumulate, TSource, TAccumulate> function,
    Func<TAccumulate, TResult> resultSelector,
    ExecutionDataflowBlockOptions options = default)
{
    options ??= new ExecutionDataflowBlockOptions();
    var maxDOP = options.MaxDegreeOfParallelism;
    options.MaxDegreeOfParallelism = 1;

    var inputBlock = new ActionBlock<TSource>(item =>
    {
        seed = function(seed, item);
    }, options);

    var outputBlock = new TransformBlock<TAccumulate, TResult>(accumulate =>
    {
        return resultSelector(accumulate);
    }, options);

    options.MaxDegreeOfParallelism = maxDOP; // Restore initial value

    PropagateCompletion(inputBlock, outputBlock, () =>
    {
        outputBlock.Post(seed);
    });

    return DataflowBlock.Encapsulate(inputBlock, outputBlock);

    static void PropagateCompletion(IDataflowBlock source, IDataflowBlock target,
        Action onSuccessfulCompletion)
    {
        ThreadPool.QueueUserWorkItem(async _ =>
        {
            try { await source.Completion; } catch { }
            Exception exception =
                source.Completion.IsFaulted ? source.Completion.Exception : null;
            if (source.Completion.IsCompletedSuccessfully)
            {
                // The action is invoked before completing the target.
                try { onSuccessfulCompletion(); }
                catch (Exception ex) { exception = ex; }
            }
            if (exception != null) target.Fault(exception); else target.Complete();
        });
    }
}

A tricky part is how to propagate the completion of the one block to the other. My preferred technique is to invoke an async void method on the thread pool. This way any bug in my code will be exposed as a crashing unhandled exception. The alternative is to put the code in a fire-and-forget task continuation, in which case the effect of a bug will be most likely a silent deadlock.

Another question mark is whether the mutations of the seed state are visible to all threads involved in the calculation. I've avoided putting explicit barriers or locks, and I am relying on the implicit barriers that the TPL includes when tasks are queued, and at the beginning/end of task execution.

Usage example:

var sumBlock = CreateAggregateBlock<double, double, double>(0.0,
    (acc, x) => acc + x, acc => acc);
Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104