I am working on a Spark-JDBC program in Scala, for which I have written the following code:

import java.io.FileInputStream
import java.util.Properties

import org.apache.log4j.{Level, LogManager, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object PartitionRetrieval {

  var conf  = new SparkConf().setAppName("Spark-JDBC")
  val log   = LogManager.getLogger("Spark-JDBC Program")
  Logger.getLogger("org").setLevel(Level.ERROR)
  val conFile       = "/home/hmusr/ReconTest/inputdir/testconnection.properties"
  val properties    = new Properties()
  properties.load(new FileInputStream(conFile))
  val connectionUrl = properties.getProperty("gpDevUrl")
  val devUserName   = properties.getProperty("devUserName")
  val devPassword   = properties.getProperty("devPassword")
  val driverClass   = properties.getProperty("gpDriverClass")
  val tableName     = "supply.accounts"
  val connectionProperties = new Properties()
  connectionProperties.put("user",devUserName)
  connectionProperties.put("password",devPassword)
  connectionProperties.put("driver",driverClass)
  try {
    Class.forName(driverClass).newInstance()
  } catch {
    case cnf: ClassNotFoundException =>
      log.error("Driver class: " + driverClass + " not found")
      System.exit(1)
    case e: Exception =>
      log.error("Exception: " + e.printStackTrace())
      System.exit(1)
  }
  println("connectionUrl: " + connectionUrl)
  println("devUserName: " + devUserName)
  println("devPassword: " + devPassword)
  println("driverClass: " + driverClass)

  def main(args: Array[String]): Unit = {
    val spark   = SparkSession.builder().config(conf).master("yarn").enableHiveSupport().getOrCreate()
    val gpTable2 = spark.read.jdbc(connectionUrl, tableName, connectionProperties)
    val count = gpTable2.filter(gpTable2("source_system_name")==="ORACLE").count()
    println("gpTable2 Count: " + count)
  }
}
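
For reference, the same read can also be expressed through Spark's option-based DataFrameReader API; a minimal, equivalent sketch (the option keys url, dbtable, user, password, and driver are the standard Spark JDBC options, with values taken from the same properties file):

// Equivalent sketch of the read above, using explicit JDBC options
// instead of a Properties object.
val gpTable = spark.read
  .format("jdbc")
  .option("url", connectionUrl)
  .option("dbtable", tableName)
  .option("user", devUserName)
  .option("password", devPassword)
  .option("driver", driverClass)
  .load()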

These are the contents of the testconnection.properties file:

devUserName="username"
devPassword="password"
gpDriverClass=org.postgresql.Driver
gpDevUrl="jdbc:postgresql://xx.xxx.xxx.xxx:1234/base?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory"
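
One thing worth noting here: java.util.Properties does not interpret quote characters, so a value written as "username" is loaded with the literal quotes included (the printed output below shows them). A minimal sketch of a hypothetical stripQuotes helper (not part of the original code) that removes them after loading:

// Hypothetical helper: Properties.load() keeps surrounding quotes literally,
// so strip them after loading if they are present.
def stripQuotes(s: String): String =
  if (s != null && s.length >= 2 && s.startsWith("\"") && s.endsWith("\""))
    s.substring(1, s.length - 1)
  else s

val devUserName   = stripQuotes(properties.getProperty("devUserName"))
val connectionUrl = stripQuotes(properties.getProperty("gpDevUrl"))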

I am trying to filter the table accounts on the column source_system_name with the value ORACLE and then get the count of the filtered rows. When I execute the code, I get a NullPointerException:

connectionUrl: "jdbc:postgresql://xx.xxx.xxx.xxx:1234/finance?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory"
devUserName: "username"
devPassword: "password"
driverClass: org.postgresql.Driver
18/07/23 14:57:07 INFO metastore: Trying to connect to metastore with URI thrift://ip-xx-xxx-xxx-xxx.ec2.internal:1234
18/07/23 14:57:07 INFO metastore: Connected to metastore.
18/07/23 14:57:20 INFO metastore: Trying to connect to metastore with URI thrift://ip-xx-xxx-xxx-xxx.ec2.internal:1234
18/07/23 14:57:20 INFO metastore: Connected to metastore.
Exception in thread "main" java.lang.NullPointerException
        at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:72)
        at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:114)
        at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:52)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:309)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)
        at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:193)
        at com.yearpartition.obj.PartitionRetrieval$.main(PartitionRetrieval.scala:59)
        at com.yearpartition.obj.PartitionRetrieval.main(PartitionRetrieval.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:782)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

What I don't understand is whether the connection to the database failed, or whether the connection succeeded and my operation on the dataframe failed afterwards. Could anyone let me know how I can correct the exception here?
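
One way to separate the two cases would be to open a plain JDBC connection outside Spark before calling spark.read.jdbc; a minimal diagnostic sketch, using the same connectionUrl and connectionProperties as above:

import java.sql.DriverManager

// Diagnostic sketch: if getConnection throws, the database connection itself
// is failing; if it succeeds but spark.read.jdbc still fails, the problem is
// on the Spark side of the read.
val conn = DriverManager.getConnection(connectionUrl, connectionProperties)
try {
  println("Connected, closed = " + conn.isClosed)
} finally {
  conn.close()
}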
