
I would like to read a file line by line, do something slow with each line that can easily be done in parallel, and write the result to a file line by line. I don't care about the order of the output. The input and output are so big they don't fit in memory. I would like to be able to set a hard limit on the number of threads running at the same time as well as the number of lines in memory.

The library I use for file IO (Apache Commons CSV) does not seem to offer synchronised file access, so I don't think I can read from the same file or write to the same file from several threads at once. If that were possible I would create a ThreadPoolExecutor and feed it a task for each line, which would simply read the line, perform the calculation and write the result.

Instead, what I think I need is a single thread that does the parsing, a bounded queue for the parsed input lines, a thread pool with jobs that do the calculations, a bounded queue for the calculated output lines, and a single thread that does the writing. A producer, a lot of consumer-producers and a consumer if that makes sense.

What I have looks like this:

BlockingQueue<CSVRecord> inputQueue = new ArrayBlockingQueue<CSVRecord>(INPUT_QUEUE_SIZE);
BlockingQueue<String[]> outputQueue = new ArrayBlockingQueue<String[]>(OUTPUT_QUEUE_SIZE);

Thread parserThread = new Thread(() -> {
    while (inputFileIterator.hasNext()) {
        CSVRecord record = inputFileIterator.next();
        inputQueue.put(record); // blocks if queue is full
    }
});

// the job queue of the thread pool has to be bounded too, otherwise all 
// the objects in the input queue will be given to jobs immediately and 
// I'll run out of heap space
// source: https://stackoverflow.com/questions/2001086/how-to-make-threadpoolexecutors-submit-method-block-if-it-is-saturated
BlockingQueue<Runnable> jobQueue = new ArrayBlockingQueue<Runnable>(JOB_QUEUE_SIZE);
RejectedExecutionHandler rejectedExecutionHandler 
    = new ThreadPoolExecutor.CallerRunsPolicy();
ExecutorService executorService 
    = new ThreadPoolExecutor(
        NUMBER_OF_THREADS, 
        NUMBER_OF_THREADS, 
        0L, 
        TimeUnit.MILLISECONDS, 
        jobQueue, 
        rejectedExecutionHandler
    );
Thread consumerBossThread = new Thread(() -> {
    while (!inputQueue.isEmpty() || parserThread.isAlive()) {
        CSVRecord record = inputQueue.take(); // blocks if queue is empty
        executorService.execute(() -> {
            String[] array = this.doStuff(record);
            outputQueue.put(array); // blocks if queue is full
        });
    }
    // getting here that means that all CSV rows have been read and 
    // added to the processing queue
    executorService.shutdown(); // do not accept any new tasks
    executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 
        // wait for existing tasks to finish
});

Thread writerThread = new Thread(() -> {
    while (!outputQueue.isEmpty() || consumerBossThread.isAlive()) {
        String[] outputRow = outputQueue.take(); // blocks if queue is empty
        outputFileWriter.printRecord((Object[]) outputRow);
    }
});

parserThread.start();
consumerBossThread.start();
writerThread.start();

// wait until writer thread has finished
writerThread.join();

I've left out the logging and exception handling so this looks a lot shorter than it is.

This solution works but I'm not happy with it. It seems hacky to have to create my own threads, check their isAlive(), create a Runnable within a Runnable, be forced to specify a timeout when I really just want to wait until all the workers have finished, etc. All in all it's a 100+ line method, or even several hundred lines of code if I make the Runnables their own classes, for what seems like a very basic pattern.

Is there a better solution? I'd like to make use of Java's libraries as much as possible, to help keep my code maintainable and in line with best practices. I would still like to know what it's doing under the hood, but I doubt that implementing all this myself is the best way to do it.

Update: Better solution, after suggestions from the answers:

BlockingQueue<Runnable> jobQueue = new ArrayBlockingQueue<Runnable>(JOB_QUEUE_SIZE);
RejectedExecutionHandler rejectedExecutionHandler
    = new ThreadPoolExecutor.CallerRunsPolicy();
ExecutorService executorService 
    = new ThreadPoolExecutor(
        NUMBER_OF_THREADS, 
        NUMBER_OF_THREADS, 
        0L, 
        TimeUnit.MILLISECONDS, 
        jobQueue, 
        rejectedExecutionHandler
    );

while (it.hasNext()) {
    CSVRecord record = it.next();
    executorService.execute(() -> {
        String[] array = this.doStuff(record);
        synchronized (writer) {
            writer.printRecord((Object[]) array);
        }
    });
}

executorService.shutdown(); // do not accept any new tasks
executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); // wait for the workers to finish
Peter P
2 Answers


I would like to point out something first. I can think of three possible scenarios:

1. For every line of the file, processing the line with the doStuff method takes longer than reading and parsing that line from disk.

2. For every line of the file, processing the line with the doStuff method takes less time than (or the same time as) reading and parsing that line.

3. Neither the first nor the second scenario holds for the same file.

Your solution should be good for the first scenario, but not for the second or third. Also, you're not modifying the queues in a synchronized way. Worse, if you're experiencing scenario 2, then you're wasting CPU cycles when there is no data to be sent to the output, or when there are no lines to be handed to doStuff, by spinning at:

while (!outputQueue.isEmpty() || consumerBossThread.isAlive()) {

Finally, regardless of which scenario you're experiencing, I would suggest using monitor objects. They allow you to put specific threads to wait until another thread notifies them that a certain condition holds and that they can run again. By using monitor objects you won't waste CPU cycles.

For more information, see: https://docs.oracle.com/javase/tutorial/essential/concurrency/guardmeth.html
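To illustrate the monitor idea, here is a minimal sketch of a bounded buffer built on intrinsic locks with `wait()`/`notifyAll()`. The `BoundedBuffer` class and its capacity are illustrative, not from the original post; in practice `ArrayBlockingQueue` already does exactly this internally.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Monitor-style bounded buffer: threads wait() while the buffer is full or
// empty and are woken when the condition changes, so no CPU cycles are
// burned spinning.
class BoundedBuffer<T> {
    private final Deque<T> items = new ArrayDeque<>();
    private final int capacity;

    BoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    synchronized void put(T item) throws InterruptedException {
        while (items.size() == capacity) {
            wait(); // releases the monitor until a take() notifies us
        }
        items.addLast(item);
        notifyAll(); // wake any thread blocked in take()
    }

    synchronized T take() throws InterruptedException {
        while (items.isEmpty()) {
            wait(); // releases the monitor until a put() notifies us
        }
        T item = items.removeFirst();
        notifyAll(); // wake any thread blocked in put()
        return item;
    }
}
```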

EDIT: I've deleted the suggestion of using synchronized methods, since as you've pointed out, BlockingQueue's methods are thread-safe (or almost all of them) and prevent race conditions.

  • Thank you! The processing is indeed the slowest part. I've updated my post to implicitly use a monitor, by simply synchronising on the writer object. This has allowed me to get rid of the ugly queue checking. – Peter P Mar 20 '16 at 15:54
  • That said I don't understand why I need to synchronise queue access. The [BlockingQueue doc](https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html) says that "implementations are thread-safe. All queuing methods achieve their effects atomically using internal locks or other forms of concurrency control." Why do I still need to use synchronised methods myself? – Peter P Mar 20 '16 at 15:55
  • Yes you're right, my bad, you don't need to create a synchronized method. I'll edit my answer now, – Jaime Ortega Mar 20 '16 at 16:12

Use ThreadPoolExecutor tied to a fixed size blocking queue and all of your complexity vanishes in a puff of JavaDoc.

Just have a single thread read the file and feed the blocking queue; all the processing is done by the Executor.

Addenda:

You can either synchronize on your writer, or simply use yet another queue: the processors fill it, and your single writer thread consumes it.

Synchronizing on the writer would most likely be the simplest way.
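The extra-queue variant might look like the following sketch. The class and method names are made up for illustration; a zero-length `POISON` row stands in for end-of-stream, and a list stands in for the CSV writer, since only the single writer thread ever touches the output.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class QueueWriterSketch {
    // Sentinel row telling the writer thread that all workers have finished.
    private static final String[] POISON = new String[0];

    // Runs a worker pool plus a single writer thread; returns every row the
    // writer "wrote" (collected into a list as a stand-in for the CSV file).
    static List<String[]> runPipeline(int rows, int threads) throws InterruptedException {
        BlockingQueue<String[]> outputQueue = new ArrayBlockingQueue<>(16);
        List<String[]> written = new ArrayList<>();

        // The writer is the only thread touching the output, so the CSV
        // printer needs no synchronisation; it stops at the sentinel.
        Thread writerThread = new Thread(() -> {
            try {
                for (String[] row = outputQueue.take(); row != POISON; row = outputQueue.take()) {
                    written.add(row); // real code: outputFileWriter.printRecord(row)
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        writerThread.start();

        // Workers compute rows and hand them to the queue, blocking when full.
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < rows; i++) {
            final int n = i;
            pool.execute(() -> {
                try {
                    outputQueue.put(new String[] { "row", Integer.toString(n) });
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);

        outputQueue.put(POISON); // only after every worker is done
        writerThread.join();
        return written;
    }
}
```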

Will Hartung
  • Thank you! I suspected there was a much less complex solution that I just couldn't think of. What you are suggesting is what I had in mind originally ("If that were possible...") and I don't know why I thought I needed different threads for reading and queueing, but there would still be several threads writing to the same file simultaneously, resulting in garbage output. However, I think I have solved the problem by synchronising on the `CSVPrinter` object. I've updated my post above. Is that what you had in mind? – Peter P Mar 20 '16 at 15:43