I am running a Hive (version 1.2) query on Tez, and it is taking forever to return data because of a cross product.

I have already tried the following Hive properties:

set hive.execution.engine=tez;
--set hive.execution.engine=mr;
set hive.vectorized.execution.enabled=true;
set hive.vectorized.execution.reduce.enabled=true;
set hive.exec.compress.intermediate=true;
set hive.cbo.enable=true;
set hive.compute.query.using.stats=true;
set hive.stats.fetch.column.stats=true;
set hive.stats.fetch.partition.stats=true;
set hive.exec.parallel=true;
set hive.auto.convert.join=false;
set hive.vectorized.execution.reduce.groupby.enabled=true;

Any pointers on how to optimize the query?

create table schema.table_integ_all stored as ORC as
select
 concat(substr(base.process_dt,1,4),'-',substr(base.process_dt,5,2),'-',substr(base.process_dt,7,2)) as process_dt,
 base.agreement_partic_key,
 base.lob,
 base.agent_name,
 COALESCE(base.agent_active_flag, 1) as agent_active_flag,
 base.agent_end_dt,
 COALESCE(base.pif_count,0) as pif_count,
 COALESCE(base.iif_count,0) as iif_count,
 COALESCE(base.renewal_count,0) as renewal_count,
 CASE WHEN base.lob='LIFE' THEN COALESCE(BB.life_wrtn_prem_agent,0.0) ELSE COALESCE(base.written_premium ,0.0) END as written_premium,
 COALESCE(BB.earn_prem_agent,0) as earned_premium,
 CASE WHEN base.lob='LIFE' THEN COALESCE(BB.life_orig_face_amt,0.0) ELSE 0.0 END as life_orig_face_amt,
 COALESCE(CC.cnt_plcy_newbuss,0) as new_policy_count,
 COALESCE(DD.cnt_item_newbuss,0) as new_item_count,
 COALESCE(base.total_claim_count,0) as total_claim_count,
 COALESCE(base.total_claim_loss,0) as total_claim_loss,
 COALESCE(AA.transfer_pifcount_in,0) as transfer_pifcount_in,
 COALESCE(AA.transfer_pifcount_out,0) as transfer_pifcount_out,
 COALESCE(AA.transfer_pifcount_out,0)  as block_transfer,
 COALESCE(round(((AA.transfer_pifcount_out/(AA.transfer_pifcount_out+base.pif_Count))*100),2),0.0) as transferpif_percentage_out,
 COALESCE(CC.cnt_plcy_attrited,0)  as plcy_attrited,
 COALESCE(DD.cnt_item_attrited,0)  as item_attrited 

 from schema.table_prdcr_clm_wrtn_full_join base 

 left outer join
 schema.table_transferpif_out_in_mthly AA
 on base.process_dt=AA.process_dt
 and base.agreement_partic_key=AA.agreement_partic_key
 and base.lob = AA.lob 

 left outer join
 schema.table_earn_prem_mthly BB
 on base.process_dt=BB.process_dt
 and base.agreement_partic_key=BB.agreement_partic_key
 and base.lob = BB.lob 

 full outer join
 schema.table_plcy_attrited_mthly CC
 on base.process_dt=CC.process_dt
 and base.agreement_partic_key=CC.agreement_partic_key
 and base.lob = CC.lob 

 full outer join
 schema.table_item_attrited_mthly DD
 on base.process_dt=CC.process_dt
 and base.agreement_partic_key=CC.agreement_partic_key
 and base.lob = CC.lob;

I have seen cases where queries ran longer because an "ON" clause was missing and a "WHERE" filter was used instead, but my query does not seem to have that problem.

Below is the EXPLAIN plan for the same query:

Warning: Shuffle Join MERGEJOIN[20][tables = [base, AA, BB, CC, DD]] in Stage 'Reducer 3' is a cross product
OK
Plan not optimized by CBO due to missing statistics. Please check log for more details.

Vertex dependency in root stage
Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE), Map 5 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE)
Reducer 3 <- Map 7 (SIMPLE_EDGE), Reducer 2 (SIMPLE_EDGE)

Stage-0
   Fetch Operator
      limit:-1
      Stage-1
         Reducer 3
         File Output Operator [FS_14]
            compressed:false
            Statistics:Num rows: 4933855 Data size: 2955807047 Basic stats: COMPLETE Column stats: NONE
            table:{"input format:":"org.apache.hadoop.mapred.TextInputFormat","output format:":"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat","serde:":"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"}
            Select Operator [SEL_13]
               outputColumnNames:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12","_col13","_col14","_col15","_col16","_col17","_col18","_col19","_col20","_col21"]
               Statistics:Num rows: 4933855 Data size: 2955807047 Basic stats: COMPLETE Column stats: NONE
               Merge Join Operator [MERGEJOIN_20]
               |  condition map:[{"":"Outer Join 0 to 1"}]
               |  filter predicates:{"0":"{(VALUE._col1 = VALUE._col35)} {(VALUE._col2 = VALUE._col36)} {(VALUE._col3 = VALUE._col37)}","1":""}
               |  keys:{}
               |  outputColumnNames:["_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12","_col20","_col21","_col29","_col30","_col31","_col38","_col39","_col46","_col47"]
               |  Statistics:Num rows: 4933855 Data size: 2955807047 Basic stats: COMPLETE Column stats: NONE
               |<-Map 7 [SIMPLE_EDGE] vectorized
               |  Reduce Output Operator [RS_25]
               |     sort order:
               |     Statistics:Num rows: 694521 Data size: 200006384 Basic stats: COMPLETE Column stats: NONE
               |     value expressions:cnt_item_newbuss (type: bigint), cnt_item_attrited (type: bigint)
               |     TableScan [TS_4]
               |        alias:DD
               |        Statistics:Num rows: 694521 Data size: 200006384 Basic stats: COMPLETE Column stats: NONE
               |<-Reducer 2 [SIMPLE_EDGE]
                  Reduce Output Operator [RS_10]
                     sort order:
                     Statistics:Num rows: 4485323 Data size: 2687097258 Basic stats: COMPLETE Column stats: NONE
                     value expressions:_col1 (type: varchar(8)), _col2 (type: varchar(50)), _col3 (type: string), _col4 (type: varchar(120)), _col5 (type: int), _col6 (type: varchar(10)), _col7 (type: bigint), _col8 (type: bigint), _col9 (type: bigint), _col10 (type: double), _col11 (type: double), _col12 (type: double), _col20 (type: bigint), _col21 (type: bigint), _col29 (type: double), _col30 (type: double), _col31 (type: double), _col35 (type: string), _col36 (type: varchar(50)), _col37 (type: string), _col38 (type: bigint), _col39 (type: bigint)
                     Merge Join Operator [MERGEJOIN_19]
                     |  condition map:[{"":"Left Outer Join0 to 1"},{"":"Left Outer Join0 to 2"},{"":"Outer Join 0 to 3"}]
                     |  keys:{"0":"UDFToString(process_dt) (type: string), agreement_partic_key (type: varchar(50)), lob (type: string)","1":"UDFToString(process_dt) (type: string), agreement_partic_key (type: varchar(50)), lob (type: string)","2":"UDFToString(process_dt) (type: string), agreement_partic_key (type: varchar(50)), lob (type: string)","3":"process_dt (type: string), agreement_partic_key (type: varchar(50)), lob (type: string)"}
                     |  outputColumnNames:["_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12","_col20","_col21","_col29","_col30","_col31","_col35","_col36","_col37","_col38","_col39"]
                     |  Statistics:Num rows: 4485323 Data size: 2687097258 Basic stats: COMPLETE Column stats: NONE
                     |<-Map 1 [SIMPLE_EDGE] vectorized
                     |  Reduce Output Operator [RS_21]
                     |     key expressions:UDFToString(process_dt) (type: string), agreement_partic_key (type: varchar(50)), lob (type: string)
                     |     Map-reduce partition columns:UDFToString(process_dt) (type: string), agreement_partic_key (type: varchar(50)), lob (type: string)
                     |     sort order:+++
                     |     Statistics:Num rows: 1359189 Data size: 814271879 Basic stats: COMPLETE Column stats: NONE
                     |     value expressions:process_dt (type: varchar(8)), agent_name (type: varchar(120)), agent_active_flag (type: int), agent_end_dt (type: varchar(10)), pif_count (type: bigint), iif_count (type: bigint), renewal_count (type: bigint), written_premium (type: double), total_claim_count (type: double), total_claim_loss (type: double)
                     |     TableScan [TS_0]
                     |        alias:base
                     |        Statistics:Num rows: 1359189 Data size: 814271879 Basic stats: COMPLETE Column stats: NONE
                     |<-Map 4 [SIMPLE_EDGE] vectorized
                     |  Reduce Output Operator [RS_22]
                     |     key expressions:UDFToString(process_dt) (type: string), agreement_partic_key (type: varchar(50)), lob (type: string)
                     |     Map-reduce partition columns:UDFToString(process_dt) (type: string), agreement_partic_key (type: varchar(50)), lob (type: string)
                     |     sort order:+++
                     |     Statistics:Num rows: 81179 Data size: 30863019 Basic stats: COMPLETE Column stats: NONE
                     |     value expressions:transfer_pifcount_out (type: bigint), transfer_pifcount_in (type: bigint)
                     |     TableScan [TS_1]
                     |        alias:AA
                     |        Statistics:Num rows: 81179 Data size: 30863019 Basic stats: COMPLETE Column stats: NONE
                     |<-Map 5 [SIMPLE_EDGE] vectorized
                     |  Reduce Output Operator [RS_23]
                     |     key expressions:UDFToString(process_dt) (type: string), agreement_partic_key (type: varchar(50)), lob (type: string)
                     |     Map-reduce partition columns:UDFToString(process_dt) (type: string), agreement_partic_key (type: varchar(50)), lob (type: string)
                     |     sort order:+++
                     |     Statistics:Num rows: 697301 Data size: 206401096 Basic stats: COMPLETE Column stats: NONE
                     |     value expressions:earn_prem_agent (type: double), life_wrtn_prem_agent (type: double), life_orig_face_amt (type: double)
                     |     TableScan [TS_2]
                     |        alias:BB
                     |        Statistics:Num rows: 697301 Data size: 206401096 Basic stats: COMPLETE Column stats: NONE
                     |<-Map 6 [SIMPLE_EDGE] vectorized
                        Reduce Output Operator [RS_24]
                           key expressions:process_dt (type: string), agreement_partic_key (type: varchar(50)), lob (type: string)
                           Map-reduce partition columns:process_dt (type: string), agreement_partic_key (type: varchar(50)), lob (type: string)
                           sort order:+++
                           Statistics:Num rows: 694484 Data size: 199995816 Basic stats: COMPLETE Column stats: NONE
                           value expressions:cnt_plcy_newbuss (type: bigint), cnt_plcy_attrited (type: bigint)
                           TableScan [TS_3]
                              alias:CC
                              Statistics:Num rows: 694484 Data size: 199995816 Basic stats: COMPLETE Column stats: NONE

Time taken: 3.521 seconds, Fetched: 83 row(s)
cheapcoder

1 Answer

The plan does not match the query as posted.

The join condition for AA should cause a syntax error, because the alias pf is not defined: and base.agreement_partic_key=pf.agreement_partic_key and base.lob = pf.lob

Also, according to the plan, you are joining one of the tables without any join keys specified, using a filter instead:

filter predicates:{"0":"{(VALUE._col1 = VALUE._col35)} {(VALUE._col2 = VALUE._col36)} {(VALUE._col3 = VALUE._col37)}","1":""}
keys:{}

This is what causes the cross join.
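
Judging from the query as posted, a likely source of the empty keys is the last join: it introduces DD, but its ON clause compares base with CC, so no condition references DD at all. A minimal sketch of the corrected clause follows. It assumes DD has the same three key columns as AA, BB and CC, which the plan does not confirm because it lists only DD's value columns:

```sql
-- Sketch only: assumes DD has process_dt, agreement_partic_key and lob,
-- like the other joined tables. The posted query compares base with CC here.
full outer join
schema.table_item_attrited_mthly DD
on base.process_dt = DD.process_dt
and base.agreement_partic_key = DD.agreement_partic_key
and base.lob = DD.lob;
```

Re-running EXPLAIN after such a change should show whether the cross-product warning for Reducer 3 is gone.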

Also, some of your tables do not seem very big and may fit in memory. Try enabling map-join conversion:

set hive.auto.convert.join=true;

And experiment with this setting:

set hive.mapjoin.smalltable.filesize=157286400; --set it bigger than your table size and see if it works 

The AA table is rather small and may fit in memory: its data size is 30863019 according to the plan. You may be able to increase this setting to convert other joins as well without running into OutOfMemory errors.
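
The plan in the question also reports "Plan not optimized by CBO due to missing statistics" and "Column stats: NONE" for every table, so the optimizer has limited size information to work with. A hedged sketch of gathering statistics for one of the tables before re-checking the plan is shown below. The table name is taken from the question, and a partitioned table may need a PARTITION clause (an assumption, since the table layout is not shown):

```sql
-- Table-level statistics (row count, data size).
ANALYZE TABLE schema.table_transferpif_out_in_mthly COMPUTE STATISTICS;

-- Column-level statistics for all columns of the table.
ANALYZE TABLE schema.table_transferpif_out_in_mthly COMPUTE STATISTICS FOR COLUMNS;

-- Allow map-join conversion, then re-run EXPLAIN on the query
-- and check whether the plan now contains a map join step.
set hive.auto.convert.join=true;
```

The same two ANALYZE statements would be repeated for the other joined tables.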

leftjoin
  • @leftjoin - You made 3 important suggestions: 1) Syntax error: I fixed that here; it was a CTRL+F error, as I do not want to paste my original code here. 2) Cross product: **I did not find any joins in my query without a join condition and with a where filter instead**. 3) I am trying the map join as we speak by setting the **small table size to 30863020**. I will keep you posted. – cheapcoder Dec 07 '18 at 03:49
  • @leftjoin - **Setting the small table size to 30863020** obviously did not work. Are there any other changes you would suggest? – cheapcoder Dec 07 '18 at 07:12
  • @cheapcoder Try to increase the size until it works or fails with OOM; then you will know for sure. And check this also: set hive.auto.convert.join=true; The plan should reflect a map join step. – leftjoin Dec 07 '18 at 07:16
  • @leftjoin - I have tried raising the small table size to 300000000. The job stays hung at the reducer phase for 48+ hours until I kill it, but it never hits OOM. What else could I try? – cheapcoder Dec 11 '18 at 06:48
  • @cheapcoder If it sticks on one reducer then the data skew may cause this. Try this approach: https://stackoverflow.com/a/51061613/2700344 – leftjoin Dec 11 '18 at 07:06
  • @leftjoin - I am exhausted from trying all the options, including the last one. I have even failed to run it on MR. I am not sure what other options I have. – cheapcoder Dec 13 '18 at 18:13
  • @cheapcoder Map join is the right option. Double-check that hive.auto.convert.join is set to true. Also try to re-order the joins; sometimes it helps. Also check that the joins are not duplicating rows (join keys are unique). – leftjoin Dec 13 '18 at 18:21
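
The last suggestion, checking that the join keys are unique, can be sketched as a query like the one below. It is an illustration built from a table and the key columns named in the question, not something posted in the thread:

```sql
-- Lists key combinations that occur more than once in BB.
-- Any rows returned mean a join on these keys multiplies rows.
select process_dt, agreement_partic_key, lob, count(*) as cnt
from schema.table_earn_prem_mthly
group by process_dt, agreement_partic_key, lob
having count(*) > 1
limit 20;
```

Running the same check against each joined table shows which side, if any, carries duplicate keys.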