Issue details -
Running the Spark code below (object TestMysql) on a 2-node Spark cluster, 2 cores and 7.5 GB per node, to extract data from an external MySQL table:
import org.apache.spark.sql.SparkSession

object TestMysql {
  def main(args: Array[String]) {
    val session = SparkSession.builder()
      // uncomment line for local testing
      //.master("local").config("spark.pangea.ae.LOGGER", true)
      .enableHiveSupport() // comment line for local testing
      .config("spark.sql.warehouse.dir", "/user/pangea/data/optimisation_program/target")
      .config("spark.sql.crossJoin.enabled", true)
      .appName("TestMySQL")
      .getOrCreate()

    val sqlcontext = new org.apache.spark.sql.SQLContext(session.sparkContext)

    // read the whole MySQL table over JDBC
    val dataframe_mysql = sqlcontext.read.format("jdbc")
      .option("url", "jdbc:mysql://35.164.49.234:3306/translation")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("dbtable", "table4")
      .option("user", "admin1")
      .option("password", "India@123")
      .option("fetchSize", "100000")
      .load()

    // write the extracted rows out as CSV
    dataframe_mysql.write.format("com.databricks.spark.csv")
      .save("/user/pangea/data/optimisation_program/target")
  }
}
10,000,000 records (~1 GB of data) reside in the MySQL database. Structure of the table -
+-----------+---------------+----------+--------+----------+--------+----------+------------+---------------+--------+----------+
| firstname | middleinitial | lastname | suffix | officeid | userid | username | email      | phone         | groups | pgpemail |
+-----------+---------------+----------+--------+----------+--------+----------+------------+---------------+--------+----------+
| f1        | m1            | l1       | Mr     | 1        | userid | username | a@mail.com | 9998978988998 | g1     | pgpemail |
| f1        | m1            | l1       | Mr     | 2        | userid | username | a@mail.com | 9998978988998 | g1     | pgpemail |
+-----------+---------------+----------+--------+----------+--------+----------+------------+---------------+--------+----------+
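For reference, one variant I have looked at is a partitioned JDBC read on a numeric column such as officeid; this is only a sketch, and the bounds and partition count below are placeholder assumptions rather than measured values -

// Sketch only: partitioned JDBC read, assuming officeid is numeric.
// lowerBound/upperBound/numPartitions are illustrative guesses.
val partitionedDf = session.read.format("jdbc")
  .option("url", "jdbc:mysql://35.164.49.234:3306/translation")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "table4")
  .option("user", "admin1")
  .option("password", "India@123")
  .option("partitionColumn", "officeid") // must be a numeric column
  .option("lowerBound", "1")             // assumed minimum of officeid
  .option("upperBound", "1000000")       // assumed maximum of officeid
  .option("numPartitions", "4")          // splits the read into 4 parallel queries
  .load()

Each partition then issues its own range query on officeid, so no single task has to pull the entire table in one go.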
spark-submit command -
/root/spark/bin/spark-submit '--master' 'yarn' '--class' 'org.pangea.translation.core.TestMysql' '/home/pangea/pangea-ae-pae/lib/PangeaTranslationCore-2.7.0.10.jar'
The job was run with the executor details below; executor and driver were both given 2 GB each.
The statistics do not change at all, and after some time the executors start dying. (The job ran successfully with 1,000,000 records, but garbage collection was significant.)
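For reference, those memory values map onto spark-submit flags roughly as follows; the executor count and cores per executor here are my assumptions based on the 2-node, 2-core cluster, not the exact command used -

/root/spark/bin/spark-submit \
  --master yarn \
  --class org.pangea.translation.core.TestMysql \
  --driver-memory 2g \
  --executor-memory 2g \
  --num-executors 2 \
  --executor-cores 1 \
  /home/pangea/pangea-ae-pae/lib/PangeaTranslationCore-2.7.0.10.jar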
Here are the logs -
17/11/29 10:52:42 ERROR Utils: Uncaught exception in thread driver-heartbeater
java.lang.OutOfMemoryError: Java heap space
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1536)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
17/11/29 10:52:42 INFO DiskBlockManager: Shutdown hook called
17/11/29 10:52:42 INFO ShutdownHookManager: Shutdown hook called
17/11/29 10:52:42 INFO ShutdownHookManager: Deleting directory /mnt/yarn-local/usercache/pangea/appcache/application_1501236719595_25297/spark-9dcdeb76-1b18-4dd1-bf35-fc84895b9013
17/11/29 10:52:42 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.OutOfMemoryError: Java heap space
at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:2114)
at com.mysql.jdbc.MysqlIO.nextRow(MysqlIO.java:1921)
at com.mysql.jdbc.MysqlIO.readSingleRowSet(MysqlIO.java:3278)
at com.mysql.jdbc.MysqlIO.getResultSet(MysqlIO.java:462)
at com.mysql.jdbc.MysqlIO.readResultsForQueryOrUpdate(MysqlIO.java:2997)
at com.mysql.jdbc.MysqlIO.readAllResults(MysqlIO.java:2245)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2638)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2530)
at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1907)
at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:2030)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anon$1.<init>(JDBCRDD.scala:399)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:370)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
My query - I want to understand what changes to the above code, or what other configuration, could make this job run. It should also be able to handle a larger read, say 12 GB from the same table, even if that means a longer run. Please let me know whether this is possible. There is limited scope for partitioning in my cluster, and it cannot scale much.
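To make the question concrete, this is the kind of code change I mean; the useCursorFetch URL parameter and the smaller fetch size below are a guess I have not verified, included only to illustrate -

// Unverified sketch: ask the MySQL driver for a server-side cursor so rows are
// streamed in batches instead of the whole result set being buffered at once.
val streamedDf = session.read.format("jdbc")
  .option("url", "jdbc:mysql://35.164.49.234:3306/translation?useCursorFetch=true")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "table4")
  .option("user", "admin1")
  .option("password", "India@123")
  .option("fetchsize", "10000") // batch size per round trip; value is a guess
  .load()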