67

On Hive, for Data Retrieval Queries (e.g. SELECT ...), NOT Data Definition (e.g. CREATE TABLES ...), as far as I understand:

  • SORT BY only sorts with in the reducer

  • ORDER BY orders things globally but shoves everything into one reducers

  • CLUSTER BY intelligently distributes stuff into reducers by the key hash and make a sort by

So my question is:

  1. Does CLUSTER BY guarantees a global order?
  2. DISTRIBUTE BY puts the same keys into same reducers but what aboutthe adjacent keys?

The only document I can find on this is here and from the example it seems like it orders them globally. But from the definition I feel like it doesn't always do that.

MrE
  • 19,584
  • 12
  • 87
  • 105
cashmere
  • 2,811
  • 1
  • 23
  • 32

8 Answers8

173

In short, for your questions:

  1. Does CLUSTER BY guarantee a global order?
    • No.
  2. DISTRIBUTE BY puts the same keys into same reducers but what about the adjacent keys?

The longer version:

To understand Hive, first you must understand Map Reduce. The relevant property of Map Reduce for this question is that when data arrives at the reducing stage, they would have always gone through shuffling and sorting on mapper or reducer before the reducing step. Then:

  • ORDER BY x: Data guarantees global ordering by x by using one reducer for the last reducing step. This will be slow for large outputs. It is however acceptable even for processing large datasets as long as you know at the last step the data size is small. (e.g. after filtering and aggregation)
  • SORT BY x: guarantees local ordering specifically by x, at each of N reducers. Global ordering is not guaranteed.
  • DISTRIBUTE BY x: guarantees that the rows of the same distribute key x goes to the same reducer but does NOT guarantees your desired ordering. Data at reducing stage is still sorted, but NOT necessarily by x.
  • CLUSTER BY x: This is the same as doing (DISTRIBUTE BY x and SORT BY x).

Please refer to the official wiki for detailed mechanism and syntax: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+SortBy


Edited: As mentioned in comments and other answers, previous answer is incorrect and the choice of words may add wrong nuances. The use of of the phrase overlapping-range is misleading since shuffling is done with a hash function, which is NOT necessarily by range. For instance depending on implementation hashing integers by even or odd number is valid, which is overlapping in terms of number range. DISTRIBUTE BY in this case only guarantees all even numbers goes to the same reducer, and all odd numbers goes to another.

The other answers have minor mistakes, or unrelated to the question, or too verbose.

greybeard
  • 2,249
  • 8
  • 30
  • 66
