3

I'm trying to use a dataflowblock and I need to spy the items passing through for unit testing.

In order to do this, I'm using the AsObservable() method on ISourceBlock<T> of my TransformBlock<Tinput, T>, so I can check after execution that each block of my pipeline have generated the expected values.

Pipeline

{
   ...
   var observer = new MyObserver<string>();
   _block  = new TransformManyBlock<string, string>(MyHandler, options);
   _block.LinkTo(_nextBlock);
   _block.AsObservable().Subscribe(observer);
   _block.Post("Test");
   ...
}

MyObserver

public class MyObserver<T> : IObserver<T>
{
    public List<Exception> Errors = new List<Exception>();
    public bool IsComplete = false;
    public List<T> Values = new List<T>();

    public void OnCompleted()
    {
        IsComplete = true;
    }

    public void OnNext(T value)
    {
        Values.Add(value);
    }

    public void OnError(Exception e)
    {
        Errors.Add(e);
    }
}

So basically I subscribe my observer to the transformblock, and I expect that each value passing through get registered in my observer "values" list.

But, while the IsComplete is set to true, and the OnError() successfully register exception, the OnNext() method never get called unless it is the last block of the pipeline... I can't figure out why, because the "nextblock" linked to this sourceBlock successfully receive the data, proving that some data are exiting the block.

From what I understand, the AsObservable is supposed to report every values exiting the block and not only the values that have not been consumed by other linked blocks...

What am I doing wrong ?

colinD
  • 1,641
  • 1
  • 20
  • 22
Duagt
  • 31
  • 4
  • Related: [Using AsObservable to observe TPL Dataflow blocks without consuming messages](https://stackoverflow.com/questions/44579543/using-asobservable-to-observe-tpl-dataflow-blocks-without-consuming-messages) and also [How do I monitor progress in a TPL Dataflow mesh?](https://stackoverflow.com/questions/48554941/how-do-i-monitor-progress-in-a-tpl-dataflow-mesh) – Theodor Zoulias Aug 28 '20 at 11:12

1 Answers1

2

Your messages are being consumed by _nextBlock before you get a chance to read them.

If you comment out this line _block.LinkTo(_nextBlock); it would likely work.

AsObservable sole purpose is just to allow a block to be consumed from RX. It doesn't change the internal working of the block to broadcast messages to multiple targets. You need a special block for that BroadcastBlock

I would suggest broadcasting to another block and using that to Subscribe

BroadcastBlock’s mission in life is to enable all targets linked from the block to get a copy of every element published

var options = new DataflowLinkOptions {PropagateCompletion = true};


var broadcastBlock = new BroadcastBlock<string>(x => x);
var bufferBlock = new BufferBlock<string>();
var actionBlock = new ActionBlock<string>(s => Console.WriteLine("Action " + s));

broadcastBlock.LinkTo(bufferBlock, options);
broadcastBlock.LinkTo(actionBlock, options);

bufferBlock.AsObservable().Subscribe(s => Console.WriteLine("peek " + s));

for (var i = 0; i < 5; i++)
   await broadcastBlock.SendAsync(i.ToString());

broadcastBlock.Complete();
await actionBlock.Completion;

Output

peek 0
Action 0
Action 1
Action 2
Action 3
Action 4
peek 1
peek 2
peek 3
peek 4
TheGeneral
  • 79,002
  • 9
  • 103
  • 141
  • Yes Indeed. but from documentation i was thinking that AsObservable was acting like a dynamic broadcast block and was offering a way to spy messages without consuming them... Is it an intended behavior ? It really disturbs me to modify my pipeline in order to spy what happens in a block. – Duagt Aug 28 '20 at 10:16
  • @Duagt `AsObservable` purpose is just to allow a block to be consumed from **RX**. It doesn't change the internal working of the block to broadcast messages to multiple targets. You need a special block for that `BroadcastBlock` – TheGeneral Aug 28 '20 at 10:19