
I am getting items from an upstream API that is quite slow. I am trying to speed this up by using TPL Dataflow to open multiple concurrent requests and combine the results, like this:

class Stuff
{
    int Id { get; }
}

async Task<Stuff> GetStuffById(int id) => throw new NotImplementedException();

async Task<IEnumerable<Stuff>> GetLotsOfStuff(IEnumerable<int> ids)
{
    var bagOfStuff = new ConcurrentBag<Stuff>();

    var options = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 5
    };

    var processor = new ActionBlock<int>(async id =>
    {
        bagOfStuff.Add(await GetStuffById(id));
    }, options);

    foreach (int id in ids)
    {
        processor.Post(id);
    }

    processor.Complete();
    await processor.Completion;

    return bagOfStuff.ToArray();
}

The problem is that I have to wait until I have finished querying the entire collection of Stuff before I can return it to the caller. What I would prefer is that, whenever any of the parallel queries returns an item, I return that item in a yield return fashion. That way I don't need to return an async Task<IEnumerable<Stuff>>; I can just return an IEnumerable<Stuff>, and the caller can advance the iteration as soon as any item arrives.

I tried doing it like this:

IEnumerable<Stuff> GetLotsOfStuff(IEnumerable<int> ids)
{
    var options = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 5
    };

    var processor = new ActionBlock<int>(async id =>
    {
        yield return await GetStuffById(id);
    }, options);

    foreach (int id in ids)
    {
        processor.Post(id);
    }

    processor.Complete();
    processor.Completion.Wait();

    yield break;
}

But I get this compiler error:

The yield statement cannot be used inside an anonymous method or lambda expression

How can I restructure my code?

Steztric

2 Answers


You can return an IEnumerable, but to do so you must block the current thread. You need a TransformBlock to process the ids, and a feeder-task that asynchronously feeds the TransformBlock with ids. Finally, the current thread enters a blocking loop, waiting for produced stuff to yield:

static IEnumerable<Stuff> GetLotsOfStuff(IEnumerable<int> ids)
{
    using var completionCTS = new CancellationTokenSource();

    var processor = new TransformBlock<int, Stuff>(async id =>
    {
        return await GetStuffById(id);
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 5,
        BoundedCapacity = 50, // Avoid buffering millions of ids
        CancellationToken = completionCTS.Token
    });

    var feederTask = Task.Run(async () =>
    {
        try
        {
            foreach (int id in ids)
                if (!await processor.SendAsync(id)) break;
        }
        finally { processor.Complete(); }
    });

    try
    {
        while (processor.OutputAvailableAsync().Result)
            while (processor.TryReceive(out var stuff))
                yield return stuff;
    }
    finally // This runs when the caller exits the foreach loop
    {
        completionCTS.Cancel(); // Cancel the TransformBlock if it's still running
    }

    Task.WaitAll(feederTask, processor.Completion); // Propagate all exceptions
}

No ConcurrentBag is needed, since the TransformBlock has an internal output buffer. The tricky part is dealing with the case where the caller abandons the enumeration of the IEnumerable<Stuff> early, either by breaking out of the loop or because an exception is thrown inside it. In this case you don't want the feeder-task to keep enumerating the IEnumerable<int> of ids until the end. Fortunately there is a solution: enclosing the yielding loop in a try/finally block means the finally block runs when the enumerator is disposed. This notifies the method of the early exit, so the TransformBlock can be canceled, SendAsync starts returning false, and the feeder-task terminates in a timely manner.
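Whether the finally block really runs on an early exit can be checked without any Dataflow code. In this minimal sketch (plain C# 9+ top-level statements; Numbers and log are illustrative names, not part of the answer), breaking out of the foreach disposes the enumerator, which runs the pending finally block, while the statement after the try/finally (the counterpart of the Task.WaitAll line above) is skipped:

```csharp
using System;
using System.Collections.Generic;

var log = new List<string>();

// An iterator shaped like GetLotsOfStuff: a yielding loop inside try/finally,
// followed by code that only runs when the enumeration completes normally.
IEnumerable<int> Numbers()
{
    try
    {
        for (int i = 0; i < 10; i++)
            yield return i;
    }
    finally
    {
        log.Add("finally"); // Runs on normal completion AND when the enumerator is disposed early
    }
    log.Add("after"); // Runs only if the loop ran to the end
}

foreach (int n in Numbers())
{
    log.Add($"got {n}");
    if (n == 1) break; // foreach calls Dispose() on the enumerator here
}

Console.WriteLine(string.Join(", ", log)); // Prints: got 0, got 1, finally
```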

An alternative implementation could avoid the feeder-task by enumerating the ids, feeding the block, and yielding stuff in a single loop. In this case you want the yielding to lag a few items behind the enumeration, so that several requests are in flight at once. MoreLinq's Lag (or Lead) extension method could be handy for this.
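To illustrate the lag idea on its own, here is a sketch that swaps in a different technique: a plain Queue<Task<TResult>> instead of a Dataflow block or MoreLinq. Each source item starts a task; once lag tasks are in flight, the oldest one is waited for (a blocking call) and yielded before the next task is started. LaggedTransform and FakeGetById are hypothetical names, results come back in source order, and tasks still in flight are not canceled if the caller stops enumerating early:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical helper: keeps at most `lag` tasks in flight and yields results
// in source order, lagging `lag` items behind the enumeration of the source.
IEnumerable<TResult> LaggedTransform<TSource, TResult>(
    IEnumerable<TSource> source, Func<TSource, Task<TResult>> taskFactory, int lag)
{
    if (lag < 1) throw new ArgumentOutOfRangeException(nameof(lag)); // Checked on first MoveNext
    var inFlight = new Queue<Task<TResult>>();
    foreach (TSource item in source)
    {
        // Window is full: wait for the oldest task and yield its result first
        if (inFlight.Count == lag)
            yield return inFlight.Dequeue().GetAwaiter().GetResult(); // Blocking call
        inFlight.Enqueue(taskFactory(item));
    }
    // Source exhausted: drain the remaining tasks in order
    while (inFlight.Count > 0)
        yield return inFlight.Dequeue().GetAwaiter().GetResult();
}

// Simulated slow upstream call (hypothetical) that doubles the id
async Task<int> FakeGetById(int id)
{
    await Task.Delay(50);
    return id * 2;
}

var results = LaggedTransform(Enumerable.Range(1, 8), FakeGetById, 3).ToList();
Console.WriteLine(string.Join(", ", results)); // Prints: 2, 4, 6, 8, 10, 12, 14, 16
```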


Update: Here is a different implementation that enumerates and yields in the same loop. To achieve the desired lag, the source enumerable is right-padded with dummy elements, equal in number to the degree of concurrency.

This implementation accepts generic types, instead of int and Stuff.

public static IEnumerable<TResult> Transform<TSource, TResult>(
    IEnumerable<TSource> source, Func<TSource, Task<TResult>> taskFactory,
    int degreeOfConcurrency)
{
    var processor = new TransformBlock<TSource, TResult>(async item =>
    {
        return await taskFactory(item);
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = degreeOfConcurrency
    });

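    // Pair each item with a "has value" flag, then append degreeOfConcurrency
    // dummy entries so the loop keeps receiving after the source is exhausted
    var paddedSource = source.Select(item => (item, true))
        .Concat(Enumerable.Repeat((default(TSource), false), degreeOfConcurrency));
    int index = -1;
    bool completed = false;
    foreach (var (item, hasValue) in paddedSource)
    {
        index++;
        if (hasValue) { processor.Post(item); }
        else if (!completed) { processor.Complete(); completed = true; }
        // Start receiving only after degreeOfConcurrency items have been posted,
        // so that yielding lags behind posting by that many items
        if (index >= degreeOfConcurrency)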
        {
            if (!processor.OutputAvailableAsync().Result) break; // Blocking call
            if (!processor.TryReceive(out var result))
                throw new InvalidOperationException(); // Should never happen
            yield return result;
        }
    }
    processor.Completion.Wait();
}

Usage example:

IEnumerable<Stuff> lotsOfStuff = Transform(ids, GetStuffById, 5);

Both implementations can be modified trivially to return an IAsyncEnumerable instead of IEnumerable, to avoid blocking the calling thread.
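As a sketch of that modification for the first (feeder-task) implementation, generalized to the same signature as Transform: the blocking OutputAvailableAsync().Result becomes an await, and Task.WaitAll becomes await Task.WhenAll. It assumes C# 9 or later and a reference to the System.Threading.Tasks.Dataflow NuGet package; TransformAsync and FakeGetById are hypothetical names, and the BoundedCapacity of 50 is carried over from the code above rather than tuned:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet package: System.Threading.Tasks.Dataflow

// Async counterpart of the feeder-task implementation (hypothetical name)
async IAsyncEnumerable<TResult> TransformAsync<TSource, TResult>(
    IEnumerable<TSource> source, Func<TSource, Task<TResult>> taskFactory,
    int degreeOfConcurrency)
{
    using var completionCTS = new CancellationTokenSource();

    var processor = new TransformBlock<TSource, TResult>(taskFactory,
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = degreeOfConcurrency,
            BoundedCapacity = 50, // Avoid buffering the whole source
            CancellationToken = completionCTS.Token
        });

    var feederTask = Task.Run(async () =>
    {
        try
        {
            foreach (TSource item in source)
                if (!await processor.SendAsync(item)) break; // false after cancellation
        }
        finally { processor.Complete(); }
    });

    try
    {
        while (await processor.OutputAvailableAsync()) // Non-blocking wait
            while (processor.TryReceive(out TResult result))
                yield return result;
    }
    finally // Runs when the caller exits the await foreach loop early
    {
        completionCTS.Cancel();
    }

    await Task.WhenAll(feederTask, processor.Completion); // Propagate exceptions
}

// Simulated slow upstream call (hypothetical)
async Task<int> FakeGetById(int id)
{
    await Task.Delay(50);
    return id * 2;
}

var results = new List<int>();
await foreach (int r in TransformAsync(Enumerable.Range(1, 8), FakeGetById, 3))
    results.Add(r);
Console.WriteLine(string.Join(", ", results)); // Ordered, since EnsureOrdered defaults to true
```

The caller consumes it with await foreach, so no thread is blocked while waiting for the next item. Breaking out of the loop disposes the async enumerator, which runs the finally block and cancels the block, just as in the synchronous version.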

Theodor Zoulias

There are probably a few different ways you can handle this, depending on your specific use case. But to handle items as they come through in TPL Dataflow terms, you'd change your source block to a TransformBlock<,> and flow the items to another block that processes them. Note that you can now get rid of the ConcurrentBag, and be sure to set EnsureOrdered to false if you don't care about the order in which you receive your items. Also link the blocks and propagate completion, to ensure your pipeline finishes once all items are retrieved and subsequently processed.

class Stuff
{
    int Id { get; }
}

public class GetStuff
{
    async Task<Stuff> GetStuffById(int id) => throw new NotImplementedException();

    async Task GetLotsOfStuff(IEnumerable<int> ids)
    {
        //var bagOfStuff = new ConcurrentBag<Stuff>();

        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 5,
            EnsureOrdered = false
        };

        var processor = new TransformBlock<int, Stuff>(id => GetStuffById(id), options);

        var handler = new ActionBlock<Stuff>(s => throw new NotImplementedException());

        processor.LinkTo(handler, new DataflowLinkOptions() { PropagateCompletion = true });

        foreach (int id in ids)
        {
            processor.Post(id);
        }

        processor.Complete();
        await handler.Completion;
    }
}

Other options include making your method return an observable that streams items out of the TransformBlock, or returning an IAsyncEnumerable so that you can yield return from an async method.

JSteward
  • Right! So I could effectively add the delegate for the handler as an argument to `GetLotsOfStuff`. Since the caller knows what it wants to do with each item, it can just pass in the delegate. – Steztric Oct 25 '19 at 16:00
  • Yeah that could work, if the caller knows what to do with each item – JSteward Oct 25 '19 at 16:07
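The idea in the comments, passing the per-item handler in as a delegate, could be sketched as follows. This is a hypothetical generic variant of the GetLotsOfStuff method above (the getById and handler parameters are assumptions, not part of the answer), and it assumes C# 9 or later and the System.Threading.Tasks.Dataflow NuGet package:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet package: System.Threading.Tasks.Dataflow

// Hypothetical variant: the caller supplies both the fetch and the per-item handler
async Task GetLotsOfStuff<T>(IEnumerable<int> ids, Func<int, Task<T>> getById, Action<T> handler)
{
    var processor = new TransformBlock<int, T>(getById, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 5,
        EnsureOrdered = false // Pass items on as soon as each one arrives
    });
    // The ActionBlock invokes the handler one item at a time (default MaxDegreeOfParallelism = 1)
    var handlerBlock = new ActionBlock<T>(handler);

    processor.LinkTo(handlerBlock, new DataflowLinkOptions { PropagateCompletion = true });

    foreach (int id in ids)
        processor.Post(id);

    processor.Complete();
    await handlerBlock.Completion; // Completes after every item has been handled
}

// Simulated upstream call with varying latency (hypothetical)
async Task<int> FakeGetById(int id)
{
    await Task.Delay(10 * (id % 3));
    return id;
}

var received = new List<int>(); // Safe without locking: the handler runs sequentially
await GetLotsOfStuff(Enumerable.Range(1, 10), FakeGetById, item => received.Add(item));
Console.WriteLine(string.Join(", ", received.OrderBy(x => x)));
```

Because EnsureOrdered is false, items reach the handler in completion order rather than id order, which is why the usage code sorts them before printing.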