I would like to read a file line by line, do something slow with each line that can easily be done in parallel, and write the result to a file line by line. I don't care about the order of the output. The input and output are so big they don't fit in memory. I would like to be able to set a hard limit on the number of threads running at the same time as well as the number of lines in memory.
The library I use for file IO (Apache Commons CSV) does not seem to offer synchronised file access, so I don't think I can read from or write to the same file from several threads at once. If that were possible, I would create a ThreadPoolExecutor and feed it a task for each line, which would simply read the line, perform the calculation and write the result.
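Access that the library does not synchronise can still be serialised by hand. Below is a minimal sketch that assumes a plain `Iterator<String>` and `PrintWriter` stand in for the Commons CSV parser and printer. `SharedIteratorSketch` and `doStuff` are hypothetical names. Each of a fixed number of workers takes the next line while holding a read lock, runs the slow step without holding any lock, and then writes its result while holding the writer's lock. As a result, the workers hold at most one input line each.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SharedIteratorSketch {

    // Hypothetical stand-in for the slow, parallelisable per-line work.
    static String doStuff(String line) {
        return line.toUpperCase();
    }

    // Runs `threads` workers; each one holds at most one input line at a time.
    static void run(Iterator<String> input, PrintWriter output, int threads)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        Object readLock = new Object();
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                while (true) {
                    String line;
                    synchronized (readLock) { // only one thread touches the iterator
                        if (!input.hasNext()) {
                            return; // input exhausted, this worker ends
                        }
                        line = input.next();
                    }
                    String result = doStuff(line); // slow part, runs in parallel
                    synchronized (output) { // only one thread writes at a time
                        output.println(result);
                    }
                }
            });
        }
        pool.shutdown();
        // wait for every worker to run out of input
        pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        StringWriter sink = new StringWriter();
        PrintWriter out = new PrintWriter(sink);
        run(List.of("a", "b", "c", "d").iterator(), out, 2);
        out.flush();
        System.out.print(sink); // four upper-case lines, in no particular order
    }
}
```

Every read waits on the same lock, so this approach only pays off when `doStuff` dominates the time spent per line. That is the situation described above.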
Instead, what I think I need is a single thread that does the parsing, a bounded queue for the parsed input lines, a thread pool with jobs that do the calculations, a bounded queue for the calculated output lines, and a single thread that does the writing. In other words, one producer, many consumer-producers and one consumer, if that makes sense.
What I have looks like this:
BlockingQueue<CSVRecord> inputQueue = new ArrayBlockingQueue<CSVRecord>(INPUT_QUEUE_SIZE);
BlockingQueue<String[]> outputQueue = new ArrayBlockingQueue<String[]>(OUTPUT_QUEUE_SIZE);

Thread parserThread = new Thread(() -> {
    while (inputFileIterator.hasNext()) {
        CSVRecord record = inputFileIterator.next();
        inputQueue.put(record); // blocks if queue is full
    }
});
// the job queue of the thread pool has to be bounded too, otherwise all
// the objects in the input queue will be given to jobs immediately and
// I'll run out of heap space
// source: https://stackoverflow.com/questions/2001086/how-to-make-threadpoolexecutors-submit-method-block-if-it-is-saturated
BlockingQueue<Runnable> jobQueue = new ArrayBlockingQueue<Runnable>(JOB_QUEUE_SIZE);
RejectedExecutionHandler rejectedExecutionHandler
        = new ThreadPoolExecutor.CallerRunsPolicy();
ExecutorService executorService
        = new ThreadPoolExecutor(
                NUMBER_OF_THREADS,
                NUMBER_OF_THREADS,
                0L,
                TimeUnit.MILLISECONDS,
                jobQueue,
                rejectedExecutionHandler
        );
Thread consumerBossThread = new Thread(() -> {
    while (!inputQueue.isEmpty() || parserThread.isAlive()) {
        CSVRecord record = inputQueue.take(); // blocks if queue is empty
        executorService.execute(() -> {
            String[] array = this.doStuff(record);
            outputQueue.put(array); // blocks if queue is full
        });
    }
    // getting here means that all CSV rows have been read and
    // added to the processing queue
    executorService.shutdown(); // do not accept any new tasks
    // wait for existing tasks to finish
    executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
});
Thread writerThread = new Thread(() -> {
    while (!outputQueue.isEmpty() || consumerBossThread.isAlive()) {
        String[] outputRow = outputQueue.take(); // blocks if queue is empty
        outputFileWriter.printRecord((Object[]) outputRow);
    }
});
parserThread.start();
consumerBossThread.start();
writerThread.start();
// wait until writer thread has finished
writerThread.join();
I've left out the logging and exception handling so this looks a lot shorter than it is.
This solution works but I'm not happy with it. It seems hacky to have to create my own threads, check their isAlive(), create a Runnable within a Runnable, be forced to specify a timeout when I really just want to wait until all the workers have finished, etc. All in all it's a 100+ line method, or even several hundred lines of code if I make the Runnables their own classes, for what seems like a very basic pattern.
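A common alternative to polling `isAlive()` is the poison-pill pattern. The reader appends one end-of-stream marker per worker. Each worker stops at its first marker and forwards one marker to the writer. The writer stops after it has counted one marker per worker.

This also removes a race in the loop conditions above. The parser can finish between the `isEmpty()` check and `take()`, which would leave `take()` blocked forever.

Below is a minimal sketch that assumes in-memory lists stand in for the CSV input and output files. `PoisonPillPipeline`, `doStuff` and `POISON` are hypothetical names, and the calling thread plays the writer.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PoisonPillPipeline {

    // End-of-stream marker, compared with == so no real line can be mistaken for it.
    private static final String POISON = new String("<end of stream>");

    // Hypothetical stand-in for the slow per-line computation.
    static String doStuff(String line) {
        return line.toUpperCase();
    }

    static List<String> run(List<String> input, int workers, int queueSize)
            throws InterruptedException {
        BlockingQueue<String> inputQueue = new ArrayBlockingQueue<>(queueSize);
        BlockingQueue<String> outputQueue = new ArrayBlockingQueue<>(queueSize);
        // One reader thread plus a fixed number of workers; the caller is the writer.
        ExecutorService pool = Executors.newFixedThreadPool(workers + 1);

        // Reader: enqueue every line, then one poison pill per worker.
        pool.execute(() -> {
            try {
                for (String line : input) {
                    inputQueue.put(line); // blocks while the queue is full
                }
                for (int i = 0; i < workers; i++) {
                    inputQueue.put(POISON);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Workers: process until the first pill, then tell the writer they are done.
        for (int w = 0; w < workers; w++) {
            pool.execute(() -> {
                try {
                    String line;
                    while ((line = inputQueue.take()) != POISON) {
                        outputQueue.put(doStuff(line));
                    }
                    outputQueue.put(POISON);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // Writer (calling thread): stop once every worker has sent its pill.
        List<String> written = new ArrayList<>(); // stands in for the output file
        int finishedWorkers = 0;
        while (finishedWorkers < workers) {
            String row = outputQueue.take();
            if (row == POISON) {
                finishedWorkers++;
            } else {
                written.add(row);
            }
        }

        // All tasks have done their last queue operation, so this wait is short.
        pool.shutdown();
        pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        return written;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> result = run(List.of("a", "b", "c", "d", "e"), 3, 2);
        Collections.sort(result);
        System.out.println(result); // prints [A, B, C, D, E]
    }
}
```

Both queues stay bounded, so memory use is capped by `queueSize` and the number of workers. No thread ever checks whether another thread is alive.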
Is there a better solution? I'd like to make use of Java's libraries as much as possible, to help keep my code maintainable and in line with best practices. I would still like to know what it's doing under the hood, but I doubt that implementing all this myself is the best way to do it.
Update: Better solution, after suggestions from the answers:
BlockingQueue<Runnable> jobQueue = new ArrayBlockingQueue<Runnable>(JOB_QUEUE_SIZE);
// when jobQueue is full, the submitting thread runs the task itself,
// which throttles reading
RejectedExecutionHandler rejectedExecutionHandler
        = new ThreadPoolExecutor.CallerRunsPolicy();
ExecutorService executorService
        = new ThreadPoolExecutor(
                NUMBER_OF_THREADS,
                NUMBER_OF_THREADS,
                0L,
                TimeUnit.MILLISECONDS,
                jobQueue,
                rejectedExecutionHandler
        );
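while (it.hasNext()) {
    CSVRecord record = it.next();
    executorService.execute(() -> {
        String[] array = this.doStuff(record);
        // the CSV writer is not assumed to be thread-safe, so writes are serialised
        synchronized (writer) {
            writer.printRecord((Object[]) array);
        }
    });
}
// stop accepting tasks and wait for the queued ones before the writer is closed
executorService.shutdown();
executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);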
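With `CallerRunsPolicy`, whenever `jobQueue` is full, the thread that reads the file runs `doStuff` itself. Reading then stalls for one full computation, and the pool may drain in the meantime. A semaphore-throttled submitter, a pattern described in *Java Concurrency in Practice*, avoids this: the reader blocks until a slot frees up instead of doing the work. Below is a minimal sketch. `BoundedSubmission` is a hypothetical class name, and `maxInFlight` is the hard limit on records that are queued or running.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class BoundedSubmission {

    private final ExecutorService pool;
    // One permit per task that is either waiting in the pool's queue or running.
    private final Semaphore permits;

    BoundedSubmission(int threads, int maxInFlight) {
        this.pool = Executors.newFixedThreadPool(threads);
        this.permits = new Semaphore(maxInFlight);
    }

    // Blocks the calling (reading) thread while maxInFlight tasks are pending;
    // unlike CallerRunsPolicy, the caller never runs the task itself.
    void submit(Runnable task) throws InterruptedException {
        permits.acquire();
        try {
            pool.execute(() -> {
                try {
                    task.run();
                } finally {
                    permits.release(); // slot becomes free when the task ends
                }
            });
        } catch (RejectedExecutionException e) {
            permits.release(); // the task never ran, so give the permit back
            throw e;
        }
    }

    void shutdownAndWait() throws InterruptedException {
        pool.shutdown();
        pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedSubmission executor = new BoundedSubmission(4, 16);
        AtomicLong total = new AtomicLong();
        for (int i = 1; i <= 100; i++) {
            long value = i; // stands in for one parsed record
            executor.submit(() -> total.addAndGet(value));
        }
        executor.shutdownAndWait();
        System.out.println(total.get()); // prints 5050
    }
}
```

In the updated loop, `executor.submit(...)` would take the same lambda that currently goes to `executorService.execute(...)`. `shutdownAndWait()` would then be called before the writer is closed.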