
I have a database object which is used to insert data from all Spark executors. When I define this object as static, it is null on the executors. So I declare it in the driver, broadcast it, then get its value in each executor. When I run the application, the following exception is thrown:

Exception in thread "main" java.io.NotSerializableException: database.Database

Notes:

  • The executor class is Serializable
  • The broadcast object is declared transient in that class
  • I removed the transient modifier, but it didn't help
koiralo
fattah.safa

1 Answer


I interpret your question this way:

I want to insert data from my RDD from all Spark executors. I tried to create one DB connection on the Driver and pass it somehow as a Broadcast to the executors, but Spark keeps throwing NotSerializableException. How can I achieve my goal?

The short answer is:

You should create a new connection on every executor node separately.
You should not pass database connection handles, file handles, and the like to other processes, and especially not to remote machines.
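The root cause can be reproduced without Spark at all: Java serialization rejects any object graph containing a non-serializable member, which is exactly what happens when a task closure captures the database object. A minimal sketch, with `Database` standing in for the asker's class:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for the asker's Database class: holds a live resource
// and is deliberately not Serializable.
class Database { val handle = new Object() }

val db = new Database()

// Spark serializes task closures (and everything they capture) before
// shipping them to executors; capturing `db` makes that step fail.
val closure: java.io.Serializable = new java.io.Serializable { val captured = db }

val out = new ObjectOutputStream(new ByteArrayOutputStream())
val failed =
  try { out.writeObject(closure); false }
  catch { case _: NotSerializableException => true }
// `failed` ends up true: the write throws NotSerializableException
```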

The problem here is where exactly to create the database connections, because with a large number of executors one can easily exceed the connection pool size of the DB.

What you can actually do is to use foreachPartition, like here:

  // numPartitions == number of simultaneous DB connections you can afford
  yourRdd.repartition(numPartitions)
  .foreachPartition { iter =>
    val connection = createConnection()
    while (iter.hasNext) {
      val row = iter.next()             // consume the row, or the loop never terminates
      connection.execute("INSERT ...")  // build the statement from `row`
    }
    connection.commit()
    connection.close()
  }

Here the code inside .foreachPartition is executed on each executor machine, so the connection objects are never sent over the network: you won't get serialization exceptions, and the data will be inserted.
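For larger partitions, the same pattern combines naturally with batching via `iter.grouped`, so each round trip carries many rows instead of one. A sketch of the per-partition logic, using a stub connection in place of a real driver so it runs without Spark or a database (the `StubConnection` class and the batch size are assumptions for illustration, not part of the original answer):

```scala
// Stub standing in for a real DB connection; it just counts inserted rows.
class StubConnection {
  var executed = 0
  def executeBatch(rows: Seq[String]): Unit = executed += rows.size
  def commit(): Unit = ()
  def close(): Unit = ()
}

// What the body of foreachPartition would do: one connection per
// partition, rows sent to the database in batches of `batchSize`.
def writePartition(iter: Iterator[String], batchSize: Int): Int = {
  val connection = new StubConnection()
  iter.grouped(batchSize).foreach(batch => connection.executeBatch(batch))
  connection.commit()
  connection.close()
  connection.executed
}
```

For example, `writePartition(Iterator("a", "b", "c"), batchSize = 2)` sends two batches (sizes 2 and 1) and returns 3.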

The same reasoning about use of foreachPartition is also mentioned in the answers to this question.

Nikolay Vasiliev
  • 5,656
  • 22
  • 31