
I have a Spark 2.0 dataframe called example with the following structure:

id, hour, count
id1, 0, 12
id1, 1, 55
..
id1, 23, 44
id2, 0, 12
id2, 1, 89
..
id2, 23, 34
etc.

It contains 24 entries for each id (one for each hour of the day) and is ordered by id, hour using the orderBy function.

I have created an Aggregator groupConcat:

  import org.apache.spark.sql.{Encoder, Encoders, Row}
  import org.apache.spark.sql.expressions.Aggregator

  def groupConcat(separator: String, columnToConcat: Int) = new Aggregator[Row, String, String] with Serializable {
    override def zero: String = ""

    override def reduce(b: String, a: Row) = b + separator + a.get(columnToConcat)

    override def merge(b1: String, b2: String) = b1 + b2

    override def finish(b: String) = b.substring(1)

    override def bufferEncoder: Encoder[String] = Encoders.STRING

    override def outputEncoder: Encoder[String] = Encoders.STRING
  }.toColumn

It lets me concatenate the count values into a single string per id, to obtain this final dataframe:

id, hourly_count
id1, 12:55:..:44
id2, 12:89:..:34
etc.

My question is, if I do example.orderBy($"id",$"hour").groupBy("id").agg(groupConcat(":",2) as "hourly_count"), does that guarantee that the hourly counts will be ordered correctly in their respective buckets?

I read that this is not necessarily the case for RDDs (see Spark sort by key and then group by to get ordered iterable?), but maybe it's different for DataFrames?

If not, how can I work around it?


6 Answers


groupBy after orderBy doesn't maintain order, as others have pointed out. What you want to do is use a Window function, partitioned on id and ordered by hour. You can collect_list over this window and then take the max (largest) of the resulting lists, since they build up cumulatively (i.e. the first hour only has itself in its list, the second hour has 2 elements, and so on).

Complete example code:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

val data = Seq(
    ("id1", 0, 12),
    ("id1", 1, 55),
    ("id1", 23, 44),
    ("id2", 0, 12),
    ("id2", 1, 89),
    ("id2", 23, 34)
).toDF("id", "hour", "count")

val mergeList = udf{(strings: Seq[String]) => strings.mkString(":")}

data.withColumn(
    "collected",
    // cumulative list of counts per id, ordered by hour
    collect_list($"count").over(
        Window.partitionBy("id").orderBy("hour")
    )
)
.groupBy("id")
// the longest (i.e. complete) cumulative list is the max
.agg(max($"collected").as("collected"))
.withColumn("hourly_count", mergeList($"collected"))
.select("id", "hourly_count")
.show

This keeps us within the DataFrame world. I also simplified the UDF code the OP was using.

Output:

+---+------------+
| id|hourly_count|
+---+------------+
|id1|    12:55:44|
|id2|    12:89:34|
+---+------------+
  • this should work! +1 for pointing with example. one tip, you can use concat_ws(":", $"collected") in favor of using native spark functions over udf – codehammer Feb 06 '23 at 19:21
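
Following up on that comment: a minimal sketch of the concat_ws variant, using the same data DataFrame as in the answer above. The count column is cast to string before collecting, so concat_ws can join the array without a UDF:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

// same cumulative window as above: list of counts per id, ordered by hour
val byHour = Window.partitionBy("id").orderBy("hour")

data
  .withColumn("collected", collect_list($"count".cast("string")).over(byHour))
  .groupBy("id")
  // the longest (complete) cumulative list is the max
  .agg(max($"collected").as("collected"))
  // concat_ws joins the array elements natively, so no UDF is needed
  .select($"id", concat_ws(":", $"collected").as("hourly_count"))
  .show()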

If you want to work around it in Java (Scala and Python should be similar):

example.orderBy("hour")
    .groupBy("id")
    .agg(functions.sort_array(
            functions.collect_list(
                functions.struct(example.col("hour"),
                                 example.col("count"))),
            false)
        .as("hourly_count"));

I have a case where the order is not always kept: sometimes yes, mostly no.

My dataframe has 200 partitions and is running on Spark 1.6:

df_group_sort = data.orderBy(times).groupBy(group_key).agg(
    F.sort_array(F.collect_list(times)),
    F.collect_list(times)
)

To check the ordering, I compare the return values of

F.sort_array(F.collect_list(times))

and

F.collect_list(times)

giving e.g. (left: sort_array(collect_list()); right: collect_list())

2016-12-19 08:20:27.172000 2016-12-19 09:57:03.764000
2016-12-19 08:20:30.163000 2016-12-19 09:57:06.763000
2016-12-19 08:20:33.158000 2016-12-19 09:57:09.763000
2016-12-19 08:20:36.158000 2016-12-19 09:57:12.763000
2016-12-19 08:22:27.090000 2016-12-19 09:57:18.762000
2016-12-19 08:22:30.089000 2016-12-19 09:57:33.766000
2016-12-19 08:22:57.088000 2016-12-19 09:57:39.811000
2016-12-19 08:23:03.085000 2016-12-19 09:57:45.770000
2016-12-19 08:23:06.086000 2016-12-19 09:57:57.809000
2016-12-19 08:23:12.085000 2016-12-19 09:59:56.333000
2016-12-19 08:23:15.086000 2016-12-19 10:00:11.329000
2016-12-19 08:23:18.087000 2016-12-19 10:00:14.331000
2016-12-19 08:23:21.085000 2016-12-19 10:00:17.329000
2016-12-19 08:23:24.085000 2016-12-19 10:00:20.326000

The left column is always sorted, while the right column only consists of sorted blocks. For different executions of take(), the order of the blocks in the right column is different.
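
For reference, a minimal Scala sketch of the same check (the snippet above is PySpark on Spark 1.6; the DataFrame df and the column names group_key and ts below are placeholders), comparing the explicitly sorted list against the raw collect_list per group:

import org.apache.spark.sql.functions._
import spark.implicits._

// df, "group_key" and "ts" are placeholders for your own data
val checked = df
  .orderBy("ts")
  .groupBy("group_key")
  .agg(
    sort_array(collect_list($"ts")).as("sorted_ts"),
    collect_list($"ts").as("collected_ts")
  )
  .withColumn("order_kept", $"sorted_ts" === $"collected_ts")

// any group with order_kept == false came back out of order
checked.filter(!$"order_kept").show()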

  • The accepted answer stated that you need to sort by both the column you want sorted as well as the columns you group with, i.e. `orderBy(times, group_key).groupBy(group_key)`. Did you try that? – Shaido Feb 06 '18 at 02:12

Order may or may not be the same, depending on the number of partitions and the distribution of the data. We can solve it using the RDD API itself.

For example:

I saved the below sample data in a file and loaded it into HDFS.

1,type1,300
2,type1,100
3,type2,400
4,type2,500
5,type1,400
6,type3,560
7,type2,200
8,type3,800

and executed the below command:

sc.textFile("/spark_test/test.txt").map(x=>x.split(",")).filter(x=>x.length==3).groupBy(_(1)).mapValues(x=>x.toList.sortBy(_(2)).map(_(0)).mkString("~")).collect()

output:

Array[(String, String)] = Array((type3,6~8), (type1,2~1~5), (type2,7~3~4))

That is, we grouped the data by type, then sorted by price, and concatenated the ids with "~" as the separator. The command above can be broken down as follows:

val validData=sc.textFile("/spark_test/test.txt").map(x=>x.split(",")).filter(x=>x.length==3)

val groupedData=validData.groupBy(_(1))  //group data rdds

val sortedJoinedData=groupedData.mapValues(x=>{
   val list=x.toList
   val sortedList=list.sortBy(_(2))
   val idOnlyList=sortedList.map(_(0))
   idOnlyList.mkString("~")
}
)
sortedJoinedData.collect()

We can then take a particular group using the command

sortedJoinedData.filter(_._1=="type1").collect()

output:

Array[(String, String)] = Array((type1,2~1~5))

No, the sort order will not necessarily be maintained through groupByKey, but this is notoriously difficult to reproduce in memory on one node. As was previously said, the most typical way this happens is when things need to be repartitioned for the groupByKey to take place. I managed to reproduce this by manually doing a repartition after the sort and then passing the result into the groupByKey.

import scala.util.Random
import spark.implicits._

case class Numbered(num: Int, group: Int, otherData: Int)

// configure spark with "spark.sql.shuffle.partitions" = 2 or some other small number

val v =
  (1 to 100000)
    // Make waaay more groups than partitions. I added an extra integer just to mess with the sort hash computation (i.e. so it won't be monotonic, not sure if needed)
    .map(Numbered(_, Random.nextInt(300), Random.nextInt(1000000))).toDS()
    // Be sure they are stored in a small number of partitions
    .repartition(2)
    .sort($"num")
    // Repartition again with a waaay bigger number than there are groups so that when things need to be merged you can get them out of order.
    .repartition(200)
    .groupByKey(_.group)
    .mapGroups {
      case (g, nums) =>
        nums             // all you need is .toSeq.sortBy(_.num) here to fix the problem
          .map(_.num)
          .mkString("~")
    }
    .collect()

// Walk through the concatenated strings. If any number ahead 
// is smaller than the number before it, you know that something
// is out of order.
v.zipWithIndex.map { case (r, i) =>
  r.split("~").map(_.toInt).foldLeft(0) { case (prev, next) =>
    if (next < prev) {
      println(s"*** Next: ${next} less than ${prev} for dataset ${i + 1} ***")
    }
    next
  }
}

The short answer is Yes, the hourly counts will maintain the same order.

To generalise, it's important that you sort before you group. Also, the sort must include the columns you group by plus the column you actually want sorted.

An example would look like:

employees
    .sort("company_id", "department_id", "employee_role")
    .groupBy("company_id", "department_id")
    .agg(Aggregators.groupConcat(":", 2) as "count_per_role")
  • Do you have any references stating that groupBy maintains the ordering? I could not find anything in the official docs – Raphael Roth Jun 07 '17 at 05:02
  • I don't have official docs, but I have this article that explains the mechanism a little better: https://bzhangusc.wordpress.com/2015/05/28/groupby-on-dataframe-is-not-the-groupby-on-rdd/ . The comments are interesting too. – Interfector Jun 07 '17 at 08:06
  • Interestingly, even Sean Owen himself states that the ordering may not be preserved (https://issues.apache.org/jira/browse/SPARK-16207?focusedCommentId=15356725&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-15356725) – Raphael Roth Feb 07 '18 at 08:05
  • Has anyone read the article and comments I added on June 7, 2017? – Interfector Oct 07 '18 at 18:16