5

I am trying to make multiple parallel updates to database using Task.WhenAll. The code flow goes like this.

In the main method, i have created a transaction scope and created clones of the main transactions and passed to the child. Main transactions is blocked until child is completed

using (var scope = DalcHelper.GetTransactionScope())
{
    DependentTransaction transaction = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete);
    var task1= Dalc.UpdateDetails1(transaction );

    DependentTransaction transaction1 = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete);
    var task2 = Dalc.UpdateDetails2(transaction1);

    await Task.WhenAll(task1, task2 ).ConfigureAwait(false);

    scope.Complete();
}

The DalcMethod goes like this. Here the clone created from the outer transaction goes as a parameter. The dependent transaction is completed notifying the main transaction that the dependent is completed

try
{
    using (SqlCommand databaseCommand = DalcHelper.GetCommand(SPName))
    using (var scope = new TransactionScope(dependentCloneTransaction, TransactionScopeAsyncFlowOption.Enabled))
    {
        -- Update database
        scope.Complete();
    }
}
finally
{
    //Call complete on the dependent transaction
    dependentCloneTransaction.Complete();
}

Dalc methods are asynchronous methods which returns Task

I am getting the below exception

The transaction has aborted.Failure while attempting to promote transaction.There is already an open DataReader associated with this Command which must be closed first.The wait operation timed out

. Can anyone tell me what am i doing wrong here?

abatishchev
  • 98,240
  • 88
  • 296
  • 433
Ranjit Krishnan
  • 111
  • 1
  • 4
  • 1
    In this case issue seems to be different, Nothing to do with `Async or TPL`, It started as a local transaction and as you open multiple resources (connections) in same transaction context, it does get promoted to distributed transaction, which would be failing due to some restriction. Second issue seems to be you trying to `ExecuteReader` on the `Command` object, when original `Reader` is still there, which is one live connection, close it first to run the second `ExecuteNonQuery` – Mrinal Kamboj Sep 24 '16 at 19:49
  • Possible duplicate of [Transaction can't handle parallel commands via Task.WhenAll](https://stackoverflow.com/questions/50660491/transaction-cant-handle-parallel-commands-via-task-whenall) – Liam Oct 31 '18 at 13:29

1 Answers1

0
namespace Playground
{
    static class DalcHelper
    {
        public static TransactionScope GetTransactionScope()
        {
            return new TransactionScope(TransactionScopeAsyncFlowOption.Enabled);
        }

        public async static Task ReadDetails1(DependentTransaction transaction,SqlConnection conn)
        {
            try
            {
                string commandText = "SELECT * FROM dbo.Persons"; // some table, say Persons
                using (SqlCommand cmd = new SqlCommand(commandText, conn))
                {
                    cmd.CommandType = System.Data.CommandType.Text;
                    SqlDataReader reader = await cmd.ExecuteReaderAsync(CommandBehavior.Default);
                    while (reader.Read())
                    {
                        int Id = reader.GetInt32("Id");
                        Console.WriteLine("Id " + Id);
                    }
                    reader.Close();
                }
                transaction.Complete();
                return;
            }
            catch (Exception ex)
            {
                Console.WriteLine("Task 1"+ ex.Message);
            }
        }

        public async static Task ReadDetails2(DependentTransaction transaction1, SqlConnection conn)
        {
            try
            {
                string commandText = "SELECT * FROM dbo.Persons";
                using (SqlCommand cmd = new SqlCommand(commandText, conn))
                {
                    cmd.CommandType = System.Data.CommandType.Text;
                    SqlDataReader reader = await cmd.ExecuteReaderAsync(CommandBehavior.Default);
                    while (reader.Read())
                    {
                        int age = reader.GetInt32("Age");
                        Console.WriteLine("Age " + age);
                    }
                    reader.Close();
                }
                transaction1.Complete();
                return;
            }
            catch (Exception ex)
            {
                Console.WriteLine("Task 2" + ex.Message);
            }
        }
    }

    class Program
    {
        static void Main(string[] args)
        {
            string connectionString = "YourConnectionString";
            _ = RunMe(connectionString);
        }

        private async static Task RunMe(string connectionString)
        {
            
                try
                {
                    
                    Task task1 = Task.Run( async()=> {
                        using (TransactionScope scope = DalcHelper.GetTransactionScope())
                        {
                            using (SqlConnection conn = new SqlConnection(connectionString))
                            {
                                DependentTransaction transaction = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete);
                                conn.Open();
                                await DalcHelper.ReadDetails1(transaction, conn);
                                /*
                                * add more tasks if you wish to
                                */
                                Console.WriteLine("Completed task 1");
                                conn.Close();

                            }
                            scope.Complete();
                        }
                    });

                    

                    Task task2 = Task.Run(async () =>
                    {
                        using (TransactionScope scope = DalcHelper.GetTransactionScope())
                        {
                            using (SqlConnection conn = new SqlConnection(connectionString))
                            {
                                DependentTransaction transaction = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete);
                                conn.Open();
                                await DalcHelper.ReadDetails2(transaction, conn);
                                /*
                                    may be update some column of table based on previous op.
                                   // await DalcHelper.UpdateDetails2(transaction, conn); 
                                */ 
                                Console.WriteLine("Completed task 2");
                                conn.Close();
                            }
                            /*
                            calling `Complete` method will commit all the changes within the transaction scope(including the UpdateDetails2 method)
                            need not dispose transaction scope explicitly, `using` block takes care of that
                            */ 
                            scope.Complete(); 
                        }
                    });

                 await Task.WhenAll(task1, task2);// at this point every task added is complete
                 Console.WriteLine("completed both tasks");
                 Console.ReadLine();
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                }
            }
        }
    }

Some important points to remember when working with transaction scope

  1. It is required to dispose TransactionScope within the same thread it was created or might get an error thrown like Transaction already aborted.
  2. Any update operations is persisted only if TransactionScope.Complete() method is called.
  3. Make sure you open individual connections for each thread and close it after its usage.That being said, I am not sure from a performance standpoint about using an individual connection for each thread. I am happy to get educated more on this and I'll update my answer. However, this solution should help you solve your problem.

Do read some useful answers already posted related to the topic

Liswin
  • 158
  • 1
  • 9