I am trying to read a table from Postgres and insert the resulting DataFrame into a Hive table on HDFS, as shown below:
def prepareFinalDF(splitColumns: List[String], textList: ListBuffer[String], allColumns: String,
                   dataMapper: Map[String, String], partition_columns: Array[String],
                   spark: SparkSession): DataFrame = {
  // Pull the 2017 slice of the source table through JDBC.
  val execQuery = s"select ${allColumns}, 0 as ${flagCol} from analytics.xx_gl_forecast where period_year='2017'"
  val yearDF = spark.read.format("jdbc")
    .option("url", connectionUrl)
    .option("dbtable", s"(${execQuery}) as year2017")
    .option("user", devUserName)
    .option("password", devPassword)
    .option("numPartitions", 20)
    .load()

  val totalCols: List[String] = splitColumns ++ textList
  val cdt = new ChangeDataTypes(totalCols, dataMapper)
  hiveDataTypes = cdt.gpDetails()   // hiveDataTypes is presumably a var defined outside this method
  prepareHiveTableSchema(hiveDataTypes, partition_columns)

  // Reorder the columns so the partition columns come last, matching the Hive table layout.
  val allColsOrdered = yearDF.columns.diff(partition_columns) ++ partition_columns
  val allCols = allColsOrdered.map(colname => org.apache.spark.sql.functions.col(colname))
  val resultDF = yearDF.select(allCols: _*)

  // Replace carriage returns/newlines and tabs in every string column with a single space.
  val stringColumns = resultDF.schema.fields.filter(x => x.dataType == StringType).map(s => s.name)
  val finalDF = stringColumns.foldLeft(resultDF) { (tempDF, colName) =>
    tempDF.withColumn(colName, regexp_replace(regexp_replace(col(colName), "[\r\n]+", " "), "[\t]+", " "))
  }
  finalDF
}
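One thing I am unsure about: I only pass numPartitions to the JDBC reader, without partitionColumn, lowerBound and upperBound. As far as I understand, without those three options Spark pulls the entire result set through a single connection into one partition. A sketch of what I think an explicitly partitioned read would look like (some_numeric_id and the bounds are placeholders, not real columns or values from my table):

val yearDF = spark.read.format("jdbc")
  .option("url", connectionUrl)
  .option("dbtable", s"(${execQuery}) as year2017")
  .option("user", devUserName)
  .option("password", devPassword)
  .option("partitionColumn", "some_numeric_id") // placeholder: must be a numeric/date/timestamp column
  .option("lowerBound", "1")                    // placeholder bounds
  .option("upperBound", "1000000")
  .option("numPartitions", 20)
  .load()

The function above is called and its result is written to the Hive table through a temp view: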
val dataDF = prepareFinalDF(splitColumns, textList, allColumns, dataMapper, partition_columns, spark)
dataDF.createOrReplaceTempView("preparedDF")
spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
spark.sql("set hive.exec.dynamic.partition=true")
spark.sql(s"INSERT OVERWRITE TABLE default.xx_gl_forecast PARTITION(${prtn_String_columns}) select * from preparedDF")
The spark-submit command I am using:
SPARK_MAJOR_VERSION=2 spark-submit --conf spark.ui.port=4090 \
  --driver-class-path /home/username/jars/postgresql-42.1.4.jar \
  --jars /home/username/jars/postgresql-42.1.4.jar \
  --num-executors 40 --executor-cores 10 --executor-memory 30g \
  --driver-memory 20g --driver-cores 3 \
  --class com.partition.source.YearPartition \
  splinter_2.11-0.1.jar \
  --master=yarn --deploy-mode=cluster \
  --keytab /home/username/usr.keytab --principal usr@DEV.COM \
  --files /username/hdp/current/spark2-client/conf/hive-site.xml,testconnection.properties \
  --name Splinter \
  --conf spark.executor.extraClassPath=/home/username/jars/postgresql-42.1.4.jar
I have the following resources:
Number of cores: 51
Max container memory: 471040 MB
Number of executors per LLAP Daemon: 39
Even though I doubled the memory, I still see these exceptions in the log:
Container exited with a non-zero exit code 143.
Killed by external signal
java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.lang.String.toCharArray(String.java:2899)
at java.util.zip.ZipCoder.getBytes(ZipCoder.java:78)
at java.util.zip.ZipFile.getEntry(ZipFile.java:310)
at java.util.jar.JarFile.getEntry(JarFile.java:240)
at java.util.jar.JarFile.getJarEntry(JarFile.java:223)
at sun.misc.URLClassPath$JarLoader.getResource(URLClassPath.java:1005)
at sun.misc.URLClassPath.getResource(URLClassPath.java:212)
at java.net.URLClassLoader$1.run(URLClassLoader.java:365)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.util.SignalUtils$ActionHandler.handle(SignalUtils.scala:99)
at sun.misc.Signal$1.run(Signal.java:212)
at java.lang.Thread.run(Thread.java:745)
18/09/23 04:57:20 INFO JDBCRDD: closed connection
Is there anything wrong in the code that makes the program crash? Could anyone let me know what mistake I am making here, so that I can fix it?