
I have an IEnumerable<Task<T>> where T represents some event (an event in the natural-language sense, not a .NET event).

I want to process these asynchronously because they are IO-bound, and I need to limit the concurrency, because the database handling the events can't handle more than a handful (say 6) of these processing requests at once (they are quite heavy). What is the right strategy for doing this?

If I have

private Task processeventasync(T someevent) {
  ...
}

foreach (Task<T> t in tasks) {
  await processeventasync(await t);
}

I have no concurrency.

If I guard things with a semaphore, I'm actually guarding threads and protecting them with locks rather than awaiting them asynchronously.
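
To make that concrete, this is roughly the blocking pattern I mean (a sketch of what I'm trying to avoid, not something I intend to use):

var semaphore = new SemaphoreSlim(initialCount: 6);
var workers = new List<Task>();

foreach (Task<T> t in tasks) {
  workers.Add(Task.Run(() => {
    // Thread-based throttling: this blocks a thread-pool thread in Wait()
    // (and again in .Result / .Wait()) instead of awaiting asynchronously.
    semaphore.Wait();
    try {
      processeventasync(t.Result).Wait();
    } finally {
      semaphore.Release();
    }
  }));
}

It is throttled to 6 concurrent requests, but only by parking threads.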

The LimitedConcurrencyLevelTaskScheduler from the example on https://msdn.microsoft.com/en-us/library/system.threading.tasks.taskscheduler(v=vs.110).aspx is also a thread/lock-based approach.

I've considered maintaining a queue of at most 6 tasks and making a WhenAny loop around it, but that feels like reinventing the square wheel.

private List<Task> running = new List<Task>();

foreach (Task<T> task in tasks) {
  var inner = TaskExtensions.Unwrap(task.ContinueWith(tt => processeventasync(tt.Result)));
  running.Add(inner);
  if (running.Count >= 6) {
    var resulttask = await Task.WhenAny(running);
    running.Remove(resulttask);
    await resulttask;
    //not sure if this await will schedule the next iteration
    //of the loop asynchronously, or if the loop happily continues
    //and the continuation has the rest of the loop body
  }
}

What's the right way to go here?

EDIT:

SemaphoreSlim's WaitAsync seems very reasonable for this. I end up with the following strange-looking code:

    private async void Foo()
    {

        IEnumerable<Task<int>> tasks = gettasks();
        var resulttasks = tasks.Select(ti => TaskExtensions.Unwrap(ti.ContinueWith(tt => processeventasync(tt.Result))));
        var semaphore = new SemaphoreSlim(initialCount: 6);

        foreach (Task task in resulttasks)
        {
            await semaphore.WaitAsync();
            semaphore.Release();
        }
    }

Having async void here is rather smelly, but it's an infinite loop; it will never return (actual processing would obviously have some cancellation mechanism).

It looks really strange with just the await/release in the body, but it seems that's actually right. Is this a reasonable approach, or are there hidden gotchas?

Martijn
  • The producer will produce an infinite number of `Task`s when iterated, backed by `TaskCompletionSource`s. It is throttled by the pace of enumeration and nothing else. It's fundamentally non-blocking. – Martijn Jun 16 '15 at 09:45
  • http://stackoverflow.com/questions/10806951/how-to-limit-the-amount-of-concurrent-async-i-o-operations http://stackoverflow.com/questions/20355931/limiting-the-amount-of-concurrent-tasks-in-net-4-5 http://stackoverflow.com/questions/23105748/limit-number-of-threads-in-task-parallel-library – Mauricio Scheffer Jun 16 '15 at 09:47
  • @oleksii In my experience, mixing blocking and non-blocking code leads to problems down the road, and it complicates things by requiring a dedicated thread that I can freely block without adverse effects on the rest of the system. – Martijn Jun 16 '15 at 09:49
  • @oleksii To clarify, I do want to throttle my consumer, but I want to throttle it by `await`ing, not by blocking – Martijn Jun 16 '15 at 10:04
  • @oleksii yes, it looks like `SemaphoreSlim`s `WaitAsync` operations fit the bill nicely. – Martijn Jun 16 '15 at 10:13
  • `I have an IEnumerable<Task<T>> where T represents some event` - are you sure `IEnumerable` is the correct abstraction? Sounds like `IObservable` may be better. – Stephen Cleary Jun 16 '15 at 12:16
  • Possibly. Thanks for pointing me in that direction @StephenCleary – Martijn Jun 16 '15 at 12:18
  • @Martijn: And Reactive Extensions (`IObservable`) has built-in support for throttling. – Stephen Cleary Jun 16 '15 at 12:29
  • I can see some of its appeal, but I'd rather have my difficulties in throttling here than the complexities in composition in that interface. Thanks for pointing it out though. – Martijn Jun 16 '15 at 16:37

2 Answers


You can limit concurrency using SemaphoreSlim.WaitAsync.

It looks really strange with just the await/release in the body, but it looks like that's actually right

Your current approach doesn't really do anything: the tasks aren't affected by the SemaphoreSlim at all, since you invoke them all concurrently via Enumerable.Select.

You'll need to monitor the semaphore inside the Select:

private const int ConcurrencyLimit = 6;
SemaphoreSlim semaphoreSlim = new SemaphoreSlim(ConcurrencyLimit);

public async Task FooAsync()
{
    var tasks = GetTasks();
    var sentTasks = tasks.Select(async task =>
    {
        await semaphoreSlim.WaitAsync();
        try
        {
            await ProcessEventAsync(await task);
        }
        finally
        {
            semaphoreSlim.Release();
        }
    });

    await Task.WhenAll(sentTasks);
}

private Task ProcessEventAsync(T someEvent) 
{
    // Process event.
}
Yuval Itzchakov
  • Thank you. Is the `await Task.WhenAll(sentTasks)` safe here? It seems it would re-evaluate the IEnumerable, which would generate its tasks again. The IEnumerable is also unbounded, so wouldn't this consume unbounded memory as well? And won't the await inside the try effectively throttle the inner task production to one task at a time? – Martijn Jun 16 '15 at 10:55
  • @Martijn The iterator will only be iterated once inside `WhenAll`. By unbounded, you mean that you have an infinite set of tasks returning from `GetTasks`? Regarding throttling, No. The first 6 tasks will enter the semaphore, and the inner task will be awaited. This means that you'll have 6 tasks concurrently waiting for the completion of their operation. – Yuval Itzchakov Jun 16 '15 at 11:01
  • Yes, given infinite time, GetTasks will lazily produce an infinite number of Tasks. Since the process probably won't be running for infinite time, I went for the word unbounded rather than infinite. – Martijn Jun 16 '15 at 11:07
  • @Martijn Well, in case you have an unbounded number of tasks, `WhenAll` will asynchronously wait until all of them have completed. If it is unbounded, that may take a while. Regarding memory, you should let the GC worry about that. If the tasks are lazily produced, this shouldn't be a problem. – Yuval Itzchakov Jun 16 '15 at 11:09
  • That's absolutely perfect. Let me verify it before I accept it as the answer though. – Martijn Jun 16 '15 at 11:20
  • It seems that WhenAll does indeed force total unthrottled enumeration of sentTasks. I still have to find out why though. – Martijn Jun 16 '15 at 12:22
  • @Martijn You say it doesn't throttle the requests at all? – Yuval Itzchakov Jun 16 '15 at 12:25
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/80667/discussion-between-martijn-and-yuval-itzchakov). – Martijn Jun 16 '15 at 12:26
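
Picking up the enumeration concern from the thread above: one option is to acquire the semaphore before pulling the next item from the enumerable, so the loop itself is paced and there is never an unbounded set of in-flight tasks to track. This is only a sketch (not taken from the answer or the linked chat), reusing the assumed names GetTasks and ProcessEventAsync from the answer, with T standing in for the event type as in the question:

private const int ConcurrencyLimit = 6;
private readonly SemaphoreSlim semaphoreSlim = new SemaphoreSlim(ConcurrencyLimit);

public async Task ConsumeAsync()
{
    foreach (Task<T> task in GetTasks())
    {
        // Don't pull the next item until one of the 6 slots frees up;
        // this paces the enumeration itself rather than a Select projection.
        await semaphoreSlim.WaitAsync();

        // Runs concurrently (at most 6 at a time) while the loop continues.
        // Exceptions from this task are not observed in this sketch.
        var running = ProcessOneAsync(task);
    }
}

private async Task ProcessOneAsync(Task<T> task)
{
    try
    {
        await ProcessEventAsync(await task);
    }
    finally
    {
        semaphoreSlim.Release();
    }
}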

You can use TPL Dataflow's ActionBlock<T>.

Define an action block that processes your events, and then post items to be processed to this block. You can also set the maximum degree of parallelism.

var block = new ActionBlock<string>(str =>
{
    //save in db
}, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 6
});

var sendings = new List<Task<bool>>
{
    block.SendAsync("a"),
    block.SendAsync("b"),
    block.SendAsync("c")
};

await Task.WhenAll(sendings);

block.Complete();       // tell the block we're done sending messages
await block.Completion; // wait for messages to be processed
dcastro
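
A possible follow-up to this answer (a sketch of my own, not part of the answer above): to connect ActionBlock to the question's IEnumerable<Task<T>>, give the block a bounded capacity and feed it with SendAsync. The enumeration is then paced by the block, so an unbounded producer is not buffered without limit. ProcessEventAsync and GetTasks are the names assumed from the question, with T standing in for the event type.

var block = new ActionBlock<T>(
    someEvent => ProcessEventAsync(someEvent),
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 6,
        BoundedCapacity = 6  // SendAsync waits asynchronously while the block is full
    });

foreach (Task<T> task in GetTasks())
{
    // Await the event, then hand it to the block; SendAsync only completes
    // once the bounded block has room, which throttles this loop as well.
    await block.SendAsync(await task);
}

block.Complete();       // tell the block we're done sending messages
await block.Completion; // wait for all queued messages to be processed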