
I read lines from a file, in one thread of course. The lines are sorted by key.

Then I collect the lines with the same key (15-20 lines), parse them, run a big calculation, etc., and push the resulting object to a statistics class.

I want to parallelize my program: read in one thread, parse and calculate in many threads, and join the results in one thread that writes to the statistics class.

Is there any ready-made pattern or solution in the Java 7 framework for this problem?

I implemented it with an executor for the multithreading, pushing to a BlockingQueue, and reading the queue in another thread, but I think my code sucks and will produce bugs.

Many thanks

Update:

I can't map the whole file into memory - it's very big.

mishka
4 Answers


You already have the main classes of approach in mind: CountDownLatch, Thread.join, Executors, Fork/Join. Another option is the Akka framework, which has message-passing overheads measured in 1-2 microseconds and is open source. However, let me share another approach that often outperforms the above and is simpler. It was born from working on batch file loads in Java for a number of companies.

Assuming that your goal in splitting the work up is performance, as measured by how long the job takes from start to finish, rather than learning: it is often difficult to beat memory mapping the file and processing it in a single thread that has been pinned to a single core. It also gives much simpler code. A double win.

This may be counter-intuitive, but the speed of processing files is nearly always limited by how efficient the file loading is, not by how parallel the processing is. Hence memory mapping the file is a huge win. Once the file is memory mapped, we want the algorithm to have low contention with the hardware as it performs the file load. Modern hardware tends to have the IO controller and the memory controller on the same socket as the CPU, which, combined with the prefetchers within the CPU itself, leads to a hell of a lot of efficiency when processing the file in an orderly fashion from a single thread. This can be so extreme that going parallel may actually be a lot slower. Pinning a thread to a core usually speeds up memory-bound algorithms by a factor of 5, which is why the memory mapping part is so important.

If you have not already, give it a try.
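
A minimal sketch of a memory-mapped, single-threaded scan, assuming a plain (non-gzipped) file; the path and the newline counting are placeholders for your real parsing, and note that a single MappedByteBuffer is capped at 2 GB, so a very large file needs a sliding window of successive mappings:

import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class MappedScan {

    public static void main(String[] args) throws IOException {
        try (FileChannel channel = FileChannel.open(Paths.get("path/to/my/file"),
                                                    StandardOpenOption.READ)) {
            // A single mapping is limited to Integer.MAX_VALUE bytes; for a bigger
            // file, loop over successive windows instead of mapping it all at once.
            long windowSize = Math.min(channel.size(), Integer.MAX_VALUE);
            MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, windowSize);
            long newlines = 0;
            while (buffer.hasRemaining()) {   // sequential, single-threaded scan
                if (buffer.get() == '\n') {
                    newlines++;               // stand-in for the real parsing
                }
            }
            System.out.println("lines in window: " + newlines);
        }
    }
}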

Chris K
  • Thank you for the advice about memory mapping. My files are 40 GB gzipped, so I can't map it all, but I will try to optimise here. My multithreaded code is now 2 times faster and is stuck on reading – mishka Mar 15 '13 at 15:33
  • You should take a look at what Martin Thompson wrote about [Java sequential IO performance](http://mechanical-sympathy.blogspot.fr/2011/12/java-sequential-io-performance.html) last year. As usual, your mileage may vary. If you have identified IO as a bottleneck, you should benchmark different solutions. – Clément MATHIEU Mar 15 '13 at 18:51
  • Indeed, 40 GB would be somewhat large to map :) I second looking at Martin Thompson's work, especially as there is some benchmark code on his blog that you can make use of. Nice find, Clément. Given such a large file, watch out for premature promotion of objects; if that happens, GC will hurt you. Being gzipped will also limit your options. – Chris K Mar 15 '13 at 19:27
  • I recommend not using Martin's numbers. They are old and very platform-dependent (hardware/OS/JDK). It is astonishing how different the numbers are when you try to reproduce them. I believe the moral is that a plain old `RandomAccessFile` is most likely efficient enough. If in doubt: measure. Don't blindly follow advice like "memory mapping is faster". Write a simple & clean solution, then use facts to improve it further if needed. I definitely agree regarding the GC and gzip. – Clément MATHIEU Mar 16 '13 at 09:33

Without facts and numbers it is hard to give you advice. So let's start from the beginning:

  1. You must identify the bottleneck. Do you really need to perform the computation in parallel, or is your job IO-bound? Avoid concurrency if possible; it could be faster.
  2. If the computations must be done in parallel, you must decide how fine- or coarse-grained your tasks should be. You have to measure your computations and tasks to be able to size them. Avoid creating too many tasks.
  3. You should have an IO thread, several workers, and a "data gatherer" thread. No shared mutable data.
  4. Be sure not to slow down the IO thread because of task submission. Otherwise you should use more coarse-grained tasks or a better task dispatcher (who said Disruptor?).
  5. The "data gatherer" thread should be the only one to mutate the final state.
  6. Avoid unnecessary data copies and object creation. Quite often, when iterating over large files, the bottleneck is the GC. Last week I achieved a 6x speedup by replacing a standard Scala object with a flyweight pattern (see the sketch after this list). You should also try to pre-allocate everything and use large, page-sized buffers.
  7. Avoid disk seeks.
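
To illustrate point 6, here is a minimal sketch of the flyweight idea in Java (the record layout, the `;` key delimiter and the method names are assumptions, not the actual code from that job): instead of allocating one object per record, a single mutable carrier is re-pointed at each record in a shared buffer.

/** Reused carrier that points into a shared buffer instead of copying per record. */
final class RowFlyweight {

    private byte[] buffer;   // shared, pre-allocated (ideally page-sized) read buffer
    private int offset;      // start of the current record within the buffer
    private int length;      // length of the current record

    /** Re-point the flyweight at the next record; no allocation happens here. */
    void wrap(byte[] buffer, int offset, int length) {
        this.buffer = buffer;
        this.offset = offset;
        this.length = length;
    }

    /** Decode a numeric key in place, without materialising a String. */
    long keyAsLong() {
        long value = 0;
        for (int i = offset; i < offset + length && buffer[i] != ';'; i++) {
            value = value * 10 + (buffer[i] - '0');
        }
        return value;
    }
}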

Having said that, you should be on the right track. You can start with an Executor using properly sized tasks. Tasks write into a data structure, like your blocking queue, shared between the workers and the "data gatherer" thread. This threading model is really simple, efficient and hard to get wrong, and it is usually efficient enough. If you still require better performance then you must profile your application and understand the bottleneck. Then you can decide the way to go: refine your task size, use faster tools like the Disruptor/Akka, improve IO, create fewer objects, tune your code, buy a bigger machine or faster disks, move to Hadoop, etc. Pinning each thread to a core (which requires platform-specific code) could also provide a significant boost.
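
As a starting point, here is a minimal sketch of that threading model using an ExecutorCompletionService as the shared structure between the workers and the gatherer; the batch count, the batch reader and the long[] standing in for your statistics object are all assumptions:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Pipeline {

    public static void main(String[] args) throws Exception {
        ExecutorService workers = Executors.newFixedThreadPool(4);
        final CompletionService<long[]> completion =
                new ExecutorCompletionService<>(workers);   // shared worker/gatherer structure
        final int batches = 10;                             // assumed known up front, for brevity

        // "Data gatherer": the only thread that mutates the final state.
        Thread gatherer = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    for (int i = 0; i < batches; i++) {
                        long[] partial = completion.take().get();   // blocks until a result is ready
                        // merge 'partial' into the statistics class here
                    }
                } catch (InterruptedException | ExecutionException ex) {
                    throw new RuntimeException(ex);
                }
            }
        });
        gatherer.start();

        // IO thread (here: main) reads batches and hands them to the workers.
        for (int i = 0; i < batches; i++) {
            final List<String> batch = readNextBatch();     // hypothetical batch reader
            completion.submit(new Callable<long[]>() {
                @Override
                public long[] call() {
                    return new long[] { batch.size() };     // parsing + heavy calculation go here
                }
            });
        }

        gatherer.join();
        workers.shutdown();
    }

    static List<String> readNextBatch() {
        return new ArrayList<>();   // stub: collect the next group of lines with the same key
    }
}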

Clément MATHIEU

You can use [CountDownLatch](http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/CountDownLatch.html) to synchronize the starting and joining of threads. This is better than looping over the set of threads and calling join() on each thread reference.
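
A minimal sketch of that usage (the worker count and the task body are placeholders):

import java.util.concurrent.CountDownLatch;

public class LatchDemo {

    public static void main(String[] args) throws InterruptedException {
        final int workerCount = 4;
        final CountDownLatch startSignal = new CountDownLatch(1);          // released once to start everyone
        final CountDownLatch doneSignal = new CountDownLatch(workerCount); // counted down by each worker

        for (int i = 0; i < workerCount; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        startSignal.await();        // wait for the go signal
                        // ... parsing / calculation work goes here ...
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                    } finally {
                        doneSignal.countDown();     // report completion
                    }
                }
            }).start();
        }

        startSignal.countDown();   // let all workers start at once
        doneSignal.await();        // wait for them all, instead of join()ing each Thread
    }
}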

Kishore

Here is what I would do if asked to split work as you are trying to:

import java.io.File;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class App {

    public static class Statistics {
    }

    public static class StatisticsCalculator implements Callable<Statistics> {

        private final List<String> lines;

        public StatisticsCalculator(List<String> lines) {
            this.lines = lines;
        }

        @Override
        public Statistics call() throws Exception {
            //do stuff with lines
            return new Statistics();
        }
    }

    public static void main(String[] args) {
        final File file = new File("path/to/my/file");
        final List<List<String>> partitionedWork = partitionWork(readLines(file), 10);
        final List<Callable<Statistics>> callables = new LinkedList<>();
        for (final List<String> work : partitionedWork) {
            callables.add(new StatisticsCalculator(work));
        }
        final ExecutorService executorService = Executors.newFixedThreadPool(Math.min(partitionedWork.size(), 10));
        final List<Future<Statistics>> futures;
        try {
            futures = executorService.invokeAll(callables);
        } catch (InterruptedException ex) {
            throw new RuntimeException(ex);
        }
        try {
            for (final Future<Statistics> future : futures) {
                final Statistics statistics = future.get();
                //do whatever to aggregate the individual
            }
        } catch (InterruptedException | ExecutionException ex) {
            throw new RuntimeException(ex);
        }
        executorService.shutdown();
        try {
            executorService.awaitTermination(1, TimeUnit.DAYS);
        } catch (InterruptedException ex) {
            throw new RuntimeException(ex);
        }
    }

    static List<String> readLines(final File file) {
        //read lines
        return new ArrayList<>();
    }

    static List<List<String>> partitionWork(final List<String> lines, final int blockSize) {
        //divide the incoming list into chunks of at most blockSize lines
        //(note: walking backwards yields the chunks in reverse order)
        final List<List<String>> partitionedWork = new LinkedList<>();
        for (int i = lines.size(); i > 0; i -= blockSize) {
            int start = i > blockSize ? i - blockSize : 0;
            partitionedWork.add(lines.subList(start, i));
        }
        return partitionedWork;
    }
}

I have created a Statistics object; it holds the result of the work done.

There is a StatisticsCalculator object, which is a Callable<Statistics> - this does the calculation. It is given a List<String> and processes the lines to create a Statistics.

The readLines method I leave to you to implement.

The most important method in many ways is the partitionWork method; this divides the incoming List<String>, which is all the lines in the file, into a List<List<String>> using the blockSize. This essentially decides how much work each thread should get, and tuning the blockSize parameter is very important. If each unit of work is only one line then the overheads will probably outweigh the advantages, whereas if each unit of work is ten thousand lines then you may end up with only one working Thread.

Finally, the meat of the operation is the main method. This calls the read and then the partition methods. It spawns an ExecutorService with a number of threads equal to the number of chunks of work, but up to a maximum of 10. You may want to make this equal to the number of cores you have.

The main method then submits a List of all the Callables, one for each chunk, to the executorService. The invokeAll method blocks until the work is done.

The method then loops over the returned List<Future> and gets the generated Statistics object from each Future, ready for aggregation.

Afterwards, don't forget to shut down the executorService, as leaving it running will prevent your application from exiting.

EDIT

OP wants to read line by line, so here is a revised main:

//requires additional imports: java.io.BufferedReader, java.io.FileReader, java.io.IOException
public static void main(String[] args) throws IOException {
    final File file = new File("path/to/my/file");
    final ExecutorService executorService = Executors.newFixedThreadPool(10);
    final List<Future<Statistics>> futures = new LinkedList<>();
    try (final BufferedReader reader = new BufferedReader(new FileReader(file))) {
        List<String> tmp = new LinkedList<>();
        String line = null;
        while ((line = reader.readLine()) != null) {
            tmp.add(line);
            if (tmp.size() == 100) {
                futures.add(executorService.submit(new StatisticsCalculator(tmp)));
                tmp = new LinkedList<>();
            }
        }
        if (!tmp.isEmpty()) {
            futures.add(executorService.submit(new StatisticsCalculator(tmp)));
        }
    }
    try {
        for (final Future<Statistics> future : futures) {
            final Statistics statistics = future.get();
            //do whatever to aggregate the individual
        }
    } catch (InterruptedException | ExecutionException ex) {
        throw new RuntimeException(ex);
    }
    executorService.shutdown();
    try {
        executorService.awaitTermination(1, TimeUnit.DAYS);
    } catch (InterruptedException ex) {
        throw new RuntimeException(ex);
    }
}

This streams the file line by line and, after a given number of lines (100 here), fires off a new task to the executor to process those lines.

You would need to call clear on the List<String> in the Callable when you are done with it, as the Callable instances are referenced by the Futures they return. If you clear the Lists when you're done with them, that should reduce the memory footprint considerably.
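
For example, the call() method above could release its lines in a finally block (a sketch of that change):

@Override
public Statistics call() throws Exception {
    try {
        //do stuff with lines
        return new Statistics();
    } finally {
        lines.clear();   // release the lines; the retained Future then only keeps the Statistics alive
    }
}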

A further enhancement may well be to use the suggestion here for an ExecutorService that blocks until there is a spare thread - this would guarantee that there are never more than threads*blockSize lines in memory at a time, provided you clear the Lists when the Callables are done with them.
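
One common way to get that behaviour (a sketch, not necessarily what the linked suggestion proposed) is a ThreadPoolExecutor with a bounded work queue and CallerRunsPolicy, with the pool and queue sizes left as parameters:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThrottledExecutors {

    /** Pool whose submit() throttles the caller instead of queueing without bound. */
    public static ExecutorService newThrottledPool(int workers, int queueCapacity) {
        return new ThreadPoolExecutor(
                workers, workers,                           // fixed-size pool
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(queueCapacity),
                // When the queue is full, the submitting (reader) thread runs the
                // task itself, which naturally pauses reading until workers catch up.
                new ThreadPoolExecutor.CallerRunsPolicy());
    }
}

Swapping this in for Executors.newFixedThreadPool(10) above means the reading loop simply slows down whenever the workers fall behind.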

Boris the Spider
  • Thank you for the code, but I can't fit the whole file in memory - it's too big – mishka Mar 15 '13 at 15:39
  • Well, change the code then - create the workers on the fly. Every x lines, submit another task to the executor and store the `Future`. – Boris the Spider Mar 15 '13 at 15:42
  • You are going to end up at my solution. Right now this code will run out of heap space (the file is big, and the statistics objects are big too). Another thread to deal with the ready futures stored in a queue? – mishka Mar 15 '13 at 16:11
  • I guess that is the way it is headed - if you need to clear the `Future`s out as they are done, then create another `Callable` that processes the statistics and a `singleThreadExecutor` to run them. – Boris the Spider Mar 15 '13 at 16:15