9

I am reading and processing very large amounts of Sql Server data (10’s of millions+ of rows in, 100’s of millions+ rows out). The processing performed on each source row is significant. A single threaded version is not performing to expectations. My current parallel processing version is performing very well on some smaller batches (300,000 source rows, 1M output rows), but I am running into some Out Of Memory exceptions for very large runs.

The code was significantly inspired by the answers provided here: Is there a way to use the Task Parallel Library(TPL) with SQLDataReader?

Here is the general idea:

Get the source data (data is too large to read into memory, so we will “stream” it)

public static IEnumerable<MyObject> ReadData()
{
    using (SqlConnection con = new SqlConnection(Settings.ConnectionString)) 
       using (SqlCommand cmd = new SqlCommand(selectionSql, con))
       {
            con.Open();
            using (SqlDataReader dr = cmd.ExecuteReader(CommandBehavior.CloseConnection))
            {
            while (dr.Read())
            {
                // make some decisions here – 1 to n source rows are used
                // to create an instance of MyObject
                yield return new MyObject(some parameters);
            }
        }
    }
}

Once we get to the point of parallel processing, we want to use the SqlBulkCopy object to write the data. Because of this, we don’t want to process individual MyObjects in parallel as we want to perform a bulk copy per thread. Because of this, we’ll read from above with another IEnumerable that returns a “batch” of MyObjects

class MyObjectBatch 
{
    public List<MyObject> Items { get; set; }

    public MyObjectBatch (List<MyObject> items)
    {
        this.Items = items;
    }

    public static IEnumerable<MyObjectBatch> Read(int batchSize)
    {
        List<MyObject> items = new List<MyObjectBatch>();
        foreach (MyObject o in DataAccessLayer.ReadData())
        {
            items.Add(o);
            if (items.Count >= batchSize)
            {
                yield return new MyObjectBatch(items);                    
                items = new List<MyObject>(); // reset
            }
        }
        if (items.Count > 0) yield return new MyObjectBatch(items);            
    }
}

Finally, we get to the point of parallel processing the “batches”

ObjectProcessor processor = new ObjectProcessor();

ParallelOptions options = new ParallelOptions { MaxDegreeOfParallelism = Settings.MaxThreads };
Parallel.ForEach(MyObjectBatch.Read(Settings.BatchSize), options, batch =>
{
    // Create a container for data processed by this thread
    // the container implements IDataReader
    ProcessedData targetData = new ProcessedData(some params));

    // process the batch… for each MyObject in MyObjectBatch – 
    // results are collected in targetData
    for (int index = 0; index < batch.Items.Count; index++) 
    {
        processor.Process(batch.Item[index], targetData);
    }

    // bulk copy the data – this creates a SqlBulkCopy instance
    // and loads the data to the target table
    DataAccessLayer.BulkCopyData(targetData);

    // explicitly set the batch and targetData to null to try to free resources

});

Everything above has been significantly simplified, but I believe it includes all of the important concepts. Here is the behavior I am seeing:

Performance is very good – for reasonable sized data sets, I am getting very good results.

However, as it processes, the memory consumed continues to grow. For larger data sets, this leads to exceptions.

I have proved through logging, that if I slow down the reads from the database, it slows down the batch reads and subsequently, the parallel threads being created (especially if I set the MaxDegreeOfParallelization). I was concerned that I was reading faster than I could process, but if I limit the threads, it should only read what each thread can handle.

Smaller or larger batch sizes have some effect on performance, but the amount of memory used grows consistently with the size of the batch.

Where is there an opportunity to recover some memory here? As my “batches” go out of scope, should that memory be recovered? Is there something I could be doing at the first two layers that would help free some resources?

To answer some questions: 1. Could it be done purely in SQL - no, the processing logic is very complex (and dynamic). Generally speaking, it is doing low-level binary decoding. 2. We have tried SSIS (with some success). The issue is that the definition of the source data as well as the output is very dynamic. SSIS seems to require very strict input and output column definition which won't work in this case.

Someone also asked about the ProcessedData object - this is actually fairly simple:

class ProcessedData : IDataReader 
{
    private int _currentIndex = -1;
    private string[] _fieldNames { get; set; }

