
I want to create a table where each row is a unique ID and the Place and City columns contain all the places and cities a person visited, ordered by the date of visit, using either PySpark or Hive.

   df.groupby("ID").agg(F.concat_ws("|",F.collect_list("Place")))

does the concatenation, but I am unable to order it by the date. Also, I have to repeat this step separately for each column.

I also tried using a window function as mentioned in this post (collect_list by preserving order based on another variable), but it throws an error: java.lang.UnsupportedOperationException: 'collect_list('Place) is not supported in a window operation. I want to:

1- order the concatenated columns by the date travelled

2- do this step for multiple columns

Data

| ID | Date | Place | City |
|----|------|-------|------|
| 1  | 2017 | UK    | Birm |
| 2  | 2014 | US    | LA   |
| 1  | 2018 | SIN   | Sin  |
| 1  | 2019 | MAL   | KL   |
| 2  | 2015 | US    | SF   |
| 3  | 2019 | UK    | Lon  |

Expected

| ID | Place      | City        |
|----|------------|-------------|
| 1  | UK,SIN,MAL | Birm,Sin,KL |
| 2  | US,US      | LA,SF       |
| 3  | UK         | Lon         |
Faliha Zikra
  • Possible duplicate of [collect\_list by preserving order based on another variable](https://stackoverflow.com/questions/46580253/collect-list-by-preserving-order-based-on-another-variable) – Shaido Jun 26 '19 at 01:44
  • Thanks. The first solution can't be used for multiple columns and the solution using windows functions throws an error :java.lang.UnsupportedOperationException: 'collect_list('Place) is not supported in a window operation. – Faliha Zikra Jun 26 '19 at 21:23
  • What version of Spark are you using? Window functions won't work with too old versions (see e.g.: https://stackoverflow.com/questions/46628459/spark-unsupportedoperationexception-collect-list-is-not-supported-in-a-window). – Shaido Jun 27 '19 at 01:04

1 Answer

>>> from pyspark.sql import functions as F
>>> from pyspark.sql import Window
>>> w = Window.partitionBy('ID').orderBy('Date')

# Input data frame
>>> df.show()
+---+----+-----+----+
| ID|Date|Place|City|
+---+----+-----+----+
|  1|2017|   UK|Birm|
|  2|2014|   US|  LA|
|  1|2018|  SIN| Sin|
|  1|2019|  MAL|  KL|
|  2|2015|   US|  SF|
|  3|2019|   UK| Lon|
+---+----+-----+----+

>>> df2 = (df.withColumn("Place", F.collect_list("Place").over(w))
...          .withColumn("City", F.collect_list("City").over(w))
...          .groupBy("ID")
...          .agg(F.max("Place").alias("Place"), F.max("City").alias("City")))

# Data values as lists
>>> df2.show()
+---+--------------+---------------+
| ID|         Place|           City|
+---+--------------+---------------+
|  3|          [UK]|          [Lon]|
|  1|[UK, SIN, MAL]|[Birm, Sin, KL]|
|  2|      [US, US]|       [LA, SF]|
+---+--------------+---------------+


# If you want the values as strings
>>> df2.withColumn("Place", F.concat_ws(" ", "Place")).withColumn("City", F.concat_ws(" ", "City")).show()
+---+----------+-----------+
| ID|     Place|       City|
+---+----------+-----------+
|  3|        UK|        Lon|
|  1|UK SIN MAL|Birm Sin KL|
|  2|     US US|      LA SF|
+---+----------+-----------+
Nikhil Suthar
  • I am using orderBy in the window, which handles the ordering per ID. Please check w in my code (3rd line). – Nikhil Suthar Jun 26 '19 at 16:35
  • Throws an error: java.lang.UnsupportedOperationException: 'collect_list('Place) is not supported in a window operation. – Faliha Zikra Jun 26 '19 at 21:22
  • Which Spark version are you using? Have you imported all the packages I mentioned? Are you using collect_list("Place") or collect_list(Place)? – Nikhil Suthar Jun 27 '19 at 05:30