I have an `IEnumerable<Task<T>>`, where `T` represents some event (the natural-language kind of event, not the C# `event` kind).
I want to process these asynchronously because the work is IO-bound. I also want to limit the amount of concurrency, because the database handling the events can't cope with more than a handful (say 6) of concurrent processing requests (they are quite heavy). What is the right strategy for doing this?
If I have

```csharp
private Task processeventasync(T someevent)
{
    // ...
}

foreach (var t in tasks)
{
    await processeventasync(await t);
}
```
then I have no concurrency: each event is only processed after the previous one has finished.
If I guard things with a semaphore, I'm actually blocking threads on a lock rather than awaiting asynchronously.
The `LimitedConcurrencyLevelTaskScheduler` from the example at https://msdn.microsoft.com/en-us/library/system.threading.tasks.taskscheduler(v=vs.110).aspx is also a thread/lock-based approach.
I've considered maintaining a queue of at most 6 tasks and building a `WhenAny` loop around it, but that feels like reinventing the square wheel:
```csharp
var running = new List<Task>();
foreach (Task<T> task in tasks)
{
    // When `task` completes, process its result; Unwrap turns the
    // Task<Task> returned by ContinueWith into a single Task.
    var inner = TaskExtensions.Unwrap(task.ContinueWith(tt => processeventasync(tt.Result)));
    running.Add(inner);
    if (running.Count >= 6)
    {
        var resulttask = await Task.WhenAny(running);
        running.Remove(resulttask);
        await resulttask;
        // not sure if this await will schedule the next iteration
        // of the loop asynchronously, or if the loop happily continues
        // and the continuation has the rest of the loop body (nothing, here)
    }
}
// Wait for the last (up to 6) tasks that are still running.
await Task.WhenAll(running);
```
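A self-contained sketch of that `WhenAny` idea, for comparison. The names `WhenAnyThrottle`, `ProcessAllAsync` and the `process` delegate are hypothetical, and a small `async` helper stands in for the `ContinueWith`/`Unwrap` combination (it avoids `.Result` and its `AggregateException` wrapping). It checks for a free slot before starting each call, so at most `maxConcurrency` calls are in flight, and awaits whatever is still running once the source is exhausted.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class WhenAnyThrottle
{
    // Hypothetical helper: awaits each source task, passes its result to
    // `process`, and keeps at most `maxConcurrency` process calls in flight.
    public static async Task ProcessAllAsync<T>(
        IEnumerable<Task<T>> source,
        Func<T, Task> process,
        int maxConcurrency)
    {
        if (maxConcurrency < 1)
            throw new ArgumentOutOfRangeException(nameof(maxConcurrency));

        var running = new List<Task>();
        foreach (Task<T> task in source)
        {
            if (running.Count >= maxConcurrency)
            {
                // All slots are busy: wait until any call finishes.
                Task finished = await Task.WhenAny(running);
                running.Remove(finished);
                await finished; // rethrows if that processing call failed
            }
            running.Add(ProcessOneAsync(task, process));
        }

        // Don't return until the remaining in-flight calls have finished.
        await Task.WhenAll(running);
    }

    private static async Task ProcessOneAsync<T>(Task<T> task, Func<T, Task> process)
    {
        T item = await task;
        await process(item);
    }
}
```

Because `await finished` rethrows, the first failed processing call ends the loop; any other calls still in flight keep running but are not awaited in that case.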
What's the right way to go here?
EDIT:
`SemaphoreSlim`'s `WaitAsync` seems very reasonable for this. I've ended up with the following strange-looking code:
```csharp
private async void Foo()
{
    IEnumerable<Task<int>> tasks = gettasks();
    var resulttasks = tasks.Select(ti => TaskExtensions.Unwrap(ti.ContinueWith(tt => processeventasync(tt.Result))));
    var semaphore = new SemaphoreSlim(initialCount: 6);
    foreach (Task task in resulttasks)
    {
        await semaphore.WaitAsync();
        semaphore.Release();
    }
}
```
Having `async void` here is rather smelly, but it's an infinite loop; it will never return (actual processing would obviously have some cancellation mechanism).
It looks really strange with just the `WaitAsync`/`Release` pair in the loop body, but it looks like that's actually right. Is this a reasonable approach, or are there hidden gotchas?
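One thing worth checking in the `Foo` loop: each `WaitAsync` is immediately followed by a `Release`, so the count goes 6 → 5 → 6 and the wait never actually suspends. `task` is also never awaited, and each continuation starts processing as soon as its source task completes, regardless of the semaphore. The more common shape acquires a slot before starting a call and releases it only when that call has finished. A minimal sketch of that shape follows. The names `SemaphoreThrottle`, `ProcessAllAsync`, `RunAndReleaseAsync` and `process` are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SemaphoreThrottle
{
    // Hypothetical helper: waits (asynchronously, without blocking a thread)
    // for a free slot before starting each call; the slot is released only
    // after that call completes, so at most `maxConcurrency` run at once.
    public static async Task ProcessAllAsync<T>(
        IEnumerable<Task<T>> source,
        Func<T, Task> process,
        int maxConcurrency)
    {
        // Not disposed here: in-flight calls may still call Release on it.
        var semaphore = new SemaphoreSlim(maxConcurrency, maxConcurrency);
        var inFlight = new List<Task>();
        foreach (Task<T> task in source)
        {
            await semaphore.WaitAsync();
            inFlight.Add(RunAndReleaseAsync(task, process, semaphore));
        }
        await Task.WhenAll(inFlight);
    }

    private static async Task RunAndReleaseAsync<T>(
        Task<T> task, Func<T, Task> process, SemaphoreSlim semaphore)
    {
        try
        {
            await process(await task);
        }
        finally
        {
            semaphore.Release(); // frees the slot even if processing threw
        }
    }
}
```

Here `inFlight` keeps every task until the end; for an endless stream like the one described above, you would probably want to drop completed tasks from it (or handle errors inside `RunAndReleaseAsync`) so it doesn't grow without bound.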