
I have the following dataframe (df_parquet):

DataFrame[id: bigint, date: timestamp, consumption: decimal(38,18)]

I intend to get sorted lists of dates and consumptions using collect_list, just as stated in this post: collect_list by preserving order based on another variable

I am following the last approach (https://stackoverflow.com/a/49246162/11841618), which I think is the most efficient one.

So instead of calling repartition with the default number of partitions (200), I call it with 500, and I sort within partitions by id and date, not just by date (to make the groupBy more efficient, or so I hope). The problem is that once per partition (for only one id per partition, and it seems to be a random id) the first item of a list ends up in the last place.

Any clue about what is going on? The rest of the ids are well sorted in their arrays, so I think there is something going on with the way groupBy or collect_list behaves inside each partition.

I verified that the id that behaves differently is not the first or last id in a partition: I got the partition id and checked whether the same groupBy + collect_list combination fails on one of those values. So it seems to be random.

You can check my code if you want; it's pretty simple.


    from pyspark.sql import functions as F

    ordered_df = df_parquet.repartition(500, 'id').sortWithinPartitions(['id', 'date'])

    grouped_df = ordered_df.groupby("id").agg(
        F.collect_list("date").alias('date'),
        F.collect_list('consumption').alias('consumption'),
    )

And the code used to test it (comparing the first and last values; the first should be older, but in 500 cases it is not):


    first_date = F.col('date').getItem(0)
    last_date = F.col('date').getItem(F.size('date') - 1)

    test = (
        grouped_df.filter(F.size('date') > 1)
        .select(
            'id',
            (first_date > last_date).alias('test'),
            F.array([first_date, last_date]).alias('see'),
        )
        .filter(F.col('test'))
    )

    test.show(5, 100)

    test.count()

And the results:

+-----+----+------------------------------------------+
|   id|test|                                       see|
+-----+----+------------------------------------------+
|89727|true|[2017-02-10 00:00:00, 2017-02-09 00:00:00]|
|76325|true|[2017-02-10 00:00:00, 2017-02-09 00:00:00]|
|80115|true|[2017-02-10 00:00:00, 2017-02-09 00:00:00]|
|89781|true|[2017-02-10 00:00:00, 2017-02-09 00:00:00]|
|76411|true|[2017-02-10 00:00:00, 2017-02-09 00:00:00]|
+-----+----+------------------------------------------+
only showing top 5 rows

500

The expected result is an empty dataframe, since all the arrays should be sorted for all the ids.

kubote

2 Answers


OK, the question is still unsolved, but I found an easy workaround, in case somebody gets stuck on this same issue:

The idea is to move the misplaced last element of each affected array back to the front. For the date array this can be done by sorting with the array_sort function introduced in Spark 2.4. To perform the same reordering on the consumption array we need a UDF.

    from pyspark.sql.types import ArrayType, DoubleType

    # Move the last element to the front. DoubleType assumes consumption was cast
    # to double; the question's schema shows decimal(38,18).
    invert_last = F.udf(lambda vector: [vector[-1]] + vector[:-1], ArrayType(DoubleType()))

    test = (
        grouped_df
        .withColumn('error', (F.size('date') > 1) & (F.col('date').getItem(0) > F.col('date').getItem(F.size('date') - 1)))
        .withColumn('date', F.when(F.col('error'), F.array_sort(F.col('date'))).otherwise(F.col('date')))
        .withColumn('consumption', F.when(F.col('error'), invert_last(F.col('consumption'))).otherwise(F.col('consumption')))
        .drop('error')
    )
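
The lambda inside invert_last can be checked in plain Python, without Spark. The sketch below uses a hypothetical helper name (move_last_to_front) and made-up sample values. It only illustrates that moving the last element to the front restores the order when the earliest item is the single misplaced one, which is the pattern reported in the question.

```python
def move_last_to_front(values):
    """Return a copy of `values` with its last element moved to the front."""
    # Slicing (instead of indexing with [-1]) also tolerates an empty list.
    return values[-1:] + values[:-1]


# Hypothetical sample: sorted dates, except the earliest one landed at the end.
dates = ["2017-02-10", "2017-02-11", "2017-02-12", "2017-02-09"]
consumption = [2.0, 3.0, 4.0, 1.0]

fixed_dates = move_last_to_front(dates)
fixed_consumption = move_last_to_front(consumption)
```

If more than one element were out of place, this rotation would not be enough and a real sort would be needed.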

Cheers.

kubote

I can reproduce this in Spark 3.2.

collect_list is documented to be deterministic over the partition ordering. That seems to suggest sortWithinPartitions is not reliable here.

It might be related to this other issue with spilling during the sort, which concerned sorting partitions when writing out the dataframe: Apache Spark sort partition by user ID and write each partition to CSV.

Related JIRAs:

The array_sort solution is fine if you have a single column. If you're collecting over multiple columns, that is covered by this question: collect_list by preserving order based on another variable

user5233494