
I need to read JSON files from S3 using PySpark. The S3 location may contain hundreds of thousands of files, and every file has the same metadata. But each time I need to read only the files that were created after a particular time. How can I do this?

Sree
  • Please don't use Indian words like lakh / lac here. People living elsewhere won't understand them. – James Z Sep 01 '21 at 14:02

3 Answers


If you have access to the system that creates these files, the simplest way to approach this would be to add a date partition when you write them:

s3://mybucket/myfolder/date=20210901/myfile1.json
s3://mybucket/myfolder/date=20210901/myfile2.json
s3://mybucket/myfolder/date=20210831/myfileA.json
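
For completeness, here is a minimal sketch of what the writing side might look like, assuming the producing job also uses PySpark and has a DataFrame df with a date column (both assumptions, not stated in the question):

# Hypothetical producing job: write output partitioned by the 'date' column
(
  df
  .write
  .mode("append")
  .partitionBy("date")
  .json("s3://mybucket/myfolder/")
)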

And then you can read them with a filter on the partition column; PySpark will only load the files it needs into memory.

from pyspark.sql import functions as F

start_dt = '20210831'
end_dt = '20210901'

# Root of the partitioned data shown above
path = "s3://mybucket/myfolder/"

df = (
  spark
  .read
  .json(path)
  .filter(F.col("date").between(start_dt, end_dt))
)

Note that I have not explicitly tested this with JSON files, just with Parquet, so this method may need to be adapted.

If you don't have access to change how the files are written, I don't think PySpark has direct access to the metadata of the files. Instead, you will want to query S3 directly using boto3 to generate a list of files, filter them using boto3 metadata, and then pass the list of files into the read method:

# generate this by querying via boto3
recent_files = ['s3://mybucket/file1.json', 's3://mybucket/file2.json']

# pass the list itself; unpacking it would send the second path as the schema argument
df = spark.read.json(recent_files)

Info about listing files from boto3.
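
A minimal sketch of how that list might be generated, assuming a bucket named mybucket and a prefix myfolder/ (both placeholders); the last answer below shows a fuller, paginated version:

import boto3
from datetime import datetime, timezone

s3 = boto3.client("s3")
cutoff = datetime(2021, 9, 1, tzinfo=timezone.utc)  # only keep files modified after this

# Single page shown for brevity; use a paginator for prefixes with many objects
response = s3.list_objects_v2(Bucket="mybucket", Prefix="myfolder/")
recent_files = [
    f"s3://mybucket/{obj['Key']}"
    for obj in response.get("Contents", [])
    if obj["LastModified"] > cutoff
]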

Neal
  • Date partition is already present. The problem is, for each date partition there will be thousands of files. So for each hour I need to run the job to read the incremental files. And I will be storing the last read time in a file. And for the next run I need to fetch only the files that arrived between the last run time (which is present in a file) and the current time – Sree Sep 01 '21 at 15:52
  • Can you partition on the hour instead of by the day? – jscott Sep 01 '21 at 16:21
  • No, because many other teams are using data from this bucket. So we have to handle this in PySpark – Sree Sep 01 '21 at 16:37

You can provide the modifiedAfter and modifiedBefore parameters to the DataFrameReader.json function (available in Spark 3.1+).

modifiedBefore an optional timestamp to only include files with
modification times occurring before the specified time. The provided timestamp must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)

modifiedAfter an optional timestamp to only include files with
modification times occurring after the specified time. The provided timestamp must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)

Example

from datetime import datetime

# Fill this variable with your last date
lowerbound = datetime(2021, 9, 1, 13, 0, 0)

# Current execution
upperbound = datetime.now()

# Path to the S3 prefix that holds the JSON files (placeholder)
source_path = "s3://mybucket/myfolder/"

df = spark.read.json(source_path, 
                     modifiedAfter=lowerbound.strftime('%Y-%m-%dT%H:%M:%S'), 
                     modifiedBefore=upperbound.strftime('%Y-%m-%dT%H:%M:%S'))
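
Since the question mentions persisting the last read time in a file, here is a minimal sketch of how lowerbound could be loaded from and saved back to a checkpoint file (the file name last_run.txt is a placeholder, not from the question):

import os
from datetime import datetime

CHECKPOINT_FILE = "last_run.txt"  # placeholder path for the stored last-run time

def load_last_run(default):
    """Return the previously stored run time, or a default if none exists."""
    if os.path.exists(CHECKPOINT_FILE):
        with open(CHECKPOINT_FILE) as f:
            return datetime.strptime(f.read().strip(), '%Y-%m-%dT%H:%M:%S')
    return default

def save_last_run(ts):
    """Persist this run's upper bound so the next run starts from it."""
    with open(CHECKPOINT_FILE, "w") as f:
        f.write(ts.strftime('%Y-%m-%dT%H:%M:%S'))

lowerbound = load_last_run(default=datetime(2021, 9, 1, 13, 0, 0))
upperbound = datetime.now()
# ... run the spark.read.json call above with these bounds ...
save_last_run(upperbound)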
Kafels
  • Let me try this, I tried a similar way earlier but that worked only with local files, not AWS S3 files – Sree Sep 01 '21 at 18:47
  • @Sree I executed it on Azure Storage Blob, if it doesn't work for you, it might be a problem on the AWS side – Kafels Sep 01 '21 at 19:44
  • Yes, seems to be some AWS issue. This is not working with S3 – Sree Sep 02 '21 at 06:33

As noted in the discussion on Kafels' answer, modifiedBefore and modifiedAfter don't work with S3 as a data source. This is a real shame!

The next best alternative is to use boto3 to list all objects in the partition, and then filter the results on their LastModified element. The results don't contain a creation timestamp, so LastModified is the best you can do. You also need to handle pagination carefully, given the large number of objects.

Something like this should work to retrieve the matching keys:

import boto3

def get_matching_s3_keys(bucket, prefix="", after_date=None):
    """
    List keys in an S3 bucket that match specified criteria.

    :param bucket: Name of the S3 bucket.
    :param prefix: Only get objects whose key starts with
        this prefix
    :param after_date: Only get objects that were last modified
        after this date. Note: this needs to be a timezone-aware date
    """
    s3 = boto3.client("s3")
    paginator = s3.get_paginator("list_objects_v2")

    kwargs = {'Bucket': bucket, 'Prefix': prefix}

    for page in paginator.paginate(**kwargs):
        try:
            contents = page["Contents"]
        except KeyError:
            break

        for obj in contents:
            last_modified = obj["LastModified"]
            if after_date is None or last_modified > after_date:
                yield obj["Key"]
jscott