I have a large PySpark DataFrame:
```python
import pyspark.sql as ps

spark = (
    ps.SparkSession.builder
    .appName('some_name')
    # ... lots of .config() lines
    .getOrCreate()
)
df = spark.table('some_table')

print(df.rdd.getNumPartitions())
# about 5k
print(df.count())
# about 650M
```
On a high level, I want to:
- (Preferably) Assert that the dataframe is sorted by a subset of columns (let's say `SORT_COLUMNS`). The dataframe is written using `ALTER TABLE ... WRITE ORDERED BY SORT_COLUMNS`, but better safe than sorry (see the sketch below, right after the `process_chunk` interface, for the kind of check I have in mind).
- Split the dataframe into "reasonably-sized" chunks, where each chunk is a (1) sorted, (2) contiguous piece of the dataframe (and the union of all chunks covers the whole dataframe).
- Run a job on each chunk, where the job would have the following interface:

```python
def process_chunk(iterator_over_rows_in_chunk):
    # iterator_over_rows_in_chunk should yield rows from
    # a contiguous chunk sorted by SORT_COLUMNS.
    # Ideally, rows should be represented as dicts.
    # Do some processing, aggregation, and occasionally
    # yield a new row with a new schema.
    yield new_row
```
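
For the sortedness assertion, this is roughly the check I have in mind (a minimal sketch, assuming `SORT_COLUMNS` is a list of column names, shown here with placeholder names, and that those columns contain no nulls; it only verifies the order within each partition, not across partition boundaries):

```python
SORT_COLUMNS = ['col_a', 'col_b']  # placeholder

def partition_is_sorted(rows):
    # Compare each row's sort key to the previous one within the partition.
    # Assumes the sort columns contain no nulls (tuple comparison would fail).
    prev_key = None
    for row in rows:
        key = tuple(row[c] for c in SORT_COLUMNS)
        if prev_key is not None and key < prev_key:
            yield False
            return
        prev_key = key
    yield True

# True only if every partition is internally sorted by SORT_COLUMNS.
assert df.rdd.mapPartitions(partition_is_sorted).reduce(lambda a, b: a and b)
```

This of course says nothing about whether the partitions themselves line up in sort order, which is part of what I'm unsure about.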
By "reasonably-sized", I mean:
- If that's possible, I want to split the dataframe into approximately constant-size chunks (say, ≈1M rows per chunk; it's fine if some chunks are a lot smaller or a bit bigger);
- Otherwise, I want to group the dataframe by a subset of columns (say, `GROUP_COLUMNS`) and use those groups as chunks (sketched below).
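
For the group-based fallback, I imagine something along these lines (a rough sketch I have not validated; `GROUP_COLUMNS` and `SORT_COLUMNS` are placeholders, and I'm assuming `GROUP_COLUMNS` is a prefix of `SORT_COLUMNS`, so that after sorting within a partition every group is whole, contiguous and sorted, even though one partition may hold several groups):

```python
GROUP_COLUMNS = ['col_a']          # placeholder
SORT_COLUMNS = ['col_a', 'col_b']  # placeholder

chunked = (
    df
    # All rows of a group land in the same partition (one partition may
    # still contain several groups).
    .repartition(*GROUP_COLUMNS)
    # Restore the sort inside each partition; since GROUP_COLUMNS is a
    # prefix of SORT_COLUMNS, each group ends up contiguous and sorted.
    .sortWithinPartitions(*SORT_COLUMNS)
)
```

Each partition of `chunked` could then be fed to `process_chunk` via `mapPartitions` (see the call further below). But the shuffle this triggers feels wasteful given that the table is already written in sort order, and it doesn't address the preferred constant-size variant.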
As far as I understand, the closest thing in the Spark interface to what I want is `df.rdd.mapPartitions()`, but it seems that partitions are not guaranteed to be contiguous (see e.g. this answer).
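
In other words, if each existing partition were guaranteed to be a sorted, contiguous slice of the table, I could simply write something like the following (hypothetical sketch, reusing the `process_chunk` interface from above and `Row.asDict()` to get dict-like rows):

```python
# Hypothetical: only correct if every partition is a sorted, contiguous
# slice of the table, which does not seem to be guaranteed.
result_rdd = df.rdd.mapPartitions(
    lambda rows: process_chunk(row.asDict() for row in rows)
)
```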
So how do I map over a PySpark DataFrame in sorted, contiguous chunks?