
I have a large PySpark DataFrame:

import pyspark.sql as ps

spark = (
    ps.SparkSession.builder
    .appName('some_name')
    # ... lots of .config() lines
    .getOrCreate()
)

df = spark.table('some_table')

print(df.rdd.getNumPartitions())
# about 5k

print(df.count())
# about 650M

On a high level, I want to:

  1. (Preferably) Assert that the dataframe is sorted by a subset of columns (let's say SORT_COLUMNS). The table is written using ALTER TABLE ... WRITE ORDERED BY SORT_COLUMNS, but better safe than sorry. (A sketch of the kind of check I have in mind is below this list.)
  2. Split the dataframe into "reasonably-sized" chunks, where each chunk is a (1) sorted, (2) continuous piece of the dataframe (and the union of all chunks covers the whole dataframe)
  3. Run a job on each chunk, where the job would have the following interface:
def process_chunk(iterator_over_rows_in_chunk):
    # Do some processing, aggregation, and occasionally
    # yield a new row with a new schema.
    # iterator_over_rows_in_chunk should yield rows from
    # a continuous chunk sorted by SORT_COLUMNS.
    # Ideally, rows should be represented as dicts.
    yield new_row
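
For item 1, this is roughly the check I have in mind. It is only a sketch: SORT_COLUMNS is assumed to be a plain list of column names, the key values are assumed to be comparable (no NULLs), and as far as I can tell it only verifies ordering within each partition, not across partition boundaries.

SORT_COLUMNS = ['key_a', 'key_b']  # placeholder column names

def partition_is_sorted(rows):
    # rows is an iterator over pyspark.sql.Row objects for one partition.
    prev_key = None
    for row in rows:
        key = tuple(row[c] for c in SORT_COLUMNS)
        if prev_key is not None and key < prev_key:
            yield False
            return
        prev_key = key
    yield True

# One boolean per partition; this streams over the data once without shuffling.
assert all(df.rdd.mapPartitions(partition_is_sorted).collect())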

By "reasonably-sized", I mean:

  1. If that's possible, I want to split the dataframe in approximately constant-size chunks (say, ≈1M rows per chunk; it's fine if some chunks are a lot smaller or a bit bigger);
  2. Otherwise, I want to group the dataframe by a subset of columns (say, GROUP_COLUMNS) and use those groups as chunks.
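
For concreteness, here is a rough sketch of the kind of pipeline I mean for option 2. Everything in it is a placeholder: GROUP_COLUMNS, SORT_COLUMNS, the partition count of 2000 (≈650M / 2000 ≈ 325k rows per chunk on average), and the body of process_chunk. I don't know whether this actually guarantees that each partition is a sorted, continuous slice of the dataframe, which is the core of the question.

GROUP_COLUMNS = ['group_key']             # placeholder
SORT_COLUMNS = ['group_key', 'event_ts']  # placeholder

def process_chunk(rows):
    # rows is an iterator over pyspark.sql.Row objects for one partition,
    # sorted by SORT_COLUMNS within that partition.
    for row in rows:
        record = row.asDict()
        # ... real processing / aggregation would go here; as a placeholder,
        # just pass the dict through.
        yield record

chunked = (
    df
    .repartitionByRange(2000, *GROUP_COLUMNS)  # range-partition by the group columns
    .sortWithinPartitions(*SORT_COLUMNS)       # sort rows inside each partition
)

result_rdd = chunked.rdd.mapPartitions(process_chunk)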

As far as I understand, the closest thing in the Spark interface to what I want is df.rdd.mapPartitions(), but it seems that partitions are not guaranteed to be continuous (see e.g. this answer).

So how do I map over a PySpark DataFrame in sorted, continuous chunks?
