142

In MapReduce programming, the reduce phase has shuffling, sorting, and reduce as its sub-parts. Sorting is a costly affair.

What is the purpose of shuffling and sorting phase in the reducer in Map Reduce Programming?

Ravindra babu
  • 37,698
  • 11
  • 250
  • 211
Nithin
  • 9,661
  • 14
  • 44
  • 67
  • 3
    I've always assumed this was necessary as the output from the mapper is the input for the reducer, so it was sorted based on the keyspace and then split into buckets for each reducer input. – BasicHorizon Mar 03 '14 at 08:49

8 Answers

203

First of all, shuffling is the process of transferring data from the mappers to the reducers, so I think it is obvious that it is necessary for the reducers, since otherwise they wouldn't be able to have any input (or input from every mapper). Shuffling can start even before the map phase has finished, saving some time. That's why you can see a reduce status greater than 0% (but less than 33%) when the map status is not yet 100%.

Sorting saves time for the reducer, helping it easily distinguish when a new reduce task should start: it simply starts a new reduce task when the next key in the sorted input data is different from the previous one. Each reduce task takes a list of key-value pairs, but it has to call the reduce() method, which takes a (key, list-of-values) input, so it has to group values by key. That's easy to do if the input data is pre-sorted (locally) in the map phase and simply merge-sorted in the reduce phase (since the reducers get data from many mappers).
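To illustrate the point about merge-sorting and grouping, here is a toy Python sketch (not Hadoop code; the mapper outputs are made-up word-count pairs). Because each mapper's output is already locally sorted, the reducer side can cheaply merge the runs and then group values in a single linear pass, starting a new group exactly when the key changes:

```python
import heapq
from itertools import groupby
from operator import itemgetter

# Hypothetical locally-sorted outputs of three mappers (word-count style).
mapper_outputs = [
    [("apple", 1), ("cat", 1), ("dog", 1)],
    [("apple", 1), ("banana", 1)],
    [("banana", 1), ("cat", 1), ("cat", 1)],
]

# The reducer merge-sorts the pre-sorted runs (cheap, since each run is sorted)...
merged = heapq.merge(*mapper_outputs, key=itemgetter(0))

# ...and one linear pass groups values by key: a new group starts exactly
# when the key changes, which is what the sort buys us.
result = {key: sum(v for _, v in pairs)
          for key, pairs in groupby(merged, key=itemgetter(0))}
print(result)  # {'apple': 2, 'banana': 2, 'cat': 3, 'dog': 1}
```

Without the sort, grouping would require buffering or scanning all pairs to find every occurrence of each key.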

Partitioning, which you mentioned in one of the answers, is a different process. It determines to which reducer a (key, value) pair, output of the map phase, will be sent. The default Partitioner uses hashing on the keys to distribute them among the reduce tasks, but you can override it and use your own custom Partitioner.
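A minimal sketch of what the default hash partitioning does (a Python analogue, not Hadoop's actual HashPartitioner, which uses the key's hashCode() in Java; crc32 stands in here as a deterministic hash):

```python
import zlib

def partition(key: str, num_reducers: int) -> int:
    # Deterministic hash of the key, modulo the number of reduce tasks.
    return zlib.crc32(key.encode("utf-8")) % num_reducers

pairs = [("apple", 1), ("banana", 1), ("apple", 1), ("cat", 1)]
num_reducers = 2
buckets = {r: [] for r in range(num_reducers)}
for key, value in pairs:
    buckets[partition(key, num_reducers)].append((key, value))

# Every occurrence of the same key hashes to the same partition,
# so a single reducer sees all values for that key.
```

A custom Partitioner is useful when the default hashing would skew the load, e.g. when a few keys dominate the data.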

A great source of information for these steps is this Yahoo tutorial (archived).

A nice graphical representation of this, found in Tom White's book "Hadoop: The Definitive Guide", is the following (shuffle is called "copy" in this figure):

[figure: the MapReduce shuffle and sort phases, from "Hadoop: The Definitive Guide"; shuffle is labeled "copy"]

Note that shuffling and sorting are not performed at all if you specify zero reducers (setNumReduceTasks(0)). Then, the MapReduce job stops at the map phase, and the map phase does not include any kind of sorting (so even the map phase is faster).

Tom White has been an Apache Hadoop committer since February 2007, and is a member of the Apache Software Foundation, so I guess it is pretty credible and official (as you requested).

vefthym
  • 7,422
  • 6
  • 32
  • 58
  • "Sorting saves time for the reducer, helping it easily distinguish when a new reduce task should start. It simply starts a new reduce task, when the next key in the sorted input data is different than the previous, to put it simply." I don't get this part. Mapper uses a partitioner to divide spills into partitions locally, each partition then send to a reduce. How sorting helps here? – MaxNevermind Jun 30 '16 at 15:18
  • 1
    @MaxNevermind If you have x reduce tasks (partitions), it doesn't mean that you will end up calling the reduce() method x times. It will be called once for every distinct key. So one reduce task can call the reduce() method several times. – vefthym Jun 30 '16 at 15:38
  • "It will be called once for every distinct key" Why? Mapper forms partitions whatever way it wants(not necessary one partition for every distinct key), then each partition goes to reducer, is it wrong? – MaxNevermind Jun 30 '16 at 16:03
  • 1
    @MaxNevermind Mapper outputs keys and values, it does not form partitions. The partitions are defined by the number of reduce tasks that the user defines and the Partitioner implementation. The outputs of all Mappers that have the same key are going to the same reduce() method. This cannot be changed. But what *can* be changed is what other keys (if any) will be placed in the same partition and thus, will be handled by the same task. A reduce task can call the reduce() function more than once, but only once for every key. – vefthym Jun 30 '16 at 18:59
  • 3
    ok I think I've got it. My problem was that I forgot that reduce takes a list of values as an argument not just one key-value pair. I think you should elaborate this in your answer: "Each reduce task takes a list of key-value pairs but it has to call reduce method which takes a key-List, so it has to group values by key, it's easy to do if input data is pre-sorted in a mapper stage" – MaxNevermind Jun 30 '16 at 20:52
  • @MaxNevermind thanks, I have added that in my answer. – vefthym Jul 01 '16 at 00:23
  • @SecurityHound are you sure that image is from that tutorial? I don't see it on the linked page. At the least, it should clearly be indicated exactly where it's from. – Ryan M Apr 04 '23 at 04:34
  • @RyanM - [Looks](https://meta.stackoverflow.com/questions/423995/restoration-of-posts) like I was trolled. – Security Hound Apr 04 '23 at 04:46
  • @RyanM thanks for pointing that out. The image is actually from the other link that I am citing (Tom White's book "The Definitive Guide"), and not the archived Yahoo tutorial. If you click on the link, it will be the first thing you see. This should be clearer in my answer. – vefthym Apr 05 '23 at 07:12
38

I thought of just adding some points missing from the above answers. This diagram, taken from here, clearly shows what's really going on.

[figure: word-count example showing the split, map, combine, and shuffle & sort steps]

To restate the real purpose of each step:

  • Split: Improves parallel processing by distributing the processing load across different nodes (mappers), which saves the overall processing time.

  • Combine: Shrinks the output of each mapper, saving the time spent moving the data from one node to another.

  • Sort (Shuffle & Sort): Makes it easy for the runtime to schedule (spawn/start) new reducers: while going through the sorted item list, whenever the current key is different from the previous one, it can spawn a new reducer.
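The Combine step above can be sketched in a few lines of Python (a toy word-count illustration, not Hadoop code): the combiner performs a local, per-mapper reduction so that far fewer pairs have to be shipped across the network.

```python
from collections import Counter

def map_phase(line):
    # Word-count mapper: emit (word, 1) for every word in the line.
    return [(word, 1) for word in line.split()]

def combine(pairs):
    # Combiner: local per-mapper reduction that shrinks the data
    # to be shuffled to the reducers.
    counts = Counter()
    for key, value in pairs:
        counts[key] += value
    return sorted(counts.items())  # locally sorted, ready for the reducer-side merge

mapped = map_phase("the cat and the dog and the bird")
combined = combine(mapped)
print(combined)        # [('and', 2), ('bird', 1), ('cat', 1), ('dog', 1), ('the', 3)]
print(len(mapped), "->", len(combined))  # 8 -> 5 pairs to shuffle
```

Note that a combiner is only safe when the reduce operation is associative and commutative (like summing counts).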

Supun Wijerathne
  • 11,964
  • 10
  • 61
  • 87
  • Where would the partition step come into this graph? After map and before combine? – Issung Jun 02 '20 at 04:55
  • @Joel I hope you refer to 'split' step? – Supun Wijerathne Jun 02 '20 at 05:07
  • 2
    No I mean the partition step, it decides what reducer to send the data to, using a simple hash modulo by default, after some more research I believe it comes after the combine step, before shuffle & sort. – Issung Jun 02 '20 at 05:12
  • 1
    @Joel I'm not pretty much clear what you intend to describe. In a nutshell, the exact sequence of steps can be pretty much problem-specific. I can say that for some scenarios even sorting is not necessary. Coming back to your input, if I specifically talk about the above simple wordcount example, I don't really see any need for such a partitioning to decide reducers. Here it's quite straight forward to spawn reduces per key. But I can guess that your point can be valid for some scenarios. Frankly, I don't have an exact clear idea about that. – Supun Wijerathne Jun 02 '20 at 05:34
  • can you tell which node is responsible for shuffle and sort?Is it same some specific node we specify or is it some map node or reduce node – rahul sharma Mar 14 '21 at 22:23
  • 2
    @rahulsharma the whole map-reduce system follows master-slave co-ordination. So each inter-node action is based on that. – Supun Wijerathne Mar 15 '21 at 03:11
  • @Joel the partition step doesnt decide to which reducer the data is sent. It just 'partitions' data which means splits it into micro-partitions that contain the same group of keys. After this step there is a sort operation which goes to each partition and sorts it to make the reducer's task even easier. Then these partitions are distributed in a group by manner to each reducer in order. The step of partitioning doesn't say this partition A goes to Reducer 1 explicitly. – user3426711 Feb 24 '22 at 10:00
4

Some data processing requirements don't need sorting at all. Syncsort has made the sorting in Hadoop pluggable. Here is a nice blog from them on sorting. The process of moving the data from the mappers to the reducers is called shuffling; check this article for more information on it.

Praveen Sripati
  • 32,799
  • 16
  • 80
  • 117
2

I've always assumed this was necessary, as the output from the mapper is the input for the reducer, so it is sorted based on the keyspace and then split into buckets for each reducer input. You want to ensure that all values of the same key end up in the same bucket going to the reducer, so they are reduced together. There is no point sending K1,V2 and K1,V4 to different reducers, as they need to be together in order to be reduced.

Tried explaining it as simply as possible

BasicHorizon
  • 191
  • 2
  • 14
  • If we want to send k1,v1 and k1,v4 to same reducer we can do shuffling . then what is the purpose of sorting ? – Nithin Mar 04 '14 at 00:47
  • It does the sorting for multiple reasons one reason is, when a MapReduce Job is sending all of the KV pairs to a reducer if the input is not sorted It would have to scan all of the Mapper outputs to pick up every instance of K1,VX. whereas if the Mapper output is sorted as soon as K2,VX is picked up you know that all of K1,VX has been picked up and that set can be sent off to a reducer for processing, the benefit of this is you don't have to wait for every reducer to be ready in order for each of them to start reducing. – BasicHorizon Mar 04 '14 at 11:10
  • Also when it comes to aggregation, if you specify you want to Aggregate all of K1,V1 if the input to the reducer is sorted as soon as the reducer picks up on K2,V2 it knows that no more instances of K1,V1 exists so it can finish it's aggregation whereas if reducer input is not sorted it will have to scan the entire input for K1,V1 – BasicHorizon Mar 04 '14 at 11:12
  • I think the reason is this: You hashmap the keys to a given reducer. So, one scan through the entire key space is enough to map each (k,v) to a reducer in such a way that same key goes to same partition. Sorting is done to get (k,v1,v2,v3,v4,...) that the reducer logic will be run on. This is the hadoop's way of groupby – figs_and_nuts Sep 26 '21 at 20:13
2

Shuffling is the process by which intermediate data from the mappers is transferred to 0, 1, or more reducers. Each reducer receives one or more keys and their associated values, depending on the number of reducers (for a balanced load). Further, the keys arriving at each reducer are locally sorted.

Shailvi
  • 105
  • 4
0

There are only two things that MapReduce does NATIVELY: sort and (implemented by sort) scalable group-by.

Most applications and design patterns over MapReduce are built on these two operations, which are provided by shuffle and sort.

Evgeny Benediktov
  • 1,389
  • 1
  • 10
  • 13
0

This is a good read; hope it helps. As for the sorting you are concerned about, I think it is there for the merge operation in the last step of the map phase. When the map operation is done and the result needs to be written to local disk, a multi-way merge is performed on the spills generated from the buffer. For a merge operation, having each partition sorted in advance is helpful.

hakunami
  • 2,351
  • 4
  • 31
  • 50
0

Well, in MapReduce there are two important phases, called the mapper and the reducer. Both matter, but the reducer is not always mandatory; in some programs reducers are optional. Now to your question: shuffling and sorting are two important operations in MapReduce. First, the Hadoop framework takes structured/unstructured data and separates it into key-value pairs.

The mapper program then separates and arranges the data into keys and values to be processed, generating K2 and V2 values. These values must be processed and rearranged in the proper order to get the desired result. This shuffle and sort is done on the local system (the framework takes care of it), and after processing, the framework cleans up the intermediate data on the local system.

A combiner and a partitioner can also be used to optimize this shuffle and sort process. After proper arrangement, those key-value pairs are passed to the reducer to get the desired client output.

(K1, V1) -> (K2, V2) (the mapper, which we write) -> (K2, list(V2)) (here the framework shuffles and sorts the data) -> (K3, V3) (the reducer generates the output).

Please note that all these steps are logical operations only; they do not change the original data.

Your question: What is the purpose of the shuffling and sorting phase in the reducer in MapReduce programming?

Short answer: to process the data to get the desired output. Shuffling aggregates the data; reduce gets the expected output.
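The (K1, V1) -> (K2, V2) -> (K2, list(V2)) -> (K3, V3) pipeline can be sketched end-to-end as a toy Python word count (a simulation of the phases, not Hadoop code; the function names are made up for illustration):

```python
from collections import defaultdict

def mapper(offset, line):
    # (K1=offset, V1=line) -> list of (K2=word, V2=1)
    return [(word, 1) for word in line.split()]

def shuffle_and_sort(all_pairs):
    # (K2, V2) pairs from every mapper -> sorted (K2, list(V2)) groups
    groups = defaultdict(list)
    for key, value in all_pairs:
        groups[key].append(value)
    return sorted(groups.items())

def reducer(key, values):
    # (K2, list(V2)) -> (K3=word, V3=count)
    return key, sum(values)

lines = ["hello world", "hello mapreduce"]
mapped = [pair for i, line in enumerate(lines) for pair in mapper(i, line)]
output = [reducer(key, values) for key, values in shuffle_and_sort(mapped)]
print(output)  # [('hello', 2), ('mapreduce', 1), ('world', 1)]
```

The shuffle_and_sort step is exactly the part the framework performs between the map and reduce phases; the programmer supplies only the mapper and reducer.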

Venu A Positive
  • 2,992
  • 2
  • 28
  • 31