    public string TechnicalTableName { get; set; }        
    public List<object[]> Values { get; set; }

    public ProcessedData(string schemaName, string tableName, string[] fieldNames)
    {            
        this.TechnicalTableName = "[" + schemaName + "].[" + tableName + "]";
        _fieldNames = fieldNames;            
        this.Values = new List<object[]>();
    }

    #region IDataReader Implementation

    public int FieldCount
    {
        get { return _fieldNames.Length; }
    }

    public string GetName(int i)
    {
        return _fieldNames[i];
    }

    public int GetOrdinal(string name)
    {
        int index = -1;
        for (int i = 0; i < _fieldNames.Length; i++)
        {
            if (_fieldNames[i] == name)
            {
                index = i;
                break;
            }
        }
        return index;
    }

    public object GetValue(int i)
    {
        if (i > (Values[_currentIndex].Length- 1))
        {
            return null;
        }
        else
        {
            return Values[_currentIndex][i];
        }
    }

    public bool Read()
    {
        if ((_currentIndex + 1) < Values.Count)
        {
            _currentIndex++;
            return true;
        }
        else
        {
            return false;
        }
    }

    // Other IDataReader things not used by SqlBulkCopy not implemented
}

UPDATE and CONCLUSION:

I received a great deal of valuable input, but wanted to summarize it all into a single conclusion. First, my main question was if there was anything else I could do (with the code I posted) to aggressively reclaim memory. The consensus seems to be that the approach is correct, but that my particular problem is not entirely bound by CPU, so a simple Parallel.ForEach will not manage the processing correctly.

Thanks to usr for his debugging suggestion and his very interesting PLINQ suggestion. Thanks to zmbq for helping clarify what was and wasn't happening.

Finally, anyone else who may be chasing a similar issue will likely find the following discussions helpful:

Parallel.ForEach can cause a "Out Of Memory" exception if working with a enumerable with a large object

Parallel Operation Batching

Community
  • 1
  • 1
snow_FFFFFF
  • 3,235
  • 17
  • 29
  • 1
    What is the calculation that is being done? The reason I ask is that there may be a way you might be able to do it completely in SQL, that would solve your memory issues as SQL would optimise your code. I know that is not what the question is asking however. – Michael Coxon Mar 08 '14 at 18:26
  • 1
    In addition to what Michael mentions, Another option would be to use Sql Server Integration Services, which is really designed to do this sort of thing. You could fire off an SSIS job from within your application if need be. – Erik Funkenbusch Mar 08 '14 at 18:31
  • Few things, do you need on demand data? If not look into SSAS. Another thing, go low level if you want to do things like these. Another thing, ignore built in extension methods and libraries. No one can do some much generalization and be perfect for highly improbable requirements. – danish Mar 08 '14 at 18:45
  • 1
    The biggest problem here is that it's not real code. More than likely, you've elided something that is key to the problem. For instance, since we don't know what happens to the returned data in ProcessedData after it gets passed to processor, we don't know if it's retaining references to the data, and thus causing your problem. – Erik Funkenbusch Mar 08 '14 at 18:49
  • @ErikTheViking - I appreciate your comment (and understand your point). Though, it isn't reasonable to show all of the "real" code. One point of clarification is that ObjectProcessor.Process() keeps everything local, so, as the processing of an object completes, everything internal to it should be out of scope. So, in your opinion, what you can see should be cleaning up after itself? – snow_FFFFFF Mar 08 '14 at 19:05

2 Answers2

9

I do not fully understand how Parallel.ForEach is pulling items, but I think by default it pulls more than one to save locking overhead. This means that multiple items might be queued internally inside of Parallel.ForEach. This might cause OOM quickly because your items are very big individually.

You could try giving it a Partitioner that returns single items.

If that does not help, we need to dig deeper. Debugging memory issues with Parallel and PLINQ is nasty. There was in bug in one of those, for example, that caused old items not to be released quickly.

As a workaround, you could clear the list after processing. That will at least allow all items to be reclaimed deterministically after processing has been done.

Regarding the code you posted: It is clean, of high quality and you are adhering to high standards of resource management. I would not suspect a gross memory or resource leak on your part. It is still not impossible. You can test this by commenting out the code inside of the Parallel.ForEach and replacing it with a Thread.Sleep(1000 * 60). If the leak persists, you are not at fault.

