I am new to Spark and am looking for a better way to handle the following scenario. There is a database table with 3 fields: Category, Amount, Quantity. First, I pull all the distinct categories from the database.

 val categories:RDD[String] = df.select(CATEGORY).distinct().rdd.map(r => r(0).toString)

Now, for each category, I want to execute a pipeline that essentially creates a DataFrame for that category and applies some machine learning.

 categories.foreach(executePipeline)
 def executePipeline(category: String): Unit = {
   val dfCategory = sqlCtxt.read.jdbc(JDBC_URL, "SELECT * FROM TABLE_NAME WHERE CATEGORY='" + category + "'")
   dfCategory.show()
 }

Is it possible to do something like this? Or is there a better alternative?

zero323
atreya biswas

2 Answers

// You could get all your data with a single query and convert it to an rdd
val data = sqlCtxt.read.jdbc(JDBC_URL,"SELECT * FROM TABLE_NAME").rdd

// then group the data by category
val groupedData = data.groupBy(row => row.getAs[String]("category"))

// then you get an RDD[(String, Iterable[org.apache.spark.sql.Row])]
// and you can iterate over it and execute your pipeline
groupedData.map { case (categoryName, items) =>
  //executePipeline(categoryName, items)
}
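
To make the commented-out call concrete, here is a minimal sketch of the same `groupBy` approach. It assumes a DataFrame with a string `category` column and a double `amount` column, and `executePipeline` is a hypothetical stand-in that only totals the amounts of one group. Note that each group arrives as an in-memory `Iterable[Row]` inside a single task, so this only suits categories whose rows fit comfortably in one executor's memory.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row}

object GroupedPipeline {
  // Hypothetical per-category step: totals the "amount" column of one group's rows.
  def executePipeline(categoryName: String, items: Iterable[Row]): Double =
    items.map(_.getAs[Double]("amount")).sum

  // Groups the rows by category on the executors and runs the step once per group.
  def run(df: DataFrame): Map[String, Double] = {
    val grouped: RDD[(String, Iterable[Row])] =
      df.rdd.groupBy(row => row.getAs[String]("category"))

    grouped
      .map { case (categoryName, items) => categoryName -> executePipeline(categoryName, items) }
      .collect() // only the small per-category results are returned to the driver
      .toMap
  }
}
```

Because the per-group step receives plain rows rather than an RDD or DataFrame, it cannot itself call Spark APIs that expect distributed input.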
Daniel B.
  • `groupBy` is risky in this context: assuming each category has lots of records, it can cause an OutOfMemory error, because a single record that can't be distributed would hold too much data. Plus, the real issue here is that `executePipeline` can't be called on the worker side, since it uses `SQLContext` to load a DataFrame and you can't serialize that. – Tzach Zohar Mar 23 '16 at 12:41
  • No. I don't load anything on the worker side, that's the point of my solution :) – Daniel B. Mar 23 '16 at 13:29
  • About the other comment, yes groupBy can be a potentially expensive operation, but as in this case we really want to group I'd start to think about ways to circumvent it if the datasets are really that huge. – Daniel B. Mar 23 '16 at 13:48
  • I notice a lot of people don't get this notion. – thebluephantom Jul 05 '18 at 11:24

Your code would fail with a `Task not serializable` exception, since you're trying to use the SQLContext (which isn't serializable) inside the function passed to foreach, which has to be serialized and sent to the workers to be executed on each record in the categories RDD.

Assuming you know the number of categories is limited, which means the list of categories isn't too large to fit in your driver memory, you should collect the categories to the driver and iterate over that local collection using foreach:

val categoriesRdd: RDD[String] = df.select(CATEGORY).distinct().rdd.map(r => r(0).toString)
val categories: Seq[String] = categoriesRdd.collect()
categories.foreach(executePipeline)

Another improvement would be reusing the dataframe that you loaded instead of performing another query, using a filter for each category:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

def executePipeline(singleCategoryDf: DataFrame): Unit = { /* ... */ }

categories.foreach { cat =>
  val filtered = df.filter(col(CATEGORY) === cat)
  executePipeline(filtered)
}

NOTE: so that reusing df doesn't reload it from the database on every iteration, call cache() on it before collecting the categories.
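
Putting the pieces together, the following is a minimal sketch of the cache, collect, and filter approach. It assumes a DataFrame with a string column literally named `category`, and `executePipeline` here is a hypothetical stand-in that only counts rows; the real pipeline would go in its place.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

object PerCategoryPipeline {
  // Hypothetical stand-in for the real pipeline: it just counts the rows it is given.
  def executePipeline(singleCategoryDf: DataFrame): Long = singleCategoryDf.count()

  // Runs the pipeline once per distinct value of the (assumed) "category" column.
  def runPerCategory(df: DataFrame): Map[String, Long] = {
    val cached = df.cache() // lazy: the data is materialized by the first action below

    try {
      // Assumes the number of distinct categories is small enough for driver memory.
      val categories: Array[String] =
        cached.select("category").distinct().collect().map(_.getString(0))

      // Driver-side loop; each iteration still runs as a distributed Spark job.
      categories.map { cat =>
        cat -> executePipeline(cached.filter(col("category") === cat))
      }.toMap
    } finally {
      cached.unpersist() // release the cached blocks once every category is done
    }
  }
}
```

The loop itself runs on the driver, so nothing that holds the SQLContext has to be serialized; only the filter and the pipeline's own operations are shipped to the workers.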

Tzach Zohar
  • If you collect the categories on the driver and iterate over them, it gets run sequentially, which kinda defeats the purpose of spark. – Daniel B. Mar 23 '16 at 13:35
  • I'm assuming #Categories <<< #Records, so running on categories sequentially is much better than running sequentially on *records* (which is what a `groupBy`-based solution would do - each category creates a huge `Iterable` that can only be processed by a single task). If that assumption is wrong - indeed this isn't an optimal solution. It also seems that changing `executePipeline` to work on an `Iterable[Row]` is either hard or impossible, as OP implied it should "apply some Machine Learning" - if by that they mean MLLib, the input must be an RDD/DataFrame. – Tzach Zohar Mar 23 '16 at 13:44
  • @TzachZohar, by `Iterable[Row]` do you mean that the data is iterated row by row? Doesn't that consume more memory on the Spark client? – BdEngineer Jan 16 '19 at 10:21
  • @user3252097 that's why I'm saying you should NOT use `Iterable[Row]` (which is why I still think the other solution suggested here is NOT recommended) – Tzach Zohar Jan 16 '19 at 21:10
  • @DanielB "If you collect the categories on the driver and iterate over them, it gets run sequentially, which kinda defeats the purpose of spark." Then how should I handle this case? I am having the same issue: https://stackoverflow.com/questions/54416623/how-to-group-dataframe-year-wise-and-iterate-through-groups-and-send-each-year-d – BdEngineer Jan 29 '19 at 10:46