3

I am running tests to find the best way to read and process a CSV file. I need to read each line of the CSV file and analyse it. Everything works fine for a file containing thousands of rows, but when I try a CSV file containing more than 1 million rows I get an out-of-memory exception. I thought a parallel stream would perform faster, so I am a bit confused about why I get this out-of-memory error. How does Java process the parallel reading?

Below is the test code that reads the file sequentially and in parallel.

String filename = "c:\\devs\\files\\datas.csv"; // 193MB
Path path = Paths.get(filename);

@Test
public void testFileExist() {
    assertTrue(Files.exists(path));
}

@Test
public void testSingleThreadRead() {
    Function<Path, String> processfile = (Path p) -> {
        String result = "";
        try {
            result = Files.lines(p).collect(Collectors.joining(" ,"));
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        return result;
    };

    long start = System.currentTimeMillis();
    String result = processfile.apply(path);
    long end = System.currentTimeMillis();
    assertFalse(result.isEmpty());
    System.out.println(end - start + "ms");
}

@Test
public void testSingleThreadReadParallel() {
    Function<Path, String> processfile = (Path p) -> {
        String result = "";
        try {
            result = Files.lines(p).parallel().collect(Collectors.joining(" ,"));
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        return result;
    };

    long start = System.currentTimeMillis();
    String result = processfile.apply(path);
    long end = System.currentTimeMillis();
    assertFalse(result.isEmpty());
    System.out.println(end - start + "ms");
}

Exception

java.lang.OutOfMemoryError
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
at java.lang.reflect.Constructor.newInstance(Unknown Source)
at java.util.concurrent.ForkJoinTask.getThrowableException(Unknown Source)
at java.util.concurrent.ForkJoinTask.reportException(Unknown Source)
at java.util.concurrent.ForkJoinTask.invoke(Unknown Source)
at java.util.stream.ReduceOps$ReduceOp.evaluateParallel(Unknown Source)
at java.util.stream.AbstractPipeline.evaluate(Unknown Source)
at java.util.stream.ReferencePipeline.collect(Unknown Source)
at test.TestProcessFile.lambda$1(TestProcessFile.java:48)
at test.TestProcessFile.testSingleThreadReadParallel(TestProcessFile.java:58)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at 

Update

Running the parallel processing in a separate class still throws this exception:

Exception in thread "main" java.lang.OutOfMemoryError
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
at java.lang.reflect.Constructor.newInstance(Unknown Source)
at java.util.concurrent.ForkJoinTask.getThrowableException(Unknown Source)
at java.util.concurrent.ForkJoinTask.reportException(Unknown Source)
at java.util.concurrent.ForkJoinTask.invoke(Unknown Source)
at java.util.stream.ReduceOps$ReduceOp.evaluateParallel(Unknown Source)
at java.util.stream.AbstractPipeline.evaluate(Unknown Source)
at java.util.stream.ReferencePipeline.collect(Unknown Source)
at ProcessFileParallel.lambda$0(ProcessFileParallel.java:19)
at ProcessFileParallel.main(ProcessFileParallel.java:29)
Caused by: java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Unknown Source)
at java.lang.AbstractStringBuilder.ensureCapacityInternal(Unknown Source)
at java.lang.AbstractStringBuilder.append(Unknown Source)
at java.lang.StringBuilder.append(Unknown Source)
at java.util.StringJoiner.merge(Unknown Source)
at java.util.stream.Collectors$$Lambda$5/990368553.apply(Unknown Source)
at java.util.stream.ReduceOps$3ReducingSink.combine(Unknown Source)
at java.util.stream.ReduceOps$3ReducingSink.combine(Unknown Source)
at java.util.stream.ReduceOps$ReduceTask.onCompletion(Unknown Source)
at java.util.concurrent.CountedCompleter.tryComplete(Unknown Source)
at java.util.stream.AbstractTask.compute(Unknown Source)
at java.util.concurrent.CountedCompleter.exec(Unknown Source)
at java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(Unknown Source)
at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
altruistlife
  • I see in this [article](https://blog.oio.de/2016/01/22/parallel-stream-processing-in-java-8-performance-of-sequential-vs-parallel-stream-processing/) that I/O is not well suited to parallelization (as expected), because you need to read the file to be able to split it, so it needs to load **193 MB**. I would guess this is your problem. Try increasing the memory to test that. But I would not let the Stream split the file itself. – AxelH May 15 '17 at 09:02
  • what are the values for `-Xmx` and `-Xms` for your application? – Eugene May 15 '17 at 09:05
  • So basically, when using a parallel stream, Java stores the source in memory first before applying the ForkJoin mechanism? – altruistlife May 15 '17 at 09:19

2 Answers

5

Your code fails in testSingleThreadReadParallel, but not because of the parallel reading itself. The problem is elsewhere, most likely in collecting the entire file into a single String.

Files.lines is buffered (look at the implementation), so reading the file itself will most probably not cause any problems.

But collecting that file into a single String will obviously require lots of memory, much more than the file size itself.

Actually, reading those files in parallel will require more memory than reading them sequentially, as far as I understand it. Each thread reads its chunk into memory in parallel, so the parallel approach needs more memory; by more I mean roughly your number of CPUs * the buffer size used by Files.lines.

EDIT2

After taking some time, I realize that your problem has to be somewhere else. For example, do you actually have line breaks in your file? Or maybe you are right at the limit: parallel processing does increase memory usage, but not by that much. Maybe you just need to increase your -Xms and -Xmx a little.

For example, for test purposes I created a file with 247 MB of dummy data and ran this code on it:

Path p = Paths.get("/private/tmp/myfile.txt");
Stream<String> s = Files.lines(p).parallel(); // and without parallel
s.forEach(System.out::println);

The settings I used are -Xmx200m -Xms200m, for both parallel and sequential processing. This is less than the actual file size, and it still works just fine.

Your main problem is that you are collecting everything into a single String, which makes it huge. Collecting everything into a String on my machine under JDK 8 requires at least 1.5 GB of heap.
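If each line only needs to be analysed and then discarded, one way to keep the heap small is to make the terminal operation something that does not accumulate the whole file. Below is a minimal sketch along those lines; the class name, the path and the contains(",") check are placeholders standing in for the real file and the real per-line analysis:

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

public class ProcessFilePerLine {
    public static void main(String[] args) {
        Path path = Paths.get("c:\\devs\\files\\datas.csv"); // same file as in the question

        // try-with-resources closes the file; every line is analysed and then
        // dropped, so the heap holds at most a buffer's worth of data at a time
        try (Stream<String> lines = Files.lines(path)) {
            long matching = lines
                    .parallel()                         // optional here, nothing big is accumulated
                    .filter(line -> line.contains(",")) // placeholder for the real per-line analysis
                    .count();
            System.out.println(matching + " matching lines");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}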

Also a very good read here

Eugene
  • `testSingleThreadReadParallel` is the method using `parallel` ;) But indeed, `at java.util.stream.ReferencePipeline.collect(Unknown Source)` is the part of the Stream that generates the issue. Obviously that is heavy processing – AxelH May 15 '17 at 09:18
  • Collecting the file into a single String works very well without parallel processing. I only get that error when I make the stream parallel before collecting it into a single String. – altruistlife May 15 '17 at 09:36
  • @altruistlife you can't really get the error *before* the collecting, as the intermediate stages don't actually do anything; it's just the terminal `collect` that starts processing the stream. – Eugene May 15 '17 at 10:24
  • @Eugene Tried it in a separate main class and got the error when reading in parallel – altruistlife May 15 '17 at 10:42
  • @altruistlife see edit, it seems that you might be right here. – Eugene May 15 '17 at 10:46
  • @Eugene it seems that a 190 MB file with over 1M rows hits a limit in the current Stream API implementation. Is it possible to specify a custom thread pool, or limit it, for a Java 8 parallel stream (a sketch of one approach follows these comments)? – altruistlife May 16 '17 at 07:37
  • @altruistlife see EDIT with bold. – Eugene May 16 '17 at 18:36
  • @Eugene great test; now we can say the problem is in collecting the resulting String. I did some tests, creating dummy data and increasing the data size by a factor of 10 each time. It turns out the parallel stream uses a lot more memory: it takes roughly a 1 GB file to trigger an out-of-memory exception with a sequential stream, but only around 50 MB with a parallel stream. Both the sequential and the parallel stream collect the result into a String, so how can we explain that outcome? – altruistlife May 17 '17 at 12:51
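Regarding the custom thread pool asked about in the comment above: a commonly used workaround is to invoke the terminal operation from inside your own ForkJoinPool, which the parallel stream then uses instead of the common pool. This relies on an implementation detail of the JDK 8 stream machinery rather than a documented guarantee, and it only limits the number of worker threads, not the size of the collected String. A minimal sketch, with an example pool size and the path from the question:

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Stream;

public class ParallelStreamCustomPool {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Path path = Paths.get("c:\\devs\\files\\datas.csv"); // example path from the question

        // Submitting the terminal operation from inside our own pool makes the
        // parallel stream use this pool instead of the common ForkJoinPool.
        ForkJoinPool pool = new ForkJoinPool(2); // 2 workers instead of one per core
        long lineCount = pool.submit(() -> {
            try (Stream<String> lines = Files.lines(path)) {
                return lines.parallel().count();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }).get();
        pool.shutdown();

        System.out.println(lineCount + " lines");
    }
}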
0

Try changing your JVM's memory settings in the JVM args, especially the -Xmx (maximum heap size) argument. See Oracle's documentation.
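For example, VM arguments along these lines raise the limit (the sizes are placeholders and should be chosen from the expected data volume; -Xms is the initial heap size, -Xmx the maximum):

-Xms512m -Xmx2g

With the 193 MB file joined into one String, the maximum heap has to be several times the file size; Eugene measured roughly 1.5 GB above.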

Another (and even better) option is to read your file in chunks, as suggested in the comments. This bounds the maximum amount of memory used for reading the file.
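A minimal sketch of that idea, assuming the file from the question; the chunk size of 10,000 lines and the processChunk method are made-up placeholders for the real analysis. At most one chunk of lines is held in memory at a time:

import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

public class ChunkedCsvReader {
    private static final int CHUNK_SIZE = 10_000; // lines per chunk, example value

    public static void main(String[] args) throws IOException {
        try (BufferedReader reader = Files.newBufferedReader(Paths.get("c:\\devs\\files\\datas.csv"))) {
            List<String> chunk = new ArrayList<>(CHUNK_SIZE);
            String line;
            while ((line = reader.readLine()) != null) {
                chunk.add(line);
                if (chunk.size() == CHUNK_SIZE) {
                    processChunk(chunk); // hand the batch to the real analysis
                    chunk.clear();       // free the batch before reading the next one
                }
            }
            if (!chunk.isEmpty()) {
                processChunk(chunk);     // last, partially filled chunk
            }
        }
    }

    // placeholder for the actual CSV analysis, e.g. splitting each line on ','
    private static void processChunk(List<String> lines) {
        System.out.println("processed " + lines.size() + " lines");
    }
}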

Amir Kost
  • Remember that's not a complete answer; you can only increase the memory up to a certain limit. But this will of course work for a time (until he reads a bigger file) – AxelH May 15 '17 at 09:06
  • True. But when dealing with large data, you should set your heap size. You should know your max file size and set your heap size accordingly. The JVM's memory is limited. Whatever heap size you set, there could always be a bigger file that will cause an OutOfMemory. – Amir Kost May 15 '17 at 09:20
  • Thank you for this answer. Actually I don't just want to make it work; I'm trying to figure out why this error happens and how the parallel stream mechanism works underneath in Java. – altruistlife May 15 '17 at 09:21
  • I cannot know the size of my file, because in the future I will use the best-performing code in a web service API that allows processing big files. – altruistlife May 15 '17 at 09:23
  • @AmirKost _You should know your max file size and set your heap size accordingly._ In a production environment, you can't know the maximum size of a file. First, files tend to naturally grow with time, and of course it is a vulnerability to think that nobody could send a 1 GB file. – AxelH May 15 '17 at 09:41
  • I already read my files in chunks ;) but you did not mention how to do that in your answer, for other people – AxelH May 15 '17 at 13:10