I have a paged REST API and want to retrieve all records. Currently I request the pages one after another in a loop until all records have been received.

Now I want to change this behaviour because it takes a long time until all records have been received. What I want to do is start 10 page requests, held in a List<Task>, and start the next request whenever one of them finishes, so that at most 10 requests run in parallel. My current code looks like this:

protected async Task<List<T>> GetAll<T, TListObject>() where TListObject : ApiListModel<T> {
            var limit = this.GetRequestLimit();
            var token = this.GetBearerToken();
            var maxConcurrent = _config.GetValue<int>("Blackduck:MaxConcurrentRequests");

            var list = new List<T>();
            var tasks = new List<Task<TListObject>>();

            //Request first page to get total count
            var resp = await this.GetPage<TListObject>(0, limit);
            list.AddRange(resp.Items);
            var total = resp.TotalCount;

            for (int i = 0; i < maxConcurrent; i++)
            {
                tasks.Add(this.GetPage<TListObject>(0, 0)); // TODO: 0, 0 should be replaced with the offset and limit
            }

            TListObject result;
            while (list.Count < total || (result = Task.WhenAny<TListObject>(tasks)))
            {
                
            }

            return list;
        }

But I can't use Task.WhenAny<T>() as a while condition like this to start the next request until all records have been received. Does anybody have an idea how to start the next page?

BR

Gerrit
  • You can use [Semaphore(Slim)](https://stackoverflow.com/questions/20056727/need-to-understand-the-usage-of-semaphoreslim) – Eldar Apr 03 '21 at 10:49
  • Related: [How to limit the amount of concurrent async I/O operations?](https://stackoverflow.com/questions/10806951/how-to-limit-the-amount-of-concurrent-async-i-o-operations) A new API [`Parallel.ForEachAsync`](https://github.com/dotnet/runtime/issues/1946) is probably going to be available with the next major .NET release, but for now you have to do the throttling manually, or use some third-party library. – Theodor Zoulias Apr 03 '21 at 12:27
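Parallel.ForEachAsync has been available since .NET 6. The following is a minimal sketch of how it could throttle the page requests, assuming the total record count is already known (for example from the first page). FetchPageAsync is a hypothetical stand-in for the real API call, and integers stand in for the records.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ForEachAsyncSketch
{
    // Hypothetical stand-in for one paged API call; returns the items of a single page.
    static async Task<List<int>> FetchPageAsync(int offset, int limit, int total, CancellationToken ct)
    {
        await Task.Delay(50, ct); // simulated network latency
        return Enumerable.Range(offset, Math.Min(limit, total - offset)).ToList();
    }

    public static async Task<List<int>> GetAllAsync(int total, int limit, int maxConcurrent)
    {
        // One offset per page: 0, limit, 2 * limit, ...
        var offsets = Enumerable.Range(0, (total + limit - 1) / limit).Select(page => page * limit);
        var pages = new ConcurrentDictionary<int, List<int>>();

        // At most maxConcurrent page requests are in flight at any time.
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxConcurrent };
        await Parallel.ForEachAsync(offsets, options, async (offset, ct) =>
        {
            pages[offset] = await FetchPageAsync(offset, limit, total, ct);
        });

        // Pages finish in arbitrary order, so restore the original order by offset.
        return pages.OrderBy(p => p.Key).SelectMany(p => p.Value).ToList();
    }

    public static async Task Main()
    {
        var all = await GetAllAsync(total: 95, limit: 10, maxConcurrent: 10);
        Console.WriteLine(all.Count); // 95
    }
}
```

The results are collected in a ConcurrentDictionary because the loop body may run on several threads at once; a plain List<T> would not be safe here.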

3 Answers


So that max 10 requests are running in parallel.

To run asynchronous code concurrently, use SemaphoreSlim and Task.WhenAll:

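var mutex = new SemaphoreSlim(maxConcurrent);
// Queue one throttled request per remaining page; the first page (offset 0) was already fetched.
// This assumes the offset counts items, so the next page starts at offset + limit.
for (int offset = limit; offset < total; offset += limit)
{
  tasks.Add(ThrottledGetPage(offset, limit));
}

// Completes once every page has arrived; results are in the same order as the tasks.
var results = await Task.WhenAll(tasks);

async Task<TListObject> ThrottledGetPage(int pageOffset, int pageLimit)
{
  // Only maxConcurrent callers get past this point at the same time.
  await mutex.WaitAsync();
  try { return await this.GetPage<TListObject>(pageOffset, pageLimit); }
  finally { mutex.Release(); }
}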
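For a version that can be run on its own, here is a minimal sketch of the same SemaphoreSlim throttling pattern. FakeGetPage and the fixed Total are hypothetical stand-ins for the real GetPage call and the count reported by the API, and integers stand in for the records.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SemaphoreSketch
{
    const int Total = 95; // pretend the API holds 95 records

    // Hypothetical stand-in for GetPage: returns the requested slice after a short delay.
    static async Task<List<int>> FakeGetPage(int offset, int limit)
    {
        await Task.Delay(50); // simulated network latency
        return Enumerable.Range(offset, Math.Min(limit, Total - offset)).ToList();
    }

    public static async Task<List<int>> GetAll(int limit, int maxConcurrent)
    {
        using var throttle = new SemaphoreSlim(maxConcurrent);

        async Task<List<int>> ThrottledGetPage(int offset)
        {
            await throttle.WaitAsync();
            try { return await FakeGetPage(offset, limit); }
            finally { throttle.Release(); }
        }

        // All tasks are created up front, but at most maxConcurrent are past WaitAsync at any time.
        var tasks = new List<Task<List<int>>>();
        for (int offset = 0; offset < Total; offset += limit)
        {
            tasks.Add(ThrottledGetPage(offset));
        }

        // WhenAll returns results in task order, so the pages come back sorted by offset.
        var pages = await Task.WhenAll(tasks);
        return pages.SelectMany(p => p).ToList();
    }

    public static async Task Main()
    {
        var all = await GetAll(limit: 10, maxConcurrent: 3);
        Console.WriteLine(all.Count); // 95
    }
}
```

Because Task.WhenAll preserves the order of the task list, the combined result is already ordered by offset even though the requests complete in arbitrary order.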
Stephen Cleary

First of all, write an iterator method (one of those methods that return IEnumerable<T> and use yield) that creates the tasks. It will make your life easier.

Something along the lines of:

IEnumerable<Task<TListObject>> taskGenerator()
{
    // TODO: add termination condition, `total` is it?
    while (true)
    {
        // TODO: 0, 0 should be replaced with the offset and limit
        yield return GetPage<TListObject>(0, 0);
    }
}

Then we need a List<Task> for all the started tasks.

var tasks = new List<Task<TListObject>>();

foreach (var newTask in taskGenerator())
{
    // This should add tasks until we have enough
    tasks.Add(newTask);
    if (tasks.Count < maxConcurrent)
    {
        continue;
    }

    var completedTask = await Task.WhenAny<TListObject>(tasks);
    // Whatever you do with completed task, I don't know
    HandleCompletedTask(completedTask);
    tasks.Remove(completedTask);
}

while (tasks.Count > 0)
{
    var completedTask = await Task.WhenAny<TListObject>(tasks);
    // Whatever you do with completed task, I don't know
    HandleCompletedTask(completedTask);
    tasks.Remove(completedTask);
}

The general idea is:

  • Add to the list until it reaches the number of concurrent tasks we want.
  • Wait for any of them to complete.
  • Handle the completed task and remove it from the list.
  • Repeat.

One caveat: once the generator runs out of new tasks, the tasks still in the list have to be handled as well, which is why a second loop is needed.
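The steps above can be sketched as a runnable program. This is only an illustration under assumptions: FakeGetPage and the fixed Total are hypothetical stand-ins for the real GetPage call and the total reported by the API, integers stand in for the records, and "handling" a completed task simply means collecting its items.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class WhenAnySketch
{
    const int Total = 95; // pretend the API holds 95 records

    // Hypothetical stand-in for GetPage: returns the requested slice after a short delay.
    static async Task<List<int>> FakeGetPage(int offset, int limit)
    {
        await Task.Delay(50); // simulated network latency
        return Enumerable.Range(offset, Math.Min(limit, Total - offset)).ToList();
    }

    // Lazily creates one task per page; a request starts only when the iterator is advanced.
    static IEnumerable<Task<List<int>>> PageTasks(int limit)
    {
        for (int offset = 0; offset < Total; offset += limit)
        {
            yield return FakeGetPage(offset, limit);
        }
    }

    public static async Task<List<int>> GetAll(int limit, int maxConcurrent)
    {
        var items = new List<int>();
        var running = new List<Task<List<int>>>();

        foreach (var task in PageTasks(limit))
        {
            running.Add(task);
            if (running.Count < maxConcurrent) continue; // window not full yet

            var done = await Task.WhenAny(running);
            running.Remove(done);
            items.AddRange(await done); // awaiting rethrows if the request failed
        }

        // No new pages left: drain the requests that are still in flight.
        while (running.Count > 0)
        {
            var done = await Task.WhenAny(running);
            running.Remove(done);
            items.AddRange(await done);
        }

        return items; // items appear in completion order, not page order
    }

    public static async Task Main()
    {
        var all = await GetAll(limit: 10, maxConcurrent: 3);
        Console.WriteLine(all.Count); // 95
    }
}
```

Since pages are appended as they complete, the result is not ordered by offset; if order matters, each page would need to be stored together with its offset and sorted afterwards.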


However, if several tasks may complete at nearly the same time, it can be better to handle them together…

var tasks = new List<Task<TListObject>>();

foreach (var newTask in taskGenerator())
{
    // This should add tasks until we have enough
    tasks.Add(newTask);
    if (tasks.Count < maxConcurrent)
    {
        continue;
    }

    await Task.WhenAny<TListObject>(tasks);
    tasks = HandleTasks(tasks);
}

while (tasks.Count > 0)
{
    await Task.WhenAny<TListObject>(tasks);
    tasks = HandleTasks(tasks);
}

Here the method HandleTasks is intended to:

  • Handle completed tasks (however you do that)
  • Make a new list with all the tasks yet to complete.

List<Task<TListObject>> HandleTasks(List<Task<TListObject>> tasks)
{
    var result = new List<Task<TListObject>>();
    // We will repopulate the list, and handle completed tasks
    foreach (var task in tasks)
    {
        // IsCompleted is also true for faulted and canceled tasks,
        // so they leave the list instead of being returned by WhenAny again and again
        if (task.IsCompleted)
        {
            // Whatever you do with completed task, I don't know
            HandleCompletedTask(task);
        }
        else
        {
            result.Add(task);
        }
    }
    return result;
}
E_net4
Theraot

Maybe Parallel.For would also be an option. It makes it easy to perform multiple actions and lets you control the maximum number of tasks that can run at the same time.

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        var pageSize = 10;
        var totalItems = 1000;
        var apiLists = new ConcurrentBag<ApiListModel>();
        
        Parallel.For(0, totalItems / pageSize, new ParallelOptions { MaxDegreeOfParallelism = 10 }, i =>
        {
            apiLists.Add(GetPage(i * pageSize, pageSize));
        });
    }

    static ApiListModel GetPage(int offset, int pageSize)
    {
        Console.WriteLine($"Getting page with size {pageSize} and offset {offset}");
        
        // Get content from API
        return new ApiListModel
        {

        };
    }
}

Results:

Getting page with size 10 and offset 300
Getting page with size 10 and offset 200
Getting page with size 10 and offset 400
Getting page with size 10 and offset 0
Getting page with size 10 and offset 500
Getting page with size 10 and offset 700
Getting page with size 10 and offset 800
Getting page with size 10 and offset 100
Getting page with size 10 and offset 600
Getting page with size 10 and offset 310
Getting page with size 10 and offset 320
Getting page with size 10 and offset 810
Getting page with size 10 and offset 330
Getting page with size 10 and offset 820
Getting page with size 10 and offset 340
Getting page with size 10 and offset 830
Getting page with size 10 and offset 350
Getting page with size 10 and offset 510
Getting page with size 10 and offset 360
Getting page with size 10 and offset 520
Getting page with size 10 and offset 370
Getting page with size 10 and offset 530
Getting page with size 10 and offset 380
Getting page with size 10 and offset 540
Getting page with size 10 and offset 610
Getting page with size 10 and offset 390
Getting page with size 10 and offset 550
Getting page with size 10 and offset 560
Getting page with size 10 and offset 10
Getting page with size 10 and offset 110
Getting page with size 10 and offset 20
Getting page with size 10 and offset 210
Getting page with size 10 and offset 120
Getting page with size 10 and offset 220
Getting page with size 10 and offset 130
Getting page with size 10 and offset 230
Getting page with size 10 and offset 140
Getting page with size 10 and offset 240
Getting page with size 10 and offset 30
Getting page with size 10 and offset 250
Getting page with size 10 and offset 40
Getting page with size 10 and offset 260
Getting page with size 10 and offset 50
Getting page with size 10 and offset 60
Getting page with size 10 and offset 840
Getting page with size 10 and offset 850
Getting page with size 10 and offset 620
Getting page with size 10 and offset 860
Getting page with size 10 and offset 650
Getting page with size 10 and offset 870
Getting page with size 10 and offset 660
Getting page with size 10 and offset 880
Getting page with size 10 and offset 410
Getting page with size 10 and offset 670
Getting page with size 10 and offset 420
Getting page with size 10 and offset 680
Getting page with size 10 and offset 430
Getting page with size 10 and offset 690
Getting page with size 10 and offset 440
Getting page with size 10 and offset 730
Getting page with size 10 and offset 450
Getting page with size 10 and offset 740
Getting page with size 10 and offset 460
Getting page with size 10 and offset 750
Getting page with size 10 and offset 470
Getting page with size 10 and offset 760
Getting page with size 10 and offset 480
Getting page with size 10 and offset 490
Getting page with size 10 and offset 900
Getting page with size 10 and offset 710
Getting page with size 10 and offset 720
Getting page with size 10 and offset 150
Getting page with size 10 and offset 170
Getting page with size 10 and offset 160
Getting page with size 10 and offset 180
Getting page with size 10 and offset 190
Getting page with size 10 and offset 570
Getting page with size 10 and offset 580
Getting page with size 10 and offset 590
Getting page with size 10 and offset 270
Getting page with size 10 and offset 280
Getting page with size 10 and offset 290
Getting page with size 10 and offset 630
Getting page with size 10 and offset 640
Getting page with size 10 and offset 70
Getting page with size 10 and offset 80
Getting page with size 10 and offset 890
Getting page with size 10 and offset 90
Getting page with size 10 and offset 770
Getting page with size 10 and offset 780
Getting page with size 10 and offset 790
Getting page with size 10 and offset 910
Getting page with size 10 and offset 920
Getting page with size 10 and offset 930
Getting page with size 10 and offset 940
Getting page with size 10 and offset 950
Getting page with size 10 and offset 960
Getting page with size 10 and offset 970
Getting page with size 10 and offset 980
Getting page with size 10 and offset 990

and apiLists now has 100 ApiListModel instances, one for each page.

Docs

MaartenDev