Is there any way to improve the performance of this HQL? I have a query like this:

with
tmp_a as (
SELECT * FROM `zhihu.answer` where ym in (select distinct(ym) from zhihu.answer_increment)
),
-- the result of subquery select distinct(ym) from zhihu.answer_increment is 201806
-- the rows of tmp_a are 1,790,000
tmp1 as (
select a.* from tmp_a a
LEFT JOIN `zhihu.answer_increment` b
ON a.answer_id = b.answer_id
AND a.insert_time = b.insert_time
WHERE b.answer_id IS NULL)

insert overwrite table zhihu.answer partition(ym)
select * from tmp1
UNION ALL
SELECT *
FROM `zhihu.answer_increment` t 

Table information:

`zhihu.answer` has about 10 million rows. It is partitioned by `ym` (year and month)
and clustered by `answer_id` into 256 buckets.
`zhihu.answer_increment` has about 100 thousand rows.

The two tables have the same structure.

The purpose of the query above is to merge the freshly created data (the answer_increment table) with the historical data (the answer table) and save the result into Hive.

The problem is that the whole process takes approximately 2 hours to run. Is there any way to optimize the HQL above?

I tried creating an index on the table, but I am not sure whether it helped:

CREATE INDEX insert_time_index ON TABLE zhihu.answer (insert_time) AS 'COMPACT'
 WITH DEFERRED REBUILD;

My Hive setting:

set hive.auto.convert.join=true;

My HQL Explain:

Explain 
STAGE DEPENDENCIES: 
  Stage-5 is a root stage   
  Stage-11 depends on stages: Stage-5 , consists of Stage-13, Stage-1   
  Stage-13 has a backup stage: Stage-1  
  Stage-10 depends on stages: Stage-13  
  Stage-9 depends on stages: Stage-1, Stage-10 , consists of Stage-12, Stage-2  
  Stage-12 has a backup stage: Stage-2  
  Stage-8 depends on stages: Stage-12   
  Stage-3 depends on stages: Stage-2, Stage-8   
  Stage-0 depends on stages: Stage-3    
  Stage-4 depends on stages: Stage-0    
  Stage-2   
  Stage-1   

STAGE PLANS:    
  Stage: Stage-5    
    Map Reduce  
      Map Operator Tree:    
          TableScan 
            alias: zhihu_answer_increment   
            filterExpr: ym is not null (type: boolean)  
            Statistics: Num rows: 101549 Data size: 21426839 Basic stats: COMPLETE Column stats: COMPLETE   
            Select Operator 
              expressions: ym (type: string)    
              outputColumnNames: ym 
              Statistics: Num rows: 101549 Data size: 21426839 Basic stats: COMPLETE Column stats: COMPLETE 
              Group By Operator 
                keys: ym (type: string) 
                mode: hash  
                outputColumnNames: _col0    
                Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: COMPLETE 
                Reduce Output Operator  
                  key expressions: _col0 (type: string) 
                  sort order: + 
                  Map-reduce partition columns: _col0 (type: string)    
                  Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: COMPLETE   
      Execution mode: vectorized    
      Reduce Operator Tree: 
        Group By Operator   
          keys: KEY._col0 (type: string)    
          mode: mergepartial    
          outputColumnNames: _col0  
          Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: COMPLETE   
          Group By Operator 
            keys: _col0 (type: string)  
            mode: hash  
            outputColumnNames: _col0    
            Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: COMPLETE 
            File Output Operator    
              compressed: false 
              table:    
                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat    
                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat  
                  serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe   

  Stage: Stage-11   
    Conditional Operator    

  Stage: Stage-13   
    Map Reduce Local Work   
      Alias -> Map Local Tables:    
        _u1-subquery1:tmp1:a:$INTNAME   
          Fetch Operator    
            limit: -1   
      Alias -> Map Local Operator Tree: 
        _u1-subquery1:tmp1:a:$INTNAME   
          TableScan 
            HashTable Sink Operator 
              keys: 
                0 ym (type: string) 
                1 _col0 (type: string)  

  Stage: Stage-10   
    Map Reduce  
      Map Operator Tree:    
          TableScan 
            alias: zhihu.zhihu_answer   
            filterExpr: ym is not null (type: boolean)  
            Statistics: Num rows: 76466394 Data size: 16134409134 Basic stats: COMPLETE Column stats: PARTIAL   
            Map Join Operator   
              condition map:    
                   Left Semi Join 0 to 1    
              keys: 
                0 ym (type: string) 
                1 _col0 (type: string)  
              outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13, _col14, _col15, _col16, _col17, _col18, _col19, _col20, _col21, _col22, _col23, _col24, _col25, _col26, _col27   
              Statistics: Num rows: 4024547 Data size: 7356871916 Basic stats: COMPLETE Column stats: PARTIAL   
              File Output Operator  
                compressed: false   
                table:  
                    input format: org.apache.hadoop.mapred.SequenceFileInputFormat  
                    output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat    
                    serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe 
      Local Work:   
        Map Reduce Local Work   

  Stage: Stage-9    
    Conditional Operator    

  Stage: Stage-12   
    Map Reduce Local Work   
      Alias -> Map Local Tables:    
        _u1-subquery1:tmp1:b    
          Fetch Operator    
            limit: -1   
      Alias -> Map Local Operator Tree: 
        _u1-subquery1:tmp1:b    
          TableScan 
            alias: b    
            Statistics: Num rows: 101549 Data size: 2741823 Basic stats: COMPLETE Column stats: NONE    
            HashTable Sink Operator 
              keys: 
                0 _col3 (type: string), _col15 (type: string)   
                1 answer_id (type: string), insert_time (type: string)  

  Stage: Stage-8    
    Map Reduce  
      Map Operator Tree:    
          TableScan 
            Map Join Operator   
              condition map:    
                   Left Outer Join0 to 1    
              keys: 
                0 _col3 (type: string), _col15 (type: string)   
                1 answer_id (type: string), insert_time (type: string)  
              outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13, _col14, _col15, _col16, _col17, _col18, _col19, _col20, _col21, _col22, _col23, _col24, _col25, _col26, _col27, _col31   
              Statistics: Num rows: 4427001 Data size: 8092559283 Basic stats: COMPLETE Column stats: NONE  
              Filter Operator   
                predicate: _col31 is null (type: boolean)   
                Statistics: Num rows: 2213500 Data size: 4046278727 Basic stats: COMPLETE Column stats: NONE    
                Select Operator 
                  expressions: _col0 (type: boolean), _col1 (type: string), _col2 (type: string), _col3 (type: string), _col4 (type: string), _col5 (type: string), _col6 (type: string), _col7 (type: string), _col8 (type: string), _col9 (type: string), _col10 (type: string), _col11 (type: smallint), _col12 (type: boolean), _col13 (type: int), _col14 (type: string), _col15 (type: string), _col16 (type: boolean), _col17 (type: boolean), _col18 (type: boolean), _col19 (type: boolean), _col20 (type: string), _col21 (type: string), _col22 (type: string), _col23 (type: string), _col24 (type: int), _col25 (type: int), _col26 (type: int), _col27 (type: string) 
                  outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13, _col14, _col15, _col16, _col17, _col18, _col19, _col20, _col21, _col22, _col23, _col24, _col25, _col26, _col27   
                  Statistics: Num rows: 2213500 Data size: 4046278727 Basic stats: COMPLETE Column stats: NONE  
                  File Output Operator  
                    compressed: false   
                    table:  
                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat  
                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat    
                        serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe 
      Local Work:   
        Map Reduce Local Work   

  Stage: Stage-3    
    Map Reduce  
      Map Operator Tree:    
          TableScan 
            Union   
              Statistics: Num rows: 2315049 Data size: 4067705566 Basic stats: COMPLETE Column stats: PARTIAL   
              Reduce Output Operator    
                sort order:     
                Map-reduce partition columns: _col3 (type: string)  
                Statistics: Num rows: 2315049 Data size: 4067705566 Basic stats: COMPLETE Column stats: PARTIAL 
                value expressions: _col0 (type: boolean), _col1 (type: string), _col2 (type: string), _col3 (type: string), _col4 (type: string), _col5 (type: string), _col6 (type: string), _col7 (type: string), _col8 (type: string), _col9 (type: string), _col10 (type: string), _col11 (type: smallint), _col12 (type: boolean), _col13 (type: int), _col14 (type: string), _col15 (type: string), _col16 (type: boolean), _col17 (type: boolean), _col18 (type: boolean), _col19 (type: boolean), _col20 (type: string), _col21 (type: string), _col22 (type: string), _col23 (type: string), _col24 (type: int), _col25 (type: int), _col26 (type: int), _col27 (type: string) 
          TableScan 
            alias: t    
            Statistics: Num rows: 101549 Data size: 21426839 Basic stats: COMPLETE Column stats: PARTIAL    
            Select Operator 
              expressions: admin_closed_comment (type: boolean), answer_content (type: string), answer_created (type: string), answer_id (type: string), answer_updated (type: string), author_headline (type: string), author_id (type: string), author_name (type: string), author_type (type: string), author_url_token (type: string), avatar_url (type: string), badge_num (type: smallint), can_comment (type: boolean), comment_count (type: int), gender (type: string), insert_time (type: string), is_advertiser (type: boolean), is_collapsed (type: boolean), is_copyable (type: boolean), is_org (type: boolean), question_created (type: string), question_id (type: string), question_title (type: string), question_type (type: string), reward_member_count (type: int), reward_total_money (type: int), voteup_count (type: int), ym (type: string)   
              outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13, _col14, _col15, _col16, _col17, _col18, _col19, _col20, _col21, _col22, _col23, _col24, _col25, _col26, _col27   
              Statistics: Num rows: 101549 Data size: 21426839 Basic stats: COMPLETE Column stats: PARTIAL  
              Union 
                Statistics: Num rows: 2315049 Data size: 4067705566 Basic stats: COMPLETE Column stats: PARTIAL 
                Reduce Output Operator  
                  sort order:   
                  Map-reduce partition columns: _col3 (type: string)    
                  Statistics: Num rows: 2315049 Data size: 4067705566 Basic stats: COMPLETE Column stats: PARTIAL   
                  value expressions: _col0 (type: boolean), _col1 (type: string), _col2 (type: string), _col3 (type: string), _col4 (type: string), _col5 (type: string), _col6 (type: string), _col7 (type: string), _col8 (type: string), _col9 (type: string), _col10 (type: string), _col11 (type: smallint), _col12 (type: boolean), _col13 (type: int), _col14 (type: string), _col15 (type: string), _col16 (type: boolean), _col17 (type: boolean), _col18 (type: boolean), _col19 (type: boolean), _col20 (type: string), _col21 (type: string), _col22 (type: string), _col23 (type: string), _col24 (type: int), _col25 (type: int), _col26 (type: int), _col27 (type: string)   
      Reduce Operator Tree: 
        Select Operator 
          expressions: VALUE._col0 (type: boolean), VALUE._col1 (type: string), VALUE._col2 (type: string), VALUE._col3 (type: string), VALUE._col4 (type: string), VALUE._col5 (type: string), VALUE._col6 (type: string), VALUE._col7 (type: string), VALUE._col8 (type: string), VALUE._col9 (type: string), VALUE._col10 (type: string), VALUE._col11 (type: smallint), VALUE._col12 (type: boolean), VALUE._col13 (type: int), VALUE._col14 (type: string), VALUE._col15 (type: string), VALUE._col16 (type: boolean), VALUE._col17 (type: boolean), VALUE._col18 (type: boolean), VALUE._col19 (type: boolean), VALUE._col20 (type: string), VALUE._col21 (type: string), VALUE._col22 (type: string), VALUE._col23 (type: string), VALUE._col24 (type: int), VALUE._col25 (type: int), VALUE._col26 (type: int), VALUE._col27 (type: string) 
          outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13, _col14, _col15, _col16, _col17, _col18, _col19, _col20, _col21, _col22, _col23, _col24, _col25, _col26, _col27   
          Statistics: Num rows: 2315049 Data size: 425969016 Basic stats: COMPLETE Column stats: PARTIAL    
          File Output Operator  
            compressed: false   
            Statistics: Num rows: 2315049 Data size: 425969016 Basic stats: COMPLETE Column stats: PARTIAL  
            table:  
                input format: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat 
                output format: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat   
                serde: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe  
                name: zhihu.zhihu_answer    

  Stage: Stage-0    
    Move Operator   
      tables:   
          partition:    
            ym  
          replace: true 
          table:    
              input format: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat   
              output format: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat 
              serde: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe    
              name: zhihu.zhihu_answer  

  Stage: Stage-4    
    Stats-Aggr Operator 

  Stage: Stage-2    
    Map Reduce  
      Map Operator Tree:    
          TableScan 
            Reduce Output Operator  
              key expressions: _col3 (type: string), _col15 (type: string)  
              sort order: ++    
              Map-reduce partition columns: _col3 (type: string), _col15 (type: string) 
              Statistics: Num rows: 4024547 Data size: 7356871916 Basic stats: COMPLETE Column stats: PARTIAL   
              value expressions: _col0 (type: boolean), _col1 (type: string), _col2 (type: string), _col4 (type: string), _col5 (type: string), _col6 (type: string), _col7 (type: string), _col8 (type: string), _col9 (type: string), _col10 (type: string), _col11 (type: smallint), _col12 (type: boolean), _col13 (type: int), _col14 (type: string), _col16 (type: boolean), _col17 (type: boolean), _col18 (type: boolean), _col19 (type: boolean), _col20 (type: string), _col21 (type: string), _col22 (type: string), _col23 (type: string), _col24 (type: int), _col25 (type: int), _col26 (type: int), _col27 (type: string)    
          TableScan 
            alias: b    
            Statistics: Num rows: 101549 Data size: 2741823 Basic stats: COMPLETE Column stats: NONE    
            Reduce Output Operator  
              key expressions: answer_id (type: string), insert_time (type: string) 
              sort order: ++    
              Map-reduce partition columns: answer_id (type: string), insert_time (type: string)    
              Statistics: Num rows: 101549 Data size: 2741823 Basic stats: COMPLETE Column stats: NONE  
      Reduce Operator Tree: 
        Join Operator   
          condition map:    
               Left Outer Join0 to 1    
          keys: 
            0 _col3 (type: string), _col15 (type: string)   
            1 answer_id (type: string), insert_time (type: string)  
          outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13, _col14, _col15, _col16, _col17, _col18, _col19, _col20, _col21, _col22, _col23, _col24, _col25, _col26, _col27, _col31   
          Statistics: Num rows: 4427001 Data size: 8092559283 Basic stats: COMPLETE Column stats: NONE  
          Filter Operator   
            predicate: _col31 is null (type: boolean)   
            Statistics: Num rows: 2213500 Data size: 4046278727 Basic stats: COMPLETE Column stats: NONE    
            Select Operator 
              expressions: _col0 (type: boolean), _col1 (type: string), _col2 (type: string), _col3 (type: string), _col4 (type: string), _col5 (type: string), _col6 (type: string), _col7 (type: string), _col8 (type: string), _col9 (type: string), _col10 (type: string), _col11 (type: smallint), _col12 (type: boolean), _col13 (type: int), _col14 (type: string), _col15 (type: string), _col16 (type: boolean), _col17 (type: boolean), _col18 (type: boolean), _col19 (type: boolean), _col20 (type: string), _col21 (type: string), _col22 (type: string), _col23 (type: string), _col24 (type: int), _col25 (type: int), _col26 (type: int), _col27 (type: string) 
              outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13, _col14, _col15, _col16, _col17, _col18, _col19, _col20, _col21, _col22, _col23, _col24, _col25, _col26, _col27   
              Statistics: Num rows: 2213500 Data size: 4046278727 Basic stats: COMPLETE Column stats: NONE  
              File Output Operator  
                compressed: false   
                table:  
                    input format: org.apache.hadoop.mapred.SequenceFileInputFormat  
                    output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat    
                    serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe 

  Stage: Stage-1    
    Map Reduce  
      Map Operator Tree:    
          TableScan 
            alias: zhihu.zhihu_answer   
            filterExpr: ym is not null (type: boolean)  
            Statistics: Num rows: 76466394 Data size: 16134409134 Basic stats: COMPLETE Column stats: PARTIAL   
            Reduce Output Operator  
              key expressions: ym (type: string)    
              sort order: + 
              Map-reduce partition columns: ym (type: string)   
              Statistics: Num rows: 76466394 Data size: 16134409134 Basic stats: COMPLETE Column stats: PARTIAL 
              value expressions: admin_closed_comment (type: boolean), answer_content (type: string), answer_created (type: string), answer_id (type: string), answer_updated (type: string), author_headline (type: string), author_id (type: string), author_name (type: string), author_type (type: string), author_url_token (type: string), avatar_url (type: string), badge_num (type: smallint), can_comment (type: boolean), comment_count (type: int), gender (type: string), insert_time (type: string), is_advertiser (type: boolean), is_collapsed (type: boolean), is_copyable (type: boolean), is_org (type: boolean), question_created (type: string), question_id (type: string), question_title (type: string), question_type (type: string), reward_member_count (type: int), reward_total_money (type: int), voteup_count (type: int)    
          TableScan 
            Reduce Output Operator  
              key expressions: _col0 (type: string) 
              sort order: + 
              Map-reduce partition columns: _col0 (type: string)    
              Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: COMPLETE   
      Reduce Operator Tree: 
        Join Operator   
          condition map:    
               Left Semi Join 0 to 1    
          keys: 
            0 ym (type: string) 
            1 _col0 (type: string)  
          outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13, _col14, _col15, _col16, _col17, _col18, _col19, _col20, _col21, _col22, _col23, _col24, _col25, _col26, _col27   
          Statistics: Num rows: 4024547 Data size: 7356871916 Basic stats: COMPLETE Column stats: PARTIAL   
          File Output Operator  
            compressed: false   
            table:  
                input format: org.apache.hadoop.mapred.SequenceFileInputFormat  
                output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat    
                serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe 

@leftjoin's HQL Explain:

Explain 
STAGE DEPENDENCIES: 
  Stage-3 is a root stage   
  Stage-6 depends on stages: Stage-3 , consists of Stage-7, Stage-1 
  Stage-7 has a backup stage: Stage-1   
  Stage-5 depends on stages: Stage-7    
  Stage-2 depends on stages: Stage-1, Stage-5   
  Stage-1   
  Stage-0 depends on stages: Stage-2    

STAGE PLANS:    
  Stage: Stage-3    
    Map Reduce  
      Map Operator Tree:    
          TableScan 
            alias: zhihu_answer_increment   
            filterExpr: ym is not null (type: boolean)  
            Statistics: Num rows: 101549 Data size: 21426839 Basic stats: COMPLETE Column stats: COMPLETE   
            Select Operator 
              expressions: ym (type: string)    
              outputColumnNames: ym 
              Statistics: Num rows: 101549 Data size: 21426839 Basic stats: COMPLETE Column stats: COMPLETE 
              Group By Operator 
                keys: ym (type: string) 
                mode: hash  
                outputColumnNames: _col0    
                Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: COMPLETE 
                Reduce Output Operator  
                  key expressions: _col0 (type: string) 
                  sort order: + 
                  Map-reduce partition columns: _col0 (type: string)    
                  Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: COMPLETE   
      Execution mode: vectorized    
      Reduce Operator Tree: 
        Group By Operator   
          keys: KEY._col0 (type: string)    
          mode: mergepartial    
          outputColumnNames: _col0  
          Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: COMPLETE   
          Group By Operator 
            keys: _col0 (type: string)  
            mode: hash  
            outputColumnNames: _col0    
            Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: COMPLETE 
            File Output Operator    
              compressed: false 
              table:    
                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat    
                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat  
                  serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe   

  Stage: Stage-6    
    Conditional Operator    

  Stage: Stage-7    
    Map Reduce Local Work   
      Alias -> Map Local Tables:    
        s:s-subquery1:$INTNAME  
          Fetch Operator    
            limit: -1   
      Alias -> Map Local Operator Tree: 
        s:s-subquery1:$INTNAME  
          TableScan 
            HashTable Sink Operator 
              keys: 
                0 ym (type: string) 
                1 _col0 (type: string)  

  Stage: Stage-5    
    Map Reduce  
      Map Operator Tree:    
          TableScan 
            alias: t    
            filterExpr: ym is not null (type: boolean)  
            Statistics: Num rows: 76466394 Data size: 16134409134 Basic stats: COMPLETE Column stats: PARTIAL   
            Map Join Operator   
              condition map:    
                   Left Semi Join 0 to 1    
              keys: 
                0 ym (type: string) 
                1 _col0 (type: string)  
              outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col14, _col15, _col16, _col17, _col18, _col19, _col20, _col21, _col22, _col23, _col24, _col25, _col26, _col27   
              Statistics: Num rows: 4024547 Data size: 7340773728 Basic stats: COMPLETE Column stats: PARTIAL   
              Select Operator   
                expressions: _col0 (type: boolean), _col1 (type: string), _col2 (type: string), _col3 (type: string), _col4 (type: string), _col5 (type: string), _col6 (type: string), _col7 (type: string), _col8 (type: string), _col9 (type: string), _col10 (type: string), _col11 (type: smallint), _col12 (type: boolean), _col14 (type: string), _col15 (type: string), _col16 (type: boolean), _col17 (type: boolean), _col18 (type: boolean), _col19 (type: boolean), _col20 (type: string), _col21 (type: string), _col22 (type: string), _col23 (type: string), _col24 (type: int), _col25 (type: int), _col26 (type: int), _col27 (type: string), 0 (type: int)    
                outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12, _col13, _col14, _col15, _col16, _col17, _col18, _col19, _col20, _col21, _col22, _col23, _col24, _col25, _col26, _col27 
                Statistics: Num rows: 4024547 Data size: 756614836 Basic stats: COMPLETE Column stats: PARTIAL  
                File Output Operator    
                  compressed: false 
                  table:    
                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat    
                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat  
                      serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe   
      Local Work:   
        Map Reduce Local Work   

  Stage: Stage-2    
    Map Reduce  
      Map Operator Tree:    
          TableScan 
            Union   
DennisLi

2 Answers

set hive.auto.convert.join=true; --this enables map-join
set hive.mapjoin.smalltable.filesize=25000000;
set hive.execution.engine=tez;

insert overwrite table zhihu.answer partition(ym)
select col1, col2 ... coln, ym --list all columns
from
(
select col1, col2 ... coln, ym, --list all columns
       row_number() over(partition by ym, answer_id, insert_time order by new_flag desc) rn
from
 (
     select col1, col2 ... coln, ym, --list all columns
            0 as new_flag
       from zhihu.answer t
      where t.ym in (select distinct ym from zhihu.answer_increment)

   UNION ALL 

     select col1, col2 ... coln, ym, --list all columns
            1 as new_flag
       from zhihu.answer_increment t 
 )s
)s
where s.rn=1;

Indexes were removed in Hive 3.0, more details in this Jira: HIVE-18448

See also this answer: https://stackoverflow.com/a/37744071/2700344

Also tune parallelism for better performance: https://stackoverflow.com/a/48487306/2700344

UPDATE: I studied the plans provided by @DennisLi. Some observations:

  1. The join of the big table with the whole increment table is performed as a map join. In this case a FULL JOIN approach can be better than UNION ALL + row_number.

  2. The join with the partition list has already been transformed by the optimizer into a LEFT SEMI JOIN (which also runs as a map join). After filtering, about 4M rows remain out of roughly 76M in total (4,024,547 of 76,466,394 in the plan statistics). I recommend calculating the min and max increment ym partitions separately and passing them as parameters, using WHERE ym>=${min_increment_ym} and ym<=${max_increment_ym}. Partition pruning will then filter the data efficiently without a join. This works only if it suits the increment dataset (the increment covers a single small range of partitions, so min and max bound it tightly). Implementing this should give you the most benefit.

  3. Intermediate compression is not enabled. Enabling it may give only a small gain, but it is worth trying.
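
A minimal sketch of observation 2, assuming the increment covers one contiguous range of ym partitions. The variable names min_increment_ym and max_increment_ym come from the text above; computing them in a separate step and passing them in with --hivevar is an assumption about your setup, and merge.hql is a hypothetical file name.

```sql
-- Step 1 (run separately, e.g. from a shell wrapper): find the affected range.
select min(ym) as min_increment_ym, max(ym) as max_increment_ym
  from zhihu.answer_increment;

-- Step 2: pass the two values in when launching the merge script, for example
--   hive --hivevar min_increment_ym=201806 --hivevar max_increment_ym=201806 -f merge.hql
-- Inside the script, literal bounds on the partition column let Hive prune
-- partitions at compile time, so no join against the partition list is needed.
-- ym is a string column in the plans, hence the quotes.
select t.*
  from zhihu.answer t
 where t.ym >= '${hivevar:min_increment_ym}'
   and t.ym <= '${hivevar:max_increment_ym}';
```

Running EXPLAIN DEPENDENCY on the second statement lists the input partitions, which is a quick way to confirm that pruning took effect.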

Recommended approach:

set hive.auto.convert.join=true; --this enables map-join
set hive.mapjoin.smalltable.filesize=25000000;

--check compression influence separately.
--it may give some improvement depending on your data entropy
set hive.exec.compress.intermediate=true;
set mapred.output.compress=true;
set hive.exec.compress.output=true;

insert overwrite table zhihu.answer partition(ym)
 select --select increment if exists, old if not exists
  case when i.answer_id is not null then i.col1 else t.col1 end as col1,
  ... --for all columns
  case when i.answer_id is not null then i.coln else t.coln end as coln,
  --partition is the last one
  case when i.answer_id is not null then i.ym else t.ym end as ym
   from (select *
           from zhihu.answer
          where ym in (select distinct ym from zhihu.answer_increment) --try to implement min and max parameters instead of this if possible (see 2)
        ) t --filter before the join: a WHERE on t.ym after the FULL JOIN would drop increment rows that have no match in t
        full join zhihu.answer_increment i
        on t.answer_id   = i.answer_id
       and t.insert_time = i.insert_time
       and t.ym=i.ym --check this condition
  --alternatively if you do not want to employ shell, check if you can
  --remove the WHERE condition providing ym in the join condition, 
  --this will allow to get rid of the second join in the plan, 
  --though partition pruning will work with parameters better 
;

With these changes the plan should be close to optimal.

You may still need to tune the parallelism of mappers and reducers based on your execution logs; see the recommendation in the answer linked above.
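
As a rough illustration of what tuning parallelism can look like on the MapReduce engine, here is a sketch of the usual settings. The values are placeholders chosen for illustration, not recommendations derived from the plans above; adjust them against your own execution logs and cluster size.

```sql
-- Illustrative values only; tune them against your own execution logs.

-- Fewer bytes per reducer means Hive estimates more reducers for a stage.
set hive.exec.reducers.bytes.per.reducer=67108864;  -- 64 MB

-- Upper bound on the number of reducers Hive will pick.
set hive.exec.reducers.max=64;

-- A smaller maximum split size generally produces more mappers.
set mapreduce.input.fileinputformat.split.maxsize=134217728;  -- 128 MB
```

On a small cluster, raising parallelism beyond the number of available cores is unlikely to help, so it is worth changing one setting at a time and comparing run times.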

leftjoin
  • Can you please help me understand the solution? – Strick Oct 16 '19 at 10:35
  • @Strick The idea is to mark new records with new_flag=1, union all with the old data (in the affected ym partitions), then calculate row_number partitioned by the unique key and ordered by new_flag desc. If a new record exists for a key, it gets rn=1 (new records are preferred). Finally, load all records with rn=1 – leftjoin Oct 16 '19 at 10:40
  • And yes, instead of in (select distinct ym), left semi join can be used – leftjoin Oct 16 '19 at 10:46
  • @leftjoin thanks for your answer. I tried your SQL, but it was not faster than mine; the processing speed is almost the same or even slower. Moreover, there is no Tez in my Cloudera 6.2 cluster, and `hive.auto.convert.join` is set to true by default. Are there any more suggestions for optimization? – DennisLi Oct 17 '19 at 03:31
  • @DennisLi Then we need to analyze your performance issue more deeply. Please add the execution logs and query plan to your question so we can identify where the bottleneck is and how to fix it – leftjoin Oct 17 '19 at 04:14
  • @leftjoin I added the HQL explain for both my version of the HQL and yours, please check. Thanks. – DennisLi Oct 18 '19 at 02:16
  • @DennisLi Please read my recommendations and the comments in the code (in the second attempt) – leftjoin Oct 18 '19 at 07:34
  • @leftjoin Thanks a lot. I will give it a try. – DennisLi Oct 18 '19 at 10:53
  • @leftjoin I tried your latest version of the HQL, but it was still slow. I think the bottleneck may not be the HQL itself but the hardware, as my cluster has only two hosts (about 48G memory and 12 cores in total). Thanks for providing so many optimization suggestions anyway; they are helpful. – DennisLi Oct 21 '19 at 09:06

There are a few things you can do:

  1. Try to use a left semi join in your first query.

  2. As your table zhihu.answer_increment is very small in comparison to zhihu.answer, you can try giving map-side join hints.

  3. If both tables are bucketed on the same columns and have the same number of buckets, try the SMB (sort-merge bucket) join approach.

  4. Try setting the properties below:

    1. set hive.exec.compress.intermediate=true ;
    2. set hive.exec.parallel=true;
    3. set hive.exec.parallel.thread.number=50;
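
The first three points can be sketched roughly as follows, reusing the table and column names from the question. The aliases a, b and p are only illustrative, and the SMB settings assume both tables are bucketed and sorted on the join key, which the question does not confirm.

```sql
-- 1. LEFT SEMI JOIN in place of IN (subquery) in tmp_a.
--    Columns of the right-hand side may only be referenced in the ON clause.
select a.*
  from zhihu.answer a
  left semi join (select distinct ym from zhihu.answer_increment) p
    on a.ym = p.ym;

-- 2. Map-join hint for the small table.
--    Hive ignores the hint unless this property is set to false.
set hive.ignore.mapjoin.hint=false;
select /*+ MAPJOIN(b) */ a.*
  from zhihu.answer a
  left join zhihu.answer_increment b
    on a.answer_id = b.answer_id
   and a.insert_time = b.insert_time
 where b.answer_id is null;

-- 3. SMB join: applies only if BOTH tables are bucketed AND sorted on the
--    join key with compatible bucket counts (an assumption here).
set hive.auto.convert.sortmerge.join=true;
set hive.optimize.bucketmapjoin=true;
set hive.optimize.bucketmapjoin.sortedmerge=true;
```

With hive.auto.convert.join=true the optimizer may already choose the semi join and the map join on its own, so it is worth comparing the EXPLAIN output before and after to see whether the plan actually changes.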
Strick
  • Thanks for your answer. Should I use the left semi join in `tmp1`? And I forgot to say that the table is clustered by `answer_id` into 256 buckets. – DennisLi Oct 16 '19 at 14:25
  • No, you should use the left semi join in tmp_a; it is the replacement for the IN/EXISTS clause in Hive. Apart from that, try setting the properties, which should reduce the execution time somewhat. If one of your tables is comparatively small, try a map-side join and you should see significant performance improvements – Strick Oct 16 '19 at 14:30
  • Actually there is only one value in the IN clause, `where ym in 201806`. Is `left semi join` still helpful in that case? And `hive.auto.convert.join` is already set to true. – DennisLi Oct 17 '19 at 03:24
  • Yes, there is no problem in using it: LSJ returns on the first matching record and skips the remaining ones. As you have set hive.auto.convert.join, please check the explain plan of the query to see whether it is executing a map join or not. – Strick Oct 17 '19 at 06:22