Please help out:
I am running PySpark code for basic ETL with joins; the data size is around 270 GB. The Hive version of the code completes the process fine, but after converting it to Spark SQL I get the error below (the 536870912-byte direct-memory cap is 512 MB, and the failed allocation is a 16 MB buffer). Is there any way to resolve it?
- I have tried Spark skew hints and increasing the number of shuffle partitions (a sketch of this is just below).
- Do I have to split the data into smaller chunks and process them separately? (See the sketch at the end of the post for what I mean.)
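This is roughly what the partition attempt looked like (a sketch; the value 800 is illustrative, not the exact number I used, and the skew hint itself appears in the second query below):

# raise shuffle parallelism before the joins run (illustrative value)
sqlContext.setConf("spark.sql.shuffle.partitions", "800")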
The spark-submit configuration used while executing is:
--name "xx" \
--num-executors 3 \
--executor-cores 5 \
--executor-memory 6G \
--driver-memory 9G \
--deploy-mode cluster \
--master yarn \
--queue xx \
--conf spark.yarn.nodemanager.local-dirs=$xx/tmp/tmp1 \
--conf spark.executor.memoryOverhead=20300 \
--conf spark.driver.maxResultSize=0 \
--conf spark.yarn.submit.waitAppCompletion=true \
--conf spark.sql.parquet.binaryAsString=true \
--conf spark.yarn.am.attemptFailuresValidityInterval=24h \
--conf spark.executor.heartbeatInterval=60s \
--conf spark.network.timeout=1200s
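To double-check which of these values actually take effect at runtime, I print the resolved configuration inside the job (a quick sketch, assuming sc is the live SparkContext; this is not part of the original script):

# print the configuration the driver actually resolved
for key, value in sorted(sc.getConf().getAll()):
    print(key, value)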
Thanks for helping out.

The error:
org.apache.spark.shuffle.FetchFailedException: failed to allocate 16777216 byte(s) of direct memory (used: 536870912, max: 536870912)
The queries (they are not contiguous, and column names have been changed): I have added a repartition after every join to improve the data distribution.
xxx = sqlContext.sql("""
    SELECT a.*, lpad(b.col1, 12, '0') AS col2
    FROM data_3 AS a
    LEFT JOIN (SELECT col1, cm13 FROM linkage_data) AS b
        ON a.cm13 = b.cm13
""").repartition(400)
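Since linkage_data is a small lookup here, would forcing a broadcast for this join help avoid shuffling the big side? Something like this sketch (DataFrame API; not what I currently run):

from pyspark.sql.functions import broadcast, lpad

# hypothetical variant: broadcast the small lookup side of the join
linkage = sqlContext.table("linkage_data").select("col1", "cm13")
xxx = (sqlContext.table("data_3")
       .join(broadcast(linkage), on="cm13", how="left")
       .withColumn("col2", lpad("col1", 12, "0")))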
model_score = sqlContext.sql("""
    SELECT /*+ SKEW('a', ('market', 'cmpl_yr')) */
        a.*, b.model_score AS model_score
    FROM (SELECT * FROM combined_data_temp) AS a
    LEFT JOIN %s.campaign_data AS b
        ON trim(a.campno) = trim(b.cmpgn_no)
        AND trim(a.campdate) = trim(b.cmpl_dt)
        AND a.cm15 = b.cm15
        AND a.campaign_id = b.cmpgn_id
        AND a.execution_id = b.exec_id
        AND a.offer_id = b.offr_id
        AND trim(a.market) = trim(b.mkt_cd)
        AND a.cmpl_yr = b.cmpl_yr
""" % (dbin1)).repartition(400)
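A related question: if we can move to Spark 3.x, would adaptive skew-join handling do this job instead of the hint? I am thinking of something like this (a sketch; these settings only exist in Spark 3.0+):

# Spark 3.0+ only: let AQE detect and split skewed partitions at runtime
sqlContext.setConf("spark.sql.adaptive.enabled", "true")
sqlContext.setConf("spark.sql.adaptive.skewJoin.enabled", "true")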
combined_data_srg = sqlContext.sql("""
    SELECT
        a.*,
        b.*,
        concat(b.srg_no, '0000', a.supp_no, '00') AS xy
    FROM data_temp3 AS a
    LEFT JOIN srg_no AS b
        ON trim(a.cm11) = trim(b.no)
""").repartition(400)
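And to make the second bullet above concrete: if splitting is the answer, is this the kind of pattern you mean? (A sketch; it assumes cmpl_yr is a sensible split key, abbreviates the join condition, and out_db.combined_out is a hypothetical output table.)

# hypothetical chunked version of the skewed join: one cmpl_yr at a time
# (join condition abbreviated; the real one keeps every column from above)
years = [row.cmpl_yr for row in
         sqlContext.sql("SELECT DISTINCT cmpl_yr FROM combined_data_temp").collect()]
for yr in years:
    chunk = sqlContext.sql("""
        SELECT a.*, b.model_score
        FROM combined_data_temp AS a
        LEFT JOIN {db}.campaign_data AS b
            ON a.cmpl_yr = b.cmpl_yr AND a.cm15 = b.cm15
        WHERE a.cmpl_yr = '{yr}'
    """.format(db=dbin1, yr=yr))
    # out_db.combined_out is a hypothetical target table
    chunk.write.mode("append").saveAsTable("out_db.combined_out")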