
What's the simplest/fastest way to get the partition keys? Ideally into a python list.

Ultimately I want to use this to skip partitions that have already been processed. So in the example below I only want to process data from day 3, but there may be more than one day to process.

Let's say the directory structure is

date_str=2010-01-01
date_str=2010-01-02
date_str=2010-01-03

Reading the dataframe with partition information

ddf2 = spark.read.csv("data/bydate")

Solutions I have tried are below. They look excessively wordy and I'm not sure if they are fast. The query shouldn't need to read any data since it only has to check the directory keys.

from pyspark.sql import functions as F

ddf2.select(F.collect_set('date_str').alias('date_str')).first()['date_str']
# seems to work well albeit wordy

ddf2.select("date_str").distinct().collect()
# [Row(date_str=datetime.date(2010, 1, 10)), Row(date_str=datetime.date(2010, 1, 7)),
# not a python list and slow?

ddf2.createOrReplaceTempView("intent")
spark.sql("""show partitions intent""").toPandas()
# error

ddf2.rdd.getNumPartitions()
# doesn't return the keys, just a count, and not even the count of the partition keys

Related:

Convert distinct values in a Dataframe in Pyspark to a list

PySpark + Cassandra: Getting distinct values of partition key

pyspark - getting Latest partition from Hive partitioned column logic

Show partitions on a pyspark RDD

  • how about a simple condition `ddf2.where(F.col('date_str') >= '2010-01-03')` ? – pltc Oct 26 '21 at 21:26
  • yes but i need to retrieve the `'2010-01-03'` somehow from the partition keys – citynorman Oct 26 '21 at 21:39
  • on the actual data `ddf2.where(F.col('date_str') >= '2010-01-03').limit(10).show()` takes a few seconds, while the `set` and `distinct` functions run for several minutes; it seems Spark reads all the data instead of just looking at the partition keys, which it should also be able to do in a few seconds – citynorman Nov 05 '21 at 02:27

2 Answers


So indeed the `set` and `distinct` solutions scan all the data and will be horribly slow on large datasets. The details are documented here:

https://issues.apache.org/jira/browse/SPARK-34194

https://issues.apache.org/jira/browse/SPARK-12890

AFAIK the fastest way to get the partition keys is the approach from pyspark - getting Latest partition from Hive partitioned column logic.

Adapted to get the keys into a Python list (single partition column):

spark.catalog.createTable(
    'table',
    path='data/bydate',
    source='csv',
)
spark.catalog.recoverPartitions('table')

df_partitions = spark.sql('show partitions table').toPandas()
partitions = df_partitions['partition'].str.replace('date_str=', '').tolist()
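
To get from this list back to the original goal (only processing the days that haven't been run yet), something along these lines should work. `already_processed` is just a placeholder for however you track completed days, and `F` is the functions import from the question.

# hypothetical bookkeeping of the days that were already processed
already_processed = {'2010-01-01', '2010-01-02'}

# partition keys recovered above, minus the ones already done
to_process = [p for p in partitions if p not in already_processed]

# only process the remaining days, e.g. 2010-01-03
ddf_new = ddf2.where(F.col('date_str').isin(to_process))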

Since there isn't a neat pyspark solution, you can use the approaches below directly on the directories.

Databricks: ls the directory

import re
# o.name looks like 'date_str=2010-01-01/', so grab everything after the '='
[re.search(r'=(.+?)/?$', o.name).group(1) for o in dbutils.fs.ls(cfg_fpath_intent)]

S3: use AWS Data Wrangler's `list_directories` instead of `ls`.
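
A minimal sketch with AWS Data Wrangler (awswrangler); the bucket/prefix below is a placeholder.

import awswrangler as wr

# returns the immediate sub-directories, e.g. 's3://my-bucket/data/bydate/date_str=2010-01-01/'
dirs = wr.s3.list_directories('s3://my-bucket/data/bydate/')

# keep only the value after '=', i.e. the partition key
partitions = [d.rstrip('/').split('=')[-1] for d in dirs]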

– citynorman

Let's look at each of your approaches

Approach #1:

ddf2.select(F.collect_set('date_str').alias('date_str')).first()['date_str']

There is nothing wrong with this, except that (as you said) it's unnecessarily long.
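
A marginally shorter spelling of the same idea (not from the original post, just a sketch) goes through agg; it still scans the data, as the other answer points out.

# collect_set over the whole frame returns the distinct keys as a single list in a single row
keys = ddf2.agg(F.collect_set('date_str')).first()[0]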

Approach #2:

ddf2.select("date_str").distinct().collect()

I'd say this might be the best approach, but `collect` returns a list of Rows, so you'd need to loop through it like this. (And it's not that slow compared with the other solutions.)

[r['date_str'] for r in ddf2.select("date_str").distinct().collect()]

Approach #3:

spark.sql("""show partitions intent""").toPandas()

This won't work because you're reading from CSV files, not a table.
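
If you do want SHOW PARTITIONS, the directory has to be registered as a partitioned table in the catalog first, which is essentially what the other answer does; a sketch, with an arbitrary table name:

# register the directory as an external csv table and let Spark discover the date_str= folders
spark.catalog.createTable('intent_tbl', path='data/bydate', source='csv')
spark.catalog.recoverPartitions('intent_tbl')

spark.sql('show partitions intent_tbl').show()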

Approach #4:

ddf2.rdd.getNumPartitions()

This won't work either. The "partitions" in `getNumPartitions` are not the same thing as table partitioning; it returns the number of partitions of the underlying RDD (i.e. how the data is split for parallelism), not the partition keys.

– pltc