
So here is the scenario:

I have to take a group of data, process it, build objects, and then insert those objects into a database.

In order to increase performance, I am multi-threading the processing of the data using a parallel loop and storing the resulting objects in a ConcurrentBag<FinalObject>.

That part works fine. However, the issue is that I now need to take that collection, convert it into a DataTable object, and insert the data into the database. It's very ugly and I feel like I'm not doing this in the best way possible (pseudocode below):

ConcurrentBag<FinalObject> bag = new ConcurrentBag<FinalObject>();

ParallelOptions parallelOptions = new ParallelOptions();
parallelOptions.MaxDegreeOfParallelism = Environment.ProcessorCount;

Parallel.ForEach(allData, parallelOptions, dataObj =>
{   
    // ... process data and build theData ...

    bag.Add(theData);

    Thread.Sleep(100);
});

DataTable table = createTable();
foreach(FinalObject moveObj in bag) {
    table.Rows.Add(moveObj.x);
}
BWA
user2124871
  • You can also do the conversion of FinalObject to DataRow within the parallel loop to add some more performance, keeping the bag concurrent. – Nemo Mar 23 '16 at 20:52
  • So you're just adding one property of the underlying objects to the data table? If you already have the objects in a collection why do you need a data table? Why not just populate the data table in the first place? – D Stanley Mar 23 '16 at 20:52
  • I simplified it for this example - The datatables I use (I use 9 of them) range from 7 columns to 13 columns – user2124871 Mar 23 '16 at 20:54
  • Nemo - Basically have a concurrent bag of DataRows and add them all at the end? – user2124871 Mar 23 '16 at 20:55
  • Why would you have a `Thread.Sleep` inside your `Parallel.ForEach` body? – Kirill Shlenskiy Mar 23 '16 at 20:56
  • @user2124871 Yes, also maybe you can keep separate bags for each thread so you don't have contention and after all items are processed, loop through each bag and add them all to the DataTable. – Nemo Mar 23 '16 at 20:58
  • Due to the amount of processing in that parallel loop, I add a Thread.Sleep because the CPU will get to 20-30% without it. With it, it's at 3-5%. I don't want it there. – user2124871 Mar 23 '16 at 21:00
  • @user2124871 So you're using parallel but you want _low_ CPU usage? The reason you _use_ parallel processing is to _increase_ CPU utilization. – D Stanley Mar 23 '16 at 21:33
  • Why are you setting `MaxDegreeOfParallelism = Environment.ProcessorCount`? There is [evidence](http://stackoverflow.com/questions/20806238) that this actually _increases_ total time by limiting the number of threads that can be used. – D Stanley Mar 23 '16 at 21:35
  • @DStanley, if `Thread.Sleep` is used inside the `Parallel.ForEach` delegate body (it really shouldn't be but I digress), it will lead to thread pool starvation unless `MaxDegreeOfParallelism` is limited (see http://stackoverflow.com/a/14039106/1644813) – Kirill Shlenskiy Mar 24 '16 at 00:07

3 Answers


This is a good candidate for PLINQ (or Rx - I'll focus on PLINQ since it's part of the Base Class Library).

IEnumerable<FinalObject> bag = allData
    .AsParallel()
    .WithDegreeOfParallelism(Environment.ProcessorCount)
    .Select(dataObj =>
    {
        FinalObject theData = Process(dataObj);

        Thread.Sleep(100);

        return theData;
    });

DataTable table = createTable();

foreach (FinalObject moveObj in bag)
{
    table.Rows.Add(moveObj.x);
}

Realistically, instead of throttling the loop via Thread.Sleep, you should be limiting the maximum degree of parallelism further until you get the CPU usage down to the desired level.
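For example, a rough sketch (the divisor of 4 is an arbitrary assumption; profile and adjust until CPU usage sits where you want it):

IEnumerable<FinalObject> bag = allData
    .AsParallel()
    // Throttle by limiting parallelism rather than sleeping worker threads.
    .WithDegreeOfParallelism(Math.Max(1, Environment.ProcessorCount / 4))
    .Select(dataObj => Process(dataObj));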

Disclaimer: all of the below is meant for entertainment only, although it does actually work.

Of course you can always kick it up a notch and produce a full-on async Parallel.ForEach implementation that allows you to process input in parallel and do your throttling asynchronously, without blocking any thread pool threads.

async Task ParallelForEachAsync<TInput, TResult>(IEnumerable<TInput> input,
                                                 int maxDegreeOfParallelism,
                                                 Func<TInput, Task<TResult>> body,
                                                 Action<TResult> onCompleted)
{
    Queue<TInput> queue = new Queue<TInput>(input);

    if (queue.Count == 0) {
        return;
    }

    List<Task<TResult>> tasksInFlight = new List<Task<TResult>>(maxDegreeOfParallelism);

    do
    {
        while (tasksInFlight.Count < maxDegreeOfParallelism && queue.Count != 0)
        {
            TInput item = queue.Dequeue();
            Task<TResult> task = body(item);

            tasksInFlight.Add(task);
        }

        Task<TResult> completedTask = await Task.WhenAny(tasksInFlight).ConfigureAwait(false);

        tasksInFlight.Remove(completedTask);

        TResult result = completedTask.GetAwaiter().GetResult(); // We know the task has completed. No need for await.

        onCompleted(result);
    }
    while (queue.Count != 0 || tasksInFlight.Count != 0);
}

Usage (full Fiddle here):

async Task<DataTable> ProcessAllAsync(IEnumerable<InputObject> allData)
{
    DataTable table = CreateTable();
    int maxDegreeOfParallelism = Environment.ProcessorCount;

    await ParallelForEachAsync(
        allData,
        maxDegreeOfParallelism,
        // Loop body: these Tasks will run in parallel, up to {maxDegreeOfParallelism} at any given time.
        async dataObj =>
        {
            FinalObject o = await Task.Run(() => Process(dataObj)).ConfigureAwait(false); // Thread pool processing.

            await Task.Delay(100).ConfigureAwait(false); // Artificial throttling.

            return o;
        },
        // Completion handler: these will be executed one at a time, and can safely mutate shared state.
        moveObj => table.Rows.Add(moveObj.x)
    );

    return table;
}

struct InputObject
{
    public int x;
}

struct FinalObject
{
    public int x;
}

FinalObject Process(InputObject o)
{
    // Simulate synchronous work.
    Thread.Sleep(100);

    return new FinalObject { x = o.x };
}

Same behaviour, but without Thread.Sleep and ConcurrentBag<T>.

Kirill Shlenskiy
  • Thank you for the suggestion - I took the time to try it out and see what the end result would give me. It was about 2 seconds quicker, so it didn't do too much. However, I am looking at some of your other suggestions to see what I can get from them. Thank you! – user2124871 Mar 24 '16 at 16:42
  • @user2124871, if "quicker" is what you're after, then you definitely don't want `Thread.Sleep`, `Task.Delay` or any other kind of artificial delay. Whether you go with your `Parallel.ForEach` solution or my PLINQ version won't really make much of a difference other than in terms of code style. You should profile the processing component of your solution (*"process data"*) and optimise it for the sake of reducing CPU usage. – Kirill Shlenskiy Mar 24 '16 at 22:50

Sounds like you've complicated things quite a bit by trying to make everything run in parallel, but if you store DataRow objects in your bag instead of plain objects, at the end you can use DataTableExtensions to create a DataTable from a generic collection quite easily:

var dataTable = bag.CopyToDataTable();

Just add a reference to System.Data.DataSetExtensions in your project.
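A minimal sketch of that approach, assuming a single column named "x" (as in the question's pseudocode) and using a lock around DataTable.NewRow, which is not thread-safe:

using System.Collections.Concurrent;
using System.Data;
using System.Threading.Tasks;

ConcurrentBag<DataRow> bag = new ConcurrentBag<DataRow>();
DataTable template = createTable(); // used only as a row factory for the schema

Parallel.ForEach(allData, dataObj =>
{
    FinalObject theData = Process(dataObj);

    DataRow row;
    lock (template)
    {
        row = template.NewRow(); // NewRow is not thread-safe; serialize access
    }
    row["x"] = theData.x; // hypothetical column name

    bag.Add(row);
});

DataTable table = bag.CopyToDataTable(); // the detached rows share the template's schema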

D Stanley
  • `CopyToDataTable` has a generic constraint of `T : DataRow`, so it only appears suitable for a small subset of generic collections (leading to workarounds like this: https://msdn.microsoft.com/en-us/library/bb669096(v=vs.110).aspx). Am I missing something? – Kirill Shlenskiy Mar 23 '16 at 23:45
  • @KirillShlenskiy That's why I said " if you store `DataRow` objects in your bag instead of plain objects". I'm still confused as to why the OP is using all of that complexity to create a collection in parallel, only to turn around and convert it to a `DataTable` serially. – D Stanley Mar 24 '16 at 12:51
  • The reason for the collection in parallel is the amount of processing that happens in the loop. Doing it sequentially is very time consuming and this allows me to move much faster. The datatable is used to insert all of the records I need to put in the DB. Since it's not a thread safe collection, I store in a thread safe collection and convert into a medium to insert into DB. That might be overkill, trying to figure it out. – user2124871 Mar 24 '16 at 13:26
  • @DStanley, apologies, I overlooked the "if you store `DataRow` objects in your bag" part. – Kirill Shlenskiy Mar 24 '16 at 13:28
  • Have you tried inserting the objects into the DB one by one rather than from a `DataTable`? Or are you using SqlBulkCopy to insert the records? – D Stanley Mar 24 '16 at 13:57
  • SqlBulkCopy - I'm actually trying both single insert and bulk copy to compare performance now. – user2124871 Mar 24 '16 at 14:31

I think something like this should give better performance; object[] looks like a better option than DataRow, since you need a DataTable to create a DataRow object.

ConcurrentBag<object[]> bag = new ConcurrentBag<object[]>();

Parallel.ForEach(allData, 
    new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount }, 
    dataObj =>
{
    object[] row = new object[colCount]; // colCount = number of columns in the target table

    //do processing

    bag.Add(row);

    Thread.Sleep(100);
});

DataTable table = createTable();
foreach (object[] row in bag)
{
    table.Rows.Add(row);
}
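For completeness, once the table is populated you can hand it to SqlBulkCopy (which the comments above mention trying); a sketch, with the connection string and destination table name as placeholders:

using System.Data.SqlClient;

using (SqlConnection connection = new SqlConnection(connectionString)) // placeholder
using (SqlBulkCopy bulkCopy = new SqlBulkCopy(connection))
{
    connection.Open();
    bulkCopy.DestinationTableName = "dbo.FinalObjects"; // hypothetical target table
    bulkCopy.WriteToServer(table); // streams all rows in a single bulk operation
}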
Nemo