1

In Hadoop MapReduce, no reducer starts before all mappers have finished. Can someone please explain to me in which part, class, or line of code this logic is implemented? I am talking about Hadoop MapReduce version 1 (NOT YARN). I have searched the MapReduce framework, but there are so many classes, and I don't fully understand the method calls and their ordering.

In other words, I need (at first for test purposes) to let the reducers start reducing even while some mappers are still working. I know that I will get incorrect results for the job this way, but for now this is the starting point for some work on changing parts of the framework. So where should I start looking and making changes?

teo

2 Answers

3

This is done in the shuffle phase. For Hadoop 1.x, take a look at org.apache.hadoop.mapred.ReduceTask.ReduceCopier, which implements ShuffleConsumerPlugin. You may also want to read the "Breaking the MapReduce Stage Barrier" research paper by Verma et al.

EDIT:

After reading @chris-white's answer, I realized that my answer needed some extra explanation. In the MapReduce model, you need to wait for all mappers to finish, since the keys need to be grouped and sorted; in addition, you may have some speculative mappers running, and you do not yet know which of the duplicate mappers will finish first. However, as the "Breaking the MapReduce Stage Barrier" paper indicates, for some applications it may make sense not to wait for all of the mappers' output. If you want to implement this sort of behavior (most likely for research purposes), then you should take a look at the classes I mentioned above.
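To make the barrier concrete, here is a minimal toy sketch in plain Java. It is not Hadoop code: `ShuffleBarrierSketch`, `ToyReducer`, and all method names are hypothetical, and the "reduce" is just a per-key sum. It only illustrates why reducing before every map output has been copied can give incomplete groups.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ShuffleBarrierSketch {

    /** Hypothetical stand-in for one reduce task; not a Hadoop class. */
    public static final class ToyReducer {
        private final int numMaps;
        // TreeMap keeps keys sorted, mimicking the sort that precedes reduce.
        private final Map<String, List<Integer>> grouped = new TreeMap<>();
        private int copiedMapOutputs = 0;

        public ToyReducer(int numMaps) {
            this.numMaps = numMaps;
        }

        /** Simulates copying the partition produced by one finished map task. */
        public void copyMapOutput(Map<String, Integer> mapOutput) {
            for (Map.Entry<String, Integer> e : mapOutput.entrySet()) {
                grouped.computeIfAbsent(e.getKey(), k -> new ArrayList<>())
                       .add(e.getValue());
            }
            copiedMapOutputs++;
        }

        /** The barrier: true only once every map output has been copied. */
        public boolean barrierReached() {
            return copiedMapOutputs == numMaps;
        }

        /** Sums the values per key; refuses to run before the barrier. */
        public Map<String, Integer> reduce() {
            if (!barrierReached()) {
                throw new IllegalStateException(
                    "only " + copiedMapOutputs + " of " + numMaps + " map outputs copied");
            }
            Map<String, Integer> result = new TreeMap<>();
            for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
                int sum = 0;
                for (int v : e.getValue()) {
                    sum += v;
                }
                result.put(e.getKey(), sum);
            }
            return result;
        }
    }
}
```

Removing the check in `reduce()` is the toy equivalent of breaking the barrier: the call would succeed early, but any key whose values are still sitting in an uncopied map output would get a partial sum.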

cabad
  • The paper you referenced does indeed discuss what I was looking for. Do you know if the implementation is somewhere online? **For others looking for the same thing:** as I understand from the code, the ReduceCopier.fetchOutputs() method is the one that holds the reduce task back from running until all map outputs are copied (through the while loop at line 2026 of Hadoop release 1.0.4). – teo Aug 16 '13 at 13:25
  • I don't think the code is online. You could e-mail the authors and ask for the code, but my guess is that it won't be too useful for you, since the work was done a while ago, on an older version of Hadoop (maybe 0.18?). – cabad Aug 16 '13 at 13:42
2

Some points for clarification:

A reducer cannot start reducing until all mappers have finished and their partitions have been copied to the node where the reducer task is running and then sorted.

What you may see is a reducer starting to copy map outputs while other map tasks are still running. This is controlled via a configuration property known as slowstart (mapred.reduce.slowstart.completed.maps). This value represents a ratio (0.0 - 1.0) of the number of map tasks that need to have completed before the reduce tasks will start up (copying over the map outputs from those map tasks that have completed). The default shipped in mapred-default.xml is 0.05, but clusters often raise it; with a value of 0.9, for example, a job with 100 map tasks would need 90 of them to finish before the job tracker can start to launch the reduce tasks.

This is all controlled by the job tracker, in the JobInProgress class, lines 775, 1610, 1664.
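As a rough model of the slowstart check described above, the following self-contained Java sketch computes how many map tasks must finish before reduce tasks may be launched. The class and method names are hypothetical, and rounding the threshold up is an assumption of this sketch; see JobInProgress for the actual logic.

```java
public class SlowstartSketch {

    // Name of the slowstart property in Hadoop 1.x configuration.
    public static final String SLOWSTART_KEY = "mapred.reduce.slowstart.completed.maps";

    /**
     * Number of map tasks that must have completed before reduce tasks
     * may be launched, for a given slowstart ratio in [0.0, 1.0].
     * Rounding up is an assumption of this sketch.
     */
    public static int mapsNeededBeforeReduces(int numMapTasks, double ratio) {
        if (ratio < 0.0 || ratio > 1.0) {
            throw new IllegalArgumentException("ratio must be within [0.0, 1.0]: " + ratio);
        }
        return (int) Math.ceil(ratio * numMapTasks);
    }

    /** True once enough map tasks have finished for reduces to be launched. */
    public static boolean canScheduleReduces(int finishedMaps, int numMapTasks, double ratio) {
        return finishedMaps >= mapsNeededBeforeReduces(numMapTasks, ratio);
    }
}
```

In this model a ratio of 0.0 gives a threshold of zero, so reduce tasks become eligible for launch right away. That only moves the start of the copy phase earlier; the reduce function itself still waits for every map output.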

Chris White
  • I guess I assumed @teo was interested in doing this for research purposes. See my edited answer for more details. – cabad Aug 15 '13 at 20:52
  • So if I set this parameter to zero, then as soon as a mapper finishes, **ALL** the reducers will start to copy its output. Do I understand this right? – teo Aug 16 '13 at 13:32