
I need to perform extensive data inserts into my database. I can implement the code the multithreaded way, with a throttled scheduler that limits the number of concurrent operations. Every M rows, a block is formed and inserted into the database as an atomic operation. Multiple concurrent operations must occur because the database is slower than reading and parsing a data file. I often implement this model using multithreading.

If instead I decide to implement my code using await/async (Entity Framework supports asynchronous programming), how can I make sure that no more than N concurrent tasks execute (i.e. go to database) at the same time?

In my initial design, I instantiated a List&lt;Task&gt;, added new tasks as soon as I read a block of data to be inserted atomically, and then let my method return after awaiting all of the tasks. The design-time issue is that the number of concurrent Tasks (and thus the memory footprint) is going to explode, because tasks are fed faster than they complete for big data files.

I was thinking about using a SemaphoreSlim, but I have little experience with asynchronous programming (unlike multithreaded programming). So I am asking this question to get feedback about best practices, if there are any.

usr-local-ΕΨΗΕΛΩΝ
  • I definitely think a `SemaphoreSlim` is the way to go in this case, if you wish to run a limited number of multiple threads. – Gertsen Nov 21 '16 at 15:02
  • Since your work is IO bound you have no need for multiple threads at all. You can execute multiple parallel DB queries without any additional threads. – Servy Nov 21 '16 at 15:04
  • [There's a limited concurrency task scheduler here](https://msdn.microsoft.com/en-us/library/system.threading.tasks.taskscheduler%28v=vs.110%29.aspx?f=255&MSPPError=-2147217396) (scroll down a bit for the implementation). – Matthew Watson Nov 21 '16 at 15:14
  • @Servy I personally don't agree. Based on experience at my company, limiting the number of concurrent DB writes **and** the bulk size can impact performance a lot. We have done benchmarking and found a few "magic numbers" on some of our applications. Most important, since I assume data is fed to the application faster than DB, and since DB insert performance degrades with parallelism, at some point an `OutOfMemoryException` is likely to be triggered. – usr-local-ΕΨΗΕΛΩΝ Nov 21 '16 at 15:19
  • @usr-local-ΕΨΗΕΛΩΝ What does that have to do with my comment? All I said was that you have no need to use multiple threads to have multiple concurrent DB requests. I never said you shouldn't have an upper bound on the number of pending DB requests you have out. – Servy Nov 21 '16 at 15:20
  • Sorry, re-reading your comment shows that I misunderstood it. I could use multiple threads because *I know how to write code*, but I wanted to try asynchronous mode – usr-local-ΕΨΗΕΛΩΝ Nov 21 '16 at 20:12

3 Answers


> The design-time issue is that the number of concurrent Tasks (and thus the memory footprint) is going to explode, because tasks are fed faster than they complete for big data files. I was thinking about using a SemaphoreSlim

Yes, SemaphoreSlim is an appropriate choice for throttling concurrent asynchronous operations:

private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(10);

async Task ThrottledWorkAsync()
{
  await _semaphore.WaitAsync();
  try
  {
    await WorkAsync();
  }
  finally
  {
    _semaphore.Release();
  }
}
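For the read-and-insert loop from the question, the pattern gives natural back-pressure: the loop awaits the semaphore before starting the next insert, so no more than N operations are ever in flight. A minimal sketch under that assumption; `insertBatchAsync` is a placeholder for whatever does the real EF insert:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

static class ThrottledInsert
{
    // Bounded concurrent inserts driven by a read loop.
    // insertBatchAsync is a placeholder for the real EF insert.
    public static async Task ProcessAllAsync<T>(
        IEnumerable<T> batches,
        Func<T, Task> insertBatchAsync,
        int maxConcurrency)
    {
        var semaphore = new SemaphoreSlim(maxConcurrency);
        var pending = new List<Task>();

        foreach (var batch in batches)
        {
            // Back-pressure: the reader pauses here while N inserts are in flight.
            await semaphore.WaitAsync();
            pending.Add(RunOneAsync(batch));
        }

        // Await the tail of the work.
        await Task.WhenAll(pending);

        async Task RunOneAsync(T batch)
        {
            try { await insertBatchAsync(batch); }
            finally { semaphore.Release(); }
        }
    }
}
```

Note that `pending` still grows by one (completed) Task per batch, but the number of *running* operations stays bounded, and the reader cannot outpace the database.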

However...

> If instead I decide to implement my code using await/async (Entity Framework supports asynchronous programming), how can I make sure that no more than N concurrent tasks execute (i.e. go to database) at the same time?

One thing to be aware of is that Entity Framework - while it supports asynchronous APIs - still requires one connection per request. So, you can't have multiple concurrent asynchronous requests with the same DbContext; you'd need to create a separate connection for each concurrent request (or at least N connections that are "borrowed" by the concurrent requests).
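In practice that means creating (and disposing) a context per throttled operation rather than sharing one. A sketch of the idea; `MyDbContext`, `Records`, and `Record` are assumed names standing in for your own model:

```csharp
// Sketch: one DbContext per concurrent insert, since a DbContext is not
// safe for concurrent use. MyDbContext, Records, and Record are assumed names.
async Task InsertBatchAsync(List<Record> batch)
{
    await _semaphore.WaitAsync();
    try
    {
        using (var context = new MyDbContext())   // fresh context (and connection) per request
        {
            context.Records.AddRange(batch);
            await context.SaveChangesAsync();     // one transaction per batch, so the insert is atomic
        }
    }
    finally
    {
        _semaphore.Release();
    }
}
```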

Stephen Cleary
  • There could be an issue with your code. I chose to wait before spawning the task as not to consume the `StreamReader` if workers are still going (see Dennis's answer) – usr-local-ΕΨΗΕΛΩΝ Nov 21 '16 at 20:14
  • @usr-local-ΕΨΗΕΛΩΝ: I have no idea what `StreamReader` you're talking about. My code will not start `WorkAsync` until after a semaphore slot is taken. – Stephen Cleary Nov 21 '16 at 20:22
  • I meant the source of data that feeds the application, which in my case is a `StreamReader` I didn't mention in the question. Well, yes, your `WorkAsync` code does indeed not start until a slot is free, but IMO if you wrap that `async Task ThrottledWorkAsync` in a for loop that enqueues your async `wait-perform-release` code, then there is the risk that the task scheduler gets clogged with tasks waiting for a slot. In that case, only 10 tasks will run concurrently, but a myriad of additional tasks (with their memory footprint) will be allocated. I need to edit my question (tomorrow) with a little code – usr-local-ΕΨΗΕΛΩΝ Nov 21 '16 at 20:29
  • @usr-local-ΕΨΗΕΛΩΝ: The memory requirements of a single task are incredibly small. – Stephen Cleary Nov 21 '16 at 20:34

If you initially have at least n values to insert (n being the maximum number of concurrent Tasks), you can take the following approach:

  1. Invoke InsertAsync() n times with the different values.
  2. When each of the Tasks completes, continue with a new call to InsertAsync() (repeat step 2).

That way, you won't need a semaphore to control the concurrency level, and the whole pipeline will be non-blocking.
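The two steps above can be sketched without any library by keeping a set of n in-flight tasks and refilling a slot whenever one finishes; `insertAsync` here is a stand-in for the real insert:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

static class RefillRunner
{
    // Keep up to n tasks in flight; when one completes, start the next value.
    public static async Task RunAsync<T>(
        IEnumerable<T> values, Func<T, Task> insertAsync, int n)
    {
        var inFlight = new HashSet<Task>();
        foreach (var value in values)
        {
            inFlight.Add(insertAsync(value));
            if (inFlight.Count == n)
            {
                // Step 2: wait for any task to finish, then reuse its slot.
                var done = await Task.WhenAny(inFlight);
                inFlight.Remove(done);
                await done;              // propagate exceptions, if any
            }
        }
        await Task.WhenAll(inFlight);    // drain the remaining tasks
    }
}
```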

I've just published a package that can be useful for these scenarios; it exposes two methods, Times() and Map(): https://github.com/jorgebay/concurrent-utils

For example:

// Execute MyMethodAsync() 1,000,000 times limiting the maximum amount
// of parallel async operations to 512
await ConcurrentUtils.Times(1000000, 512, (index) => MyMethodAsync(index));
jorgebg
  • Does `Map` work with an unbounded, unknown-size `IEnumerable`? I need to avoid storing all records in memory before inserting; there could be over 10M – usr-local-ΕΨΗΕΛΩΝ Nov 28 '16 at 10:11
  • I still like your approach, but from the reading of your answer I am worried about the non-blocking part. See, I need to block the reading of the input `Stream`, because if I end up reading 10M or more records into memory, an `OutOfMemoryException` is unavoidable. A clever solution could be the use of a `MemoryFailPoint`, which I am evaluating – usr-local-ΕΨΗΕΛΩΝ Nov 28 '16 at 10:25
  • If you want to work with an unspecified amount of data (retrieved from time to time in the background) you can use a job queue `ConcurrentUtils.CreateQueue()`. – jorgebg Nov 28 '16 at 11:26

I use this piece of code to execute my tasks with limited concurrency:

public static Task WhenAll(this List<Func<Task>> actions, int threadCount)
{
    var executeTaskHelper = new ConcurrentTaskHelper(threadCount);
    return executeTaskHelper.Execute(actions);
}

public class ConcurrentTaskHelper
{
    int _threadCount;
    CountdownEvent _countdownEvent;
    SemaphoreSlim _throttler;

    public ConcurrentTaskHelper(int threadCount)
    {
        _threadCount = threadCount;
        _throttler = new SemaphoreSlim(threadCount);
    }

    public async Task Execute(List<Func<Task>> tasks)
    {
        _countdownEvent = new CountdownEvent(tasks.Count);

        foreach (var task in tasks)
        {
            await _throttler.WaitAsync();
            // Fire and forget; completion is tracked by the countdown event.
            _ = Execute(task);
        }

        _countdownEvent.Wait();
    }

    private async Task Execute(Func<Task> task)
    {
        try { await task(); }
        finally { Completed(); }
    }

    private void Completed()
    {
        _throttler.Release();
        _countdownEvent.Signal();
    }
}

This code is based on the code provided in this thread: How to limit the amount of concurrent async I/O operations?

Instead of using the CountdownEvent, it is better to implement an AsyncCountdownEvent. That way it is possible to use `await _countdownEvent.WaitAsync();`

Calling it should look something like this. It will execute all the tasks, but only 40 (in this case) will run concurrently:

var tasks = new List<Func<Task>>();
tasks.Add(() => saveAsync());
//add more
await tasks.WhenAll(40);
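The AsyncCountdownEvent mentioned above can be built on a `TaskCompletionSource`; this is a bare sketch of the idea, not the full implementation from the linked blog, and it is not production-hardened:

```csharp
using System.Threading;
using System.Threading.Tasks;

// Minimal async countdown: WaitAsync completes once Signal has been
// called initialCount times. Bare sketch, not production-hardened.
public class AsyncCountdownEvent
{
    private readonly TaskCompletionSource<bool> _tcs =
        new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
    private int _count;

    public AsyncCountdownEvent(int initialCount) { _count = initialCount; }

    // Completes when the count reaches zero; awaiting it does not block a thread.
    public Task WaitAsync() => _tcs.Task;

    public void Signal()
    {
        if (Interlocked.Decrement(ref _count) == 0)
            _tcs.TrySetResult(true);
    }
}
```

With this in place, `_countdownEvent.Wait();` in the code above becomes `await _countdownEvent.WaitAsync();`, so no thread-pool thread sits blocked while the inserts drain.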
  • There's no need for the use of `Task.Run` here. You already have an asynchronous operation, you shouldn't be starting it in another thread. Additionally, you're synchronously waiting for them to finish, which you absolutely shouldn't be doing considering that this is an asynchronous operation. You should be asynchronously waiting for them all to finish. – Servy Nov 21 '16 at 15:21
  • @Servy I do not agree with you. To make it concurrent, you have to execute it in a Task.Run. If you remove that, you can only await the action (which blocks the execution of the other concurrent tasks) or execute the action (which instantly calls the finally block and is therefore no longer concurrent). I do however see that this could better be wrapped in a class; that way it is easier to manage the task. – Dennis Rosenbaum Nov 21 '18 at 11:11
  • Why say that you don't agree with me and then show how to fix the problem that I pointed out at the same time? You can most certainly solve the problem without creating additional threads to start asynchronous operations. Now you just need to remove the synchronous synchronization mechanisms and replace them with asynchronous ones, so that you're not forcing thread pool threads to sit there doing nothing productive in operations that *claim* to be asynchronous. – Servy Nov 21 '18 at 14:20
  • Well, I was pointing out that if you didn't encapsulate the whole thing in a class that has some state, the Task.Run would be the logical way to do it. Though you have a very valid point, which I felt too noob to admit. Your approach is better. I only do not understand which 'synchronous synchronization mechanism' you are referring to. – Dennis Rosenbaum Nov 21 '18 at 14:48
  • You don't need to create a new explicit class, you could accomplish the same thing without it if you wanted to, but there's nothing wrong with having a new class. Another method would also do the same thing just as well. The synchronous mechanism is the countdown event that you're synchronously blocking on. – Servy Nov 21 '18 at 14:52
  • I understand. For the countdown event I have added an url to a blog to implement the AsyncCountdownEvent, which indeed is a better way to go. – Dennis Rosenbaum Nov 21 '18 at 15:00