
I have an operation in Spark that should be performed for several columns in a data frame. Generally, there are two possibilities to specify such operations:

  • hard-coded

```scala
handleBias("bar", df)
  .join(handleBias("baz", df), df.columns)
  .drop(columnsToDrop: _*)
  .show
```

  • dynamically generated from a list of column names

```scala
var res = df
for (col <- columnsToDrop ++ columnsToCode) {
  res = handleBias(col, res)
}
res.drop(columnsToDrop: _*).show
```

The problem is that the dynamically generated DAG is different, and the runtime of the dynamic solution grows far more with the number of columns than that of the hard-coded operations.

I am curious how to combine the elegance of the dynamic construction with quick execution times.
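To compare what the two variants actually build, the query plans can be inspected directly. A sketch, assuming `df`, `handleBias`, `columnsToDrop`, and `columnsToCode` as defined in this question; `explain(true)` prints the parsed, analyzed, optimized, and physical plans:

```scala
// Sketch: print the plans of both variants to compare their DAGs.
val hardcoded = handleBias("bar", df)
  .join(handleBias("baz", df), df.columns)
  .drop(columnsToDrop: _*)
hardcoded.explain(true)

var dynamic = df
for (c <- columnsToDrop ++ columnsToCode)
  dynamic = handleBias(c, dynamic)
dynamic.drop(columnsToDrop: _*).explain(true)
```

Comparing the two printouts makes the divergence visible without running a full job.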

Here is the comparison of the DAGs for the example code (screenshot: complexity comparison).

For around 80 columns this results in a rather nice graph for the hard-coded variant (screenshot: hardCoded), and in a very big, probably less parallelizable and much slower DAG for the dynamically constructed query (screenshot: hugeMessDynamic).

A current version of Spark (2.0.2) was used, with DataFrames and Spark SQL.

Code to complete the minimal example:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

def handleBias(col: String, df: DataFrame, target: String = "FOO"): DataFrame = {
  // share of each value of `col` within the target === 1 rows
  val pre1_1 = df
    .filter(df(target) === 1)
    .groupBy(col, target)
    .agg((count("*") / df.filter(df(target) === 1).count).alias("pre_" + col))
    .drop(target)

  // mean of the target per value of `col`
  val pre2_1 = df
    .groupBy(col)
    .agg(mean(target).alias("pre2_" + col))

  df
    .join(pre1_1, Seq(col), "left")
    .join(pre2_1, Seq(col), "left")
    .na.fill(0)
}
```

Edit

Running your task with foldLeft generates a linear DAG (screenshot: foldLeft), and hard-coding the function for all the columns results in a different one (screenshot: hardcoded).

Both are a lot better than my original DAGs, but the hard-coded variant still looks better to me. String-concatenating a SQL statement in Spark would let me dynamically generate the hard-coded execution graph, but that seems rather ugly. Do you see any other option?
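For what it's worth, the string-concatenation idea could be sketched roughly like this. This is hypothetical: `cols` is an assumption, and it only covers a `pre2_`-style aggregate; it relies on `selectExpr` accepting window expressions as SQL strings:

```scala
// Hypothetical sketch of the SQL-string idea: build every per-column
// expression as a string and issue one wide projection, with no joins.
val cols = Seq("bar", "baz")   // assumption: the columns to encode
val target = "FOO"
val exprs = cols.map(c => s"avg($target) OVER (PARTITION BY $c) AS pre2_$c")
val wide = df.selectExpr("*" +: exprs: _*)
wide.show
```

Since Spark sees a single `SELECT`, the plan should resemble the hard-coded graph, at the cost of working with strings instead of typed column expressions.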

zero323
Georg Heiler
  • I think the problem is that your "handleBias" function is very complex and you need to run it for multiple columns. Even if you hard-code it for many columns, your DAG will be big, so maybe the problem is not applying it "dynamically", but applying it to many columns. So, if you can think of a way to adapt your function to process multiple columns at once, it may help considerably. – Daniel de Paula Dec 15 '16 at 19:07
  • @DanieldePaula do you see any way to phrase this method in a simpler way so that less computation power is required? – Georg Heiler Dec 15 '16 at 19:09
  • Unfortunately, I don't have a lot of time to think about it right now, I'm sorry. If by tomorrow you haven't found a solution, I will take a look at it. – Daniel de Paula Dec 15 '16 at 19:10
  • @DanieldePaula I could not figure out a simplification so far. Wondering if caching could be improved? Currently, I use caching before calling this function on the bigger dataset. – Georg Heiler Dec 16 '16 at 07:25
  • @DanieldePaula do you think I could get rid of some of these joins by just "concatenating" the columns? http://stackoverflow.com/questions/32882529/how-to-zip-twoor-more-dataframe-in-spark because a concat operation (which could be performed in parallel) is what I need here. – Georg Heiler Dec 16 '16 at 11:31
  • You can get rid of the joins if you use window functions, as I show in my answer. The concept of concat you want does not make sense in Spark (as you can see in the link you sent). – Daniel de Paula Dec 16 '16 at 11:37

1 Answer


Edit 1: Removed one window function from handleBias and transformed it into a broadcast join.

Edit 2: Changed replacing strategy for null values.

I have some suggestions that can improve your code. First, for the "handleBias" function, I would do it using window functions and "withColumn" calls, avoiding the joins:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

def handleBias(df: DataFrame, colName: String, target: String = "foo") = {
  val w1 = Window.partitionBy(colName)
  val w2 = Window.partitionBy(colName, target)
  df
    .withColumn("cnt_group", count("*").over(w2))
    .withColumn("pre2_" + colName, mean(target).over(w1))
    .withColumn("pre_" + colName,
      coalesce(min(col("cnt_group") / col("cnt_foo_eq_1")).over(w1), lit(0D)))
    .drop("cnt_group")
}
```

Then, to call it for multiple columns, I would recommend using foldLeft, which is the "functional" approach for this kind of problem:

```scala
val df = Seq(
  (1, "first", "A"),
  (1, "second", "A"),
  (2, "noValidFormat", "B"),
  (1, "lastAssumingSameDate", "C")
).toDF("foo", "bar", "baz")

val columnsToDrop = Seq("baz")
val columnsToCode = Seq("bar", "baz")
val target = "foo"

// Pre-compute the count of rows with target === 1 once and broadcast it,
// so that handleBias can reference the "cnt_foo_eq_1" column.
val targetCounts = df.filter(df(target) === 1).groupBy(target)
  .agg(count(target).as("cnt_foo_eq_1"))
val newDF = df.join(broadcast(targetCounts), Seq(target), "left")

// Fold over newDF (not df): handleBias needs the "cnt_foo_eq_1" column.
val result = (columnsToDrop ++ columnsToCode).toSet.foldLeft(newDF) {
  (currentDF, colName) => handleBias(currentDF, colName)
}

result.drop(columnsToDrop: _*).drop("cnt_foo_eq_1").show()
```

```
+---+--------------------+------------------+--------+------------------+--------+
|foo|                 bar|           pre_baz|pre2_baz|           pre_bar|pre2_bar|
+---+--------------------+------------------+--------+------------------+--------+
|  2|       noValidFormat|               0.0|     2.0|               0.0|     2.0|
|  1|lastAssumingSameDate|0.3333333333333333|     1.0|0.3333333333333333|     1.0|
|  1|              second|0.6666666666666666|     1.0|0.3333333333333333|     1.0|
|  1|               first|0.6666666666666666|     1.0|0.3333333333333333|     1.0|
+---+--------------------+------------------+--------+------------------+--------+
```

I'm not sure it will improve your DAG a lot, but at least it makes the code cleaner and more readable.
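If the folded plan still grows too large with many columns, one further option (a hedged aside: `Dataset.checkpoint` only exists from Spark 2.1, so not in the 2.0.2 used here) is to truncate the lineage every few columns; a sketch, where `N` is a hypothetical tuning knob:

```scala
// Sketch: checkpoint every N columns so the query plan stays small.
// Requires spark.sparkContext.setCheckpointDir(...) to be set beforehand.
val N = 10
val processed = (columnsToDrop ++ columnsToCode).toSet.toSeq
  .grouped(N)
  .foldLeft(newDF) { (acc, group) =>
    group.foldLeft(acc)((d, c) => handleBias(d, c)).checkpoint()
  }
```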


Daniel de Paula
  • Thanks a lot for this great answer. I still need to test it on bigger data. Please see the edit / the two different DAGs generated by your code for the hardcoded and foldLeft operations. Why aren't these "the same"? – Georg Heiler Dec 16 '16 at 14:15
  • @GeorgHeiler they are different because your hardcoded version uses joins, which is usually worse. The linear DAG means there are no joins involved, and I think it looks better than the other one. After you try with more data, please let me know which is faster. – Daniel de Paula Dec 16 '16 at 15:54
  • `.withColumn("pre_" + colName, coalesce(col("cnt_group") / col("cnt_foo_eq_1"), lit(0D)))` is not what I want to achieve, rather than a substitution of 0 I want to replace all the null values with the respective value of the class==1 so that e.g. for A&foo=1 which is 0.5 is used as an replacement for all A&foo=0 which are null. – Georg Heiler Dec 18 '16 at 15:47
  • @GeorgHeiler Instead of `lit(0D)` you can put a reference to the column you want, like `coalesce(..., col("foo"))` – Daniel de Paula Dec 18 '16 at 16:01
  • Sure, but as far as I can see, this value is not stored in a column, so it would require an additional join, which I should avoid? `coalesce(col("cnt_group") / col("cnt_foo_eq_1"))` can't be repeated, as that returns the null. – Georg Heiler Dec 18 '16 at 16:55
  • The good thing about your solution is that it does not crash with an out-of-memory exception; for the hard-coded joins that is the case. The bad thing is it does not finish either. So far I could only run it successfully with very few columns (2). What I observe is that the foldLeft loop runs through and the transform schema is executed really quickly, but then only a single core at 100% is used to continue the computation. The strange thing is that only in the case of 2 columns a task is started to persist to parquet. So the computation is still too slow to be useful. – Georg Heiler Dec 19 '16 at 09:20
  • Is there a possibility to compute all columns separately and in parallel? – Georg Heiler Dec 19 '16 at 09:23
  • @GeorgHeiler The parallelism in Spark is row-wise, not column-wise, so I'm afraid it won't be parallel, because Spark needs at least one shuffle per column in this case. – Daniel de Paula Dec 19 '16 at 09:48
  • @GeorgHeiler Regarding the problems you're having, I think I may know the reason. In your "target" column, are there many different values with few rows each, or few different values with a lot of rows containing each value? – Daniel de Paula Dec 19 '16 at 09:49
  • Target consists of the class labels for ML, in my case these are binary e.g. 0 and 1. – Georg Heiler Dec 19 '16 at 09:51
  • But couldn't the aggregates, e.g. window1 and window2 in your solution, be computed on their own (all in parallel) and the results sort of joined in a single select statement? – Georg Heiler Dec 19 '16 at 09:53
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/130925/discussion-between-daniel-de-paula-and-georg-heiler). – Daniel de Paula Dec 19 '16 at 09:54