
For some insane reason, I'm getting an OutOfMemoryException while streaming data to SQL in sensible chunks, and barely using any memory at all:

System.OutOfMemoryException: Exception of type 'System.OutOfMemoryException' was thrown.
   at System.Data.DataTable.NewRowArray(Int32 size)
   at System.Data.RecordManager.GrowRecordCapacity()
   at System.Data.RecordManager.NewRecordBase()
   at System.Data.DataTable.NewRecord(Int32 sourceRecord)
   at Company.PA.Data.PADbContext.<StreamedSqlBulkCopy>d__22`1.MoveNext() in D:\Agent_A\_work\7\s\Company.PA.DataLayer\Company.PA.Data\BulkInsert\StreamedSqlBulkCopy.cs:line 46

The error occurs when calling dataTable.NewRow() inside the while loop below, once I get past about the 30 millionth row:

/// <summary>Helper to stream a large number of records into SQL without
/// ever having to materialize the entire enumerable into memory at once.</summary>
/// <param name="destinationTableName">The name of the table in the database to copy data to.</param>
/// <param name="dataTable">A new instance of the DataTable class that matches the schema of the table to insert to.
/// This should match exactly (same column names) what is in SQL, for automatic column mapping to work.</param>
/// <param name="sourceData">The enumerable of data that will be used to generate DataRows</param>
/// <param name="populateRow">A delegate function that populates and returns a new data row for a given record.</param>
/// <param name="memoryBatchSize">The number of DataRows to generate in memory before passing them to SqlBulkCopy</param>
/// <param name="insertBatchSize">The batch size of inserts performed by SqlBulkCopy utility.</param>
public async Task StreamedSqlBulkCopy<T>(
    string destinationTableName, DataTable dataTable,
    IEnumerable<T> sourceData, Func<T, DataRow, DataRow> populateRow,
    int memoryBatchSize = 1000000, int insertBatchSize = 5000)
{
    using (SqlConnection connection = new SqlConnection(Database.Connection.ConnectionString))
    {
        connection.Open();
        using (SqlBulkCopy bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.TableLock, null))
        using (IEnumerator<T> enumerator = sourceData.GetEnumerator())
        {
            // Configure the single SqlBulkCopy instance that will be used to copy all "batches"
            bulkCopy.DestinationTableName = destinationTableName;
            bulkCopy.BatchSize = insertBatchSize;
            bulkCopy.BulkCopyTimeout = _bulkInsertTimeOut;
            foreach (DataColumn column in dataTable.Columns)
                bulkCopy.ColumnMappings.Add(column.ColumnName, column.ColumnName);
            // Begin enumerating over all records, preparing batches no larger than "memoryBatchSize"
            bool hasNext = true;
            while (hasNext)
            {
                DataRow[] batch = new DataRow[memoryBatchSize];
                int filled = 0;
                while (filled < memoryBatchSize && (hasNext = enumerator.MoveNext()))
                    batch[filled++] = populateRow(enumerator.Current, dataTable.NewRow());
                // When we reach the end of the enumerable, we need to shrink the final buffer array
                if (filled < memoryBatchSize)
                    Array.Resize(ref batch, filled);
                await bulkCopy.WriteToServerAsync(batch);
            }
        }
    }
}

As is hopefully clear, the purpose of the above helper is to stream a (very large) IEnumerable<T> of data to an SQL table using the SqlBulkCopy reader and a delegate that will fill a row for a given element.

Sample usage is:

public async Task SaveExchangeRates(List<FxRate> fxRates)
{
    var createDate = DateTimeOffset.UtcNow;
    await StreamedSqlBulkCopy("RefData.ExchangeRate",
        GetExchangeRateDataTable(), fxRates, (fx, newRow) =>
        {
            newRow["BaseCurrency"] = "USD";
            newRow["TargetCurrency"] = fx.CurrencyCode;
            newRow["ExchangeDate"] = fx.ExchangeRateDate;
            newRow["DollarValue"] = fx.ValueInUsd;
            return newRow;
        });
}

private DataTable GetExchangeRateDataTable()
{
    var dataTable = new DataTable();
    dataTable.Columns.Add("ExchangeDate", typeof(DateTime));
    dataTable.Columns.Add("BaseCurrency", typeof(string));
    dataTable.Columns.Add("TargetCurrency", typeof(string));
    dataTable.Columns.Add("DollarValue", typeof(double));
    return dataTable;
}

2 Answers


It turns out that even if you are only using a DataTable instance as an empty structure for schema purposes, and even if you never call dataTable.Rows.Add() to actually add rows to the table, it is still incrementing an internal counter every time you call NewRow(), and apparently even growing a placeholder array where it expects you to eventually insert all of those rows.
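
You can see the effect with a small standalone repro (illustrative only, not from the original code): memory climbs steadily even though no row is ever added to the table, and a Clone() releases it all once the old instance is dropped.

using System;
using System.Data;

class NewRowLeakRepro
{
    static void Main()
    {
        var table = new DataTable();
        table.Columns.Add("Value", typeof(int));

        for (int i = 1; i <= 2_000_000; i++)
        {
            table.NewRow(); // never added via Rows.Add(), but still tracked inside the table
            if (i % 500_000 == 0)
                Console.WriteLine($"{i:N0} NewRow() calls: ~{GC.GetTotalMemory(true) / 1_048_576} MB");
        }

        table = table.Clone(); // schema-only copy; the old table and its records become collectable
        Console.WriteLine($"After Clone(): ~{GC.GetTotalMemory(true) / 1_048_576} MB");
    }
}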

In any event, the workaround is to periodically "reset" your template by overwriting it with a clone of itself:

dataTable = dataTable.Clone();
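
Applied to the loop from the question, the reset can simply happen once per batch (a sketch using the same variable names as above):

bool hasNext = true;
while (hasNext)
{
    // Re-clone the schema-only template each batch so the bookkeeping that
    // NewRow() accumulates is discarded along with the previous instance.
    dataTable = dataTable.Clone();

    DataRow[] batch = new DataRow[memoryBatchSize];
    int filled = 0;
    while (filled < memoryBatchSize && (hasNext = enumerator.MoveNext()))
        batch[filled++] = populateRow(enumerator.Current, dataTable.NewRow());

    if (filled < memoryBatchSize)
        Array.Resize(ref batch, filled);
    await bulkCopy.WriteToServerAsync(batch);
}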

Inelegant to be sure, but easier than trying to implement your own IDataReader which is the only other way to leverage SQLBulkCopy. (That said - to anyone else trying to stream to SQL Bulk Copy who isn't constrained to avoid 3rd party libraries as I am, check out Marc Gravel's ObjectReader from the FastMember package and this answer: https://stackoverflow.com/a/47208127/529618)
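
For reference, the FastMember route looks roughly like this. This is a sketch, not part of my solution: it assumes the FastMember NuGet package, the fxRates list and Database.Connection.ConnectionString from the question, and that a constant column like BaseCurrency is handled by a property on the source type or a default in SQL.

using FastMember;

// ObjectReader exposes the IEnumerable<T> as an IDataReader, so SqlBulkCopy
// streams straight from the source objects with no DataTable/DataRow involved.
using (var connection = new SqlConnection(Database.Connection.ConnectionString))
{
    connection.Open();
    using (var bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.TableLock, null))
    using (var reader = ObjectReader.Create(fxRates,
        "ExchangeRateDate", "CurrencyCode", "ValueInUsd"))
    {
        bulkCopy.DestinationTableName = "RefData.ExchangeRate";
        bulkCopy.ColumnMappings.Add("ExchangeRateDate", "ExchangeDate");
        bulkCopy.ColumnMappings.Add("CurrencyCode", "TargetCurrency");
        bulkCopy.ColumnMappings.Add("ValueInUsd", "DollarValue");
        await bulkCopy.WriteToServerAsync(reader);
    }
}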


Another, simpler approach (at the cost of some additional overhead) is to accept our fate and use the DataTable class rather than an array of DataRow - but create Clone() copies of the original table periodically to avoid the apparent hard limit of 16,777,216 rows per DataTable.

I hadn't appreciated that DataTable maintains an array for all the rows you create with it, even if they never end up getting added - so we might as well take advantage of that array rather than allocating our own.

Some of the overhead of using a DataTable can be offset by setting its initial capacity (so its internal storage isn't repeatedly reallocated as it grows) and by disabling as many events as possible:

Relevant change below:

bool hasNext = true;
while (hasNext)
{
    using (DataTable tableChunk = dataTable.Clone())
    {
        tableChunk.MinimumCapacity = memoryBatchSize + 1; // Avoid triggering resizing
        tableChunk.BeginLoadData(); // Speeds up inserting a large volume of rows a little
        int filled = 0;
        while (filled++ < memoryBatchSize && (hasNext = enumerator.MoveNext()))
            tableChunk.Rows.Add(populateRow(enumerator.Current, tableChunk.NewRow()));
        await bulkCopy.WriteToServerAsync(tableChunk);
    }
}