I need to read JSON files from S3 using PySpark. The S3 location may contain hundreds of thousands of files, and every file has the same metadata. But on each run I need to read only the files that were created after a particular time. How can I do this?
3 Answers
If you have access to the system that creates these files, the simplest way to approach this would be to add a date
partition when you write them:
s3://mybucket/myfolder/date=20210901/myfile1.json
s3://mybucket/myfolder/date=20210901/myfile2.json
s3://mybucket/myfolder/date=20210831/myfileA.json
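If you control the writer, producing that layout is just a matter of partitioning on a date column when you write. A minimal sketch (the DataFrame `df` and the `date_partition` helper are assumptions for illustration; the write itself is left commented since it needs a live Spark session):

```python
from datetime import datetime, timezone

# Hypothetical helper: the partition value for a given write time
def date_partition(dt):
    return dt.strftime("%Y%m%d")

# Assuming df is the DataFrame being written:
# (df.withColumn("date", F.lit(date_partition(datetime.now(timezone.utc))))
#    .write
#    .partitionBy("date")   # produces .../date=YYYYMMDD/part-*.json
#    .json("s3://mybucket/myfolder/"))
```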
And then you can read them with a filter; PySpark will then only load the files that it needs into memory.
from pyspark.sql import functions as F

start_dt = '20210831'
end_dt = '20210901'
df = (
    spark
    .read
    .json(path)
    .filter(F.col("date").between(start_dt, end_dt))
)
Note that I have not explicitly tested this with JSON files, just with Parquet, so this method may need to be adapted.
If you don't have access to change how the files are written, I don't think PySpark has direct access to the files' metadata. Instead, you will want to query S3 directly using boto3 to generate a list of files, filter them on their boto3 metadata, and then pass the resulting list into the read method:
# generate this by querying via boto3
recent_files = ['s3://mybucket/file1.json', 's3://mybucket/file2.json']
df = spark.read.json(recent_files)
Info about listing files from boto3.
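For example, a sketch of building `recent_files` with boto3. The bucket name, prefix, and cutoff are placeholders, and the actual listing calls are left commented since they hit S3; the filtering step is the part shown live:

```python
from datetime import datetime, timezone

def recent_json_keys(objects, cutoff):
    """Keep keys of listed objects modified after the cutoff.
    S3 returns timezone-aware LastModified values, so cutoff must be too."""
    return [o["Key"] for o in objects if o["LastModified"] > cutoff]

# s3 = boto3.client("s3")
# cutoff = datetime(2021, 9, 1, tzinfo=timezone.utc)
# recent_files = [
#     f"s3://mybucket/{key}"
#     for page in s3.get_paginator("list_objects_v2").paginate(
#         Bucket="mybucket", Prefix="myfolder/")
#     for key in recent_json_keys(page.get("Contents", []), cutoff)
# ]
```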

- Date partition is already present. The problem is that for each date partition there will be thousands of files, so I need to run a job every hour to read the incremental files. I will be storing the last read time in a file, and on the next run I need to fetch only the files that arrived between the last run time (which is read from that file) and the current time. – Sree Sep 01 '21 at 15:52
- No, because many other teams are using data from this bucket, so we have to handle this in PySpark. – Sree Sep 01 '21 at 16:37
You can provide modifiedAfter and modifiedBefore parameters to the DataFrameReader.json function.
- modifiedBefore: an optional timestamp to only include files with modification times occurring before the specified time. The provided timestamp must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
- modifiedAfter: an optional timestamp to only include files with modification times occurring after the specified time. The provided timestamp must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
Example
from datetime import datetime

# Fill this variable with your last run date
lowerbound = datetime(2021, 9, 1, 13, 0, 0)
# Current execution
upperbound = datetime.now()

df = spark.read.json(source_path,
                     modifiedAfter=lowerbound.strftime('%Y-%m-%dT%H:%M:%S'),
                     modifiedBefore=upperbound.strftime('%Y-%m-%dT%H:%M:%S'))

- Let me try this; I tried a similar way earlier, but that worked only with local files, not AWS S3 files. – Sree Sep 01 '21 at 18:47
- @Sree I executed it on Azure Storage Blob; if it doesn't work for you, it might be a problem on the AWS side. – Kafels Sep 01 '21 at 19:44
As noted in the discussion on Kafels' answer, modifiedBefore and modifiedAfter don't work with S3 as a data source. This is a real shame!
The next best alternative is to use boto3 to list all objects in the partition and then filter the results on the LastModified element of each entry. The results don't contain a creation timestamp, so LastModified is the best you can do. You also need to be careful to handle pagination, given the large number of objects.
Something like this should work to retrieve the matching keys:
import boto3

s3 = boto3.client("s3")

def get_matching_s3_keys(bucket, prefix="", after_date=None):
    """
    List keys in an S3 bucket that match specified criteria.

    :param bucket: Name of the S3 bucket.
    :param prefix: Only get objects whose key starts with this prefix.
    :param after_date: Only get objects that were last modified after
        this date. Note: this needs to be a timezone-aware datetime.
    """
    paginator = s3.get_paginator("list_objects_v2")
    kwargs = {'Bucket': bucket, 'Prefix': prefix}
    for page in paginator.paginate(**kwargs):
        try:
            contents = page["Contents"]
        except KeyError:
            # An empty page has no "Contents" key; nothing left to list
            break
        for obj in contents:
            last_modified = obj["LastModified"]
            if after_date is None or last_modified > after_date:
                yield obj["Key"]
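To wire the generator into Spark, turn the keys into full s3:// paths and hand the list to the reader. A sketch: `to_s3_uris` is a hypothetical helper, and the Spark call is left commented since it needs a live session; note the cutoff must be timezone-aware to compare against S3's LastModified values:

```python
from datetime import datetime, timezone

def to_s3_uris(bucket, keys):
    """Prefix each object key with its bucket to form full s3:// URIs."""
    return [f"s3://{bucket}/{key}" for key in keys]

# cutoff = datetime(2021, 9, 1, 13, 0, 0, tzinfo=timezone.utc)
# keys = get_matching_s3_keys("mybucket", prefix="myfolder/", after_date=cutoff)
# df = spark.read.json(to_s3_uris("mybucket", keys))
```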
