
I am querying a MySQL table:

import java.sql.{Connection, DriverManager}

val url = "jdbc:mysql://XXX-XX-XXX-XX-XX.compute-1.amazonaws.com:3306/pg_partner"
val driver = "com.mysql.jdbc.Driver"
val username = "XXX"
val password = "XXX"
Class.forName(driver) // register the JDBC driver
val connection: Connection = DriverManager.getConnection(url, username, password)
val statement = connection.createStatement()
val patnerName = statement.executeQuery("SELECT id,name FROM partner")

I do get my result in patnerName, but I need it converted to a DataFrame.

I am able to print the data with the code below:

while (patnerName.next) {
  val id = patnerName.getString("id")
  val name = patnerName.getString("name")
  println("id = %s, name = %s".format(id,name))
}

Now how do I convert patnerName to a DataFrame?

toofrellik

2 Answers


So you'll have to do it in several steps (a consolidated, runnable sketch follows the list):

  1. Define your columns and prepare a schema:
    val columns = Seq("id", "name")
    val schema = StructType(List(
      StructField("id", StringType, nullable = true),
      StructField("name", StringType, nullable = true)
    ))
  2. Define how you will convert each record in the ResultSet to a Row at each iteration:
    def parseResultSet(rs: ResultSet): Row = {
      val resultSetRecord = columns.map(c => rs.getString(c))
      Row(resultSetRecord:_*)
    }
  3. Define a function to convert your ResultSet to an Iterator[Row]. It will use the function you defined in the previous step (when you call it in the next step):
    def resultSetToIter(rs: ResultSet)(f: ResultSet => Row): Iterator[Row] =
      new Iterator[Row] {
        // hasNext advances the ResultSet cursor, so it must be
        // called exactly once before each next()
        def hasNext: Boolean = rs.next()
        def next(): Row = f(rs)
      }
  4. Define a function that creates an RDD out of the Iterator[Row] (via .toSeq), using the functions you defined in the previous steps, and use the schema to create a DataFrame from the RDD:
    def parallelizeResultSet(rs: ResultSet, spark: SparkSession): DataFrame = {
      val rdd = spark.sparkContext.parallelize(resultSetToIter(rs)(parseResultSet).toSeq)
      spark.createDataFrame(rdd, schema) // use the schema you defined in step 1
    }
  5. Finally, call your function:
    val df: DataFrame = parallelizeResultSet(patnerName, spark)
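
Putting the steps together, here is a minimal self-contained sketch (assuming an existing SparkSession named spark and the patnerName ResultSet from the question; the imports are the ones the snippets above rely on):

import java.sql.ResultSet

import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val columns = Seq("id", "name")
val schema = StructType(List(
  StructField("id", StringType, nullable = true),
  StructField("name", StringType, nullable = true)
))

// Read every column of the current row as a String and wrap it in a Row.
def parseResultSet(rs: ResultSet): Row = {
  val resultSetRecord = columns.map(c => rs.getString(c))
  Row(resultSetRecord: _*)
}

// Wrap the ResultSet in an Iterator; hasNext advances the cursor,
// so it must be called exactly once before each next().
def resultSetToIter(rs: ResultSet)(f: ResultSet => Row): Iterator[Row] =
  new Iterator[Row] {
    def hasNext: Boolean = rs.next()
    def next(): Row = f(rs)
  }

// Materialize the rows on the driver, parallelize them, and apply the schema.
def parallelizeResultSet(rs: ResultSet, spark: SparkSession): DataFrame = {
  val rdd = spark.sparkContext.parallelize(resultSetToIter(rs)(parseResultSet).toSeq)
  spark.createDataFrame(rdd, schema)
}

val df: DataFrame = parallelizeResultSet(patnerName, spark)
df.show()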
kfkhalili
  • I tried the approach mentioned above. I am able to create a DataFrame, but it is empty; it doesn't have the data from my result set. – Saurabh Sep 08 '20 at 14:08
  • @Saurabh are you able to verify that the ResultSet contains any data at all? Try to `rs.next()` and then select a column from the current row with `rs.getString("col_name")` and see if it returns you data. – kfkhalili Sep 08 '20 at 14:45
  • Yes, I verified that I have data in my ResultSet using rs.getString("col_name"). It has created the DataFrame with the structure I provided. – Saurabh Sep 08 '20 at 14:54
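
A plausible cause of the empty DataFrame discussed in these comments: a JDBC ResultSet is forward-only by default (TYPE_FORWARD_ONLY), so any earlier iteration, such as the println loop in the question or a manual rs.next() check, exhausts the cursor, and hasNext in the Iterator above then returns false immediately. A minimal sketch of the workaround, re-running the query to get a fresh ResultSet (assuming the statement and functions defined above):

// Re-execute the query so the cursor starts before the first row again;
// a default ResultSet cannot be rewound once consumed.
val freshRs = statement.executeQuery("SELECT id,name FROM partner")
val df = parallelizeResultSet(freshRs, spark)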

What about directly using the Spark functionality?

val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://XXX-XX-XXX-XX-XX.compute-1.amazonaws.com:3306/pg_partner")
  .option("dbtable", "partner")
  .option("user", "XXX")
  .option("password", "XXX")
  .load()

The code is taken from here.
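
If you only need the two columns from the question, the dbtable option also accepts a parenthesized subquery with an alias; this works on Spark 2.0 (the dedicated query option only arrived later, in Spark 2.4). A sketch:

val partnerDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://XXX-XX-XXX-XX-XX.compute-1.amazonaws.com:3306/pg_partner")
  // Spark wraps this in SELECT * FROM ( ... ) alias, so the alias is required.
  .option("dbtable", "(SELECT id, name FROM partner) AS p")
  .option("user", "XXX")
  .option("password", "XXX")
  .load()

partnerDF.show()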

Anton Okolnychyi
  • What does `spark` in `spark.read` refer to? Is it SparkContext or SQLContext? – toofrellik Dec 13 '16 at 11:11
  • It is [SparkSession](https://databricks.com/blog/2016/08/15/how-to-use-sparksession-in-apache-spark-2-0.html) that appeared in Spark 2. Which Spark version are you using? – Anton Okolnychyi Dec 13 '16 at 11:16
  • I am using Spark 2.0.0 with the following in my code: `val sparkConf = new SparkConf().setAppName("QuaterlyAudit").setMaster("local"); val sc = new SparkContext(sparkConf); val sqlContext = new org.apache.spark.sql.SQLContext(sc)`. `sc` is not working; I know `sc` is the SparkContext, but what is `SparkSession`? – toofrellik Dec 13 '16 at 11:18
  • With the same code I got an error: Exception in thread "main" java.sql.SQLException: No suitable driver – Tom Tang Aug 20 '23 at 08:23
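
Regarding the last comment: the "No suitable driver" error usually means the JDBC driver class was never registered. A likely fix, sketched below (assuming the MySQL connector jar is on Spark's classpath, e.g. via --jars), is to set the driver option explicitly:

val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://XXX-XX-XXX-XX-XX.compute-1.amazonaws.com:3306/pg_partner")
  .option("driver", "com.mysql.jdbc.Driver") // or com.mysql.cj.jdbc.Driver for Connector/J 8+
  .option("dbtable", "partner")
  .option("user", "XXX")
  .option("password", "XXX")
  .load()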