3

I have an application which runs multiple threads to insert data into a SQL Server 2017 database table using EF Core 5.

The C# code for inserting the domain model entities using EF Core 5 is as follows:

using (var ctx = this.dbContextFactory.CreateDbContext())
{
    //ctx.Database.AutoTransactionsEnabled = false;
    foreach (var rootEntity in request.RootEntities)
    {
        ctx.ChangeTracker.TrackGraph(rootEntity, node =>
        {
            if ((request.EntityTypes != null && request.EntityTypes.Contains(node.Entry.Entity.GetType()))
                || rootEntity == node.Entry.Entity)
            {
                if (node.Entry.IsKeySet)
                    node.Entry.State = Microsoft.EntityFrameworkCore.EntityState.Modified;
                else
                    node.Entry.State = Microsoft.EntityFrameworkCore.EntityState.Added;
            }
        });
    }
    await ctx.SaveChangesAsync(cancellationToken);
}
        

Each thread is responsible for instantiating its own DbContext instance hence the use of dbContextFactory.

Some example SQL generated for the INSERT (MERGE) is as follows:

SET NOCOUNT ON;
DECLARE @inserted0 TABLE ([OrderId] bigint, [_Position] [int]);
MERGE [dbo].[Orders] USING (
VALUES (@p0, 0),
(@p1, 1),
(@p2, 2),
...
(@43, 41)) AS i ([SomeColumn],  _Position) ON 1=0
WHEN NOT MATCHED THEN
INSERT ([SomeColumn])
VALUES (i.[SomeColumn])
OUTPUT INSERTED.[OrderId], i._Position
INTO @inserted0;

SELECT [t].[OrderId] FROM [dbo].[Orders] t
INNER JOIN @inserted0 i ON ([t].[OrderId] = [i].[OrderId])
ORDER BY [i].[_Position];

As these threads frequently run at the same time I get the following SQL exception:

Transaction (Process ID 99) was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction.

EF Core implicitly sets the isolation level to READ COMMITTED.

Using SQL Profiler the transaction deadlock was caused by the following:

enter image description here

My concerns:

  1. Frustratingly, the SQL generated by EF Core includes two statements: a MERGE, and then a SELECT. I do not understand the purpose of the SELECT given the identities of the primary key are available from the @inserted0 table variable. Given this answer, the MERGE statement in isolation would be sufficient enough to make this atomic.

I believe it is this SELECT which is causing the transaction deadlock.

  1. I tried to resolve the problem by using READ COMMITTED SNAPSHOT to avoid the conflict with the primary key lookup, however I still got the same error even though this isolation level should avoid locks and use row versioning instead.

My attempt at solving the problem:

The only way I could find to solve this problem was to explicitly prevent a transaction being started by EF Core, hence the following code:

ctx.Database.AutoTransactionsEnabled = false;

I have tested this numerous times and haven't received a transaction deadlock. Given the logic is merely inserting new records I believe this can be done.

Does anyone have any advice to fixing this problem?

Thanks for your time.

user978139
  • 579
  • 4
  • 16
  • Why Just do not insert to the same table by different threads? – Svyatoslav Danyliv Aug 09 '21 at 16:06
  • Because EF core has automatically translated this into a MERGE not an INSERT. A MERGE of batches of 40 records is faster than 40 individual INSERT statements. Unless there is an option in EF Core to bulk insert using an individual sql statement?? – user978139 Aug 09 '21 at 16:49
  • I know why it translates to MERGE. Trying to figure out how to avoid that. Which kind of data do you insert? Maybe you need just `BulkCopy` via available extensions? – Svyatoslav Danyliv Aug 09 '21 at 16:52

1 Answers1

0

