
I'm trying to generate a list of all S3 files in a bucket/folder. There are usually on the order of millions of files in the folder. I currently use boto, which can retrieve around 33k files per minute, so even a million files takes about half an hour. I also load these files into a dataframe, but I generate and use this list as a way to track which files are being processed.

What I've noticed is that when I ask Spark to read all the files in the folder, it does a listing of its own, lists them out much faster than the boto call can, and then processes the files. I looked for a way to do this listing in PySpark but found no good examples. The closest I got was some Java and Scala code that lists the files using the HDFS library.

Is there a way we can do this in Python and Spark? For reference, I'm trying to replicate the following code snippet:

import boto3
from datetime import datetime


def get_s3_files(source_directory, file_type="json"):
    s3_resource = boto3.resource("s3")

    # source_directory is a pathlib.Path; rebuild the mount-style prepend path,
    # the bucket name, and the key prefix from its components
    file_prepend_path = f"/{'/'.join(source_directory.parts[1:4])}"
    bucket_name = str(source_directory.parts[3])
    prefix = "/".join(source_directory.parts[4:])

    bucket = s3_resource.Bucket(bucket_name)

    s3_source_files = []

    # bucket.objects.filter pages through the listing under the hood,
    # yielding every object whose key starts with the prefix
    for obj in bucket.objects.filter(Prefix=prefix):
        if obj.key.endswith(f".{file_type}"):
            s3_source_files.append(
                (
                    f"{file_prepend_path}/{obj.key}",
                    obj.size,
                    str(source_directory),
                    str(datetime.now()),
                )
            )

    return s3_source_files
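
For comparison, the same listing can also be written against the low-level client with a `list_objects_v2` paginator, which fetches keys in pages of up to 1,000. This is only a sketch under the same pathlib-style `source_directory` assumption; the `get_s3_files_paginated` name is just for illustration:

import boto3
from datetime import datetime


def get_s3_files_paginated(source_directory, file_type="json"):
    s3_client = boto3.client("s3")

    file_prepend_path = f"/{'/'.join(source_directory.parts[1:4])}"
    bucket_name = str(source_directory.parts[3])
    prefix = "/".join(source_directory.parts[4:])

    s3_source_files = []
    paginator = s3_client.get_paginator("list_objects_v2")

    # Each page carries up to 1,000 keys under page["Contents"]
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
        for obj in page.get("Contents", []):
            if obj["Key"].endswith(f".{file_type}"):
                s3_source_files.append(
                    (
                        f"{file_prepend_path}/{obj['Key']}",
                        obj["Size"],
                        str(source_directory),
                        str(datetime.now()),
                    )
                )

    return s3_source_files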
CodingInCircles

2 Answers


This can be achieved very simply with dbutils. `dbutils.fs.ls` lists a single directory, so the helper below recurses into subdirectories:

def get_dir_content(ls_path):
  # List the current level, then recurse into every subdirectory
  dir_paths = dbutils.fs.ls(ls_path)
  subdir_paths = [get_dir_content(p.path) for p in dir_paths if p.isDir() and p.path != ls_path]
  flat_subdir_paths = [p for subdir in subdir_paths for p in subdir]
  return list(map(lambda p: p.path, dir_paths)) + flat_subdir_paths
    

paths = get_dir_content('s3 location')
for p in paths:
    print(p)
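
To match the extension filter from the question's snippet, the returned paths can be narrowed afterwards; a minimal sketch, assuming JSON files:

# Keep only the .json objects, mirroring the endswith check in the question
json_paths = [p for p in paths if p.endswith(".json")]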
    We used to use `dbutils` before, but it was too slow. We found boto to be quicker, but are trying to see if we can make it even quicker. – CodingInCircles Nov 11 '21 at 01:24
  • So you're looking for an option to list the files from a given S3 location path, rather than read the data. Am I right? Please confirm. – Karthikeyan Rasipalay Durairaj Nov 11 '21 at 19:26
  • Yeah, as long as I can replicate the piece of code I have posted using PySpark (and have it run faster), that'd be perfect. That said, I used the AWS CLI to list objects and it was significantly faster. Weird that the CLI would take less time than boto. – CodingInCircles Nov 11 '21 at 22:19
  • `dbutils.fs.ls` is far worse than just listing the files with pathlib and glob. – arun Jan 10 '23 at 21:52

For some reason, using the AWS CLI was roughly 15 times(!) faster than using boto. I'm not sure exactly why this is the case, but here's the code I'm currently using, in case someone finds it handy. Basically, use s3api to list the objects, then use jq to manipulate the output into a form to my liking.

import subprocess


def get_s3_files(source_directory, schema, file_type="json"):

    file_prepend_path = f"/{'/'.join(source_directory.parts[1:4])}"
    bucket = str(source_directory.parts[3])
    prefix = "/".join(source_directory.parts[4:])

    # List the objects with s3api, then use jq to keep only the matching
    # extension and emit path, size, source directory and timestamp as CSV
    s3_list_cmd = f"aws s3api list-objects-v2 --bucket {bucket} --prefix {prefix} | jq -r '.Contents[] | select(.Key | endswith(\".{file_type}\")) | [\"{file_prepend_path}/\"+.Key, .Size, \"{source_directory}\", (now | strftime(\"%Y-%m-%d %H:%M:%S.%s\"))] | @csv'"

    s3_list = subprocess.check_output(s3_list_cmd, shell=True, universal_newlines=True)

    # Opening in "w" mode already truncates the file before writing
    with open("s3_file_paths.csv", "w") as f:
        f.write(s3_list)

    s3_source_files_df = spark.read.option("header", False).schema(schema).csv("s3_file_paths.csv")

    return s3_source_files_df
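
The `schema` argument isn't defined in this snippet; something like the following matches the four CSV columns the jq command writes (field names here are just placeholders):

from pyspark.sql.types import StructType, StructField, StringType, LongType

# Hypothetical schema for the four CSV columns: key path, object size,
# source directory, and listing timestamp (kept as a plain string here)
s3_files_schema = StructType([
    StructField("file_path", StringType(), True),
    StructField("file_size", LongType(), True),
    StructField("source_directory", StringType(), True),
    StructField("listed_at", StringType(), True),
])

Passing it as get_s3_files(source_directory, s3_files_schema) keeps the CSV read explicitly typed instead of relying on schema inference.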
CodingInCircles