
I would like to execute async operations in parallel, in Silverlight 5, with limited concurrency.

My code looks like this:

    public async void btn_click(object s, RoutedEventArgs e)
    {
        await DoAllWork();
    }

    private async Task DoAllWork()
    {
        //Get work to do
        int[] wrk = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };

        //Start the tasks
        Task[] tasks = new Task[wrk.Length];
        for (int i = 0; i < wrk.Length; i++)
        {
            int item = wrk[i];
            tasks[i] = WorkSingleItem(item);
        }
        await TaskEx.WhenAll(tasks);
    }

    private async Task WorkSingleItem(int item)
    {
        //a very long operation
        var response = await Request(item);
        await Handle(response);
    }

I have found this article : http://msdn.microsoft.com/en-us/library/ee789351(v=vs.110).aspx

How can I await my work method so that it starts all my long operations on a "limited concurrency scheduler", with each item's work not relying on the synchronization context, so that no code executes on the UI thread?

svick
Dede
  • The task WhenAny method would be a good fit. You can use it to start a collection of tasks and it reports when one of them completes. When that happens you can add another task. Wrapped in a while 'has more tasks' loop you should get the desired behaviour. I'm not at a workstation to put a sample together so will put one up later. – kidshaw Aug 20 '14 at 06:42
  • 1
    Your question is vague, you need to explain a bit more what you're trying to do. Is this work IO bound or CPU bound, what does "limited concurreny" mean to you? – Yuval Itzchakov Aug 20 '14 at 07:13
  • "limited concurrency" = "number of parallel tasks active". If I have 20 items, I don't want to work on all 20 at once, but only 4 at a time, to avoid a DDoS on my server. And I don't want the item-handling code to use the synchronization context, to avoid code execution on the UI thread. – Dede Aug 20 '14 at 07:18
  • Are these all cpu bound operations? – Yuval Itzchakov Aug 20 '14 at 07:37
  • You can do this with `ForEachAsync` from Stephen Toub, see http://stackoverflow.com/a/24628962/1239433 – NeddySpaghetti Aug 20 '14 at 11:20
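The `ForEachAsync` extension mentioned in the last comment, from Stephen Toub's blog series on implementing parallel "foreach" loops over async bodies, looks roughly like this. Note this is a sketch of that approach: `Partitioner` and `Task.Run` are .NET 4.5 APIs and may not be available on Silverlight 5.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class EnumerableExtensions
{
    // Runs 'body' over 'source' with at most 'dop' items in flight at once.
    // Each partition processes its items sequentially; 'dop' partitions run
    // concurrently, giving the desired concurrency limit.
    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select Task.Run(async delegate
            {
                using (partition)
                    while (partition.MoveNext())
                        await body(partition.Current);
            }));
    }
}
```

With that in place, the question's loop could be replaced by something like `await wrk.ForEachAsync(4, item => WorkSingleItem(item));`.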

2 Answers


Since your long operations deal with I/O asynchronously, and the purpose of the limited concurrency is to avoid a DDoS, then a TaskScheduler is an incorrect solution. This is because a TaskScheduler only controls active tasks (running or blocked); when a task yields back to its scheduler via await, it's no longer considered "active". So, a TaskScheduler can't be used to prevent a DDoS if your I/O is asynchronous.

The correct solution is to use something like an async-compatible semaphore:

public async void btn_click(object s, RoutedEventArgs e)
{
  await Task.Run(() => DoAllWork());
}

private async Task DoAllWork()
{
  int[] wrk = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
  var semaphore = new AsyncSemaphore(4);

  var tasks = wrk.Select(x => WorkSingleItem(x, semaphore));

  await TaskEx.WhenAll(tasks);
}

private async Task WorkSingleItem(int item, AsyncSemaphore semaphore)
{
  await semaphore.WaitAsync();
  try
  {
    var response = await Request(item);
    await Handle(response);
  }
  finally
  {
    semaphore.Release();
  }
}
Stephen Cleary
  • I find that this pattern comes up often enough that it's worth abstracting the logic into its own class (as seen in my answer). That separates the concerns, reduces the complexity of each method, and lets you use it on operations whose source you can't control. – Servy Aug 22 '14 at 17:08
  • I like that abstraction. In my own experience, once someone needs a limited concurrency queue, they're usually better off just switching to TPL Dataflow. Not for a simple one-off problem like this, of course, but I find that if this *part* of the code needs a queue, then the nearby parts of the code would benefit from a pipeline. – Stephen Cleary Aug 22 '14 at 17:22
  • Sure. Probably the one exceptional case that such a queue can handle is that TPL data flow is really designed for rate limiting a single operation, whereas such a queue could be passed around and have a handful of separate operations be rate limited together, without going to the global route of using a `SynchronizationContext` that rate limits parallelization for *everything* (which is also a useful tool). – Servy Aug 22 '14 at 17:25
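For reference, the TPL Dataflow approach discussed in these comments might look like the following sketch. This assumes the System.Threading.Tasks.Dataflow package (not available on Silverlight 5); `Request` and `Handle` are the methods from the question.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// An ActionBlock that processes at most 4 items concurrently.
var block = new ActionBlock<int>(
    async item =>
    {
        var response = await Request(item);  // methods from the question
        await Handle(response);
    },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

foreach (var item in new[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 })
    block.Post(item);

block.Complete();
await block.Completion; // completes once every posted item has been handled
```

The concurrency limit lives in the block's options rather than in the item-processing code, which is why a Dataflow pipeline tends to scale better once more stages are involved.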

You can create a type of queue that will only allow a certain number of the given tasks to be executing at one time:

public class FixedParallelismQueue
{
    private SemaphoreSlim semaphore;
    public FixedParallelismQueue(int maxDegreesOfParallelism)
    {
        semaphore = new SemaphoreSlim(maxDegreesOfParallelism,
            maxDegreesOfParallelism);
    }

    public async Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
    {
        await semaphore.WaitAsync();
        try
        {
            return await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }
    public async Task Enqueue(Func<Task> taskGenerator)
    {
        await semaphore.WaitAsync();
        try
        {
            await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }
}

This would let you write:

private Task DoAllWork()
{
    int[] work = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };

    var queue = new FixedParallelismQueue(maxDegreesOfParallelism);
    var tasks = work.Select(n => queue.Enqueue(() => WorkSingleItem(n)));
    return TaskEx.WhenAll(tasks);
}
Servy