I want to perform some operations on my Spark DataFrame, write the results to a database, and create another DataFrame at the end. It looks like this:

import sqlContext.implicits._

val newDF = myDF.mapPartitions(
  iterator => {
    val conn = new DbConnection
    iterator.map(
       row => {
         addRowToBatch(row)
         convertRowToObject(row)
     })
    conn.writeTheBatchToDB()
    conn.close()
  })
  .toDF()

This gives me an error because mapPartitions expects a return type of Iterator[NotInferedR], but here the function returns Unit. I know this is possible with foreachPartition, but I'd also like to do the mapping. Doing the two separately would add overhead (an extra Spark job). What should I do?

Thanks!

void

2 Answers

In most cases, eagerly consuming the iterator will make the job fail, or at least slow it down. What I did instead was check whether the iterator is already empty and, if so, run the cleanup routines.

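rdd.mapPartitions(itr => {
    val conn = new DbConnection
    itr.map(data => {
       // Placeholder: do something with your data and conn here.
       val yourActualResult = ???
       // Once the last element has been pulled, itr is empty: close the connection.
       // Note: for an empty partition this function never runs, so conn stays open.
       if (itr.isEmpty) conn.close()
       yourActualResult
    })
})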

I thought this was a Spark problem at first, but it is actually a Scala one; see Iterator.isEmpty: http://www.scala-lang.org/api/2.12.0/scala/collection/Iterator.html#isEmpty:Boolean
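The isEmpty check can be tried without Spark. The following is only a minimal sketch: `FakeConnection`, `process`, and the doubling step are made-up stand-ins for the real DbConnection and per-row work, and plain Int values stand in for rows.

```scala
object CloseOnLastElement {
  // Hypothetical stand-in for DbConnection; it only records whether close() was called.
  final class FakeConnection {
    var closed = false
    def close(): Unit = { closed = true }
  }

  // Same shape as the function passed to mapPartitions: Iterator in, Iterator out.
  def process(itr: Iterator[Int], conn: FakeConnection): Iterator[Int] =
    itr.map { data =>
      val result = data * 2          // placeholder for the real per-row work
      if (itr.isEmpty) conn.close()  // true only once the last element has been pulled
      result
    }
}
```

The connection stays open until the consumer pulls the final element. If the input iterator is empty, the mapping function is never called and `close()` is never reached, so an empty partition would need separate handling.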

dansuzuki

The last expression in the anonymous function implementation must be the return value:

import sqlContext.implicits._

val newDF = myDF.mapPartitions(
  iterator => {
    val conn = new DbConnection
    // using toList to force eager computation - make it happen now when connection is open
    val result = iterator.map(/* the same... */).toList
    conn.writeTheBatchToDB()
    conn.close()
    result.iterator
  }
).toDF()
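
To see why the toList matters, here is a minimal plain-Scala sketch with no Spark involved. `FakeConnection`, `lazyVersion`, and `eagerVersion` are made-up names for illustration; the fake connection simply throws once it has been closed.

```scala
object LazyMapDemo {
  // Hypothetical stand-in for DbConnection; it refuses to work once closed.
  final class FakeConnection {
    private var open = true
    def write(x: Int): Int = {
      if (!open) throw new IllegalStateException("connection already closed")
      x * 2
    }
    def close(): Unit = { open = false }
  }

  // Lazy: map has not run yet when close() is called, so consuming the result fails.
  def lazyVersion(it: Iterator[Int]): Iterator[Int] = {
    val conn = new FakeConnection
    val result = it.map(conn.write)
    conn.close()
    result
  }

  // Eager: toList forces every write while the connection is still open.
  def eagerVersion(it: Iterator[Int]): Iterator[Int] = {
    val conn = new FakeConnection
    val result = it.map(conn.write).toList
    conn.close()
    result.iterator
  }
}
```

Consuming `lazyVersion(...)` throws because the writes only happen after the close, while `eagerVersion(...)` succeeds. The cost of the eager version is that the whole partition is held in memory as a List.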
Tzach Zohar
  • What if I have to use the `conn` inside `iterator.map()` function? Won't I get a connection already closed exception? – void Jun 16 '16 at 18:17
  • Well - you're right - because of iterator.map's laziness, the actual calculation would only happen when the `result` iterator is used, hence - after the connection is closed. I'll fix the answer to reflect this - thanks – Tzach Zohar Jun 19 '16 at 14:28
  • Thanks! I've asked it as a separate one. You may take a look at it http://stackoverflow.com/questions/37881042/spark-db-connection-per-spark-rdd-partition-and-do-mappartition – void Jun 20 '16 at 05:15
  • I would close it the same lazy way without toList: `(iterator.map(...) ++ Seq(null)).filter(_ != null || { close ; false })` – Valentin Mar 10 '18 at 10:26
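
The lazy close suggested in the last comment can also be written as a small wrapper iterator, which avoids the null sentinel. This is only a sketch under assumptions: `CleanupIterator` and `withCleanup` are hypothetical names, not part of Spark or the Scala library, and the cleanup only fires if the consumer keeps calling hasNext until the iterator is exhausted.

```scala
object CleanupIterator {
  // Wraps `underlying` and runs `cleanup` exactly once,
  // the first time hasNext reports that no elements are left.
  def withCleanup[A](underlying: Iterator[A])(cleanup: () => Unit): Iterator[A] =
    new Iterator[A] {
      private var done = false

      def hasNext: Boolean = {
        val more = underlying.hasNext
        if (!more && !done) { done = true; cleanup() }
        more
      }

      def next(): A = underlying.next()
    }
}
```

Inside mapPartitions this would be used roughly as `withCleanup(iterator.map(...))(() => { conn.writeTheBatchToDB(); conn.close() })`. Unlike the isEmpty check inside map, it also runs the cleanup for an empty partition. If the consumer can stop early, registering the close with Spark's `TaskContext.get().addTaskCompletionListener` is probably the more robust hook; that variant is not shown here.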