
Using a Spark DataFrame, e.g.:

import org.apache.spark.sql.functions.{col, collect_list}

myDf
  .filter(col("timestamp").gt(15000))
  .groupBy("groupingKey")
  .agg(collect_list("aDoubleValue"))

I want collect_list to return its result ordered by "timestamp", i.e. I want the values collected within each group to be sorted by another column.

I know there are other questions about this, but I couldn't find a reliable answer for the DataFrame API.

How can this be done? (Sorting myDf by "timestamp" before the groupBy is not a good solution.)

Thanks.

Rolintocour
  • myDf.filter(col("timestamp").gt(15000)).repartition(1).sort(col("timestamp")).groupBy("groupingKey").agg(collect_list("aDoubleValue")): I think this may serve the purpose, if you are able to do this. – sangam.gavini Oct 04 '19 at 16:32
  • I don't think this is scalable, as it will gather all the data on a single executor, won't it? – Rolintocour Oct 07 '19 at 07:43



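The only way is to use collect_list on a struct whose first field is timestamp, sort the resulting array with sort_array (structs are compared field by field, so timestamp determines the order), then project back onto aDoubleValue in a second step: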

import org.apache.spark.sql.functions._

myDf
  .filter(col("timestamp").gt(15000))
  .groupBy("groupingKey")
  // collect (timestamp, aDoubleValue) structs, then sort them by timestamp
  .agg(sort_array(collect_list(struct("timestamp", "aDoubleValue"))).as("data"))
  .withColumn("data", col("data.aDoubleValue")) // project array back to aDoubleValue
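To see why sorting the structs yields timestamp order, here is a minimal plain-Scala model (no Spark involved) of what the query above computes, run on an in-memory Seq. The Event case class and the SortedCollect object are hypothetical stand-ins for myDf's rows, not Spark APIs. One simplification is assumed: the model sorts by timestamp only, whereas sort_array on structs would also break timestamp ties using aDoubleValue.

```scala
// Hypothetical stand-in for one row of myDf (not a Spark type).
final case class Event(groupingKey: String, timestamp: Long, aDoubleValue: Double)

object SortedCollect {
  // Models: filter -> groupBy -> sort_array(collect_list(struct(...))) -> project aDoubleValue
  def sortedValues(rows: Seq[Event]): Map[String, Seq[Double]] =
    rows
      .filter(_.timestamp > 15000L)   // .filter(col("timestamp").gt(15000))
      .groupBy(_.groupingKey)         // .groupBy("groupingKey")
      .map { case (key, group) =>
        // collect_list(struct("timestamp", "aDoubleValue")): one pair per row
        val pairs = group.map(e => (e.timestamp, e.aDoubleValue))
        // sort_array: the first struct field (timestamp) decides the order;
        // sortBy is stable, so equal timestamps keep their input order here
        val sorted = pairs.sortBy(_._1)
        // col("data.aDoubleValue"): keep only the value of each pair
        key -> sorted.map(_._2)
      }
}
```

For example, the rows ("a", 20000, 3.0), ("a", 16000, 1.0) and ("a", 18000, 2.0) come out as Map("a" -> List(1.0, 2.0, 3.0)), regardless of the order in which they arrive.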
Raphael Roth
  • Hi, thanks for the solution, this works. However, it is a bit awkward to use a temporary structure, as I want to use many fields in the .agg(). Is there no easier way? – Rolintocour Oct 07 '19 at 07:42
  • I don't think this is the _only_ way. There is an [interesting answer here](https://stackoverflow.com/a/50012355/877069) that accomplishes this using a window. – Nick Chammas Aug 08 '22 at 15:22
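One window-based variant works like this (described here as an assumption; it is not necessarily the exact code of the linked answer): collect_list over a window partitioned by groupingKey and ordered by timestamp gives every row the running list of values up to that row, and since each running list is a prefix of the group's full list, keeping the longest one per group yields the ordered result. The plain-Scala model below (no Spark; Event and WindowedCollect are hypothetical names) illustrates only that prefix idea. It uses row-by-row prefixes and ignores how Spark's window frame treats rows with equal timestamps.

```scala
// Hypothetical stand-in for one row of myDf (not a Spark type).
final case class Event(groupingKey: String, timestamp: Long, aDoubleValue: Double)

object WindowedCollect {
  def sortedValues(rows: Seq[Event]): Map[String, Seq[Double]] =
    rows
      .filter(_.timestamp > 15000L)
      .groupBy(_.groupingKey)                       // models partitioning by groupingKey
      .map { case (key, group) =>
        val ordered = group.sortBy(_.timestamp)     // models ordering the window by timestamp
        // Running lists: row i sees the values of rows 0..i (quadratic, for illustration only)
        val running = ordered.indices.map(i => ordered.take(i + 1).map(_.aDoubleValue))
        // Every running list is a prefix of the last one, so the longest is the full ordered list
        key -> running.maxBy(_.size)
      }
}
```

Compared with the struct approach, this avoids building a temporary struct, at the cost of materialising a growing list on every row.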

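An example of the same struct technique in Spark SQL, where (ts, page_id) builds a struct, array_sort orders the structs by ts, and .page_id projects the sorted array back onto page_id: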

create table test_agg(guid string, ts timestamp, page_id int) using parquet;
insert into test_agg values('A', timestamp '2021-01-01 19:30:01', 93453);
insert into test_agg values('A', timestamp '2021-01-01 19:30:03', 54434);
insert into test_agg values('A', timestamp '2021-01-01 19:30:05', 878934);
insert into test_agg values('B', timestamp '2021-01-01 00:00:02', 34223);
insert into test_agg values('B', timestamp '2021-01-01 00:10:20', 340094);
insert into test_agg values('B', timestamp '2021-01-01 00:30:12', 134355);
SELECT guid,
       array_sort(collect_list((ts, page_id))).page_id as pageIds
FROM test_agg
GROUP BY guid;
+-------+------------------------+--+
| guid  |        pageIds         |
+-------+------------------------+--+
| A     | [93453,54434,878934]   |
| B     | [34223,340094,134355]  |
+-------+------------------------+--+

yuming