
Please consider the following code sample. I need an aggregator node that can be linked to any number of sources, waits for all sources to send one message, and then combines those messages into a result[].

This should be obvious and straightforward, but somehow I cannot find a solution. I checked JoinBlock and TransformBlock, but both seem unsuitable.

using System;
using System.Threading.Tasks.Dataflow;

namespace ConsoleApp2
{
    internal class Program
    {
        private static void Main(string[] args)
        {
            BufferBlock<string> p1 = new BufferBlock<string>();
            BufferBlock<string> p2 = new BufferBlock<string>();

            // a block is required that accepts n sources as input, waits for all inputs to arrive, and then creates a result array from all inputs

            ActionBlock<string[]> c1 = new ActionBlock<string[]>((inputs) =>
            {
                Console.WriteLine(string.Join(", ", inputs));
            });

            p1.Post("Produce 1.1");
            p2.Post("Produce 2.1");

            // desired output:
            // "Produce 1.1, Produce 2.1"
            // actually the order is of no importance at this time

        }


    }
}

[Edit] Further clarification: I would like a block that dynamically awaits all source nodes (as linked at the point in time the first message arrives) and aggregates their results to pass on to follower nodes.

Martin Meeser

2 Answers


If you know your sources beforehand, I'd use a JoinBlock together with a TransformBlock. You would have to create a BufferBlock for each source.

First, the JoinBlock waits for one message from each source and packs them in one tuple. Then the TransformBlock creates a result array from the intermediate tuple.
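A minimal sketch of the JoinBlock + TransformBlock approach for two known sources, reusing the question's "Produce 1.1"/"Produce 2.1" messages. The class name JoinThenTransformExample and its Run helper are hypothetical. Keep in mind that TPL Dataflow only ships two- and three-input variants (JoinBlock<T1, T2> and JoinBlock<T1, T2, T3>), so this approach does not scale to an arbitrary number of sources.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

internal static class JoinThenTransformExample
{
    // Wires two fixed sources through JoinBlock -> TransformBlock and returns the first combined array.
    public static string[] Run()
    {
        var source1 = new BufferBlock<string>();
        var source2 = new BufferBlock<string>();

        // Waits for one message on each target and pairs them into a Tuple<string, string>.
        var join = new JoinBlock<string, string>();

        // Turns the intermediate tuple into the string[] the consumer expects.
        var toArray = new TransformBlock<Tuple<string, string>, string[]>(
            pair => new[] { pair.Item1, pair.Item2 });

        source1.LinkTo(join.Target1);
        source2.LinkTo(join.Target2);
        join.LinkTo(toArray);

        source1.Post("Produce 1.1");
        source2.Post("Produce 2.1");

        // Block until the combined array is available (with a timeout so a bug cannot hang forever).
        return toArray.Receive(TimeSpan.FromSeconds(5));
    }

    private static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "Produce 1.1, Produce 2.1"
    }
}
```

Because Target1 is always fed by source1, Item1 and Item2 keep a fixed source order in the result array.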

If you do not know your sources beforehand, you need to explain how you expect your new block to know when to produce a result. That logic should then be put into a custom block, probably in the form of a TransformManyBlock<string,string[]>.

If you want to join a dynamic number of sources, you can create an unlimited join block like this:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks.Dataflow;

internal static class Program
{
    private static void Main()
    {
        var source1 = new BufferBlock<string>();
        var source2 = new BufferBlock<string>();
        var source3 = new BufferBlock<string>();
        var aggregator = CreateAggregatorBlock( 3 );
        var result = new ActionBlock<string[]>( x => Console.WriteLine( string.Join( ", ", x ) ) );
        source1.LinkTo( aggregator );
        source2.LinkTo( aggregator );
        source3.LinkTo( aggregator );
        aggregator.LinkTo( result );

        source1.Post( "message 1" );
        source2.Post( "message 2" );
        source3.Post( "message 3" );

        Console.ReadLine();
    }

    // Emits one array every time `sources` messages have arrived, no matter which source sent them.
    // The shared list is safe only because TransformManyBlock defaults to MaxDegreeOfParallelism = 1.
    private static TransformManyBlock<string, string[]> CreateAggregatorBlock( int sources )
    {
        var buffer = new List<string>();
        return new TransformManyBlock<string, string[]>( message => {
            buffer.Add( message );
            if( buffer.Count == sources )
            {
                var result = buffer.ToArray();
                buffer.Clear();
                return new[] {result};
            }
            return Enumerable.Empty<string[]>();
        } );
    }
}

This assumes your sources produce messages at the same rate. If that's not the case, you need to put the identity of the source next to the message and keep a separate buffer for each source.
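The per-source variant described above could look roughly like this. It is a sketch, not part of the original answer: the names TaggedAggregatorExample and CreateTaggedAggregator are hypothetical, and it assumes each message already arrives tagged as a (SourceId, Message) value tuple. Each source id gets its own queue, and an array is emitted only when every expected source has at least one queued message, so a fast source can no longer fill the slots of slower ones.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks.Dataflow;

internal static class TaggedAggregatorExample
{
    // Hypothetical helper: emits one array per "round", taking the oldest message of each
    // of `sources` distinct source ids, regardless of how fast each source produces.
    public static TransformManyBlock<(int SourceId, string Message), string[]> CreateTaggedAggregator(int sources)
    {
        // One FIFO queue per source id. The block runs with the default
        // MaxDegreeOfParallelism of 1, so the dictionary is never accessed concurrently.
        var queues = new Dictionary<int, Queue<string>>();
        return new TransformManyBlock<(int SourceId, string Message), string[]>(item =>
        {
            if (!queues.TryGetValue(item.SourceId, out var queue))
            {
                queue = new Queue<string>();
                queues.Add(item.SourceId, queue);
            }
            queue.Enqueue(item.Message);

            // Some source has not delivered yet: emit nothing for now.
            if (queues.Count < sources || queues.Values.Any(q => q.Count == 0))
                return Enumerable.Empty<string[]>();

            // Take the oldest message of each source, ordered by source id.
            var result = queues.OrderBy(kv => kv.Key).Select(kv => kv.Value.Dequeue()).ToArray();
            return new[] { result };
        });
    }

    public static List<string[]> Run()
    {
        var aggregator = CreateTaggedAggregator(2);
        var results = new List<string[]>();
        var collector = new ActionBlock<string[]>(x => results.Add(x));
        aggregator.LinkTo(collector, new DataflowLinkOptions { PropagateCompletion = true });

        // Source 0 runs ahead of source 1; messages are still paired by position per source.
        aggregator.Post((0, "a1"));
        aggregator.Post((0, "a2"));
        aggregator.Post((1, "b1"));
        aggregator.Post((1, "b2"));

        aggregator.Complete();
        collector.Completion.Wait();
        return results;
    }

    private static void Main()
    {
        foreach (var batch in Run())
            Console.WriteLine(string.Join(", ", batch)); // "a1, b1" then "a2, b2"
    }
}
```

To tag messages coming from plain BufferBlock<string> sources, one option is to place a TransformBlock<string, (int SourceId, string Message)> such as `m => (0, m)` between each source and the aggregator.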

Haukinger
  • A `JoinBlock` has a limited number of input sources. The OP is asking about having many sources, presumably more than any one `JoinBlock` can handle. – JSteward Aug 29 '18 at 17:00
  • As you noted the sources must be in lock step for this to work since you're just taking the first three inputs and creating a race. Also, if this block operates in parallel then there's nothing protecting the shared buffer. Rolling your own variant of this block is not going to be a trivial task. – JSteward Aug 30 '18 at 13:28
  • I guess I like the batch block more, indeed. I wasn't aware of the `Greedy=false` behavior, that's really two different blocks in one. – Haukinger Aug 30 '18 at 13:58

You can use a non-greedy BatchBlock for this. Because it is non-greedy, each source contributes exactly one item to the batch. This was originally suggested here. Here's a tested example; as proof, note that source1 is sent multiple items that don't show up in the batch:

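using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using NUnit.Framework; // [Test] below is NUnit's attribute

public class DataAggregator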
{
    private BatchBlock<string> batchBlock = new BatchBlock<string>(5, new GroupingDataflowBlockOptions() { Greedy = false });
    private ActionBlock<string[]> writer = new ActionBlock<string[]>(strings => strings.ToList().ForEach(str => Console.WriteLine(str)));
    private BufferBlock<string> source1 = new BufferBlock<string>();
    private BufferBlock<string> source2 = new BufferBlock<string>();
    private BufferBlock<string> source3 = new BufferBlock<string>();
    private BufferBlock<string> source4 = new BufferBlock<string>();
    private BufferBlock<string> source5 = new BufferBlock<string>();

    public DataAggregator()
    {
        source1.LinkTo(batchBlock, new DataflowLinkOptions() { PropagateCompletion = true });
        source2.LinkTo(batchBlock, new DataflowLinkOptions() { PropagateCompletion = true });
        source3.LinkTo(batchBlock, new DataflowLinkOptions() { PropagateCompletion = true });
        source4.LinkTo(batchBlock, new DataflowLinkOptions() { PropagateCompletion = true });
        source5.LinkTo(batchBlock, new DataflowLinkOptions() { PropagateCompletion = true });
        batchBlock.LinkTo(writer, new DataflowLinkOptions() { PropagateCompletion = true });
    }

    [Test]
    public async Task TestPipeline()
    {
        source1.Post("string1-1");
        source1.Post("string1-2");
        source1.Post("string1-3");
        source2.Post("string2-1");
        source3.Post("string3-1");
        source4.Post("string4-1");
        source5.Post("string5-1");
        //Should print string1-1 string2-1 string3-1 string4-1 string5-1
        source1.Complete();
        source2.Complete();
        source3.Complete();
        source4.Complete();
        source5.Complete();
        await writer.Completion;
    }
}

Output:

string1-1
string2-1
string3-1
string4-1
string5-1
JSteward
  • This is working and practical. Let me nevertheless add the following minor note: I need to know the number of input blocks beforehand. I guess the bottom line is that there simply is no aggregator block that is aware of all its sources. See also [q here](https://stackoverflow.com/questions/46552291/aggregation-and-joins-inner-outer-left-with-tpl-dataflow?rq=1) – Martin Meeser Aug 31 '18 at 16:03
  • True, there are limitations. Even dynamically linking an unknown number of blocks would still leave you limited, but, as the link describes, Rx can fill some of those gaps. – JSteward Aug 31 '18 at 16:06
  • Let me add that it is crucial to have one buffer block for each input source in front of the batch block. Otherwise, if for example you have two sources in your network and one finishes completely before the other starts, you will get a pair consisting of the first and the last item. – Martin Meeser Sep 12 '18 at 07:40
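Building on these comments, the non-greedy BatchBlock setup can be wrapped in a small factory that takes whatever list of sources is known at construction time. This is a hedged sketch: OnePerSourceBatchExample and CreateOnePerSourceBatch are hypothetical names, the number of sources must still be fixed when the block is created, and, following the last comment, each input is its own BufferBlock<string>. Completion propagation is deliberately left out, because with PropagateCompletion the first source to complete would complete the batch block.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks.Dataflow;

internal static class OnePerSourceBatchExample
{
    // Hypothetical helper: one non-greedy BatchBlock sized to the number of sources.
    // In non-greedy mode the block only postpones offers (tracked per source) and consumes
    // them once `sources.Count` distinct sources have offered, so each batch holds one
    // message from every source.
    public static BatchBlock<T> CreateOnePerSourceBatch<T>(IReadOnlyList<ISourceBlock<T>> sources)
    {
        var batch = new BatchBlock<T>(sources.Count, new GroupingDataflowBlockOptions { Greedy = false });
        foreach (var source in sources)
            source.LinkTo(batch);
        return batch;
    }

    public static string[] Run()
    {
        // One BufferBlock per input source.
        var sources = Enumerable.Range(0, 3).Select(_ => new BufferBlock<string>()).ToList();
        var batch = CreateOnePerSourceBatch<string>(sources);

        sources[0].Post("s0-1");
        sources[0].Post("s0-2"); // stays queued in sources[0]; not part of the first batch
        sources[1].Post("s1-1");
        sources[2].Post("s2-1");

        // Wait for the first batch (with a timeout so a bug cannot hang forever).
        return batch.Receive(TimeSpan.FromSeconds(5));
    }

    private static void Main()
    {
        // The order inside the batch depends on which source offered first, so it may vary.
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```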