I am new to Spark. We have the Hive query below, and on top of it we perform a pivot operation using Spark with Python. The PySpark script below does the pivot and writes the result into a Hive table. The Hive query returns about 140 million rows.
Approach 1
from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql import functions as F
sc = SparkContext()
hc = HiveContext(sc)
tbl = hc.sql("""
Select Rating.BranchID
, Rating.Vehicle
, Rating.PersonalAutoCov
, Rating.PersonalVehicleCov
, Rating.EffectiveDate
, Rating.ExpirationDate
, attr.name as RatingAttributeName
, Cast(Rating.OutputValue as Int) OutputValue
, Rating.InputValue
From db.dbo_pcx_paratingdata_piext_master rating
Inner Join db.dbo_pctl_ratingattrname_piext_master attr
on rating.RatingAttribute = attr.id
and attr.CurrentRecordIndicator = 'Y'
Where
rating.CurrentRecordIndicator = 'Y'
""")
tbl.cache()
pvttbl1 = tbl.groupby("BranchId","Vehicle","PersonalAutoCov","PersonalVehicleCov","EffectiveDate","ExpirationDate")\
.pivot("RatingAttributeName")\
.agg({"InputValue":"max", "OutputValue":"sum"})
pvttbl1.createOrReplaceTempView("paRatingAttributes")
hc.sql("Create table dev_pekindataaccesslayer.createcount as select * from paRatingAttributes")
When I run the above script with spark-submit, it fails with
java.lang.OutOfMemoryError: Java heap space
or sometimes
java.lang.OutOfMemoryError: GC overhead limit exceeded
This is the spark-submit command I used:
spark-submit spark_ex2.py --master yarn-cluster --num-executors 15 --executor-cores 50 --executor-memory 100g --driver-memory 100g --conf "spark.sql.shuffle.partitions=1000" --conf "spark.memory.offHeap.enabled=true" --conf "spark.memory.offHeap.size=100g" --conf "spark.network.timeout=1200" --conf "spark.executor.heartbeatInterval=1201"
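(For reference: spark-submit expects its options before the application file, and anything placed after spark_ex2.py is passed to the script as application arguments rather than to Spark. The same settings laid out that way would look roughly like this sketch, with backslashes as line continuations:)

spark-submit \
  --master yarn-cluster \
  --num-executors 15 \
  --executor-cores 50 \
  --executor-memory 100g \
  --driver-memory 100g \
  --conf "spark.sql.shuffle.partitions=1000" \
  --conf "spark.memory.offHeap.enabled=true" \
  --conf "spark.memory.offHeap.size=100g" \
  --conf "spark.network.timeout=1200" \
  --conf "spark.executor.heartbeatInterval=1201" \
  spark_ex2.py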
Detailed log:
INFO MemoryStore: Memory use = 1480.9 KB (blocks) + 364.8 MB (scratch space shared across 40 tasks(s)) = 366.2 MB.
Storage limit = 366.3 MB.
WARN BlockManager: Persisting block rdd_11_22 to disk instead.
WARN BlockManager: Putting block rdd_11_0 failed due to an exception
WARN BlockManager: Block rdd_11_0 could not be removed as it was not found on disk or in memory
ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 10)
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
I made small changes to the above PySpark script, and that version works without any issues.
Approach 2
from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
sc = SparkContext()
hc = HiveContext(sc)
sqlContext = SQLContext(sc)
tbl = hc.sql("""
Select Rating.BranchID
, Rating.Vehicle
, Rating.PersonalAutoCov
, Rating.PersonalVehicleCov
, Rating.EffectiveDate
, Rating.ExpirationDate
, attr.name as RatingAttributeName
, Cast(Rating.OutputValue as Int) OutputValue
, Rating.InputValue
From db.dbo_pcx_paratingdata_piext_master rating
Inner Join db.dbo_pctl_ratingattrname_piext_master attr
on rating.RatingAttribute = attr.id
and attr.CurrentRecordIndicator = 'Y'
Where
rating.CurrentRecordIndicator = 'Y'
""")
tbl.createOrReplaceTempView("Ptable")
r=sqlContext.sql("select count(1) from Ptable")
m=r.collect()[0][0]
hc.sql("drop table if exists db.Ptable")
hc.sql("Create table db.Ptable as select * from Ptable")
tb2 = hc.sql("select * from db.Ptable limit "+str(m))
pvttbl1 = tb2.groupby("BranchId","Vehicle","PersonalAutoCov","PersonalVehicleCov","EffectiveDate","ExpirationDate")\
.pivot("RatingAttributeName")\
.agg({"InputValue":"max", "OutputValue":"sum"})
pvttbl1.createOrReplaceTempView("paRatingAttributes")
hc.sql("drop table if exists db.createcount")
hc.sql("Create table db.createcount STORED AS ORC as select * from paRatingAttributes")
But the above script involves creating an intermediate table, which is an additional step. In Approach 2, when I keep the limit keyword, the same spark-submit command works correctly.
What is wrong with my Approach 1, and how can I make it work?
Note: I have already followed Spark java.lang.OutOfMemoryError: Java heap space and tried all of the suggested conf parameters, but still no luck.