
I'm new to Spark, and I'm trying to solve the problem below.
Scenario
I am trying to read multiple parquet files (and csv files as well, possibly later on) for a specific range of dates and load them into a single Spark dataframe in Python. I'll explain the condition for selecting the dates later.
Reason: schema evolution - new columns were added in the most recent/latest partitions, so a plain union is not possible (or I'm unaware of how to do it). If there is an efficient way to do the union, please let me know about that as well.
Files look like this:

s3://dir1/dir2/dir3/files/partition_date=2020-12-25/
# Partitions do not exist for weekend days, i.e., Saturday and Sunday
s3://dir1/dir2/dir3/files/partition_date=2020-12-28/
s3://dir1/dir2/dir3/files/partition_date=2020-12-29/
s3://dir1/dir2/dir3/files/partition_date=2020-12-30/  # Consider this file with new columns in it
s3://dir1/dir2/dir3/files/partition_date=2020-12-31/  # Consider this file with new columns in it

The parquet files (and csv files, in a different folder) residing in each of these partitions look like this:

s3://dir1/dir2/dir3/files/partition_date=2020-12-31/data_2020-12-31.parquet

Before the schema changed, I used to load everything (all of the partitions) that existed in the folder s3://dir1/dir2/dir3/files into a single Spark dataframe using the line below:

spark_df = spark.read.format('parquet').load('s3://dir1/dir2/dir3/files')

But now I want to pull files only for specific dates, since a plain date range won't work because of the missing partitions. So I created a list with a for loop to check which partitions exist; this list contains the date strings of all partitions that do exist.

dates = ['2020-12-25','2020-12-26','2020-12-27','2020-12-28','2020-12-29','2020-12-30','2020-12-31'] 
# I'll retrieve these dates by other efficient ways later on
existing_dates = []
# 'for' loop implementation
existing_dates = ['2020-12-25','2020-12-28','2020-12-29','2020-12-30','2020-12-31']
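
For what it's worth, one way I could build existing_dates is to list the partition prefixes in S3, e.g. with boto3. This is only a sketch; the bucket name 'dir1' and prefix 'dir2/dir3/files/' below are placeholders for my real path, and pagination is ignored.

import boto3

# Sketch only: list the partition "directories" under the data prefix and pull out the dates.
# 'dir1' (bucket) and 'dir2/dir3/files/' (prefix) are placeholders; pagination is not handled.
s3 = boto3.client('s3')
resp = s3.list_objects_v2(Bucket='dir1', Prefix='dir2/dir3/files/', Delimiter='/')
existing_dates = [
    p['Prefix'].split('partition_date=')[1].rstrip('/')
    for p in resp.get('CommonPrefixes', [])
    if 'partition_date=' in p['Prefix']
]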

So here's the task for me:

  1. I want to pull data just for dates that exist in existing_dates
  2. I also need to merge the older partitions with the schema-evolved partitions that contain the new additional columns (namely 2020-12-30 and 2020-12-31 in this example)
  3. I need to check whether the partition's parquet file is empty or not, too! I came across this answer, but I don't know the equivalent code in PySpark.
MadDog

1 Answer

  1. You can use the {} glob syntax to read specific partitions.
base_path = 's3://dir1/dir2/dir3/files'

# Note 1: The extra {{ is needed to produce a literal {.
# Note 2: By default, reading partition directories like this drops the partition column
#         (partition_date) from the returned dataframe. To keep the partition_date column,
#         set the basePath option to your parquet data root.
df = (spark.read
      .option('basePath', base_path)
      .parquet(f'{base_path}/partition_date={{{",".join(existing_dates)}}}'))

# f'{base_path}/partition_date={{{",".join(existing_dates)}}}' = 
# s3://dir1/dir2/dir3/files/partition_date={2020-12-25,2020-12-28,...}

FYI, the [] glob syntax can capture a range.

s3://dir1/dir2/dir3/files/partition_date=2020-12-2[5-8]

will capture the partitions for 2020-12-25 through 2020-12-28 (whichever of them exist).
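
For example, a minimal sketch of the same read using the range pattern (the name df_range is just illustrative):

df_range = (spark.read
            .option('basePath', base_path)
            .parquet(f'{base_path}/partition_date=2020-12-2[5-8]'))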

  2. When you read partitions that are missing columns together with partitions that have additional columns, use the mergeSchema option to align all columns.
df = (spark.read
      .option('basePath', base_path)
      .option('mergeSchema', 'true') # this handles the missing columns
      .parquet(f'{base_path}/partition_date={{{",".join(existing_dates)}}}'))
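
As a quick sanity check (just a sketch; 'new_col' is a placeholder for one of your newly added column names), the merged schema should contain the new columns, and rows from the older partitions will have null in them:

df.printSchema()
df.select('partition_date', 'new_col').show()  # 'new_col' is hypothetical; older dates show null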
  3. I am not sure what the purpose of this is yet. What would you like to do after identifying it? I am asking because, depending on the task, you may not need to check whether a parquet file is empty.
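
If you do need to skip empty partitions, a minimal sketch of the check (the date below is just an example value from existing_dates):

part_df = spark.read.parquet(f'{base_path}/partition_date=2020-12-25')
is_empty = len(part_df.head(1)) == 0  # True if the partition has no rows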
Emma
  • Thank you so much, @emma, for your answer. I couldn't understand the `{{` and the literal `{`, as I'm encountering them for the first time. Could you please explain it to me? Also, 1. Could I use it for the CSV files the same way by just replacing `parquet` with `csv`, or is there some other hurdle? 2. If down the line the schema changes in this fashion: the type of some columns goes from `int` to `string` for newer files (i.e., `column x: int (nullable = true)` -> older files, `column x: string (nullable = true)` -> newer files), is there a way to do typecasting before the statement you provided? – MadDog Feb 07 '22 at 20:38
  • In the `f''` format-string syntax, you use {variable} to have the variable's value inserted, so the `{` character is a special character inside a format string. Let's say I have a variable `name='world'` and I want to make the string "hello world"; I can do `f'hello {name}'`. I wrap the variable with `{}` and the value is inserted there. Now if I want to get the string "hello {world}", I need to insert the literal "{" character. Since "{" is a special character in a format string, I need to escape it, and to escape the "{" you add another "{". Try printing `f'hello {{'`; it should result in "hello {" – Emma Feb 07 '22 at 21:08
  • So to get "hello {world}", you need `f'hello {{{name}}}'`. The middle `{name}` will be replaced with "world" and you have a literal { and } on each side. – Emma Feb 07 '22 at 21:09
  • For the 2nd question, if you have `int` in some partitions and `string` in others, `mergeSchema` cannot handle the two different data types and Spark will crash. You need to read only the old partitions, fix the column to string, and save it. – Emma Feb 07 '22 at 21:11
  • Understood, thanks Emma! – MadDog Feb 08 '22 at 06:12
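
For the int-vs-string case discussed in the comments above, a minimal sketch of what "read only the old partitions and fix the column to string" could look like (assuming the conflicting column is literally named x, that the dates split as in the example, and that you are on Spark 3.1+ for unionByName with allowMissingColumns):

from pyspark.sql import functions as F

# Read the old-schema and new-schema partitions separately (dates follow the example above).
old_df = (spark.read
          .option('basePath', base_path)
          .parquet(f'{base_path}/partition_date={{2020-12-25,2020-12-28,2020-12-29}}'))
new_df = (spark.read
          .option('basePath', base_path)
          .parquet(f'{base_path}/partition_date={{2020-12-30,2020-12-31}}'))

# Cast the old int column to string so the types agree, then union by column name;
# columns missing from the old partitions are filled with null.
old_df = old_df.withColumn('x', F.col('x').cast('string'))
merged = old_df.unionByName(new_df, allowMissingColumns=True)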