1

I want to optimize this code:

    public static void ProcessTo(this StreamReader sr, StreamWriter sw, Action<StreamWriter, string> action, FileProcessOptions fpo = null)
    {
        if (fpo == null)
        {
            fpo = new FileProcessOptions();
        }

        List<string> buffer = new List<string>(fpo.BuferSize);

        while (!sr.EndOfStream)
        {
            buffer.Clear();

            while (!sr.EndOfStream && buffer.Count < fpo.BuferSize)
            {
                buffer.Add(sr.ReadLine());
            }

            if (fpo.UseThreads)
            {
                buffer.AsParallel().ForAll(line => action(sw, line));
            }
            else
            {
                buffer.ForEach(line => action(sw, line));
            }
        }
    }

I process large amounts of data and want to parallelize the process. Usually data archived, so it is very important to use multiple threads to process flow of data

kenchilada
  • 7,469
  • 5
  • 25
  • 34
Vlad M
  • 13
  • 4
  • 3
    Well mister, you are very likely to be disappointed; more often than not the disk operations take 90% of the total time taken and multithreading will not help you. Have you tried profiling your code? What did the profiler tell you? – Dariusz Jul 12 '13 at 17:34
  • More than 50% of the CPU spent on archiving, I want to do it in a separate threads – Vlad M Jul 12 '13 at 17:41
  • Is your action just writing to other files? – Reed Copsey Jul 12 '13 at 17:41
  • No, there is little work on data processing, it takes no more than 50% CPU time – Vlad M Jul 12 '13 at 17:49
  • Please see [Replying to comments](http://stackoverflow.com/editing-help#comment-formatting) to see how to notify the other commenters about your replies. – Brad Rem Jul 12 '13 at 17:59
  • Why are you enumerating the whole buffer after every line you add to it? Also, if the input file is large, and you perform actions on it line-by-line, why buffer the whole file and then dispatch the actions? Seems you'd save possibly significant memory overhead / GC pressure if you just spun off every line into a `Task`. – millimoose Jul 12 '13 at 19:11
  • @millimoose: He's not enumerating the buffer after every line. Granted, the line-by-line approach without a buffer seems more reasonable, as I showed in my example. – Jim Mischel Jul 12 '13 at 19:25
  • Dariusz: I would have agreed with you up until the day I wrote a utility like this myself, decided to try C#'s multithreading, and saw a MUCH better performance result, using 100% CPU of a 4-core CPU (briefly). It may have depended on the specific case (for instance, my streams were coming through a .zip file) – Katana314 Jul 12 '13 at 19:40
  • @JimMischel Ah, right, I read the code horribly wrong. – millimoose Jul 12 '13 at 20:43
  • @millimoose file size is more then 600GB, so it is quite hard to buffer whole file – Vlad M Jul 13 '13 at 16:14
  • @VladM I didn't notice you're buffering only chunks of the file. That said, with my suggestion I believe that only as many lines would be read as `Parallel.ForEach()` will process simultaneously. – millimoose Jul 13 '13 at 16:16

2 Answers2

2

If you don't pass a StreamReader, and instead just pass the file name, you could write:

Parallel.Foreach(File.ReadLines(filename), (line) => action(sw, line));

You can still do this if you pass a StreamReader. You just have to create an enumerator that will read it. Something like what's done here: Recommended behaviour of GetEnumerator() when implementing IEnumerable<T> and IEnumerator<T>. Using that, you'd write:

LineReaderEnumerable myEnumerable = new LineEnumerator(sr);
Parallel.Foreach(myEnumerable, (line) => action(sw, line));

However, you have a potential problem with that because you then could have multiple threads writing to that stream writer. And StreamWriter doesn't support concurrent writes. It will throw an exception. If you're synchronizing access to the output file (using a lock, for example), then you're okay here.

One other problem you'll run into is the order in which things are output. It's almost certain that if you read lines in the order [1, 2, 3, 4, ... n], the output order is going to be different. You might get [1, 2, 4, 3, 6, 5, 7, 9, 8 ... n, n-1]. If output order is important, you have to come up with a way to make sure that things are output in the proper order.

Regarding the lock, you have:

sr.ProcessParalel(line => 
{ 
    string[] ls = line.Split('\t');
    lock (sw)
    {
        sw.Write(float.Parse(ls[0]));
        sw.Write(int.Parse(ls[1]) * 10 + 1);
        for (int i = 2; i < ls.Length; i++)
        {
            sw.Write(int.Parse(ls[1]));
        }
    }
 });

The problem isn't the lock. The problem is that you're holding the lock for too long. The way you have it written, the code is effectively single-threaded because all the threads are waiting on that lock to do their processing. You need to change your processing so that the lock is held for as short a time as possible.

Build your output into a StringBuilder, convert it to a string, and then output that string. For example:

string[] ls = line.Split('\t');
StringBuilder sb = new StringBuilder();
sb.Append(float.Parse(ls[0]));
sb.Append(' ');
sb.Append(int.Parse(ls[1])) * 10 + 1);
for (int i = 2; i < ls.Length; i++)
{
    sb.Append(' ');
    sb.Append(int.Parse(ls[i]));    }
}
var sout = sb.ToString();
// lock and write
lock (sw)
{
    sw.Write(sout);
}

