I have a Spark DataFrame and I would like to group the elements by a key and have the results as a sorted list.

Currently I am using:

df.groupBy("columnA").agg(collect_list("columnB"))

How do I make the items in the list sorted in ascending order?

    Possible duplicate of [How to sort dataframe in Spark without using Spark SQL ?](http://stackoverflow.com/questions/29741360/how-to-sort-dataframe-in-spark-without-using-spark-sql) – Alberto Bonsanto Aug 01 '16 at 10:53

2 Answers

You could try the sort_array function available in the functions package:

import org.apache.spark.sql.functions._
df.groupBy("columnA").agg(sort_array(collect_list("columnB")))
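For instance, with made-up data using the question's column names (a sketch, assuming a spark-shell session where `spark` is in scope):

```scala
import org.apache.spark.sql.functions._
import spark.implicits._

// Toy data with the question's column names
val df = Seq(("a", 3), ("a", 1), ("b", 2), ("a", 2)).toDF("columnA", "columnB")

// sort_array sorts ascending by default, regardless of collect_list's order
df.groupBy("columnA")
  .agg(sort_array(collect_list("columnB")).as("columnB_sorted"))
  .show()
// columnA = a -> columnB_sorted = [1, 2, 3]
// columnA = b -> columnB_sorted = [2]
```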
Daniel de Paula
Just wanted to add another hint to Daniel de Paula's answer regarding the sort_array solution.

If you want to sort elements according to a different column, you can form a struct of two fields:

  • the sort-by field
  • the result field

Since structs are compared field by field, you'll get the order you want; all that remains is to drop the sort-by column from each element of the resulting list.
The same approach can be applied with several sort-by columns when needed.

Here's an example that can be run in local spark-shell (use :paste mode):

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
import spark.implicits._

case class Employee(name: String, department: String, salary: Double)

val employees = Seq(
  Employee("JSMITH", "A", 20.0),
  Employee("AJOHNSON", "A", 650.0),
  Employee("CBAKER", "A", 650.2),
  Employee("TGREEN", "A", 13.0),
  Employee("CHORTON", "B", 111.0),
  Employee("AIVANOV", "B", 233.0),
  Employee("VSMIRNOV", "B", 11.0)
)

val employeesDF = spark.createDataFrame(employees)

val getNames = udf { salaryNames: Seq[Row] =>
  salaryNames.map { case Row(_: Double, name: String) => name }
}

employeesDF
  .groupBy($"department")
  .agg(collect_list(struct($"salary", $"name")).as("salaryNames"))
  .withColumn("namesSortedBySalary", getNames(sort_array($"salaryNames", asc = false)))
  .show(truncate = false)

The result:

+----------+--------------------------------------------------------------------+----------------------------------+
|department|salaryNames                                                         |namesSortedBySalary               |
+----------+--------------------------------------------------------------------+----------------------------------+
|B         |[[111.0, CHORTON], [233.0, AIVANOV], [11.0, VSMIRNOV]]              |[AIVANOV, CHORTON, VSMIRNOV]      |
|A         |[[20.0, JSMITH], [650.0, AJOHNSON], [650.2, CBAKER], [13.0, TGREEN]]|[CBAKER, AJOHNSON, JSMITH, TGREEN]|
+----------+--------------------------------------------------------------------+----------------------------------+
GoodDok
    I think you can avoid the UDF. `expr("transform(array_col, x -> x.name)")` – Wassim Maaoui Jan 11 '21 at 17:15
  • [Here](https://stackoverflow.com/a/61651832/4335600) is a similar solution that uses a struct to avoid needing a UDF. Written in pyspark too. – bsauce Oct 27 '21 at 14:34
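The UDF-free variant suggested in the comments can be sketched like this (assumes Spark 2.4+, where the `transform` higher-order function is available, and reuses `employeesDF` from the answer above):

```scala
import org.apache.spark.sql.functions._
import spark.implicits._

// Sort the structs inside the aggregation, then extract just the names
// with the built-in `transform` higher-order function instead of a UDF.
employeesDF
  .groupBy($"department")
  .agg(sort_array(collect_list(struct($"salary", $"name")), asc = false).as("salaryNames"))
  .withColumn("namesSortedBySalary", expr("transform(salaryNames, x -> x.name)"))
  .show(truncate = false)
```

This produces the same `namesSortedBySalary` column as the UDF version, but stays entirely within Catalyst-optimizable built-in functions.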