
I am trying to load a RDD from a Mysql database:

package ro.mfl.employees
import org.apache.spark.{SparkConf, SparkContext}
import java.sql.{Connection, DriverManager}

import org.apache.spark.rdd.JdbcRDD

class Loader(sc: SparkContext) {

  Class.forName("com.mysql.jdbc.Driver").newInstance()

  def connection(): Connection = {
    DriverManager.getConnection("jdbc:mysql://localhost/employees", "sakila", "sakila")
  }


  def load(): Unit = {
    val employeesRDD = new JdbcRDD(sc, connection, "select * from employees.employees", 0, 0, 1)
    println(employeesRDD.count())

  }

}

object Test extends App {
  val conf = new SparkConf().setAppName("test")
  val sc = new SparkContext(conf)
  val l = new Loader(sc)
  l.load()
}

When I execute this, I get an error saying

Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@323a9221)
    - field (class: ro.mfl.employees.Loader, name: sc, type: class org.apache.spark.SparkContext)
    - object (class ro.mfl.employees.Loader, ro.mfl.employees.Loader@607c6d60)
    - field (class: ro.mfl.employees.Loader$$anonfun$1, name: $outer, type: class ro.mfl.employees.Loader)
    - object (class ro.mfl.employees.Loader$$anonfun$1, <function0>)
    - field (class: org.apache.spark.rdd.JdbcRDD, name: org$apache$spark$rdd$JdbcRDD$$getConnection, type: interface scala.Function0)
    - object (class org.apache.spark.rdd.JdbcRDD, JdbcRDD[0] at JdbcRDD at Loader.scala:17)
    - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
    - object (class scala.Tuple2, (JdbcRDD[0] at JdbcRDD at Loader.scala:17,<function2>))

Has anyone encountered this problem?

I tried to make the Loader class extend java.io.Serializable, but I got the same error, only with org.apache.spark.SparkContext instead of Loader.

  • `Class.forName("com.mysql.jdbc.Driver")` has been obsolete since 2007, and the `.newInstance()` part has never been required. Is this Scala? – user207421 Sep 03 '16 at 10:16
  • Yes, it is Scala. I removed newInstance, same error. I don't understand what was deprecated. I don't see `Class.forName` marked as deprecated in the javadoc. – Matei Florescu Sep 03 '16 at 11:55
  • [check this](http://stackoverflow.com/questions/24916852/how-can-i-connect-to-a-postgresql-database-into-apache-spark-using-scala) – Ram Ghadiyaram Sep 03 '16 at 16:46
  • if you are okay with the answer please vote-up as well. Thx – Ram Ghadiyaram Sep 09 '16 at 15:02

1 Answer


Problem:

Your issue is that Loader is a class, and it is not serializable.

Try changing it to an object, or else follow the example given below.

object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@323a9221)

This happens because Loader is a class and you pass the SparkContext to it when you create a new instance. The connection function you hand to JdbcRDD is a method on that instance, so Spark has to serialize the whole Loader, and with it the non-serializable SparkContext field (exactly the chain shown in your serialization stack).

Follow this example (a simple and clean way); this should work:

import org.apache.spark._
import org.apache.spark.rdd.JdbcRDD
import java.sql.{DriverManager, ResultSet}

// an object, not a class that holds the SparkContext as a field
object LoadSimpleJdbc {
  def main(args: Array[String]) {
    if (args.length < 1) {
      println("Usage: [sparkmaster]")
      sys.exit(1)
    }
    val master = args(0)
    val sc = new SparkContext(master, "LoadSimpleJdbc", System.getenv("SPARK_HOME"))
    val data = new JdbcRDD(sc,
      createConnection, "SELECT * FROM panda WHERE ? <= id AND ID <= ?",
      lowerBound = 1, upperBound = 3, numPartitions = 2, mapRow = extractValues)
    println(data.collect().toList)
  }

  /** createConnection - get a JDBC connection (runs on the executors) */
  def createConnection() = {
    Class.forName("com.mysql.jdbc.Driver")
    DriverManager.getConnection("jdbc:mysql://localhost/test?user=holden")
  }

  /** extractValues - map each ResultSet row to a tuple */
  def extractValues(r: ResultSet) = {
    (r.getInt(1), r.getString(2))
  }
}

In general, try to avoid storing the SparkContext in your classes.
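
If you do want to keep Loader as a class, one option is to move the connection factory and the row mapper onto a companion object, so the functions handed to JdbcRDD never reference the Loader instance or its SparkContext. This is a rough sketch adapted from the question's code, not part of the original answer; the two '?' placeholders, the emp_no column and its bounds are assumptions about the MySQL employees sample schema:

import java.sql.{Connection, DriverManager, ResultSet}

import org.apache.spark.SparkContext
import org.apache.spark.rdd.JdbcRDD

object Loader {
  // Defined on the companion object: referencing these functions does not pull
  // the Loader instance (or its SparkContext) into the serialized closure.
  def connection(): Connection =
    DriverManager.getConnection("jdbc:mysql://localhost/employees", "sakila", "sakila")

  def extractValues(r: ResultSet): (Int, String) =
    (r.getInt(1), r.getString(2))
}

class Loader(sc: SparkContext) {
  def load(): Unit = {
    // JdbcRDD binds the partition bounds to the two '?' placeholders;
    // emp_no and its range are assumptions about the employees sample database.
    val employeesRDD = new JdbcRDD(
      sc,
      Loader.connection _,
      "SELECT * FROM employees.employees WHERE ? <= emp_no AND emp_no <= ?",
      lowerBound = 10001, upperBound = 499999, numPartitions = 2,
      mapRow = Loader.extractValues _)
    println(employeesRDD.count())
  }
}

Because Loader.connection and Loader.extractValues live on the companion object, the closures Spark ships to the executors carry no reference to the Loader instance, so nothing tries to serialize the SparkContext.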

Also, look at Serialization Exception on Spark.

You can also try declaring the SparkContext as @transient (some users on SO take this approach); see the sketch below.
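
A minimal sketch of that @transient approach applied to the question's Loader (untested; as above, the '?' placeholders and the emp_no bounds are assumptions about the employees sample schema):

import java.sql.{Connection, DriverManager}

import org.apache.spark.SparkContext
import org.apache.spark.rdd.JdbcRDD

// Serializable lets the Loader instance travel inside the connection closure;
// @transient tells serialization to skip the SparkContext field.
class Loader(@transient val sc: SparkContext) extends Serializable {

  def connection(): Connection =
    DriverManager.getConnection("jdbc:mysql://localhost/employees", "sakila", "sakila")

  def load(): Unit = {
    val employeesRDD = new JdbcRDD(
      sc,
      connection _, // captures `this`, which is now serializable
      "SELECT * FROM employees.employees WHERE ? <= emp_no AND emp_no <= ?",
      lowerBound = 10001, upperBound = 499999, numPartitions = 2)
    println(employeesRDD.count())
  }
}

The executors deserialize a Loader whose sc field is null, which is fine here because connection() never touches it.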
