20

I am new to spark, and I want to use group-by & reduce to find the following from CSV (one line by employed):

  Department, Designation, costToCompany, State
  Sales, Trainee, 12000, UP
  Sales, Lead, 32000, AP
  Sales, Lead, 32000, LA
  Sales, Lead, 32000, TN
  Sales, Lead, 32000, AP
  Sales, Lead, 32000, TN 
  Sales, Lead, 32000, LA
  Sales, Lead, 32000, LA
  Marketing, Associate, 18000, TN
  Marketing, Associate, 18000, TN
  HR, Manager, 58000, TN

I would like to simplify the about CSV with group by Department, Designation, State with additional columns with sum(costToCompany) and TotalEmployeeCount

Should get a result like:

  Dept, Desg, state, empCount, totalCost
  Sales,Lead,AP,2,64000
  Sales,Lead,LA,3,96000  
  Sales,Lead,TN,2,64000

Is there any way to achieve this using transformations and actions. Or should we go for RDD operations?

mrsrinivas
  • 34,112
  • 13
  • 125
  • 125
mithra
  • 1,141
  • 2
  • 15
  • 27
  • 1
    could you please organize the CSV blocks (input and result) in order to separate clearly between the headers and each single line? It's not clear right now where a line starts or ends. – emecas Aug 18 '14 at 12:57
  • 1
    Check this for [way to do it with Spark 2.x +](https://stackoverflow.com/a/44889688/1592191) – mrsrinivas Jul 03 '17 at 15:55

4 Answers4

40

Procedure

  • Create a Class (Schema) to encapsulate your structure (it’s not required for the approach B, but it would make your code easier to read if you are using Java)

    public class Record implements Serializable {
      String department;
      String designation;
      long costToCompany;
      String state;
      // constructor , getters and setters  
    }
    
  • Loading CVS (JSON) file

    JavaSparkContext sc;
    JavaRDD<String> data = sc.textFile("path/input.csv");
    //JavaSQLContext sqlContext = new JavaSQLContext(sc); // For previous versions 
    SQLContext sqlContext = new SQLContext(sc); // In Spark 1.3 the Java API and Scala API have been unified
    
    
    JavaRDD<Record> rdd_records = sc.textFile(data).map(
      new Function<String, Record>() {
          public Record call(String line) throws Exception {
             // Here you can use JSON
             // Gson gson = new Gson();
             // gson.fromJson(line, Record.class);
             String[] fields = line.split(",");
             Record sd = new Record(fields[0], fields[1], fields[2].trim(), fields[3]);
             return sd;
          }
    });
    

At this point you have 2 approaches:

A. SparkSQL

  • Register a table (using the your defined Schema Class)

    JavaSchemaRDD table = sqlContext.applySchema(rdd_records, Record.class);
    table.registerAsTable("record_table");
    table.printSchema();
    
  • Query the table with your desired Query-group-by

    JavaSchemaRDD res = sqlContext.sql("
      select department,designation,state,sum(costToCompany),count(*) 
      from record_table 
      group by department,designation,state
    ");
    
  • Here you would also be able to do any other query you desire, using a SQL approach

B. Spark

  • Mapping using a composite key: Department,Designation,State

    JavaPairRDD<String, Tuple2<Long, Integer>> records_JPRDD = 
    rdd_records.mapToPair(new
      PairFunction<Record, String, Tuple2<Long, Integer>>(){
        public Tuple2<String, Tuple2<Long, Integer>> call(Record record){
          Tuple2<String, Tuple2<Long, Integer>> t2 = 
          new Tuple2<String, Tuple2<Long,Integer>>(
            record.Department + record.Designation + record.State,
            new Tuple2<Long, Integer>(record.costToCompany,1)
          );
          return t2;
    }
    

    });

  • reduceByKey using the composite key, summing costToCompany column, and accumulating the number of records by key

    JavaPairRDD<String, Tuple2<Long, Integer>> final_rdd_records = 
     records_JPRDD.reduceByKey(new Function2<Tuple2<Long, Integer>, Tuple2<Long,
     Integer>, Tuple2<Long, Integer>>() {
        public Tuple2<Long, Integer> call(Tuple2<Long, Integer> v1,
        Tuple2<Long, Integer> v2) throws Exception {
            return new Tuple2<Long, Integer>(v1._1 + v2._1, v1._2+ v2._2);
        }
    });
    
emecas
  • 1,586
  • 3
  • 27
  • 43
  • Take into account that B approach is also using Record class and the loading step. I'm not sure about what symbol the error is referring, can you include the full error trace?, it can also be due to your data input file, have you changed something in the input? – emecas Aug 21 '14 at 11:04
  • Hi @emecas thanks for the great answer. I am using your code and for me some reason table is empty with no schema. JavaSchemaRDD table = sqlContext.applySchema(rdd_records, Record.class); when I save table.saveAsTextFile() it prints [] in all the lines of part file. – Umesh K May 06 '15 at 17:12
  • Don't forget to fill the sections: (constructor), getters and setters on your schema class (Record) .. see @user449355 Q/A http://stackoverflow.com/a/30103554/833336 – emecas May 11 '15 at 17:08
25

CSV file can be parsed with Spark built-in CSV reader. It will return DataFrame/DataSet on the successful read of the file. On top of DataFrame/DataSet, you apply SQL-like operations easily.

Using Spark 2.x(and above) with Java

Create SparkSession object aka spark

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
    .builder()
    .appName("Java Spark SQL Example")
    .getOrCreate();

Create Schema for Row with StructType

import org.apache.spark.sql.types.StructType;

StructType schema = new StructType()
    .add("department", "string")
    .add("designation", "string")
    .add("ctc", "long")
    .add("state", "string");

Create dataframe from CSV file and apply schema to it

Dataset<Row> df = spark.read()
    .option("mode", "DROPMALFORMED")
    .schema(schema)
    .csv("hdfs://path/input.csv");

more option on reading data from CSV file

Now we can aggregation on data in 2 ways

1. SQL way

Register a table in spark sql metastore to perform SQL operation

df.createOrReplaceTempView("employee");

Run SQL query on registered dataframe

Dataset<Row> sqlResult = spark.sql(
    "SELECT department, designation, state, SUM(ctc), COUNT(department)" 
        + " FROM employee GROUP BY department, designation, state");

sqlResult.show(); //for testing

We can even execute SQL directly on CSV file with out creating table with Spark SQL


2. Object chaining or Programming or Java-like way

Do the necessary import for sql functions

import static org.apache.spark.sql.functions.count;
import static org.apache.spark.sql.functions.sum;

Use groupBy and agg on dataframe/dataset to perform count and sum on data

Dataset<Row> dfResult = df.groupBy("department", "designation", "state")
    .agg(sum("ctc"), count("department"));
// After Spark 1.6 columns mentioned in group by will be added to result by default

dfResult.show();//for testing

dependent libraries

"org.apache.spark" % "spark-core_2.11" % "2.0.0" 
"org.apache.spark" % "spark-sql_2.11" % "2.0.0"
mrsrinivas
  • 34,112
  • 13
  • 125
  • 125
4

The following might not be entirely correct, but it should give you some idea of how to juggle data. It's not pretty, should be replaced with case classes etc, but as a quick example of how to use the spark api, I hope it's enough :)

val rawlines = sc.textfile("hdfs://.../*.csv")
case class Employee(dep: String, des: String, cost: Double, state: String)
val employees = rawlines
  .map(_.split(",") /*or use a proper CSV parser*/
  .map( Employee(row(0), row(1), row(2), row(3) )

# the 1 is the amount of employees (which is obviously 1 per line)
val keyVals = employees.map( em => (em.dep, em.des, em.state), (1 , em.cost))

val results = keyVals.reduceByKey{ a,b =>
    (a._1 + b._1, b._1, b._2) # (a.count + b.count , a.cost + b.cost )
}

#debug output
results.take(100).foreach(println)

results
  .map( keyval => someThingToFormatAsCsvStringOrWhatever )
  .saveAsTextFile("hdfs://.../results")

Or you can use SparkSQL:

val sqlContext = new SQLContext(sparkContext)

# case classes can easily be registered as tables
employees.registerAsTable("employees")

val results = sqlContext.sql("""select dep, des, state, sum(cost), count(*) 
  from employees 
  group by dep,des,state"""
jkgeyti
  • 2,344
  • 18
  • 31
  • thanks for you swift response, i want a group by result, like for ex in mysql select Dept,designation,state,sum(costToCompany) from employeeTable group by Dept,Designation,state; not just for one dept like sales – mithra Aug 19 '14 at 05:12
  • Then simply skip the filter step. I've updated the code accordingly. The goal is to convert lines into key-value elements, where the key contains the identifier you want to group by, and the value contains the values you want to reduce. In this case, we group things by department,designation and state, and we want to sum up the count of employees, together with the cost, so those are the values. – jkgeyti Aug 19 '14 at 07:30
  • Thankyou Thanks a lot, i shall try it. You saved my day! – mithra Aug 19 '14 at 09:14
4

For JSON, if your text file contains one JSON object per line, you can use sqlContext.jsonFile(path) to let Spark SQL load it as a SchemaRDD (the schema will be automatically inferred). Then, you can register it as a table and query it with SQL. You can also manually load the text file as an RDD[String] containing one JSON object per record and use sqlContext.jsonRDD(rdd) to turn it as a SchemaRDD. jsonRDD is useful when you need to pre-process your data.

robx
  • 2,221
  • 1
  • 14
  • 31
yhuai
  • 410
  • 2
  • 6