3

I am facing some issue while running a spark java program that reads a file, do some manipulation and then generates output file at a given path. Every thing works fine when master and slaves are on same machine .ie: in Standalone-cluster mode. But problem started when I deployed same program in multi machine multi node cluster set up. That means the master is running at x.x.x.102 and slave is running on x.x.x.104. Both the master -slave have shared their SSH keys and are reachable from each other.

Initially slave was not able to read input file , for that I came to know I need to call sc.addFile() before sc.textFile(). that solved issue. But now I see output is being generated on slave machine in a _temporary folder under the output path. ie: /tmp/emi/_temporary/0/task-xxxx/part-00000 In local cluster mode it works fine and generates output file in /tmp/emi/part-00000.

I came to know that i need to use SparkFiles.get(). but i am not able to understand how and where to use this method.

till now I am using

DataFrame dataobj = ...
 dataObj.javaRDD().coalesce(1).saveAsTextFile("file:/tmp/emi");

Can any one please let me know how to call SparkFiles.get()?

In short how can I tell slave to create output file in the machine where driver is running?

Please help.

Thanks a lot in advance.

zero323
  • 322,348
  • 103
  • 959
  • 935
summary
  • 125
  • 1
  • 14
  • In short , i want to know , in multi machine multi node cluster environment how to tell slaves to save the output file in driver machine ? – summary Jul 06 '16 at 12:46

1 Answers1

1

There is nothing unexpected here. Each worker writes its own part of the data separately. Using file scheme only means that data is writer to a file in the file system local from the worker perspective.

Regarding SparkFiles it is not applicable in this particular case. SparkFiles can be used to distribute common files to the worker machines not to deal with the results.

If for some reason you want to perform writes on the machine used to run driver code you'll have to fetch data to the driver machine first (either collect which requires enough memory to fit all data or toLocalIterator which collects partition at the time and requires multiple jobs) and use standard tools to write results to local file system. In general though writing to driver is not a good practice and most of the time is simply useless.

zero323
  • 322,348
  • 103
  • 959
  • 935
  • Thanks zero . If i need to use standard methods to collect() and then get it in array and then save to file as simple java program , then won't i loose the motive to use spark ? I mean to say that i would defeat the advantage of using in memory calculation for which apache spark is known for . Is there any better way to do it ? Because the Output file would be huge in future. – summary Jul 06 '16 at 17:57
  • Because writing on driver it is a really bad idea and generally useless in practice. Same as `.coalese(1)`. You can use `toLocalIterator` instead of literal `collect` but it is much more expensive. – zero323 Jul 06 '16 at 18:00
  • Hi Zero , Now i am not using saveAsTextFile . But now it it giving outofMemory error GC overhead at collect (). List mrow = errors.javaRDD().coalesce(1).collect(); JavaRDD data = sc.parallelize(mrow,100); try{ print for each data row } I am not getting any proper document for parallize , except that it partitions into slices . Does that mean data is chunk of 100 recs of N rows each . Any pointers ? Thanks much . – summary Jul 07 '16 at 08:58
  • Below code gives GC error >> even after coalesce(50 ) List mrow = errors.javaRDD().coalesce(50).collect(); JavaRDD data = sc.parallelize(mrow,100); ----------------- WARN TaskSetManager: Lost task 0.1in stage 6.0 (TID 12, ): java.lang.OutOfMemoryError: GC overhead limit exceeded at java.util.Arrays.copyOfRange(Arrays.java:2694) – summary Jul 07 '16 at 11:38
  • 1
    Not `collect` - `toLocalIterator` And don't coalesce at all. If you still get an error __increase__ number of partitions. – zero323 Jul 07 '16 at 11:58
  • Thanks Zero , It worked finally . As per your suggestion i repartitioned => called toLocalIterator() to save it in a simple File writer . :-) Thanks U 0 !! – summary Jul 08 '16 at 11:04
  • Sure, could you accept the answer so it can be closed? – zero323 Jul 08 '16 at 11:06