I want to run mapPartitions on my Spark RDD:
val newRd = myRdd.mapPartitions(
  partition => {
    val connection = new DbConnection /*creates a db connection per partition*/
    val newPartition = partition.map(
      record => {
        readMatchingFromDB(record, connection)
      })
    connection.close()
    newPartition
  })
But this gives me a "connection already closed" exception. That is expected: partition is an Iterator, so .map() on it is lazy, and connection.close() runs before the function passed to .map() is ever evaluated.
I want to create one connection per RDD partition and close it properly. How can I achieve this?
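
To make the problem concrete, here is a minimal sketch that reproduces it without Spark. FakeConnection, LazyIteratorDemo, and both method names are hypothetical stand-ins I made up for illustration; they are not my real DbConnection or readMatchingFromDB. The second method forces the iterator before closing, which I assume is one possible workaround rather than the recommended one.

```scala
// Hypothetical stand-in for DbConnection; not a real library class.
final class FakeConnection {
  private var open = true

  // Pretends to query the database; fails once the connection is closed.
  def lookup(record: Int): Int = {
    if (!open) throw new IllegalStateException("connection already closed")
    record * 10
  }

  def close(): Unit = { open = false }
}

object LazyIteratorDemo {
  // Same shape as my code: Iterator.map is lazy, so close() runs
  // before lookup() is called for any record.
  def closeTooEarly(partition: Iterator[Int]): Iterator[Int] = {
    val connection = new FakeConnection
    val mapped = partition.map(record => connection.lookup(record))
    connection.close()
    mapped
  }

  // Forces evaluation with toList before closing. This buffers the
  // whole partition in memory.
  def materializeThenClose(partition: Iterator[Int]): Iterator[Int] = {
    val connection = new FakeConnection
    try partition.map(record => connection.lookup(record)).toList.iterator
    finally connection.close()
  }

  def main(args: Array[String]): Unit = {
    // Consuming the lazily mapped iterator fails because the connection is closed.
    val failed = scala.util.Try(closeTooEarly(Iterator(1, 2, 3)).toList)
    println(failed.isFailure) // prints true

    println(materializeThenClose(Iterator(1, 2, 3)).toList) // prints List(10, 20, 30)
  }
}
```

Materializing works in this toy case, but it holds every result of the partition in memory at once, which may not be acceptable for large partitions. Ideally the connection would be closed only after the iterator has been fully consumed.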
Thanks!