3

I need to read a different file in every map() ,the file is in HDFS

  val rdd=sc.parallelize(1 to 10000)
  val rdd2=rdd.map{x=>
    val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://ITS-Hadoop10:9000/"), new org.apache.hadoop.conf.Configuration())
    val path=new Path("/user/zhc/"+x+"/")
    val t=hdfs.listStatus(path)
    val in =hdfs.open(t(0).getPath)
    val reader = new BufferedReader(new InputStreamReader(in))
    var l=reader.readLine()
  }
 rdd2.count

My problem is this code

val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://ITS-Hadoop10:9000/"), new org.apache.hadoop.conf.Configuration())

takes too much running time, every time of map() needs to create a new FileSystem value. Can i put this code outside map() function so it doesn't have to create hdfs every time? Or how can i read files quickly in map()?

My code runs on multiple machines. Thank you!

haochi zhang
  • 113
  • 2
  • 3
  • 6
  • Try moving `val hdfs` out of the map closure. – tuxdna May 09 '16 at 06:39
  • if the cardinality of the files is relatively small (10K is small) I wouldn't read it using BufferedReader but rather constructing files arborescence and read and unify RDDs along with what @tuxdna suggested. – eliasah May 09 '16 at 06:53
  • @tuxdna I've tried to put it outside the map closure, but it has error "Task not serializable,Caused by: java.io.NotSerializableException: org.apache.hadoop.hdfs.DistributedFileSystem" – haochi zhang May 09 '16 at 07:02
  • @eliasah The files are small, but I don't completely understand what you said ,are you suggesting load all the files i need into a RDD like Knows Not Much suggested? – haochi zhang May 09 '16 at 07:09
  • I don't understand what KnowsNotMuch suggest to be honest. – eliasah May 09 '16 at 07:13
  • @eliasah thanks! But what is "constructing files arborescence and read and unify RDDs"? Can you explain a little more specific? – haochi zhang May 09 '16 at 07:20
  • @haochizhang , just to understand well, you need to count the lines in each file from the files located under the same directory? – user1314742 May 09 '16 at 08:36
  • @user1314742 no, i don't need to count the lines, I need to read the file in the map closure, I write the code like that just to simplify the question. And the files are in the same directory. – haochi zhang May 09 '16 at 08:55

1 Answers1

4

In your case, I recommend the use of wholeTextFiles method wich will return pairRdd with the key is the file full path, and the value is the content of the file in string.

val filesPariRDD = sc.wholeTextFiles("hdfs://ITS-Hadoop10:9000/")
val filesLineCount = filesPariRDD.map( x => (x._1, x._2.length ) ) //this will return a map of fileName , number of lines of each file. You could apply any other function on the file contents
filesLineCount.collect() 

Edit

If your files are in directories which are under the same directory ( as mentioned in comments)you could use some kind of regular expression

val filesPariRDD = sc.wholeTextFiles("hdfs://ITS-Hadoop10:9000/*/")

Hope this is clear and helpful

Sachin
  • 3,424
  • 3
  • 21
  • 42
user1314742
  • 2,865
  • 3
  • 28
  • 34
  • Thank you for your help! That's what I need! But what if the files are in different directories, and those directories are in the same directory, is there a similar method to do that? – haochi zhang May 09 '16 at 10:27
  • @haochizhang have you tired with the solution for multiple directories? – user1314742 May 09 '16 at 13:24