
I am new to Spark. We have the Hive query below, and on top of it we are performing a pivot operation using Spark with Python.

The PySpark script below performs the pivot and writes the result into a Hive table. The Hive query returns about 140 million rows.

Approach 1

from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql import functions as F
sc = SparkContext()
hc = HiveContext(sc)
tbl = hc.sql("""
    Select Rating.BranchID
    , Rating.Vehicle
    , Rating.PersonalAutoCov
    , Rating.PersonalVehicleCov 
    , Rating.EffectiveDate
    , Rating.ExpirationDate
    , attr.name as RatingAttributeName
    , Cast(Rating.OutputValue as Int) OutputValue
    , Rating.InputValue
    From db.dbo_pcx_paratingdata_piext_master rating
        Inner Join db.dbo_pctl_ratingattrname_piext_master attr 
            on rating.RatingAttribute = attr.id 
            and attr.CurrentRecordIndicator = 'Y'
    Where 
        rating.CurrentRecordIndicator = 'Y'
    """)
tbl.cache()
pvttbl1 = tbl.groupby("BranchId","Vehicle","PersonalAutoCov","PersonalVehicleCov","EffectiveDate","ExpirationDate")\
    .pivot("RatingAttributeName")\
    .agg({"InputValue":"max", "OutputValue":"sum"})

pvttbl1.createOrReplaceTempView("paRatingAttributes") 
hc.sql("Create table dev_pekindataaccesslayer.createcount as select * from paRatingAttributes") 

When I run the above script with the spark-submit command, I end up with

java.lang.OutOfMemoryError: Java heap space

or sometimes

java.lang.OutOfMemoryError: GC overhead limit exceeded

The spark-submit command I used:

spark-submit spark_ex2.py --master yarn-cluster --num-executors 15 --executor-cores 50 --executor-memory 100g --driver-memory 100g --conf "spark.sql.shuffle.partitions=1000" --conf "spark.memory.offHeap.enabled=true" --conf "spark.memory.offHeap.size=100g" --conf "spark.network.timeout=1200" --conf "spark.executor.heartbeatInterval=1201"
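One thing worth noting about the command itself: spark-submit treats everything after the application file as arguments to the application, so options placed after spark_ex2.py are handed to the script rather than to Spark, and the job most likely runs with default executor and driver memory (which would be consistent with the ~366 MB storage limit in the log below rather than anything close to 100g). A sketch of the same command with the options moved before the script:

    spark-submit --master yarn-cluster --num-executors 15 --executor-cores 50 \
        --executor-memory 100g --driver-memory 100g \
        --conf "spark.sql.shuffle.partitions=1000" \
        --conf "spark.memory.offHeap.enabled=true" \
        --conf "spark.memory.offHeap.size=100g" \
        --conf "spark.network.timeout=1200" \
        --conf "spark.executor.heartbeatInterval=1201" \
        spark_ex2.py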

Detailed log:

INFO MemoryStore: Memory use = 1480.9 KB (blocks) + 364.8 MB (scratch space shared across 40 tasks(s)) = 366.2 MB. 
Storage limit = 366.3 MB.
WARN BlockManager: Persisting block rdd_11_22 to disk instead.
WARN BlockManager: Putting block rdd_11_0 failed due to an exception
WARN BlockManager: Block rdd_11_0 could not be removed as it was not found on disk or in memory
ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 10)
java.lang.OutOfMemoryError: Java heap space
        at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)

I made small changes to the above PySpark script, and it works without any issues.

Approach 2

    from pyspark import SparkContext
    from pyspark.sql import HiveContext
    from pyspark.sql import SQLContext
    from pyspark.sql import functions as F
    sc = SparkContext()
    hc = HiveContext(sc)
    sqlContext = SQLContext(sc)
    tbl = hc.sql("""
        Select Rating.BranchID
        , Rating.Vehicle
        , Rating.PersonalAutoCov
        , Rating.PersonalVehicleCov
        , Rating.EffectiveDate
        , Rating.ExpirationDate
        , attr.name as RatingAttributeName
        , Cast(Rating.OutputValue as Int) OutputValue
        , Rating.InputValue
        From db.dbo_pcx_paratingdata_piext_master rating
            Inner Join db.dbo_pctl_ratingattrname_piext_master attr
                on rating.RatingAttribute = attr.id
                and attr.CurrentRecordIndicator = 'Y'
        Where
            rating.CurrentRecordIndicator = 'Y'
        """)
    tbl.createOrReplaceTempView("Ptable")
    r=sqlContext.sql("select count(1) from Ptable")
    m=r.collect()[0][0]
    hc.sql("drop table if exists db.Ptable")
    hc.sql("Create table db.Ptable as select * from Ptable")
    tb2 = hc.sql("select * from db.Ptable limit "+str(m))
    pvttbl1 = tb2.groupby("BranchId","Vehicle","PersonalAutoCov","PersonalVehicleCov","EffectiveDate","ExpirationDate")\
        .pivot("RatingAttributeName")\
        .agg({"InputValue":"max", "OutputValue":"sum"})

    pvttbl1.createOrReplaceTempView("paRatingAttributes")
    hc.sql("drop table if exists db.createcount")
    hc.sql("Create table db.createcount STORED AS ORC as select * from paRatingAttributes")

But the above script involves the creation of intermediate tables, which is an additional step. In Approach 2, when I keep the limit keyword and use the same spark-submit command, it works correctly.
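If the only concern with Approach 2 is the extra intermediate table, the pivoted DataFrame can in principle be written straight to Hive through the DataFrameWriter instead of the temp-view-plus-CTAS step. A minimal sketch, reusing the table name from above:

    # Sketch only: write the pivoted result directly to a Hive table as ORC,
    # without registering a temp view or running a CTAS statement.
    pvttbl1.write \
        .mode("overwrite") \
        .format("orc") \
        .saveAsTable("db.createcount")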

What's wrong with my Approach 1, and how can I make it work?

Note: I have followed Spark java.lang.OutOfMemoryError: Java heap space and tried all the suggested conf parameters, but still no luck.

  • Have you checked memory usage during execution with something like htop/task manager to ensure you are using as much memory as you expect? – PiRocks Sep 03 '19 at 14:39
  • @PiRocks Thanks for the reply. I have verified memory usage and it seems no issues. Because our cluster has 1.2 TB RAM with 6 nodes. – Tharunkumar Reddy Sep 03 '19 at 14:43
  • Because you are getting OutOfMemoryErrors, this implies you do not have enough memory, which seems odd given you have a combined 1.2 TB. Can you get the exact command line for the relevant java processes? There might be a mis-set flag somewhere that is causing a low memory limit. Otherwise, are you able to connect to the relevant processes with visualvm? That should give you a good idea of what is using up ram in your java/spark processes. – PiRocks Sep 03 '19 at 15:36
  • 1
    In case my last comment was not clear. I believe that you might have something along the lines of `java -Xmx100G -Xmx2G WhateverTheSparkMainClassIsCalled`, which would result in the the `-Xmx2G` superseding the `-Xmx100G`, which would explain the OutOfMemoryErrors. Hence why seeing the exact java command line is important. `ps -eo args | grep java` can be used for finding full command line – PiRocks Sep 03 '19 at 15:46
  • @PiRocks i will try the above one and get back to you. – Tharunkumar Reddy Sep 03 '19 at 17:10
  • 1
    @Tharunkumar Reddy .. can you show us some sample data for 'tbl' just before pivot and your expectation after using pivot operation on it. can you explain that part a bit. Pivot is expensive operation, may be we can try some other way to do the same thing. Just a thought. – vikrant rana Sep 03 '19 at 18:21
  • 1
    @vikrantrana With following configuration " --conf spark.yarn.appMasterEnv.SPARK_HOME=/dev/null" everything works fine for me. Do you have any idea on what this parameter does? – Tharunkumar Reddy Sep 11 '19 at 07:40
  • 1
    @Tharunkumar Reddy.. no idea. I will check it but I have found some interesting on pivot. may be useful. https://stackoverflow.com/questions/57541507/effective-way-to-groupby-without-using-pivot-in-pyspark/57574725#comment101618155_57574725 – vikrant rana Sep 11 '19 at 08:23
  • @PiRocks Can you read above latest comments. – Tharunkumar Reddy Sep 11 '19 at 09:08
  • 1
    The parameter `--conf spark.yarn.appMasterEnv.SPARK_HOME=/dev/null` sets the environment variable`SPARK_HOME` to /dev/null. In other words it unsets any existing `SPARK_HOME`. I am not sure why that fixes things. If I had to guess I would say that either there is a bad config file in your existing SPARK_HOME, ore there are jars in SPARK_HOME which are now not being loaded, which frees up memory. – PiRocks Sep 11 '19 at 12:25
