
I would appreciate it if someone could guide me on how to convert a Scala (or Java) ResultSet to a Spark DataFrame.

I cannot use this notation:

val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://XXX-XX-XXX-XX-XX.compute-1.amazonaws.com:3306/")
  .option("dbtable", "pg_partner")
  .option("user", "XXX")
  .option("password", "XXX")
  .load()

So please take that into account before referring me to this similar question.

The reason I cannot use that notation is that I need a JDBC option that is not present in the Spark version I am using (2.2.0): the "queryTimeout" option was only added in Spark 2.4, so I need to set it myself through plain JDBC and work from the ResultSet.
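
To be explicit, I am getting the ResultSet through plain JDBC, roughly like this (a rough sketch; the URL, credentials and query are placeholders):

import java.sql.{DriverManager, ResultSet, Statement}

// plain JDBC, so that setQueryTimeout can be used (the option Spark 2.2.0's reader lacks)
val conn = DriverManager.getConnection(
  "jdbc:mysql://XXX-XX-XXX-XX-XX.compute-1.amazonaws.com:3306/", "XXX", "XXX")
val stmt: Statement = conn.createStatement()
stmt.setQueryTimeout(60) // seconds
val rs: ResultSet = stmt.executeQuery("select * from pg_partner")
// ... and this rs is what I need to turn into a DataFrame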

Any help will be appreciated.

Thank you in advance!

James

3 Answers


Give this a try (I haven't tried it, but it should work with slight modification):

import java.sql.ResultSet
import org.apache.spark.sql.{DataFrame, SparkSession}

// assuming ResultSet comprises rows of (String, Int)
def resultSetToDataFrame(resultSet: ResultSet)(implicit spark: SparkSession): DataFrame = {
  val resultSetAsList: List[(String, Int)] = new Iterator[(String, Int)] {
    override def hasNext: Boolean = resultSet.next()

    override def next(): (String, Int) = {
      // JDBC columns are 1-indexed; column labels can be used instead of indices
      (resultSet.getString(1), resultSet.getInt(2))
    }
  }.toStream.toList

  import spark.implicits._
  val listAsDataFrame: DataFrame = resultSetAsList.toDF("column_name_1", "column_name_2")

  listAsDataFrame
}
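
Usage would look roughly like this, assuming a ResultSet named rs has already been obtained via plain JDBC as described in the question (also untested):

import org.apache.spark.sql.SparkSession

implicit val spark: SparkSession = SparkSession.builder().appName("rs-to-df").getOrCreate()
val df = resultSetToDataFrame(rs) // rs: java.sql.ResultSet, obtained with setQueryTimeout applied
df.show()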


y2k-shubham

A working example against a public MySQL source:

import java.util.Properties
import org.apache.spark.rdd.JdbcRDD
import java.sql.{Connection, DriverManager, ResultSet}
import spark.implicits._

val url = "jdbc:mysql://mysql-rfam-public.ebi.ac.uk:4497/Rfam"
val username = "rfamro"
val password = ""
val myRDD = new JdbcRDD(
  sc,
  () => DriverManager.getConnection(url, username, password),
  "select rfam_id, noise_cutoff from family limit ?, ?",
  1, 100, 10,
  r => r.getString("rfam_id") + ", " + r.getString("noise_cutoff"))
val DF = myRDD.toDF
DF.show

returns:

+-------------------+
|              value|
+-------------------+
|    5_8S_rRNA, 41.9|
|           U1, 39.9|
|           U2, 45.9|
|         tRNA, 28.9|
|        Vault, 33.9|
|          U12, 52.9|
....
....
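
If you need separate, properly named columns rather than the single concatenated value column, the row function can return a tuple instead; an untested variant of the same example:

// same JdbcRDD, but the row function returns a (String, String) tuple,
// so toDF can name the two columns individually
val typedRDD = new JdbcRDD(
  sc,
  () => DriverManager.getConnection(url, username, password),
  "select rfam_id, noise_cutoff from family limit ?, ?",
  1, 100, 10,
  r => (r.getString("rfam_id"), r.getString("noise_cutoff")))
val typedDF = typedRDD.toDF("rfam_id", "noise_cutoff")
typedDF.show
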
thebluephantom
  • I am on it... I'm gonna try both solutions to select the best answer for my case, but thanks again dude for your support – James Aug 08 '18 at 10:08
  • Fine, but mine actually works and they may well be complementary. I think I actually answered. I only mention it because I find a number of people here don't follow the code of conduct – thebluephantom Aug 08 '18 at 10:13
import java.sql.ResultSet
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

def parallelizeResultSet(spark: SparkSession, rs: ResultSet): DataFrame = {

  // read the current row of the ResultSet into a Spark Row, with every column as a String
  def parseResultSet(rs: ResultSet): Row = {
    val columns = Seq.range(1, rs.getMetaData.getColumnCount + 1).map(i => rs.getMetaData.getColumnName(i))
    val resultSetRecord = columns.map(c => rs.getString(c))
    Row(resultSetRecord:_*)
  }

  // wrap the ResultSet in an Iterator so it can be drained row by row
  def resultSetToIter(rs: ResultSet)(f: ResultSet => Row): Iterator[Row] = new Iterator[Row] {
    def hasNext: Boolean = rs.next()
    def next(): Row = f(rs)
  }

  // drain the ResultSet on the driver, then parallelize the collected rows
  val rdd = spark.sparkContext.parallelize(resultSetToIter(rs)(parseResultSet).toSeq)
  // build the schema from the ResultSet metadata (all columns typed as nullable String)
  val schema = StructType(Seq.range(1, rs.getMetaData.getColumnCount + 1).map(i => StructField(rs.getMetaData.getColumnName(i), StringType, nullable = true)))
  spark.createDataFrame(rdd, schema)
}

parallelizeResultSet(spark, rs)