
I have 300,000 sales orders which need to be sent to a REST API in batches of either 200 or 1,000 orders per call, using multithreading with a Semaphore and MaxDegreeOfParallelism = 8 (ideally the number of CPU cores). The response for each batch of orders needs to be added to a generic list. Please suggest a way to get the list of API responses for all 300k orders.

  Parallel.ForEach(
    totalSalesOrdersList.Batch(1000),
    new ParallelOptions() { MaxDegreeOfParallelism = 8 /* better be number of CPU cores */ },
    batchOfSalesOrders => {
        DoMyProcessing(batchOfSalesOrders);
    });


public static class LinqExtensions
{
    public static IEnumerable<IEnumerable<TSource>> Batch<TSource>(
        this IEnumerable<TSource> source, int size)
    {
        TSource[] bucket = null;
        var count = 0;

        foreach (var item in source)
        {
            if (bucket == null)
                bucket = new TSource[size];

            bucket[count++] = item;
            if (count != size)
                continue;

            yield return bucket;

            bucket = null;
            count = 0;
        }

        if (bucket != null && count > 0)
            yield return bucket.Take(count);
    }
}
Peter Duniho
venkat

2 Answers


Making a large number of concurrent remote API calls needs to be done carefully, as you might exhaust the connection pool. I suggest an approach that uses a SemaphoreSlim to perform the throttling and a Channel to consume the responses in a thread-safe way.

var batches = Enumerable.Range(0, 1000);
var responseCh = Channel.CreateUnbounded<string>();
var throttler = new SemaphoreSlim(10);

var requestTasks = batches.Select(async batch =>
{
    await throttler.WaitAsync();
    try
    {
        var result = await MakeHttpRequestAsync(batch);
        await responseCh.Writer.WriteAsync(result);   
    }
    finally
    {
        throttler.Release();
    }
}).ToArray();

var requestProcessing = Task.Run(async () =>
{                
    await Task.WhenAll(requestTasks);
    responseCh.Writer.Complete();
});

var responseProcessing = Task.Run(async () =>
{
    await foreach (var res in responseCh.Reader.ReadAllAsync())
        Console.WriteLine(res); // or store in a data structure
});

await Task.WhenAll(requestProcessing, responseProcessing);

We throttle the requests by not allowing more than 10 at a time. The requests run concurrently, and as each response arrives we write it to the channel. We process the responses on a separate thread by reading them from the channel. Since the request and response processing happen concurrently, we asynchronously wait for both to complete.

Note that the IAsyncEnumerable interface (await foreach) requires C# 8, and channels ship with the .NET Core 3.1 SDK or are available in the System.Threading.Channels NuGet package.
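If the goal is a single generic list of all responses, as the question asks, the Console.WriteLine in the consumer can simply be replaced by adding to a list. A minimal, self-contained sketch of that variation, where MakeHttpRequestAsync is a hypothetical stand-in simulating the real REST call:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelCollectDemo
{
    // Stand-in for the real REST call: returns one fake response per batch.
    static async Task<string> MakeHttpRequestAsync(int batch)
    {
        await Task.Delay(10); // simulate network latency
        return $"batch {batch}: ok";
    }

    public static async Task<List<string>> CollectAsync(int batchCount)
    {
        var responseCh = Channel.CreateUnbounded<string>();
        var throttler = new SemaphoreSlim(10); // at most 10 requests in flight

        var requestTasks = Enumerable.Range(0, batchCount).Select(async batch =>
        {
            await throttler.WaitAsync();
            try
            {
                var result = await MakeHttpRequestAsync(batch);
                await responseCh.Writer.WriteAsync(result);
            }
            finally
            {
                throttler.Release();
            }
        }).ToArray();

        var requestProcessing = Task.Run(async () =>
        {
            await Task.WhenAll(requestTasks);
            responseCh.Writer.Complete(); // no more items will arrive
        });

        // Single consumer: safe to fill a plain List<string> without locking.
        var responses = new List<string>();
        var responseProcessing = Task.Run(async () =>
        {
            await foreach (var res in responseCh.Reader.ReadAllAsync())
                responses.Add(res);
        });

        await Task.WhenAll(requestProcessing, responseProcessing);
        return responses;
    }

    public static async Task Main()
    {
        var all = await CollectAsync(50);
        Console.WriteLine(all.Count); // 50
    }
}
```

Because only one task drains the channel, no lock is needed around the list; the channel itself provides the thread safety between producers and consumer.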

Denis Kyashif

You could use Parallel LINQ (the AsParallel method) and control the concurrency with the WithDegreeOfParallelism method:

var result = Partitioner
    .Create(totalSalesOrdersList.Batch(1000), EnumerablePartitionerOptions.NoBuffering)
    .AsParallel()
    .AsOrdered()
    .WithDegreeOfParallelism(8)
    .Select(batch => DoMyProcessing(batch))
    .SelectMany(batch => batch)
    .ToList();

Generally, though, it is not recommended to control the concurrency of I/O-bound operations with PLINQ or the Parallel class. Currently the most "professional" way to do it is by using the TPL Dataflow library.
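As a sketch of what such a Dataflow pipeline could look like, a BatchBlock can group the orders and a TransformBlock can post each batch with bounded parallelism. PostBatchAsync here is a hypothetical stand-in for the actual REST call; the library calls themselves (BatchBlock, TransformBlock, SendAsync, OutputAvailableAsync) are from the System.Threading.Tasks.Dataflow package:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // System.Threading.Tasks.Dataflow NuGet package

public static class DataflowDemo
{
    // Stand-in for the real REST call: one response string per batch.
    static async Task<string> PostBatchAsync(int[] batch)
    {
        await Task.Delay(10); // simulate network latency
        return $"processed {batch.Length} orders";
    }

    public static async Task<List<string>> ProcessAsync(
        IEnumerable<int> orders, int batchSize, int concurrency)
    {
        // Groups incoming orders into arrays of batchSize (last batch may be smaller).
        var batcher = new BatchBlock<int>(batchSize);

        // Posts each batch; at most `concurrency` requests run at a time.
        var sender = new TransformBlock<int[], string>(
            batch => PostBatchAsync(batch),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = concurrency });

        batcher.LinkTo(sender, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var order in orders)
            await batcher.SendAsync(order);
        batcher.Complete(); // flush the final partial batch

        var responses = new List<string>();
        while (await sender.OutputAvailableAsync())
            responses.Add(await sender.ReceiveAsync());

        await sender.Completion;
        return responses;
    }

    public static async Task Main()
    {
        var responses = await ProcessAsync(Enumerable.Range(0, 2500), 1000, 8);
        Console.WriteLine(responses.Count); // 3 batches: 1000 + 1000 + 500
    }
}
```

Completion propagates from the BatchBlock to the TransformBlock, so once all orders are posted the consuming loop drains the remaining responses and exits cleanly.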

Theodor Zoulias
  • How can I do the above with TPL Dataflow and a Semaphore? – venkat Jan 19 '20 at 19:31
  • You would use a [`BatchBlock`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.batchblock-1) to organize the items in batches, and then you would link it to a [`TransformBlock`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.transformblock-2) to process the batches. The TPL Dataflow library has some learning curve. You can't use it effectively without studying it for a day or two. – Theodor Zoulias Jan 19 '20 at 20:06