
Our requirement is to perform some analytical operations on Phoenix (HBase) time-series tables. We have a table in PostgreSQL which holds the uniqueIds.

Right now we are getting all the uniqueIds from the PostgreSQL table, querying the Phoenix tables for the corresponding uniqueIds, and applying the analytical functions. But all the uniqueIds are processed sequentially, and we need this to run in parallel. We are using Scala and Spark to achieve this functionality.

Below is the sample code:

import org.apache.spark.sql.DataFrame
import org.apache.phoenix.spark._  // adds phoenixTableAsDataFrame to SQLContext

val optionsMap = Map("driver" -> config.jdbcDriver, "url" -> config.jdbcUrl,
      "user" -> config.jdbcUser, "password" -> config.jdbcPassword,
      "dbtable" -> query)
val uniqDF = sqlContext.read.format("jdbc").options(optionsMap).load()

// Bring every unique ID to the driver; each one is then processed sequentially
val results = uniqDF.collect

results.foreach { row =>
  val uniqId = row.getString(0)
  val data = loadHbaseData(uniqId)
  data.map(func).save()
}

def loadHbaseData(id: String): DataFrame = {
  sqlContext.phoenixTableAsDataFrame("TIMESERIETABLE", Array("TIMESTAMP", "XXXX"),
    predicate = Some("\"ID\" = '" + id + "' "), conf = configuration)
}

Could you please let me know what would be a better approach for doing this?

Vamshi Mothe

2 Answers


You can use the parallel collections functionality provided by Scala:

results.par.foreach { uniqId =>
  // Your code to be executed for each unique ID
}
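
For example, a minimal sketch of the same idea applied to the loop from the question (this assumes the loadHbaseData and func from your sample code):

// Run the per-ID work over a parallel collection instead of a plain Array
results.par.foreach { row =>
  val uniqId = row.getString(0)
  val data = loadHbaseData(uniqId)
  data.map(func).save()
}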
  • We have already tried that, but it was of no use; we have around 30K+ unique IDs which need to be processed in less than an hour. Parallel collections take too much time to process all of those. – Vamshi Mothe May 23 '17 at 04:38

Create one DataFrame that is a union of your HBase DataFrames, then apply your analytical function to this single DataFrame. Something like:

val hbaseDFs = results.map(row => loadHbaseData(row.getString(0)))
val unitedDF = hbaseDFs.reduce(_ union _)
unitedDF.map(func).save()

This approach calls union on a large number of DataFrames (30K+, according to your comment on the other answer), so it might be faster to convert the DataFrames to RDDs before doing the union, as described here.
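
For example, here is a rough sketch of that RDD-based union, assuming all of the Phoenix DataFrames share the same schema and that sc is your SparkContext:

val hbaseDFs = results.map(row => loadHbaseData(row.getString(0)))
val schema = hbaseDFs.head.schema
// SparkContext.union builds one flat union instead of the deeply nested plan
// produced by reducing pairwise over 30K+ DataFrames
val unitedRDD = sc.union(hbaseDFs.map(_.rdd).toSeq)
val unitedDF = sqlContext.createDataFrame(unitedRDD, schema)
unitedDF.map(func).save()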

Jeffrey Chung