50

I have a directory of directories on HDFS, and I want to iterate over the directories. Is there any easy way to do this with Spark using the SparkContext object?

maasg
  • 37,100
  • 11
  • 88
  • 115
Jon
  • 3,985
  • 7
  • 48
  • 80
  • you mean 'iterate' like get the list of sub-directories and files within? or getting all files across all subdirectories? – maasg Nov 19 '14 at 19:23
  • Iterate as in list all the sub-directories. Each subdirectory contains a bunch of text files that I want to process in different ways. – Jon Nov 19 '14 at 19:27

10 Answers10

58

You can use org.apache.hadoop.fs.FileSystem. Specifically, FileSystem.listFiles([path], true)

And with Spark...

FileSystem.get(sc.hadoopConfiguration).listFiles(..., true)

Edit

It's worth noting that good practice is to get the FileSystem that is associated with the Path's scheme.

path.getFileSystem(sc.hadoopConfiguration).listFiles(path, true)
drew moore
  • 31,565
  • 17
  • 75
  • 112
Mike Park
  • 10,845
  • 2
  • 34
  • 50
  • really nice! [I had this question](http://stackoverflow.com/questions/34738296/spark-spark-submit-jars-arguments-wants-comma-list-how-to-declare-a-directory/35550151#35550151), granted, I guess this wouldn't work in the original spark-submit call – JimLohse Feb 23 '16 at 13:46
  • How can I create a list of the files using the RemoteIterator this creates? – horatio1701d Jan 27 '18 at 13:58
45

Here's PySpark version if someone is interested:

    hadoop = sc._jvm.org.apache.hadoop

    fs = hadoop.fs.FileSystem
    conf = hadoop.conf.Configuration() 
    path = hadoop.fs.Path('/hivewarehouse/disc_mrt.db/unified_fact/')

    for f in fs.get(conf).listStatus(path):
        print(f.getPath(), f.getLen())

In this particular case I get list of all files that make up disc_mrt.unified_fact Hive table.

Other methods of FileStatus object, like getLen() to get file size are described here:

Class FileStatus

Tagar
  • 13,911
  • 6
  • 95
  • 110
21
import  org.apache.hadoop.fs.{FileSystem,Path}

FileSystem.get( sc.hadoopConfiguration ).listStatus( new Path("hdfs:///tmp")).foreach( x => println(x.getPath ))

This worked for me.

Spark version 1.5.0-cdh5.5.2

ozw1z5rd
  • 3,034
  • 3
  • 32
  • 49
  • This worked fine for me, for a single folder. Is there some way to get this to run at the level of the parent folder, and get all files in all subfolders? That would be VERY helpful/useful for me. – ASH Jun 28 '19 at 15:03
2

this did the job for me

FileSystem.get(new URI("hdfs://HAservice:9000"), sc.hadoopConfiguration).listStatus( new Path("/tmp/")).foreach( x => println(x.getPath ))
Vincent Claes
  • 3,960
  • 3
  • 44
  • 62
2

@Tagar didn't say how to connect remote hdfs, but this answer did:

URI           = sc._gateway.jvm.java.net.URI
Path          = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem    = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
Configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration


fs = FileSystem.get(URI("hdfs://somehost:8020"), Configuration())

status = fs.listStatus(Path('/some_dir/yet_another_one_dir/'))

for fileStatus in status:
    print(fileStatus.getPath())
Mithril
  • 12,947
  • 18
  • 102
  • 153
1

Scala FileSystem (Apache Hadoop Main 3.2.1 API)

    import org.apache.hadoop.fs.{FileSystem, Path}
    import scala.collection.mutable.ListBuffer

    
    val fileSystem : FileSystem = {
        val conf = new Configuration()
        conf.set( "fs.defaultFS", "hdfs://to_file_path" )
        FileSystem.get( conf )
    }
      
    val files = fileSystem.listFiles( new Path( path ), false )
    val filenames = ListBuffer[ String ]( )
    while ( files.hasNext ) filenames += files.next().getPath().toString()
    filenames.foreach(println(_))
Bryce
  • 3
  • 3
oetzi
  • 1,002
  • 10
  • 21
1

I had some issues with other answers(like 'JavaObject' object is not iterable), but this code works for me

fs = self.spark_contex._jvm.org.apache.hadoop.fs.FileSystem.get(spark_contex._jsc.hadoopConfiguration())
i = fs.listFiles(spark_contex._jvm.org.apache.hadoop.fs.Path(path), False)
while i.hasNext():
  f = i.next()
  print(f.getPath())
Hodza
  • 3,118
  • 26
  • 20
0

You can try with globStatus status as well

val listStatus = org.apache.hadoop.fs.FileSystem.get(new URI(url), sc.hadoopConfiguration).globStatus(new org.apache.hadoop.fs.Path(url))

      for (urlStatus <- listStatus) {
        println("urlStatus get Path:"+urlStatus.getPath())
}
Jaap
  • 81,064
  • 34
  • 182
  • 193
Nitin
  • 3,533
  • 2
  • 26
  • 36
0

You can use below code to iterate recursivly through a parent HDFS directory, storing only sub-directories up to a third level. This is useful, if you need to list all directories that are created due to the partitioning of the data (in below code three columns were used for partitioning):

val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)

