11

Inside the given directory I have many different folders and inside each folder I have Hadoop files (part_001, etc.).

directory
   -> folder1
      -> part_001...
      -> part_002...
   -> folder2
      -> part_001...
   ...

Given the directory, how can I recursively read the content of all folders inside this directory and load this content into a single RDD in Spark using Scala?

I found the following, but it does not recursively enter the sub-folders (I am importing org.apache.hadoop.mapreduce.lib.input):

  import java.io.IOException
  import org.apache.hadoop.fs.Path
  import org.apache.hadoop.io.{LongWritable, Text}
  import org.apache.hadoop.mapreduce.Job
  import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}

  var job: Job = null
  try {
    job = Job.getInstance()
    FileInputFormat.setInputPaths(job, new Path("s3n://" + bucketNameData + "/" + directoryS3))
    FileInputFormat.setInputDirRecursive(job, true)
  } catch {
    case ioe: IOException => ioe.printStackTrace(); System.exit(1)
  }
  val sourceData = sc.newAPIHadoopRDD(job.getConfiguration(), classOf[TextInputFormat], classOf[LongWritable], classOf[Text]).values

I also found this web page that uses SequenceFile, but again I don't understand how to apply it to my case.

user7379562
  • have you tried with a simple wildcard? If the directory structure is consistent, it should work like a charm – Chobeat Feb 03 '17 at 14:35
  • see http://stackoverflow.com/a/27843858/647053 – Ram Ghadiyaram Feb 03 '17 at 15:00
  • @Chobeat: Do you mean that the answer of dbustosp (`var rdd = sc.textFile("path/*/*")`) will directly do what I explained without the need to write all the code that I posted? – user7379562 Feb 03 '17 at 17:53
  • Remember that if you are writing a Java program for doing that, you will need to instantiate the SparkContext (sc) yourself. In the spark-shell it is instantiated automatically. A good practice is to use spark-shell to test the code and make sure it does what you expect. – dbustosp Feb 03 '17 at 19:08

3 Answers

19

If you are using Spark, you can do this using wildcards as follows:

scala> sc.textFile("path/*/*")

sc is the SparkContext, which is initialized by default if you are using spark-shell; if you are writing your own program you will have to instantiate a SparkContext yourself.
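For reference, a minimal sketch of instantiating the context in a standalone application; the app name and master below are placeholders:

    import org.apache.spark.{SparkConf, SparkContext}

    // Placeholder app name and master; adjust for your cluster
    val conf = new SparkConf().setAppName("read-part-files").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Each wildcard expands one directory level, so this matches directory/folderX/part_*
    val rdd = sc.textFile("path/*/*")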

Be careful with the following flag:

scala> sc.hadoopConfiguration.get("mapreduce.input.fileinputformat.input.dir.recursive") 
> res6: String = null

You should set this flag to true:

sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.input.dir.recursive","true")
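
With the flag enabled, a sketch along these lines should pick up nested part files from a single top-level path, without wildcards (the path is a placeholder):

    // Recursion into sub-directories is driven by the Hadoop flag above
    sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.input.dir.recursive", "true")
    val allParts = sc.textFile("s3n://bucket/directory")  // placeholder path; part files in sub-folders are included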
dbustosp
  • So, do you mean that I can simply do this? `val myRDD = sc.textFile("path/*/*")`? No need to use `setInputDirRecursive`? Will I get RDD of String? (I need RDD of String) – user7379562 Feb 03 '17 at 17:42
  • Yes, exactly right. This will be loaded as String by default, and if you use wildcards you will not need that flag. – dbustosp Feb 03 '17 at 19:04
  • Ok, one more thing that I misunderstand: if the data in Hadoop files has JSON format, I will get RDD of JSON strings after doing `sc.textFile(...)`, right? Then, to convert it into DataFrame will this approach work?: `val rddFromHadoop = sc.textFile("path/*/*") import sqlContext.implicits._ var df = rddFromHadoop.toDF()`. Or should I parse `rddFromHadoop` to`RDD[Map[String,String]]` before applying `toDF()`? Sorry, for this additional question. It's just important for me to understand that my overall approach will work if I use `textFile` and read hadoopish files into RDD. – user7379562 Feb 03 '17 at 20:36
  • That's a different use case. You should use SQLContext instead; it has support for reading JSON files. Take a look at: https://databricks.com/blog/2015/02/02/an-introduction-to-json-support-in-spark-sql.html – dbustosp Feb 03 '17 at 20:50
  • Ah, wait, do you mean this?: `var df = sqlContext.read.json("path/*/*")`, assuming that I have hadoopish data inside sub-folders, but these hadoopish data was saved from RDD of JSON strings (`rdd.saveAsTextFile(filePath)`). If so, that's actually great. – user7379562 Feb 03 '17 at 20:58
  • Yup. Also see: https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/SQLContext.html#jsonFile%28java.lang.String%29 – dbustosp Feb 03 '17 at 21:03
  • `sc._jsc.hadoopConfiguration().set("mapreduce.input.fileinputformat.input.dir.recursive", "true")` works in pyspark! – Colin Wang Oct 22 '19 at 14:03
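
Following up on the JSON discussion in the comments above, a minimal sketch of reading the nested output as a DataFrame; the SQLContext construction is an assumption matching the Spark 1.x API used in this thread:

    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)

    // Reads JSON-formatted part files under every sub-folder into a DataFrame
    val df = sqlContext.read.json("path/*/*")
    df.printSchema()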
5

I have found that the parameters must be set in this way:

.set("spark.hive.mapred.supports.subdirectories","true")
.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive","true")
Paul
1
connector_output=${basepath}/output/connector/*/*/*/*/* 

works for me when I have a directory structure like:

${basepath}/output/connector/2019/01/23/23/output*.dat

I didn't have to set any other properties; I just used the following:

sparkSession.read().format("csv").schema(schema)
                    .option("delimiter", "|")
                    .load("/user/user1/output/connector/*/*/*/*/*");
bhushan