
I receive a sequence of objects (e.g., ItemGroup[]), each of which contains multiple jobs (e.g., Item[]) and a max degree of parallelism value, for example:

```csharp
public record Item(string Name);
public record ItemGroup(Item[] Items, int MaxDegreeOfParallelism);
```

The sequence of ItemGroup instances must be processed sequentially, but each ItemGroup may have a max degree of parallelism higher than 1. For example, the pipeline will process the group of A* items sequentially, then process the group of B* items concurrently:

```csharp
var groups = new[]
{
    new ItemGroup(new[] { new Item("A0"), new Item("A1"), new Item("A2") }, 1),
    new ItemGroup(new[] { new Item("B0"), new Item("B1"), new Item("B2") }, 3)
};
```

I thought a custom TransformManyBlock implementation of IPropagatorBlock<ItemGroup, Item> would be a good choice, but I am not sure how to properly wait on the TransformManyBlock that gets created dynamically inside it as the producer posts ItemGroup instances to it.

Can anyone guide me here?

Edited by Theodor Zoulias; asked by Ritmo2k.
  • If you don't have to preserve the order of groups, a simple solution would be to create multiple transform blocks, each with a different DOP, and use `LinkTo`'s predicate to route each message to the correct block. In the simplest case you can create a fixed number of such blocks in advance. In a more complex scenario you could redirect any messages that don't match the predicates to a block that checks the message's DOP, creates a new TransformBlock with the new DOP, links it to the others, and then posts the pending message to the new block. – Panagiotis Kanavos Dec 12 '22 at 16:13
  • @PanagiotisKanavos if you use this one-to-many-to-one pattern, and there is `BoundedCapacity` involved, you should be careful not to trigger [this](https://github.com/dotnet/runtime/issues/31513 "Data loss during parallelization of BufferBlock") bug. The way to avoid it is to configure all the transform blocks with `EnsureOrdered = false`. – Theodor Zoulias Dec 13 '22 at 12:41
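A minimal sketch of the predicate-routing approach from the first comment, written as a top-level C# program. It assumes the set of DOP values is known in advance and uses hypothetical `(Name, Dop)` tuples instead of the question's records. Each worker sets `EnsureOrdered = false`, as the second comment recommends for this one-to-many-to-one shape. Because the collector has several sources, its completion is driven by `Task.WhenAll` over the workers rather than by `PropagateCompletion`. A message whose DOP matches no worker would stay stuck in the `BufferBlock`, so a fuller version would need the fallback block the comment describes. Output order, including the order of groups, is not preserved.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Assumption: every DOP that can appear in a message is known up front.
int[] knownDops = { 1, 3 };
var results = new ConcurrentQueue<string>();

var router = new BufferBlock<(string Name, int Dop)>();
var collector = new ActionBlock<string>(name => results.Enqueue(name));

// ToArray() forces the workers (and their links) to be created before posting.
var workers = knownDops.Select(dop =>
{
    var worker = new TransformBlock<(string Name, int Dop), string>(
        msg => { Thread.Sleep(10); return msg.Name; }, // simulated work
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = dop,
            EnsureOrdered = false
        });
    // Only messages whose DOP matches this worker are routed to it.
    router.LinkTo(worker, new DataflowLinkOptions { PropagateCompletion = true },
        msg => msg.Dop == dop);
    // No PropagateCompletion here: the collector has several sources.
    worker.LinkTo(collector);
    return worker;
}).ToArray();

// Complete (or fault) the collector only after every worker has finished.
_ = Task.WhenAll(workers.Select(w => w.Completion)).ContinueWith(t =>
{
    if (t.IsFaulted) ((IDataflowBlock)collector).Fault(t.Exception!);
    else collector.Complete();
}, TaskScheduler.Default);

foreach (var name in new[] { "A0", "A1", "A2" }) router.Post((name, 1));
foreach (var name in new[] { "B0", "B1", "B2" }) router.Post((name, 3));
router.Complete();
await collector.Completion;

// Items from both groups may interleave; the order is not guaranteed.
Console.WriteLine(string.Join(", ", results));
```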

1 Answer

You could create an inner TransformBlock<Item, Item> for each ItemGroup received. Below is a generalized solution with TInput, TChild and TOutput generic parameters. TInput corresponds to ItemGroup, TChild corresponds to Item, and TOutput is also Item, since you propagate the items without transforming them:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

// Declare this method inside a static class.
public static IPropagatorBlock<TInput, TOutput> CreateTransformManyDynamicBlock
    <TInput, TChild, TOutput>(
    Func<TInput, IEnumerable<TChild>> childrenSelector,
    Func<TInput, int> degreeOfParallelismSelector,
    Func<TChild, TOutput> transformChild)
{
    ArgumentNullException.ThrowIfNull(childrenSelector);
    ArgumentNullException.ThrowIfNull(degreeOfParallelismSelector);
    ArgumentNullException.ThrowIfNull(transformChild);

    // The outer block keeps the default MaxDegreeOfParallelism of 1,
    // so the inputs (groups) are processed one at a time, in order
    return new TransformManyBlock<TInput, TOutput>(async input =>
    {
        // A new inner block for each input, limited to that input's degree of parallelism
        TransformBlock<TChild, TOutput> innerBlock = new(transformChild, new()
        {
            MaxDegreeOfParallelism = degreeOfParallelismSelector(input)
        });
        foreach (var child in childrenSelector(input))
        {
            bool accepted = innerBlock.Post(child);
            if (!accepted) break; // The innerBlock has failed
        }
        innerBlock.Complete();

        // Propagate the results
        List<TOutput> results = new();
        while (await innerBlock.OutputAvailableAsync().ConfigureAwait(false))
            while (innerBlock.TryReceive(out TOutput result))
                results.Add(result);
        // Await the inner block's completion, so that its error (if any) is propagated
        try { await innerBlock.Completion.ConfigureAwait(false); }
        catch when (innerBlock.Completion.IsCanceled) { throw; }
        catch { innerBlock.Completion.Wait(); } // Propagate AggregateException
        return results;
    });
}
```

Usage example:

```csharp
IPropagatorBlock<ItemGroup, Item> block =
    CreateTransformManyDynamicBlock<ItemGroup, Item, Item>(
        x => x.Items, x => x.MaxDegreeOfParallelism, x => x);
```

Note: The above code has not been tested.
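An end-to-end sketch, assuming a top-level C# program. It copies the factory above as a local function (argument checks omitted) and replaces the question's records with hypothetical `(string[] Items, int Dop)` tuples and plain strings, so the snippet stays a single self-contained file. The consumer is linked with `PropagateCompletion = true`, so calling `Complete()` on the dynamic block eventually completes the consumer. The simulated work records the peak concurrency per group, which should be 1 for group A and at most 3 for group B.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Local copy of the factory from the answer (argument checks omitted).
static IPropagatorBlock<TInput, TOutput> CreateTransformManyDynamicBlock<TInput, TChild, TOutput>(
    Func<TInput, IEnumerable<TChild>> childrenSelector,
    Func<TInput, int> degreeOfParallelismSelector,
    Func<TChild, TOutput> transformChild)
{
    return new TransformManyBlock<TInput, TOutput>(async input =>
    {
        TransformBlock<TChild, TOutput> innerBlock = new(transformChild, new()
        {
            MaxDegreeOfParallelism = degreeOfParallelismSelector(input)
        });
        foreach (var child in childrenSelector(input))
            if (!innerBlock.Post(child)) break; // the inner block has failed
        innerBlock.Complete();

        List<TOutput> results = new();
        while (await innerBlock.OutputAvailableAsync().ConfigureAwait(false))
            while (innerBlock.TryReceive(out TOutput result))
                results.Add(result);
        try { await innerBlock.Completion.ConfigureAwait(false); }
        catch when (innerBlock.Completion.IsCanceled) { throw; }
        catch { innerBlock.Completion.Wait(); } // rethrows as AggregateException
        return results;
    });
}

int running = 0;
var peak = new ConcurrentDictionary<char, int>();

// Simulated work: records how many items run at once, keyed by group letter.
string Work(string name)
{
    int now = Interlocked.Increment(ref running);
    peak.AddOrUpdate(name[0], now, (_, old) => Math.Max(old, now));
    Thread.Sleep(50);
    Interlocked.Decrement(ref running);
    return name;
}

var block = CreateTransformManyDynamicBlock<(string[] Items, int Dop), string, string>(
    g => g.Items, g => g.Dop, Work);

var output = new List<string>();
var sink = new ActionBlock<string>(item => output.Add(item)); // DOP 1, so List is safe
block.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

block.Post((new[] { "A0", "A1", "A2" }, 1));
block.Post((new[] { "B0", "B1", "B2" }, 3));
block.Complete();
await sink.Completion;

Console.WriteLine(string.Join(", ", output)); // A0, A1, A2, B0, B1, B2
Console.WriteLine($"Peak concurrency: A={peak['A']}, B={peak['B']}");
```

Both the outer and the inner blocks keep the default `EnsureOrdered = true`, which is why the output order matches the input order even though group B runs concurrently.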


Update: My original implementation (revision 1) was based on the .NET 6 API ReceiveAllAsync and on the TransformManyBlock constructor that takes a Func<TInput,IAsyncEnumerable<TOutput>> argument (.NET 7). The problem was that ReceiveAllAsync doesn't propagate the exception of the enumerated dataflow block, so I switched to collecting the results manually into a List<TOutput> and propagating them, as shown in this answer.
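A small sketch of the behavior described in the update, assuming a runtime where `ReceiveAllAsync` is available. The `await foreach` loop over a faulted block just ends, and the error only surfaces when `Completion` is awaited, which is why the answer awaits `innerBlock.Completion` after draining the results. How many items are received before the loop ends is not asserted here.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

// A block whose transform throws for one input; the exception faults the whole block.
var block = new TransformBlock<int, int>(
    x => x == 2 ? throw new InvalidOperationException("Item 2 failed") : x * 10);
block.Post(1);
block.Post(2);
block.Post(3);
block.Complete();

// Draining with ReceiveAllAsync: the loop ends without throwing when the block faults.
var received = new List<int>();
await foreach (int item in block.ReceiveAllAsync())
    received.Add(item);
Console.WriteLine($"Loop ended normally after {received.Count} item(s)");

// The error is only observed by awaiting Completion explicitly.
try
{
    await block.Completion;
    Console.WriteLine("Block completed successfully");
}
catch (InvalidOperationException ex)
{
    Console.WriteLine($"Block faulted: {ex.Message}");
}
```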

Answered by Theodor Zoulias
  • Thank you so much for the incredible detail; it was very enlightening. This was far simpler than what I thought was required. If I link the transform block created above to a buffer block with propagation, post a bunch of groups to it, and then call Complete() on the transform block, the buffer is never signaled. Do you have any idea why? – Ritmo2k Dec 11 '22 at 18:06
  • Ignore the completion issue; it was my mistake. Thank you again! – Ritmo2k Dec 11 '22 at 18:12
  • @Ritmo2k most likely you linked the blocks without the `PropagateCompletion` option. :-) – Theodor Zoulias Dec 11 '22 at 18:36
  • Btw I have opened [an issue](https://github.com/dotnet/runtime/issues/79535 "ReceiveAllAsync, ReadAllAsync and propagation of errors") on GitHub about the behavior of the `ReceiveAllAsync` API. – Theodor Zoulias Dec 13 '22 at 12:45