How can I pass an OJAI connection from the Spark driver to its executors? Here's my code:

import org.ojai.store.DriverManager
import scala.collection.JavaConverters._

// Created on the driver, then referenced inside the closure below
val connection = DriverManager.getConnection("ojai:mapr:")
val store = connection.getStore("/tables/table1")
val someStream = messagesDStream.mapPartitions { iterator =>
  val list = iterator
    .map(record => record.value())
    .toList
    .asJava
  // TODO: serialization, deserialization, the Serializable interface in Java
  val query = connection
    .newQuery()
    .where(connection.newCondition()
      .in("_id", list)
      .build())
    .build()
}

and the error I got:

Caused by: java.io.NotSerializableException: com.mapr.ojai.store.impl.OjaiConnection
Serialization stack:
        - object not serializable (class: com.mapr.ojai.store.impl.OjaiConnection, value: com.mapr.ojai.store.impl.OjaiConnection@2a367e93)
        - field (class: com.example.App$$anonfun$1, name: connection$1, type: interface org.ojai.store.Connection)
        - object (class com.example.App$$anonfun$1, <function1>)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:342)
        ...

As long as the OJAI connection is created inside the mapPartitions function, everything works. I know that I need to pass the configuration from the driver to the executors for the code to work, but I don't know how to do it. Bye!

karl marx
    The concepts shown [here](https://stackoverflow.com/questions/31590592/how-to-write-spark-streaming-df-to-kafka-topic/39539234#39539234) might be useful. – Michael Heil Sep 24 '21 at 10:54

1 Answer

You're running into Spark's most infamous error: Task not serializable. Essentially, it means that one of the classes or objects you're attempting to serialise, that is, send over the network from the driver to the executors, cannot be processed in this way. Here, it's the OJAI connection, which your mapPartitions closure captures from the driver.

You cannot pass the connection itself from the driver to the executors. What you can do, without re-creating the connection for every batch of RDDs coming from your stream, is declare it in a companion object as:

@transient lazy val connection = ...

Then refer to that field inside mapPartitions. Each executor then gets its own connection to the database, which persists across batches, because a field declared this way is not created on the driver and then serialised; it is created on each executor the first time it is accessed.
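To illustrate why this works, here is a minimal sketch in plain Scala that needs neither Spark nor OJAI. It mimics Spark's closure check by Java-serialising two functions. The names `FakeConnection`, `ConnectionHolder`, `ClosureDemo`, `isSerializable`, `capturingClosure` and `holderClosure` are hypothetical and exist only for this example. In a real job the body of the lazy val would presumably be the `DriverManager.getConnection("ojai:mapr:")` call from the question.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a non-serializable client such as the OJAI connection.
class FakeConnection {
  def lookup(id: String): String = s"doc-$id"
}

// Holder object: the connection is created lazily, once per JVM, on first access.
// @transient mirrors the declaration suggested in the answer.
object ConnectionHolder {
  @transient lazy val connection: FakeConnection = new FakeConnection
}

object ClosureDemo {
  // Rough imitation of Spark's check: try to Java-serialize the given object.
  def isSerializable(f: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(f)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Captures a locally created connection, like the code in the question does.
  def capturingClosure(): String => String = {
    val local = new FakeConnection
    id => local.lookup(id)
  }

  // Captures nothing: the holder is looked up wherever the function runs.
  def holderClosure(): String => String =
    id => ConnectionHolder.connection.lookup(id)

  def main(args: Array[String]): Unit = {
    println(s"capturing closure serializable: ${isSerializable(capturingClosure())}")
    println(s"holder closure serializable: ${isSerializable(holderClosure())}")
  }
}
```

Running `ClosureDemo` should report the capturing closure as not serializable and the holder-based one as serializable. The second function carries no connection with it, so nothing non-serializable has to travel from the driver to the executors.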

Shent