Here is what I would do if asked to split work as you are trying to:
import java.io.File;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class App {

    // Holds the result of processing one chunk of lines
    public static class Statistics {
    }

    // Processes one chunk of lines and produces its Statistics
    public static class StatisticsCalculator implements Callable<Statistics> {

        private final List<String> lines;

        public StatisticsCalculator(List<String> lines) {
            this.lines = lines;
        }

        @Override
        public Statistics call() throws Exception {
            //do stuff with lines
            return new Statistics();
        }
    }

    public static void main(String[] args) {
        final File file = new File("path/to/my/file");
        final List<List<String>> partitionedWork = partitionWork(readLines(file), 10);
        final List<Callable<Statistics>> callables = new LinkedList<>();
        for (final List<String> work : partitionedWork) {
            callables.add(new StatisticsCalculator(work));
        }
        //newFixedThreadPool rejects a size of 0, so use at least one thread even for an empty file
        final int threads = Math.max(1, Math.min(partitionedWork.size(), 10));
        final ExecutorService executorService = Executors.newFixedThreadPool(threads);
        final List<Future<Statistics>> futures;
        try {
            //blocks until every task has completed
            futures = executorService.invokeAll(callables);
        } catch (InterruptedException ex) {
            throw new RuntimeException(ex);
        }
        try {
            for (final Future<Statistics> future : futures) {
                final Statistics statistics = future.get();
                //do whatever to aggregate the individual
            }
        } catch (InterruptedException | ExecutionException ex) {
            throw new RuntimeException(ex);
        }
        executorService.shutdown();
        try {
            executorService.awaitTermination(1, TimeUnit.DAYS);
        } catch (InterruptedException ex) {
            throw new RuntimeException(ex);
        }
    }

    static List<String> readLines(final File file) {
        //read lines
        return new ArrayList<>();
    }

    static List<List<String>> partitionWork(final List<String> lines, final int blockSize) {
        //divide up the incoming list into a number of chunks, working backwards from the end
        final List<List<String>> partitionedWork = new LinkedList<>();
        for (int i = lines.size(); i > 0; i -= blockSize) {
            int start = i > blockSize ? i - blockSize : 0;
            //subList returns a view of the original list, not a copy
            partitionedWork.add(lines.subList(start, i));
        }
        return partitionedWork;
    }
}
I have created a Statistics object, which holds the result of the work done.
There is a StatisticsCalculator object, which is a Callable<Statistics>; this does the calculation. It is given a List<String>, processes the lines and creates the Statistics.
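To make those two classes a little more concrete, here is a minimal sketch of what they could look like if you were counting lines and words. The names WordStatistics and WordStatisticsCalculator, the two fields and the merge method are my own assumptions for illustration; the Statistics class above is deliberately left empty because I don't know what you need to calculate.

```java
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical result type: the fields are illustrative only.
final class WordStatistics {
    final long lineCount;
    final long wordCount;

    WordStatistics(long lineCount, long wordCount) {
        this.lineCount = lineCount;
        this.wordCount = wordCount;
    }

    // Combines two partial results into one.
    WordStatistics merge(WordStatistics other) {
        return new WordStatistics(lineCount + other.lineCount, wordCount + other.wordCount);
    }
}

// Counts lines and whitespace-separated words in one chunk of lines.
final class WordStatisticsCalculator implements Callable<WordStatistics> {
    private final List<String> lines;

    WordStatisticsCalculator(List<String> lines) {
        this.lines = lines;
    }

    @Override
    public WordStatistics call() {
        long words = 0;
        for (String line : lines) {
            String trimmed = line.trim();
            if (!trimmed.isEmpty()) {
                words += trimmed.split("\\s+").length;
            }
        }
        return new WordStatistics(lines.size(), words);
    }
}
```

Giving the result type a merge method keeps the aggregation step trivial: each partial result is simply folded into a running total.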
The readLines method I leave to you to implement.
In many ways the most important method is partitionWork, which divides the incoming List<String> (all the lines in the file) into a List<List<String>> of chunks of at most blockSize lines. This decides how much work each task gets, so tuning the blockSize parameter is very important: if each chunk is only one line, the scheduling overhead will probably outweigh the advantages, whereas if each chunk is ten thousand lines and the file is no longer than that, only one Thread does any work.
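If you want to see the effect of blockSize for yourself, a variant of the partitioning step can be sketched as below. This is not the method above: PartitionSketch and partition are names I made up, and this version walks the list front to back so that the chunks come out in file order.

```java
import java.util.ArrayList;
import java.util.List;

final class PartitionSketch {
    // Splits items into consecutive chunks of at most blockSize elements, in order.
    static <T> List<List<T>> partition(List<T> items, int blockSize) {
        if (blockSize <= 0) {
            throw new IllegalArgumentException("blockSize must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        int start = 0;
        while (start < items.size()) {
            // widen to long so a huge blockSize cannot overflow the index
            int end = (int) Math.min((long) start + blockSize, items.size());
            chunks.add(items.subList(start, end)); // a view of items, not a copy
            start = end;
        }
        return chunks;
    }
}
```

With 25 lines, a blockSize of 10 gives three chunks (10, 10 and 5 lines), while a blockSize of 10000 gives a single chunk and therefore a single busy thread.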
Finally, the meat of the operation is the main method. It calls the read and then the partition methods, then creates an ExecutorService with as many threads as there are chunks of work, up to a maximum of 10. You may want to make this equal to the number of cores you have.
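Sizing the pool from the core count could look something like the sketch below. PoolSizing, poolSize and newPool are hypothetical names; the only real API calls are Runtime.getRuntime().availableProcessors() and Executors.newFixedThreadPool. I clamp to a minimum of one thread because newFixedThreadPool throws an IllegalArgumentException for a size of zero, which is what you would get for an empty file.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class PoolSizing {
    // One thread per chunk, capped at the core count, never fewer than one.
    static int poolSize(int chunkCount, int availableCores) {
        return Math.max(1, Math.min(chunkCount, availableCores));
    }

    // Builds a fixed pool sized for this machine and this amount of work.
    static ExecutorService newPool(int chunkCount) {
        int cores = Runtime.getRuntime().availableProcessors();
        return Executors.newFixedThreadPool(poolSize(chunkCount, cores));
    }
}
```

Whether the core count is actually the best size depends on how much of the work is CPU-bound, so treat it as a starting point for measurement.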
The main method then submits a List of all the Callables, one for each chunk, to the executorService. The invokeAll method blocks until the work is done.
The method now loops over the returned List<Future<Statistics>> and gets the generated Statistics object from each Future, ready for aggregation.
Afterwards, don't forget to shut down the executorService; otherwise its worker threads will prevent your application from exiting.
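The aggregate-then-shut-down pattern can be sketched as follows. To keep it self-contained I use a plain line count as a stand-in for Statistics, and AggregationSketch and countLines are names I invented. Putting shutdown in a finally block is my own preference rather than something the code above does; it means the pool is released even if one of the tasks fails.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class AggregationSketch {
    // Sums the per-chunk line counts computed on the worker threads.
    static long countLines(List<List<String>> chunks, int threads)
            throws InterruptedException, ExecutionException {
        List<Callable<Long>> tasks = new ArrayList<>();
        for (List<String> chunk : chunks) {
            tasks.add(() -> (long) chunk.size());
        }
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            long total = 0;
            // invokeAll blocks until every task has finished
            for (Future<Long> future : pool.invokeAll(tasks)) {
                total += future.get(); // rethrows a task failure as ExecutionException
            }
            return total;
        } finally {
            pool.shutdown(); // always release the worker threads
        }
    }
}
```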
EDIT
The OP wants to read line by line, so here is a revised main:
//additionally requires java.io.BufferedReader, java.io.FileReader and java.io.IOException
public static void main(String[] args) throws IOException {
    final File file = new File("path/to/my/file");
    final ExecutorService executorService = Executors.newFixedThreadPool(10);
    final List<Future<Statistics>> futures = new LinkedList<>();
    try (final BufferedReader reader = new BufferedReader(new FileReader(file))) {
        List<String> tmp = new LinkedList<>();
        String line;
        while ((line = reader.readLine()) != null) {
            tmp.add(line);
            //hand off a full chunk of 100 lines and start a fresh one
            if (tmp.size() == 100) {
                futures.add(executorService.submit(new StatisticsCalculator(tmp)));
                tmp = new LinkedList<>();
            }
        }
        //submit whatever is left over at the end of the file
        if (!tmp.isEmpty()) {
            futures.add(executorService.submit(new StatisticsCalculator(tmp)));
        }
    }
    try {
        for (final Future<Statistics> future : futures) {
            final Statistics statistics = future.get();
            //do whatever to aggregate the individual
        }
    } catch (InterruptedException | ExecutionException ex) {
        throw new RuntimeException(ex);
    }
    executorService.shutdown();
    try {
        executorService.awaitTermination(1, TimeUnit.DAYS);
    } catch (InterruptedException ex) {
        throw new RuntimeException(ex);
    }
}
This streams the file line by line and, after a given number of lines (100 here), submits a new task to the executor to process them.
You would need to call clear on the List<String> in the Callable when you are done with it, as the Callable instances are referenced by the Futures they return. If you clear the Lists when you're done with them, that should reduce the memory footprint considerably.
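Clearing the list inside the Callable might be sketched like this. ClearingCalculator is a made-up name and summing the line lengths is just placeholder work. Note the assumption that each task owns its own mutable List, as it does in the streaming main; don't do this with the subList views from partitionWork, because clearing a view removes elements from the shared backing list.

```java
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical calculator that frees its input as soon as it has finished.
final class ClearingCalculator implements Callable<Integer> {
    private final List<String> lines;

    ClearingCalculator(List<String> lines) {
        this.lines = lines;
    }

    @Override
    public Integer call() {
        try {
            int characters = 0;
            for (String line : lines) {
                characters += line.length(); // placeholder for the real work
            }
            return characters;
        } finally {
            // drop the references to the lines even if processing failed
            lines.clear();
        }
    }
}
```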
A further enhancement may well be to use the suggestion here for an ExecutorService that blocks until there is a spare thread. This will guarantee that there are never more than threads*blocksize lines in memory at a time, provided you clear the Lists when the Callables are done with them.
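One way to get that blocking behaviour, which is not necessarily what the linked suggestion does, is to guard submission with a Semaphore. The sketch below is my own; BoundedSubmitter and maxInFlight are hypothetical names. The reading thread blocks in submit until a previously submitted task has finished and released its permit.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;

final class BoundedSubmitter {
    private final ExecutorService delegate;
    private final Semaphore permits;

    BoundedSubmitter(ExecutorService delegate, int maxInFlight) {
        this.delegate = delegate;
        this.permits = new Semaphore(maxInFlight);
    }

    // Blocks the caller while maxInFlight tasks are already queued or running.
    <T> Future<T> submit(Callable<T> task) throws InterruptedException {
        permits.acquire();
        Callable<T> wrapped = () -> {
            try {
                return task.call();
            } finally {
                permits.release(); // free a slot once the task is done
            }
        };
        try {
            return delegate.submit(wrapped);
        } catch (RuntimeException ex) {
            permits.release(); // the task was rejected, so it will never release
            throw ex;
        }
    }
}
```

In the streaming main you would wrap the executorService in this class, set maxInFlight to the number of threads and call its submit method instead of executorService.submit.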