I want to pull a specified number of days from an S3 bucket that is partitioned by year/month/day/hour. This bucket has new files added everyday and will grow to be rather large. I want to do spark.read.parquet(<path>).filter(<condition>)
, however when I ran this it took significantly longer (1.5 hr) than specifying the paths (.5 hr). I dont understand why it takes longer, should I be adding a .partitionBy()
when reading from the bucket? or is it because of the volume of data in the bucket that has to be filtered?

- 137
- 10
-
When you say `S3 bucket that is partitioned by year/month/day/hour` do you mean that an example S3 path looks like `s3://bucketname/dataset/2021/03/08/12/`? – Jeremy Mar 08 '21 at 22:57
-
yes, my path looks like ```s3://bucketname/daataset/year=2021/month=03/day=08/hour=12``` – sgallagher Mar 08 '21 at 23:21
-
If you are trying to query spark for particular days you can leverage the partitioning. `val foo = spark.read.parquet("s3a://bucketname/dataset/").where("year = '2021' and month = '03' and day >= '01'")`. That should return all the days and hours greater than and equal to `2021-03-01`. If you need additional filtering you can add that on too with another `.where("...")` or `.filter(...)`, but spark should apply the partitioning which will limit the amount of data the additional filter is applied to. Without specifying the partitioning Spark will apply to filter to ALL data in the path. – Jeremy Mar 09 '21 at 04:13
-
You may find this useful also, https://stackoverflow.com/a/49344688/1407161. It shows how to use wildcards in the path. I've never used that method on S3 and the answer shown is with HDFS so maybe try that way too. – Jeremy Mar 09 '21 at 04:22
1 Answers
That problem that you are facing is regarding the partition discovery. If you point to the path where your parquet files are with the spark.read.parquet("s3://my_bucket/my_folder")
spark will trigger a task in the task manager called
Listing leaf files and directories for <number> paths
This is a partition discovery method. Why that happens? When you call with the path Spark has no place to find where the partitions are and how many partitions are there.
In my case if I run a count like this:
spark.read.parquet("s3://my_bucket/my_folder/").filter('date === "2020-10-10").count()
It will trigger the listing that will take 19 Seconds for around 1700 folders. Plus the 7 seconds to count, it has a total of 26 seconds.
To solve this overhead time you should use a Meta Store. AWS provide a great solution with AWS Glue, to be used just like the Hive Metastore in a Hadoop environment.
With Glue you can store the Table metadata and all the partitions. Instead of you giving the Parquet path you will point to the table just like that:
spark.table("my_db.my_table").filter('date === "2020-10-10").count()
For the same data, with the same filter. The list files doesn't exist and the whole process of counting took only 9 Seconds.
In your case that you partitionate by Year, Month, Day and Hour. We are talking about 8760 folders per year.
I would recommend you take a look at this link and this link
This will show how you can use Glue as your Hive Metastore. That will help a lot to improve the speed of Partition query.

- 7,362
- 3
- 29
- 51