
I have some intermediate data that I need to store both in HDFS and locally. I'm using Spark 1.6. In HDFS, the intermediate data lands in /output/testDummy/part-00000 and /output/testDummy/part-00001. I want to save these partitions locally using Java/Scala, either merged into a single file as /users/home/indexes/index.nt, or kept separate as /users/home/indexes/index-0000.nt and /home/indexes/index-0001.nt.

Here is my code. Note: testDummy is the same as test, and the output has two partitions. I want to store them separately or combined, but locally, as an index.nt file. I'd prefer to store them separately on two data nodes. I'm using a cluster and submitting the Spark job on YARN. I've also added some comments showing how many times each line runs and what data I'm getting. How can I do this? Any help is appreciated.

 val testDummy = outputFlatMapTuples.coalesce(Constants.INITIAL_PARTITIONS).saveAsTextFile(outputFilePathForHDFS+"/testDummy")
 println("testDummy done")   //1 time print

def savesData(iterator: Iterator[(String)]): Iterator[(String)] = {
    println("Inside savesData")                                 //  now 4 times when coalesce(Constants.INITIAL_PARTITIONS)=2
    println("iter size"+iterator.size)                           //  2 735 2 735 values
    val filenamesWithExtension = outputPath + "/index.nt"
    println("filenamesWithExtension "+filenamesWithExtension.length)   //4 times
    var list = List[(String)]()

    val fileWritter = new FileWriter(filenamesWithExtension,true)
    val bufferWritter = new BufferedWriter(fileWritter)

     while (iterator.hasNext){                       //iterator.hasNext is false
       println("inside iterator")                    //0 times 
       val dat = iterator.next()
       println("datadata "+iterator.next())

       bufferWritter.write(dat + "\n")
       bufferWritter.flush()
       println("index files written")

       val dataElements = dat.split(" ")
       println("dataElements")                                    //0
       list = list.::(dataElements(0))
       list = list.::(dataElements(1))
       list = list.::(dataElements(2))
     }
    bufferWritter.close() //closing
    println("savesData method end")                         //4 times when coal=2
    list.iterator
}

println("before saving data into local")                              //1
val test = outputFlatMapTuples.coalesce(Constants.INITIAL_PARTITIONS).mapPartitions(savesData)
println("testRDD partitions "+test.getNumPartitions)                               //2
println("testRDD size "+test.collect().length)                                //0
println("after saving data into local")   //1

PS: I followed this and this, but they're not exactly what I'm looking for. I got it partially working, but nothing ends up in index.nt.

ChikuMiku
    Scala has made the world a little bit better by making `list.::(dataElements(2))` equivalent to `dataElements(2) :: list`, so don't make Scala sad: use that syntax, at least when the methods are operator-like. By the way, a `ListBuffer` would probably be more appropriate here – Dici Jun 27 '16 at 00:28
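
For illustration, a minimal sketch of both of the comment's suggestions (dat and dataElements mirror the question's code; the sample string is hypothetical):

    import scala.collection.mutable.ListBuffer

    val dat = "subject predicate object"
    val dataElements = dat.split(" ")

    // Operator-like cons reads more naturally than list.::(x)
    var list = List[String]()
    list = dataElements(2) :: list

    // A ListBuffer appends in constant time and avoids the var
    val buf = ListBuffer[String]()
    buf += dataElements(0)
    buf += dataElements(1)
    buf += dataElements(2)
    val result = buf.toList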

2 Answers


A couple of things:

  • Never call Iterator.size if you plan to use the data later. Iterators are TraversableOnce: the only way to compute an Iterator's size is to traverse all of its elements, and after that there is no more data left to read. This is why your while loop never executes and index.nt stays empty.
  • Don't use transformations like mapPartitions for side effects; it's bad practice, and Spark doesn't guarantee that a given piece of code will be executed exactly once. If you want to perform some kind of IO, use actions like foreach / foreachPartition instead (see the sketch after this list).
  • A local path inside an action or transformation is a local path on a particular worker. If you want to write directly on the client machine, you should fetch the data first with collect or toLocalIterator. It could be better, though, to write to distributed storage and fetch the data later.
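
For illustration, a minimal sketch of both approaches, assuming the RDD contains plain lines of text (outputFlatMapTuples, Constants.INITIAL_PARTITIONS, and the paths come from the question):

    import java.io.{BufferedWriter, FileWriter}

    val data = outputFlatMapTuples.coalesce(Constants.INITIAL_PARTITIONS)

    // Side effects belong in an action: each partition appends to a file
    // on whichever worker that partition happens to run on.
    data.foreachPartition { iter =>
      val writer = new BufferedWriter(new FileWriter("/users/home/indexes/index.nt", true))
      try iter.foreach { line => writer.write(line); writer.newLine() }
      finally writer.close()
    }

    // Or bring the data to the driver and write one local file there;
    // toLocalIterator streams one partition at a time instead of collecting everything.
    val writer = new BufferedWriter(new FileWriter("/users/home/indexes/index.nt"))
    try data.toLocalIterator.foreach { line => writer.write(line); writer.newLine() }
    finally writer.close()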
zero323

Java 7 provides a means to watch directories.

https://docs.oracle.com/javase/tutorial/essential/io/notification.html

The idea is to create a watch service and register it with the directory of interest, specifying the events you care about (file creation, deletion, etc.). You will then be notified whenever such an event occurs and can take whatever action you want. A sketch follows below.
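
For illustration, a minimal Scala sketch of this idea (the watched directory is hypothetical; adjust it to your environment):

    import java.nio.file.{FileSystems, Paths, StandardWatchEventKinds}
    import scala.collection.JavaConverters._

    val dir = Paths.get("/users/home/indexes")
    val watcher = FileSystems.getDefault.newWatchService()
    dir.register(watcher, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE)

    // take() blocks until events arrive; this loop runs forever unless you add an exit condition.
    while (true) {
      val key = watcher.take()
      key.pollEvents().asScala.foreach { event =>
        println(s"${event.kind()}: ${event.context()}")
      }
      key.reset()  // re-arm the key, or no further events will be delivered
    }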

You will have to depend on the Java HDFS API heavily wherever applicable.
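
If the goal is simply to pull the part files out of HDFS and merge them into one local file, a minimal sketch using the Hadoop FileSystem API (this assumes Hadoop 2.x, where FileUtil.copyMerge is still available; the paths are the ones from the question):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

    val conf = new Configuration()
    val hdfs = FileSystem.get(conf)          // source file system (HDFS)
    val local = FileSystem.getLocal(conf)    // destination file system (local)

    // Concatenates every file under /output/testDummy into one local file.
    FileUtil.copyMerge(
      hdfs, new Path("/output/testDummy"),
      local, new Path("/users/home/indexes/index.nt"),
      false,   // keep the source files
      conf,
      null)    // no separator inserted between parts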

Run the program in the background, since it waits for events forever. (You can add logic to quit once you've done what you need.)

On the other hand, shell scripting will also help; for example, `hdfs dfs -getmerge /output/testDummy /users/home/indexes/index.nt` merges all the part files into a single local file.

Be aware of the coherency model of the HDFS file system while reading files.

Hope this gives you some ideas.

Marco99