
I have a table larger than 5 GB in MySQL. I want to load that table into Spark as a DataFrame and write it out as a Parquet file.

This is my Python function to do the job:

from pyspark.sql import SparkSession

def import_table(tablename):
    spark = SparkSession.builder.appName(tablename).getOrCreate()

    # Read the MySQL table over JDBC
    df = spark.read.format('jdbc').options(
        url="jdbc:mysql://mysql.host.name:3306/dbname?zeroDateTimeBehavior=convertToNull",
        driver="com.mysql.jdbc.Driver",
        dbtable=tablename,
        user="root",
        password="password"
    ).load()

    # Write the table out as a Parquet file
    df.write.parquet("/mnt/s3/parquet-store/%s.parquet" % tablename)

I am running the following command to submit my Spark app:

./bin/spark-submit ~/mysql2parquet.py --conf "spark.executor.memory=29g" --conf "spark.storage.memoryFraction=0.9" --conf "spark.executor.extraJavaOptions=-XX:-UseGCOverheadLimit" --driver-memory 29G --executor-memory 29G

When I run this on an EC2 instance with 30 GB of RAM, it fails with java.lang.OutOfMemoryError: GC overhead limit exceeded

Meanwhile, only 1.42 GB of the total available memory is actually being used.

Here is the full console output with the stack trace: https://gist.github.com/idlecool/5504c6e225fda146df269c4897790097

Here is part of the stack trace: [screenshot]

Here is the HTOP output: [screenshot]

I am not sure if I am doing something wrong or if Spark is not meant for this use case. I hope it is.

Shiv Deepak
  • Are you using YARN? If so, take a look at [THIS](https://stackoverflow.com/a/33036908/1407161) answer to set your spark.yarn.executor.memoryOverhead property. Also, take a look at [THIS](https://stackoverflow.com/questions/1393486/error-java-lang-outofmemoryerror-gc-overhead-limit-exceeded/1393503#1393503) answer for more information on this particular exception. – Jeremy Aug 07 '17 at 20:01

2 Answers


A somewhat crude explanation of Spark's memory management is provided below; you can read more about it in the official documentation, but here is my take:

I believe the option "spark.storage.memoryFraction=0.9" is problematic in your case. Roughly speaking, an executor has three types of memory that can be allocated. The first is storage memory, which you have set to 90% of the executor memory (about ~27 GB) and which is used to keep persistent (cached) datasets.

The second is heap memory, which is used to perform computations and is typically set high when you are doing machine learning or a lot of calculations. This is what is insufficient in your case: your program needs more heap memory, and the lack of it is what causes this error.

The third type is shuffle memory, which is used for communication between different partitions. It needs to be set to a high value when you are doing a lot of joins between DataFrames/RDDs, or in general anything that requires a large amount of network overhead. It can be configured with the setting "spark.shuffle.memoryFraction".

So basically you can set the memory fractions with these two settings; whatever memory is left after shuffle and storage goes to the heap.

Since you have such a high storage fraction, the heap memory available to the program is extremely small. You will need to play with these parameters to find optimal values. Since you are writing a Parquet file, you will usually need more heap space, because the program has to perform computations for compression. I would suggest the following settings. The idea is that you are not doing any operations that require a lot of shuffle memory, so it can be kept small; you also do not need such a large amount of storage memory.

"spark.storage.memoryFraction=0.4" "spark.shuffle.memoryFraction=0.2"

More about this can be read here:

https://spark.apache.org/docs/latest/configuration.html#memory-management

Gaurav Dhama

Thanks to Gaurav Dhama for the good explanation; you may need to set spark.executor.extraJavaOptions to -XX:-UseGCOverheadLimit as well.
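
For completeness, a minimal sketch (app name is a placeholder) combining this option with the fractions suggested above:

from pyspark.sql import SparkSession

# Minimal sketch: disable the executor GC overhead limit check, together with
# the memory fractions suggested in the other answer.
spark = (SparkSession.builder
         .appName("mysql2parquet")
         .config("spark.storage.memoryFraction", "0.4")
         .config("spark.shuffle.memoryFraction", "0.2")
         .config("spark.executor.extraJavaOptions", "-XX:-UseGCOverheadLimit")
         .getOrCreate())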

Hossein Vatani