I have a sync process that must run on each of my businesses. The number of businesses is ever-changing.

I've read the docs on the Thread class, parallelism, etc., but I'm not sure how to do this without knowing or naming a predefined number of threads; in my case that number is unknown. That is why I landed on Parallel.ForEach: I want to run an unknown number of simultaneous operations.

My sync operations run every 10 minutes, and each takes up to a minute or two. I can't run them one after another, because by the time they all finished the next run would already be due.

I want to run them simultaneously in separate threads. Each has its own API key; they share no memory or data and will not modify any shared data.

From my research on multi-threading, I think Parallel.ForEach would do the trick, but I need help with the syntax.

This is in a Worker Service. I have a private method, SyncBusiness(int businessId), that calls an API endpoint to sync the business. I just need help calling that method for every business.

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    var businessIds = (from x in _db.Poslookup
                       select x.BusinessId).Distinct();

    while (!stoppingToken.IsCancellationRequested)
    {
        // Want to multi-thread a sync for each of the businesses in businessIds
        Parallel.ForEach(businessIds, i => { 
            await SyncBusiness(i)
        });

        _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
        await Task.Delay(600000, stoppingToken);
    }
}

Also, please comment on any gotchas regarding scalability, thread limits, etc.: anything that might get me into trouble if I grew to several thousand businesses to sync. Suggestions for things to read about sync operations and scalability are welcome too.

Thank you so much. Cheers.

smolchanovsky
J Benjamin
  • Is your `SyncBusiness` function doing CPU-bound work, i.e. some number crunching or algorithmic work? – JohanP Jan 28 '20 at 05:34
  • One thing to understand: `Parallel.ForEach()` does **NOT** guarantee all your jobs will run at the same time. It uses a **pool** of threads to work through the tasks you give it, where the size of the pool is nominally determined by the number of cores available on your system. If you have more jobs than threads in the pool, some jobs will effectively sit in a queue until threads are freed up. – Joel Coehoorn Jan 28 '20 at 05:42
  • Also... did you maybe just forget a semicolon (`;`) after `await SyncBusiness(i)`? And `Parallel.ForEach()` pre-dates `async`/`await`. I'm not sure they help you in this context. – Joel Coehoorn Jan 28 '20 at 05:45
  • You can't mix `async` and `Parallel.ForEach` and expect sane results (or compiling code): https://stackoverflow.com/questions/15136542/parallel-foreach-with-asynchronous-lambda. Otherwise, it's not clear what exactly you expect this code to do. – Alexei Levenkov Jan 28 '20 at 06:02
  • What is the code of SyncBusiness? It is key to the correct answer. If it is mainly I/O you might need to use Task.WhenAll. – Peter Bons Jan 28 '20 at 06:34
  • `Parallel.ForEach` is **only** meant for data parallelism. It doesn't work with `async/await`. What you actually posted is `Parallel.ForEach(businessIds, async void (i) => { ...}`. This loop fires off N tasks, 8/16 at a time (depending on the number of cores), and never waits for them to finish. `Task.Delay` will be called immediately, and could easily finish before the tasks themselves have finished. – Panagiotis Kanavos Jan 28 '20 at 07:19
  • Since nothing is awaiting those `SyncBusiness` calls, the application can easily terminate while those calls are still running. Your own code could call `Dispose()` on the objects used by those calls, thinking they've finished, leading to `ObjectDisposedException`s. – Panagiotis Kanavos Jan 28 '20 at 07:23
  • @PeterBons SyncBusiness is just an HttpClient call to an API endpoint which performs a sync. It returns no data. – J Benjamin Jan 28 '20 at 15:53

3 Answers

As others have noted, you can't use async with Parallel.ForEach. You can, however, make asynchronous code concurrent by starting all SyncBusiness calls at once and then using Task.WhenAll:

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
  var businessIds = (from x in _db.Poslookup
                     select x.BusinessId).Distinct();

  while (!stoppingToken.IsCancellationRequested)
  {
    var tasks = businessIds.Select(SyncBusiness).ToList();
    await Task.WhenAll(tasks);

    _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
    await Task.Delay(600000, stoppingToken);
  }
}

I'd also recommend making your database lookup asynchronous:

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
  while (!stoppingToken.IsCancellationRequested)
  {
    var businessIds = await (from x in _db.Poslookup
                       select x.BusinessId).Distinct().ToListAsync();

    var tasks = businessIds.Select(SyncBusiness).ToList();
    await Task.WhenAll(tasks);

    _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
    await Task.Delay(600000, stoppingToken);
  }
}

A final observation: this code currently syncs all the businesses and then waits ten minutes before the next round, so each cycle takes ten minutes plus the sync time. If you want a round to start every 10 minutes instead, start the timer at the beginning of the loop body:

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
  while (!stoppingToken.IsCancellationRequested)
  {
    _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
    var timerTask = Task.Delay(TimeSpan.FromMinutes(10), stoppingToken);
    var businessIds = await (from x in _db.Poslookup
                       select x.BusinessId).Distinct().ToListAsync();

    var tasks = businessIds.Select(SyncBusiness).ToList();
    tasks.Add(timerTask);
    await Task.WhenAll(tasks);
  }
}
marcOcram
Stephen Cleary
From the official documentation: https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/how-to-write-a-simple-parallel-foreach-loop

The loop partitions the source collection and schedules the work on multiple threads based on the system environment. The more processors on the system, the faster the parallel method runs. For some source collections, a sequential loop may be faster, depending on the size of the source and the kind of work the loop performs.

You can't run them all simultaneously. The degree of parallelism is always limited by the number of CPU cores (hyper-threading helps too).
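To make that bound visible, here is a minimal sketch that counts how many iterations of a `Parallel.ForEach` loop are running at the same moment. The class name, method name, and spin duration are made up for illustration, and the cap is set explicitly through `ParallelOptions.MaxDegreeOfParallelism` so the result does not depend on the machine; without it, the scheduler picks a value based on the environment.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class BoundedParallelismDemo
{
    // Runs a CPU-style loop over itemCount items and returns the highest number
    // of iterations that were observed running at the same time.
    public static int MeasurePeakConcurrency(int itemCount, int maxDegree)
    {
        int running = 0;
        int peak = 0;
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegree };

        Parallel.ForEach(Enumerable.Range(0, itemCount), options, _ =>
        {
            int now = Interlocked.Increment(ref running);

            // Raise the recorded peak with a compare-and-swap loop.
            int seen;
            while (now > (seen = Volatile.Read(ref peak)))
            {
                Interlocked.CompareExchange(ref peak, now, seen);
            }

            Thread.SpinWait(200_000); // stand-in for CPU-bound work
            Interlocked.Decrement(ref running);
        });

        return peak;
    }

    public static void Main()
    {
        int peak = MeasurePeakConcurrency(itemCount: 200, maxDegree: 4);
        Console.WriteLine($"Peak concurrent iterations: {peak} (cap was 4)");
    }
}
```

However many items are passed in, the peak never exceeds the cap; the remaining items wait their turn. This applies to synchronous, CPU-bound loop bodies, which is the case `Parallel.ForEach` was designed for.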

Pitfalls

Another great guide here, explaining a lot about parallel programming pitfalls: https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/potential-pitfalls-in-data-and-task-parallelism

The high points are to avoid non-thread-safe code and to remember that parallel is not always faster (it depends on the situation), etc.

Be aware that this approach may not meet your requirements. With thousands of businesses, if a batch has not completed within 10 minutes, your next batch will not start on time. At that point you need to scale out to multiple machines.
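Before scaling out, one common way to keep a large batch under control on a single machine is to cap how many syncs are in flight at once. The following is only a sketch under assumptions: `FakeSyncBusinessAsync` is a hypothetical stand-in for the real `SyncBusiness` call, and the limit of 5 is arbitrary and would need tuning against the real API.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledSyncDemo
{
    // Hypothetical stand-in for the real SyncBusiness HTTP call.
    private static Task FakeSyncBusinessAsync(int businessId, CancellationToken ct)
        => Task.Delay(20, ct);

    // Starts a sync for every id but lets at most maxConcurrent run at once.
    // Returns the peak number of syncs that were in flight together.
    public static async Task<int> SyncAllAsync(
        IEnumerable<int> businessIds, int maxConcurrent, CancellationToken ct = default)
    {
        using var gate = new SemaphoreSlim(maxConcurrent);
        var counterLock = new object();
        int inFlight = 0, peak = 0;

        var tasks = businessIds.Select(async id =>
        {
            await gate.WaitAsync(ct); // wait for a free slot
            try
            {
                lock (counterLock)
                {
                    inFlight++;
                    if (inFlight > peak) peak = inFlight;
                }

                await FakeSyncBusinessAsync(id, ct);

                lock (counterLock) { inFlight--; }
            }
            finally
            {
                gate.Release(); // free the slot even if the sync failed
            }
        }).ToList(); // materialize so each task is created exactly once

        await Task.WhenAll(tasks);
        return peak;
    }

    public static async Task Main()
    {
        int peak = await SyncAllAsync(Enumerable.Range(1, 50), maxConcurrent: 5);
        Console.WriteLine($"Peak syncs in flight: {peak}");
    }
}
```

Because the work is I/O-bound, the waiting syncs do not hold threads; they are just pending tasks queued on the semaphore. The counters exist only to show the cap being respected and would not be needed in a real worker.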

Athanasios Kataras
Something like:

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    IEnumerable<string> businessIds = (from x in _db.Poslookup
                                       select x.BusinessId).Distinct();

    // Want to multi-thread a sync for each of the businesses in businessIds
    Parallel.ForEach(businessIds, async i =>
    {
        await SyncBusiness(i, stoppingToken);
    });

    _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
}

private async Task SyncBusiness(string businessId, CancellationToken stoppingToken)
{
    await new HttpClient().GetAsync($"https://example.com/endpoint/{businessId}", stoppingToken);
}

Edit after Peter Bons's comment: replace

Parallel.ForEach(businessIds, async i =>
{
    await SyncBusiness(i, stoppingToken);
});

with

// Want to multi-thread a sync for each of the businesses in businessIds
IEnumerable<Task> tasks = businessIds.Select(i => SyncBusiness(i, stoppingToken));

Task.WaitAll(tasks.ToArray());
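A possible refinement of this replacement, offered as a sketch rather than a definitive implementation: `Task.WaitAll` blocks the calling thread, so inside an `async` method it is usually preferable to `await Task.WhenAll(...)`, and a single shared `HttpClient` is generally recommended over creating a new one per call. In the example below, `CountingHandler`, `BusinessSyncer`, and `SharedClientDemo` are hypothetical names, and the handler is a test double that answers every request locally so the code runs without a network.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

// Test double: answers every request locally and counts how many were sent.
public sealed class CountingHandler : HttpMessageHandler
{
    private int _requests;
    public int Requests => Volatile.Read(ref _requests);

    protected override async Task<HttpResponseMessage> SendAsync(
        HttpRequestMessage request, CancellationToken cancellationToken)
    {
        Interlocked.Increment(ref _requests);
        await Task.Delay(10, cancellationToken); // simulated latency
        return new HttpResponseMessage(HttpStatusCode.OK);
    }
}

public sealed class BusinessSyncer
{
    private readonly HttpClient _http; // one client shared by all calls

    public BusinessSyncer(HttpClient http) => _http = http;

    private async Task SyncBusiness(string businessId, CancellationToken ct)
    {
        using var response = await _http.GetAsync(
            $"https://example.com/endpoint/{businessId}", ct);
        response.EnsureSuccessStatusCode();
    }

    // Starts every sync, then completes when all of them have completed.
    public Task SyncAllAsync(IEnumerable<string> businessIds, CancellationToken ct)
        => Task.WhenAll(businessIds.Select(id => SyncBusiness(id, ct)));
}

public static class SharedClientDemo
{
    public static async Task Main()
    {
        var handler = new CountingHandler();
        using var http = new HttpClient(handler);
        var syncer = new BusinessSyncer(http);

        await syncer.SyncAllAsync(new[] { "a", "b", "c" }, CancellationToken.None);
        Console.WriteLine($"Requests sent: {handler.Requests}");
    }
}
```

In a Worker Service the `HttpClient` would more likely come from dependency injection (for example via `IHttpClientFactory`) than from a hand-built handler; the handler here exists only to keep the sketch self-contained.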
McKabue