4

I understand from When do reduce tasks start in Hadoop that the reduce task in hadoop contains three steps: shuffle, sort and reduce where the sort (and after that the reduce) can only start once all the mappers are done. Is there a way to start the sort and reduce every time a mapper finishes.

For example lets we have only one job with mappers mapperA and mapperB and 2 reducers. What i want to do is:

  1. mapperA finishes
  2. shuffles copies the appropriate partitions of the mapperAs output lets say to reducer 1 and 2
  3. sort on reducer 1 and 2 starts sorting and reducing and generates some intermediate output
  4. now mapperB finishes
  5. shuffles copies the appropriate partitions of the mapperBs output to reducer 1 and 2
  6. sort and reduce on reducer 1 and 2 starts again and the reducer merges the new output with the old one

Is this possible? Thanks

Community
  • 1
  • 1
teo
  • 137
  • 1
  • 10

3 Answers3

3

You can't with the current implementation. However, people have "hacked" the Hadoop code to do what you want to do.

In the MapReduce model, you need to wait for all mappers to finish, since the keys need to be grouped and sorted; plus, you may have some speculative mappers running and you do not know yet which of the duplicate mappers will finish first.

However, as the "Breaking the MapReduce Stage Barrier" paper indicates, for some applications, it may make sense not to wait for all of the output of the mappers. If you would want to implement this sort of behavior (most likely for research purposes), then you should take a look at theorg.apache.hadoop.mapred.ReduceTask.ReduceCopier class, which implements ShuffleConsumerPlugin.

EDIT: Finally, as @teo points out in this related SO question, the

ReduceCopier.fetchOutputs() method is the one that holds the reduce task from running until all map outputs are copied (through the while loop in line 2026 of Hadoop release 1.0.4).

Community
  • 1
  • 1
cabad
  • 4,555
  • 1
  • 20
  • 33
2

You can configure this using the slowstart property, which denotes the percentage of your mappers that need to be finished before the copy to the reducers starts. It usual default is in the 0.9 - 0.95 (90-95%) mark, but you can override to be 0 if your want

`mapreduce.reduce.slowstart.completed.map`
Chris White
  • 29,949
  • 4
  • 71
  • 93
  • Yes but this just starts the shuffle process. The sort and reduce would start only after mapperB finishes. What i want to do is to start sort and reduce before the mapperB finishes(step 3 in my above scenario). Or have i misunderstood something? – teo May 22 '13 at 10:53
  • Well the sort and reduce can't start until all mapper input are in – Chris White May 22 '13 at 14:11
1

Starting the sort process before all mappers finish is sort of a hadoop-antipattern (if I may put it that way!), in that the reducers cannot know that there is no more data to receive until all mappers finish. you, the invoker may know that, based on your definition of keys, partitioner etc., but the reducers don't.

davek
  • 22,499
  • 9
  • 75
  • 95
  • What do you mean with "the reducers cannot know that there is no more data to receive". And how do they know that all mappers are finished in a normal scenario? – teo May 22 '13 at 12:15
  • I mean that a reducer cannot know that a still-running mapper is not going to at some point emit data that will - depending on the partitioning logic - be sent to that reducer. All the reducer knows is that mapper tasks are processing input splits: the contents of those splits might be known to you (based on prior outputs) but not your reducers. – davek May 22 '13 at 12:56
  • yes you are right but is that for the reducer important? What i have in mind is to make the reducer start work as soon as some input data come from the mapper and let him wait. if new data come from another mapper than i want the reducer to reduce them and to merge the new result with the result generated before. When all the mappers end i think the reducer will stop as in a normal scenario. Is that possible? – teo May 22 '13 at 15:12
  • I see you point, but things don't work that way: the reducers are triggered only when all mappers are finished. – davek May 23 '13 at 07:56