I have an existing transformation function Func<IAsyncEnumerable<TIn>, IAsyncEnumerable<TOut>> that I want to use inside a Dataflow pipeline. In order to link this transformation function, I need the input side of this function to map to ITargetBlock<TIn> and the output side to ISourceBlock<TOut>.

To handle the input side (target), I've used a BufferBlock<TIn> with the ToAsyncEnumerable() extension shown here.

However I can't seem to get the output-side (source) in a shape that I want. I currently have the following:

IPropagatorBlock<TIn, TOut> Adapt<TIn, TOut>(
    Func<IAsyncEnumerable<TIn>, IAsyncEnumerable<TOut>> transformation)
{
    var target = new BufferBlock<TIn>(new DataflowBlockOptions { BoundedCapacity = 1024 });
    var generator = transformation(target.ToAsyncEnumerable());
    var source = new BufferBlock<TOut>(new DataflowBlockOptions { BoundedCapacity = 1024 });
    var task = Task.Run(async () =>
    {
        await foreach (var item in generator)
        {
            await source.SendAsync(item);
        }
        source.Complete();
    });
    return DataflowBlock.Encapsulate(target, source);
}

My goal here is that the transformation function can execute on an enumerated dataset (millions of database rows) without loading these entries in memory. I have currently implemented that using BoundedCapacity, but performance takes a hit when doing so. I would prefer a solution without having to introduce BufferBlocks.

What I do not like is the additional BufferBlock and the spawning of a Task. Reading through Dataflow's source code, it seems like a lot of effort was put into avoiding the creation of tasks. However, that seems to be the primitive I need to iterate over the enumerable and make the entries flow through my transformation function.

Is there another obvious way to adapt my transformation function such that I can use it as a Dataflow Block (both consumer + producer)?

Bouke
  • *"I need to adapt this function to `Func, ISourceBlock>`."* -- The `Adapt` method has a return type of `IPropagatorBlock` though. How is the `Func, ISourceBlock>` type relevant? – Theodor Zoulias Sep 17 '22 at 10:38
  • The func-part isn't really related to the question, but more to the adapter method. In the end I want the adapted method to participate in the Dataflow pipeline, so `IPropagatorBlock` will do. I have clarified this in the question. – Bouke Sep 19 '22 at 06:37
  • *"This allows me to iterate over the enumerator and not having to enumerate all entries, which would cause memory issues for large sets."* -- Could you elaborate on this? About which enumerator are you talking about? As far as I can see the two `BufferBlock`s are not configured with [`BoundedCapacity`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.dataflowblockoptions.boundedcapacity), so they could both end up with millions of messages in their internal queues. – Theodor Zoulias Sep 19 '22 at 07:15
  • The example is indeed missing `BoundedCapacity`, which is needed to limit the number of items loaded. I have updated the example to show this. The `BufferBlock` is also a part that I don't like about this solution; I'm looking for a way to forgo buffering. – Bouke Sep 19 '22 at 12:34
  • I've been thinking about this, and this seems to be an XY problem. You're asking us to fix your solution to an underlying problem. However, due to the choices you made and because you're not telling us the underlying problem, we're in the same corner as you are. In its current state, there's no other option than spawning a task. Maybe you should tell us about the underlying problem, so we can think of an alternative approach. – JHBonarius Sep 19 '22 at 13:04
  • I don't think so? I want to use an `IAsyncEnumerable` as a `ISourceBlock`. The underlying problem I'm trying to solve is that I have some methods that take an `IAsyncEnumerable` (target) and produce an `IAsyncEnumerable` (source). I want to use those existing methods in my Dataflow pipeline, without having to modify these methods. So how can I use these methods in my Dataflow pipeline? – Bouke Sep 19 '22 at 13:09
  • Maybe that's an underlying problem. The Dataflow blocks work on items. But an `IAsyncEnumerable` is not an item. It's an asynchronous `IEnumerable`. It's not enumerated yet, so there are no items: there is nothing to process or buffer. It seems you are trying to mix two distinct ways of approaching parallel processing. I don't think they mix well. – JHBonarius Sep 19 '22 at 13:47
  • @Bouke what are you *actually* trying to do? What is the concrete problem? Both IAsyncEnumerable and Dataflow blocks can be used to process *items* in a pipeline. What are you trying to adapt with this code? Why not just use a TransformBlock? You can use [ReceiveAllAsync](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.dataflowblock.receiveallasync?view=net-6.0#system-threading-tasks-dataflow-dataflowblock-receiveallasync-1(system-threading-tasks-dataflow-ireceivablesourceblock((-0))-system-threading-cancellationtoken)) to read a block's output using an IAsyncEnum. – Panagiotis Kanavos Sep 20 '22 at 16:29
  • ... but your method returns just a block. What is the `transformation` method trying to do? Unless it tries to batch items by number or using a window, it shouldn't need `IAsyncEnumerable` – Panagiotis Kanavos Sep 20 '22 at 16:31
  • @Bouke another missing part - who's going to create the worker tasks that process the items inside `transformation`? And who's going to ensure the input/output order is maintained? This method is asking `transform` to reimplement a *lot* of Dataflow functionality – Panagiotis Kanavos Sep 20 '22 at 16:33
  • @PanagiotisKanavos the worker task is shown in the question. It's the `Task.Run` that pulls the `IAsyncEnumerable`, and pushes the pulled items in the `source` block. The output order is the same order that the items are pulled out from the `IAsyncEnumerable`. – Theodor Zoulias Sep 20 '22 at 18:06
  • @TheodorZoulias that's not what I asked at all. I asked what does `transformation` do? Why would a dataflow block process and produce one stream at a time? Why not use a TransformManyBlock instead? Valid reasons would be to batch by count or a window (sliding, tumbling, time based etc). There are ways to do all this with Dataflow blocks. – Panagiotis Kanavos Sep 21 '22 at 06:04
  • If the question is just to adapt an IAsyncEnumerable to a Dataflow block or vice versa, there's no reason for `transform` at all. `DataflowBlock.ReceiveAllAsync` will produce an `IAsyncEnumerable`. An `await foreach` that posts to an `ITargetBlock` will connect an IAsyncEnumerable to any block. – Panagiotis Kanavos Sep 21 '22 at 06:07
  • @PanagiotisKanavos my previous comment was a response to [the last](https://stackoverflow.com/questions/73753945?noredirect=1#comment130297661_73753945) of your 3 comments, not to all of them. Regarding what does the `transformation` do, the OP has clarified that it's not doing one specific thing. They have many `transformation`s that they could reuse, and they prefer to reuse them instead of reimplementing the same functionality from scratch for dataflow. For example `var block = Adapt(seq => seq.MyAggregate());`, where `MyAggregate` is a System.Linq.Async style operator. It's reasonable. – Theodor Zoulias Sep 21 '22 at 06:59

2 Answers

I think there's a fundamental problem with what you are trying to do. Both Dataflow and AsyncEnumerable are distinct Task-based asynchronous programming methods, which don't combine well.

Dataflow blocks work on data. But an AsyncEnumerable is not data: it represents an interface to asynchronously retrieve data, but it is not itself data. So using an AsyncEnumerable within a Dataflow block doesn't make much sense. This is what you encounter in your solution: in order to get the data you have to enumerate the AsyncEnumerable, process it, and pass it on as an AsyncEnumerable. You would always need some inner Task that handles the asynchronous enumeration for you. And in the end you're not using any of the advantages or features of the Dataflow approach, so it's redundant. It actually only adds unnecessary noise and ballast.

Better just to use the transformation directly on the AsyncEnumerable.

However, if you still really want to use Dataflow and the source is an AsyncEnumerable, you would have to make an AsyncEnumerable-to-Dataflow adapter (the opposite of the block in your link), process the data item by item in the Dataflow, and finally use the Dataflow-to-AsyncEnumerable adapter in your link.
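A minimal sketch of that round trip, under some assumptions: `FeedAsync`, `Numbers` and `AdapterSketch` are hypothetical names made up for illustration, the squaring `TransformBlock` stands in for whatever item-by-item processing you have, and the built-in `ReceiveAllAsync` is used as the Dataflow-to-AsyncEnumerable side instead of the linked extension. Note that something still has to pump the enumerable into the first block, so an asynchronous loop remains.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class AdapterSketch
{
    // Hypothetical helper (not part of TPL Dataflow): pumps an async sequence
    // into a block, then completes the block, or faults it if enumeration throws.
    public static async Task FeedAsync<T>(IAsyncEnumerable<T> items, ITargetBlock<T> target)
    {
        try
        {
            await foreach (T item in items)
            {
                // SendAsync waits asynchronously while the block is at its BoundedCapacity.
                // A false result means the block declined the item (it has completed or faulted).
                if (!await target.SendAsync(item)) break;
            }
            target.Complete();
        }
        catch (Exception ex)
        {
            target.Fault(ex);
        }
    }

    public static async Task<List<int>> RunAsync()
    {
        // Item-by-item processing inside Dataflow; bounded so the feeder cannot run far ahead.
        var square = new TransformBlock<int, int>(x => x * x,
            new ExecutionDataflowBlockOptions { BoundedCapacity = 16 });

        // AsyncEnumerable -> Dataflow.
        Task feeding = FeedAsync(Numbers(5), square);

        // Dataflow -> AsyncEnumerable, using the built-in extension.
        var results = new List<int>();
        await foreach (int y in square.ReceiveAllAsync())
        {
            results.Add(y);
        }

        await feeding;
        await square.Completion; // surfaces a fault, if any
        return results;
    }

    // Hypothetical source sequence, for illustration only.
    private static async IAsyncEnumerable<int> Numbers(int count)
    {
        for (int i = 1; i <= count; i++)
        {
            await Task.Yield();
            yield return i;
        }
    }
}
```

Awaiting `square.Completion` after the `await foreach` is deliberate: as far as I know, `ReceiveAllAsync` simply stops yielding when the block completes and does not rethrow a fault by itself.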

JHBonarius
  • How would I use this? I probably still need to wrap this inside a `Task.Run` to perform the plumbing into the next Dataflow block? – Bouke Sep 19 '22 at 13:35
  • @Bouke thought about this overnight and edited my answer. – JHBonarius Sep 20 '22 at 06:56
  • [DataflowBlock.ReceiveAllAsync](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.dataflowblock.receiveallasync?view=net-6.0#system-threading-tasks-dataflow-dataflowblock-receiveallasync-1(system-threading-tasks-dataflow-ireceivablesourceblock((-0))-system-threading-cancellationtoken)) returns an `IAsyncEnumerable<>` so the second adapter is available out of the box. This works for any `ISourceBlock`-derived block, not just buffer blocks. – Panagiotis Kanavos Sep 21 '22 at 14:01

I don't think that it's possible to implement the Adapt method using only one dataflow block. The ITargetBlock<TIn> and the ISourceBlock<TOut> facades could be the same block only if the TPL Dataflow library offered a built-in block with the exact functionality that you are trying to implement. But there is no built-in block that can be initialized with a Func<IAsyncEnumerable<TIn>, IAsyncEnumerable<TOut>> lambda. The closest block available will be the TransformManyBlock<TIn, TOut> with a Func<TIn, IAsyncEnumerable<TOut>> parameter, which is expected with the upcoming .NET 7. It could not be used for this purpose either, because its lambda receives one input item at a time rather than the whole input sequence.

My suggestion is to take care of propagating the exception that might be received by the ITargetBlock<TIn> facade, originated from the linked dataflow block upstream. For this purpose you can use the Fault method.

Also I would suggest replacing the fire-and-forget Task.Run with an async void operation that is invoked on the ThreadPool. This way, if the unthinkable happens and the code in the catch block fails, the error will be surfaced immediately as an unhandled exception that will crash the process, instead of just leaving the process in a hanging state.

IPropagatorBlock<TIn, TOut> Adapt<TIn, TOut>(
    Func<IAsyncEnumerable<TIn>, IAsyncEnumerable<TOut>> transformation)
{
    DataflowBlockOptions options = new() { BoundedCapacity = 1024 };
    BufferBlock<TIn> target = new(options);
    IAsyncEnumerable<TOut> output = transformation(target.ToAsyncEnumerable());
    BufferBlock<TOut> source = new(options);
    ThreadPool.QueueUserWorkItem(async _ =>
    {
        try
        {
            await foreach (TOut item in output)
            {
                await source.SendAsync(item);
            }
            source.Complete();
        }
        catch (Exception ex)
        {
            ((IDataflowBlock)source).Fault(ex);
        }
    });
    return DataflowBlock.Encapsulate(target, source);
}

As a side-note, the TPL Dataflow library is intended for "coarse-grained dataflow and pipelining tasks" (citation). The overhead of passing messages from block to block is not negligible, so if your workload is too granular (lightweight) it is advised to chunkify it by using the Chunk LINQ operator (or other similar means) at the entry point of the pipeline.
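A rough sketch of that chunking, assuming .NET 6 or later for `Enumerable.Chunk`; the chunk size of 1000, the summing workload and the `ChunkSketch` name are arbitrary choices for illustration. Each message carries an array of items, so the per-message overhead is paid once per chunk instead of once per item.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ChunkSketch
{
    public static async Task<long> RunAsync()
    {
        long total = 0;

        // One message per chunk of up to 1000 items, instead of one message per item.
        var sumChunk = new TransformBlock<int[], long>(chunk => chunk.Sum(x => (long)x));

        // ActionBlock processes one message at a time by default,
        // so updating the captured local is not racy here.
        var accumulate = new ActionBlock<long>(partial => total += partial);

        sumChunk.LinkTo(accumulate, new DataflowLinkOptions { PropagateCompletion = true });

        // Chunk at the entry point of the pipeline.
        foreach (int[] chunk in Enumerable.Range(1, 10_000).Chunk(1000))
        {
            await sumChunk.SendAsync(chunk);
        }

        sumChunk.Complete();
        await accumulate.Completion;
        return total; // sum of 1..10000 = 50005000
    }
}
```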

Theodor Zoulias