After performing an operation on a data frame for several columns, I want to "concatenate / join" all the results back onto the original frame. In pandas this can be achieved via the index with pd.concat(axis=1). How should I port this to Spark? It would be nice if it were not required to list all the columns as join columns.
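One idea to emulate the pandas index would be to attach an explicit, sequential row index to each frame and join on that single column. The following is only a sketch: withRowIndex is my own illustrative helper (not a Spark API), and it assumes both frames contain the same rows in the same order.

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.LongType

// Attach a sequential row index via RDD.zipWithIndex. Unlike
// monotonically_increasing_id, zipWithIndex produces consecutive ids,
// so two frames of equal length line up on the same index values.
def withRowIndex(df: DataFrame): DataFrame = {
  val schema = df.schema.add("row_idx", LongType)
  val rows = df.rdd.zipWithIndex.map { case (row, idx) => Row.fromSeq(row.toSeq :+ idx) }
  df.sparkSession.createDataFrame(rows, schema)
}

// Usage: withRowIndex(dfA).join(withRowIndex(dfB), Seq("row_idx"))

My concrete input looks like this: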
import spark.implicits._ // assumes an existing SparkSession in scope as `spark`

val input = Seq(
  (0, "A", "B", "C", "D"),
  (1, "A", "B", "C", "D"),
  (0, "d", "a", "jkl", "d"),
  (0, "d", "g", "C", "D"),
  (1, "A", "d", "t", "k"),
  (1, "d", "c", "C", "D"),
  (1, "c", "B", "C", "D")
).toDF("TARGET", "col1", "col2", "col3TooMany", "col4")
And a computation defined like this:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{count, mean}

def handlePercentage(col: String, df: DataFrame, target: String = "TARGET"): DataFrame = {
  // Mean of the target for each value of `col`.
  val pre1_1 = df.groupBy(col).agg(mean(target).alias("pre_" + col))
  // Size of each (col, target) group relative to the number of rows with target == 1.
  val positives = df.filter(df(target) === 1).count
  val pre2_1 = df.groupBy(col, target).agg((count("*") / positives).alias("pre2_" + col))
  df.join(pre1_1, Seq(col)).join(pre2_1, Seq(col, target))
}
The output for such a call, joining the results for two columns, is shown below.

handlePercentage("col1", input).join(handlePercentage("col2", input), Seq("col1", "col2", "TARGET", "col3TooMany", "col4")).show
+----+----+------+-----------+----+------------------+---------+------------------+---------+
|col1|col2|TARGET|col3TooMany|col4|          pre_col1|pre2_col1|          pre_col2|pre2_col2|
+----+----+------+-----------+----+------------------+---------+------------------+---------+
|   A|   B|     0|          C|   D|0.6666666666666666|     0.25|0.6666666666666666|     0.25|
|   c|   B|     1|          C|   D|               1.0|     0.25|0.6666666666666666|      0.5|
|   A|   d|     1|          t|   k|0.6666666666666666|      0.5|               1.0|     0.25|
|   d|   c|     1|          C|   D|0.3333333333333333|     0.25|               1.0|     0.25|
|   d|   a|     0|        jkl|   d|0.3333333333333333|      0.5|               0.0|     0.25|
|   d|   g|     0|          C|   D|0.3333333333333333|      0.5|               0.0|     0.25|
|   A|   B|     1|          C|   D|0.6666666666666666|      0.5|0.6666666666666666|      0.5|
+----+----+------+-----------+----+------------------+---------+------------------+---------+
This operation is rather slow when I join around 40 columns. How could I concatenate the results via an index, or perform this operation more quickly in some other way? I believe that my join (which uses a lot of attributes) is the bottleneck. "How to zip two (or more) DataFrames in Spark" is a similar problem, but there the columns are calculated up front and the whole schema is merged, whereas I want to obtain only the current column.
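One alternative I am considering (untested, so only a sketch; withPercentages is an illustrative name of my own) is to compute the same statistics with window functions instead of joins, so that all 40 columns can be added without any multi-key join:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{count, lit, mean}

// Compute the pre_/pre2_ columns with window functions instead of joins.
def withPercentages(df: DataFrame, cols: Seq[String], target: String = "TARGET"): DataFrame = {
  // Number of rows with target == 1, counted once up front.
  val positives = df.filter(df(target) === 1).count
  cols.foldLeft(df) { (acc, c) =>
    acc
      .withColumn("pre_" + c, mean(target).over(Window.partitionBy(c)))
      .withColumn("pre2_" + c, count(lit(1)).over(Window.partitionBy(c, target)) / positives)
  }
}

// Usage: withPercentages(input, Seq("col1", "col2")).show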
I noticed that Spark tends to spend a lot of time in garbage collection.
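One guess (an assumption on my side, not verified): each of the ~40 join branches recomputes and shuffles the full input. Caching the base frame and broadcasting the small per-column aggregates might reduce both the shuffle volume and the GC pressure; handlePercentageBroadcast below is a hypothetical variant of the function above, not something I have benchmarked.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{broadcast, count, mean}

// Cache the base frame so each join branch does not recompute it.
val cached = input.cache()

// Broadcast the small aggregate frames so every join becomes a map-side
// broadcast join instead of a full shuffle.
def handlePercentageBroadcast(col: String, df: DataFrame, target: String = "TARGET"): DataFrame = {
  val positives = df.filter(df(target) === 1).count
  val pre1 = df.groupBy(col).agg(mean(target).alias("pre_" + col))
  val pre2 = df.groupBy(col, target).agg((count("*") / positives).alias("pre2_" + col))
  df.join(broadcast(pre1), Seq(col)).join(broadcast(pre2), Seq(col, target))
}

// Usage: handlePercentageBroadcast("col1", cached).show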