1

I'm trying to run 4,000+ database calls concurrently. Instead, they all stack up: none of them finishes until all of them have started.

What I'm trying so far is based on this thread:

Process thousands of database calls simultaneously

The logic works perfectly in a console app that makes no database calls. But here, with the database call, it behaves as I described above.

using Dapper;

/// <summary>
/// Loads the customers from <paramref name="blobFile"/>, verifies each one against the
/// database with at most 10 verifications in flight at once, and then runs the
/// post-processing step after the concurrency helper's task has completed.
/// </summary>
/// <param name="blobFile">Stream handed to <c>LoadCustomers</c> to read the customer list.</param>
/// <remarks>
/// If the awaited helper task faults, the exception propagates out of this method and the
/// post-processing step is skipped. Whether verification failures reach this point
/// depends on <c>RunWithMaxDegreeOfConcurrency</c> propagating them.
/// </remarks>
public async Task ProcessFileAsync(Stream blobFile)
{
    List<Customer> customers = LoadCustomers(blobFile);

    // The helper already returns one Task representing all of the work,
    // so it is awaited directly rather than wrapped in Task.WhenAll.
    await RunWithMaxDegreeOfConcurrency(10, customers, customer => VerifyCustomerAsync(customer));

    DoStuffWhenAllDatabaseCallsAreFinished();
}

/// <summary>
/// Runs the validation stored procedure for one customer and adds the customer to the
/// collection that matches the returned <c>ValidaitonAction</c>.
/// </summary>
/// <param name="customer">The customer to validate; its name is used in log messages.</param>
/// <exception cref="System.InvalidOperationException">The stored procedure returned no rows.</exception>
private async Task VerifyCustomerAsync(Customer customer)
{
    RecordLog(LogType.Info, $"Starting {customer.CustomerName}");

    var parameters = new DynamicParameters();
    // ... create parameters 

    ValidaitonResult validaitonResult = null;
    using (var connection = new SqlConnection(ConfigurationManager.ConnectionStrings["FubarConnection"].ConnectionString))
    {
        // Open asynchronously so the calling thread is not blocked while the connection opens.
        await connection.OpenAsync();
        var queryResult = await connection.QueryAsync<ValidaitonResult>("sp_name", parameters, commandType: CommandType.StoredProcedure);
        validaitonResult = queryResult.FirstOrDefault();
    }

    // FirstOrDefault yields null when the procedure returns no rows; report that clearly
    // instead of failing with a NullReferenceException on the switch below.
    if (validaitonResult == null)
    {
        throw new System.InvalidOperationException($"Stored procedure 'sp_name' returned no validation result for {customer.CustomerName}.");
    }

    // NOTE(review): this method runs for many customers at once, and its continuations may run
    // on different thread-pool threads. If these collections are plain List<T>, concurrent Add
    // is unsafe. Confirm their type (e.g. ConcurrentBag<T>) or synchronize access.
    switch (validaitonResult.ValidaitonAction)
    {
        case ValidaitonAction.NoAction:
            _customersNoAction.Add(customer);
            break;
        case ValidaitonAction.Insert:
            _customersToInsert.Add(customer);
            break;
        default:
            break;
    }

    RecordLog(LogType.Info, $"Finished {customer.CustomerName}");
}


