
I have some code like this:

      println("\nBEGIN Last Revs Class: "+ distinctFileGidsRDD.getClass)
      val lastRevs = distinctFileGidsRDD.
        foreachPartition(iter => {
          SetupJDBC(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)
          while(iter.hasNext) {
            val item = iter.next()
            //println(item(0))
            println("String: "+item(0).toString())
            val jsonStr = DB.readOnly { implicit session =>
              sql"SELECT jsonStr FROM lasttail WHERE fileGId = ${item(0)}::varchar".
                map { resultSet => resultSet.string(1) }.single.apply()
            }
            println("\nJSON: "+jsonStr)
          }
        })
      println("\nEND Last Revs Class: "+ lastRevs.getClass)

The code outputs (with heavy edits) something like:

BEGIN Last Revs Class: class org.apache.spark.rdd.MapPartitionsRDD
String: 1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM
JSON: Some({"Struct":{"fileGid":"1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM",... )
String: 1eY2wxoVq17KGMUBzCZZ34J9gSNzF038grf5RP38DUxw
JSON: Some({"Struct":{"fileGid":"1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM",... )
...
JSON: None()
END Last Revs Class: void

QUESTION 1: How can I get the `lastRevs` value back in a useful format, such as the JSON string/null or an option like Some/None?

QUESTION 2: My preference: Is there another way to get at partition data in an RDD-like format (rather than the iterator format)?

dstream.foreachRDD { (rdd, time) =>
  rdd.foreachPartition { partitionIterator =>
    val partitionId = TaskContext.get.partitionId()
    val uniqueId = generateUniqueId(time.milliseconds, partitionId)
    // use this uniqueId to transactionally commit the data in partitionIterator
  } 
}

from http://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning

QUESTION 3: Is the method of getting data that I am using sane (given that I am following the link above)? (Set aside the fact that this is a scalikejdbc/JDBC system right now; this is going to be a key-value store of some type other than this prototype.)

codeaperature
  • I don't get the question. `lastRevs` should be `Unit` because `foreachPartition` is only used for its side effect (the function is T => Unit). I think you want to transform the data, e.g. using `mapPartitions` instead. I'd like to understand what the overall goal is here, because the individual questions don't make much sense (to me) – maasg Apr 30 '16 at 16:31
  • @maasg: Yes. This is the answer that I am looking for - mapPartitions. I found another example at http://stackoverflow.com/questions/21698443/spark-best-practice-for-retrieving-big-data-from-rdd-to-local-machine. – codeaperature Apr 30 '16 at 16:54

1 Answer


To create a transformation that uses resources local to the executor (such as a DB or network connection), you should use `rdd.mapPartitions`. It lets you initialize some code locally on the executor and use those local resources to process the data in the partition.

The code should look like:

    val lastRevs = distinctFileGidsRDD.mapPartitions { iter =>
      // runs on the executor: set up the JDBC connection once per partition
      SetupJDBC(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)
      iter.map { element =>
        DB.readOnly { implicit session =>
          sql"SELECT jsonStr FROM lasttail WHERE fileGId = ${element(0)}::varchar"
            .map { resultSet => resultSet.string(1) }
            .single.apply()
        }
      }
    }
maasg
  • You mean that it differs from `foreachPartition` in that it uses the Executor's resources instead of the Driver's? I.e. `foreachPartition` code is executed on the Driver whereas `mapPartitions` runs on the Executor ... right? – lisak May 26 '16 at 15:13
  • @lisak No, both `foreachPartition` and `mapPartitions` will let you run code on the executors. The difference is that `foreachPartition` only does side effects (like writing to a db), while `mapPartitions` returns a value. The key of this question is 'how to get data back', hence `mapPartitions` is the way to go (see the sketch after these comments). – maasg May 26 '16 at 18:27
  • @maasg I have code like this: `val company_model_vals_df = enriched_company_model_vals_df.repartition(col("model_id"), col("fiscal_quarter"), col("fiscal_year")); company_model_vals_df.foreachPartition( writeAsParquet(dataframe) )` // will write as parquet in hdfs. But how to use foreachPartition here? – BdEngineer Jan 16 '19 at 10:50
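
To make the distinction in the comments above concrete, here is a minimal sketch contrasting the two calls. Both closures run on the executors; `openDbConnection`, `saveToDb` and `lookupJson` are hypothetical stand-ins for your own per-partition I/O code:

    // Side effects only: foreachPartition returns Unit, nothing comes back
    rdd.foreachPartition { iter =>
      val conn = openDbConnection()             // one connection per partition
      iter.foreach(row => saveToDb(conn, row))
      conn.close()
    }

    // Transformation: mapPartitions returns a new RDD with one output per input row
    val results = rdd.mapPartitions { iter =>
      val conn = openDbConnection()
      // note: iter.map is lazy -- don't close the connection here, or it would
      // be closed before the iterator is actually consumed
      iter.map(row => lookupJson(conn, row))
    }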