1

I have a requirement in Spark where I need to fetch data from mysql instances and after some processing enrich them with some more data from a different mysql database.

However, when I try to access the database again from inside a map function, I get a

org.apache.spark.SparkException: Task not serializable
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)     
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
org.apache.spark.SparkContext.clean(SparkContext.scala:2094)

My code looks like this:

val reader = sqlContext.read;
initialDataset.map( r => reader.jdbc(jdbcUrl, "(select enrichment_data from other_table where id='${r.getString(1)'}) result", connectionProperties).rdd.first().get(0).toString )

Any ideas / pointers? Should I use two different Datasets? Thanks!

Kostas Chr
  • 63
  • 5
  • I believe you will have to use initialDataset.collect to access the database inside rdd. – Abhishek Choudhary Mar 12 '18 at 09:46
  • But if I collect, that would mean that I would retrieve the whole dataset to the driver - or not? I would like to avoid it. – Kostas Chr Mar 12 '18 at 10:11
  • Hello @KostasChr did you manage to find a solution about this one? – abiratsis Mar 26 '18 at 18:07
  • It seems the second option is the way to go in my case - mapPartition. Unfortunately I had to pause this task - I will give an update when I revisit it. Thanks @AlexandrosBiratsis! – Kostas Chr Mar 27 '18 at 11:58
  • OK @Kostas good luck. Would be nice to provide more details when you end up with a solution since we have a similar case and would be interesting to share some experience – abiratsis Mar 27 '18 at 12:13

1 Answers1

0

First of all map() function should accept a row from an existing RDD then will apply the changes you made and returns the updated row. This is the reason why you get this exception since scala can't serialize the code reader.jdbc(jdbcUrl, ...

To solve your issue you have multiple options according to your needs:

  1. You could broadcast one of these datasets after collecting it. With broadcast your dataset will be stored into your nodes' memory. This could work if this dataset is reasonably small to fit in the node's memory. Then you could just query it and combine the results with the 2nd dataset

  2. If both datasets are big and not suitable for loading them into node memory then use mapPartition, you can find more information about mapPartition here. mapPartition is called per partition instead of per element that map() does. If you choose this option then you could access the 2nd dataset from mapPartition or even initialize the whole dataset(e.g retrieve all related records from the 2nd database) from mapPartition.

Please be aware that I assumed that these two datasets they do have some kind of dependency(e.g you need to access some value from the 2nd database before executing the next step). If they don't then just create both ds1 and ds2 and use them normally as you would do with any dataset. Finally remember to to cache the datasets if you are sure that you might need to access it multiple times.

Good luck

abiratsis
  • 7,051
  • 3
  • 28
  • 46