I am running tests to find the best way to read and process a CSV file: I need to read each line of the file and analyse it. Everything works fine for a file with thousands of rows, but with a CSV file of more than 1 million rows I get an OutOfMemoryError. I expected a parallel stream to be faster, so I am a bit confused about why I get this out-of-memory error. How does Java process the reading in parallel?
Below is the test code, which reads the file sequentially and in parallel.
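package test;

// Assumes JUnit 4
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.junit.Test;

public class TestProcessFile {

    String filename = "c:\\devs\\files\\datas.csv"; // 193MB
    Path path = Paths.get(filename);

    @Test
    public void testFileExist() {
        assertTrue(Files.exists(path));
    }

    @Test
    public void testSingleThreadRead() {
        Function<Path, String> processfile = (Path p) -> {
            String result = "";
            // try-with-resources closes the file handle opened by Files.lines
            try (Stream<String> lines = Files.lines(p)) {
                // joins every line of the file into one String
                result = lines.collect(Collectors.joining(" ,"));
            } catch (IOException e) {
                e.printStackTrace();
            }
            return result;
        };
        long start = System.currentTimeMillis();
        String result = processfile.apply(path);
        long end = System.currentTimeMillis();
        assertFalse(result.isEmpty());
        System.out.println(end - start + "ms");
    }

    @Test
    public void testSingleThreadReadParallel() {
        Function<Path, String> processfile = (Path p) -> {
            String result = "";
            try (Stream<String> lines = Files.lines(p)) {
                // same join, but the work is split across the common ForkJoinPool
                result = lines.parallel().collect(Collectors.joining(" ,"));
            } catch (IOException e) {
                e.printStackTrace();
            }
            return result;
        };
        long start = System.currentTimeMillis();
        String result = processfile.apply(path);
        long end = System.currentTimeMillis();
        assertFalse(result.isEmpty());
        System.out.println(end - start + "ms");
    }
}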
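Both tests above concatenate every line into a single String, so the whole 193MB file has to fit in the heap at once. Java 8 stores String content in a char[] (two bytes per character), so for mostly single-byte text the joined result alone needs roughly twice the file size, plus the temporary copies made while the StringBuilder grows (the `Arrays.copyOf` frames in the trace). Since the stated goal is to analyse each line, a minimal sketch that streams the lines and keeps only a small per-line result might look like this. `analyseLine` is a hypothetical placeholder (it just counts comma-separated fields), and the class name and default path are assumptions.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

public class LineByLineAnalysis {

    // Hypothetical per-line analysis: counts the comma-separated fields in a line.
    static int analyseLine(String line) {
        return line.split(",", -1).length;
    }

    // Streams the file and sums the per-line results. No String containing the
    // whole file is built; each line can be garbage-collected once analysed.
    static long totalFields(Path path, boolean parallel) {
        try (Stream<String> lines = Files.lines(path)) {
            Stream<String> s = parallel ? lines.parallel() : lines;
            return s.mapToLong(LineByLineAnalysis::analyseLine).sum();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Assumed input path; pass a different one as the first argument.
        Path path = Paths.get(args.length > 0 ? args[0] : "c:\\devs\\files\\datas.csv");
        long start = System.currentTimeMillis();
        long fields = totalFields(path, true);
        System.out.println(fields + " fields in " + (System.currentTimeMillis() - start) + "ms");
    }
}
```

Whether the parallel variant is actually faster likely depends on how expensive the per-line analysis is compared with reading the file; for a cheap operation like this one, the I/O may well dominate.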
Exception from testSingleThreadReadParallel:
java.lang.OutOfMemoryError
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
at java.lang.reflect.Constructor.newInstance(Unknown Source)
at java.util.concurrent.ForkJoinTask.getThrowableException(Unknown Source)
at java.util.concurrent.ForkJoinTask.reportException(Unknown Source)
at java.util.concurrent.ForkJoinTask.invoke(Unknown Source)
at java.util.stream.ReduceOps$ReduceOp.evaluateParallel(Unknown Source)
at java.util.stream.AbstractPipeline.evaluate(Unknown Source)
at java.util.stream.ReferencePipeline.collect(Unknown Source)
at test.TestProcessFile.lambda$1(TestProcessFile.java:48)
at test.TestProcessFile.testSingleThreadReadParallel(TestProcessFile.java:58)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
...
Update
Running the parallel processing in a separate class, from a plain main method, still throws this exception:
Exception in thread "main" java.lang.OutOfMemoryError
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
at java.lang.reflect.Constructor.newInstance(Unknown Source)
at java.util.concurrent.ForkJoinTask.getThrowableException(Unknown Source)
at java.util.concurrent.ForkJoinTask.reportException(Unknown Source)
at java.util.concurrent.ForkJoinTask.invoke(Unknown Source)
at java.util.stream.ReduceOps$ReduceOp.evaluateParallel(Unknown Source)
at java.util.stream.AbstractPipeline.evaluate(Unknown Source)
at java.util.stream.ReferencePipeline.collect(Unknown Source)
at ProcessFileParallel.lambda$0(ProcessFileParallel.java:19)
at ProcessFileParallel.main(ProcessFileParallel.java:29)
Caused by: java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Unknown Source)
at java.lang.AbstractStringBuilder.ensureCapacityInternal(Unknown Source)
at java.lang.AbstractStringBuilder.append(Unknown Source)
at java.lang.StringBuilder.append(Unknown Source)
at java.util.StringJoiner.merge(Unknown Source)
at java.util.stream.Collectors$$Lambda$5/990368553.apply(Unknown Source)
at java.util.stream.ReduceOps$3ReducingSink.combine(Unknown Source)
at java.util.stream.ReduceOps$3ReducingSink.combine(Unknown Source)
at java.util.stream.ReduceOps$ReduceTask.onCompletion(Unknown Source)
at java.util.concurrent.CountedCompleter.tryComplete(Unknown Source)
at java.util.stream.AbstractTask.compute(Unknown Source)
at java.util.concurrent.CountedCompleter.exec(Unknown Source)
at java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(Unknown Source)
at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
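The "Caused by" section shows where memory runs out: inside `StringJoiner.merge`, called from the combine step of the reduction. In JDK 8, `Collectors.joining` is built from a supplier (a new `StringJoiner`), an accumulator (`StringJoiner::add`) and a combiner (`StringJoiner::merge`). In a parallel stream each fork/join subtask fills its own `StringJoiner`, and the partial results are then merged pairwise, copying characters into an ever larger buffer. Near the end, the two large halves and the buffer they are being copied into can all be alive at once, so peak memory can be noticeably higher than in the sequential run. The sketch below spells out an equivalent collector with `Collector.of` and counts how many partial containers each run creates; the class, method and counter names are hypothetical.

```java
import java.util.List;
import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class JoiningInParallel {

    // Counts how many StringJoiner containers the stream framework asks for.
    static final AtomicInteger containers = new AtomicInteger();

    // Same shape as Collectors.joining(delimiter) in JDK 8:
    // supplier -> new StringJoiner, accumulator -> add, combiner -> merge.
    static Collector<String, StringJoiner, String> countingJoining(String delimiter) {
        return Collector.of(
                () -> {
                    containers.incrementAndGet();
                    return new StringJoiner(delimiter);
                },
                StringJoiner::add,
                // merge() appends a copy of the right-hand partial result to the left one
                StringJoiner::merge,
                StringJoiner::toString);
    }

    static String join(List<String> lines, boolean parallel) {
        containers.set(0);
        return (parallel ? lines.parallelStream() : lines.stream())
                .collect(countingJoining(" ,"));
    }

    public static void main(String[] args) {
        List<String> lines = IntStream.range(0, 100_000)
                .mapToObj(i -> "row" + i)
                .collect(Collectors.toList());

        String sequential = join(lines, false);
        int sequentialContainers = containers.get();
        String parallel = join(lines, true);
        int parallelContainers = containers.get();

        // Same text either way; the parallel run builds several partial joiners and merges them.
        System.out.println("equal results: " + sequential.equals(parallel));
        System.out.println("containers: sequential=" + sequentialContainers
                + ", parallel=" + parallelContainers);
    }
}
```

Encounter order is preserved, so both runs produce the same String; only the number of intermediate joiners, and the copying done to merge them, differs.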