
I have a chain of TPL Dataflow blocks and would like to observe progress somewhere inside the system.

I am aware that I could just jam a TransformBlock into the mesh where I want to observe, get it to post to a progress updater of some variety and then return the message unchanged to the next block. I don't love this solution as the block would be purely there for its side-effect and I would also have to change the block linking logic wherever I want to observe.

So I wondered if I could use ISourceBlock<T>.AsObservable to observe the passing of messages within the mesh without altering it and without consuming the messages. This would be both a purer and more practical solution, if it worked.

From my (limited) understanding of Rx that means that I need the observable to be hot rather than cold, so that my progress updater sees the message but doesn't consume it. And .Publish().RefCount() seems to be the way to make an observable hot. However, it simply does not work as intended - instead each message is consumed by either block2 or progress, never both.

// Set up mesh
var block1 = new TransformBlock<int, int>(i => i + 20, new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 });
var block2 = new ActionBlock<int>(i => Debug.Print("block2:" + i.ToString()), new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 }); 
var obs = block1.AsObservable().Publish().RefCount(); // Declare this here just in case it makes a difference to do it before the LinkTo call.
var l1 = block1.LinkTo(block2, new DataflowLinkOptions() { PropagateCompletion = true});

// Progress
obs.ForEachAsync(i => Debug.Print("progress:" + i.ToString()));

// Start
var vals = Enumerable.Range(1, 5);
foreach (var v in vals)
{
    block1.Post(v);
}
block1.Complete();

Result is non-deterministic but I get something mixed like this:

block2:21
progress:22
progress:24
block2:23
progress:25

So, am I doing something wrong, or is this impossible due to the way TPL Dataflow's AsObservable is implemented?

I realise I could also replace the LinkTo between block1 and block2 with an Observable/Observer pair and that might work, but LinkTo with downstream BoundedCapacity = 1 is the whole reason I'm using TPL Dataflow in the first place.
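For completeness, the Observable/Observer pair I mean would look something like the sketch below. (`AsObserver` is the real `DataflowBlock` extension method; the caveat in the comments is my assumption about why I rejected this approach.)

```csharp
// Sketch only: bridging block1 -> block2 through Rx instead of LinkTo.
// Both subscribers see every message, because AsObservable multicasts to
// all current observers. The caveat: block2's BoundedCapacity no longer
// throttles block1 the way a LinkTo would, which is why I rejected this.
var block1 = new TransformBlock<int, int>(i => i + 20);
var block2 = new ActionBlock<int>(
    i => Debug.Print("block2:" + i),
    new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

var obs = block1.AsObservable();
obs.Subscribe(block2.AsObserver());               // computation path
obs.Subscribe(i => Debug.Print("progress:" + i)); // progress path
```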

edit: A few clarifications:

  • I did intend to set BoundedCapacity=1 in block2. While it's unnecessary in this trivial example, the downstream-constrained case is where I find TPL Dataflow really useful.
  • To clarify the solution I rejected in my second paragraph, it would be to add the following block linked in between block1 and block2:

    var progressBlock = new TransformBlock<int, int>(i => { SomeUpdateProgressMethod(i); return i; });

  • I would also like to maintain back-pressure so that if a further-upstream block was distributing work to block1 and also other equivalent workers, it wouldn't send work to block1 if that chain was already busy.

theStrawMan
  • Careful using `.Publish().RefCount()` as it can create observables that can run only once. Do you really need to share observers? – Enigmativity Jun 16 '17 at 04:49
  • @Enigmativity - I don't have a great understanding of `.Publish().RefCount()`, I just thought from reading around that it might make the observable 'hot' so that both the progress updater and `block2` see the throughput. `block2` needs to receive the data for computation - the simple `block2` in the example could be a stand-in for a whole chain of interlinked dataflow blocks to execute a computation. The observable of `block1` meanwhile is for progress updates i.e. reports eventually to a UI. – theStrawMan Jun 16 '17 at 05:07
  • It's not as simple as "hot" and "cold". Just keep in mind that `.Publish().RefCount()` allows for multiple observers of the same source stream and it has nothing to do with what the source is or how it gets its data. – Enigmativity Jun 16 '17 at 05:11
  • I don't fully understand, however multiple observers is indeed what I want, i.e. `block2` and `progress`. My guess however is that `block2` isn't treated as an `Observer` because its link to `block1` is not done in the RX way, but instead however TPL Dataflow implements `LinkTo` internally (and `AsObservable`). So we don't get successful `MultiCast` because it would need to be set up like that inside `block1`. Does that sound right? – theStrawMan Jun 16 '17 at 05:20
  • Yes, that sounds better. The issue is that the observable only has one observer in your code. The Dataflow blocks are not observers of your observable. The only observer you have is created in the `ForEachAsync` method call (which you should use `.Subscribe` instead). – Enigmativity Jun 16 '17 at 05:26
  • @theStrawMan did you try to set the blocks to non-greedy instead of limiting their capacity? This may be what you want, as the block wouldn't ask for more messages if it's busy. – VMAtm Jun 16 '17 at 12:58
  • Non-greedy is the behaviour wanted, however it's only available on `GroupingDataflowBlockOptions` i.e. `JoinBlock` and `BatchBlock`. That design makes some sense to me, however if you put other types of block i.e. `TransformBlock` into your mesh before that, you lose the ability to transmit the non-greediness further back upstream. So I use `BoundedCapacity=1` (zero is not allowed). Basically TPL Dataflow puts buffers, buffers everywhere, by default, but in downstream-constrained backpressure use-case I only want a buffer in certain spots, definitely not others. – theStrawMan Jun 16 '17 at 22:31
  • @theStrawMan did you solve this in the end? – user4388177 Mar 08 '18 at 16:04
  • @user4388177 - It was a little while ago but I'm pretty sure I just used a hacky approach where I got the `LinkTo` predicate to post each message to an observable and then return true. To hide the impurity of this it might be possible to create an extension method `ObservableLinkTo` which creates the observable and calls the regular `LinkTo` at the same time. – theStrawMan Mar 09 '18 at 00:16
  • @theStrawMan thanks. I spoke to a friend and he's implementing a few new blocks that will allow this, I'll tell him to post them here. – user4388177 Mar 09 '18 at 10:24
  • @theStrawMan Any reason you never accepted the answer below? Seems like it answers your question perfectly. – Todd Menier Aug 07 '18 at 16:08
  • @ToddMenier - I didn't because the proposed solution either drops messages or loses backpressure, which were both unacceptable to me. I ended up writing something custom that did it. PS: If I was starting this today I'd look at a Reactive Streams implementation like [Akka.NET Streams](https://getakka.net/articles/streams/introduction.html) instead - it looks like a big improvement over TPL Dataflow. – theStrawMan Aug 08 '18 at 00:33
  • @theStrawMan You're right - `BroadcastBlock` won't propagate data to targets that can't accept it immediately. I missed that. This is an excellent question, I'm where you were when you asked it - `AsObservable` feels like the perfect solution for publishing a stream of progress updates, but for the details. Sounds like you've moved on but if I come up with something that works and isn't too hacky I might post an answer, in case it helps others. – Todd Menier Aug 08 '18 at 22:48
  • @ToddMenier - yeah it does seems like a reasonable approach for tracking movement through the blocks, and the continual interest in this question means I should probably get round to posting a simplified version of what I did. I'll do that. – theStrawMan Aug 09 '18 at 01:27

4 Answers


The issue with your code is that you're wiring up two consumers of block1. Dataflow then just gives each value to whichever consumer is there first.

So you need to broadcast the values from block1 into two other blocks to then be able to consume those independently.

Just a side note: don't use .Publish().RefCount(), as it doesn't do what you think. It effectively makes a one-run-only observable that, during that one run, allows multiple observers to connect and see the same values. It has nothing to do with the source of the data nor with how the Dataflow blocks interact.

Try this code:

// Set up mesh
var block1 = new TransformBlock<int, int>(i => i + 20);
var block_broadcast = new BroadcastBlock<int>(i => i, new DataflowBlockOptions());
var block_buffer = new BufferBlock<int>();
var block2 = new ActionBlock<int>(i => Debug.Print("block2:" + i.ToString()));
var obs = block_buffer.AsObservable();
var l1 = block1.LinkTo(block_broadcast);
var l2 = block_broadcast.LinkTo(block2);
var l3 = block_broadcast.LinkTo(block_buffer);

// Progress
obs.Subscribe(i => Debug.Print("progress:" + i.ToString()));

// Start
var vals = Enumerable.Range(1, 5);
foreach (var v in vals)
{
    block1.Post(v);
}
block1.Complete();

That gives me:

block2:21
block2:22
block2:23
block2:24
block2:25
progress:21
progress:22
progress:23
progress:24
progress:25

Which is what I think you wanted.

Now, just as a further aside, using Rx for this might be a better option all around. It's much more powerful and declarative than any TPL or Dataflow option.

Your code boils down to this:

Observable
    .Range(1, 5)
    .Select(i => i + 20)
    .Do(i => Debug.Print("progress:" + i.ToString()))
    .Subscribe(i => Debug.Print("block2:" + i.ToString()));

That gives you pretty much the same result.

Enigmativity
  • I think this is a reasonable approach, except that by introducing a buffer after the `BroadcastBlock` we lose all back-pressure in the system. I didn't specify that as a constraint on the question explicitly, however the intention of the `BoundedCapacity=1` in `block2` is to show that it's constrained. I might want to maintain back-pressure if e.g. this is just one of multiple processing paths, and upstream there is actually an option to send to other paths if this is full. By introducing a `BufferBlock` here, this path is now essentially Greedy and the work can't successfully be distributed. – theStrawMan Jun 16 '17 at 05:36
  • @theStrawMan - The buffer block is only there for the observable. It shouldn't affect the rest of your data flow. – Enigmativity Jun 16 '17 at 05:43
  • Ah sorry I misread that. I'll have to test your solution. I confess I don't actually know if there's a risk of `broadcast_block` dropping a message if `block2` rejects it - could the `broadcast_block`'s current message be displaced during that? I'd hope not but I'm not sure. If not then this is indeed a reasonable solution. – theStrawMan Jun 16 '17 at 05:52
  • (with `BoundedCapacity=1` added to `broadcast_block`) – theStrawMan Jun 16 '17 at 06:02
  • I just tested this and unfortunately it did drop messages. I thought `BroadcastBlock` might be implemented such that in a back-pressure situation it would pass back to its upstream blocks 'I'm full' and so they wouldn't send it another message yet, but no, that's not the case. – theStrawMan Jun 16 '17 at 06:22
  • @theStrawMan - Try using Rx. It creates back-pressure but doesn't drop values. – Enigmativity Jun 16 '17 at 07:03
  • Oh? My understanding was that Rx.NET [didn't have](https://github.com/Reactive-Extensions/Rx.NET/issues/19) backpressure, at least not to the point where the producer becomes aware of the downstream blockage and can adjust behaviour. Though that is getting a little away from my original question here. – theStrawMan Jun 16 '17 at 12:04

There are two options to consider when creating an observable dataflow block. You can either:

  1. emit a notification every time a message is processed, or
  2. emit a notification when a previously processed message, stored in the block's output buffer, is accepted by a linked block.

Both options have pros and cons. The first option provides timely but unordered notifications. The second option provides ordered but delayed notifications, and also must deal with the disposability of the block-to-block linking. What should happen with the observable, when the link between the two blocks is manually disposed before the blocks are completed?

Below is an implementation of the first option, that creates a TransformBlock together with a non-consuming IObservable of this block. There is also an implementation for an ActionBlock equivalent, based on the first implementation (although it could also be implemented independently, by copy-pasting and adapting the TransformBlock implementation, since there isn't much code involved).

public static TransformBlock<TInput, TOutput>
    CreateObservableTransformBlock<TInput, TOutput>(
    Func<TInput, Task<TOutput>> transform,
    out IObservable<(TInput Input, TOutput Output,
        int StartedIndex, int CompletedIndex)> observable,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    if (transform == null) throw new ArgumentNullException(nameof(transform));
    dataflowBlockOptions = dataflowBlockOptions ?? new ExecutionDataflowBlockOptions();

    var semaphore = new SemaphoreSlim(1);
    int startedIndexSeed = 0;
    int completedIndexSeed = 0;

    var notificationsBlock = new BufferBlock<(TInput, TOutput, int, int)>(
        new DataflowBlockOptions() { BoundedCapacity = 100 });

    var transformBlock = new TransformBlock<TInput, TOutput>(async item =>
    {
        var startedIndex = Interlocked.Increment(ref startedIndexSeed);
        var result = await transform(item).ConfigureAwait(false);
        await semaphore.WaitAsync().ConfigureAwait(false);
        try
        {
            // Send the notifications in synchronized fashion
            var completedIndex = Interlocked.Increment(ref completedIndexSeed);
            await notificationsBlock.SendAsync(
                (item, result, startedIndex, completedIndex)).ConfigureAwait(false);
        }
        finally
        {
            semaphore.Release();
        }
        return result;
    }, dataflowBlockOptions);

    _ = transformBlock.Completion.ContinueWith(t =>
    {
        if (t.IsFaulted) ((IDataflowBlock)notificationsBlock).Fault(t.Exception);
        else notificationsBlock.Complete();
    }, TaskScheduler.Default);

    observable = notificationsBlock.AsObservable();
    // A dummy subscription to prevent buffering in case of no external subscription.
    observable.Subscribe(
        DataflowBlock.NullTarget<(TInput, TOutput, int, int)>().AsObserver());
    return transformBlock;
}

// Overload with synchronous lambda
public static TransformBlock<TInput, TOutput>
    CreateObservableTransformBlock<TInput, TOutput>(
    Func<TInput, TOutput> transform,
    out IObservable<(TInput Input, TOutput Output,
        int StartedIndex, int CompletedIndex)> observable,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    return CreateObservableTransformBlock(item => Task.FromResult(transform(item)),
        out observable, dataflowBlockOptions);
}

// ActionBlock equivalent (requires the System.Reactive package)
public static ITargetBlock<TInput>
    CreateObservableActionBlock<TInput>(
    Func<TInput, Task> action,
    out IObservable<(TInput Input, int StartedIndex, int CompletedIndex)> observable,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    if (action == null) throw new ArgumentNullException(nameof(action));
    var block = CreateObservableTransformBlock<TInput, object>(
        async item => { await action(item).ConfigureAwait(false); return null; },
        out var sourceObservable, dataflowBlockOptions);
    block.LinkTo(DataflowBlock.NullTarget<object>());
    observable = sourceObservable
        .Select(entry => (entry.Input, entry.StartedIndex, entry.CompletedIndex));
    return block;
}

// ActionBlock equivalent with synchronous lambda
public static ITargetBlock<TInput>
    CreateObservableActionBlock<TInput>(
    Action<TInput> action,
    out IObservable<(TInput Input, int StartedIndex, int CompletedIndex)> observable,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    return CreateObservableActionBlock(
        item => { action(item); return Task.CompletedTask; },
        out observable, dataflowBlockOptions);
}

Usage example in Windows Forms:

private async void Button1_Click(object sender, EventArgs e)
{
    var block = CreateObservableTransformBlock((int i) => i + 20,
        out var observable,
        new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 });

    var vals = Enumerable.Range(1, 20).ToList();
    TextBox1.Clear();
    ProgressBar1.Value = 0;

    observable.ObserveOn(SynchronizationContext.Current).Subscribe(onNext: x =>
    {
        TextBox1.AppendText($"Value {x.Input} transformed to {x.Output}\r\n");
        ProgressBar1.Value = (x.CompletedIndex * 100) / vals.Count;
    }, onError: ex =>
    {
        TextBox1.AppendText($"An exception occurred: {ex.Message}\r\n");
    },
    onCompleted: () =>
    {
        TextBox1.AppendText("The job completed successfully\r\n");
    });

    block.LinkTo(DataflowBlock.NullTarget<int>());

    foreach (var i in vals) await block.SendAsync(i);
    block.Complete();
}

In the above example the type of the observable variable is:

IObservable<(int Input, int Output, int StartedIndex, int CompletedIndex)>

The two indices are 1-based.

Theodor Zoulias

Try replacing:

obs.ForEachAsync(i => Debug.Print("progress:" + i.ToString()));

with:

obs.Subscribe(i => Debug.Print("progress:" + i.ToString()));

I'd imagine the ForEachAsync method isn't hooking in properly - it's firing, but something funky is going on with the async portion.

Clint
  • Thanks for the suggestion, however I get exactly the same behaviour. – theStrawMan Jun 16 '17 at 03:54
  • Drats, seen @VMAtm's answer, perhaps you could decouple the two blocks, use your observable as the sole consumer, and then pipe the observed value into the second block? That way you can use the observed value as much as you like. – Clint Jun 16 '17 at 07:39

By specifying the BoundedCapacity for a block inside the chain, you're creating a situation where some of your messages are rejected by the target block: the buffer for the ActionBlock fills up, so further messages are declined.

By creating the observable from your block you also introduce a race condition: there are two consumers of your data getting messages simultaneously. Blocks in TPL Dataflow propagate data to the first available consumer, which leads to the non-deterministic behaviour you observed.

Now, back to your problem. You can introduce a BroadcastBlock, as it provides a copy of the data to all consumers, not only the first one. But in that case you have to remove the buffer size limitation: a broadcast block is like a TV channel - you cannot get a previous show, you only have the current one.

Side notes: you do not check the return value of the Post method - consider using await SendAsync instead - and for a better throttling effect, set the BoundedCapacity on the starting block of the chain, not on one in the middle.
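That side note could look like this in the question's terms (a sketch only, assuming the same bounded block1 declared in the question, inside an async method):

```csharp
// With BoundedCapacity set, Post returns false when the block is full and
// the message is simply dropped. Awaiting SendAsync waits for capacity
// instead, so no message is lost and back-pressure reaches the producer.
foreach (var v in Enumerable.Range(1, 5))
{
    await block1.SendAsync(v); // completes only once block1 accepts the item
}
block1.Complete();
```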

VMAtm
  • Thanks, the fact that my dataflow system is downstream-constrained was intentional - imo that's a use-case where the Dataflow library shines. I will add a note to that effect. For that reason I wouldn't remove the `BoundedCapacity=1` setting, and `BroadcastBlock` wouldn't be appropriate. – theStrawMan Jun 16 '17 at 04:47
  • I still don't understand why you need such a restriction - can you elaborate more on that? Dataflow adds only a small overhead if the buffer is limited. – VMAtm Jun 16 '17 at 12:56