
I am trying to get the count of rows in a table, bank_accounts, with the conditions "source_system_name=SAP" and period_year="2017". To do that, I came up with the following code:

import java.io.FileInputStream
import java.util.Properties

import org.apache.log4j.{Level, LogManager, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object PartitionRetrieval {
    var conf = new SparkConf().setAppName("Spark-JDBC")
                              .set("spark.executor.heartbeatInterval", "120s")
                              .set("spark.network.timeout", "12000s")
    val log = LogManager.getLogger("Spark-JDBC Program")
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conFile       = "/home/user/ReconTest/inputdir/testconnection.properties"
    val properties    = new Properties()
    properties.load(new FileInputStream(conFile))
    val connectionUrl = properties.getProperty("gpDevUrl")
    val devUserName   = properties.getProperty("devUserName")
    val devPassword   = properties.getProperty("devPassword")
    val driverClass   = properties.getProperty("gpDriverClass")
    val tableName     = "dev.banknumbers"

    try {
        Class.forName(driverClass).newInstance()
    } catch {
        case cnf: ClassNotFoundException =>
            log.error("Driver class: " + driverClass + " not found")
            System.exit(1)
        case e: Exception =>
            log.error("Exception: ", e)   // pass the Throwable itself so the stack trace is logged
            System.exit(1)
    }

    def main(args: Array[String]): Unit = {
        val spark   = SparkSession.builder().config(conf).master("yarn").enableHiveSupport().getOrCreate()
        val gpTable = spark.read.format("jdbc")
            .option("url", connectionUrl)
            .option("dbtable", tableName)
            .option("user", devUserName)
            .option("password", devPassword)
            .load()
        val rc = gpTable.filter(gpTable("source_system_name") === "GEA_CENTERPIECE" && gpTable("period_year") === "2017").count()
        println("gpTable Count: " + rc)
    }
}

The jar runs for 5 minutes and then gives me the result. The output is 21222313. If I run the same query on my workbench tool, I get the result in 5 seconds. Earlier I was getting:

18/07/24 10:10:50 ERROR YarnScheduler: Lost executor 2 on ip-10-230-137-10.ec2.internal: Executor heartbeat timed out after 120041 ms
18/07/24 10:10:52 ERROR YarnScheduler: Lost executor 2 on ip-10-230-137-10.ec2.internal: Container container_e540_1532132067680_0344_01_000003 exited from explicit termination request.

It runs fine after setting:

set("spark.executor.heartbeatInterval","120s")
set("spark.network.timeout","12000s")

I am practicing Spark, and this is just a count query, so why does it run so slowly on Spark? Should I supply the filter predicate in a different way, or change any other parameters in the code, so that it runs faster?
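To make the comparison concrete: as I understand the JDBC source, the dbtable option also accepts a subquery in place of a table name, so the whole aggregation could be pushed down to the database instead of pulling every matching row into Spark just to count it. A rough sketch of what I mean (not tested; the subquery text and its alias cnt are mine, the connection values are the ones from the code above):

// Sketch: push the aggregation down to the database via a subquery in dbtable.
val pushdownQuery =
    """(SELECT count(*) AS row_count
       |FROM dev.banknumbers
       |WHERE source_system_name = 'GEA_CENTERPIECE'
       |  AND period_year = '2017') cnt""".stripMargin

val counted = spark.read.format("jdbc")
    .option("url", connectionUrl)
    .option("dbtable", pushdownQuery)
    .option("user", devUserName)
    .option("password", devPassword)
    .load()

counted.show()   // a single row containing the count computed by the database

That way the database does the counting, exactly like the workbench query, and Spark only fetches one row.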

  • Yes, parallelize() has its own advantages, but how can I use parallelize in this case? Is it this parameter: set("spark.default.parallelism", "xx")? – Metadata Jul 24 '18 at 18:12
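From what I can tell, spark.default.parallelism does not change how the JDBC source reads the table; a JDBC read is split across executors with the source's own partitioning options instead. A sketch of that, assuming the table has a numeric column to split on (account_id, the bounds, and the partition count below are all hypothetical placeholders):

// Sketch: parallelize the JDBC read by splitting it into range-based partitions.
// "account_id" and the bounds are placeholders; use a real numeric/date column
// and its actual min/max values from the table.
val partitionedTable = spark.read.format("jdbc")
    .option("url", connectionUrl)
    .option("dbtable", tableName)
    .option("user", devUserName)
    .option("password", devPassword)
    .option("partitionColumn", "account_id")  // hypothetical numeric column
    .option("lowerBound", "1")
    .option("upperBound", "25000000")
    .option("numPartitions", "8")             // 8 concurrent JDBC connections
    .load()

val rc = partitionedTable
    .filter(partitionedTable("source_system_name") === "GEA_CENTERPIECE" &&
            partitionedTable("period_year") === "2017")
    .count()

Each partition then opens its own connection and reads one range of account_id, so the fetch itself runs in parallel.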

0 Answers