
Let's say I want to download 1000 recipes from a website. The website accepts at most 10 concurrent connections. Each recipe should be stored in an array, at its corresponding index. (I don't want to pass the array to the DownloadRecipe method.)

Technically, I've already solved the problem, but is there an even cleaner way to achieve it, using async/await or something else?

    static async Task MainAsync()
    {
        int recipeCount = 1000;
        int connectionCount = 10;
        string[] recipes = new string[recipeCount];
        Task<string>[] tasks = new Task<string>[connectionCount];
        int r = 0;

        while (r < recipeCount)
        {
            for (int t = 0; t < tasks.Length; t++)
            {
                tasks[t] = Task.Run(async () => recipes[r] = await DownloadRecipe(r));
                r++;
            }

            await Task.WhenAll(tasks);
        }
    }

    static async Task<string> DownloadRecipe(int index)
    {
        // ... await calls to download recipe
    }

Also, this solution is not optimal, since it doesn't start a new download until all 10 running downloads have finished. Can it be improved without bloating the code too much? A thread pool limited to 10 threads?

Daniel Johansson
  • I'm not sure that it will help you, but since C# 8.0 you can use asynchronous iteration and `await foreach` loops to handle that: https://learn.microsoft.com/en-us/archive/msdn-magazine/2019/november/csharp-iterating-with-async-enumerables-in-csharp-8 Maybe it will help you achieve what you want more easily. – Ivan Khorin Jan 11 '21 at 00:47
  • I think that your solution works only because `recipeCount` is divisible by `connectionCount`. Otherwise an `IndexOutOfRangeException` would occur here: `recipes[r] = await...`. Your solution is probably also susceptible to this problem: [Captured variable in a loop in C#](https://stackoverflow.com/questions/271440/captured-variable-in-a-loop-in-c-sharp), regarding the variable `r`. – Theodor Zoulias Jan 11 '21 at 02:57
  • @TheodorZoulias Yes, you're right. I rewrote the solution a little to make it shorter and to show clearly the code I want to improve. Luckily, neither of those problems exists in my longer solution. Thanks for pointing that out! – Daniel Johansson Jan 11 '21 at 12:01
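For reference, here is a sketch of the question's batch loop with the two problems from the comments fixed: each iteration copies the index into its own local variable before the lambda captures it, and the last batch is shortened when `recipeCount` is not a multiple of `connectionCount`. It also calls the async method directly instead of wrapping it in `Task.Run`. It assumes a .NET 6+ console program with top-level statements, and `DownloadRecipe` is a hypothetical stub (a short `Task.Delay`) standing in for the real download. It still waits for each whole batch, so it does not address the efficiency concern.

```csharp
using System;
using System.Threading.Tasks;

int recipeCount = 1005;   // deliberately not a multiple of connectionCount
int connectionCount = 10;
string[] recipes = new string[recipeCount];

for (int start = 0; start < recipeCount; start += connectionCount)
{
    // The final batch can be smaller than connectionCount.
    int batchSize = Math.Min(connectionCount, recipeCount - start);
    Task[] tasks = new Task[batchSize];
    for (int t = 0; t < batchSize; t++)
    {
        // A fresh variable per iteration, so each call gets its own index.
        int index = start + t;
        tasks[t] = DownloadAndStoreAsync(index);
    }
    // Still waits for the whole batch before starting the next one.
    await Task.WhenAll(tasks);
}

// Stores each result at its own index in the shared array.
async Task DownloadAndStoreAsync(int index) => recipes[index] = await DownloadRecipe(index);

// Hypothetical stand-in for the real download (assumption, not from the question).
static async Task<string> DownloadRecipe(int index)
{
    await Task.Delay(1);
    return $"recipe {index}";
}
```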

2 Answers


There are many, many ways you could do this. One way is to use an `ActionBlock`, which gives you easy access to `MaxDegreeOfParallelism` and works well with async methods:

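    // ActionBlock lives in the System.Threading.Tasks.Dataflow namespace
    using System.Threading.Tasks.Dataflow;

    static async Task MainAsync()
    {
       var recipeCount = 1000;
       var connectionCount = 10;
       var recipes = new string[recipeCount];

       // Downloads recipe i and stores it at its own index
       async Task Action(int i) => recipes[i] = await DownloadRecipe(i);

       var processor = new ActionBlock<int>(Action, new ExecutionDataflowBlockOptions()
       {
          // At most connectionCount downloads run at the same time
          MaxDegreeOfParallelism = connectionCount,
          SingleProducerConstrained = true
       });

       for (var i = 0; i < recipeCount; i++)
          await processor.SendAsync(i);

       // No more indexes will be sent; wait until the queued ones are processed
       processor.Complete();
       await processor.Completion;
    }

    static async Task<string> DownloadRecipe(int index)
    {
       // ... await calls to download recipe
    }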

Another way is to use a `SemaphoreSlim`:

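    var slim = new SemaphoreSlim(connectionCount, connectionCount);

    var tasks = Enumerable
       .Range(0, recipeCount)
       .Select(Selector);

    async Task<string> Selector(int i)
    {
       // Wait until one of the connectionCount slots is free
       await slim.WaitAsync();
       try
       {
          return await DownloadRecipe(i);
       }
       finally
       {
          // Free the slot even if the download throws
          slim.Release();
       }
    }

    // Results are returned in index order, whatever order the downloads finish in
    var recipes = await Task.WhenAll(tasks);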
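A self-contained sketch of the `SemaphoreSlim` approach, assuming a .NET 6+ console program with top-level statements. `DownloadRecipe` here is a hypothetical stub that only records how many downloads overlap, which makes it easy to check that the semaphore never lets more than `connectionCount` run at once and that `Task.WhenAll` keeps the results in index order.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

int recipeCount = 1000;
int connectionCount = 10;
var slim = new SemaphoreSlim(connectionCount, connectionCount);

var gate = new object();
int active = 0;      // downloads currently in progress
int maxActive = 0;   // highest value 'active' ever reached

// Task.WhenAll returns results in source order, so recipes[i] belongs to index i.
string[] recipes = await Task.WhenAll(Enumerable.Range(0, recipeCount).Select(i => Selector(i)));

async Task<string> Selector(int i)
{
    await slim.WaitAsync();
    try
    {
        return await DownloadRecipe(i);
    }
    finally
    {
        slim.Release();
    }
}

// Hypothetical download stub (assumption) that tracks overlapping calls.
async Task<string> DownloadRecipe(int index)
{
    lock (gate) { active++; maxActive = Math.Max(maxActive, active); }
    await Task.Delay(1);
    lock (gate) { active--; }
    return $"recipe {index}";
}

Console.WriteLine($"Max concurrent downloads: {maxActive}");
```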

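Another set of approaches uses Reactive Extensions (Rx). Again, there are many ways to do this; this is just one awaitable approach (and it could likely be improved, all things considered). Note that `Merge` emits results in completion order, not index order, so each result is tagged with its index and put back in place afterwards:

    using System.Reactive.Linq;
    using System.Reactive.Threading.Tasks;
    var results = await Enumerable
            .Range(0, recipeCount)
            .ToObservable()
            .Select(i => Observable.FromAsync(async () => (Index: i, Recipe: await DownloadRecipe(i))))
            .Merge(connectionCount)
            .ToArray()
            .ToTask();

    var recipes = new string[recipeCount];
    foreach (var (index, recipe) in results)
       recipes[index] = recipe;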
TheGeneral
  • @00110001 In an effort to simplify the question I didn't include the fact that my "recipes" array is actually multi-dimensional. I love the ActionBlock solution, but is it possible to use it with something like `async Task Action(int i, int j) => recipes[i, j] = await DownloadRecipe(i, j);` – Daniel Johansson Jan 11 '21 at 22:52
  • I managed to solve the multiple parameter problem above with a `Tuple`. But I also have some additional requirements that I forgot in my question: After every 100 recipes downloaded, I would like to run some custom code that serializes the array to a file. Is there a way to get another method call in there every 100 downloads? – Daniel Johansson Jan 11 '21 at 23:13
  • @DanielJohansson Yes to the first question; a tuple would be the easiest solution. You can create bigger pipelines with TPL Dataflow, e.g. you could envisage going from a TransformBlock to a BatchBlock to an ActionBlock. I'd be happy to answer, however it's a little beyond the scope of this question. Ping me if you ask another one. – TheGeneral Jan 11 '21 at 23:21
  • @DanielJohansson There are many TPL resources; take a look at this one: https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/walkthrough-creating-a-dataflow-pipeline In reality, once you know the concepts, they are fairly easy to plumb together. – TheGeneral Jan 11 '21 at 23:22

An alternative approach is to have 10 "pools" that load data concurrently.

You don't need to wrap I/O operations in a separate thread; using a separate thread for an I/O operation is just a waste of resources.
Notice that a thread which downloads data does nothing but wait for a response. This is where async/await comes in very handy: we can send multiple requests without waiting for them to complete and without wasting threads.
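A minimal sketch of that point, assuming a .NET 6+ console program with top-level statements. `FakeRequestAsync` is a hypothetical I/O-bound call in which `Task.Delay` stands in for waiting on the network. All ten calls are started before any of them is awaited, so the total time is roughly one delay rather than ten, and no `Task.Run` is involved.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

var stopwatch = Stopwatch.StartNew();

// All ten calls start immediately; awaiting the timer does not block a thread.
Task<string>[] requests = Enumerable.Range(0, 10).Select(i => FakeRequestAsync(i)).ToArray();
string[] responses = await Task.WhenAll(requests);

stopwatch.Stop();
long elapsedMs = stopwatch.ElapsedMilliseconds;
Console.WriteLine($"{responses.Length} responses in {elapsedMs} ms");

// Hypothetical I/O-bound call (assumption): the delay simulates network latency.
static async Task<string> FakeRequestAsync(int index)
{
    await Task.Delay(200);
    return $"response {index}";
}
```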

    static async Task MainAsync()
    {
        var requests = Enumerable.Range(0, 1000).ToArray();
        var maxConnections = 10;
        // Contiguous ranges (0-99, 100-199, ...) so that flattening the pools keeps index order
        var pools = requests
            .GroupBy(i => i * maxConnections / requests.Length)
            .Select(group => DownloadRecipesFor(group.ToArray()))
            .ToArray();

        await Task.WhenAll(pools);

        var recipes = pools.SelectMany(pool => pool.Result).ToArray();
    }

    static async Task<IEnumerable<string>> DownloadRecipesFor(params int[] requests)
    {
        var recipes = new List<string>();
        foreach (var request in requests)
        {
            // Only one request at a time within each pool
            var recipe = await DownloadRecipe(request);
            recipes.Add(recipe);
        }

        return recipes;
    }

Because each pool (the `DownloadRecipesFor` method) downloads its recipes one by one, there are never more than 10 active requests at a time.

This is a little more efficient than the original, because we don't wait for 10 tasks to complete before starting the next "bunch".
It is still not ideal: if one "pool" finishes earlier than the others, it can't pick up requests that are still waiting in another pool.

The final result keeps the corresponding indexes, because each pool gets a contiguous range of indexes, and the pools and the requests inside them are flattened in the same order as they were created.
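One way to remove that last limitation, sketched here as a variation that is not part of the answer: instead of pre-assigning indexes to pools, let `maxConnections` workers share a single counter, each taking the next unclaimed index with `Interlocked.Increment`. A worker that finishes early simply takes more work, and each result is written straight to its own index. This assumes a .NET 6+ console program with top-level statements, and `DownloadRecipe` is a hypothetical stub.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

int recipeCount = 1000;
int maxConnections = 10;
string[] recipes = new string[recipeCount];
int nextIndex = -1; // last index handed out, shared by all workers

// Each worker keeps claiming the next index until none are left.
Task[] workers = Enumerable.Range(0, maxConnections).Select(_ => WorkerAsync()).ToArray();
await Task.WhenAll(workers);

async Task WorkerAsync()
{
    int index;
    // Interlocked.Increment hands every index to exactly one worker.
    while ((index = Interlocked.Increment(ref nextIndex)) < recipeCount)
    {
        recipes[index] = await DownloadRecipe(index);
    }
}

// Hypothetical download stub (assumption) standing in for the real call.
static async Task<string> DownloadRecipe(int index)
{
    await Task.Delay(1);
    return $"recipe {index}";
}
```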

Fabio
  • @TheodorZoulias, the ideal approach would be the one with `SemaphoreSlim`, because it keeps 10 tasks active and starts another one as soon as any of them completes. – Fabio Jan 11 '21 at 22:00