Lars Yencken
  • 2,976
  • 2
  • 21
  • 12
  • 12
    As mentioned by other answers, according to https://cwiki.apache.org/confluence/display/Hive/LanguageManual+SortBy, `CLUSTER BY` and `DISTRIBUTE BY` cannot give you non-overlapping ranges. `CLUSTER BY` cannot guarantee the global ordering. – yhuai Jan 09 '15 at 19:01
  • Im wondering... What considered to be "Large DataSets"? Can you quantify that? – idoda Aug 31 '15 at 11:07
  • 2
    My query returns the same undesired stuff for both `SORT BY` and `CLUSTER BY`: local ordering within reducers. I had to resort to `ORDER BY` and wait entire weekend until the job is completed. – kurtosis Nov 09 '15 at 09:47
  • 1
    CLUSTER BY uses hash of clustering columns mod number of reducers to ensure rows with those same column values go to the same reducer - that's it, no stronger guarantees than that! See my answer with links to example and order preserving hashing etc. – Edi Bice Aug 04 '16 at 16:12
  • I am also thinking the same as @yhuai. lars-yencken, can you please provide any reference? – nomadSK25 Aug 14 '18 at 03:58
  • So, after a little search along this subject, I found that there's no guarantee that `CLUSTER BY` will distribute with non overlapping ranges. Given that, is there any way optimize a global ordering process, instead of doing the whole process into one reducer with `ORDER BY` ? – Joabe Lucena Jan 31 '19 at 16:27
  • @yhuai: Cluster By is a short-cut for both Distribute By and Sort By. Hive uses the columns in Distribute By to distribute the rows among reducers. All rows with the same Distribute By columns will go to the same reducer. However, `Distribute By` does not guarantee clustering or sorting properties on the distributed keys. It's the `Distribute By` that doesn't guarantee global ordering right? not `Cluster By`. Am I reading it wrong? The went on explaining further [1..cont'd] – raja777m Nov 22 '20 at 16:57
  • [Con'd... 2]: Note that all rows with the same key x1 is guaranteed to be distributed to the same reducer (reducer 1 in this case), but they are not guaranteed to be clustered in adjacent positions. In contrast, if we use Cluster By x, the two reducers will further sort rows on x.. and specify Distribute By and Sort By, so the partition columns and sort columns can be different... – raja777m Nov 22 '20 at 16:57
  • From ed9w2in6: *I decided to correct this answer instead of writing a new one. This answer showed up as top search on Google and DuckDuckGo but it is incorrect!* – greybeard Jun 08 '23 at 13:35
16

Let me clarify first: clustered by only distributes your keys into different buckets, clustered by ... sorted by get buckets sorted.

With a simple experiment (see below) you can see that you will not get global order by default. The reason is that default partitioner splits keys using hash codes regardless of actual key ordering.

However you can get your data totally ordered.

Motivation is "Hadoop: The Definitive Guide" by Tom White (3rd edition, Chapter 8, p. 274, Total Sort), where he discusses TotalOrderPartitioner.

I will answer your TotalOrdering question first, and then describe several sort-related Hive experiments that I did.

Keep in mind: what I'm describing here is a 'proof of concept', I was able to handle a single example using Claudera's CDH3 distribution.

Originally I hoped that org.apache.hadoop.mapred.lib.TotalOrderPartitioner will do the trick. Unfortunately it did not because it looks like Hive partitions by value, not key. So I patch it (should have subclass, but I do not have time for that):

Replace

public int getPartition(K key, V value, int numPartitions) {
    return partitions.findPartition(key);
}

with

public int getPartition(K key, V value, int numPartitions) {
    return partitions.findPartition(value);
}

Now you can set (patched) TotalOrderPartitioner as your Hive partitioner:

hive> set hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner;

hive> set total.order.partitioner.natural.order=false

hive> set total.order.partitioner.path=/user/yevgen/out_data2

I also used

hive> set hive.enforce.bucketing = true; 

hive> set mapred.reduce.tasks=4;

in my tests.

File out_data2 tells TotalOrderPartitioner how to bucket values. You generate out_data2 by sampling your data. In my tests I used 4 buckets and keys from 0 to 10. I generated out_data2 using ad-hoc approach:

import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.fs.FileSystem;


public class TotalPartitioner extends Configured implements Tool{
    public static void main(String[] args) throws Exception{
            ToolRunner.run(new TotalPartitioner(), args);
    }

    @Override
    public int run(String[] args) throws Exception {
        Path partFile = new Path("/home/yevgen/out_data2");
        FileSystem fs = FileSystem.getLocal(getConf());

        HiveKey key = new HiveKey();
        NullWritable value = NullWritable.get();

        SequenceFile.Writer writer = SequenceFile.createWriter(fs, getConf(), partFile, HiveKey.class, NullWritable.class);
        key.set( new byte[]{1,3}, 0, 2);//partition at 3; 1 came from Hive -- do not know why
        writer.append(key, value);
        key.set( new byte[]{1, 6}, 0, 2);//partition at 6
        writer.append(key, value);
        key.set( new byte[]{1, 9}, 0, 2);//partition at 9
        writer.append(key, value);
        writer.close();
        return 0;
    }

}

Then I copied resulting out_data2 to HDFS (into /user/yevgen/out_data2)

With these settings I got my data bucketed/sorted (see last item in my experiment list).

Here is my experiments.

  • Create sample data

    bash> echo -e "1\n3\n2\n4\n5\n7\n6\n8\n9\n0" > data.txt

  • Create basic test table:

    hive> create table test(x int); hive> load data local inpath 'data.txt' into table test;

Basically this table contains values from 0 to 9 without order.

  • Demonstrate how table copying works (really mapred.reduce.tasks parameter which sets MAXIMAL number of reduce tasks to use)

    hive> create table test2(x int);

    hive> set mapred.reduce.tasks=4;

    hive> insert overwrite table test2 select a.x from test a join test b on a.x=b.x; -- stupied join to force non-trivial map-reduce

    bash> hadoop fs -cat /user/hive/warehouse/test2/000001_0

    1

    5

    9

  • Demonstrate bucketing. You can see that keys are assinged at random without any sort order:

    hive> create table test3(x int) clustered by (x) into 4 buckets;

    hive> set hive.enforce.bucketing = true;

    hive> insert overwrite table test3 select * from test;

    bash> hadoop fs -cat /user/hive/warehouse/test3/000000_0

    4

    8

    0

  • Bucketing with sorting. Results are partially sorted, not totally sorted

    hive> create table test4(x int) clustered by (x) sorted by (x desc) into 4 buckets;

    hive> insert overwrite table test4 select * from test;

    bash> hadoop fs -cat /user/hive/warehouse/test4/000001_0

    1

    5

    9

You can see that values are sorted in ascending order. Looks like Hive bug in CDH3?

  • Getting partially sorted without cluster by statement:

    hive> create table test5 as select x from test distribute by x sort by x desc;

    bash> hadoop fs -cat /user/hive/warehouse/test5/000001_0

    9

    5

    1

  • Use my patched TotalOrderParitioner:

    hive> set hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner;

    hive> set total.order.partitioner.natural.order=false

    hive> set total.order.partitioner.path=/user/training/out_data2

    hive> create table test6(x int) clustered by (x) sorted by (x) into 4 buckets;

    hive> insert overwrite table test6 select * from test;

    bash> hadoop fs -cat /user/hive/warehouse/test6/000000_0

    1

    2

    0

    bash> hadoop fs -cat /user/hive/warehouse/test6/000001_0

    3

    4

    5

    bash> hadoop fs -cat /user/hive/warehouse/test6/000002_0

    7

    6

    8

    bash> hadoop fs -cat /user/hive/warehouse/test6/000003_0

    9

Yevgen Yampolskiy
  • 7,022
  • 3
  • 26
  • 23
  • Sorry I flagged this answer as by further reading, although an interesting read, it is un related to the question. The question is on Data Retrieval but not Data Definition (DDL). – ed9w2in6 Jun 05 '23 at 08:29
10

CLUSTER BY does not produce global ordering.

The accepted answer (by Lars Yencken) misleads by stating that the reducers will receive non-overlapping ranges. As Anton Zaviriukhin correctly points to the BucketedTables documentation, CLUSTER BY is basically DISTRIBUTE BY (same as bucketing) plus SORT BY within each bucket/reducer. And DISTRIBUTE BY simply hashes and mods into buckets and while the hashing function may preserve order (hash of i > hash of j if i > j), mod of hash value does not.

Here's a better example showing overlapping ranges

http://myitlearnings.com/bucketing-in-hive/

Edi Bice
  • 566
  • 6
  • 18
5

As I understand, short answer is No. You'll get overlapping ranges.

From SortBy documentation: "Cluster By is a short-cut for both Distribute By and Sort By." "All rows with the same Distribute By columns will go to the same reducer." But there is no information that Distribute by guarantee non-overlapping ranges.

Moreover, from DDL BucketedTables documentation: "How does Hive distribute the rows across the buckets? In general, the bucket number is determined by the expression hash_function(bucketing_column) mod num_buckets." I suppose that Cluster by in Select statement use the same principle to distribute rows between reducers because it's main use is for populating bucketed tables with the data.

I created a table with 1 integer column "a", and inserted numbers from 0 to 9 there.

Then I set number of reducers to 2 set mapred.reduce.tasks = 2;.

And select data from this table with Cluster by clause select * from my_tab cluster by a;

And received result that I expected:

0
2
4
6
8
1
3
5
7
9

So, first reducer (number 0) got even numbers (because their mode 2 gives 0)

and second reducer (number 1) got odd numbers (because their mode 2 gives 1)

So that's how "Distribute By" works.

And then "Sort By" sorts the results inside each reducer.

samkart
  • 6,007
  • 2
  • 14
  • 29
2

Use case : When there is a large dataset then one should go for sort by as in sort by , all the set reducers sort the data internally before clubbing together and that enhances the performance. While in Order by, the performance for the larger dataset reduces as all the data is passed through a single reducer which increases the load and hence takes longer time to execute the query. Please see below example on 11 node cluster. enter image description here

This one is Order By example outputenter image description here

This one is Sort By example output enter image description here

This one is Cluster By example enter image description here

What I observed , the figures of sort by , cluster by and distribute by is SAME But internal mechanism is different. In DISTRIBUTE BY : The same column rows will go to one reducer , eg. DISTRIBUTE BY(City) - Bangalore data in one column , Delhi data in one reducer:

enter image description here

Tutu Kumari
  • 485
  • 4
  • 10
0

Cluster by is per reducer sorting not global. In many books also it is mentioned incorrectly or confusingly. It has got particular use where say you distribute each department to specific reducer and then sort by employee name in each department and do not care abt order of dept no the cluster by to be used and it more perform-ant as workload is distributed among reducers.

0

SortBy: N or more sorted files with overlapping ranges.

OrderBy: Single output i.e fully ordered.

Distribute By: Distribute By protecting each of N reducers gets non-overlapping ranges of the column but doesn’t sort the output of each reducer.

For more information http://commandstech.com/hive-sortby-vs-orderby-vs-distributeby-vs-clusterby/

ClusterBy: Refer to the same example as above, if we use Cluster By x, the two reducers will further sort rows on x:

slfan
  • 8,950
  • 115
  • 65
  • 78
0

If I understood it correctly

1.sort by - only sorts the data within the reducer

2.order by - orders things globally by pushing the entire data set to a single reducer. If we do have a lot of data(skewed), this process will take a lot of time.

  1. cluster by - intelligently distributes stuff into reducers by the key hash and make a sort by, but does not grantee global ordering. One key(k1) can be placed into two reducers. 1st reducer gets 10K K1 data, the second one might get 1K k1 data.