
After performing an operation on a data frame for several columns, I want to "concatenate / join" all the results back onto the original frame. In pandas this can be achieved via the index with pd.concat(axis=1). How should I port this to Spark? Ideally it would not be required to list all the columns as join columns.

val input = Seq(
    (0, "A", "B", "C", "D"),
    (1, "A", "B", "C", "D"),
    (0, "d", "a", "jkl", "d"),
    (0, "d", "g", "C", "D"),
    (1, "A", "d", "t", "k"),
    (1, "d", "c", "C", "D"),
    (1, "c", "B", "C", "D")
  ).toDF("TARGET", "col1", "col2", "col3TooMany", "col4")
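
(The snippet above assumes a SparkSession and its implicits in scope; the session setup below, including the app name, is only a placeholder:)

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("concat-columns").getOrCreate()
import spark.implicits._ // enables .toDF on a Seq of tuples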

And a computation defined as follows:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{count, mean}

def handlePercentage(col: String, df: DataFrame, target: String = "TARGET"): DataFrame = {
  // mean of the target per value of `col`
  val pre1_1 = df.groupBy(col).agg(mean(target).alias("pre_" + col))
  // size of each (col, target) group relative to the number of rows with target == 1
  val pre2_1 = df.groupBy(col, target)
    .agg((count("*") / df.filter(df(target) === 1).count).alias("pre2_" + col))
  df.join(pre1_1, Seq(col)).join(pre2_1, Seq(col, target))
}

The output for a function call is shown below.

handlePercentage("col1", input).join(handlePercentage("col2", input), Seq("col1", "col2", "TARGET    ", "col3TooMany", "col4")).show
+----+----+----------+-----------+----+------------------+---------+------------------+---------+
|col1|col2|    TARGET|col3TooMany|col4|          pre_col1|pre2_col1|          pre_col2|pre2_col2|
+----+----+----------+-----------+----+------------------+---------+------------------+---------+
|   A|   B|         0|          C|   D|0.6666666666666666|     0.25|0.6666666666666666|     0.25|
|   c|   B|         1|          C|   D|               1.0|     0.25|0.6666666666666666|      0.5|
|   A|   d|         1|          t|   k|0.6666666666666666|      0.5|               1.0|     0.25|
|   d|   c|         1|          C|   D|0.3333333333333333|     0.25|               1.0|     0.25|
|   d|   a|         0|        jkl|   d|0.3333333333333333|      0.5|               0.0|     0.25|
|   d|   g|         0|          C|   D|0.3333333333333333|      0.5|               0.0|     0.25|
|   A|   B|         1|          C|   D|0.6666666666666666|      0.5|0.6666666666666666|      0.5|
+----+----+----------+-----------+----+------------------+---------+------------------+---------+

This operation is rather slow when I join around 40 columns. How could I concatenate them via an index, or perform this operation more quickly in a different way? I believe the join (which uses a lot of attributes) is what makes it slow. "How to zip two (or more) DataFrame in Spark" is a similar problem, but there the columns are calculated up front and the whole schema is merged, whereas I only want to add the current column.
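
As a point of reference for the index idea, here is a minimal sketch of zipping two frames on a generated row index; it assumes both frames have exactly the same row order, and withRowIndex / row_idx are names made up for illustration:

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Attach a contiguous row index via RDD zipWithIndex
// (monotonically_increasing_id is unique but not contiguous across partitions).
def withRowIndex(df: DataFrame): DataFrame = {
  val schema = StructType(df.schema.fields :+ StructField("row_idx", LongType, nullable = false))
  val indexed = df.rdd.zipWithIndex.map { case (row, idx) => Row.fromSeq(row.toSeq :+ idx) }
  df.sparkSession.createDataFrame(indexed, schema)
}

// e.g. withRowIndex(dfA).join(withRowIndex(dfB), Seq("row_idx")).drop("row_idx")

Note that any groupBy in between destroys the row order, so this only applies when the per-column results preserve the original ordering.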

I noticed that Spark tends to waste a lot of time on garbage collection.

  • Seeing that you are doing aggregations in your computation, I would assume the data size gets significantly smaller. You could make broadcasts out of these results and then add these values using a lookup UDF. – LiMuBei Dec 05 '16 at 14:09
  • I could hold the dataset in memory on a single node. This will not be more than a couple of GB, and then I join afterwards with the really big data. Would this open up some improvements? I tried to cache my data frame, but this did not help. – Georg Heiler Dec 06 '16 at 08:24
  • @LiMuBei Using Spark SQL, this mostly seems to result in broadcast joins already. – Georg Heiler Dec 15 '16 at 17:34
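
Following up on the broadcast/lookup suggestion in the comments above, a minimal sketch of collecting a small per-column aggregate to the driver and adding it via a lookup UDF instead of a join; addMeanLookup and the column naming are made up for illustration and are not code from the question:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, mean, udf}

// Collect the small per-column aggregate, broadcast it, and add the value via a lookup.
def addMeanLookup(df: DataFrame, column: String, target: String = "TARGET"): DataFrame = {
  val lookup: Map[String, Double] = df.groupBy(column)
    .agg(mean(target).alias("m"))
    .collect()
    .map(r => r.getString(0) -> r.getDouble(1))
    .toMap
  val bc = df.sparkSession.sparkContext.broadcast(lookup)
  val lookupUdf = udf((k: String) => bc.value.get(k))
  df.withColumn("pre_" + column, lookupUdf(col(column)))
}

// e.g. Seq("col1", "col2").foldLeft(input)((acc, c) => addMeanLookup(acc, c))

This avoids shuffling the big frame for each column, assuming each aggregate stays small enough to fit on the driver.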