We had the same issues with INSERT (MERGE) statements on multiple threads. We didn't want to enable the EnableRetryOnFailure() option for all transactions, so we wrote the following DbContent extension method.

  public static async Task<TResult> SaveWithRetryAsync<TResult>(this DbContext context,
                                                                  Func<Task<TResult>> bulkInsertOperation,
                                                                  Func<TResult, Task<bool>> verifyBulkOperationSucceeded,
                                                                  IsolationLevel isolationLevel = IsolationLevel.Unspecified,
                                                                  int retryLimit = 6,
                                                                  int maxRetryDelayInSeconds = 30)
    {
        var existingTransaction = context.Database.CurrentTransaction?.GetDbTransaction();
        if (existingTransaction != null)
            throw new InvalidOperationException($"Cannot run {nameof(SaveWithRetryAsync)} inside a transaction");

        if (context.ChangeTracker.HasChanges())
        {
            throw new InvalidOperationException(
                "DbContext should be saved before running this action to revert only the changes of this action in case of a concurrency conflict.");
        }

        const int sqlErrorNrOnDuplicatePrimaryKey = 2627;
        const int sqlErrorNrOnSnapshotIsolation = 3960;
        const int sqlErrorDeadlock = 1205;
        int[] sqlErrorsToRetry = { sqlErrorNrOnDuplicatePrimaryKey, sqlErrorNrOnSnapshotIsolation, sqlErrorDeadlock };

        var retryState = new SaveWithRetryState<TResult>(bulkInsertOperation);

        // Use EF Cores connection resiliency feature for retrying (see https://learn.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency)
        // Usually the IExecutionStrategy is configured DbContextOptionsBuilder.UseSqlServer(..., options.EnableRetryOnFailure()).
        // In ASP.NET, the DbContext is configured in Startup.cs and we don't want this retry behaviour everywhere for each db operation.
        var executionStrategyDependencies = context.Database.GetService<ExecutionStrategyDependencies>();
        var retryStrategy = new CustomSqlServerRetryingExecutionStrategy(executionStrategyDependencies, retryLimit, TimeSpan.FromSeconds(maxRetryDelayInSeconds), sqlErrorsToRetry);

        try
        {
            var result = await retryStrategy.ExecuteInTransactionAsync(
                retryState,
                async (state, cancelToken) =>
                {
                    try
                    {
                        var r = await state.Action();

                        await context.SaveChangesAsync(false, cancelToken);

                        if (state.FirstException != null)
                        {
                            Log.Logger.Warning(
                                $"Action passed to {nameof(SaveWithRetryAsync)} failed {state.NumberOfRetries} times " +
                                $"(retry limit={retryLimit}, ThreadId={Thread.CurrentThread.ManagedThreadId}).\nFirst exception was: {state.FirstException}");
                        }

                        state.Result = r;
                        return r;
                    }
                    catch (Exception ex)
                    {
                        context.RevertChanges();
                        state.NumberOfRetries++;
                        state.FirstException ??= ex;
                        state.LastException = ex;

                        throw;
                    }
                },
                (state, cancelToken) => verifyBulkOperationSucceeded(retryState.Result),
                context.GetSupportedIsolationLevel(isolationLevel));

            context.ChangeTracker.AcceptAllChanges();
            return result;
        }
        catch (Exception ex)
        {
            throw new InvalidOperationException(
                $"DB Transaction in {nameof(SaveWithRetryAsync)} failed. " +
                $"Tried {retryState.NumberOfRetries} times (retry limit={retryLimit}, ThreadId={Thread.CurrentThread.ManagedThreadId}).\n" +
                $"First exception was: {retryState.FirstException}.\nLast exception was: {retryState.LastException}",
                ex);
        }
    }

With the following CustomSqlServerRetryingExecutionStrategy

 public class CustomSqlServerRetryingExecutionStrategy : SqlServerRetryingExecutionStrategy
{
    public CustomSqlServerRetryingExecutionStrategy(ExecutionStrategyDependencies executionStrategyDependencies, int retryLimit, TimeSpan fromSeconds, int[] sqlErrorsToRetry)
        : base(executionStrategyDependencies, retryLimit, fromSeconds, sqlErrorsToRetry)
    {
    }

    protected override bool ShouldRetryOn(Exception exception)
    {
        //SqlServerRetryingExecutionStrategy does not check the base exception, maybe a bug in EF core ?!
        return base.ShouldRetryOn(exception) || base.ShouldRetryOn(exception.GetBaseException());
    }
}

Helper class to save the current (retry) state:

private class SaveWithRetryState<T>
    {
        public SaveWithRetryState(Func<Task<T>> action)
        {
            Action = action;
        }

        public Exception FirstException { get; set; }
        public Exception LastException { get; set; }
        public int NumberOfRetries { get; set; }
        public Func<Task<T>> Action { get; }
        public T Result { get; set; }
    }

Now, the extension method can be used as follow. The code will try to add the bulk multiple times (5).

 await _context.SaveWithRetryAsync(
            // method to insert the bulk
            async () =>
            {
                var listOfAddedItems = new List<string>();
                foreach (var item in bulkImport)
                {
                    listOfAddedItems.Add(item.Guid);
                    await context.Import.AddAsync(item);
                }

                return listOfAddedItems;
            },
            // method to check if the bulk insert was successful
            listOfAddedItems =>
            {

                if (listOfAddedItems == null)
                    return Task.FromResult(true);
                return _context.Import.AsNoTracking().AnyAsync(x => x.Guid == listOfAddedItems.First());
            },
            IsolationLevel.Snapshot,
            5, // max retry attempts
            100); // max retry time

For background information why this can happen, have a look at this discussion: https://github.com/dotnet/efcore/issues/21899

Stef Chäser
  • 1,911
  • 18
  • 26