I am reading and processing very large amounts of SQL Server data (tens of millions of rows in, hundreds of millions of rows out). The processing performed on each source row is significant. A single-threaded version was not performing to expectations. My current parallel-processing version performs very well on smaller batches (300,000 source rows, 1M output rows), but I am running into OutOfMemory exceptions on very large runs.
The code was significantly inspired by the answers provided here: Is there a way to use the Task Parallel Library(TPL) with SQLDataReader?
Here is the general idea:
Get the source data (data is too large to read into memory, so we will “stream” it)
public static IEnumerable<MyObject> ReadData()
{
    using (SqlConnection con = new SqlConnection(Settings.ConnectionString))
    using (SqlCommand cmd = new SqlCommand(selectionSql, con))
    {
        con.Open();
        using (SqlDataReader dr = cmd.ExecuteReader(CommandBehavior.CloseConnection))
        {
            while (dr.Read())
            {
                // make some decisions here - 1 to n source rows are used
                // to create an instance of MyObject
                yield return new MyObject(some parameters);
            }
        }
    }
}
Once we get to the point of parallel processing, we want to use SqlBulkCopy to write the data. We don't want to process individual MyObjects in parallel because we want to perform one bulk copy per thread, so we read from the method above with another IEnumerable that returns a "batch" of MyObjects:
class MyObjectBatch
{
    public List<MyObject> Items { get; set; }

    public MyObjectBatch(List<MyObject> items)
    {
        this.Items = items;
    }

    public static IEnumerable<MyObjectBatch> Read(int batchSize)
    {
        List<MyObject> items = new List<MyObject>();
        foreach (MyObject o in DataAccessLayer.ReadData())
        {
            items.Add(o);
            if (items.Count >= batchSize)
            {
                yield return new MyObjectBatch(items);
                items = new List<MyObject>(); // reset
            }
        }
        if (items.Count > 0) yield return new MyObjectBatch(items);
    }
}
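Consumed sequentially, this batching layer only ever holds one batch in memory at a time; a trivial sketch just to illustrate the intended laziness (not part of the actual pipeline):

foreach (MyObjectBatch batch in MyObjectBatch.Read(Settings.BatchSize))
{
    // work with the batch; once the next iteration begins, the previous
    // batch is no longer referenced and becomes eligible for collection
}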
Finally, we get to the point of parallel processing the "batches":
ObjectProcessor processor = new ObjectProcessor();

ParallelOptions options = new ParallelOptions { MaxDegreeOfParallelism = Settings.MaxThreads };

Parallel.ForEach(MyObjectBatch.Read(Settings.BatchSize), options, batch =>
{
    // Create a container for data processed by this thread -
    // the container implements IDataReader
    ProcessedData targetData = new ProcessedData(some params);

    // process the batch... for each MyObject in MyObjectBatch,
    // results are collected in targetData
    for (int index = 0; index < batch.Items.Count; index++)
    {
        processor.Process(batch.Items[index], targetData);
    }

    // bulk copy the data - this creates a SqlBulkCopy instance
    // and loads the data to the target table
    DataAccessLayer.BulkCopyData(targetData);

    // explicitly set the batch and targetData to null to try to free resources
    batch = null;
    targetData = null;
});
Everything above has been significantly simplified, but I believe it includes all of the important concepts. Here is the behavior I am seeing:
Performance is very good; for reasonably sized data sets, I am getting very good results.
However, as it processes, the memory consumed continues to grow. For larger data sets, this leads to exceptions.
I have proved through logging that if I slow down the reads from the database, it slows down the batch reads and, subsequently, the parallel threads being created (especially if I set MaxDegreeOfParallelism). I was concerned that I was reading faster than I could process, but if I limit the threads, it should only read what each thread can handle.
Smaller or larger batch sizes have some effect on performance, but the amount of memory used grows consistently with the size of the batch.
Where is there an opportunity to recover some memory here? As my “batches” go out of scope, should that memory be recovered? Is there something I could be doing at the first two layers that would help free some resources?
To answer some questions:

1. Could it be done purely in SQL? No - the processing logic is very complex (and dynamic). Generally speaking, it is doing low-level binary decoding.
2. We have tried SSIS (with some success). The issue is that the definitions of both the source data and the output are very dynamic. SSIS seems to require very strict input and output column definitions, which won't work in this case.
Someone also asked about the ProcessedData object - this is actually fairly simple:
class ProcessedData : IDataReader
{
    private int _currentIndex = -1;
    private string[] _fieldNames;

    public string TechnicalTableName { get; set; }
    public List<object[]> Values { get; set; }

    public ProcessedData(string schemaName, string tableName, string[] fieldNames)
    {
        this.TechnicalTableName = "[" + schemaName + "].[" + tableName + "]";
        _fieldNames = fieldNames;
        this.Values = new List<object[]>();
    }

    #region IDataReader Implementation

    public int FieldCount
    {
        get { return _fieldNames.Length; }
    }

    public string GetName(int i)
    {
        return _fieldNames[i];
    }

    public int GetOrdinal(string name)
    {
        int index = -1;
        for (int i = 0; i < _fieldNames.Length; i++)
        {
            if (_fieldNames[i] == name)
            {
                index = i;
                break;
            }
        }
        return index;
    }

    public object GetValue(int i)
    {
        if (i > (Values[_currentIndex].Length - 1))
        {
            return null;
        }
        else
        {
            return Values[_currentIndex][i];
        }
    }

    public bool Read()
    {
        if ((_currentIndex + 1) < Values.Count)
        {
            _currentIndex++;
            return true;
        }
        else
        {
            return false;
        }
    }

    // Other IDataReader members not used by SqlBulkCopy are not implemented

    #endregion
}
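DataAccessLayer.BulkCopyData is referenced above but not shown; per the comments in the processing loop, it creates a SqlBulkCopy instance and hands one of these readers to it. A minimal sketch of that idea (the timeout and connection handling here are illustrative, not the actual implementation):

public static void BulkCopyData(ProcessedData data)
{
    using (SqlConnection con = new SqlConnection(Settings.ConnectionString))
    {
        con.Open();
        using (SqlBulkCopy bulkCopy = new SqlBulkCopy(con))
        {
            bulkCopy.DestinationTableName = data.TechnicalTableName;
            bulkCopy.BulkCopyTimeout = 0; // no timeout - illustrative only
            bulkCopy.WriteToServer(data); // consumes the IDataReader row by row
        }
    }
}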
UPDATE and CONCLUSION:
I received a great deal of valuable input, but wanted to summarize it all into a single conclusion. First, my main question was whether there was anything else I could do (with the code I posted) to aggressively reclaim memory. The consensus seems to be that the approach is correct, but that my particular problem is not entirely CPU-bound, so a simple Parallel.ForEach will not manage the processing correctly.
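One concrete way to keep Parallel.ForEach from reading ahead of the worker threads (not something in the code I posted, and it requires .NET 4.5 for EnumerablePartitionerOptions) is to wrap the batch enumerable in a non-buffering partitioner; a sketch:

using System.Collections.Concurrent;

var source = Partitioner.Create(
    MyObjectBatch.Read(Settings.BatchSize),
    EnumerablePartitionerOptions.NoBuffering);

Parallel.ForEach(source, options, batch =>
{
    // same processing / bulk copy body as above
});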
Thanks to usr for his debugging suggestion and his very interesting PLINQ suggestion. Thanks to zmbq for helping clarify what was and wasn't happening.
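For reference, the PLINQ shape of the same pipeline looks roughly like this (a sketch only, not the exact code from the answer):

MyObjectBatch.Read(Settings.BatchSize)
    .AsParallel()
    .WithDegreeOfParallelism(Settings.MaxThreads)
    .ForAll(batch =>
    {
        ProcessedData targetData = new ProcessedData(some params);
        for (int index = 0; index < batch.Items.Count; index++)
        {
            processor.Process(batch.Items[index], targetData);
        }
        DataAccessLayer.BulkCopyData(targetData);
    });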
Finally, anyone else who may be chasing a similar issue will likely find the following discussions helpful: