I have a pretty straightforward pyspark SQL application (spark 2.4.4, EMR 5.29) that reads a dataframe of the schema topic, year, count:

df.show()

+--------+----+------+
|   topic|year| count|
+--------+----+------+
|covid-19|2017|606498|
|covid-19|2016|454678|
|covid-19|2011| 10517|
|covid-19|2008|  6193|
|covid-19|2015|510391|
|covid-19|2013| 29551|

I then need to sort by year and collect the counts into a list so that they are in ascending order by year:

from pyspark.sql.functions import collect_list

df.orderBy('year').groupBy('topic').agg(collect_list('count').alias('counts'))

The issue is that, since I order by year, the number of partitions used for this stage is the number of distinct years in my dataset. I thus get a crazy bottleneck stage where only 15 out of 300 executors are utilised, leading to obvious memory and disk spills, and eventually the stage fails with "no space left on device" on the overpopulated partitions.
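
For illustration, one way to see this collapse (a diagnostic sketch only, assuming the df shown above) is to check the partition count of the sorted DataFrame directly:

# Diagnostic sketch: how many partitions does the range-partitioned (sorted)
# DataFrame actually end up with? With only a handful of distinct years,
# this number collapses to roughly the number of years.
sorted_df = df.orderBy('year')
print(sorted_df.rdd.getNumPartitions())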

Even more interesting is that I found a way to circumvent this which intuitively appears to be much less efficient, but actually does work, since no bottlenecks are created:

from pyspark.sql.functions import first, array, col

df.groupBy('topic').pivot('year', values=list(range(START, FINISH))).agg(first('count')) \
    .select('topic', array([col(str(c)) for c in range(START, FINISH)]).alias('counts'))

This leads to my desired output, which is an array of counts sorted by year.
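
One caveat worth noting with the pivot route (my observation, not part of the snippet above): years with no rows for a given topic come back as null columns, so if zeros are wanted in the array they have to be filled in explicitly. A sketch, assuming the pivoted columns are named by the stringified year values:

from pyspark.sql.functions import first, array, col, coalesce, lit

# Sketch: same pivot as above, but missing (topic, year) combinations are
# coalesced to 0 instead of staying null in the resulting array.
df.groupBy('topic').pivot('year', values=list(range(START, FINISH))).agg(first('count')) \
    .select('topic', array([coalesce(col(str(c)), lit(0)) for c in range(START, FINISH)]).alias('counts'))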

Does anyone have an explanation or idea why this happens, or how best to prevent it? I found this answer and this jira where it is basically suggested to 'add noise' to the sort key to avoid these skew-related issues.

I think it is worth mentioning that the pivot method is a better resolution than adding noise, at least to my knowledge, whenever sorting by a column that has a small range of values. I would appreciate any info on this and alternate implementations.

1 Answer

Range partitioning is used under the hood by Spark for sorting / ordering.

From the docs it is clear that the number of partitions that will hold the ranges of data for the subsequent sort (via mapPartitions) is determined by sampling the existing partitions, before a heuristically optimal number of partitions is computed for these ranges.

These ranges become the partitions, and this may decrease the number of partitions, since a range must be contained entirely within a single partition for the order by / sort to work via a mapPartitions-type approach.

This:

df.repartitionByRange(100, 'some_col1', 'some_colN')...  

can help, or ordering by more columns might, I suspect. But here that appears not to be the case based on your DF.

The question has nothing to do with pyspark, BTW.

Interesting point, but explainable: the reduced number of partitions have to hold more data for the collect_list based on year, and there are obviously far more topics than years.
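
As an aside (a sketch only, not something from the original exchange): a pattern that avoids the global orderBy, and hence the range partitioning on year, is to sort inside the aggregation by collecting (year, count) structs and using sort_array, since structs compare field by field:

from pyspark.sql.functions import collect_list, sort_array, struct, col

# Sketch: sort per topic group rather than globally. Structs compare by their
# first field, so the (year, count) pairs come back in ascending year order,
# and 'pairs.count' extracts the counts as an array in that same order.
result = (df.groupBy('topic')
            .agg(sort_array(collect_list(struct('year', 'count'))).alias('pairs'))
            .select('topic', col('pairs.count').alias('counts')))

Whether this outperforms the pivot depends on the data, but it keeps everything in a single groupBy with no wide sort on 'year'.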

  • MANY more topics than years: 15-50 years vs. billions of topics. What I am really trying to do is time-series analysis, and methods other than creating a list and running a tailored formula with a UDF have produced less insightful results – singletony May 11 '20 at 21:17
  • I have also tried to use flint timeseries library and ran into memory issues, probably since data is not granular enough with respect to amount of topics – singletony May 11 '20 at 21:19
  • So I answered your question. – thebluephantom May 11 '20 at 22:55
  • Well, not really, but I am guessing that this is Spark internals and can see there is no real way to avoid the range partitioning. I have tried sortWithinPartitions and that did not lead to a list sorted by year, which is odd because the initial partitioning is by topic, or so I thought – singletony May 13 '20 at 07:11
  • I understand how it works from the Spark context I believe. Success. – thebluephantom May 13 '20 at 07:48