You could do much the same thing with a StringWriter.

Community
  • 1
  • 1
Jim Mischel
  • 131,090
  • 20
  • 188
  • 351
  • Order is not important, but synchronization lock really slows down – Vlad M Jul 12 '13 at 19:20
  • @user2577265: What makes you say that the synchronization lock slows things down? If it really does, then you're doing something wrong. – Jim Mischel Jul 12 '13 at 19:22
  • small example: sr.ProcessParalel(line => { string[] ls = line.Split('\t'); lock (sw) { sw.Write(float.Parse(ls[0])); sw.Write(int.Parse(ls[1]) * 10 + 1); for (int i = 2; i < ls.Length; i++) { sw.Write(int.Parse(ls[1])); } } }); does not look good – Vlad M Jul 12 '13 at 19:39
  • @VladM: Connect your `BinaryWriter` to a [MemoryStream](http://msdn.microsoft.com/en-us/library/system.io.memorystream.aspx), output to that, then lock the file and copy the memory stream's buffer to the output file. Unless your individual records are gigantic, you can always build them in memory outside of the lock, and then very quickly take the lock and write to file. The whole point is that .NET file I/O objects don't generally allow multiple concurrent operations, so you *have to* synchronize access. – Jim Mischel Jul 12 '13 at 20:13
  • You don't really need any sort of fiddly implementation of your own `LineEnumerator`. Use `yield return` in a `YieldLines()` method. – millimoose Jul 12 '13 at 20:44
  • @millimoose: I'm not familiar with the technique you're describing. Can you give me a link? – Jim Mischel Jul 12 '13 at 21:34
  • @JimMischel It's called [iterators](http://msdn.microsoft.com/en-us/library/vstudio/dscyy5s0(v=vs.110).aspx), and it's been in C# for... a few major versions now. There's no good reason I can think of to implement `IEnumerable` without them. So you'd basically do this: http://ideone.com/xVsTB2 – millimoose Jul 13 '13 at 00:46
  • @millimoose: I'm familiar with iterators, thanks. I thought you had some way to define one in an anonymous function so you could write `Paralell.ForEach(iterator-in-a-lambda, (line) => ...)` As for the `LineEnumerator`, I just grabbed what I found from a search here. – Jim Mischel Jul 13 '13 at 03:43
  • @JimMischel Then I was just confused by you suggesting what's mostly a legacy way of doing things, I guess. – millimoose Jul 13 '13 at 13:46
0

final solution:

        public static IEnumerable<string> GetEnumirator(this StreamReader sr)
    {
        while (!sr.EndOfStream)
        {
            yield return sr.ReadLine();
        }
    }

    public static void ProcessParalel(this StreamReader sr, Action<string> action)
    {
        sr.GetEnumirator().AsParallel().ForAll(action);
    }

    public static void ProcessTo(this StreamReader sr, BinaryWriter bw, Action<BinaryWriter, string> action, FileProcessOptions fpo = null)
    {
        sr.ProcessParalel(line =>
        {
            using (MemoryStream ms = new MemoryStream())
            {
                BinaryWriter lbw = new BinaryWriter(ms);

                action(lbw, line);
                ms.Seek(0, SeekOrigin.Begin);

                lock (bw)
                {
                    ms.WriteTo(bw.BaseStream);
                }
            }
        });
    }

with compressed input stream, I'v got the acceleration in 3 times

Vlad M
  • 13
  • 4