13

Is it possible to list all of the files in a given S3 path (ex: s3://my-bucket/my-folder/*.extension) using a SparkSession object?

John Rotenstein
code
  • What is the use case that you are trying to achieve? – Kaushal Jan 06 '19 at 11:03
  • I know it is possible to do this using the AWS S3 SDK API but was wondering if it is supported in the SparkSession object. I am interested in counting how many files in a specific S3 path have a particular file format (ex: *.extension) and would also like to know their full paths / file names. – code Jan 06 '19 at 11:09

4 Answers

21

You can use Hadoop API for accessing files on S3 (Spark uses it as well):

import java.net.URI
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration

val path = "s3://somebucket/somefolder"
val fileSystem = FileSystem.get(URI.create(path), new Configuration())
val it = fileSystem.listFiles(new Path(path), true)
while (it.hasNext()) {
  val status = it.next()   // LocatedFileStatus; status.getPath gives the full file path
  println(status.getPath)
}
Michael Spector
  • this is the way to do it, and the listFiles(path, true) can give you much better performance with s3 than trying to do a treewalk yourself. – stevel Jan 08 '19 at 12:13
  • How can we extract the list of files from this iterator and store it in an array? I see in this link https://docs.scala-lang.org/overviews/collections/iterators.html that many functions should be available for an iterator, but I cannot use any of those except .hasNext() and .next(). – Mahmoud Nov 02 '19 at 00:01
  • This did not work for me at all. I got a lot of errors concerning S3. I don't remember what they were exactly, but I tried a lot of different ways. Just commenting so that if someone else runs into the same thing they won't think they're crazy. – Noah Gary Apr 30 '20 at 13:33
  • @Mahmoud `it` is an instance of `org.apache.hadoop.fs.RemoteIterator`, not collections' iterators – panc Jun 11 '20 at 03:10
  • This works for me. I need to define a `var` outside the while loop in order to save the results of the name of the file I am looking for. Also, in order to stop the loop once my file is found, I need to wrap the while loop inside a `breakable`, and then I can `break` once the file is found. – panc Jun 11 '20 at 03:12
  • How can I add the AWS access key and secret access key to this? – NickyPatel May 18 '21 at 13:12
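
Regarding the credentials question in the last comment: with Hadoop's s3a connector, the access and secret keys can be set on the Hadoop configuration that the FileSystem reads (the EMR-style s3:// scheme uses its own credential chain instead). A minimal pyspark sketch with placeholder values; from Scala, the same keys can be set on the new Configuration() instance shown above:

hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set('fs.s3a.access.key', '<access-key>')   # placeholder value
hadoop_conf.set('fs.s3a.secret.key', '<secret-key>')   # placeholder value
# Prefer instance profiles or environment credentials over hard-coding keys where possible.
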
10

Approach 1

For pyspark users, I've translated Michael Spector's answer (I'll leave it to you to decide if using this is a good idea):

sc = spark.sparkContext
myPath = f's3://my-bucket/my-prefix/'
javaPath = sc._jvm.java.net.URI.create(myPath)
hadoopPath = sc._jvm.org.apache.hadoop.fs.Path(myPath)
hadoopFileSystem = sc._jvm.org.apache.hadoop.fs.FileSystem.get(javaPath, sc._jvm.org.apache.hadoop.conf.Configuration())
iterator = hadoopFileSystem.listFiles(hadoopPath, True)

s3_keys = []
while iterator.hasNext():
    s3_keys.append(iterator.next().getPath().toUri().getRawPath())    

s3_keys now holds all file keys found at my-bucket/my-prefix
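
As a follow-up for the use case mentioned in the question's comments (counting files with a particular extension), the collected keys can be filtered client-side. A minimal sketch, keeping the question's hypothetical .extension suffix:

# Hypothetical suffix from the question; substitute your real file format.
matching_keys = [key for key in s3_keys if key.endswith('.extension')]
print(len(matching_keys), 'matching files')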

Approach 2

Here is an alternative that I found (hat tip to @forgetso):

myPath = 's3://my-bucket/my-prefix/*'
hadoopPath = sc._jvm.org.apache.hadoop.fs.Path(myPath)
hadoopFs = hadoopPath.getFileSystem(sc._jvm.org.apache.hadoop.conf.Configuration())
statuses = hadoopFs.globStatus(hadoopPath)

for status in statuses:
  status.getPath().toUri().getRawPath()
  # Alternatively, you can get file names only with:
  # status.getPath().getName()
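
Since the question's path already carries an extension pattern (s3://my-bucket/my-folder/*.extension), the glob can express it directly instead of filtering afterwards. A small variation of the snippet above, again with a hypothetical .extension suffix:

extPath = 's3://my-bucket/my-prefix/*.extension'   # hypothetical suffix, as in the question
extHadoopPath = sc._jvm.org.apache.hadoop.fs.Path(extPath)
extStatuses = hadoopFs.globStatus(extHadoopPath) or []   # globStatus can return None if the base path does not exist
print(len(extStatuses), 'matching files')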

Approach 3 (incomplete!)

The two approaches above do not use the Spark parallelism mechanism that would be applied on a distributed read. That logic looks private, though. See parallelListLeafFiles here. I have not found a way to compel pyspark to do a distributed ls on S3 without also reading the file contents. I tried to use py4j to instantiate an InMemoryFileIndex, but can't get the incantation right. Here is what I have so far if someone wants to pick it up from here:

myPath = f's3://my-bucket/my-path/'
paths = sc._gateway.new_array(sc._jvm.org.apache.hadoop.fs.Path, 1)
paths[0] = sc._jvm.org.apache.hadoop.fs.Path(myPath)

emptyHashMap = sc._jvm.java.util.HashMap()
emptyScalaMap = sc._jvm.scala.collection.JavaConversions.mapAsScalaMap(emptyHashMap)

# Py4J is not happy with this:
sc._jvm.org.apache.spark.sql.execution.datasources.InMemoryFileIndex(
    spark._jsparkSession, 
    paths, 
    emptyScalaMap, 
    sc._jvm.scala.Option.empty() # Optional None
)
Lou Zell
  • FWIW S3a in Apache Hadoop distros (not EMR) does async prefetch of the next page in the results. The whole "parallelise or not" question is an interesting one. It can be done effectively on a single host, but you have to decide between parallel shallow LIST calls vs deep sequential LISTs...really depends on the directory structure as to which is best. My current strategy is list the immediate children, then deep list under each child in separate threads. – stevel Apr 12 '21 at 12:58
  • Your incantation was very close. Bring a pentagram and a goat and see this: https://stackoverflow.com/a/75326116/543720 – Jeroen Vlek Feb 02 '23 at 16:26
  • Nice find! I'm glad you spelled it out completely. – Lou Zell Feb 03 '23 at 17:26
  • I ran with the theme: https://www.perceptivebits.com/a-comprehensive-guide-to-finding-files-via-spark/ – Jeroen Vlek Feb 14 '23 at 18:58
  • It's pretty slow compared to os.scandir in Python. – Hari Baskar Aug 23 '23 at 04:51
1

Lou Zell was very close! The below ended up working on ADLS2, but I'm putting it here because of the Py4J magic. Note that the NoopCache causes the listing to be run twice: once when the index is created and once when listLeafFiles is called. I also wrote a blog post on this: https://www.perceptivebits.com/a-comprehensive-guide-to-finding-files-via-spark/

import os

base_path = "/mnt/my_data/"
glob_pattern = "*"
sc = spark.sparkContext
hadoop_base_path = sc._jvm.org.apache.hadoop.fs.Path(base_path)
paths = sc._jvm.PythonUtils.toSeq([hadoop_base_path])

#noop_cache_clazz = sc._jvm.java.lang.Class.forName("org.apache.spark.sql.execution.datasources.NoopCache$")
#ff = noop_cache_clazz.getDeclaredField("MODULE$")
#noop_cache = ff.get(None)

file_status_cache_clazz = sc._jvm.java.lang.Class.forName(
    "org.apache.spark.sql.execution.datasources.FileStatusCache$"
)
ff = file_status_cache_clazz.getDeclaredField("MODULE$")
jvm_spark_session = spark._jsparkSession
file_status_cache = ff.get(None).getOrCreate(jvm_spark_session)

in_memory_file_index = sc._jvm.org.apache.spark.sql.execution.datasources.InMemoryFileIndex(
    jvm_spark_session,
    paths,
    sc._jvm.PythonUtils.toScalaMap({}),
    sc._jvm.scala.Option.empty(),
    file_status_cache,  # or use noop_cache if you need to save memory
    sc._jvm.scala.Option.empty(),
    sc._jvm.scala.Option.empty()
)
glob_path = sc._jvm.org.apache.hadoop.fs.Path(os.path.join(base_path, glob_pattern))
glob_paths = sc._jvm.PythonUtils.toSeq([glob_path])
# SparkHadoopUtil.get.globPath(fs, Path.mergePaths(validated(basep), validated(globp))),
status_list = in_memory_file_index.listLeafFiles(glob_paths)
path_list = []
it = status_list.iterator()
while it.hasNext():
    path_status = it.next()
    path_list.append(str(path_status.getPath().toUri().getRawPath()))

path_list.sort()

print(path_list)
Jeroen Vlek
-1

You can use input_file_name with a DataFrame; it will give you the absolute file path for each row.

The following code will give you all the file paths:

import org.apache.spark.sql.functions.input_file_name

spark.read.table("zen.intent_master").select(input_file_name()).distinct().collect()

I am assuming that, for your use case, you just want to read data from a set of files matching some regex, so you can apply that regex in a filter.

For example,

val df = spark.read.table("zen.intent_master").filter(input_file_name().rlike("your regex string"))
Kaushal
  • Oh, for this particular question there isn't a table to use. Is it possible to do this if there isn't a table? I just have access to the S3 folder. – code Jan 06 '19 at 12:15
  • @codeshark You are correct, but I don't have an S3 setup right now. I have tested with HDFS and it's working fine; I am assuming it will work with S3 too. – Kaushal Jan 06 '19 at 12:38
  • You can use this with s3 as well like: `val blah = spark.read.json("s3://blah-bucket/blah-folder/") val blahWithFilename = blah.withColumn("filename", input_file_name) blahWithFilename.show(false)` – Aditya Anand Krishna Oct 02 '19 at 09:34
  • That will load all of the data into a DataFrame. If you only need a list of files from the S3 bucket (with recursive lookup), why download the files themselves? The data could be too heavy and processing might take days. – 0script0 Apr 15 '22 at 11:15
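
For readers without a table, a pyspark rendering of the idea in the comment above, with a hypothetical bucket and prefix. As the last comment notes, this reads the files' contents, not just their names, so it is only appropriate when you are loading the data anyway:

from pyspark.sql.functions import input_file_name

# Hypothetical path; this loads the data, not just the file listing.
df = spark.read.json('s3://my-bucket/my-folder/')
df.select(input_file_name().alias('filename')).distinct().show(truncate=False)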