I am running a Hive (version 1.2) query on Tez, and it is taking forever to return data because of a cross product.
I have already tried the following Hive properties:
set hive.execution.engine=tez;
--set hive.execution.engine=mr;
set hive.vectorized.execution.enabled=true;
set hive.vectorized.execution.reduce.enabled=true;
set hive.exec.compress.intermediate=true;
set hive.cbo.enable=true;
set hive.compute.query.using.stats=true;
set hive.stats.fetch.column.stats=true;
set hive.stats.fetch.partition.stats=true;
set hive.exec.parallel=true;
set hive.auto.convert.join=false;
set hive.vectorized.execution.reduce.groupby.enabled = true;
Any pointers on how to optimize the query? Here it is:
create table schema.table_integ_all stored as ORC as
select
concat(substr(base.process_dt,1,4),'-',substr(base.process_dt,5,2),'-',substr(base.process_dt,7,2)) as process_dt,
base.agreement_partic_key,
base.lob,
base.agent_name,
COALESCE(base.agent_active_flag, 1) as agent_active_flag,
base.agent_end_dt,
COALESCE(base.pif_count,0) as pif_count,
COALESCE(base.iif_count,0) as iif_count,
COALESCE(base.renewal_count,0) as renewal_count,
CASE WHEN base.lob='LIFE' THEN COALESCE(BB.life_wrtn_prem_agent,0.0) ELSE COALESCE(base.written_premium ,0.0) END as written_premium,
COALESCE(BB.earn_prem_agent,0) as earned_premium,
CASE WHEN base.lob='LIFE' THEN COALESCE(BB.life_orig_face_amt,0.0) ELSE 0.0 END as life_orig_face_amt,
COALESCE(CC.cnt_plcy_newbuss,0) as new_policy_count,
COALESCE(DD.cnt_item_newbuss,0) as new_item_count,
COALESCE(base.total_claim_count,0) as total_claim_count,
COALESCE(base.total_claim_loss,0) as total_claim_loss,
COALESCE(AA.transfer_pifcount_in,0) as transfer_pifcount_in,
COALESCE(AA.transfer_pifcount_out,0) as transfer_pifcount_out,
COALESCE(AA.transfer_pifcount_out,0) as block_transfer,
COALESCE(round(((AA.transfer_pifcount_out/(AA.transfer_pifcount_out+base.pif_Count))*100),2),0.0) as transferpif_percentage_out,
COALESCE(CC.cnt_plcy_attrited,0) as plcy_attrited,
COALESCE(DD.cnt_item_attrited,0) as item_attrited
from schema.table_prdcr_clm_wrtn_full_join base
left outer join
schema.table_transferpif_out_in_mthly AA
on base.process_dt=AA.process_dt
and base.agreement_partic_key=AA.agreement_partic_key
and base.lob = AA.lob
left outer join
schema.table_earn_prem_mthly BB
on base.process_dt=BB.process_dt
and base.agreement_partic_key=BB.agreement_partic_key
and base.lob = BB.lob
full outer join
schema.table_plcy_attrited_mthly CC
on base.process_dt=CC.process_dt
and base.agreement_partic_key=CC.agreement_partic_key
and base.lob = CC.lob
full outer join
schema.table_item_attrited_mthly DD
on base.process_dt=CC.process_dt
and base.agreement_partic_key=CC.agreement_partic_key
and base.lob = CC.lob;
I have seen cases where queries ran long because the join condition was written as a "where" filter instead of an "ON" clause, but my query does not seem to have that problem.
Below is the EXPLAIN plan for the same query:
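One thing worth double-checking in the query above: the ON clause of the last join (the one that brings in DD) compares base with CC again and never references DD, so that join has no key of its own. Below is a minimal sketch of that join keyed on DD, reduced to just base and DD so it can be run through EXPLAIN in isolation. It assumes DD has the same process_dt, agreement_partic_key and lob columns as the other tables, which the query and plan do not confirm.

```sql
-- Sketch only: the last join, isolated, with the ON clause pointing at DD.
-- Assumption: DD has process_dt, agreement_partic_key and lob columns.
explain
select
base.process_dt,
base.agreement_partic_key,
base.lob,
COALESCE(DD.cnt_item_newbuss,0) as new_item_count,
COALESCE(DD.cnt_item_attrited,0) as item_attrited
from schema.table_prdcr_clm_wrtn_full_join base
full outer join
schema.table_item_attrited_mthly DD
on base.process_dt=DD.process_dt
and base.agreement_partic_key=DD.agreement_partic_key
and base.lob = DD.lob;
```

If the cross product warning no longer appears for this reduced query, the same three conditions could be used in the full statement in place of the ones that reference CC.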
Warning: Shuffle Join MERGEJOIN[20][tables = [base, AA, BB, CC, DD]] in Stage 'Reducer 3' is a cross product
OK
Plan not optimized by CBO due to missing statistics. Please check log for more details.
Vertex dependency in root stage
Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE), Map 5 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE)
Reducer 3 <- Map 7 (SIMPLE_EDGE), Reducer 2 (SIMPLE_EDGE)
Stage-0
Fetch Operator
limit:-1
Stage-1
Reducer 3
File Output Operator [FS_14]
compressed:false
Statistics:Num rows: 4933855 Data size: 2955807047 Basic stats: COMPLETE Column stats: NONE
table:{"input format:":"org.apache.hadoop.mapred.TextInputFormat","output format:":"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat","serde:":"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"}
Select Operator [SEL_13]
outputColumnNames:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12","_col13","_col14","_col15","_col16","_col17","_col18","_col19","_col20","_col21"]
Statistics:Num rows: 4933855 Data size: 2955807047 Basic stats: COMPLETE Column stats: NONE
Merge Join Operator [MERGEJOIN_20]
| condition map:[{"":"Outer Join 0 to 1"}]
| filter predicates:{"0":"{(VALUE._col1 = VALUE._col35)} {(VALUE._col2 = VALUE._col36)} {(VALUE._col3 = VALUE._col37)}","1":""}
| keys:{}
| outputColumnNames:["_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12","_col20","_col21","_col29","_col30","_col31","_col38","_col39","_col46","_col47"]
| Statistics:Num rows: 4933855 Data size: 2955807047 Basic stats: COMPLETE Column stats: NONE
|<-Map 7 [SIMPLE_EDGE] vectorized
| Reduce Output Operator [RS_25]
| sort order:
| Statistics:Num rows: 694521 Data size: 200006384 Basic stats: COMPLETE Column stats: NONE
| value expressions:cnt_item_newbuss (type: bigint), cnt_item_attrited (type: bigint)
| TableScan [TS_4]
| alias:DD
| Statistics:Num rows: 694521 Data size: 200006384 Basic stats: COMPLETE Column stats: NONE
|<-Reducer 2 [SIMPLE_EDGE]
Reduce Output Operator [RS_10]
sort order:
Statistics:Num rows: 4485323 Data size: 2687097258 Basic stats: COMPLETE Column stats: NONE
value expressions:_col1 (type: varchar(8)), _col2 (type: varchar(50)), _col3 (type: string), _col4 (type: varchar(120)), _col5 (type: int), _col6 (type: varchar(10)), _col7 (type: bigint), _col8 (type: bigint), _col9 (type: bigint), _col10 (type: double), _col11 (type: double), _col12 (type: double), _col20 (type: bigint), _col21 (type: bigint), _col29 (type: double), _col30 (type: double), _col31 (type: double), _col35 (type: string), _col36 (type: varchar(50)), _col37 (type: string), _col38 (type: bigint), _col39 (type: bigint)
Merge Join Operator [MERGEJOIN_19]
| condition map:[{"":"Left Outer Join0 to 1"},{"":"Left Outer Join0 to 2"},{"":"Outer Join 0 to 3"}]
| keys:{"0":"UDFToString(process_dt) (type: string), agreement_partic_key (type: varchar(50)), lob (type: string)","1":"UDFToString(process_dt) (type: string), agreement_partic_key (type: varchar(50)), lob (type: string)","2":"UDFToString(process_dt) (type: string), agreement_partic_key (type: varchar(50)), lob (type: string)","3":"process_dt (type: string), agreement_partic_key (type: varchar(50)), lob (type: string)"}
| outputColumnNames:["_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12","_col20","_col21","_col29","_col30","_col31","_col35","_col36","_col37","_col38","_col39"]
| Statistics:Num rows: 4485323 Data size: 2687097258 Basic stats: COMPLETE Column stats: NONE
|<-Map 1 [SIMPLE_EDGE] vectorized
| Reduce Output Operator [RS_21]
| key expressions:UDFToString(process_dt) (type: string), agreement_partic_key (type: varchar(50)), lob (type: string)
| Map-reduce partition columns:UDFToString(process_dt) (type: string), agreement_partic_key (type: varchar(50)), lob (type: string)
| sort order:+++
| Statistics:Num rows: 1359189 Data size: 814271879 Basic stats: COMPLETE Column stats: NONE
| value expressions:process_dt (type: varchar(8)), agent_name (type: varchar(120)), agent_active_flag (type: int), agent_end_dt (type: varchar(10)), pif_count (type: bigint), iif_count (type: bigint), renewal_count (type: bigint), written_premium (type: double), total_claim_count (type: double), total_claim_loss (type: double)
| TableScan [TS_0]
| alias:base
| Statistics:Num rows: 1359189 Data size: 814271879 Basic stats: COMPLETE Column stats: NONE
|<-Map 4 [SIMPLE_EDGE] vectorized
| Reduce Output Operator [RS_22]
| key expressions:UDFToString(process_dt) (type: string), agreement_partic_key (type: varchar(50)), lob (type: string)
| Map-reduce partition columns:UDFToString(process_dt) (type: string), agreement_partic_key (type: varchar(50)), lob (type: string)
| sort order:+++
| Statistics:Num rows: 81179 Data size: 30863019 Basic stats: COMPLETE Column stats: NONE
| value expressions:transfer_pifcount_out (type: bigint), transfer_pifcount_in (type: bigint)
| TableScan [TS_1]
| alias:AA
| Statistics:Num rows: 81179 Data size: 30863019 Basic stats: COMPLETE Column stats: NONE
|<-Map 5 [SIMPLE_EDGE] vectorized
| Reduce Output Operator [RS_23]
| key expressions:UDFToString(process_dt) (type: string), agreement_partic_key (type: varchar(50)), lob (type: string)
| Map-reduce partition columns:UDFToString(process_dt) (type: string), agreement_partic_key (type: varchar(50)), lob (type: string)
| sort order:+++
| Statistics:Num rows: 697301 Data size: 206401096 Basic stats: COMPLETE Column stats: NONE
| value expressions:earn_prem_agent (type: double), life_wrtn_prem_agent (type: double), life_orig_face_amt (type: double)
| TableScan [TS_2]
| alias:BB
| Statistics:Num rows: 697301 Data size: 206401096 Basic stats: COMPLETE Column stats: NONE
|<-Map 6 [SIMPLE_EDGE] vectorized
Reduce Output Operator [RS_24]
key expressions:process_dt (type: string), agreement_partic_key (type: varchar(50)), lob (type: string)
Map-reduce partition columns:process_dt (type: string), agreement_partic_key (type: varchar(50)), lob (type: string)
sort order:+++
Statistics:Num rows: 694484 Data size: 199995816 Basic stats: COMPLETE Column stats: NONE
value expressions:cnt_plcy_newbuss (type: bigint), cnt_plcy_attrited (type: bigint)
TableScan [TS_3]
alias:CC
Statistics:Num rows: 694484 Data size: 199995816 Basic stats: COMPLETE Column stats: NONE
Time taken: 3.521 seconds, Fetched: 83 row(s)
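The plan also reports "Plan not optimized by CBO due to missing statistics" and "Column stats: NONE" on every operator, so hive.cbo.enable=true has nothing to work with yet. A hedged sketch of gathering table and column statistics for the five input tables follows. It assumes the tables are not partitioned; partitioned tables may need a PARTITION clause on these statements.

```sql
-- Sketch only: collect basic and column-level statistics so the CBO can use them.
-- Assumption: none of these tables is partitioned.
ANALYZE TABLE schema.table_prdcr_clm_wrtn_full_join COMPUTE STATISTICS;
ANALYZE TABLE schema.table_prdcr_clm_wrtn_full_join COMPUTE STATISTICS FOR COLUMNS;

ANALYZE TABLE schema.table_transferpif_out_in_mthly COMPUTE STATISTICS;
ANALYZE TABLE schema.table_transferpif_out_in_mthly COMPUTE STATISTICS FOR COLUMNS;

ANALYZE TABLE schema.table_earn_prem_mthly COMPUTE STATISTICS;
ANALYZE TABLE schema.table_earn_prem_mthly COMPUTE STATISTICS FOR COLUMNS;

ANALYZE TABLE schema.table_plcy_attrited_mthly COMPUTE STATISTICS;
ANALYZE TABLE schema.table_plcy_attrited_mthly COMPUTE STATISTICS FOR COLUMNS;

ANALYZE TABLE schema.table_item_attrited_mthly COMPUTE STATISTICS;
ANALYZE TABLE schema.table_item_attrited_mthly COMPUTE STATISTICS FOR COLUMNS;
```

Re-running EXPLAIN afterwards should show whether the "Column stats" entries change and whether the CBO message is still printed.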