
I have a Spark DataFrame consisting of three columns:

 id | col1 | col2 
----+------+------
 x  |  p1  |  a1  
 x  |  p2  |  b1  
 y  |  p2  |  b2  
 y  |  p2  |  b3  
 y  |  p3  |  c1  

After applying `df.groupBy("id").pivot("col1").agg(collect_list("col2"))` I get the following DataFrame (aggDF):

+---+----+--------+----+
| id|  p1|      p2|  p3|
+---+----+--------+----+
|  x|[a1]|    [b1]|  []|
|  y|  []|[b2, b3]|[c1]|
+---+----+--------+----+

Then I find the names of all columns except the `id` column:

val cols = aggDF.columns.filter(x => x != "id")

After that I use

cols.foldLeft(aggDF)((df, x) => df.withColumn(x, when(size(col(x)) > 0, col(x)).otherwise(lit(null))))

to replace the empty arrays with null. The performance of this code becomes poor as the number of columns increases. Additionally, I have the names of the string columns, val stringColumns = Array("p1","p3"); for those columns I want the single value itself rather than a one-element array. I want to get the following final DataFrame:

+---+----+--------+----+
| id|  p1|      p2|  p3|
+---+----+--------+----+
|  x| a1 |    [b1]|null|
|  y|null|[b2, b3]| c1 |
+---+----+--------+----+

Is there any better solution to this problem in order to achieve the final dataframe?

Abir Chokraborty
  • Sorry, I completely forgot to mention in my question that I have to deal with string columns also. – Abir Chokraborty Dec 25 '19 at 20:52
  • last point, your extra request makes little sense. – thebluephantom Dec 25 '19 at 22:55
  • When I was dealing with a large dataset I came to know that some of the columns are string type. But if I keep them as an array type then querying against those array types will be time-consuming. – Abir Chokraborty Dec 25 '19 at 23:04
  • I was able to use your approach with string and array columns together using a 35 GB dataset which has more than 105 columns, but could not see any noticeable performance improvement. The cluster setup was: 6 nodes having 64 GB RAM and 8 cores each, and the Spark version was 2.4.4. – Abir Chokraborty Dec 26 '19 at 22:37
  • 35 GB is not really that much either. – thebluephantom Dec 26 '19 at 23:21
  • Yeah, you can improve the title, and I am not using S3. Dataset is uploaded to HDFS. I ran my previous code and the code you suggested 3 times keeping the same default settings for performance comparison, but could not see any noticeable performance improvement. – Abir Chokraborty Dec 26 '19 at 23:42
  • The conclusion may be that the 2.4.4 version has improved things. Your question should show those stats and settings and stuff. You can also point out that the advice did not help. I will do some testing. – thebluephantom Dec 26 '19 at 23:47
  • Settings: `spark-submit --class myclass --master yarn --deploy-mode client --conf spark.sql.parquet.filterPushdown=true --driver-memory 4G --num-executors 14 --executor-memory 19G --executor-cores 2 myjar.jar`. I think the problem is groupBy/pivot/agg. I have to find an alternative solution. – Abir Chokraborty Dec 27 '19 at 01:06
  • I do not think you can. – thebluephantom Dec 27 '19 at 01:08
  • @AbirChokraborty the problem occurs because of `pivot` or `foldLeft` have you figured out that part? Also what is the cardinality of `col1`? – abiratsis Dec 29 '19 at 19:02
  • I ran a number of tests and found that performance was largely equal. I will run a simulation on an EMR cluster, but it appears to contradict the things that I refer to. – thebluephantom Dec 29 '19 at 21:13
  • @Alexandros Biratsis the cardinality of `col1` is 106. @thebluephantom the number of columns is only 100+, that's why I couldn't notice any performance improvement. It might have an impact on a large number of columns. – Abir Chokraborty Dec 29 '19 at 23:28
  • I tried with 1000 cols. But as I am on holidays in transit, I just used Community Edition Databricks. I will try with more rows and on an EMR cluster soon, but 100 is low. Interesting. – thebluephantom Dec 30 '19 at 00:00
  • @AbirChokraborty did you try to replace `pivot` with `inner join` and intermediate `persist` for the performance? Here is an example with pyspark https://stackoverflow.com/questions/55167913/pyspark-dataframe-cast-two-columns-into-new-column-of-tuples-based-value-of-a-th/55265538#55265538 – abiratsis Dec 30 '19 at 14:39
  • In my view one should use the pivot unless one knows the pivot values in advance. – thebluephantom Dec 30 '19 at 15:14
  • @Alexandros Biratsis could you provide a complete answer to my problem so that I can test on my dataset. @thebluephantom I also used distinct values of `col1` as pivot values, but couldn't find noticeable performance improvement, moreover, finding distinct values from `col1` is causing extra overhead. – Abir Chokraborty Dec 30 '19 at 20:23
  • Well, I found little issue with 10000 distinct col1s; it seems to contradict the writings and earlier findings. Wait on AB and I will run on a bigger set. Confusing. Pivot, I think, should be used. – thebluephantom Dec 30 '19 at 21:50
  • Interested to know what comes out of it. – thebluephantom Jan 02 '20 at 16:34
  • So what did you conclude? – thebluephantom Jan 05 '20 at 22:36
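
A minimal sketch of the join-based alternative to pivot mentioned in the comments above (this is my illustration of the idea, not code from the linked answer; df is the input DataFrame from the question, and the names col1Values, ids and joined are made up for the example):

import org.apache.spark.sql.functions._

// One job to collect the distinct col1 values, similar to what pivot does internally
val col1Values = df.select("col1").distinct().collect().map(_.getString(0))

// Distinct ids, persisted because this base is joined once per col1 value
val ids = df.select("id").distinct().persist()

// Left-join one aggregated slice per col1 value instead of pivoting
val joined = col1Values.foldLeft(ids) { (acc, v) =>
  val slice = df.filter(col("col1") === v)
    .groupBy("id")
    .agg(collect_list("col2").as(v))
  acc.join(slice, Seq("id"), "left")
}

joined.show(false)

Note that with the left joins the missing (id, col1) combinations come out as null rather than as empty arrays.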

2 Answers


Your current code pays two performance costs as structured:

  • As mentioned by Alexandros, you pay one Catalyst analysis per DataFrame transform, so if you loop over a few hundred or thousand columns you'll notice some time spent on the driver before the job is actually submitted. If this is a critical issue for you, you can use a single select statement instead of your foldLeft on withColumn (sketched after this list), but this won't really change the execution time much because of the next point.

  • When you use an expression such as when().otherwise() on columns in what can be optimized as a single select statement, the code generator will produce a single large method processing all the columns. If you have more than a couple hundred columns, it's likely that the resulting method won't be JIT-compiled by default by the JVM, resulting in very slow execution performance (the maximum JIT-able method size is 8 KB of bytecode in HotSpot).
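
For reference, the single-select variant could look like this (a sketch only, assuming the aggDF and cols from the question; nulledDF is just a name picked for the example):

import org.apache.spark.sql.functions._

// One select expression instead of one withColumn (and one Catalyst analysis) per column
val nulledDF = aggDF.select(
  col("id") +: cols.map(c => when(size(col(c)) > 0, col(c)).otherwise(lit(null)).as(c)): _*
)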

You can detect whether you hit the second issue by inspecting the executor logs and checking whether you see a warning about a method that is too large to be JIT-compiled.

How to try and solve this?

1 - Changing the logic

You can filter the empty cells before the pivot by using a window transform:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._   // needed for the 'symbol column syntax below

val finalDf = df
  .withColumn("count", count('col2) over Window.partitionBy('id, 'col1))
  .filter('count > 0)
  .groupBy("id").pivot("col1").agg(collect_list("col2"))

This may or may not be faster depending on the actual dataset, as the pivot also generates a large select statement expression by itself, so it may hit the large-method threshold if you encounter more than approximately 500 values for col1. You may want to combine this with option 2 as well.

2 - Try and finesse the JVM

You can add an extraJavaOption on your executors to ask the JVM to try and JIT hot methods larger than 8k.

For example, add the option `--conf "spark.executor.extraJavaOptions=-XX:-DontCompileHugeMethods"` to your spark-submit and see how it impacts the pivot execution time.
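
Combined with the submit command quoted in the comments above, that might look like the following (the class, jar and resource settings are simply the ones quoted there, not recommendations):

spark-submit --class myclass --master yarn --deploy-mode client \
  --conf spark.sql.parquet.filterPushdown=true \
  --conf "spark.executor.extraJavaOptions=-XX:-DontCompileHugeMethods" \
  --driver-memory 4G --num-executors 14 --executor-memory 19G --executor-cores 2 \
  myjar.jar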

It's difficult to guarantee a substantial speed increase without more details on your real dataset but it's definitely worth a shot.

rluta
  • Your second point, applies to varargs? Did not see that in my 1st reference. – thebluephantom Jan 02 '20 at 22:06
  • @rluta run time increases after adding `withColumn("count", count('col2) over Window.partitionBy('id,'col1)) .filter('count > 0)`. – Abir Chokraborty Jan 03 '20 at 00:10
  • @abir So you should try the additional JVM options on the executors (and the driver if you're running in local mode) – rluta Jan 03 '20 at 08:27
  • @bluephantom I'm not sure I understand your comment on JIT scope. JIT is the just-in-time compilation of bytecode to native code done by the JVM on frequently accessed methods. – rluta Jan 03 '20 at 08:29
  • Neither am I. All Scala goes to Java and typically runs in a Big Data framework, so what are you stating exactly? – thebluephantom Jan 03 '20 at 09:21
  • https://stackoverflow.com/questions/1326071/is-java-a-compiled-or-an-interpreted-programming-language should not be an issue, surely... ... – thebluephantom Jan 03 '20 at 09:47
  • Well it's an issue because the spark code generator will generate large methods in some cases such as a pivot on many columns. I'll add some performance data points to the answer if you're interested – rluta Jan 03 '20 at 22:21

If you look at https://medium.com/@manuzhang/the-hidden-cost-of-spark-withcolumn-8ffea517c015 then you see that withColumn with a foldLeft has known performance issues. Select is an alternative, as shown below - using varargs.

I am not convinced collect_list is an issue. The 1st set of logic I kept as well. pivot kicks off a Job to get the distinct values for pivoting. It is an accepted approach imo. Trying to roll your own seems pointless to me, but the other answers may prove me wrong, or Spark 2.4 may have been improved.

import spark.implicits._ 
import org.apache.spark.sql.functions._

// Your code & assuming id is the only col of interest, as in THIS question. More elegant than the 1st posting.
val df = Seq( ("x","p1","a1"), ("x","p2","b1"), ("y","p2","b2"), ("y","p2","b3"), ("y","p3","c1")).toDF("id", "col1", "col2")
val aggDF = df.groupBy("id").pivot("col1").agg(collect_list("col2")) 
//aggDF.show(false)

val colsToSelect = aggDF.columns  // All in this case, 1st col id handled by head & tail

val aggDF2 = aggDF.select((col(colsToSelect.head) +: colsToSelect.tail.map
    (col => when(size(aggDF(col)) === 0,lit(null)).otherwise(aggDF(col)).as(s"$col"))):_*)
aggDF2.show(false)

returns:

+---+----+--------+----+
|id |p1  |p2      |p3  |
+---+----+--------+----+
|x  |[a1]|[b1]    |null|
|y  |null|[b2, b3]|[c1]|
+---+----+--------+----+

Also a nice read BTW: https://lansalo.com/2018/05/13/spark-how-to-add-multiple-columns-in-dataframes-and-how-not-to/. The effects become more noticeable with a higher number of columns. At the end, a reader makes a relevant point.

I think that performance is better with the select approach when a higher number of columns prevails.

UPD: Over the holidays I trialed both approaches with Spark 2.4.x with little observable difference up to 1000 columns. That has puzzled me.

thebluephantom
  • Is there a way to use an `if else` condition inside `{ col => ( when(size(aggDF2(col)) === 0,lit(null)).otherwise(aggDF2(col))).as(s"$col") }` for `id` and other string-type columns, let's say `p1` and `p3`, which are listed as an array of string, in order to avoid creating an additional dataframe like `aggDF2`? – Abir Chokraborty Dec 24 '19 at 23:32
  • NO, there is not. I was fooled by that myself, as I had forgotten that IF does not work for a data frame, only WHEN... You could do a UDF, but performance is an issue. I suspect with a WHEN you can add that (a sketch follows these comments), but I leave that to you. The major point is that of the article on foldLeft in combination with withColumn... – thebluephantom Dec 24 '19 at 23:43
  • Lazy evaluation, no additional DF created in this solution, that's the whole point. – thebluephantom Dec 24 '19 at 23:52
  • So, to the -1'er: reveal why. – thebluephantom Dec 27 '19 at 18:44
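
As a follow-up to the string-column question above, one possible way to fold that handling into the same single select (a sketch under my own assumptions, not the answer's code: aggDF3 is a name picked for the example, stringColumns comes from the question, and element_at requires Spark 2.4+):

import org.apache.spark.sql.functions._

val stringColumns = Array("p1", "p3")  // columns whose single-element arrays should become plain strings

val aggDF3 = aggDF.select(
  col(colsToSelect.head) +: colsToSelect.tail.map { c =>
    // Replace empty arrays with null, as before
    val nonEmpty = when(size(aggDF(c)) === 0, lit(null)).otherwise(aggDF(c))
    // For the string columns, take the single element instead of keeping a one-element array;
    // this if/else runs on the Scala side while building the expressions, not inside Spark
    if (stringColumns.contains(c)) element_at(nonEmpty, 1).as(c) else nonEmpty.as(c)
  }: _*
)
aggDF3.show(false)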