49

Can anyone point me at a simple, open-source Map/Reduce framework/API for Java? There doesn't seem to be much evidence of such a thing existing, but someone else might know different.

The best I can find is, of course, Hadoop MapReduce, but that fails the "simple" criterion. I don't need the ability to run distributed jobs, just something to let me run map/reduce-style jobs on a multi-core machine, in a single JVM, using standard Java5-style concurrency.

It's not a hard thing to write oneself, but I'd rather not have to.

skaffman
  • 398,947
  • 96
  • 818
  • 769
  • 3
    I came across this video, which announces a new feature in Java 8. It seems that there will be a mapreduce API in the new release. http://www.youtube.com/watch?v=47_Em-zc7_Q – gigadot Nov 26 '11 at 15:43
  • I'm curious to know what your current solution is to this problem. I'm just looking for fast, easy ways to do Lists.transform(function) in parallel on a single machine. – Snekse Apr 12 '13 at 16:54
  • 6
    LeoTask works. It is a parallel task running and results aggregation framework for a multi-core machine. https://github.com/mleoking/leotask – Changwang Zhang Jan 08 '15 at 14:05

10 Answers

18

Have you checked out Akka? While Akka is really a distributed, Actor-model-based concurrency framework, you can implement a lot of things simply with little code. It's just so easy to divide work into pieces with it, and it automatically takes full advantage of a multi-core machine, as well as being able to use multiple machines to process work. Unlike using threads, it feels more natural to me.

I have a Java map reduce example using Akka. It's not the easiest map reduce example, since it makes use of futures, but it should give you a rough idea of what's involved. There are several major things that my map reduce example demonstrates:

  • How to divide the work.
  • How to assign the work: Akka has a really simple messaging system as well as a work partitioner, whose schedule you can configure. Once I learned how to use it, I couldn't stop. It's just so simple and flexible. I was using all four of my CPU cores in no time. This is really great for implementing services.
  • How to know when the work is done and the result is ready to process: This is actually the portion that may be the most difficult and confusing to understand unless you're already familiar with Futures. You don't need to use Futures, since there are other options. I just used them because I wanted something shorter for people to grok.

If you have any questions, Stack Overflow actually has an awesome Akka Q&A section.
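
For a flavor of the approach, below is a minimal word-count sketch of my own (hypothetical class and variable names; this is not the linked example), assuming Akka 2.6 classic actors: a round-robin pool of mapper actors produces partial counts, and the caller merges them as the replies arrive.

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.pattern.Patterns;
import akka.routing.RoundRobinPool;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;

public class AkkaWordCount {

    // Map step: turn one line of text into per-word partial counts.
    static class MapActor extends AbstractActor {
        @Override
        public Receive createReceive() {
            return receiveBuilder()
                .match(String.class, line -> {
                    Map<String, Integer> counts = new HashMap<>();
                    for (String word : line.split("\\s+"))
                        counts.merge(word, 1, Integer::sum);
                    getSender().tell(counts, getSelf());
                })
                .build();
        }
    }

    public static void main(String[] args) throws Exception {
        ActorSystem system = ActorSystem.create("mapreduce");
        // A round-robin pool spreads the map work across four actors/cores.
        ActorRef mappers = system.actorOf(
                new RoundRobinPool(4).props(Props.create(MapActor.class)));

        String[] lines = {"a b a", "b c"};
        List<CompletionStage<Object>> replies = new ArrayList<>();
        for (String line : lines)   // fan out: one ask per line
            replies.add(Patterns.ask(mappers, line, Duration.ofSeconds(5)));

        // Reduce step: merge the partial counts as the futures complete.
        Map<String, Integer> totals = new HashMap<>();
        for (CompletionStage<Object> reply : replies) {
            @SuppressWarnings("unchecked")
            Map<String, Integer> partial =
                    (Map<String, Integer>) reply.toCompletableFuture().get();
            partial.forEach((w, n) -> totals.merge(w, n, Integer::sum));
        }

        System.out.println(totals); // {a=2, b=2, c=1}
        system.terminate();
    }
}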

chaostheory
  • 1,621
  • 4
  • 20
  • 36
12

I think it is worth mentioning that these problems are history as of Java 8. An example:

int heaviestBlueBlock =
    blocks.stream()
          .filter(b -> b.getColor() == BLUE)
          .map(Block::getWeight)
          .reduce(0, Integer::max);

In other words: single-node MapReduce is available in Java 8.
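
And since the question is specifically about multiple cores: swapping in parallelStream() runs the same pipeline on the common fork/join pool. A minimal variation, assuming the same Block type as above:

int heaviestBlueBlock =
    blocks.parallelStream()               // splits the work across cores
          .filter(b -> b.getColor() == BLUE)
          .map(Block::getWeight)
          .reduce(0, Integer::max);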

For more details, see Brian Goetz's presentation about Project Lambda.

Has QUIT--Anony-Mousse
  • 76,138
  • 12
  • 138
  • 194
Lukas Eder
  • 211,314
  • 129
  • 689
  • 1,509
10

I use the following structure

int procs = Runtime.getRuntime().availableProcessors();
ExecutorService es = Executors.newFixedThreadPool(procs);

List<Future<TaskResult>> results = new ArrayList<Future<TaskResult>>();
for (int i = 0; i < tasks; i++)           // map: submit one Task per unit of work
    results.add(es.submit(new Task(i)));
for (Future<TaskResult> future : results) // reduce: combine the individual results
    reduce(future);
Peter Lawrey
  • 525,659
  • 79
  • 751
  • 1,130
  • 6
    Umm... that's not map-reduce, that's just a naked executor. – skaffman Mar 10 '11 at 13:39
  • You wanted simple. The first loop maps the work into `tasks` tasks, and the second can be used to combine or reduce the individual results. Optionally the results can be stored in a Future. – Peter Lawrey Mar 10 '11 at 14:07
  • I realise that I *can* write my own map/reduce framework, but I don't *want* to. It's complex enough to want to use an off-the-shelf, generic solution. – skaffman Mar 10 '11 at 14:09
  • 8
    @skaffman, You want something more complex than the simplest solution but simpler than a full solution. A Goldilocks solution. ;) Perhaps you could say what your minimum requirements are. – Peter Lawrey Mar 10 '11 at 14:16
  • 1
    Somewhere between "an executor" and "hadoop". I'm open to all suggestions in between that. – skaffman Mar 10 '11 at 14:42
8

I realise this might be a little after the fact, but you might want to have a look at the JSR166y ForkJoin classes from JDK7.

There is a backported library that works under JDK6 without any issues, so you don't have to wait until the next millennium to have a go with it. It sits somewhere between a raw executor and Hadoop, giving a framework for working on map/reduce jobs within the current JVM.
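
To make that concrete, here is a minimal array-sum sketch of my own using only the JDK7 fork/join API: the task recursively splits the range, computes small pieces directly (the map step), and combines the halves on the way back up (the reduce step).

import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 10000;
    private final long[] data;
    private final int from, to;

    public SumTask(long[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {         // small enough: "map" directly
            long sum = 0;
            for (int i = from; i < to; i++)
                sum += data[i];
            return sum;
        }
        int mid = (from + to) >>> 1;
        SumTask left = new SumTask(data, from, mid);
        SumTask right = new SumTask(data, mid, to);
        left.fork();                          // run the left half asynchronously
        return right.compute() + left.join(); // "reduce": combine both halves
    }

    public static void main(String[] args) {
        long[] data = new long[1000000];
        Arrays.fill(data, 1L);
        long total = new ForkJoinPool().invoke(new SumTask(data, 0, data.length));
        System.out.println(total);            // 1000000
    }
}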

Gareth Davis
  • 27,701
  • 12
  • 73
  • 106
6

I created a one-off for myself a couple of years ago when I got an 8-core machine, but I wasn't terribly happy with it. I never got it to be as simple to use as I had hoped, and memory-intensive tasks didn't scale well.

If you don't get any real answers I can share more, but the core of it is:

public class LocalMapReduce<TMapInput, TMapOutput, TOutput> {
    private int m_threads;
    private Mapper<TMapInput, TMapOutput> m_mapper;
    private Reducer<TMapOutput, TOutput> m_reducer;
    ...
    public TOutput mapReduce(Iterator<TMapInput> inputIterator)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(m_threads);
        Set<Future<TMapOutput>> futureSet = new HashSet<Future<TMapOutput>>();
        while (inputIterator.hasNext()) {
            TMapInput m = inputIterator.next();
            Future<TMapOutput> f = pool.submit(m_mapper.makeWorker(m));
            futureSet.add(f);
            Thread.sleep(10);
        }
        while (!futureSet.isEmpty()) {
            Thread.sleep(5);
            for (Iterator<Future<TMapOutput>> fit = futureSet.iterator(); fit.hasNext();) {
                Future<TMapOutput> f = fit.next();
                if (f.isDone()) {
                    fit.remove();
                    TMapOutput x = f.get();
                    m_reducer.reduce(x);
                }
            }
        }
        return m_reducer.getResult();
    }
}

EDIT: Based on a comment, below is a version without sleep. The trick is to use CompletionService, which essentially provides a blocking queue of completed Futures.

public class LocalMapReduce<TMapInput, TMapOutput, TOutput> {
    private int m_threads;
    private Mapper<TMapInput, TMapOutput> m_mapper;
    private Reducer<TMapOutput, TOutput> m_reducer;
    ...
    public TOutput mapReduce(Collection<TMapInput> input)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(m_threads);
        CompletionService<TMapOutput> futurePool =
                new ExecutorCompletionService<TMapOutput>(pool);
        for (TMapInput m : input)
            futurePool.submit(m_mapper.makeWorker(m));
        pool.shutdown();
        // take() blocks until the next result is ready, so no polling or sleeping
        for (int i = 0; i < input.size(); i++)
            m_reducer.reduce(futurePool.take().get());
        return m_reducer.getResult();
    }
}

I'll also note that this is a very distilled map-reduce algorithm, with a single reduce worker that does both the reduce and merge operations.
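
A hypothetical way to lift that restriction, assuming each map task emits a Map<String, Integer> of partial counts: merge them into a ConcurrentHashMap keyed by output key, so reductions for different keys can interleave safely across threads.

private final ConcurrentMap<String, Integer> m_byKey =
        new ConcurrentHashMap<String, Integer>();

public void reduce(Map<String, Integer> mapOutput) {
    for (Map.Entry<String, Integer> e : mapOutput.entrySet()) {
        // Java 5-style atomic read-modify-write (ConcurrentMap.merge is Java 8)
        for (;;) {
            Integer old = m_byKey.putIfAbsent(e.getKey(), e.getValue());
            if (old == null)
                break; // first value seen for this key
            if (m_byKey.replace(e.getKey(), old, old + e.getValue()))
                break; // compare-and-swap style update succeeded
        }
    }
}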

xan
  • 7,511
  • 2
  • 32
  • 45
  • It lacks sorting of the reduce values by key, so the reduce part is not parallelized as it is in Hadoop. – yura Mar 10 '11 at 14:44
  • @yura: Indeed. This is the kind of fine-tuned subtlety that I don't want to worry about. – skaffman Mar 10 '11 at 14:48
  • @Chris OK, so what's better? I haven't done any pro Java work in a while -- is there a reference for "good" concurrency techniques? – xan May 08 '11 at 16:32
  • 1
    Have a look at http://download.oracle.com/javase/tutorial/essential/concurrency/guardmeth.html. Once you understand it, you won't need to use Thread.sleep again in this context :) – Chris Dennett May 08 '11 at 18:13
  • @xan, Why did you write a version without sleep? Is it because sleep is CPU-intensive? – Frank Sep 13 '17 at 15:33
5

I like to use Skandium for parallelism in Java. The framework implements certain patterns of parallelism (namely Master-Slave, Map/Reduce, Pipe, Fork and Divide & Conquer) for multi-core machines with shared memory. This technique is called "algorithmic skeletons". The patterns can be nested.

In detail, there are skeletons and muscles. Muscles do the actual work (split, merge, execute and condition). Skeletons represent the patterns of parallelism; the exceptions are "While", "For" and "If", which are control constructs that can be useful when nesting patterns.

Examples can be found inside the framework. It took me a while to understand how to use the muscles and skeletons, but after getting over this hurdle I really like this framework. :)

Florian Pilz
  • 8,002
  • 4
  • 22
  • 30
  • This does not seem to be actively developed. – Suminda Sirinath S. Dharmasena Mar 11 '13 at 12:36
  • Sad, but true. I wanted to visit their website a few days ago, and it seems that they pulled it at the beginning of this year. So unless someone feels obligated to maintain the package themselves (it's open source), there won't be any updates. Maybe I will look for alternatives next time, but I'm really happy with it. – Florian Pilz Mar 12 '13 at 17:20
3

Have you had a look at GridGain?

Costi Ciudatu
  • 37,042
  • 7
  • 56
  • 92
  • GridGain is very good, maybe the best, but it's very expensive, and they no longer support the community edition. Even the files of community edition 3.6 are not available for download. I do not recommend GridGain for simple purposes, only if you have a big project and a very, very big company. For this reason I'd recommend Akka. – felipe Mar 25 '13 at 19:10
  • They re-opensourced in March 2014. – CheatEx Apr 20 '14 at 11:42
3

You might want to take a look at the project website of Functionals 4 Java: http://f4j.rethab.ch/ It introduces filter, map and reduce to Java versions before 8.

rethab
  • 7,170
  • 29
  • 46
0

A MapReduce API was introduced in v3.2 of Hazelcast (see the MapReduce API section in the docs). While Hazelcast is intended to be used in a distributed system, it works perfectly well in a single-node setup, and it's fairly lightweight.
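
For flavor, here is a rough word-count sketch against the Hazelcast 3.x MapReduce API; treat the exact class and method names as an approximation of what the linked docs describe rather than a verified listing.

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.hazelcast.mapreduce.*;

import java.util.Map;

public class HazelcastWordCount {

    // Map step: emit (word, 1) for every token in a line.
    static class TokenMapper implements Mapper<Integer, String, String, Integer> {
        @Override
        public void map(Integer key, String line, Context<String, Integer> ctx) {
            for (String word : line.split("\\s+"))
                ctx.emit(word, 1);
        }
    }

    // Reduce step: one reducer per word sums its counts.
    static class SumReducerFactory implements ReducerFactory<String, Integer, Integer> {
        @Override
        public Reducer<Integer, Integer> newReducer(String key) {
            return new Reducer<Integer, Integer>() {
                private int sum;
                @Override
                public void reduce(Integer value) { sum += value; }
                @Override
                public Integer finalizeReduce() { return sum; }
            };
        }
    }

    public static void main(String[] args) throws Exception {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance(); // one node is enough
        IMap<Integer, String> lines = hz.getMap("lines");
        lines.put(0, "a b a");
        lines.put(1, "b c");

        JobTracker tracker = hz.getJobTracker("default");
        Job<Integer, String> job = tracker.newJob(KeyValueSource.fromMap(lines));
        Map<String, Integer> counts = job.mapper(new TokenMapper())
                                         .reducer(new SumReducerFactory())
                                         .submit()
                                         .get();
        System.out.println(counts); // {a=2, b=2, c=1}
        hz.shutdown();
    }
}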

skaffman
  • 398,947
  • 96
  • 818
  • 769
0

You can try LeoTask: a parallel task execution and results aggregation framework.

It is free and open-source: https://github.com/mleoking/leotask

Here is a brief introduction showing its API: https://github.com/mleoking/leotask/blob/master/leotask/introduction.pdf?raw=true

It is a lightweight framework that works on a single computer, using all of its available CPU cores.

It has the following features:

  • Automatic & parallel parameter space exploration
  • Flexible & configuration-based result aggregation
  • Programming model focusing only on the key logic
  • Reliable & automatic interruption recovery

and utilities:

  • Dynamic & cloneable network structures
  • Integration with Gnuplot
  • Network generation according to common network models
  • DelimitedReader: a sophisticated reader that explores CSV (Comma-separated values) files like a database
  • Fast random number generator based on the Mersenne Twister algorithm
  • An integrated CurveFitter from the ImageJ project