I have a problem.

I'm trying to reproduce several stored procedure (SP) calls in my C# code, but I want to run them asynchronously.

T-SQL example: ( Execute sp @key = 15072000173475; Execute sp @key = 15072000173571; ... Execute sp @key = n;)

[TestClass]
public class UnitTestNomenclature {
    [TestMethod]
    public void ParallelSQLMethod() {
        Task scropeTasks = null;
        //real amount is more than 1500
        long[] keys = new long[] {15072000173475,15072000173571 ... n };

        try {
            var tasks = keys.Select( i =>  Task.Run(async () => { await RunStoredProc(i); }));
            scropeTasks =  Task.WhenAll(tasks);

            scropeTasks.Wait();
        } catch (Exception ex) {
            Debug.WriteLine("Exception: " + ex.Message);

            Debug.WriteLine("IsFaulted: " + scropeTasks.IsFaulted);
            foreach (var inx in scropeTasks.Exception.InnerExceptions) {
                Debug.WriteLine("Details: " + inx.Message);
            }
        }

        Assert.AreEqual(1, 1);
    }

    public async Task RunStoredProc(long scollNumbParam) {
        const string strStoredProcName = @"[dbo].[sp]";
        using (SqlConnection conn = new SqlConnection(@"data source=SERVER;initial catalog=Db;integrated security=True;Trusted_Connection=Yes;")) {
            await conn.OpenAsync();
            Debug.WriteLine("============================================ Connection is open: ==============================================");

            // info
            Debug.WriteLine(String.Format("Connection: {0}", conn.ClientConnectionId));
            Debug.WriteLine(String.Format("State: {0}", conn.State.ToString()));

            using (SqlCommand cmd = new SqlCommand(strStoredProcName, conn) { CommandTimeout = 120, CommandType = CommandType.StoredProcedure }) {

                SqlParameter scrParam = new SqlParameter() {
                    ParameterName = "@KEYKRT",
                    Value = scollNumbParam,
                    SqlDbType = SqlDbType.BigInt
                };
                cmd.Parameters.Add(scrParam);

                Debug.WriteLine("Start of Proccesing: " + scollNumbParam);
                await cmd.ExecuteNonQueryAsync().ConfigureAwait(false);
                Debug.WriteLine("End of Proccesing: " + scollNumbParam);

            }
        }

        Debug.WriteLine("============================================ Connection is closed: ==============================================");
    }
}

This is what I get in the Output window:

========== Connection is open: ========
Connection: 5be9c681-6eb5-422f-a22c-b49689a2d912
State: Open
Start of Proccesing: 15072000173475
========== Connection is open: ==========
Connection: cfb66041-6646-4b56-be1c-2afb26a18cb8
State: Open
Start of Proccesing: 15072000173571
.....
End of Proccesing: 15072000173475
=========== Connection is closed: =========
End of Proccesing: 15072000173571
=========== Connection is closed: =========

....

A timeout occurred while waiting for memory resources to execute the query in resource pool 'default' (2). Rerun the query.
Actual error number: 8645
Actual line number: 98

The debugger also reports that the connection pool is exhausted. I think the main reason is that the connections aren't disposed properly, but how can I achieve that with async?

If I open just one connection before the async tasks are declared and pass it to my RunStoredProc method, I get a "connection doesn't support MultipleActiveResultSets" exception:

using (SqlConnection conn = new SqlConnection(@"data source=SERVER;initial catalog=Db;integrated security=True;Trusted_Connection=Yes;")) {

    conn.OpenAsync();
    var tasks = keys.Select(i => Task.Run(async () => { await RunStoredProc(i, conn); }));
    scropeTasks = Task.WhenAll(tasks);

    scropeTasks.Wait();
}

Debug.WriteLine("========== Connection is closed: ==========");

This is what I get in the Output window:

Connection: 5be9c681-6eb5-422f-a22c-b49689a2d912
State: Open
Start of Proccesing: 15072000173475
======= Connection is open: =============
Connection: cfb66041-6646-4b56-be1c-2afb26a18cb8
State: Open
Start of Proccesing: 15072000173571
========= Connection is open: =========
AllmanTool
  • And what is stopping you from making the test async? – Nkosi Jul 14 '17 at 09:21
  • Nothing, I did it and got the errors above. I'm very interested in finding out why I get these errors – AllmanTool Jul 14 '17 at 09:26
  • Because you have 1500 or so tasks all executing at the same time and also mixing async and blocking calls which can cause deadlocks. What did you expect to happen? – Nkosi Jul 14 '17 at 09:27
  • I expect to get async processing. I'm looking for a solution. How can I get the connection disposed after each iteration? Is it possible? I'm trying to figure out how async/await and ADO.NET work together – AllmanTool Jul 14 '17 at 09:30
  • By iterating them in sequence. It will take longer but at least the connections will get disposed properly so as not to overload the resources. You could also consider doing them in reasonably sized batches. – Nkosi Jul 14 '17 at 09:32
  • The connection pool has a max size of 100. Maybe there is a way to process these tasks in pieces (100 at a time) and then refresh the pool? – AllmanTool Jul 14 '17 at 09:37
  • I would batch them in even smaller sizes just to be safe. – Nkosi Jul 14 '17 at 09:39
  • Thanks for the advice. Would you mind showing me an example (code) of a real implementation? – AllmanTool Jul 14 '17 at 09:40
  • Do you really need a new connection for each stored proc call? You could just open one connection before the async tasks are declared and pass this to your `RunStoredProc` method. – ZippyZippedUp Jul 14 '17 at 09:48
  • In that case I get a "connection doesn't support MultipleActiveResultSets" exception (the example is above, in the code) – AllmanTool Jul 14 '17 at 10:28

3 Answers

You have 1500 or so tasks all executing at the same time, and you are also mixing async and blocking calls (like .Wait), which can cause deadlocks.

Make the test async and try to avoid async void unless it is on an event handler.

Try iterating them in sequence. It will take longer but at least the connections will get disposed properly so as not to overload the resources. You could also consider doing them in reasonably sized batches.

[TestMethod]
public async Task ParallelSQLMethod() {
    //real amount is more than 1500
    var keys = new long[] { 
        15072000173475, 
        15072000173571, 
        //....., n
    };
    var batchSize = 50; //Or smaller

    //run the keys in batches; batching the keys (not a lazy sequence of tasks)
    //ensures each stored procedure call is started exactly once
    for (var offset = 0; offset < keys.Length; offset += batchSize) {
        var batch = keys.Skip(offset).Take(batchSize)
            .Select(i => RunStoredProc(i))
            .ToList(); //starts only this batch's calls

        await Task.WhenAll(batch);
    }
}
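
Fixed batches wait for the slowest call in each batch before starting the next one. A different technique, not the batching shown above, is to cap the number of calls in flight with a SemaphoreSlim. The following is only a minimal sketch under assumptions: ThrottledRunner and RunThrottledAsync are hypothetical names introduced for illustration, and the work delegate stands in for a call such as RunStoredProc.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of the original answer.
public static class ThrottledRunner
{
    // Runs work(key) for every key, with at most maxConcurrency calls in flight.
    public static async Task RunThrottledAsync(
        long[] keys, int maxConcurrency, Func<long, Task> work)
    {
        using (var gate = new SemaphoreSlim(maxConcurrency))
        {
            // ToList() starts every wrapper exactly once; each wrapper
            // waits at the gate before it calls work.
            var tasks = keys.Select(async key =>
            {
                await gate.WaitAsync();
                try
                {
                    await work(key);
                }
                finally
                {
                    gate.Release(); // free the slot even if work throws
                }
            }).ToList();

            // WhenAll completes only after every wrapper has finished,
            // so disposing the gate afterwards is safe.
            await Task.WhenAll(tasks);
        }
    }
}
```

Inside the async test method this could be called as `await ThrottledRunner.RunThrottledAsync(keys, 50, key => RunStoredProc(key));`. The limit of 50 is carried over from the batch size above and is not a measured value.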
Nkosi

I'm afraid I see classic async/await/concurrency/threading problems here. The test has multiple problems; I will go through them one by one.

1) Test case architecture. You do not say whether the unit test you are writing and the SQL Server are on the same box or on different boxes.

If on the same box, I would go for Max(n_cores/2, 1) connections.

If different boxes, I would go for something like 1-3 connections.

And these numbers could be tweaked up/down based on the stored procedure behavior, long/short computation, the amount of transferred data, connection speed etc.

2) SQL connection concurrency issue. You cannot open one connection and then try to invoke 1500 requests through it at the same time. In fact, not even two at the same time.

That is what the error told you: connection doesn't support MultipleActiveResultSets.

One open connection can be used by only one request at a time.

But! You do not have to use a connection for a single request and then close it. Once the first request has completed you can run the next request on the same connection, which is much faster than closing it and creating a new one. You just have to run the requests on each connection sequentially...

3) So the correct test case architecture would look like:

  • an async test method,
  • which pushes all the keys into a ConcurrentQueue;
  • then creates a Task[] array sized to the number of connections required,
  • starts each task and stores it in the array,
  • and finally calls await Task.WhenAll(tasks);

I like playing with concurrent/parallel code very much, but creating more and more tasks without coordinating them does not speed things up; it wastes resources instead...

4) Example:

[TestClass]
public class UnitTestNomenclature
{
    [TestMethod]
    public async Task ParallelSQLMethod()
    {
        long[] keys = new long[] { 15072000173475, 15072000173571 };

        ConcurrentQueue<long> queue = new ConcurrentQueue<long>(keys);

        int connections = Math.Max(1, Environment.ProcessorCount / 2);

        Task[] tasks =
        Enumerable
        .Range(0, connections)
        .Select(i => Task.Run<Task>(() => RunConnection(i, queue)).Unwrap())
        .ToArray()
        ;

        await Task.WhenAll(tasks);
    }

    public async Task RunConnection(int connection, ConcurrentQueue<long> queue)
    {
        using (SqlConnection conn = new SqlConnection(@"data source=SERVER;initial catalog=Db;integrated security=True;Trusted_Connection=Yes;"))
        {
            await conn.OpenAsync();
            Debug.WriteLine($"====== Connection[{connection}] is open: ======");

            Debug.WriteLine($"Connection[{connection}]: {conn.ClientConnectionId}");
            Debug.WriteLine($"Connection[{connection}].State: {conn.State}");

            long scollNumbParam;

            while (queue.TryDequeue(out scollNumbParam))
            {
                await RunStoredProc(conn, connection, scollNumbParam);
                Debug.WriteLine($"Connection[{connection}]: {conn.ClientConnectionId}");
                Debug.WriteLine($"Connection[{connection}].State: {conn.State}");
            }
        }

        Debug.WriteLine($"====== Connection[{connection}] is closed  ======");
    }

    public async Task RunStoredProc(SqlConnection conn, int connection, long scollNumbParam)
    {
        const string strStoredProcName = @"[dbo].[sp]";

        using (SqlCommand cmd = new SqlCommand(strStoredProcName, conn) { CommandTimeout = 120, CommandType = CommandType.StoredProcedure })
        {
            SqlParameter scrParam = new SqlParameter()
            {
                ParameterName = "@KEYKRT",
                Value = scollNumbParam,
                SqlDbType = SqlDbType.BigInt
            };
            cmd.Parameters.Add(scrParam);

            Debug.WriteLine($"Connection[{connection}] Start of Proccesing: " + scollNumbParam);
            await cmd.ExecuteNonQueryAsync();
            Debug.WriteLine($"Connection[{connection}] End of Proccesing: " + scollNumbParam);
        }
    }
}
ipavlu

I did some experiments with my code and achieved the result I wanted (async processing). I changed the connection string (added: Max Pool Size=250;Connection Timeout=60;Connection Lifetime=0;MultipleActiveResultSets=true), i.e. I increased the size of the connection pool and the connection timeout.

  • Maximum connection pool size (Max Pool Size)

  • Minimum number of connections in a connection pool (Min Pool Size)

  • Number of seconds to keep connections in a connection pool (Connection Lifetime) (0 is max value)
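
The same settings can also be set through typed properties instead of editing the string by hand. This is a minimal sketch, assuming the System.Data.SqlClient provider is referenced as in the code above. ConnectionSettings is a hypothetical class name, and SERVER and Db are the placeholders from the question.

```csharp
using System.Data.SqlClient;

// Hypothetical helper that builds the connection string described above.
public static class ConnectionSettings
{
    public static string Build()
    {
        var builder = new SqlConnectionStringBuilder
        {
            DataSource = "SERVER",           // placeholder from the question
            InitialCatalog = "Db",           // placeholder from the question
            IntegratedSecurity = true,
            MaxPoolSize = 250,               // "Max Pool Size"
            ConnectTimeout = 60,             // "Connection Timeout", in seconds
            LoadBalanceTimeout = 0,          // "Connection Lifetime", in seconds
            MultipleActiveResultSets = true  // "MultipleActiveResultSets"
        };
        return builder.ConnectionString;
    }
}
```

The result would be passed to `new SqlConnection(ConnectionSettings.Build())`.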

Tip: An excessive Max Pool Size (100 by default) can hang your server (it happened to me :))

I also noticed that with 'MultipleActiveResultSets=true' in my connection string I no longer got the 'connection doesn't support MultipleActiveResultSets' exception, but the processing was synchronous. You can read about that (MARS) on

Conclusion: Parallel execution at the server is not a MARS feature and MARS operations are not thread-safe. MARS is not designed to remove all requirements for multiple connections in an application. If an application needs true parallel execution of commands against a server, multiple connections should be used. Multiple connections are what is usually used in such cases.

AllmanTool
  • Yeah, concurrent communication through one channel is not easy stuff. I have studied a few message queueing systems and was usually very disappointed by how they handle concurrency. These things are not easy to do because the toolbox is full of clunky solutions. It could be that the MARS functionality was grafted on top of a sequential technology back in 2005; I would think that highly probable. – ipavlu Jul 16 '17 at 22:48