
When I do an orderBy on a PySpark dataframe, does it sort the data across all partitions (i.e. the entire result), or only within each partition? If the latter, can anyone suggest how to do an orderBy across the whole dataset? I have an orderBy right at the end.
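
For illustration, a simplified, self-contained sketch of the kind of check I mean (this is not my actual test; the column name just mirrors the code below):

from pyspark.sql import SparkSession
from pyspark.sql.functions import desc

spark = SparkSession.builder.getOrCreate()

# Toy dataframe standing in for source_df; only the sort column matters here.
df = spark.createDataFrame([(0.5,), (0.9,), (0.7,)], ['defectProbability'])
sorted_df = df.orderBy(desc('defectProbability'))

# If orderBy() sorts across all partitions, collect() returns the rows in
# globally descending order of defectProbability.
values = [row['defectProbability'] for row in sorted_df.collect()]
assert values == sorted(values, reverse=True), "rows are not globally sorted"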

My current code:

def extract_work(self, days_to_extract):

        source_folders = self.work_folder_provider.get_work_folders(s3_source_folder=self.work_source,
                                                                    warehouse_ids=self.warehouse_ids,
                                                                    days_to_extract=days_to_extract)
        source_df = self._load_from_s3(source_folders)

        # Partition and de-dupe the data-frame retaining latest
        source_df = self.data_frame_manager.partition_and_dedupe_data_frame(source_df,
                                                                            partition_columns=['binScannableId', 'warehouseId'],
                                                                            sort_key='cameraCaptureTimestampUtc',
                                                                            desc=True)
        # Filter out anything that does not qualify for virtual count.
        source_df = self._virtual_count_filter(source_df)

        history_folders = self.work_folder_provider.get_history_folders(s3_history_folder=self.history_source,
                                                                        days_to_extract=days_to_extract)
        history_df = self._load_from_s3(history_folders)

        # Filter out historical items
        if history_df:
            source_df = source_df.join(history_df, 'binScannableId', 'leftanti')
        else:
            self.logger.error("No History was found")

        # Sort by defectProbability
        source_df = source_df.orderBy(desc('defectProbability'))

        return source_df

def partition_and_dedupe_data_frame(data_frame, partition_columns, sort_key, desc):
        if desc:
            window = Window.partitionBy(partition_columns).orderBy(F.desc(sort_key))
        else:
            window = Window.partitionBy(partition_columns).orderBy(F.asc(sort_key))

        data_frame = data_frame.withColumn('rank', F.rank().over(window)).filter(F.col('rank') == 1).drop('rank')
        return data_frame

def _virtual_count_filter(self, source_df):
        df = self._create_data_frame()
        for key in self.virtual_count_thresholds.keys():
            temp_df = source_df.filter((source_df['expectedQuantity'] == key) & (source_df['defectProbability'] > self.virtual_count_thresholds[key]))
            df = df.union(temp_df)
        return df

When I do a df.explain(), I get the following:

== Physical Plan ==
*Sort [defectProbability#2 DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(defectProbability#2 DESC NULLS LAST, 25)
   +- *Project [expectedQuantity#0, cameraCaptureTimestampUtc#1, defectProbability#2, binScannableId#3, warehouseId#4, defectResult#5]
      +- *Filter ((isnotnull(rank#35) && (rank#35 = 1)) && (((((((expectedQuantity#0 = 0) && (defectProbability#2 > 0.99)) || ((expectedQuantity#0 = 1) && (defectProbability#2 > 0.98))) || ((expectedQuantity#0 = 2) && (defectProbability#2 > 0.99))) || ((expectedQuantity#0 = 3) && (defectProbability#2 > 0.99))) || ((expectedQuantity#0 = 4) && (defectProbability#2 > 0.99))) || ((expectedQuantity#0 = 5) && (defectProbability#2 > 0.99))))
         +- Window [rank(cameraCaptureTimestampUtc#1) windowspecdefinition(binScannableId#3, warehouseId#4, cameraCaptureTimestampUtc#1 DESC NULLS LAST, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rank#35], [binScannableId#3, warehouseId#4], [cameraCaptureTimestampUtc#1 DESC NULLS LAST]
            +- *Sort [binScannableId#3 ASC NULLS FIRST, warehouseId#4 ASC NULLS FIRST, cameraCaptureTimestampUtc#1 DESC NULLS LAST], false, 0
               +- Exchange hashpartitioning(binScannableId#3, warehouseId#4, 25)
                  +- Union
                     :- Scan ExistingRDD[expectedQuantity#0,cameraCaptureTimestampUtc#1,defectProbability#2,binScannableId#3,warehouseId#4,defectResult#5]
                     +- *FileScan json [expectedQuantity#13,cameraCaptureTimestampUtc#14,defectProbability#15,binScannableId#16,warehouseId#17,defectResult#18] Batched: false, Format: JSON, Location: InMemoryFileIndex[s3://vbi-autocount-chunking-prod-nafulfillment2/TPA1/2019/04/25/12/vbi-ac-chunk..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<expectedQuantity:int,cameraCaptureTimestampUtc:string,defectProbability:double,binScannabl...
lalatnayak
  • I want to add that in my code, I partition the dataframe by two columns and remove duplicates before sorting the dataframe by a third column. I wrote a test that verifies whether the rows in the dataframe are sorted, and they are not. I initially thought that the test had a problem, but it seems to be fine. – lalatnayak Apr 29 '19 at 01:55
  • I added your code from the comment on the answer below. In the code you explicitly partition by some columns and then order the data; this will only result in partition-level sorting. – Shaido Apr 30 '19 at 02:36
  • Actually I do another sort after the above partition – lalatnayak Apr 30 '19 at 03:22
  • Please add that to the question as well (click on the [edit] button). – Shaido Apr 30 '19 at 03:24
  • Done. Thanks for taking a look. – lalatnayak Apr 30 '19 at 03:26
  • Looks correct to me, although as the answer by thePurplePython states, no sorting has happened yet since no action has been called. You can try running `source_df.explain()` and see what it says. – Shaido Apr 30 '19 at 04:58
  • Actually the method that consumes the response from the top method in my code does a df.toLocalIterator() to iterate through the rows (see the sketch below). – lalatnayak Apr 30 '19 at 05:56
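
A rough, self-contained sketch of the consumption pattern mentioned in the last comment (a toy dataframe, not the code from the question); toLocalIterator() pulls one partition at a time, in partition order, so after an orderBy() the rows arrive at the driver in the sorted order:

from pyspark.sql import SparkSession
from pyspark.sql.functions import desc

spark = SparkSession.builder.getOrCreate()
df = spark.range(10).orderBy(desc('id'))

# Iterate partition by partition; the output should be 9, 8, ..., 0.
for row in df.toLocalIterator():
    print(row['id'])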

1 Answer


orderBy() is a "wide transformation", which means Spark needs to trigger a "shuffle" and a stage split (1 input partition to many output partitions), retrieving all the partition splits distributed across the cluster to perform the orderBy(). In other words, it sorts across the entire dataframe, not just within each partition.

If you look at the explain plan, it has a re-partitioning indicator (Exchange rangepartitioning) with the default of 200 output partitions (the spark.sql.shuffle.partitions configuration; your plan shows 25), whose output is written to disk during execution. This tells you a "wide transformation", aka "shuffle", will happen when a Spark "action" is executed.
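
As a minimal sketch, the shuffle partition count can be inspected or overridden through that configuration on the SparkSession:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# 200 is the default number of shuffle output partitions; the plan in the
# question shows 25, so the setting appears to have been changed there.
print(spark.conf.get("spark.sql.shuffle.partitions"))
spark.conf.set("spark.sql.shuffle.partitions", "25")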

Other "wide transformations" include: distinct(), groupBy(), and join() => *sometimes*

from pyspark.sql.functions import desc
df = spark.range(10).orderBy(desc("id"))
df.show()
df.explain()

+---+
| id|
+---+
|  9|
|  8|
|  7|
|  6|
|  5|
|  4|
|  3|
|  2|
|  1|
|  0|
+---+

== Physical Plan ==
*(2) Sort [id#6L DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(id#6L DESC NULLS LAST, 200)
   +- *(1) Range (0, 10, step=1, splits=8)
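
For contrast, a minimal sketch (using the same spark session as the example above) of sortWithinPartitions(), which sorts only within each partition and therefore does not add an Exchange rangepartitioning step to the plan:

from pyspark.sql.functions import desc

# sortWithinPartitions() sorts each partition independently: no global order,
# and no rangepartitioning exchange appears in the explain output.
df2 = spark.range(10).repartition(4).sortWithinPartitions(desc("id"))
df2.explain()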

thePurplePython
  • Thanks for the response. I had explicitly partitioned my data frame by 2 columns to remove duplicates, and then I tried sorting by a third column. I'm seeing that the data is not being sorted across the whole data frame. Would you need a code snippet? – lalatnayak Apr 29 '19 at 01:49
  • sure ... please provide more code ... also you can use ```.dropDuplicates()``` to remove duplicate rows ... not sure why you are partitioning here – thePurplePython Apr 29 '19 at 17:16
  • I had a very custom drop duplicates. See this question https://stackoverflow.com/questions/55660085/how-to-remove-duplicates-from-a-spark-data-frame-while-retaining-the-latest – lalatnayak Apr 30 '19 at 00:19