
I am trying to read a table from Postgres and insert the resulting dataframe into a Hive table on HDFS, as shown below:

def prepareFinalDF(splitColumns: List[String], textList: ListBuffer[String], allColumns: String, dataMapper: Map[String, String], partition_columns: Array[String], spark: SparkSession): DataFrame = {
  // Build the source query and read the 2017 slice of the table over JDBC
  val execQuery = s"select ${allColumns}, 0 as ${flagCol} from analytics.xx_gl_forecast where period_year='2017'"
  val yearDF    = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable", s"(${execQuery}) as year2017").option("user", devUserName).option("password", devPassword).option("numPartitions", 20).load()
  // Derive the Hive data types and prepare the target table schema
  val totalCols: List[String] = splitColumns ++ textList
  val cdt                     = new ChangeDataTypes(totalCols, dataMapper)
  hiveDataTypes               = cdt.gpDetails()
  prepareHiveTableSchema(hiveDataTypes, partition_columns)
  // Move the partition columns to the end so they match the Hive table layout
  val allColsOrdered          = yearDF.columns.diff(partition_columns) ++ partition_columns
  val allCols                 = allColsOrdered.map(colname => org.apache.spark.sql.functions.col(colname))
  val resultDF                = yearDF.select(allCols: _*)
  // Replace newlines and tabs in every string column with a single space
  val stringColumns           = resultDF.schema.fields.filter(x => x.dataType == StringType).map(s => s.name)
  val finalDF                 = stringColumns.foldLeft(resultDF) {
    (tempDF, colName) => tempDF.withColumn(colName, regexp_replace(regexp_replace(col(colName), "[\r\n]+", " "), "[\t]+", " "))
  }
  finalDF
}

    // Build the dataframe and load it into the partitioned Hive table with dynamic partitioning
    val dataDF = prepareFinalDF(splitColumns, textList, allColumns, dataMapper, partition_columns, spark)
    dataDF.createOrReplaceTempView("preparedDF")
    spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
    spark.sql("set hive.exec.dynamic.partition=true")
    spark.sql(s"INSERT OVERWRITE TABLE default.xx_gl_forecast PARTITION(${prtn_String_columns}) select * from preparedDF")

The spark-submit command I am using:

SPARK_MAJOR_VERSION=2 spark-submit --conf spark.ui.port=4090 \
  --driver-class-path /home/username/jars/postgresql-42.1.4.jar \
  --jars /home/username/jars/postgresql-42.1.4.jar \
  --num-executors 40 --executor-cores 10 --executor-memory 30g \
  --driver-memory 20g --driver-cores 3 \
  --class com.partition.source.YearPartition splinter_2.11-0.1.jar \
  --master=yarn --deploy-mode=cluster \
  --keytab /home/username/usr.keytab --principal usr@DEV.COM \
  --files /username/hdp/current/spark2-client/conf/hive-site.xml,testconnection.properties \
  --name Splinter \
  --conf spark.executor.extraClassPath=/home/username/jars/postgresql-42.1.4.jar

I have the following resources:

number of cores: 51
max container memory: 471040 MB
number of executors per LLAP daemon: 39

Even though I doubled the memory, I still see these exceptions in the log:

Container exited with a non-zero exit code 143.
Killed by external signal
java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.lang.String.toCharArray(String.java:2899)
at java.util.zip.ZipCoder.getBytes(ZipCoder.java:78)
at java.util.zip.ZipFile.getEntry(ZipFile.java:310)
at java.util.jar.JarFile.getEntry(JarFile.java:240)
at java.util.jar.JarFile.getJarEntry(JarFile.java:223)
at sun.misc.URLClassPath$JarLoader.getResource(URLClassPath.java:1005)
at sun.misc.URLClassPath.getResource(URLClassPath.java:212)
at java.net.URLClassLoader$1.run(URLClassLoader.java:365)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.util.SignalUtils$ActionHandler.handle(SignalUtils.scala:99)
at sun.misc.Signal$1.run(Signal.java:212)
at java.lang.Thread.run(Thread.java:745)
18/09/23 04:57:20 INFO JDBCRDD: closed connection


Is there anything wrong in the code that makes the program crash? Could anyone let me know what mistake I am making here so that I can fix it?


1 Answer


This exception is telling you that you're spending a large amount of time garbage collecting. The first thing you should do is check the Spark UI while the job is running (or in the history server) to see which stage(s) are GCing a lot. It should be very obvious from the UI.

My guess is that it's going to be a shuffle. Now the questions are:

  • do you have enough partitions considering the size of the data?
  • if not, try increasing the default parallelism of your shuffles using spark.sql.shuffle.partitions (see the sketch after this list)
  • if they're already well sized, what's causing your heap to get filled up? You might want to perform a heap dump while your job is running and then explore it with a dump analysis tool.
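
If the shuffle parallelism turns out to be the problem, here is a minimal sketch of how you could raise spark.sql.shuffle.partitions from its default of 200; the value 1000 below is only an illustrative guess, not a number tuned for your cluster:

    // Minimal sketch: raise the shuffle parallelism before the heavy query runs.
    // 1000 is illustrative; pick a value based on the data volume and executor count.
    spark.conf.set("spark.sql.shuffle.partitions", "1000")

    // ...or in the same style as the other settings already used in the question:
    spark.sql("set spark.sql.shuffle.partitions=1000")

    // The setting only applies to shuffles that start after it is changed, so set it
    // before calling prepareFinalDF and before the INSERT OVERWRITE statement.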
Dici