I am getting items from an upstream API which is quite slow. I am trying to speed this up by using TPL Dataflow to open multiple connections and bring the results together, like this:
class Stuff
{
    int Id { get; }
}

async Task<Stuff> GetStuffById(int id) => throw new NotImplementedException();

async Task<IEnumerable<Stuff>> GetLotsOfStuff(IEnumerable<int> ids)
{
    var bagOfStuff = new ConcurrentBag<Stuff>();
    var options = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 5
    };
    var processor = new ActionBlock<int>(async id =>
    {
        bagOfStuff.Add(await GetStuffById(id));
    }, options);
    foreach (int id in ids)
    {
        processor.Post(id);
    }
    processor.Complete();
    await processor.Completion;
    return bagOfStuff.ToArray();
}
The problem is that I have to wait until I have finished querying the entire collection of Stuff before I can return it to the caller. What I would prefer is that, whenever any of the parallel queries returns an item, I return that item in a yield return fashion. That way I don't need to return an async Task<IEnumerable<Stuff>>; I can just return an IEnumerable<Stuff>, and the caller advances the iteration as soon as any item arrives.
I tried doing it like this:
IEnumerable<Stuff> GetLotsOfStuff(IEnumerable<int> ids)
{
    var options = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 5
    };
    var processor = new ActionBlock<int>(async id =>
    {
        yield return await GetStuffById(id);
    }, options);
    foreach (int id in ids)
    {
        processor.Post(id);
    }
    processor.Complete();
    processor.Completion.Wait();
    yield break;
}
But I get this compiler error:
The yield statement cannot be used inside an anonymous method or lambda expression
How can I restructure my code?
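For reference, here is a minimal sketch of one direction that might work. It is not a definitive implementation. The idea is to swap the ActionBlock for a TransformBlock<int, Stuff>, so the block buffers each result, and to keep the yield return in the outer iterator method where the compiler allows it. The StuffFetcher class name, the Stuff constructor, and the Task.Delay body of GetStuffById are stand-ins invented for illustration. Setting EnsureOrdered = false is an assumption that results may come back in any order.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class Stuff
{
    public Stuff(int id) => Id = id;   // constructor added for the sketch
    public int Id { get; }
}

public static class StuffFetcher      // hypothetical container class
{
    // Hypothetical stand-in for the slow upstream call.
    public static async Task<Stuff> GetStuffById(int id)
    {
        await Task.Delay(100);
        return new Stuff(id);
    }

    public static IEnumerable<Stuff> GetLotsOfStuff(IEnumerable<int> ids)
    {
        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 5,
            EnsureOrdered = false      // assumption: output order does not matter
        };

        // The block runs up to five lookups at once and buffers each result.
        var processor = new TransformBlock<int, Stuff>(id => GetStuffById(id), options);

        // Iterator methods are lazy, so nothing is posted until the caller
        // starts enumerating.
        foreach (int id in ids)
        {
            processor.Post(id);
        }
        processor.Complete();

        // Block the calling thread until output is available or the block is done.
        while (processor.OutputAvailableAsync().GetAwaiter().GetResult())
        {
            // Drain whatever has finished so far and hand it to the caller.
            while (processor.TryReceive(out Stuff item))
            {
                yield return item;
            }
        }

        // Rethrow any exception raised inside the block.
        processor.Completion.GetAwaiter().GetResult();
    }
}
```

A caller could then write foreach (var stuff in StuffFetcher.GetLotsOfStuff(ids)) and see each item as soon as its lookup finishes. The trade-off is that the enumeration blocks the calling thread while it waits. If the caller can be async, returning IAsyncEnumerable<Stuff> and awaiting OutputAvailableAsync would avoid that.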