I need to multicast an object into multiple paths:
        producer
           |
       multicast
        |      |
  Process1    Process2
        |      |
   Writedb    WriteFile
The BroadcastBlock is not helping much: it only offers the latest message to both Process1 and Process2, so if Process2 is running late it won't receive some of the messages.
The DB writer and the file writer receive different data.
Here is the code snippet:
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    public static void Main()
    {
        var broadCastBlock = new BroadcastBlock<int>(i => i);
        var transformBlock1 = new TransformBlock<int, string>(i =>
        {
            Console.WriteLine("1 transformblock called: {0}", i);
            //Thread.Sleep(4);
            return string.Format("1_ {0},", i);
        });
        // Second branch is deliberately slower than the first.
        var transformBlock2 = new TransformBlock<int, string>(i =>
        {
            Console.WriteLine("2 transformblock called: {0}", i);
            Thread.Sleep(100);
            return string.Format("2_ {0},", i);
        });
        var processorBlockT1 = new ActionBlock<string>(i => Console.WriteLine("processBlockT1 {0}", i));
        var processorBlockT2 = new ActionBlock<string>(i => Console.WriteLine("processBlockT2 {0}", i));
        // Linking
        broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
        broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
        transformBlock1.LinkTo(processorBlockT1, new DataflowLinkOptions { PropagateCompletion = true });
        transformBlock2.LinkTo(processorBlockT2, new DataflowLinkOptions { PropagateCompletion = true });
        const int numElements = 100;
        for (int i = 1; i <= numElements; i++)
        {
            // Post instead of an unobserved SendAsync task.
            broadCastBlock.Post(i);
        }
        // Completion handling (redundant with PropagateCompletion = true, but harmless)
        broadCastBlock.Completion.ContinueWith(x =>
        {
            Console.WriteLine("Broadcast block completed");
            transformBlock1.Complete();
            transformBlock2.Complete();
            Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion).ContinueWith(_ =>
            {
                processorBlockT1.Complete();
                processorBlockT2.Complete();
            });
        });
        transformBlock1.Completion.ContinueWith(x => Console.WriteLine("Transform1 completed"));
        transformBlock2.Completion.ContinueWith(x => Console.WriteLine("Transform2 completed"));
        processorBlockT1.Completion.ContinueWith(x => Console.WriteLine("processblockT1 completed"));
        processorBlockT2.Completion.ContinueWith(x => Console.WriteLine("processblockT2 completed"));
        // Mark completion
        broadCastBlock.Complete();
        Task.WhenAll(processorBlockT1.Completion, processorBlockT2.Completion).ContinueWith(_ => Console.WriteLine("completed both tasks")).Wait();
        Console.WriteLine("Finished");
        Console.ReadLine();
    }
}
What is the best way to get guaranteed delivery with a broadcast, i.e., a true multicast?
Should I just put a buffer in front of each branch, so that the buffers always collect whatever comes in, even if the processing then takes some time to work through all of it?
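To make the buffer idea concrete, here is a minimal sketch of what I have in mind. It assumes an unbounded BufferBlock<int> in front of each branch; the names buffer1, buffer2, worker1 and worker2, the item count and the sleep time are made up for illustration. The workers are given BoundedCapacity = 1 so that they cannot queue anything themselves and have to pull from their buffer.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class BufferedMulticastSketch
{
    public static void Main()
    {
        var link = new DataflowLinkOptions { PropagateCompletion = true };
        var broadcast = new BroadcastBlock<int>(i => i);

        // Hypothetical per-branch buffers. BufferBlock is unbounded by default,
        // so it accepts every message the broadcast block offers.
        var buffer1 = new BufferBlock<int>();
        var buffer2 = new BufferBlock<int>();

        // Workers hold one item at a time and pull the rest from their buffer.
        var bounded = new ExecutionDataflowBlockOptions { BoundedCapacity = 1 };
        int fastCount = 0, slowCount = 0;
        var worker1 = new ActionBlock<int>(i => { fastCount++; }, bounded);
        var worker2 = new ActionBlock<int>(i =>
        {
            Thread.Sleep(10); // simulate the slow branch
            slowCount++;
        }, bounded);

        broadcast.LinkTo(buffer1, link);
        broadcast.LinkTo(buffer2, link);
        buffer1.LinkTo(worker1, link);
        buffer2.LinkTo(worker2, link);

        for (int i = 1; i <= 20; i++)
        {
            broadcast.Post(i);
        }
        broadcast.Complete();

        // Wait until both branches have drained their buffers.
        Task.WhenAll(worker1.Completion, worker2.Completion).Wait();
        Console.WriteLine("fast branch: {0}, slow branch: {1}", fastCount, slowCount);
    }
}
```

The obvious trade-off with this layout is memory: if a branch stays slower than the producer, its unbounded buffer keeps growing.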