
I am working on optimizing Hive (1.4-cdh) code that runs on MapReduce. My project uses a lot of COUNT(DISTINCT ...) operations with a GROUP BY clause; an example HQL query is shown below.

DROP TABLE IF EXISTS testdb.NewTable PURGE;
CREATE TABLE testdb.NewTable AS
SELECT a.* FROM (
    SELECT col1,
           COUNT(DISTINCT col2) AS col2,
           COUNT(DISTINCT col3) AS col3,
           COUNT(DISTINCT col4) AS col4,
           COUNT(DISTINCT col5) AS col5
    FROM BaseTable
    GROUP BY col1) a
WHERE a.col3 > 1 OR a.col4 > 1 OR a.col2 > 1 OR a.col5 > 1;

Can you please suggest a better approach to minimize the processing time of this query?

Adding the explain plans for CountDistinct and CollectSet:

CountDistinct Explain Plan:

OK
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: BaseTable
            Statistics: Num rows: 16863109255 Data size: 2613966713222 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              expressions: col1 (type: string), col2 (type: decimal(3,0)), col3 (type: string), col4 (type: string), col5 (type: string)
              outputColumnNames: col1, col2, col3, col4, col5
              Statistics: Num rows: 16863109255 Data size: 2613966713222 Basic stats: COMPLETE Column stats: NONE
              Group By Operator
                aggregations: count(DISTINCT col5), count(DISTINCT col2), count(DISTINCT col4), count(DISTINCT col3)
                keys: col1 (type: string), col5 (type: string), col2 (type: decimal(3,0)), col4 (type: string), col3 (type: string)
                mode: hash
                outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8
                Statistics: Num rows: 16863109255 Data size: 2613966713222 Basic stats: COMPLETE Column stats: NONE
                Reduce Output Operator
                  key expressions: _col0 (type: string), _col1 (type: string), _col2 (type: decimal(3,0)), _col3 (type: string), _col4 (type: string)
                  sort order: +++++
                  Map-reduce partition columns: _col0 (type: string)
                  Statistics: Num rows: 16863109255 Data size: 2613966713222 Basic stats: COMPLETE Column stats: NONE
      Reduce Operator Tree:
        Group By Operator
          aggregations: count(DISTINCT KEY._col1:0._col0), count(DISTINCT KEY._col1:1._col0), count(DISTINCT KEY._col1:2._col0), count(DISTINCT KEY._col1:3._col0)
          keys: KEY._col0 (type: string)
          mode: mergepartial
          outputColumnNames: _col0, _col1, _col2, _col3, _col4
          Statistics: Num rows: 8431554627 Data size: 1306983356533 Basic stats: COMPLETE Column stats: NONE
          Filter Operator
            predicate: ((((_col2 > 1) or (_col3 > 1)) or (_col1 > 1)) or (_col4 > 1)) (type: boolean)
            Statistics: Num rows: 8431554627 Data size: 1306983356533 Basic stats: COMPLETE Column stats: NONE
            File Output Operator
              compressed: false
              Statistics: Num rows: 8431554627 Data size: 1306983356533 Basic stats: COMPLETE Column stats: NONE
              table:
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink

CollectSet Explain Plan:

OK
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: BaseTable
            Statistics: Num rows: 16863109255 Data size: 2613966713222 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              expressions: col1 (type: string), col2 (type: decimal(3,0)), col3 (type: string), col4 (type: string), col5 (type: string)
              outputColumnNames: col1, col2, col3, col4, col5
              Statistics: Num rows: 16863109255 Data size: 2613966713222 Basic stats: COMPLETE Column stats: NONE
              Group By Operator
                aggregations: collect_set(col5), collect_set(col2), collect_set(col4), collect_set(col3)
                keys: col1 (type: string)
                mode: hash
                outputColumnNames: _col0, _col1, _col2, _col3, _col4
                Statistics: Num rows: 16863109255 Data size: 2613966713222 Basic stats: COMPLETE Column stats: NONE
                Reduce Output Operator
                  key expressions: _col0 (type: string)
                  sort order: +
                  Map-reduce partition columns: _col0 (type: string)
                  Statistics: Num rows: 16863109255 Data size: 2613966713222 Basic stats: COMPLETE Column stats: NONE
                  value expressions: _col1 (type: array<string>), _col2 (type: array<decimal(3,0)>), _col3 (type: array<string>), _col4 (type: array<string>)
      Reduce Operator Tree:
        Group By Operator
          aggregations: collect_set(VALUE._col0), collect_set(VALUE._col1), collect_set(VALUE._col2), collect_set(VALUE._col3)
          keys: KEY._col0 (type: string)
          mode: mergepartial
          outputColumnNames: _col0, _col1, _col2, _col3, _col4
          Statistics: Num rows: 8431554627 Data size: 1306983356533 Basic stats: COMPLETE Column stats: NONE
          Select Operator
            expressions: _col0 (type: string), size(_col1) (type: int), size(_col2) (type: int), size(_col3) (type: int), size(_col4) (type: int)
            outputColumnNames: _col0, _col1, _col2, _col3, _col4
            Statistics: Num rows: 8431554627 Data size: 1306983356533 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: ((((_col2 > 1) or (_col3 > 1)) or (_col1 > 1)) or (_col4 > 1)) (type: boolean)
              Statistics: Num rows: 8431554627 Data size: 1306983356533 Basic stats: COMPLETE Column stats: NONE
              File Output Operator
                compressed: false
                Statistics: Num rows: 8431554627 Data size: 1306983356533 Basic stats: COMPLETE Column stats: NONE
                table:
                    input format: org.apache.hadoop.mapred.TextInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink
nilesh1212

1 Answer

Try using collect_set; it collects distinct values, excluding NULLs.

CREATE TABLE testdb.NewTable AS
SELECT a.* FROM (
    SELECT col1,
           size(collect_set(col2)) AS col2,
           size(collect_set(col3)) AS col3,
           size(collect_set(col4)) AS col4,
           size(collect_set(col5)) AS col5
    FROM BaseTable
    GROUP BY col1) a
WHERE a.col3 > 1 OR a.col4 > 1 OR a.col2 > 1 OR a.col5 > 1;
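
Before swapping the aggregation on the full table, it may be worth confirming on a handful of rows that the two forms agree, including for NULLs. The query below is only a sketch: the inline rows, the alias t, and the column names cnt_distinct and set_size are made up for illustration, and it assumes a Hive version that accepts SELECT without a FROM clause inside a subquery.

```sql
-- Toy data only; nothing here comes from BaseTable.
-- cnt_distinct and set_size are expected to match for every col1 group,
-- since both COUNT(DISTINCT ...) and collect_set ignore NULLs.
SELECT col1,
       COUNT(DISTINCT col2)    AS cnt_distinct,
       size(collect_set(col2)) AS set_size
FROM (
    SELECT 'a' AS col1, 'x' AS col2
    UNION ALL
    SELECT 'a' AS col1, 'x' AS col2                   -- duplicate value
    UNION ALL
    SELECT 'a' AS col1, 'y' AS col2
    UNION ALL
    SELECT 'a' AS col1, CAST(NULL AS STRING) AS col2  -- NULL should not be counted
    UNION ALL
    SELECT 'b' AS col1, 'z' AS col2
) t
GROUP BY col1;
```

If the two columns differ on your build, the collect_set rewrite is not a drop-in replacement and the original COUNT(DISTINCT ...) form should be kept.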
leftjoin
  • Thank you for the response. I tried your answer, but it takes more processing time than count(distinct). – nilesh1212 Jun 03 '19 at 08:01
  • @nilesh1212 What exactly is slow? Could you please provide the execution logs and the plan (explain output)? – leftjoin Jun 03 '19 at 08:30
  • With collect_set:
    stg-1: Hadoop job information for Stage-1: number of mappers: 660; number of reducers: 400
    stg-2: Hadoop job information for Stage-2: number of mappers: 313; number of reducers: 297
    stg-3: Hadoop job information for Stage-4: number of mappers: 1; number of reducers: 0
    Total Time: 3159.383 sec
    With count(distinct):
    Hadoop job information for Stage-1: number of mappers: 659; number of reducers: 400
    Hadoop job information for Stage-3: number of mappers: 1; number of reducers: 0
    Total Time: 2306.815 sec
    – nilesh1212 Jun 03 '19 at 08:45
  • @nilesh1212 Both plans seem identical. Is the parameter hive.map.aggr set to true? It seems so, because aggregation also happens on the mapper. What I noticed is that with collect_set the number of mappers and reducers is lower. Try to increase parallelism on mappers and reducers: https://stackoverflow.com/a/48296562/2700344. Decrease the figures until you get more mappers and reducers running; it may help. – leftjoin Jun 03 '19 at 11:28
  • Yes, hive.map.aggr=true. Do I need to set it to false and try your answer again? – nilesh1212 Jun 03 '19 at 11:33
  • @nilesh1212 No, leave it set to true. Try to increase the number of mappers and reducers. – leftjoin Jun 03 '19 at 11:40
  • I have set the input split settings below as you suggested; the job now takes 1 hour to complete. set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; set mapreduce.input.fileinputformat.split.minsize=16000; -- 16 KB set mapreduce.input.fileinputformat.split.maxsize=128000000; -- 128 MB – nilesh1212 Jun 04 '19 at 06:09
  • @nilesh1212 Adjust these figures. 128000000 is too high. Reduce it until you get more mappers. The same goes for reducers. – leftjoin Jun 04 '19 at 06:51
  • Thanks for the next steps. I tried a 64 MB max split configuration, and the overall execution time is now 39 minutes. – nilesh1212 Jun 06 '19 at 06:56
  • @nilesh1212 You need to understand that there are no universal settings. Tune these parameters until you get optimal performance. Try 32 MB. If that starts too many mappers, they will spend most of their time waiting in the queue and run slowly for lack of resources. Tune according to your cluster capacity. – leftjoin Jun 06 '19 at 07:10
  • @nilesh1212 Also tune `hive.exec.reducers.bytes.per.reducer` for optimal reducer parallelism. – leftjoin Jun 06 '19 at 07:12
  • Yes, I will definitely try out these options. Thank you so much for the quick response. – nilesh1212 Jun 06 '19 at 07:58
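
The settings discussed in the comments can be gathered into one session preamble to run before the CREATE TABLE statement. This is only a sketch: the input format and split sizes are the ones quoted in the thread (with the 64 MB maximum that brought the run down to 39 minutes), while the value shown for hive.exec.reducers.bytes.per.reducer is an assumed placeholder, since the thread names the parameter but gives no number for it.

```sql
-- Starting points to tune against cluster capacity, not recommendations.
set hive.map.aggr=true;                                        -- keep map-side aggregation on
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
set mapreduce.input.fileinputformat.split.minsize=16000;       -- 16 KB, from the comments
set mapreduce.input.fileinputformat.split.maxsize=64000000;    -- 64 MB; try 32 MB next
-- ASSUMPTION: 64 MB per reducer is a placeholder, not a value from the thread.
set hive.exec.reducers.bytes.per.reducer=67108864;
```

Smaller split and bytes-per-reducer values yield more mappers and reducers; as noted in the comments, going too low can leave tasks waiting in the queue, so it makes sense to change one value at a time and compare total run time.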