I have an operation in Spark that should be performed for several columns of a DataFrame. Generally, there are two ways to specify such operations:
- hard-coded:

    handleBias("bar", df)
      .join(handleBias("baz", df), df.columns)
      .drop(columnsToDrop: _*).show

- dynamically generated from a list of column names:

    var isFirst = true
    var res = df
    for (col <- columnsToDrop ++ columnsToCode) {
      if (isFirst) {
        res = handleBias(col, res)
        isFirst = false
      } else {
        res = handleBias(col, res)
      }
    }
    res.drop(columnsToDrop: _*).show

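Both branches of the `isFirst` check in the loop above do the same thing, so the loop amounts to a plain left fold over the column names. A minimal sketch of that fold follows; `applyToColumns` is a hypothetical helper name, not something from Spark, and it is written generically so that it compiles without Spark on the classpath. With `A = DataFrame` it should build the same chained plan as the loop.

```scala
// Hypothetical helper (not part of the original post): threads an accumulator
// through `step` once per column name, from left to right.
def applyToColumns[A](init: A, cols: Seq[String])(step: (String, A) => A): A =
  cols.foldLeft(init)((acc, c) => step(c, acc))

// Intended use with the names from this post (needs a Spark session, so it is
// left as a comment here):
//   applyToColumns(df, columnsToDrop ++ columnsToCode)((c, d) => handleBias(c, d))
//     .drop(columnsToDrop: _*).show
```

This only removes the mutable variables; every call still receives the output of the previous one, so the resulting plan stays a single long chain.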
The problem is that the dynamically generated DAG is different, and the runtime of the dynamic solution grows much faster with the number of columns than that of the hard-coded operations.
I am curious how to combine the elegance of the dynamic construction with quick execution times.
Here is a comparison of the DAGs for the example code.
For around 80 columns, the hard-coded variant results in a rather nice graph,
and the dynamically constructed query results in a very big, probably less parallelizable, and much slower DAG.
A current version of Spark (2.0.2) was used with DataFrames and spark-sql.
Code to complete the minimal example:
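    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.{count, mean}

    def handleBias(col: String, df: DataFrame, target: String = "FOO"): DataFrame = {
      // Share of the rows with target == 1 that fall into each value of `col`.
      // Note: the inner .count runs a Spark job eagerly on every call.
      val pre1_1 = df
        .filter(df(target) === 1)
        .groupBy(col, target)
        .agg((count("*") / df.filter(df(target) === 1).count).alias("pre_" + col))
        .drop(target)
      // Mean of the target for each value of `col`.
      val pre2_1 = df
        .groupBy(col)
        .agg(mean(target).alias("pre2_" + col))
      // Attach both statistics to every row; missing numeric values become 0.
      df
        .join(pre1_1, Seq(col), "left")
        .join(pre2_1, Seq(col), "left")
        .na.fill(0)
    }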
Edit:
Running your task with foldLeft
generates a linear DAG,
and hard-coding the function for all the columns results in
Both are a lot better than my original DAGs, but the hard-coded variant still looks better to me. Building a SQL statement in Spark by string concatenation could let me generate the hard-coded execution graph dynamically, but that seems rather ugly. Do you see any other option?
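One possibility that avoids SQL strings, offered as an unbenchmarked sketch: build one independent branch per column from the same input `df` and combine the branches with `reduce`, which mirrors the shape of the hard-coded `handleBias("bar", df).join(handleBias("baz", df), df.columns)` chain. The helper name `handleBiasForAll` and its `handle` parameter are assumptions made for illustration; `handle` is meant to receive `handleBias` from above. Whether the resulting DAG actually matches the hard-coded one would still need to be checked in the Spark UI or with `explain`.

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical helper: every branch starts from the untouched `df`, and the
// branches are joined on the original columns, as in the hard-coded variant.
def handleBiasForAll(
    df: DataFrame,
    cols: Seq[String],
    handle: (String, DataFrame) => DataFrame): DataFrame = {
  require(cols.nonEmpty, "at least one column is required")
  val keys: Seq[String] = df.columns.toSeq
  cols
    .map(c => handle(c, df))            // independent branch per column
    .reduce((l, r) => l.join(r, keys))  // inner join on all original columns
}

// Intended use with the names from this post:
//   handleBiasForAll(df, columnsToCode, (c, d) => handleBias(c, d))
//     .drop(columnsToDrop: _*).show
```

Like the hard-coded variant, this joins on every original column, so it inherits the same caveats: rows that are exact duplicates across all columns multiply in the join, and rows with nulls in a join column are dropped.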