
In Spark 1.6.0 / Scala, is there a way to get collect_list("colC") or collect_set("colC").over(Window.partitionBy("colA").orderBy("colB"))?

Jacek Laskowski
Dzmitry Haikov

2 Answers


Given that you have a dataframe such as

+----+----+----+
|colA|colB|colC|
+----+----+----+
|1   |1   |23  |
|1   |2   |63  |
|1   |3   |31  |
|2   |1   |32  |
|2   |2   |56  |
+----+----+----+

You can use window functions as follows:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
df.withColumn("colD", collect_list("colC").over(Window.partitionBy("colA").orderBy("colB"))).show(false)

Result:

+----+----+----+------------+
|colA|colB|colC|colD        |
+----+----+----+------------+
|1   |1   |23  |[23]        |
|1   |2   |63  |[23, 63]    |
|1   |3   |31  |[23, 63, 31]|
|2   |1   |32  |[32]        |
|2   |2   |56  |[32, 56]    |
+----+----+----+------------+

The result is similar for collect_set. However, the order of elements in the resulting set is not guaranteed, unlike with collect_list:

df.withColumn("colD", collect_set("colC").over(Window.partitionBy("colA").orderBy("colB"))).show(false)
+----+----+----+------------+
|colA|colB|colC|colD        |
+----+----+----+------------+
|1   |1   |23  |[23]        |
|1   |2   |63  |[63, 23]    |
|1   |3   |31  |[63, 31, 23]|
|2   |1   |32  |[32]        |
|2   |2   |56  |[56, 32]    |
+----+----+----+------------+

If you remove the orderBy, as below,

df.withColumn("colD", collect_list("colC").over(Window.partitionBy("colA"))).show(false)

the result would be:

+----+----+----+------------+
|colA|colB|colC|colD        |
+----+----+----+------------+
|1   |1   |23  |[23, 63, 31]|
|1   |2   |63  |[23, 63, 31]|
|1   |3   |31  |[23, 63, 31]|
|2   |1   |32  |[32, 56]    |
|2   |2   |56  |[32, 56]    |
+----+----+----+------------+
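
This difference comes from the default window frame: with an orderBy the frame runs from the start of the partition up to the current row (hence the running list), while without an orderBy it covers the entire partition. If you want the full, ordered list of the partition on every row, you can widen the frame explicitly. A minimal sketch, assuming Spark 2.1+ where Window.unboundedPreceding / Window.unboundedFollowing are available (on older versions Long.MinValue / Long.MaxValue can be used instead):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Keep the ordering, but collect over the whole partition by making the frame explicit
val fullPartition = Window
  .partitionBy("colA")
  .orderBy("colB")
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df.withColumn("colD", collect_list("colC").over(fullPartition)).show(false)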

I hope the answer is helpful.

Ramesh Maharjan
  • Does the orderBy have an effect here? I was told it's a non-deterministic function to be avoided (https://stackoverflow.com/q/33878370/1773841, at the end of the accepted answer), or is it OK in this case since it happens inside the withColumn? – Ignacio Alorre Sep 27 '17 at 12:19
  • I think it's safe to do it in the above way @IgnacioAlorre. What is the non-deterministic function in the above answer? – Ramesh Maharjan Sep 27 '17 at 12:50
  • I was doing exactly what you do in the answer, but someone pointed me to the link I added in the previous comment. They say there that the orderBy() may work locally, but not in a distributed environment. I'm rather new to Spark and am mostly gathering opinions and recommendations; don't take my word as an ultimate answer, it's just what I understood from the link I posted. If I am wrong, please let me know, because I would be happy to keep using partitionBy and orderBy inside the withColumn. – Ignacio Alorre Sep 27 '17 at 14:37
  • I agree that I have seen inconsistency in using orderBy with groupBy. But I haven't seen inconsistency with the window function, as partitionBy will partition the dataframe to be grouped into partitions and shuffle the data so that each partition ends up on one of the distributed nodes, and orderBy then works on that partitioned, distributed dataframe/dataset. So I don't think there would be a problem with the window function. :) – Ramesh Maharjan Sep 28 '17 at 03:52
  • What version of Spark are you using for this example? – RudyVerboven Feb 14 '18 at 13:27
  • I am using 2.1.0, and window functions have been available since 1.4 – Ramesh Maharjan Feb 15 '18 at 01:24
  • I don't believe this answer will work for version 1.6, since Spark 1.6 does not support using a distinct aggregate function as a window function (like collect_set). And I believe collect_list isn't a supported window function either. – RudyVerboven Feb 15 '18 at 12:42
  • Is there any way to specify a limit parameter for each window? For example, if we only want the top 5 elements (according to orderBy) from each partition on colA? – Zobayer Hasan Apr 05 '18 at 10:05
  • You should try rowsBetween :) @ZobayerHasan – Ramesh Maharjan Apr 05 '18 at 10:56
  • @RameshMaharjan I don't think rowsBetween would actually work for that use case, because after you did collect_list you would have a bunch of lists with 5 elements each, but you would no longer be within the orderBy, so you wouldn't know which one was the top for its partition, right? Would you call first() on a second iteration of partitionBy and orderBy then? – Adair Apr 27 '18 at 23:28
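
Regarding the top-N question in the comments above: rowsBetween only changes the frame of the collected list; it does not by itself leave one "top 5" list per partition. A minimal sketch of a common alternative, assuming the same df and column names as in the answer (row_number ranks the rows within each partition, then the first 5 are kept and collected):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val w = Window.partitionBy("colA").orderBy("colB")

val top5Lists = df
  .withColumn("rn", row_number().over(w))   // 1, 2, 3, ... within each colA, ordered by colB
  .filter($"rn" <= 5)                       // keep only the first 5 rows per partition
  .groupBy("colA")
  .agg(collect_list("colC").as("colD"))     // note: element order after a groupBy is not strictly guaranteed

top5Lists.show(false)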

The existing answer is valid; I'm just adding a different style of writing window functions here:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._   // for the $"..." column syntax

val wind_user = Window.partitionBy("colA", "colA2").orderBy($"colB", $"colB2".desc)

df.withColumn("colD_distinct", collect_set($"colC") over wind_user)
  .withColumn("colD_historical", collect_list($"colC") over wind_user)
  .show(false)
Ignacio Alorre