
I'm not sure of the right way to save data to HDFS after processing it with Spark.

This is what I'm trying to do: I'm calculating the min, max, and standard deviation (SD) of numeric fields. My input files have millions of rows, but the output will cover only around 15-20 fields, with a single (scalar) value per statistic for each field.

For example, I load all the rows of FIELD1 into an RDD and end up with three single values for FIELD1 (MIN, MAX, SD). I concatenate these three values into a temporary string. In the end, I will have 15 to 20 rows of 4 columns each, in the following format:

FIELD_NAME_1  MIN  MAX  SD
FIELD_NAME_2  MIN  MAX  SD

This is a snippet of the code:

// create the RDD
val data = sc.textFile("hdfs://x.x.x.x/" + args(1)).cache()
// take the column at index 1 of each comma-separated row
val values = data.map(_.split(",", -1)(1))

// treat empty fields as 0
val data_double = values.map(x => if (x == "") 0.0 else x.toDouble)
val min_value = data_double.map((_, 1)).reduceByKey(_ + _).sortByKey(true).take(1)(0)._1
val max_value = data_double.map((_, 1)).reduceByKey(_ + _).sortByKey(false).take(1)(0)._1
val SD = data_double.stdev

So I have three variables, min_value, max_value, and SD, that I want to store back to HDFS.

Question 1: Since the output will be rather small, should I just save it locally on the server, or should I write it to HDFS? It seems to me that saving the file locally makes more sense.

Question 2: In Spark, I can call the following to save an RDD as a text file:

some_RDD.saveAsTextFile("hdfs://namenode/path")

How do I accomplish the same thing for a String variable that is not an RDD in Scala? Should I parallelize my result into an RDD first and then call saveAsTextFile?

user2773013

2 Answers

12

To save locally just do

some_RDD.collect()

Then write the resulting array to a file, for example with the approach from this question. And yes, if the data set is small and easily fits in memory, you should collect it and bring it to the driver program. If the data is a little too large to hold in memory, another option is some_RDD.coalesce(numPartitionsToStoreOn). Keep in mind that coalesce also takes a boolean shuffle argument; if you are doing calculations on the data before coalescing, set it to true to keep more parallelism in those calculations. Coalescing reduces the number of partitions, and therefore the number of part files written when you call some_RDD.saveAsTextFile("hdfs://namenode/path"). If the file is very small but you need it on HDFS, call repartition(1), which is the same as coalesce(1, true); this ensures that your data is written from a single partition.
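As a minimal sketch of the collect-then-write-locally route, assuming the result has already been collected to the driver: the helper name writeLocal, the output file name, and the sample values below are all made up for illustration, and a plain Array stands in for some_RDD.collect() so the snippet runs without a cluster.

```scala
import java.io.{File, PrintWriter}

// Hypothetical helper: write one line per element to a file on the driver's local disk.
def writeLocal(path: String, lines: Seq[String]): Unit = {
  val writer = new PrintWriter(new File(path))
  try lines.foreach(line => writer.println(line))
  finally writer.close()
}

// Stand-in for some_RDD.collect(); the values are invented for illustration.
val collected: Array[String] = Array(
  "FIELD_NAME_1\t0.0\t9.5\t2.1",
  "FIELD_NAME_2\t1.0\t7.0\t1.4"
)

// Assumed output location: a file in the JVM's temp directory.
val outFile = new File(System.getProperty("java.io.tmpdir"), "stats.tsv")
writeLocal(outFile.getPath, collected.toSeq)
```

This only makes sense when the collected data comfortably fits in the driver's memory, as is the case for a handful of summary rows.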

UPDATE: If all you want to do is save three values to HDFS, you can do this: sc.parallelize(List((min_value,max_value,SD)),1).saveAsTextFile("pathTofile")

Basically, you put the three variables in a tuple, wrap that in a List, and set the parallelism to one, since the data is very small.
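One thing to keep in mind with the tuple approach is that saveAsTextFile writes each element using its toString, so a tuple ends up in the file with parentheses and commas. A possible variation, sketched here under the assumption that a tab-separated FIELD_NAME MIN MAX SD row is wanted, formats each row as a string first. The helper names formatRow and saveStats are hypothetical.

```scala
import org.apache.spark.SparkContext

// Hypothetical helper: one tab-separated output row (FIELD_NAME, MIN, MAX, SD).
def formatRow(name: String, min: Double, max: Double, sd: Double): String =
  Seq(name, min.toString, max.toString, sd.toString).mkString("\t")

// Hypothetical helper: put all rows in a single partition so that
// saveAsTextFile writes a single part file under `path`.
def saveStats(sc: SparkContext, rows: Seq[String], path: String): Unit =
  sc.parallelize(rows, 1).saveAsTextFile(path)
```

With an existing SparkContext sc, a call might look like saveStats(sc, Seq(formatRow("FIELD_NAME_1", min_value, max_value, SD)), "hdfs://namenode/path"). Note that saveAsTextFile treats the path as a directory and fails if it already exists.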

aaronman
  • Sorry, I wasn't clear enough. My end result is not in RDD format. My final result is a single unit value (scalar). So I go through the RDD, calculate quartiles for each field, and store each scalar value in a temporary string. So I can't really use saveAsTextFile – user2773013 Jun 30 '14 at 19:42
  • @user2773013 are you sure you don't mean just one value? The Unit type in Scala means void. Even if you only have one value, you can still use collect – aaronman Jun 30 '14 at 19:43
  • @user2773013 if your end result is not an RDD, how did you get to that point? Your question is not very detailed. I updated my answer to show how you could save to HDFS from a single partition – aaronman Jun 30 '14 at 20:04
  • Thanks a ton, aaronman. Sorry about being unclear. I have updated the question. Hopefully it's a bit clearer. – user2773013 Jun 30 '14 at 21:52
  • @user2773013 check out the update section, I think it does exactly what you want – aaronman Jun 30 '14 at 23:23
5

Answer 1: Since you only need a few scalars, I'd suggest storing them in the local file system. You can first do val localValue = rdd.collect(), which collects all the data from the workers to the driver, and then use java.io to write it to disk.

Answer 2: You can do sc.parallelize(yourString).saveAsTextFile("hdfs://host/yourFile"). This will write the output to part-000* files. If you want everything in one file, hdfs dfs -getmerge can help.
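One caveat about passing a String to parallelize, offered as a sketch rather than a statement about every Spark version: parallelize takes a sequence of elements, so a bare String is at best viewed as a sequence of characters (one element per character), while wrapping it in a Seq gives a single element and therefore a single output line. The plain-Scala snippet below illustrates the difference without needing a cluster; the sample string is made up.

```scala
// Invented sample value standing in for the concatenated result string.
val yourString = "FIELD_NAME_1\t0.0\t9.5\t2.1"

// A bare String viewed as a sequence has one element per character.
val asChars: Seq[Char] = yourString

// Wrapping it in a Seq gives a one-element sequence, i.e. one output line.
val asOneLine: Seq[String] = Seq(yourString)

println(s"as characters: ${asChars.length} elements")
println(s"wrapped in Seq: ${asOneLine.length} element")

// With a SparkContext `sc` in scope, the call would then be:
//   sc.parallelize(asOneLine, 1).saveAsTextFile("hdfs://host/yourFile")
```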

Chong Tang
  • sc.parallelize(yourString) will not work, as the parallelize method expects a list, not a string. If you know any way to pass a string to it, please reply – Kalpesh May 24 '16 at 14:34