
I would like to know if the pseudocode below is an efficient method to read multiple Parquet files between a date range stored in Azure Data Lake, from PySpark (Azure Databricks). Note: the Parquet files are not partitioned by date.

I'm using the uat/EntityName/2019/01/01/EntityName_2019_01_01_HHMMSS.parquet convention for storing data in ADL, as suggested in the book Big Data by Nathan Marz, with a slight modification (using 2019 instead of year=2019).

Read all data using * wildcard:

df = spark.read.parquet("uat/EntityName/*/*/*/*")

Add a column FileTimestamp that extracts the timestamp from EntityName_2019_01_01_HHMMSS.parquet using string operations and converts it to TimestampType():

df.withColumn(add timestamp column)
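
For concreteness, one possible implementation of this step (a sketch only: the regex and format string assume the exact file-name pattern above, and it relies on input_file_name, regexp_extract and to_timestamp from pyspark.sql.functions):

from pyspark.sql import functions as F

df = df.withColumn(
    "FileTimestamp",
    F.to_timestamp(
        # pull the "YYYY_MM_DD_HHMMSS" part out of the full input file path
        F.regexp_extract(F.input_file_name(), r"(\d{4}_\d{2}_\d{2}_\d{6})\.parquet", 1),
        "yyyy_MM_dd_HHmmss"
    )
)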

Use filter to get relevant data:

start_date = '2018-12-15 00:00:00'
end_date = '2019-02-15 00:00:00'
df = df.filter(df.FileTimestamp >= start_date).filter(df.FileTimestamp < end_date)

Essentially I'm using PySpark to simulate the neat syntax available in U-SQL:

@rs = 
  EXTRACT 
      user    string,
      id      string,
      __date  DateTime
  FROM 
    "/input/data-{__date:yyyy}-{__date:MM}-{__date:dd}.csv"
  USING Extractors.Csv();

@rs = 
  SELECT * 
  FROM @rs
  WHERE 
    __date >= System.DateTime.Parse("2016/1/1") AND
    __date < System.DateTime.Parse("2016/2/1");
samratb

1 Answer


The correct way of partitioning your data is to use the form year=2019, month=01, etc. in your folder names.

When you query this data with a filter such as:

df.filter(df.year >= myYear)

Then Spark will only read the relevant folders.
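
For example, a minimal sketch of such a query, assuming (hypothetically) that the data was written under uat/EntityName with year=/month=/day= folders; the physical plan from explain() should then list those predicates under PartitionFilters, i.e. only the matching folders are scanned:

df = spark.read.parquet("uat/EntityName")

pruned = df.filter((df.year == 2019) & (df.month == 1))

# inspect the plan to confirm the filter is pushed down to the folder level
pruned.explain()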

It is very important that the name of the filtering column matches the folder name exactly. Note that when you write partitioned data using Spark (for example by year, month, day), it will not write the partitioning columns into the Parquet files; they are instead inferred from the path. It does mean your DataFrame needs to contain them when writing, though, and they will also be returned as columns when you read from partitioned sources.
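
A rough sketch of both sides, assuming the DataFrame already carries year, month and day columns and using a hypothetical output path:

# write: each distinct (year, month, day) combination becomes a folder
(df.write
   .mode("overwrite")
   .partitionBy("year", "month", "day")
   .parquet("uat/EntityName_partitioned"))

# read: year, month and day are not stored inside the Parquet files,
# but they reappear as columns inferred from the folder names
spark.read.parquet("uat/EntityName_partitioned").printSchema()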

If you cannot change the folder structure, you can always manually reduce the folders for Spark to read using a regex or a glob; this article should provide more context: Spark SQL queries on partitioned data using Date Ranges. But clearly this is more manual and complex (a rough sketch follows below).
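
As one manual alternative (a sketch, not the exact approach from the linked article): enumerate the day-level folders for your range yourself and pass each path to the reader. Note that this fails if any folder is missing, which is what the follow-up link below covers.

from datetime import date, timedelta

start, end = date(2018, 12, 15), date(2019, 2, 15)

# one path per day in [start, end), matching the existing yyyy/MM/dd layout
paths = [
    "uat/EntityName/{:%Y/%m/%d}/*.parquet".format(start + timedelta(days=i))
    for i in range((end - start).days)
]

df = spark.read.parquet(*paths)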

UPDATE: Further example: Can I read multiple files into a Spark Dataframe from S3, passing over nonexistent ones?

Also from "Spark - The Definitive Guide: Big Data Processing Made Simple" by Bill Chambers:

Partitioning is a tool that allows you to control what data is stored (and where) as you write it. When you write a file to a partitioned directory (or table), you basically encode a column as a folder. What this allows you to do is skip lots of data when you go to read it in later, allowing you to read in only the data relevant to your problem instead of having to scan the complete dataset. ...

This is probably the lowest-hanging optimization that you can use when you have a table that readers frequently filter by before manipulating. For instance, date is particularly common for a partition because, downstream, often we want to look at only the previous week’s data (instead of scanning the entire list of records).

simon_dmorias
  • Hi Simon, thanks for your response. I could not understand the mechanism of inferring columns from the path; I was looking for more authoritative documentation. Any pointers? The query pattern will be a date range, like retrieving the current month or the last 3 months of data. I agree globs are infeasible. Taking implicit columns from year=2019 puts me in the same spot as globs. Do you think adding a Timestamp column along with the year=2019 folder structure would be best to achieve date-range filtering and efficient data retrieval? Where do I view the cumulative reads/IOPS involved? – samratb Mar 04 '19 at 16:09
  • I've added an update to try and expand on this. Also note that within the Spark UI you can see the input dataset size, which shows how much data was read and the filtering applied pre-input. – simon_dmorias Mar 05 '19 at 10:49
  • Lemme check and revert – samratb Mar 06 '19 at 20:41
  • Hi Simon, the solution that you gave worked. I was using ADF to build the Year/Month/Day folder structure, which was incorrect. Instead I resorted to Databricks to build the folder structure with the partition key specified while writing. This created the desired folder structure, and when I merge the files it gives the implicit columns year, month, day in the merged df. Thanks for your help. – samratb Apr 11 '19 at 16:45