/// <summary>
/// Invokes <paramref name="taskFactory"/> for every item in <paramref name="collection"/>,
/// keeping at most <paramref name="maxDegreeOfConcurrency"/> of the returned tasks
/// in flight at the same time.
/// </summary>
/// <typeparam name="T">Type of the items being processed.</typeparam>
/// <param name="maxDegreeOfConcurrency">Maximum number of tasks in flight; must be at least 1.</param>
/// <param name="collection">Items to process; enumerated once.</param>
/// <param name="taskFactory">Starts the asynchronous work for one item.</param>
/// <returns>
/// A task that completes once every started task has completed. If any of them faulted,
/// the returned task faults too (awaiting it rethrows the first failure).
/// </returns>
/// <exception cref="System.ArgumentOutOfRangeException"><paramref name="maxDegreeOfConcurrency"/> is less than 1.</exception>
/// <exception cref="System.ArgumentNullException"><paramref name="collection"/> or <paramref name="taskFactory"/> is null.</exception>
private static async Task RunWithMaxDegreeOfConcurrency<T>(int maxDegreeOfConcurrency, IEnumerable<T> collection, Func<T, Task> taskFactory)
{
    if (maxDegreeOfConcurrency < 1)
    {
        // A value of 0 would otherwise never reach the throttling check and start everything at once.
        throw new System.ArgumentOutOfRangeException(nameof(maxDegreeOfConcurrency), "The degree of concurrency must be at least 1.");
    }
    if (collection == null)
    {
        throw new System.ArgumentNullException(nameof(collection));
    }
    if (taskFactory == null)
    {
        throw new System.ArgumentNullException(nameof(taskFactory));
    }

    // Every started task is remembered so its outcome, including any exception, is observed at
    // the end. activeTasks only tracks what is currently running, for throttling.
    var allTasks = new List<Task>();
    var activeTasks = new List<Task>(maxDegreeOfConcurrency);
    foreach (T item in collection)
    {
        Task task = taskFactory(item);
        allTasks.Add(task);
        activeTasks.Add(task);
        if (activeTasks.Count == maxDegreeOfConcurrency)
        {
            // Wait for a free slot before starting the next item.
            await Task.WhenAny(activeTasks);
            activeTasks.RemoveAll(t => t.IsCompleted);
        }
    }

    // Waits for the remaining work and surfaces any failure to the caller.
    await Task.WhenAll(allTasks);
}
Casey Crookston
  • 13,016
  • 24
  • 107
  • 193

3 Answers3

1

I don't see anything wrong with your code, apart from a lack of error handling. Try a simplified database operation like this:

using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Dapper;


namespace ConsoleApp26
{
    /// <summary>
    /// Minimal customer record used by the demo; only the display name is carried.
    /// </summary>
    internal class Customer
    {
        /// <summary>Name written into the start/finish log lines.</summary>
        public string CustomerName { get; set; }
    }
    /// <summary>
    /// Console demo that runs 1,000 simulated customer verifications against SQL Server,
    /// with at most 100 database calls in flight at once.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Builds 1,000 fake customers, verifies them with bounded concurrency, then logs completion.
        /// </summary>
        /// <param name="blobFile">Unused by this demo; kept to mirror the question's signature.</param>
        public async Task ProcessFileAsync(Stream blobFile)
        {
            List<Customer> customers = Enumerable.Range(1, 1000).Select(i => new Customer() { CustomerName = $"Customer{i}" }).ToList();

            // The helper returns a single Task covering all of the work, so await it directly.
            await RunWithMaxDegreeOfConcurrency(100, customers, customer => VerifyCustomerAsync(customer));

            DoStuffWhenAllDatabaseCallsAreFinished();
        }

        /// <summary>Logs that every verification has finished.</summary>
        private void DoStuffWhenAllDatabaseCallsAreFinished()
        {
            RecordLog(LogType.Info, $"Finished");
        }

        /// <summary>
        /// Simulates one verification: opens a connection and runs a query that waits
        /// 2 seconds server-side before returning one row.
        /// </summary>
        private async Task VerifyCustomerAsync(Customer customer)
        {
            RecordLog(LogType.Info, $"Starting {customer.CustomerName}");

            var parameters = new DynamicParameters();
            // ... create parameters 

            ValidaitonResult validaitonResult = null;
            using (var connection = new SqlConnection("server=.;database=tempdb;integrated security=true"))
            {
                // Open asynchronously so no thread is blocked while the connection opens.
                await connection.OpenAsync();
                //var queryResult = await connection.QueryAsync<ValidaitonResult>("sp_name", parameters, commandType: CommandType.StoredProcedure);

                // The column alias must match the ValidaitonResult property name for Dapper to map it.
                var queryResult = await connection.QueryAsync<ValidaitonResult>("waitfor delay '0:0:2'; select 1 ValidaitonAction");
                validaitonResult = queryResult.FirstOrDefault();
            }

            // Handle the result

            RecordLog(LogType.Info, $"--Finished {customer.CustomerName}");
        }

