
I am looking for an alternative to JoinBlock that n TransformBlocks can link to, and that joins/merges the messages from all of those source TransformBlocks so that a collection of them can be passed on to another dataflow block.

JoinBlock does the job fine, but it is limited to hooking up 3 source blocks. It also suffers from quite a number of inefficiencies (it is very slow to join even value types (ints) from 2 source blocks). Is there a way to have Tasks returned from the TransformBlocks and wait until all TransformBlocks have a completed task to pass on before accepting the Task<item>?

Any alternative ideas? I potentially have 1-20 such transform blocks whose items I need to join together before passing on the joined item collection. Each transform block is guaranteed to return exactly one output item for each input item "transformed".

Edit: Requested clarification:

Per one of my previous questions, I set up my JoinBlocks as follows:

public Test()
{
    broadCastBlock = new BroadcastBlock<int>(i =>
        {
            return i;
        });

    transformBlock1 = new TransformBlock<int, int>(i =>
        {
            return i;
        });

    transformBlock2 = new TransformBlock<int, int>(i =>
        {
            return i;
        });

    joinBlock = new JoinBlock<int, int>();

    processorBlock = new ActionBlock<Tuple<int, int>>(tuple =>
        {
            //Console.WriteLine("tfb1: " + tuple.Item1 + "tfb2: " + tuple.Item2);
        });

    //Linking
    broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
    broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
    transformBlock1.LinkTo(joinBlock.Target1);
    transformBlock2.LinkTo(joinBlock.Target2);
    joinBlock.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
}

public void Start()
{
    Stopwatch watch = new Stopwatch();
    watch.Start();

    const int numElements = 1000000;

    for (int i = 1; i <= numElements; i++)
    {
        broadCastBlock.Post(i);
    }

    ////mark completion
    broadCastBlock.Complete();
    Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion).ContinueWith(_ => joinBlock.Complete());


    processorBlock.Completion.Wait();

    watch.Stop();

    // Multiply before dividing so the integer division does not truncate the rate.
    Console.WriteLine("Time it took: " + watch.ElapsedMilliseconds + " - items processed per second: " + numElements * 1000L / watch.ElapsedMilliseconds);
    Console.ReadLine();
}
i3arnon
Matt
  • “Is there a way to have Tasks returned from the TransformBlocks and wait until all TransformBlocks have a completed task to pass on before accepting the `Task`?” I'm not sure why you think that would help, but TDF blocks don't work that way. You either accept an item or you don't; you can't take an item and decide to accept it sometime later. – svick Dec 02 '12 at 09:55

2 Answers


One way to do this is to use BatchBlock with Greedy set to false. In this configuration, the block doesn't do anything until n items from n different source blocks are waiting to be consumed by it (where n is the batch size you set when creating the BatchBlock). When that happens, it consumes all n items at once and produces an array containing all of them.

One caveat with this solution is that the resulting array is not ordered by source: you won't know which item came from which source. I also have no idea how its performance compares with JoinBlock; you'll have to test that yourself. (Though I would understand if using BatchBlock this way were slower, because of the overhead necessary for non-greedy consumption.)
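
The setup described above could be wired up roughly as follows. This is only a sketch under assumptions: the class name `BatchJoinSketch`, the method `RunAsync`, and the identity transforms are made up for illustration, and the number of source blocks is assumed to equal the batch size so that each batch takes exactly one item from every source.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BatchJoinSketch
{
    // Hypothetical helper: fans each input out to sourceCount transform blocks
    // and regroups their outputs with a non-greedy BatchBlock.
    // Returns the number of batches that reached the final block.
    public static async Task<int> RunAsync(int sourceCount, int itemCount)
    {
        var broadcast = new BroadcastBlock<int>(i => i);

        // Batch size == number of sources; Greedy = false makes the block wait
        // until every source has an item on offer before consuming any of them.
        var batch = new BatchBlock<int>(sourceCount,
            new GroupingDataflowBlockOptions { Greedy = false });

        int batches = 0;
        var processor = new ActionBlock<int[]>(items =>
        {
            // items holds one value per source; do not rely on its order.
            batches++;
        });

        var transforms = Enumerable.Range(0, sourceCount)
            .Select(_ => new TransformBlock<int, int>(i => i))
            .ToArray();

        foreach (var transform in transforms)
        {
            broadcast.LinkTo(transform, new DataflowLinkOptions { PropagateCompletion = true });
            transform.LinkTo(batch); // completion is handled manually below
        }
        batch.LinkTo(processor, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 1; i <= itemCount; i++)
        {
            broadcast.Post(i);
        }
        broadcast.Complete();

        // Complete the batch block only after all sources have drained.
        await Task.WhenAll(transforms.Select(t => t.Completion));
        batch.Complete();
        await processor.Completion;

        return batches;
    }
}
```

Completion is propagated by hand from the transform blocks to the BatchBlock, for the same reason the question does so with JoinBlock: the first source to finish should not complete the shared target while the other sources still have items to deliver.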

svick
  • I will test it and report back with performance numbers relative to join block. However, I am also interested in alternative Non-TDF solutions. – Matt Dec 02 '12 at 15:05
  • some feedback re BatchBlock: You were right, batchblock in non-greedy mode (required to get all items, not just the first one available) slows down things significantly. I decided to solve this problem outside of TDF. I still want to run the func in each previous transform block in parallel and then join results conventionally rather than through TDF. But thanks for the pointers to batchBlock. – Matt Dec 03 '12 at 06:53
  • @svick - Nice repurposing of BatchBlock to compensate for the limited number of output tuples for JoinBlock. – Robert Oschler Apr 08 '13 at 04:10

If you want to perform multiple parallel operations for each item, it makes more sense IMHO to perform these operations inside a single block, instead of splitting them across multiple blocks and then trying to join the independent results into a single object again. So my suggestion is to do something like this:

var block = new TransformBlock<MyClass, MyClass>(async item =>
{
    Task<SomeType1> task1 = Task.Run(() => CalculateProperty1(item.Id));
    Task<SomeType2> task2 = Task.Run(() => CalculateProperty2(item.Id));
    await Task.WhenAll(task1, task2).ConfigureAwait(false);
    item.Property1 = task1.Result;
    item.Property2 = task2.Result;
    return item;
}, new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 2
});

In the above example items of type MyClass are passed through a TransformBlock. The properties Property1 and Property2 of each item are calculated in parallel using a separate Task for each property. Then both tasks are awaited, and when both are complete the results are assigned to the properties of the item. Finally the processed item is returned.

The only thing you need to be aware of with this approach is that the degree of parallelism will be the product of the number of internal parallel operations and the MaxDegreeOfParallelism option of the block. So in the above example the degree of parallelism will be 2 × 2 = 4. To be precise, this will be the maximum degree of parallelism, because it is possible that one of the two internal calculations will be slower than the other. So at any given moment the actual degree of parallelism could be anything between 2 and 4.
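
The same idea can be extended to an arbitrary number of operations per item, which is closer to the 1-20 transforms mentioned in the question. The following is a minimal sketch, not a drop-in solution: `SingleBlockJoinSketch` and `Create` are hypothetical names, and the operations are assumed to be plain `Func<int, int>` delegates. `Task.WhenAll` returns the results in the same order as the tasks it was given, so each position in the output array corresponds to a known operation.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class SingleBlockJoinSketch
{
    // Hypothetical factory: builds one block that runs every operation on each
    // input item in parallel and emits the results as a single array.
    public static TransformBlock<int, int[]> Create(params Func<int, int>[] operations)
    {
        return new TransformBlock<int, int[]>(async item =>
        {
            // Start one task per operation for the current item.
            Task<int>[] tasks = operations
                .Select(op => Task.Run(() => op(item)))
                .ToArray();

            // The result array is ordered like the operations array.
            return await Task.WhenAll(tasks).ConfigureAwait(false);
        });
    }
}
```

For example, a block created with the operations `x => x + 1`, `x => x * 2` and `x => x * x` turns the input 3 into the array 4, 6, 9. Because the join happens inside the block, no JoinBlock or BatchBlock is needed downstream.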

Theodor Zoulias