0

First time Spark user. I created RDDs for two csv files (employees & dept). I would like to provide an output that counts the number of employees by department ID as well as identify the top two department names with the most employee IDs. "deptno" is my primary key, but I don't know how to join the two files together.

The employee file contains the following columns: [empno, ename, job, mgr, hiredate, sal, comm, deptno]

The dept file contains the following columns: [deptno, dname, location]

Here is what I've done so far:

`employees_rdd = sc.textFile("/FileStore/tables/Employee.csv")
employees_rdd.take(3)
header_e = employees_rdd.first()
employees1 = employees_rdd.filter(lambda row : row != header_e)
employees1.take(1)`

`dept_rdd = sc.textFile("/FileStore/tables/Dept.csv")
dept_rdd.take(3)
header_d = dept_rdd.first()
dept1 = dept_rdd.filter(lambda row : row != header_d)
dept1.take(1)`

`employees2 = employees1.map(lambda row : row.split(","))
employees_kv = employees2.map(lambda row : (row[7],1))
employees_kv.take(3)`

Receiving a syntax error on below:

employees_kv.reduceByKey(lambda x,y : x+y).takeOrdered(2,lambda (x,y): -1*y)

Any assistance is greatly appreciated.

Community
  • 1
  • 1
Z-Millionaire
  • 17
  • 1
  • 5
  • if you use dataframe then it will be much easier. See https://stackoverflow.com/questions/28782940/load-csv-file-with-spark – maogautam Oct 04 '19 at 19:54

2 Answers2

0

I would strongly recommend using Dataframes/DataSets. Not just because they are easier to use and manipulate but also they provide serious performance improvements. Even the spark document recommends the same.

Here goes your dataframe code :-

val spark = SparkSession.builder.master("local[*]").getOrCreate

This creates a SparkSession which is an entry point to the application.

Now, lets read your employee and department files.

val employeeDF = spark.read.format("csv").option("header","true").load("/path/to/employee/file")
val deptDF = spark.read.format("csv").option("header","true").load("/path/to/dept/file")

Now, joining is pretty easy. Below statement will create a dataframe which will be the result of inner join between employeeDF and deptDF on column deptno

val joinedDF = employeeDF.join(deptDF,Seq("deptno"))

Now, you can use the joinedDF to get your results.

val countByDept = joinedDF.groupBy($"deptno").count
//countByDept.show to display the results

val top2Dept = joinedDF.groupBy($"dname").count.orderBy($"count".desc).limit(2)
//top2Dept.show to display the results

Hope this gets you started on your journey with Spark DataFrames and DataSets.

Vihit Shah
  • 314
  • 1
  • 5
  • Thank you for the assistance. I'm using Python language so I've converted what you've recommended. I am running into an issue with the join. I've converted `val joinedDF = employeeDF.join(deptDF,Seq("deptno"))` to `join_df = dept_df.join(employee_df.select("deptno"), "deptno")` I am getting the following message: AnalysisException: "cannot resolve '`deptno`' given input columns: [empno,ename,job,mgr,hiredate,sal,comm,deptno];;\n'Project ['deptno]\n+- Relation[empno,ename,job,mgr,hiredate,sal,comm,deptno#120] csv\n". Is this due to the schema type being a string vs int? – Z-Millionaire Oct 05 '19 at 21:52
  • I also tried this slightly different approach: `joined_df = dept_df.join(employee_df, dept_df.deptno == employee_df.deptno)` but received the following message: AttributeError: 'DataFrame' object has no attribute 'deptno' – Z-Millionaire Oct 05 '19 at 21:59
0

Here is my pyspark code to do this. I have assumed the read statements with delimiter "|".

from pyspark.sql.functions import *
from pyspark.sql.types import *

emp = spark.read.option("header","true") \
                .option("inferSchema","true") \
                .option("sep","|") \
                .csv("/FileStore/tables/employee.txt")

dept = spark.read.option("header","true") \
                 .option("inferSchema","true") \
                 .option("sep","|") \
                 .option("removeQuotes","true") \
                 .csv("/FileStore/tables/department.txt")

# Employee count by department
empCountByDept = emp.groupBy("deptno") \
                       .agg(count("empno").alias("no_of_employees"))

empCountByDept.show(20,False)

# Top two department names with the most employees 
topTwoDept = empCountByDept.join(dept, empCountByDept.deptno == dept.deptno, "inner") \
                           .orderBy(empCountByDept.no_of_employees.desc()).drop(dept.deptno) \
                           .select("dname","no_of_employees") \
                           .limit(2)
topTwoDept.show(20,False)

Result ::

+------+---------------+
|deptno|no_of_employees|
+------+---------------+
|20    |5              |
|10    |3              |
|30    |6              |
+------+---------------+
+----------+---------------+
|dname     |no_of_employees|
+----------+---------------+
|'Sales'   |6              |
|'Research'|5              |
+----------+---------------+
Manish
  • 1,144
  • 8
  • 12