
I have been rewriting some processing-intensive loops to use the TPL to increase speed. This is the first time I have tried threading, so I want to check that what I am doing is the correct way to do it.

The results are good: processing the data from 1000 rows in a DataTable dropped from 34 minutes to 9 minutes when moving from a standard foreach loop to a Parallel.ForEach loop. For this test, I removed non-thread-safe operations, such as writing data to a log file and incrementing a counter.

I still need to write to a log file and increment a counter, so I tried implementing a lock that encloses the StreamWriter/increment code block.

FileStream filestream = new FileStream("path_to_file.txt", FileMode.Create);
StreamWriter streamwriter = new StreamWriter(filestream);
streamwriter.AutoFlush = true;

try
{
    object locker = new object();

    // Lets assume we have a DataTable containing 1000 rows of data.
    DataTable datatable_results;

    if (datatable_results.Rows.Count > 0)
    {
        int row_counter = 0;

        Parallel.ForEach(datatable_results.AsEnumerable(), data_row =>
        { 
            // Process data_row as normal.

            // When ready to write to log, do so.
            lock (locker)
            {
                row_counter++;
                streamwriter.WriteLine("Processing row: {0}", row_counter);

                // Write any data we want to log.
            }
        });                    
    }
}
catch (Exception e)
{
    // Catch the exception.
}
streamwriter.Close();

The above seems to work as expected, with minimal performance cost (still 9 minutes execution time). Granted, the actions contained in the lock are hardly significant themselves. I assume that the longer the code inside the lock takes to run, the longer other threads wait for it, and the more it affects total processing time.

My question: is the above an efficient way of doing this, or is there a different way of achieving it that is either faster or safer?

Also, let's say our original DataTable actually contains 30000 rows. Is there anything to be gained by splitting this DataTable into chunks of 1000 rows each and then processing them in the Parallel.ForEach, instead of processing all 30000 rows in one go?

dalemac
  • This question appears to be off-topic because it belongs on [Code Review](http://codereview.stackexchange.com) – Yuval Itzchakov Nov 17 '14 at 14:55
  • The above code will not compile or run - it's not a direct extract. I'm more concerned with the theory of multithreading and making variables thread safe. Thanks though. – dalemac Nov 17 '14 at 15:12

5 Answers

9

Writing to the file is expensive, and you're holding an exclusive lock while you write to it. That's bad: it is going to introduce contention.

You could add the lines to a buffer instead, then write them to the file all at once. That should remove the contention and give you a way to scale.

if (datatable_results.Rows.Count > 0)
{
    ConcurrentQueue<string> buffer = new ConcurrentQueue<string>();
    Parallel.ForEach(datatable_results.AsEnumerable(), (data_row, state, index) =>
    {
        // Process data_row as normal.

        // When ready to write to log, do so.

        buffer.Enqueue(string.Format( "Processing row: {0}", index));
    });

    streamwriter.AutoFlush = false;
    string line;
    while (buffer.TryDequeue(out line))
    {
        streamwriter.WriteLine(line);
    }
    streamwriter.Flush();//Flush once when needed
}
  1. Note that you don't need to maintain a loop counter; Parallel.ForEach provides one. The difference is that it is the index of the element, not a count of rows processed so far. If that changes the behavior you expected, you can still add the counter back and use Interlocked.Increment to increment it.
  2. I see that you're using streamwriter.AutoFlush = true, which will hurt performance. You can set it to false and flush once you're done writing all the data.

If possible, wrap the StreamWriter in a using statement, so that you don't even need to flush the stream explicitly (you get it for free).

Alternatively, you could look at logging frameworks, which do this job pretty well. Examples: NLog, log4net, etc.
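As a rough sketch of how the buffer and the using statement fit together (the `BufferedLogDemo` class, its `Run` method, and the `logPath`/`rowCount` parameters are hypothetical names for illustration, and a plain integer range stands in for the DataTable rows): the workers only enqueue strings, and a single using block writes them out afterwards.

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Linq;
using System.Threading.Tasks;

public static class BufferedLogDemo
{
    // Hypothetical helper: processes rowCount items in parallel and
    // returns the number of log lines written to logPath.
    public static int Run(string logPath, int rowCount)
    {
        var buffer = new ConcurrentQueue<string>();

        Parallel.ForEach(Enumerable.Range(0, rowCount), (item, state, index) =>
        {
            // Real per-row processing would go here.
            buffer.Enqueue(string.Format("Processing row: {0}", index));
        });

        int written = 0;
        // Leaving the using block disposes the writer, which flushes and closes it.
        using (var writer = new StreamWriter(logPath))
        {
            string line;
            while (buffer.TryDequeue(out line))
            {
                writer.WriteLine(line);
                written++;
            }
        }
        return written;
    }
}
```

The lines end up in the file in whatever order the workers enqueued them, which is not necessarily index order.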

Sriram Sakthivel
  • But this makes the logging **really** weird. You don't log as you go, you log at the end. So if something goes wrong you don't get any info on where you got to in the logs. – RobH Nov 17 '14 at 15:17
  • @RobH I don't see what you mean, Why not wrap the logging part in finally block if you worry about it? – Sriram Sakthivel Nov 17 '14 at 15:19
  • At the moment, you could look at the logs while the operation was in progress and see it doing something. Now you have to wait for it to finish then read the log all at once. You also have to hold all of the logging info in memory which might not be scalable. – RobH Nov 17 '14 at 15:23
  • @RobH Is that a requirement to look at logs while process is in flight? Holding `30000` elements of small string in memory is not too much. – Sriram Sakthivel Nov 17 '14 at 15:26
  • If something is going on for 10 minutes and I have absolutely no idea what the hell it's doing and no way of finding out what it's doing, I'm going to be pretty terrified. – RobH Nov 17 '14 at 15:30
  • @RobH I'm pretty sure file access is the main bottleneck. Note that the OP used auto flush as well. For each iteration of the loop, the disk has to spin and the content has to be written to disk, while all the other threads have to wait, and so forth. Imagine this cycle going on 30000 times. That should answer your question. – Sriram Sakthivel Nov 17 '14 at 15:33
  • I'm not suggesting keeping in the lock and streamwriter. I'm only saying that your code makes the logging odd in my opinion. – RobH Nov 17 '14 at 15:36
  • @RobH: if there are no *huge* numbers involved in your calculation, logging while parallelizing basically decreases performance dramatically. The basic rule is: run as fast as you can, and check the result afterwards. Naturally, if the calculation takes hours or days, that's not a solution and you need to schedule some kind of logging. But I'm pretty sure that if you avoid logging during the calculation, and avoid locks in general (look at the hints in my answer), you will eventually boost performance even more, and so wait for the result even less. Or leave logging only for debugging. – Tigran Nov 17 '14 at 15:42
  • @Tigran - Yes, I understand the factors at play here. I would create buffered logging with a largish buffer or use a library that supports it. There's no reason to sacrifice knowing something about what your application is doing. – RobH Nov 17 '14 at 15:47
  • @RobH Your comment is rather amusing. So, you wanted to slow down the actual code just for the sake of logging? Logging is a [cross cutting concern](http://en.wikipedia.org/wiki/Cross-cutting_concern) that shouldn't affect your main processing. – Sriram Sakthivel Nov 17 '14 at 15:47
  • I am absolutely not suggesting locking around a file write after every row is processed. – RobH Nov 17 '14 at 15:48
  • @RobH: it is always about balance. If 9 minutes is OK for you, keep the logging so that you and your client feel safe having the information "at runtime" (I wonder whether you can *do* anything when you get an error "at runtime"; if not, what is the point?). Our point was that if you are going to parallelize because you are concerned about performance, push a little bit harder and go a lot further. – Tigran Nov 17 '14 at 16:03
  • Thanks, this answer and the subsequent comments have given me a lot of things to consider. – dalemac Nov 17 '14 at 16:03
  • @Tigran - agreed. I was looking at the problem from the point of view where processing the data is very much more expensive than the logging (not true in OPs code). Your code is interesting but note that `Parallel.ForEach` can run multiple partitions on the same thread so it might not be safe to use the current thread id as the key. – RobH Nov 17 '14 at 16:10
  • After thinking about this for a while: the logs would only really be looked at if a problem with the processed data was highlighted (or an exception was thrown). They wouldn't usually be looked at while the process was still running (unless debugging, which can be split into a separate method anyway). I can see how logging is going to cause contention, and the `ConcurrentQueue` data type looks interesting, so I will give that a go, as well as removing the counter increment. Log files are typically 60 MB, which is nothing in terms of memory to hold onto. Thanks all! – dalemac Nov 17 '14 at 17:01
1

You may be able to improve on this if you avoid logging, or if each thread logs only to its own thread-specific log file (not sure if that makes sense to you).

The TPL starts about as many threads as you have cores (see "Does Parallel.ForEach limit the number of active threads?").

So what you can do is:

1) Get the number of cores on the target machine

2) Create a list of counters with as many elements as you have cores

3) Update the counter for every core

4) Sum them all up after the parallel execution terminates.

So, in practice:

// Key: thread ID, value: that thread's local counter.
// Note: a thread's entry must already exist before the loop body can update it with +=.
Dictionary<int, int> counters = new Dictionary<int, int>(NUMBER_OF_CORES);
....
Parallel.ForEach(datatable_results.AsEnumerable(), data_row =>
{
    // Process data_row as normal.

    // No lock needed: every thread updates its _own_ counter.
    counters[Thread.CurrentThread.ManagedThreadId] += 1;

    // No writing here, or writing to a thread-specific file only.
    // streamwriter.WriteLine("Processing row: {0}", row_counter);
});
....

// After the parallel loop has finished, sum all counters to get the total across all threads.

The benefit of this is that no locking is involved at all, which will dramatically improve performance. The .NET concurrent collections always use some kind of locking inside.

This is naturally just a basic idea and may not work as expected if you copy and paste it. We are talking about multithreading, which is always a hard topic. But hopefully it gives you some ideas to rely on.
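One way to get per-worker counters without keying on the thread ID is the `Parallel.ForEach` overload that takes `localInit` and `localFinally` delegates. This is a swapped-in technique rather than the dictionary approach above, offered only as a sketch: `LocalCounterDemo` and `CountRows` are hypothetical names, and an integer range stands in for the DataTable rows.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class LocalCounterDemo
{
    // Hypothetical helper: counts processed rows using task-local counters.
    public static int CountRows(int rowCount)
    {
        int total = 0;

        Parallel.ForEach(
            Enumerable.Range(0, rowCount),
            // localInit: each task starts with its own counter at 0.
            () => 0,
            // body: receives the task-local counter and returns its new value.
            (item, state, localCount) =>
            {
                // Real per-row processing would go here.
                return localCount + 1;
            },
            // localFinally: merge each task's counter into the total once, when the task ends.
            localCount => Interlocked.Add(ref total, localCount));

        return total;
    }
}
```

The loop body itself touches no shared state; the only synchronized operation is the single `Interlocked.Add` each task performs when it finishes.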

Tigran
  • Alas, as my first attempt at threading, a thread specific log file doesn't make sense to me. Would you mind elaborating? – dalemac Nov 17 '14 at 15:52
  • I mean, **if** you need to log something in real time, you can avoid thread synchronization issues by not writing to the same file from different threads. What I was suggesting is to avoid real-time logging altogether and just produce a final dump file of the results and the calculations made. You can assemble the strings with log data at runtime, but *write* those strings to the file only after the parallel loop has terminated. This applies, naturally, only if the calculation does not involve a long wait. – Tigran Nov 17 '14 at 16:47
1

First of all, it takes about 2 seconds to process a row in your table and perhaps a few milliseconds to increment the counter and write to the log file. With the actual processing being 1000x more than the part you need to serialize, the method doesn't matter too much.

Furthermore, the way you have implemented it is perfectly solid. There are ways to optimize it, but none that are worth implementing in your situation.

One useful way to avoid locking on the increment is to use Interlocked.Increment. It is a bit slower than x++ but much faster than lock {x++;}. In your case, though, it doesn't matter.
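A minimal sketch of the `Interlocked.Increment` approach, with the hypothetical names `InterlockedCounterDemo` and `CountRows`, and an integer range standing in for the rows of the table:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class InterlockedCounterDemo
{
    // Hypothetical helper: counts processed rows with an atomic increment.
    public static int CountRows(int rowCount)
    {
        int rowCounter = 0;

        Parallel.ForEach(Enumerable.Range(0, rowCount), item =>
        {
            // Real per-row processing would go here.

            // Atomically adds 1 without a lock. The call also returns the
            // incremented value, which could be used in a log line.
            Interlocked.Increment(ref rowCounter);
        });

        return rowCounter;
    }
}
```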

As for the file output, remember that the output is going to be serialized anyway, so at best you can minimize the amount of time spent in the lock. You can do this by buffering all of your output before entering the lock, then just perform the write operation inside the lock. You probably want to do async writes to avoid unnecessary blocking on I/O.
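The buffering idea might look roughly like this. It is a sketch only: `ShortLockDemo` and `Run` are hypothetical names, the writer is passed in as a `TextWriter`, and the write stays synchronous rather than async to keep the example short.

```csharp
using System;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

public static class ShortLockDemo
{
    // Hypothetical helper: writes two log lines per row to the given writer.
    public static void Run(TextWriter writer, int rowCount)
    {
        object locker = new object();

        Parallel.ForEach(Enumerable.Range(0, rowCount), item =>
        {
            // Build the complete log entry outside the lock.
            var entry = new StringBuilder();
            entry.AppendLine(string.Format("Processing row: {0}", item));
            entry.AppendLine("  (any other data for this row)");
            string text = entry.ToString();

            // Only the write itself is serialized, so the lock is held briefly
            // and the lines of one entry stay together in the output.
            lock (locker)
            {
                writer.Write(text);
            }
        });
    }
}
```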

Gabe
0

This is my code that uses a parallel for. The concept is similar, and perhaps easier for you to implement. FYI, for debugging, I keep a regular for loop in the code and conditionally compile the parallel code. Hope this helps. Note, however, that the value of i in this scenario isn't the same as the number of records processed. You could create a counter and use a lock when adding to it. In my other code where I do have a counter, I didn't use a lock and just allowed the value to be potentially off, to avoid the slower code. I have a status mechanism to indicate the number of records processed. For my implementation, the slight chance that the count is off is not an issue: at the end of the loop I put out a message saying all the records have been processed.

#if DEBUG
        for (int i = 0; i < stend.PBBIBuckets.Count; i++)
        {
            //int serverIndex = 0;
#else
        ParallelOptions options = new ParallelOptions();
        options.MaxDegreeOfParallelism = m_maxThreads;

        Parallel.For(0, stend.PBBIBuckets.Count, options, (i) =>

        {
#endif
            g1client.Message request;
            DataTable requestTable;

            request = new g1client.Message();

            requestTable = request.GetDataTable();

            requestTable.Columns.AddRange(
                Locations.Columns.Cast<DataColumn>().Select(x => new DataColumn(x.ColumnName, x.DataType)).ToArray
                    ());

            FillPBBIRequestTables(requestTable, request, stend.PBBIBuckets[i], stend.BucketLen[i], stend.Hierarchies);
#if DEBUG
        }
#else
        });
#endif
Mike Sage
0

You can move the parallel code into a new method. For example:

    // Class scope
    private string GetLogRecord(int rowCounter, DataRow row)
    {
        return string.Format("Processing row: {0}", rowCounter); // Write any data we want to log.
    }

    //....
    Parallel.ForEach(datatable_results.AsEnumerable(), data_row =>
    { 
        // Process data_row as normal.

        // When ready to write to log, do so.
        int current_row;
        lock (locker)
            current_row = ++row_counter; // Capture the value while still holding the lock.

        var logRecord = GetLogRecord(current_row, data_row);

        lock (locker)
            streamwriter.WriteLine(logRecord);
    });
Evgeniy Mironov