
I need to multicast an object into multiple paths:

      producer
         |
      multicast
     |        |
 Process1   Process2
     |        |
   Writedb   WriteFile

The BroadcastBlock is not helping much: it only delivers the latest message to both Process1 and Process2, so if Process2 is running late it won't receive all of the messages.

Writedb and WriteFile receive different data.

Here is the code snippet:

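using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program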
{
    public static void Main()
    {
        var broadCastBlock = new BroadcastBlock<int>(i => i);

        var transformBlock1 = new TransformBlock<int, string>(i =>
        {
            Console.WriteLine("1 transformblock called: {0}", i);
            //Thread.Sleep(4);
            return string.Format("1_ {0},", i);
        });

        var transformBlock2 = new TransformBlock<int, string>(i =>
        {
            Console.WriteLine("2 transformblock called: {0}", i);
            Thread.Sleep(100);
            return string.Format("2_ {0},", i);
        });

        var processorBlockT1 = new ActionBlock<string>(i => Console.WriteLine("processBlockT1 {0}", i));
        var processorBlockT2 = new ActionBlock<string>(i => Console.WriteLine("processBlockT2 {0}", i));

        //Linking
        broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
        broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
        transformBlock1.LinkTo(processorBlockT1, new DataflowLinkOptions { PropagateCompletion = true });
        transformBlock2.LinkTo(processorBlockT2, new DataflowLinkOptions { PropagateCompletion = true });

        const int numElements = 100;

        for (int i = 1; i <= numElements; i++)
        {
            broadCastBlock.SendAsync(i);
        }

        //completion handling

        broadCastBlock.Completion.ContinueWith(x =>
        {
            Console.WriteLine("Broadcast block completed");
            transformBlock1.Complete();
            transformBlock2.Complete();
            Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion).ContinueWith(_ =>
            {
                processorBlockT1.Complete();
                processorBlockT2.Complete();
            });
        });


        transformBlock1.Completion.ContinueWith(x => Console.WriteLine("Transform1 completed"));
        transformBlock2.Completion.ContinueWith(x => Console.WriteLine("Transform2 completed"));
        processorBlockT1.Completion.ContinueWith(x => Console.WriteLine("processblockT1 completed"));
        processorBlockT2.Completion.ContinueWith(x => Console.WriteLine("processblockT2 completed"));


        //mark completion
        broadCastBlock.Complete();
        Task.WhenAll(processorBlockT1.Completion, processorBlockT2.Completion).ContinueWith(_ => Console.WriteLine("completed both tasks")).Wait();


        Console.WriteLine("Finished");
        Console.ReadLine();
    }
}

What is the best way to get guaranteed delivery from a broadcast, i.e. a multicast?

Should I just put a buffer in front of each branch, so that the buffers always collect whatever comes in and each process can then take its time working through them?

bhushanvinay
  • What does "best" mean for you? Do you care that blocks are bounded? If not, I think your code will work as is. – svick Mar 19 '16 at 17:23
  • Yes, if I bound the capacity then the problem is more evident. – bhushanvinay Mar 19 '16 at 19:52
  • No, I believe that if you don't bound capacity, then the problem *does not exist* (because non-bounded blocks never postpone messages and so they never miss any). – svick Mar 19 '16 at 23:49
  • The *target* blocks have buffers of their own. You *won't* lose any messages even if one of the targets takes a long time. As svick said, you'll only run into trouble if you bound one of the targets' capacity. – Panagiotis Kanavos Mar 22 '16 at 10:18
  • I'm looking to implement this kind of task branching too. https://stackoverflow.com/a/40639607 seems to indicate it's a matter of `Post` vs `SendAsync`. – Ethan Jul 11 '19 at 17:45

1 Answer


The BroadcastBlock guarantees that all messages will be offered to all linked blocks. So it is exactly what you need. What you should fix though is the way you feed the BroadcastBlock with messages:

for (int i = 1; i <= numElements; i++)
{
    broadCastBlock.SendAsync(i); // Don't do this!
}

The SendAsync method is supposed to be awaited. You should never have more than one pending SendAsync operation targeting the same block. Doing so not only breaks all guarantees about the order of the received messages, it is also extremely memory-inefficient. The whole point of using bounded blocks is to control memory usage by limiting the size of the blocks' internal buffers. By issuing multiple un-awaited SendAsync calls you circumvent this self-imposed limit: you create an external, dynamic buffer of incomplete Tasks, each weighing hundreds of bytes, to propagate messages that weigh just a fraction of that. Those messages could be buffered far more efficiently internally, simply by not making the blocks bounded in the first place.

for (int i = 1; i <= numElements; i++)
{
    await broadCastBlock.SendAsync(i); // Now it's OK
}
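
To put the pieces together, here is a minimal sketch of two variants. The first is the pipeline from the question reduced to two unbounded branches and fed with an awaited SendAsync. The second is not something stated above: it is one commonly used way to keep delivery guaranteed when the targets are bounded, replacing the BroadcastBlock with an ActionBlock that awaits SendAsync on each target. The names MulticastDemo, RunBroadcastAsync, RunBoundedAsync and fanOut, the BoundedCapacity of 4 and the Task.Delay(1) standing in for the slow branch are all illustrative assumptions. It assumes C# 7.1 or later and the System.Threading.Tasks.Dataflow library.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class MulticastDemo
{
    private static readonly DataflowLinkOptions Link =
        new DataflowLinkOptions { PropagateCompletion = true };

    // Variant 1: BroadcastBlock with unbounded targets and an awaited SendAsync.
    // Returns how many items each branch processed.
    public static async Task<(int Fast, int Slow)> RunBroadcastAsync(int numElements)
    {
        int fastCount = 0, slowCount = 0;
        var broadcast = new BroadcastBlock<int>(i => i);

        // No BoundedCapacity is set, so these targets are unbounded
        // and never postpone an offered message.
        var fast = new ActionBlock<int>(i => { fastCount++; });
        var slow = new ActionBlock<int>(async i =>
        {
            await Task.Delay(1); // simulate the slower branch
            slowCount++;
        });

        broadcast.LinkTo(fast, Link);
        broadcast.LinkTo(slow, Link);

        for (int i = 1; i <= numElements; i++)
        {
            await broadcast.SendAsync(i);
        }

        // PropagateCompletion forwards completion to both targets.
        broadcast.Complete();
        await Task.WhenAll(fast.Completion, slow.Completion);
        return (fastCount, slowCount);
    }

    // Variant 2 (an assumption, not taken from the answer): bounded targets,
    // fed by an ActionBlock that waits until every target has accepted the item.
    public static async Task<(int Fast, int Slow)> RunBoundedAsync(int numElements)
    {
        int fastCount = 0, slowCount = 0;
        var bounded = new ExecutionDataflowBlockOptions { BoundedCapacity = 4 };

        var fast = new ActionBlock<int>(i => { fastCount++; }, bounded);
        var slow = new ActionBlock<int>(async i =>
        {
            await Task.Delay(1); // simulate the slower branch
            slowCount++;
        }, bounded);

        // Awaiting SendAsync applies backpressure instead of dropping messages.
        var fanOut = new ActionBlock<int>(async i =>
        {
            await fast.SendAsync(i);
            await slow.SendAsync(i);
        }, bounded);

        for (int i = 1; i <= numElements; i++)
        {
            await fanOut.SendAsync(i);
        }

        // There are no links here, so completion is propagated by hand.
        fanOut.Complete();
        await fanOut.Completion;
        fast.Complete();
        slow.Complete();
        await Task.WhenAll(fast.Completion, slow.Completion);
        return (fastCount, slowCount);
    }

    public static async Task Main()
    {
        var a = await RunBroadcastAsync(100);
        Console.WriteLine($"broadcast: fast={a.Fast}, slow={a.Slow}");
        var b = await RunBoundedAsync(100);
        Console.WriteLine($"bounded:   fast={b.Fast}, slow={b.Slow}");
    }
}
```

In the second variant the slowest target sets the pace for the whole pipeline, which is the price of bounding memory without losing messages. If memory is not a concern, the first variant is simpler.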
Theodor Zoulias