I'm using a TPL Dataflow block and I need to spy on the items passing through it for unit testing.
To do this, I'm calling the AsObservable() method on the ISourceBlock<T> of my TransformBlock<Tinput, T>, so that after execution I can check that each block of my pipeline has generated the expected values.
Pipeline
{
    ...
    var observer = new MyObserver<string>();
    _block = new TransformManyBlock<string, string>(MyHandler, options);
    _block.LinkTo(_nextBlock);
    _block.AsObservable().Subscribe(observer);
    _block.Post("Test");
    ...
}
MyObserver
public class MyObserver<T> : IObserver<T>
{
    public List<Exception> Errors = new List<Exception>();
    public bool IsComplete = false;
    public List<T> Values = new List<T>();

    public void OnCompleted()
    {
        IsComplete = true;
    }

    public void OnNext(T value)
    {
        Values.Add(value);
    }

    public void OnError(Exception e)
    {
        Errors.Add(e);
    }
}
So basically I subscribe my observer to the transform block, and I expect every value passing through it to be recorded in my observer's Values list.
However, while IsComplete is set to true and OnError() successfully records exceptions, the OnNext() method never gets called unless the block is the last one in the pipeline...
I can't figure out why, because the "nextblock" linked to this source block successfully receives the data, which proves that data is leaving the block.
From what I understand, AsObservable() is supposed to report every value exiting the block, not only the values that have not been consumed by other linked blocks...
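For reference, here is a minimal sketch that I would expect to reproduce the behaviour outside my real pipeline. The names Recorder, Repro and RunAsync are made up for this example. It uses a plain TransformBlock with an unbounded ActionBlock as the next block, and it assumes the System.Threading.Tasks.Dataflow package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical observer for this sketch: records every value it is given.
public sealed class Recorder<T> : IObserver<T>
{
    public readonly List<T> Values = new List<T>();

    public void OnNext(T value) { lock (Values) { Values.Add(value); } }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class Repro
{
    // Returns how many items the observer saw and how many the next block saw.
    public static async Task<(int Observed, int Forwarded)> RunAsync()
    {
        var forwarded = new List<string>();
        var recorder = new Recorder<string>();

        var block = new TransformBlock<string, string>(s => s.ToUpperInvariant());
        // Default options process one item at a time, so the list needs no lock here.
        var next = new ActionBlock<string>(s => forwarded.Add(s));

        // Same order as in my pipeline: link the next block first, then subscribe.
        block.LinkTo(next, new DataflowLinkOptions { PropagateCompletion = true });
        using (block.AsObservable().Subscribe(recorder))
        {
            block.Post("a");
            block.Post("b");
            block.Post("c");
            block.Complete();

            // Wait until the next block has processed everything it received.
            await next.Completion;

            lock (recorder.Values)
            {
                return (recorder.Values.Count, forwarded.Count);
            }
        }
    }

    public static async Task Main()
    {
        var (observed, forwarded) = await RunAsync();
        Console.WriteLine($"observer saw {observed}, next block saw {forwarded}");
    }
}
```

Comparing the two counts printed by Main shows whether the observer and the next block each receive every item, or whether the items only go to one of them.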
What am I doing wrong?