
Please help out:

I am running PySpark code for a basic ETL with joins; the data size is around 270 GB. The Hive version of the code completes the process, but when I converted it to Spark SQL I get the following error. Is there any way to resolve it?

  • I have tried Spark skew hints and increasing the number of partitions (a sketch of both attempts follows this list).
  • Do I have to split the data into smaller chunks and process it?
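
For reference, a minimal sketch of those two attempts; the partition count is a placeholder, the join mirrors the model_score query below, and the SKEW hint is non-standard syntax that only some Spark builds honour:

    # Raise the shuffle parallelism before the joins run
    # (2000 is an illustrative value, not the one actually used).
    sqlContext.setConf("spark.sql.shuffle.partitions", "2000")

    # Skew hint on the skewed side of the join, as in the
    # model_score query below.
    hinted = sqlContext.sql("""
        select /*+ SKEW('a', ('market', 'cmpl_yr')) */ a.*, b.model_score
        from combined_data_temp as a
        left join campaign_data as b
          on trim(a.market) = trim(b.mkt_cd)
         and a.cmpl_yr = b.cmpl_yr
    """).repartition(400)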

The spark-submit config used while executing is:

--name "xx" --num-executors 3 --executor-cores 5 --executor-memory 6G --driver-memory 9G --deploy-mode cluster  --master yarn --queue xx --conf spark.yarn.nodemanager.local-dirs=$xx/tmp/tmp1 --conf spark.executor.memoryOverhead=20300 --conf spark.driver.maxResultSize=0 --conf spark.yarn.submit.waitAppCompletion=true --conf spark.sql.parquet.binaryAsString=true  --conf spark.yarn.am.attemptFailuresValidityInterval=24h --conf spark.executor.heartbeatInterval=60s --conf spark.network.timeout=1200s

Thanks for helping out.

Spark UI screenshot: (image not included)

org.apache.spark.shuffle.FetchFailedException: failed to allocate 16777216 byte(s) of direct memory (used: 536870912, max: 536870912)
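
Here used equals max (536870912 bytes = 512 MB), i.e. Netty's direct buffer pool is already full when it tries to allocate a 16 MB buffer for a shuffle fetch. A hedged sketch of extra spark-submit flags that raise or sidestep that ceiling; the values are illustrative, not tested against this job:

    --conf spark.executor.extraJavaOptions="-XX:MaxDirectMemorySize=2g" \
    --conf spark.shuffle.io.preferDirectBufs=false \
    --conf spark.reducer.maxSizeInFlight=24m

-XX:MaxDirectMemorySize lifts the JVM's direct-memory cap, preferDirectBufs=false forces Netty's shuffle buffers onto the heap, and a smaller maxSizeInFlight (default 48m) shrinks how much fetched data is held in memory at once.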

Full Spark UI: (screenshot not included)

Queries (they are not contiguous and column names have been changed): I have added a repartition after every join to improve the data distribution.

    xxx = sqlContext.sql("""
        select a.*, lpad(b.col1, 12, '0') as col2
        from data_3 as a
        left join (select col1, cm13 from linkage_data) as b
          on a.cm13 = b.cm13
    """).repartition(400)


    model_score = sqlContext.sql("""
        select /*+ SKEW('a', ('market', 'cmpl_yr')) */ a.*, b.model_score as model_score
        from (select * from combined_data_temp) as a
        left join %s.campaign_data as b
          on trim(a.campno) = trim(b.cmpgn_no)
         and trim(a.campdate) = trim(b.cmpl_dt)
         and a.cm15 = b.cm15
         and a.campaign_id = b.cmpgn_id
         and a.execution_id = b.exec_id
         and a.offer_id = b.offr_id
         and trim(a.market) = trim(b.mkt_cd)
         and a.cmpl_yr = b.cmpl_yr
    """ % (dbin1)).repartition(400)

    combined_data_srg = sqlContext.sql("""
        select a.*, b.*,
               concat(b.srg_no, '0000', a.supp_no, '00') as xy
        from data_temp3 as a
        left join srg_no as b
          on trim(a.cm11) = trim(b.no)
    """).repartition(400)
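
Since every stage in the UI shows up as sql at :0 (see the comments below), one way to tell the joins apart is to tag each action with a job group; a minimal sketch, assuming the SparkContext is available as sc and using a placeholder group name:

    # Stages triggered while this group is set carry a readable
    # name in the Spark UI, making the failing join identifiable.
    sc.setJobGroup("srg_join", "left join data_temp3 with srg_no")
    combined_data_srg.count()  # any action run under the group is tagged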
– prajwal rao
  • It depends on what your code looks like and how many resources each executor or the driver has. Could you add that information to your question? Also, checking the Spark Web UI should give you a hint at which steps you get this exception. Is it reproducible? – Michael Heil Mar 31 '20 at 19:32
  • Hi, I have added the screenshot and the config for the spark-submit. As seen from the screenshot, in the UI the stage names are just sql at :0, so I am not able to identify which join is causing the problem. – prajwal rao Apr 01 '20 at 01:36

0 Answers