I want to know how I can do the following things in Scala:

  1. Connect to a PostgreSQL database using Spark Scala.
  2. Write SQL queries like SELECT, UPDATE, etc. to modify a table in that database.

I know how to do this in plain Scala, but how do I import the PostgreSQL connector jar into sbt so it gets packaged?

febinsathar
    Why the downvotes? I think this is a great question. It is quite generic, but then the answer can also be generic and help a lot of users. – Daniel Darabos Jul 24 '14 at 08:27
  • did you end up using mysql or postgres? If postgres is it possible to have a look at your sbt and code example? – Irene May 05 '15 at 17:17

1 Answer

Our goal is to run parallel SQL queries from the Spark workers.

Build setup

Add the connector and JDBC to the libraryDependencies in build.sbt. I've only tried this with MySQL, so I'll use that in my examples, but Postgres should be much the same.

libraryDependencies ++= Seq(
  jdbc,
  "mysql" % "mysql-connector-java" % "5.1.29",
  "org.apache.spark" %% "spark-core" % "1.0.1",
  // etc
)
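
For Postgres only the driver dependency changes. A sketch, assuming the org.postgresql JDBC artifact (the version shown is just an example; pick one that matches your server):

libraryDependencies ++= Seq(
  jdbc,
  "org.postgresql" % "postgresql" % "9.3-1102-jdbc41",  // PostgreSQL JDBC driver (example version)
  "org.apache.spark" %% "spark-core" % "1.0.1",
  // etc
)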

Code

When you create the SparkContext you tell it which jars to copy to the executors. Include the connector jar. A clean way to do this:

val classes = Seq(
  getClass,                   // To get the jar with our own code.
  classOf[com.mysql.jdbc.Driver]  // To get the connector.
)
val jars = classes.map(_.getProtectionDomain().getCodeSource().getLocation().getPath())
val conf = new SparkConf().setJars(jars)
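
With Postgres you would reference the Postgres driver class instead; a sketch, assuming the standard org.postgresql driver from the dependency above is on the classpath:

val classes = Seq(
  getClass,                         // To get the jar with our own code.
  classOf[org.postgresql.Driver]    // To get the PostgreSQL connector.
)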

Now Spark is ready to connect to the database. Each executor will run part of the query, so that the results are ready for distributed computation.

There are two options for this. The older approach is to use org.apache.spark.rdd.JdbcRDD:

val rdd = new org.apache.spark.rdd.JdbcRDD(
  sc,
  () => {
    java.sql.DriverManager.getConnection("jdbc:mysql://mysql.example.com/?user=batman&password=alfred")
  },
  "SELECT * FROM BOOKS WHERE ? <= KEY AND KEY <= ?",
  0, 1000, 10,
  row => row.getString("BOOK_TITLE")
)

Check out the documentation for the parameters. Briefly:

  • You have the SparkContext.
  • Then a function that creates the connection. This will be called on each worker to connect to the database.
  • Then the SQL query. This has to be similar to the example, and contain placeholders for the starting and ending key.
  • Then you specify the range of keys (0 to 1000 in my example) and the number of partitions. The range will be divided among the partitions, so one executor thread will end up executing SELECT * FROM BOOKS WHERE 0 <= KEY AND KEY <= 100 in the example.
  • And at last we have a function that converts the ResultSet into something. In the example we convert it into a String, so you end up with an RDD[String].
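
The example above uses MySQL, but for Postgres only the connection string changes. A minimal sketch, assuming a hypothetical database called booksdb on postgres.example.com that holds the same BOOKS table:

val rdd = new org.apache.spark.rdd.JdbcRDD(
  sc,
  () => java.sql.DriverManager.getConnection(
    "jdbc:postgresql://postgres.example.com/booksdb?user=batman&password=alfred"),
  "SELECT * FROM BOOKS WHERE ? <= KEY AND KEY <= ?",
  0, 1000, 10,
  row => row.getString("BOOK_TITLE")
)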

Since Apache Spark version 1.3.0 another method is available through the DataFrame API. Instead of the JdbcRDD you would create an org.apache.spark.sql.DataFrame:

val df = sqlContext.load("jdbc", Map(
  "url" -> "jdbc:mysql://mysql.example.com/?user=batman&password=alfred",
  "dbtable" -> "BOOKS"))

See https://spark.apache.org/docs/1.3.1/sql-programming-guide.html#jdbc-to-other-databases for the full list of options (the key range and number of partitions can be set just like with JdbcRDD).
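
For example, a partitioned Postgres load might look like this (a sketch; the extra keys are the partitioning options documented at that link, and the host, database and column names are just placeholders):

val df = sqlContext.load("jdbc", Map(
  "url" -> "jdbc:postgresql://postgres.example.com/booksdb?user=batman&password=alfred",
  "driver" -> "org.postgresql.Driver",  // the connector class to load on the executors
  "dbtable" -> "BOOKS",
  "partitionColumn" -> "KEY",           // numeric column used to split the reads
  "lowerBound" -> "0",
  "upperBound" -> "1000",
  "numPartitions" -> "10"))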

Updates

JdbcRDD does not support updates. But you can simply do them in a foreachPartition.

rdd.foreachPartition { it =>
  // One connection per partition, opened on the executor that processes it.
  val conn = java.sql.DriverManager.getConnection("jdbc:mysql://mysql.example.com/?user=batman&password=alfred")
  val del = conn.prepareStatement("DELETE FROM BOOKS WHERE BOOK_TITLE = ?")
  for (bookTitle <- it) {
    del.setString(1, bookTitle)
    del.executeUpdate()
  }
  del.close()
  conn.close()
}

(This creates one connection per partition. If that is a concern, use a connection pool!)

DataFrames support writing back to the database through the createJDBCTable and insertIntoJDBC methods.
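
A rough sketch of how these look in 1.3 (the URL is the same as above and NEW_BOOKS is just a placeholder table name; double-check the signatures in the DataFrame scaladoc):

// Create a new table from the DataFrame's contents.
df.createJDBCTable("jdbc:mysql://mysql.example.com/?user=batman&password=alfred", "NEW_BOOKS", allowExisting = false)

// Append the DataFrame's rows to an existing table.
df.insertIntoJDBC("jdbc:mysql://mysql.example.com/?user=batman&password=alfred", "BOOKS", overwrite = false)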

Daniel Darabos
  • Your update creates a new connection for every partition. – BAR Oct 03 '15 at 21:39
  • Yes. Each partition may be processed on a different machine, so they cannot share one connection. You could use a connection pool though, so that if two partitions are processed on the same machine, in the same thread, one after the other, they could re-use the connection. As far as I know there is no connection pool in the Java standard API, so this would complicate the example significantly. But let me know if you know a good solution! – Daniel Darabos Oct 03 '15 at 22:28
  • Right on. The new Spark docs for 1.5.1 show 3 examples of do's and don'ts regarding this case. They have a pretty elegant solution using a connection pool. – BAR Oct 04 '15 at 17:15
  • Oh fantastic! Do you have a link to this page in the documentation? Thanks! – Daniel Darabos Oct 04 '15 at 17:56
  • Sorry, I can't find anything about a connection pool at that URL. What am I missing? – Daniel Darabos Oct 05 '15 at 11:32
  • Here you go: http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd – BAR Oct 05 '15 at 16:24
  • Thanks! It's pretty elegant, but it's _pseudocode_. The `ConnectionPool` object they are using is a fictional API. As such I'd rather not include it in my answer. But I've added a paragraph to suggest using a connection pool. – Daniel Darabos Oct 06 '15 at 10:15