def rememberDirectories(fs: FileSystem, path: List[Path]): List[Path] = {
  val buff = new ListBuffer[LocatedFileStatus]()

  path.foreach(p => {
    val iter = fs.listLocatedStatus(p)
    while (iter.hasNext()) buff += iter.next()
  })

  buff.toList.filter(p => p.isDirectory).map(_.getPath)
}

@tailrec
def getRelevantDirs(fs: FileSystem, p: List[Path], counter: Int = 1): List[Path] = {
  val levelList = rememberDirectories(fs, p)
  if(counter == 3) levelList
  else getRelevantDirs(fs, levelList, counter + 1)
}
Michael Heil
  • 16,250
  • 3
  • 42
  • 77
0

All answers are good. Here is bonus including their sizes and file counts FileUtils.byteCountToDisplaySize will display their sizes in MB or KB i.e. human readable form

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.SparkSession

object ListFilesRecursive {
  val spark = SparkSession.builder()
    .appName("ListFilesRecursive")
    .master("local[*]")
    .getOrCreate()
  def listFilesRecursive(path: Path): (Seq[Path], Long, Int) = {
    val fs = FileSystem.get(path.toUri, spark.sparkContext.hadoopConfiguration)
    val statuses = fs.listStatus(path)
    var subDirCount = 0
    var size = 0L
    var numFiles = 0
    for (status <- statuses) {
      if (status.isDirectory) {
        val (subFiles, subSize, subCount) = listFilesRecursive(status.getPath)
        numFiles += subCount
        size += subSize
        subDirCount += 1
        println(s"status.getPath ${status.getPath}\t subSize ${FileUtils.byteCountToDisplaySize(subSize)}\tsubCount $subCount")
      } else {
        val fileSize = status.getLen
        numFiles += 1
        size += fileSize
        println(s"${status.getPath}\t${FileUtils.byteCountToDisplaySize(fileSize)}\t1")
      }
    }
    (statuses.map(_.getPath).toSeq, size, numFiles + subDirCount)
  }

  def main(args: Array[String]): Unit = {


    val rootPath = new Path(args(0))
    val (allFiles, totalSize, totalNumFiles) = listFilesRecursive(rootPath)
    println(s"\nTotal size: ${FileUtils.byteCountToDisplaySize(totalSize)}")
    println(s"Total number of files: $totalNumFiles")

    spark.stop()
  }
}

Result: (argument I passed C:\Users\xyz\Downloads\scala-hive-project\user)

file:/C:/Users/xyz/Downloads/scala-hive-project/user/hive/warehouse/sample_db.db/sample_partitioned_table/age=30/part-00000-0d9bd9c2-4dc4-4638-90ec-80fefd621dc7-c000   17 bytes    1
file:/C:/Users/xyz/Downloads/scala-hive-project/user/hive/warehouse/sample_db.db/sample_partitioned_table/age=30/part-00001-0d9bd9c2-4dc4-4638-90ec-80fefd621dc7-c000   14 bytes    1
status.getPath file:/C:/Users/xyz/Downloads/scala-hive-project/user/hive/warehouse/sample_db.db/sample_partitioned_table/age=30  subSize 31 bytes   subCount 2
status.getPath file:/C:/Users/xyz/Downloads/scala-hive-project/user/hive/warehouse/sample_db.db/sample_partitioned_table     subSize 31 bytes   subCount 3
file:/C:/Users/xyz/Downloads/scala-hive-project/user/hive/warehouse/sample_db.db/sample_table/part-00000-ea67892e-f8ea-4b32-bf10-592652f12735-c000  16 bytes    1
file:/C:/Users/ramgh/Downloads/scala-hive-project/user/hive/warehouse/sample_db.db/sample_table/part-00001-ea67892e-f8ea-4b32-bf10-592652f12735-c000    13 bytes    1
status.getPath file:/C:/Users/xyz/Downloads/scala-hive-project/user/hive/warehouse/sample_db.db/sample_table     subSize 29 bytes   subCount 2
status.getPath file:/C:/Users/xyz/Downloads/scala-hive-project/user/hive/warehouse/sample_db.db  subSize 60 bytes   subCount 7
status.getPath file:/C:/Users/xyz/Downloads/scala-hive-project/user/hive/warehouse   subSize 60 bytes   subCount 8
status.getPath file:/C:/Users/xyz/Downloads/scala-hive-project/user/hive     subSize 60 bytes   subCount 9

Total size: 60 bytes
Total number of files: 10

Note : I used windows with hadoop api thats the reason printing window directories in the result. Sameway with out changing api it will work for hadoop since I used hadoop api for recursively printing

Ram Ghadiyaram
  • 28,239
  • 13
  • 95
  • 121