
Problem Statement: I am trying to write Spark code in Scala which will load the two files mentioned below (1. a file with id and name, 2. a file with id and salary) from HDFS, join them, and produce (name, salary) pairs. The data should then be saved in multiple files, grouped by salary (meaning each file will contain the names of the employees with the same salary, and the file name has to include the salary as well).

EmployeeName.csv 
E01,Lokesh 
E02,Bhupesh 
E03,Amit 
E04,Ratan 
E05,Dinesh 
E06,Pavan 
E07,Tejas 
E08,Sheela 
E09,Kumar
E10,Venkat 

EmployeeSalary.csv 
E01,50000 
E02,50000 
E03,45000 
E04,45000 
E05,50000 
E06,45000 
E07,50000 
E08,10000 
E09,10000 
E10,10000 

I tried the below, but it is not running. It looks like calling RDD functions from within another RDD does not work. How else can I work this out?

val employeename = sc.textFile("/user/cloudera/EmployeeName").map(x => (x.split(",")(0),x.split(",")(1)))

val employeesalary = sc.textFile("/user/cloudera/EmployeeSalary").map(s => (s.split(",")(0),s.split(",")(1)))

val join = employeename.join(employeesalary).map({case(id,(name,salary)) => (salary,name)})

val group = join.groupByKey().map({case(key, groupvalues) => {
(key,groupvalues.toList)
}}).sortByKey()

val rdd1 = group.map{case (k,v) => k->sc.parallelize(v)}

rdd1.foreach{case (k,rdd) => rdd.saveAsTextFile("user/cloudera/"+k)}
  • Correct, you cannot have an RDD inside another RDD. What about using DataFrames and `partitionBy` to get different files? – Shaido Aug 15 '18 at 05:05

2 Answers


It is relatively easy to get each output file (partition) to contain the information of just one employee; however, Spark does not really let you control output file names (as explained here: Change output filename prefix for DataFrame.write()).

Note that you can set the grouping values as partitions in the path (i.e. things like .../employee=Lokesh/salary=50000/part...txt), but then the data will not be part of the file itself.
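For concreteness, here is a minimal sketch of that partitioned-write approach, partitioning by salary as the question asks. It assumes Spark 2.x with a SparkSession named spark; the output path and column names are illustrative, not part of the original question.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("EmployeesBySalary").getOrCreate()

// Read both CSVs and give the columns names
val names = spark.read.csv("/user/cloudera/EmployeeName").toDF("id", "name")
val salaries = spark.read.csv("/user/cloudera/EmployeeSalary").toDF("id", "salary")

// Join on id and let Spark create one output directory per salary value
names.join(salaries, "id")
  .select("name", "salary")
  .write
  .partitionBy("salary")
  .csv("/user/cloudera/employees_by_salary")

This produces directories like .../salary=50000/part-... whose files contain only the names; as noted above, the salary appears in the directory name rather than inside the files.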

Arnon Rotem-Gal-Oz

I have tried the code snippet below. Instead of RDDs, using DataFrames or Datasets would be a better way to perform these operations.

The snippet below saves the result to a text file. By default it will create a folder containing the partitioned files; the results can be viewed in the "part-00000" file.

Below is the code snippet:

val rddInput1 = sc.textFile("Path To Input CSV1").map { x => (x.split(",")(0) -> x.split(",")(1)) }

val rddInput2 = sc.textFile("Path to Input CSV2").map { x => (x.split(",")(0) -> x.split(",")(1)) }

// Join based on EMP_ID, then keep (salary, name) pairs
val joinData = rddInput1.join(rddInput2).map { case (_, (name, salary)) => (salary, name) }

// Reduce by key to concatenate the names sharing a salary,
// then collect and write one small RDD per salary group
joinData.reduceByKey((n1, n2) => n1 + "," + n2)
  .sortByKey()
  .collect()
  .foreach { case (salary, names) =>
    sc.parallelize(Array(salary + "__" + names))
      .repartition(1)
      .saveAsTextFile("<Output Dir>" + salary)
  }

I hope this helps you with what you are trying to do.
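A caveat with the loop above: collect() pulls every group to the driver and launches a separate Spark job per salary, which only scales to a small number of groups. One widely used alternative that writes all groups in a single job is subclassing Hadoop's MultipleTextOutputFormat. The sketch below uses the old mapred API; the class name SalaryTextOutputFormat is invented here for illustration and is not part of the original answer.

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

// Illustrative subclass: one output directory per salary key,
// with the key suppressed so the files contain only the names
class SalaryTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateActualKey(key: Any, value: Any): Any = NullWritable.get()
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.asInstanceOf[String] + "/" + name
}

// joinData is the (salary, name) pair RDD from the snippet above
joinData.saveAsHadoopFile("<Output Dir>", classOf[String], classOf[String], classOf[SalaryTextOutputFormat])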

vishal gawde