
I have the following script, which returns a properly sorted result:

from transforms.api import transform, Output
from pyspark.sql import functions as F


@transform(
    out=Output("ri.foundry.main.dataset.29fdbff7-168a-457d-bb79-8f7508cede9d"),
)
def compute(out, ctx):

    data = [("1", "2022-02-01", "older"),
            ("1", "2022-02-12", "older"),
            ("1", "2022-02-09", "new")]
    df_inp = (
        ctx.spark_session.createDataFrame(data, ["c1", "date", "record_status"])
        .withColumn("date", F.to_date("date"))
        .withColumn("record_status", F.lit("older"))
    )
    df_upd = (
        ctx.spark_session.createDataFrame([('1',)], ['c1'])
        .withColumn('date', F.to_date(F.lit('2022-02-17')))
        .withColumn('record_status', F.lit('new'))
    )

    df = df_inp.unionByName(df_upd)

    df = df.coalesce(1)
    df = df.sort(F.desc('date'))
    out.write_dataframe(df)

Notice `df = df.coalesce(1)` before the sort.

[screenshot: sorted after coalesce]

Question. Since both `df.coalesce(1)` and `df.repartition(1)` should result in a single partition, I tried replacing `df = df.coalesce(1)` with `df = df.repartition(1)`. But then the result came out unsorted. Why?

[screenshot: not sorted after repartition]
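
For reference, a quick check in a plain PySpark shell (a minimal sketch, building `df` the same way as in the script above) shows that both calls do produce a single partition before the sort:

>>> df.coalesce(1).rdd.getNumPartitions()
1
>>> df.repartition(1).rdd.getNumPartitions()
1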

Additional details

If I don't interfere with the partitioning at all, the result also comes out unsorted:

[screenshot: not sorted without repartitioning]

Physical plan using `coalesce(1)`:

  +- *(3) Sort [date#6 DESC NULLS LAST], true, 0
     +- Coalesce 1
        +- Union
           :- *(1) Project [c1#0, cast(date#1 as date) AS date#6, older AS record_status#10]
           :  +- *(1) Scan ExistingRDD[c1#0,date#1,record_status#2]
           +- *(2) Project [c1#14, 19040 AS date#16, new AS record_status#19]
              +- *(2) Scan ExistingRDD[c1#14]

Physical plan using `repartition(1)`:

  +- *(3) Sort [date#6 DESC NULLS LAST], true, 0
     +- CustomShuffleReader coalesced
        +- ShuffleQueryStage 1
           +- Exchange rangepartitioning(date#6 DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [id=#85]
              +- ShuffleQueryStage 0
                 +- Exchange RoundRobinPartitioning(1), REPARTITION_WITH_NUM, [id=#83]
                    +- Union
                       :- *(1) Project [c1#0, cast(date#1 as date) AS date#6, older AS record_status#10]
                       :  +- *(1) Scan ExistingRDD[c1#0,date#1,record_status#2]
                       +- *(2) Project [c1#14, 19040 AS date#16, new AS record_status#19]
                          +- *(2) Scan ExistingRDD[c1#14]

I am aware of the question Difference between repartition(1) and coalesce(1), where the asker says they cannot use `coalesce(1)` for some reason. In my case it's the opposite: `coalesce(1)` works, while `repartition(1)` does not.

ZygD
  • FYI, so that you aren't left in silence, we had a very quick chat internally where some folks said "we've also seen this" but no one came forward with a concrete answer. – fmsf Feb 18 '22 at 15:31
  • I followed your exact steps, and got properly sorted dataframes in both cases. Could it be version specific? I tried Spark 3.1 and 2.4. Maybe you can also share `df.explain()` for both scenarios? – mazaneicha Feb 18 '22 at 20:05
  • `df.show()` shows a well-sorted result. What `out.write_dataframe(df)` does is write the result to the file system (as a parquet file). So if you want to test it in a system other than Foundry, I think you should use `df.write.parquet`. Did you perform the test using `df.write`, or was it `df.show`? Our system uses Spark 3.0. – ZygD Feb 18 '22 at 21:11
  • Oh sorry, so your screenshot is not a `show` but the result of reading a parquet that's been written to the file system? Then the disorder is kind of unsurprising. Perhaps you can do a `parquet-tools dump ...` to see if the rows in the parquet are properly sorted. – mazaneicha Feb 21 '22 at 13:45
  • We have also seen this in our Foundry instance but couldn't build a reproducible example yet. Thanks ZygD! @fmsf: Do you have a timeline for a fix for this issue? – nicornk Mar 17 '22 at 07:40

2 Answers


The reason the result of `repartition(1)` isn't sorted is visible in the query plans you've listed: it writes out multiple partitions rather than one. There are two Exchanges. The first (lower in the plan) brings the data into a single partition, but the second (higher in the plan) applies a RangePartitioning into up to 200 partitions (the default `spark.sql.shuffle.partitions`), and the Sort happens on those. Each resulting partition/file is most likely sorted, but the order across files isn't maintained.
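
You can also observe this without writing anything to disk (a minimal sketch; with AQE enabled the final count may be coalesced to fewer partitions):

>>> df = spark.createDataFrame([{"a": 3*i} for i in range(10000)])
>>> df.repartition(1).rdd.getNumPartitions()
1
>>> df.repartition(1).sort("a").rdd.getNumPartitions()
200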

This seems to be a bug in Spark 3.0.2, on which Foundry's Spark is currently based. Testing different Spark versions, I see this happening on 3.0.2:

      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.0.2
      /_/

>>> df = spark.createDataFrame([{"a": 3*i} for i in range(10000)])
>>> sorted = df.repartition(1).sort("a")
>>> sorted.explain()
== Physical Plan ==
*(2) Sort [a#0L ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(a#0L ASC NULLS FIRST, 200), true, [id=#15]
   +- Exchange RoundRobinPartitioning(1), false, [id=#14]
      +- *(1) Scan ExistingRDD[a#0L]

but not on 3.2.0 (with AQE disabled just to match 3.0.2; it doesn't affect the result):

      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.2.0
      /_/

>>> df = spark.createDataFrame([{"a": 3*i} for i in range(10000)])
>>> sorted = df.repartition(1).sort("a")
>>> sorted.explain()
== Physical Plan ==
*(2) Sort [a#0L ASC NULLS FIRST], true, 0
+- Exchange SinglePartition, REPARTITION_BY_NUM, [id=#12]
   +- *(1) Scan ExistingRDD[a#0L]

Note how 3.2.0 shows the initial Exchange as SinglePartition rather than RoundRobinPartitioning(1), and based on that is able to skip the range partitioning otherwise needed for the Sort.
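
One possible workaround on affected versions (a sketch; note this setting changes the parallelism of every shuffle in the session): since the extra Exchange range-partitions into `spark.sql.shuffle.partitions` pieces, pinning that to 1 keeps the sorted output in a single partition:

>>> spark.conf.set("spark.sql.shuffle.partitions", "1")
>>> df.repartition(1).sort("a").rdd.getNumPartitions()
1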

avo
  • You've explained it all. Thank you! – ZygD Mar 05 '22 at 20:29
  • In a nutshell: in older Spark (3.0.2), `repartition(1)` works (everything is moved into 1 partition), but the subsequent **`sort`** creates more partitions again, because before sorting it also adds *rangepartitioning(..., 200)*. – ZygD Oct 07 '22 at 10:06

To explicitly sort the single partition you can use `dataframe.sortWithinPartitions()`.

Then you don't have to worry about the sort affecting the partitioning.
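
Applied to the question's script, that could look like the sketch below (untested in Foundry). Note that `sortWithinPartitions` only orders rows inside each partition, so it yields a globally sorted result here only because `repartition(1)` has already moved everything into a single partition:

df = df_inp.unionByName(df_upd)
df = df.repartition(1)                        # everything into one partition
df = df.sortWithinPartitions(F.desc('date'))  # local sort; adds no extra shuffle
out.write_dataframe(df)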

user5233494