        /// <summary>Writes a message to the console, tagged with the current managed thread id.</summary>
        /// <param name="info">Log category; ignored by this demo.</param>
        /// <param name="v">Message text.</param>
        private void RecordLog(object info, string v)
        {
            Console.WriteLine($"{v} running on thread {System.Threading.Thread.CurrentThread.ManagedThreadId}");
        }

        /// <summary>
        /// Invokes <paramref name="taskFactory"/> for each item, with at most
        /// <paramref name="maxDegreeOfConcurrency"/> tasks in flight.
        /// </summary>
        /// <remarks>
        /// Fails fast: once a faulted task is seen, no new work is started and that task's exception
        /// is rethrown. Failures in the final batch are propagated by the closing WhenAll.
        /// NOTE(review): on fail-fast, tasks still running are left unobserved.
        /// </remarks>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxDegreeOfConcurrency"/> is less than 1.</exception>
        private static async Task RunWithMaxDegreeOfConcurrency<T>(int maxDegreeOfConcurrency, IEnumerable<T> collection, Func<T, Task> taskFactory)
        {
            if (maxDegreeOfConcurrency < 1)
            {
                throw new ArgumentOutOfRangeException(nameof(maxDegreeOfConcurrency), "The degree of concurrency must be at least 1.");
            }

            var activeTasks = new List<Task>(maxDegreeOfConcurrency);
            foreach (var task in collection.Select(taskFactory))
            {
                activeTasks.Add(task);
                if (activeTasks.Count == maxDegreeOfConcurrency)
                {
                    await Task.WhenAny(activeTasks);

                    // Awaiting the faulted task rethrows its original exception with its stack
                    // trace, rather than a wrapping AggregateException.
                    Task faulted = activeTasks.FirstOrDefault(t => t.IsFaulted);
                    if (faulted != null)
                    {
                        await faulted;
                    }

                    activeTasks.RemoveAll(t => t.IsCompleted);
                }
            }

            // Propagate failures from the last batch instead of swallowing them.
            await Task.WhenAll(activeTasks);
        }

        static void Main(string[] args)
        {
            var p = new Program();
            // GetResult (unlike Wait) rethrows the original exception, not an AggregateException.
            p.ProcessFileAsync(null).GetAwaiter().GetResult();
        }

        private class LogType
        {
            internal static readonly int Info = 1;
        }
    }

    /// <summary>
    /// One row returned by the validation query; Dapper maps the column of the same name onto it.
    /// </summary>
    internal class ValidaitonResult
    {
        /// <summary>Action code returned by the query; settable only within the assembly.</summary>
        public int ValidaitonAction { get; internal set; }
    }
}
David Browne - Microsoft
  • 80,331
  • 6
  • 39
  • 67
0

Async != parallel. If you want to simulate parallel calls, change RunWithMaxDegreeOfConcurrency to use Parallel.ForEach.

matt-dot-net
  • 4,204
  • 21
  • 24
  • From what I've been told, Parallel is for CPU-bound work, and a database call is I/O-bound work. – Casey Crookston Aug 16 '19 at 17:08
  • That's correct. However, since you are simulating "thousands of database calls simultaneously", you need to use parallelism. You want to see what happens when there are concurrent requests to your database. You will find that it is I/O bound and serializes the calls, making your exercise somewhat silly. You could just take the DB vendor's word for performance and load characteristics. – matt-dot-net Aug 16 '19 at 17:10
  • 1
    Async on the client translates into parallel on the server. You don't need hundreds of client threads to manage hundreds of concurrent database operations. – David Browne - Microsoft Aug 16 '19 at 17:58
  • Tried this. It ran for a long time and then gave me this: `Timeout expired. The timeout period elapsed prior to obtaining a connection from the pool. This may have occurred because all pooled connections were in use and max pool size was reached.` – Casey Crookston Aug 16 '19 at 18:07
0

Well, I feel silly. The problem was a typo in the name of the stored procedure. The code works as written.

Casey Crookston
  • 13,016
  • 24
  • 107
  • 193