In my experience, PLINQ is easier to get an exact degree of parallelism with (because the current version uses the exact DOP you specify, never less never more). Like this:

GetRows()
.AsBatches(10000)    
.AsParallel().WithDegreeOfParallelism(8)
.Select(TransformItems) //generate rows to write
.AsEnumerable() //leave PLINQ
.SelectMany(x => x) //flatten batches
.AsBatches(1000000) //create new batches with different size
.AsParallel().WithDegreeOfParallelism(2) //PLINQ with different DOP
.ForEach(WriteBatchToDB); //write to DB

This would give you a simple pipeline that pulls from the DB, does CPU-bound work with a specific DOP optimized for the CPU, and writes to the database with much bigger batches and less DOP.

This is quite simple and it should max out CPUs and disks independently with their respective DOP. Play with the DOP numbers.

usr
  • 168,620
  • 35
  • 240
  • 369
  • while I didn't have the patience for Thread.Sleep(1000 * 60), I did what you mentioned for 15 seconds (which was a good approximation of a given thread). The memory used did grow as it ran, but it seemed a lot more reasonable. I'm not sure I've learned too much, but this is an excellent tip to help me debug a little deeper - thanks. – snow_FFFFFF Mar 08 '14 at 19:39
  • 1
    I just added something that might help you as well. You said that a bulk copy size of 5k is optimal for you. That is unlikely. You should probably have independent DOPs for CPU work and bulk copy work. Then you should see better throughput with very big batches (like 100k). – usr Mar 08 '14 at 19:41
  • the PLINQ example looks very interesting. Of course, I'm not following it completely. I assume the GetRows() is my IEnumerable function. But, I don't seem to have the choice for .AsBatches(x) - I can start off with the .AsParallel() and do some of what you have. My main question is how to design the TransFormItems and WriteBatchToDB methods...and, how do they share data? – snow_FFFFFF Mar 08 '14 at 20:18
  • GetRows is your ReadData. AsBatches is your custom batching logic. TransformItems is some computation that you probably want to do before you insert. WriteBatchToDB is the bulk copy stuff. These are just placeholders for you to fill arbitrarily. – usr Mar 08 '14 at 21:40
1

You're keeping two things in memory - your input data and your output data. You've tried to read and process that data in parallel, but you're not reducing the overall memory footprint - you still end up keeping most the data in memory - the more threads you have, the more data you keep in memory.

I guess most of the memory is taken by your output data, as you create 10 times more output records than input records. So you have a few (10? 30? 50) SqlBulkCopy operations.

That is actually too much. You can gain a lot of speed by writing 100,000 records in bulk. What you should do is split your work - read 10,000-20,000 records, create the output records, SqlBulkCopy to the database, and repeat. Your memory consumption will drop considerably.

You can, of course, do that in parallel - handle several 10,000 record batches in parallel.

Just keep in mind that Parallel.ForEach and the thread-pool in general is meant to optimize CPU usage. Chances are what limits you is I/O on the database server. While databases can handle concurrency quite well, their limit doesn't depend on the number of cores on your client machine, so you'd better play with the number of concurrent threads and see what's fastest.

zmbq
  • 38,013
  • 14
  • 101
  • 171
  • The MyObjectBatch object is doing what you mention. I have tried different sizes of batches and thread counts to try to find the sweet spot. What seems to work best are bulk copies of about 5000 (any larger seems to drag it down). So, big question, do you see a way I can release the input and/or output data as I finish with it? I was hoping it would just go out of scope and get cleaned up, but that doesn't seem to be happening. – snow_FFFFFF Mar 08 '14 at 19:29
  • The garbage collector should take care of that for you. It will only work when it needs to, so you'll see the memory consumption go up and up, and then suddenly go down. You shouldn't see OutOfMemoryExceptions. Do you see those even when processing 5000 records at a time? – zmbq Mar 08 '14 at 19:44
  • I do see occasionally backing off of the memory consumption, but overall it continues to step up. So, for a smaller batch, it completes and cleans-up (before OOM). However, a large batch will always OOM even when limiting threads or batch size. – snow_FFFFFF Mar 08 '14 at 20:00