
I'm creating a pipeline in which I need to:

  • read from files and apply a transformation [TRANSFORM_1];
  • with the transformed data, do two things:
    • apply another transformation (very expensive for the CPU) [TRANSFORM_2];
    • perform an action, with which I do other things that are not important for the purpose of this question [ACTION_1].

Below is a map of how I thought of structuring it:

ReadingFiles() -> TRANSFORM_1 -> BroadcastBlock -> TRANSFORM_2 -> ...
                                               \-> ACTION_1

Requirements

Two points must be met:

  • Do not overload the available memory;
  • All messages sent must arrive at the final block.

Half Solution

To fulfill the first requirement, I simply set ExecutionDataflowBlockOptions with BoundedCapacity = n on the various blocks. However, once back pressure reaches a BroadcastBlock, there is no guarantee that all messages will be delivered.
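As a minimal sketch of that half solution (the block bodies, names, and capacities below are placeholders, not my real pipeline): every block gets a BoundedCapacity so back pressure builds up towards the producer, but the BroadcastBlock in the middle still keeps only its most recent message when a target falls behind.

using System;
using System.Threading.Tasks.Dataflow;

class HalfSolutionSketch
{
    static void Main()
    {
        // Every block gets a BoundedCapacity so the pipeline pushes back on the producer.
        var transform1 = new TransformBlock<string, string>(
            s => s.Trim(),                                    // stand-in for TRANSFORM_1
            new ExecutionDataflowBlockOptions { BoundedCapacity = 100 });

        // BroadcastBlock keeps only the most recent message: under back pressure
        // from a slow target, older messages are silently overwritten.
        var broadcast = new BroadcastBlock<string>(s => s);

        var transform2 = new TransformBlock<string, string>(
            s => s.ToUpperInvariant(),                        // stand-in for the CPU-heavy TRANSFORM_2
            new ExecutionDataflowBlockOptions { BoundedCapacity = 100 });

        var action1 = new ActionBlock<string>(
            s => Console.WriteLine(s),                        // stand-in for ACTION_1
            new ExecutionDataflowBlockOptions { BoundedCapacity = 100 });

        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        transform1.LinkTo(broadcast, linkOptions);
        broadcast.LinkTo(transform2, linkOptions);
        broadcast.LinkTo(action1, linkOptions);
    }
}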

Question

If there are no other solutions, how can I implement a class that acts as a custom broadcast block? I would write a class that works like a GuaranteedBroadcastBlock but implements the IPropagatorBlock interface.

At the moment I have only read a few examples where an ActionBlock acting as a broadcast is created through a method, but I think the best approach is a custom class rather than a method.
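For reference, the method-based approach I mean looks roughly like the sketch below (the helper name and parameters follow the examples I read, not my own code): an ActionBlock that forwards each message to a fixed collection of targets with SendAsync, which is why the targets have to be known up front.

using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class GuaranteedBroadcast
{
    // Rough shape of the CreateGuaranteedBroadcastBlock helper seen in other examples.
    public static ITargetBlock<T> CreateGuaranteedBroadcastBlock<T>(
        IEnumerable<ITargetBlock<T>> targets, int boundedCapacity)
    {
        var targetList = targets.ToList();
        return new ActionBlock<T>(
            async item =>
            {
                // Wait until every target has accepted the message before
                // taking the next one, so nothing is dropped.
                await Task.WhenAll(targetList.Select(t => t.SendAsync(item)));
            },
            new ExecutionDataflowBlockOptions { BoundedCapacity = boundedCapacity });
    }
}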

N.B. About the first part of the pipeline (reading from the file and sending to TRANSFORM_1), I already know that you need to use await TRANSFORM_1.SendAsync() to ensure that all messages reach the first block. The problem is the BroadcastBlock, which only forwards the most recent message.
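A minimal sketch of that producer side, assuming the input is a text file read line by line (the path and the target block are placeholders, not my actual code):

using System.IO;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class Producer
{
    static async Task ProduceAsync(ITargetBlock<string> transform1, string path)
    {
        foreach (var line in File.ReadLines(path))
        {
            // Awaiting SendAsync makes the producer wait while the bounded pipeline
            // is full, so nothing is dropped at this first stage.
            bool accepted = await transform1.SendAsync(line);
            if (!accepted) break;   // the block has completed or faulted
        }
        transform1.Complete();      // signal that no more messages are coming
    }
}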

NickMan
  • Are you properly awaiting the `SendAsync` tasks? You are not supposed to have more than a handful of `SendAsync` operations in-flight at the same time. – Theodor Zoulias Nov 17 '20 at 08:42
  • When all blocks have a bounded capacity it creates back pressure, likely all the way back to the first block, which should be using `SendAsync`; at that point the code waits until something can be sent – TheGeneral Nov 17 '20 at 09:19
  • Anyway, it's hard to know what's going on without a minimal example – TheGeneral Nov 17 '20 at 09:22
  • But if within the pipeline we have a BroadcastBlock, how do we guarantee that all messages are delivered? – NickMan Nov 17 '20 at 10:56
  • Broadcast will drop messages if it hits back pressure. However, first we have to work out why you need a broadcast block, and whether you really need it (which I'm skeptical of); if so, there are other options, but that depends on your exact design, which we don't know – TheGeneral Nov 17 '20 at 11:25
  • I've just modified the post – NickMan Nov 17 '20 at 14:53
  • All you have to do to meet your requirements is get rid of the `BroadcastBlock` and set a bounded capacity on each block, and you're good. What's the issue, why the broadcast block? – JSteward Nov 17 '20 at 17:13
  • Related: [BroadcastBlock with Guaranteed Delivery in TPL Dataflow](https://stackoverflow.com/questions/22127660/broadcastblock-with-guaranteed-delivery-in-tpl-dataflow). Do you just want to replace the `CreateGuaranteedBroadcastBlock` method with a nicely packaged `IPropagatorBlock` implementation, or you want some extra functionality that is not offered by the answer of the related question? – Theodor Zoulias Nov 17 '20 at 17:37
  • @Theodor The link you inserted in the comment was just one of the examples I had read, but with that method you have to pass an IEnumerable of targets up front. __Instead, what I would like is to add a target whenever I want__ – NickMan Nov 18 '20 at 08:24
  • Yeap, this is a reasonable requirement. I may post an answer to the [other question](https://stackoverflow.com/questions/22127660/broadcastblock-with-guaranteed-delivery-in-tpl-dataflow) later, because it has a more specific title. I'll leave a comment here too. – Theodor Zoulias Nov 18 '20 at 09:05
  • However, if it implemented the _IPropagatorBlock_ interface it would turn out really nicely – NickMan Nov 18 '20 at 09:11
  • @JSteward _BroadcastBlock_ is absolutely necessary. The producer is faster than the consumer. What I would like is a custom class that works like _BroadcastBlock_ but respects back pressure, i.e. a class that implements _IPropagatorBlock_ – NickMan Nov 18 '20 at 11:03
  • NickMan I posted an implementation [here](https://stackoverflow.com/questions/22127660/broadcastblock-with-guaranteed-delivery-in-tpl-dataflow/64916045#64916045). – Theodor Zoulias Nov 19 '20 at 16:44

1 Answer


I tried writing this class myself, taking cues from other examples and modifying a few things.

  • Opinions?
  • Is there anything that would not fit?
  • Rather than implementing the ITargetBlock<T> interface, could I implement the IPropagatorBlock<T, U> interface and improve how it connects to other target blocks?
  • Is this class thread-safe? If I created multiple instances of this class, what could happen?

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class GuaranteedBroadcastBlock<T> : ITargetBlock<T>
{
    private readonly List<ITargetBlock<T>> _targets = new List<ITargetBlock<T>>();
    private readonly ITargetBlock<T> block;
    private readonly Task _completion;

    public GuaranteedBroadcastBlock(DataflowBlockOptions options)
    {
        block = new ActionBlock<T>(
            async item =>
            {
                // Forward each message to every registered target and wait until
                // all of them have accepted it, so no message is ever dropped.
                var tasks = new List<Task>();
                foreach (var target in _targets)
                {
                    tasks.Add(target.SendAsync(item));
                }
                await Task.WhenAll(tasks);
            }, new ExecutionDataflowBlockOptions
               {
                   BoundedCapacity = options.BoundedCapacity,
                   CancellationToken = options.CancellationToken
               });

        // When the inner block completes, propagate completion (or the fault) to the targets.
        _completion = block.Completion.ContinueWith(task =>
        {
            foreach (var target in _targets)
            {
                if (task.Exception != null)
                    target.Fault(task.Exception);
                else
                    target.Complete();
            }
        });
    }

    public Task Completion => _completion;

    // Targets can be added one at a time, before messages start flowing.
    public void AddTarget(ITargetBlock<T> target) => _targets.Add(target);

    public void Complete() => block.Complete();

    public void Fault(Exception exception)
    {
        Console.WriteLine("Error in GuaranteedBroadcastBlock");
        block.Fault(exception);   // propagate the fault to the inner block
    }

    public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue,
        ISourceBlock<T> source, bool consumeToAccept)
    {
        return block.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
    }
}
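A rough usage sketch of the class above (the target blocks, message count, and capacities are placeholders):

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class UsageSketch
{
    static async Task Main()
    {
        var broadcast = new GuaranteedBroadcastBlock<string>(
            new DataflowBlockOptions { BoundedCapacity = 100 });

        var transform2 = new TransformBlock<string, string>(
            s => s.ToUpperInvariant(),                  // stand-in for the expensive TRANSFORM_2
            new ExecutionDataflowBlockOptions { BoundedCapacity = 100 });
        var action1 = new ActionBlock<string>(
            s => Console.WriteLine(s),                  // stand-in for ACTION_1
            new ExecutionDataflowBlockOptions { BoundedCapacity = 100 });

        // Targets are added one at a time, which is what I was after.
        broadcast.AddTarget(transform2);
        broadcast.AddTarget(action1);

        for (int i = 0; i < 1000; i++)
            await broadcast.SendAsync($"message {i}");  // back pressure applies here

        broadcast.Complete();
        await broadcast.Completion;                     // then completion is signaled to the targets
    }
}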
NickMan
  • SO is a Q&A site about specific programming questions. If you want to ask for a code review, post at codereview.stackexchange.com. Dataflow doesn't overflow memory. It already offers restricted DOP, bounded capacity and task recycling to throttle requests, produce back pressure and prevent tasks from monopolizing cores. All you have to do is specify a BoundedCapacity and pump messages into the head of the pipeline with `await headBlock.SendAsync` – Panagiotis Kanavos Nov 22 '20 at 17:46
  • So what you posted is unnecessary. Looks like the *only* thing you need is handling error messages. The easy way to do that isn't disabling error handling and allowing catastrophic errors to go undetected. *Handle* the errors instead, and have your blocks propagate a `Result` that can be either a Success or an Error object. This is called [Railway Oriented Programming](https://fsharpforfunandprofit.com/rop/). In TPL Dataflow you can use the `LinkTo` predicate to redirect error messages to logger blocks instead of propagating them – Panagiotis Kanavos Nov 22 '20 at 17:50
  • PS: You don't need a "GuaranteedBroadcast" block either. In the linked answer, svick's answer is extremely simple - just a loop that posts to target blocks. You don't need anything else. Dataflow blocks are simple when used properly. The model you should have in mind is a command-shell pipeline, not a full-program-in-a-box – Panagiotis Kanavos Nov 23 '20 at 07:56
  • The program is a WinForms app because I want graphical feedback, and because there are other things in it that are not related to the question I asked. What I wanted was simply to connect blocks whenever I wanted, not all at once. And @Theodor's reply does that great – NickMan Nov 23 '20 at 10:05
  • Except there's no need for that code at all. Svick's code works just fine. And Winforms doesn't change how Dataflow works. You can have any block you want run on the UI thread by specifying the TaskScheduler option. What you ended up with is definitely not "simply" – Panagiotis Kanavos Nov 23 '20 at 10:47
  • Just look how simple svick's answer is and how complicated the other one is. Which one do you suppose works best? Which is easier to maintain? And why would anyone need all this code just to offer a message to target blocks? – Panagiotis Kanavos Nov 23 '20 at 10:48
  • In any case, you asked for opinions. SO isn't about opinions. This code is overcomplicated. I use Dataflow for far more complicated jobs: calling financial web services in a pipeline, parsing the results, then calling other services, collecting everything and storing it in the database. I have a far greater need for "guaranteed delivery", and all that's needed to multicast is svick's simple loop. Just to be paranoid, I clone the messages. Errors are handled and propagated along the long pipeline by wrapping them in `Result` classes and routing them using `LinkTo`'s predicate argument – Panagiotis Kanavos Nov 23 '20 at 10:51
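To illustrate the `LinkTo`-predicate routing suggested in these comments, here is a minimal sketch; the `Result` wrapper and the blocks are hypothetical, not an existing API, and a real pipeline would carry richer payloads.

using System;
using System.Threading.Tasks.Dataflow;

// Hypothetical success/error wrapper for railway-oriented routing.
public record Result<T>(T Value, Exception Error)
{
    public bool IsSuccess => Error is null;
}

class RoutingSketch
{
    static void Main()
    {
        var parse = new TransformBlock<string, Result<int>>(s =>
        {
            try { return new Result<int>(int.Parse(s), null); }
            catch (Exception ex) { return new Result<int>(0, ex); }
        });

        var successBlock = new ActionBlock<Result<int>>(r => Console.WriteLine($"ok: {r.Value}"));
        var errorLogger  = new ActionBlock<Result<int>>(r => Console.WriteLine($"error: {r.Error.Message}"));

        // The LinkTo predicate routes failed results to the logger block
        // and lets successful results continue down the pipeline.
        parse.LinkTo(errorLogger, r => !r.IsSuccess);
        parse.LinkTo(successBlock, r => r.IsSuccess);
    }
}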