I have an existing transformation function `Func<IAsyncEnumerable<TIn>, IAsyncEnumerable<TOut>>` that I want to use inside a Dataflow pipeline. In order to link this transformation function, I need the input side of the function to map to `ITargetBlock<TIn>` and the output side to `ISourceBlock<TOut>`.
To handle the input side (target), I've used a `BufferBlock<TIn>` with the `ToAsyncEnumerable()` extension shown here. However, I can't seem to get the output side (source) into the shape I want. I currently have the following:
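For reference, I assume the linked extension looks roughly like this (a minimal sketch; the actual implementation may differ):

```csharp
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks.Dataflow;

public static class SourceBlockExtensions
{
    // Sketch of a ToAsyncEnumerable() extension over a source block:
    // wait until output is available (or the block completes), then drain
    // whatever can be received synchronously before waiting again.
    public static async IAsyncEnumerable<T> ToAsyncEnumerable<T>(
        this IReceivableSourceBlock<T> source,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        while (await source.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
        {
            while (source.TryReceive(out var item))
            {
                yield return item;
            }
        }
    }
}
```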
```csharp
IPropagatorBlock<TIn, TOut> Adapt<TIn, TOut>(
    Func<IAsyncEnumerable<TIn>, IAsyncEnumerable<TOut>> transformation)
{
    var target = new BufferBlock<TIn>(new DataflowBlockOptions { BoundedCapacity = 1024 });
    var generator = transformation(target.ToAsyncEnumerable());
    var source = new BufferBlock<TOut>(new DataflowBlockOptions { BoundedCapacity = 1024 });
    var task = Task.Run(async () =>
    {
        await foreach (var item in generator)
        {
            await source.SendAsync(item);
        }
        source.Complete();
    });
    return DataflowBlock.Encapsulate(target, source);
}
```
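For context, this is how I intend to use the adapter; the transformation and block names here are illustrative only:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

// A hypothetical transformation: stream ints in, strings out.
static async IAsyncEnumerable<string> Stringify(IAsyncEnumerable<int> rows)
{
    await foreach (var row in rows)
        yield return row.ToString();
}

// The adapter should behave like any other propagator block.
var block = Adapt<int, string>(Stringify);

var producer = new BufferBlock<int>();
var consumer = new ActionBlock<string>(s => Console.WriteLine(s));

producer.LinkTo(block, new DataflowLinkOptions { PropagateCompletion = true });
block.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true });
```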
My goal here is for the transformation function to execute over an enumerated dataset (millions of database rows) without loading all of these entries into memory. I have currently implemented that using `BoundedCapacity`, but performance takes a hit when doing so. I would prefer a solution that doesn't introduce `BufferBlock`s.
What I do not like is the additional `BufferBlock` and the spawning of a `Task`. Reading through Dataflow's source code, it seems a lot of effort was put into avoiding the creation of tasks. However, a task seems to be exactly the primitive I need to iterate over the enumerable and make the entries flow through my transformation function.
Is there another, more obvious way to adapt my transformation function so that I can use it as a Dataflow block (both consumer and producer)?