I have a requirement in Spark where I need to fetch data from MySQL instances and, after some processing, enrich it with additional data from a different MySQL database.
However, when I try to access the second database from inside a map function, I get the following exception:
org.apache.spark.SparkException: Task not serializable
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
My code looks like this:
val reader = sqlContext.read
initialDataset.map( r => reader.jdbc(jdbcUrl,
  s"(select enrichment_data from other_table where id='${r.getString(1)}') result",
  connectionProperties).rdd.first().get(0).toString )
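For context, initialDataset itself is just a JDBC read of the first database, roughly like this (URL and table name are placeholders here):

// rows to be enriched, read from the first MySQL instance (URL/table simplified)
val initialDataset = sqlContext.read
  .jdbc("jdbc:mysql://source-host:3306/source_db", "source_table", connectionProperties)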
Any ideas / pointers? Should I use two different Datasets, i.e. something like the sketch below? Thanks!
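By "two different Datasets" I mean roughly the following (a sketch only; the key column name and the join condition are simplified guesses):

// load the enrichment table as its own Dataset instead of querying it per row
val enrichment = sqlContext.read
  .jdbc(jdbcUrl, "other_table", connectionProperties)
  .select("id", "enrichment_data")

// enrich by joining on the key rather than issuing one JDBC query per row
// (assuming both sides share an "id" key column -- the name is a guess)
val enriched = initialDataset.join(enrichment, Seq("id"))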