I am trying to get the count of rows in a table, bank_accounts, with the conditions source_system_name = "SAP" and period_year = "2017".
To do that, I came up with the following code:
import java.io.FileInputStream
import java.util.Properties

import org.apache.log4j.{Level, LogManager, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object PartitionRetrieval {

  // Longer heartbeat/network timeouts to avoid the executor-lost errors shown below
  val conf = new SparkConf().setAppName("Spark-JDBC")
    .set("spark.executor.heartbeatInterval", "120s")
    .set("spark.network.timeout", "12000s")

  val log = LogManager.getLogger("Spark-JDBC Program")
  Logger.getLogger("org").setLevel(Level.ERROR)

  // Connection details come from a properties file
  val conFile    = "/home/user/ReconTest/inputdir/testconnection.properties"
  val properties = new Properties()
  properties.load(new FileInputStream(conFile))

  val connectionUrl = properties.getProperty("gpDevUrl")
  val devUserName   = properties.getProperty("devUserName")
  val devPassword   = properties.getProperty("devPassword")
  val driverClass   = properties.getProperty("gpDriverClass")
  val tableName     = "dev.banknumbers"

  // Fail fast if the JDBC driver is not on the classpath
  try {
    Class.forName(driverClass).newInstance()
  } catch {
    case cnf: ClassNotFoundException =>
      log.error("Driver class: " + driverClass + " not found")
      System.exit(1)
    case e: Exception =>
      log.error("Exception: " + e.getMessage, e)
      System.exit(1)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport().getOrCreate()

    val gpTable = spark.read.format("jdbc")
      .option("url", connectionUrl)
      .option("dbtable", tableName)
      .option("user", devUserName)
      .option("password", devPassword)
      .load()

    val rc = gpTable.filter(gpTable("source_system_name") === "GEA_CENTERPIECE" && gpTable("period_year") === "2017").count()
    println("gpTable Count: " + rc)
  }
}
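To see whether Spark is pushing the two filters down to the database at all, I also printed the physical plan. This is just a diagnostic using the same names as the code above; for a JDBC source, I believe the scan node should list the predicates under "PushedFilters", and if they are missing the whole table is pulled over JDBC and filtered inside Spark:

// Diagnostic only: inspect the physical plan for the filtered count.
gpTable.filter(gpTable("source_system_name") === "GEA_CENTERPIECE" && gpTable("period_year") === "2017")
  .explain()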
The jar runs for 5 minutes and then gives me the result. The output is 21222313. If I run the same query in my workbench tool, I get the result in 5 seconds. Earlier I was getting:
18/07/24 10:10:50 ERROR YarnScheduler: Lost executor 2 on ip-10-230-137-10.ec2.internal: Executor heartbeat timed out after 120041 ms
18/07/24 10:10:52 ERROR YarnScheduler: Lost executor 2 on ip-10-230-137-10.ec2.internal: Container container_e540_1532132067680_0344_01_000003 exited from explicit termination request.
It runs fine after setting:
set("spark.executor.heartbeatInterval","120s")
set("spark.network.timeout","12000s")
I am practicing Spark, and this is just a count query, so why is it running so slowly in Spark? Should I pass the filter predicate in a different way, or change any other parameters in the code so that it runs faster?
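For example, one alternative I have seen suggested (I have not confirmed it is the right approach here) is to push the whole query into the dbtable option as a subquery, so the database does the count and only one row comes back over JDBC. A minimal sketch, assuming the same connection properties as above; whether period_year needs quotes depends on its column type in the database:

// Sketch: let the database compute the count and return a single row.
// The subquery alias ("tmp") is required by the JDBC source.
val countQuery =
  """(SELECT count(*) AS cnt
    |   FROM dev.banknumbers
    |  WHERE source_system_name = 'GEA_CENTERPIECE'
    |    AND period_year = '2017') tmp""".stripMargin

val rcDf = spark.read.format("jdbc")
  .option("url", connectionUrl)
  .option("dbtable", countQuery)
  .option("user", devUserName)
  .option("password", devPassword)
  .load()

rcDf.show()  // single row with the count

Would that be the recommended way, or is there a better option?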