1

I have a CSV file with the data below:

dept|emp_json|location
finance|{ "employee":[{"name":{"firstName":"John","lasteName":"Doe"},"address":{"street":"1234 West Broad St","unit":"8505","city":"Columbus"}},{"name":{"firstName":"Alex","lasteName":"Messi"},"address":{"street":"4321 North Meecham Rd","unit":"300","city":"Salinas"}}]}|OH


I am able to read the file and create a Dataset and extract the Json column :

// Read the pipe-delimited CSV. Note: "delimiter" (the option key) and the
// column name must be quoted string literals — the original passed bare
// identifiers, which does not compile unless such variables exist elsewhere.
Dataset<Row> empDetlsDS = sparkSession.read().option("header", "true").option("delimiter", "|").schema(mySchema).csv(inputCSVPath);
// Keep only the embedded-JSON column for further parsing.
Dataset<Row> empDetlsJsonDocDS = empDetlsDS.select("emp_json");

I want to flatten the JSON and create an output Dataset with one row per element of the employee array, in the format below:

dept    |emp_name   |emp_address              |emp_city|location  |
---------------------------------------------------------------
finance |John Doe   |1234 West Broad St 8505  |Columbus|OH        |
finance |Alex Messi |4321 North Meecham Rd 300|Salinas |OH        |
-------------------------------------------------------------------


If anybody has any suggestion using Java and Spark please help. Thanks in advance.

tkkman
  • 21
  • 4
  • 1
    How many rows of data are there? If it's just one row, then after reading in the csv you can select the json column convert to rdd and feed it to spark.read.json then add a couple of columns with literals for Dept and location. If there are many then you'll probably need to use a key/value rdd – sramalingam24 Jan 30 '19 at 05:09
  • 1
    Hi @sramalingam24 thank you for responding. I have 1million records to process and i want to do it in Java and spark. – tkkman Jan 30 '19 at 15:26
  • I am not a java person but I am assuming it should be easy translation from scala, here are some examples in scala that you can take a look at https://docs.databricks.com/spark/latest/dataframes-datasets/complex-nested-data.html – sramalingam24 Jan 30 '19 at 20:35

4 Answers4

1

@tkkman Here is the scala way I was talking about. The rdd way has been deprecated and DataSet way is recommended now, so should be straightforward in Java

import spark.implicits._
import org.apache.spark.sql.functions._

// Read the pipe-delimited CSV; at this point emp_json is still a plain String column.
val df = spark.read.option("delimiter","|").option("header","true").csv("/FileStore/tables/test.txt")

// Re-parse just the JSON column: each Row is stringified and fed back through the
// JSON reader, which infers the employee array schema; explode() then produces one
// output row per employee element.
// NOTE(review): Row.toString wraps the value in "[...]"; this happens to parse as a
// JSON array for this sample — verify on real data (from_json is the safer route).
val jdf = spark.read.json(df.select("emp_json").rdd.map(_.toString)).select(explode($"employee").alias("emp"))
.select($"emp.name.firstName",$"emp.name.lasteName",$"emp.address.street",$"emp.address.unit",$"emp.address.city")

jdf.printSchema

// dept and city are re-attached as hard-coded literals — only valid because the
// sample has a single dept/location; join back to df for the general case.
jdf.withColumn("dept", lit("finance")).withColumn("city",lit("OH")).show(false)

+---------+---------+---------------------+----+----+-------+
|firstName|lasteName|street               |unit|city|dept   |
+---------+---------+---------------------+----+----+-------+
|John     |Doe      |1234 West Broad St   |8505|OH  |finance|
|Alex     |Messi    |4321 North Meecham Rd|300 |OH  |finance|
+---------+---------+---------------------+----+----+-------+
sramalingam24
  • 1,297
  • 1
  • 14
  • 19
0

In Java, you can do it this way:

package net.jgp.books.sparkInAction.ch12.lab950CsvWithEmbdeddedJson;

import static org.apache.spark.sql.functions.concat;
import static org.apache.spark.sql.functions.explode;
import static org.apache.spark.sql.functions.lit;

import java.io.Serializable;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

/**
 * Ingesting a CSV with embedded JSON.
 * 
 * @author jgp
 */
public class CsvWithEmbdeddedJsonApp implements Serializable {
  private static final long serialVersionUID = 19711L;

  /**
   * Turns a Row (dept | emp_json | location) into a single JSON document by
   * splicing the embedded JSON fragment between the dept and location fields.
   * Not very fail-safe, but done to illustrate.
   *
   * @author jgp
   */
  private final class Jsonifier
      implements MapFunction<Row, String> {
    private static final long serialVersionUID = 19712L;

    @Override
    public String call(Row r) throws Exception {
      // StringBuilder instead of StringBuffer: no shared mutable state here,
      // so the synchronized StringBuffer buys nothing.
      StringBuilder sb = new StringBuilder();
      sb.append("{ \"dept\": \"");
      sb.append(r.getString(0));
      sb.append("\",");

      // getString() may return null. The original called .toString() on the
      // result, which would throw an NPE *before* its null check could run.
      String s = r.getString(1);
      if (s != null) {
        s = s.trim();
        // Strip the outer braces so the fragment can be spliced into the
        // document being built. Guard both ends (and emptiness) so a
        // malformed fragment doesn't lose an unrelated character.
        if (s.length() >= 2
            && s.charAt(0) == '{'
            && s.charAt(s.length() - 1) == '}') {
          s = s.substring(1, s.length() - 1);
        }
      } else {
        // No embedded JSON: emit an empty employee list rather than the
        // literal text "null" (which would make the document invalid JSON).
        s = "\"employee\":[]";
      }
      sb.append(s);

      sb.append(", \"location\": \"");
      sb.append(r.getString(2));
      sb.append("\"}");
      return sb.toString();
    }
  }

  /**
   * main() is your entry point to the application.
   *
   * @param args command-line arguments (unused)
   */
  public static void main(String[] args) {
    CsvWithEmbdeddedJsonApp app = new CsvWithEmbdeddedJsonApp();
    app.start();
  }

  /**
   * The processing code: read the CSV, rebuild each row as one JSON document,
   * let Spark infer its schema, then explode the employee array and flatten
   * the nested name/address structs into plain columns.
   * The intermediate show()/printSchema() calls are left in deliberately to
   * illustrate each construction step.
   */
  private void start() {
    // Creates a session on a local master
    SparkSession spark = SparkSession.builder()
        .appName("Processing of invoices")
        .master("local[*]")
        .getOrCreate();

    Dataset<Row> df = spark
        .read()
        .option("header", "true")
        .option("delimiter", "|")
        .csv("data/misc/csv_with_embedded_json.csv");
    df.show(5, false);
    df.printSchema();

    // Row -> JSON string; the whole dataset becomes one JSON document per line.
    Dataset<String> ds = df.map(
        new Jsonifier(),
        Encoders.STRING());
    ds.show(5, false);
    ds.printSchema();

    // Spark infers the nested schema (employee array of structs) from the strings.
    Dataset<Row> dfJson = spark.read().json(ds);
    dfJson.show(5, false);
    dfJson.printSchema();

    // One output row per element of the employee array.
    dfJson = dfJson
        .withColumn("emp", explode(dfJson.col("employee")))
        .drop("employee");
    dfJson.show(5, false);
    dfJson.printSchema();

    // Flatten the struct fields into the requested emp_name/emp_address/emp_city
    // columns, then drop the intermediate struct.
    dfJson = dfJson
        .withColumn("emp_name",
            concat(
                dfJson.col("emp.name.firstName"),
                lit(" "),
                dfJson.col("emp.name.lasteName")))
        .withColumn("emp_address",
            concat(dfJson.col("emp.address.street"),
                lit(" "),
                dfJson.col("emp.address.unit")))
        .withColumn("emp_city", dfJson.col("emp.address.city"))
        .drop("emp");
    dfJson.show(5, false);
    dfJson.printSchema();
  }
}

As usual, Java is pretty verbose :) - I don't complain. I left a lot of printSchema() and show() calls in to illustrate the construction process. The Jsonifier class can be implemented in a much nicer and more generic way, but it gives you the idea (you can also write it as a lambda if you want).

The output is:

+-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+
|dept   |emp_json                                                                                                                                                                                                                                                              |location|
+-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+
|finance|{ "employee":[{"name":{"firstName":"John","lasteName":"Doe"},"address":{"street":"1234 West Broad St","unit":"8505","city":"Columbus"}},{"name":{"firstName":"Alex","lasteName":"Messi"},"address":{"street":"4321 North Meecham Rd","unit":"300","city":"Salinas"}}]}|OH      |
+-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+

root
 |-- dept: string (nullable = true)
 |-- emp_json: string (nullable = true)
 |-- location: string (nullable = true)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                                                                                                                                                      |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{ "dept": "finance", "employee":[{"name":{"firstName":"John","lasteName":"Doe"},"address":{"street":"1234 West Broad St","unit":"8505","city":"Columbus"}},{"name":{"firstName":"Alex","lasteName":"Messi"},"address":{"street":"4321 North Meecham Rd","unit":"300","city":"Salinas"}}], "location": "OH"}|
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

root
 |-- value: string (nullable = true)

+-------+-------------------------------------------------------------------------------------------------------------+--------+
|dept   |employee                                                                                                     |location|
+-------+-------------------------------------------------------------------------------------------------------------+--------+
|finance|[[[Columbus, 1234 West Broad St, 8505], [John, Doe]], [[Salinas, 4321 North Meecham Rd, 300], [Alex, Messi]]]|OH      |
+-------+-------------------------------------------------------------------------------------------------------------+--------+

root
 |-- dept: string (nullable = true)
 |-- employee: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- address: struct (nullable = true)
 |    |    |    |-- city: string (nullable = true)
 |    |    |    |-- street: string (nullable = true)
 |    |    |    |-- unit: string (nullable = true)
 |    |    |-- name: struct (nullable = true)
 |    |    |    |-- firstName: string (nullable = true)
 |    |    |    |-- lasteName: string (nullable = true)
 |-- location: string (nullable = true)

+-------+--------+------------------------------------------------------+
|dept   |location|emp                                                   |
+-------+--------+------------------------------------------------------+
|finance|OH      |[[Columbus, 1234 West Broad St, 8505], [John, Doe]]   |
|finance|OH      |[[Salinas, 4321 North Meecham Rd, 300], [Alex, Messi]]|
+-------+--------+------------------------------------------------------+

root
 |-- dept: string (nullable = true)
 |-- location: string (nullable = true)
 |-- emp: struct (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- street: string (nullable = true)
 |    |    |-- unit: string (nullable = true)
 |    |-- name: struct (nullable = true)
 |    |    |-- firstName: string (nullable = true)
 |    |    |-- lasteName: string (nullable = true)

+-------+--------+----------+-------------------------+--------+
|dept   |location|emp_name  |emp_address              |emp_city|
+-------+--------+----------+-------------------------+--------+
|finance|OH      |John Doe  |1234 West Broad St 8505  |Columbus|
|finance|OH      |Alex Messi|4321 North Meecham Rd 300|Salinas |
+-------+--------+----------+-------------------------+--------+

root
 |-- dept: string (nullable = true)
 |-- location: string (nullable = true)
 |-- emp_name: string (nullable = true)
 |-- emp_address: string (nullable = true)
 |-- emp_city: string (nullable = true)
jgp
  • 2,069
  • 1
  • 21
  • 40
  • Thank you for helping out. I used flatmap function to pass the row and expand the json column to multiple column as output and then multiple row from one row depending on some criteria. – tkkman Feb 27 '19 at 02:56
-1

If you have proper JSON matching the schema, then you can use the explode method and select the columns you want using the dot operator (e.g. emp_json.name, emp_json.address, ...).

Sample code

// NOTE(review): the original had an unterminated string literal — as("emp) —
// which is a syntax error. Also, explode() requires an array/map column; if
// emp_json was read straight from CSV it is a plain String, so parse it with
// from_json(emp_json, schema) before exploding.
val flatJSON = df.select($"dept", explode($"emp_json").as("emp"))

flatJSON.select("dept", "emp.name", "emp.address")

-1

Check this out:

scala> val df = spark.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load("/tmp/stack/tkkman.csv")
df: org.apache.spark.sql.DataFrame = [dept: string, emp_json: string ... 1 more field]

scala> df.show(false)
+-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+
|dept   |emp_json                                                                                                                                                                                                                                                              |location|
+-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+
|finance|{ "employee":[{"name":{"firstName":"John","lasteName":"Doe"},"address":{"street":"1234 West Broad St","unit":"8505","city":"Columbus"}},{"name":{"firstName":"Alex","lasteName":"Messi"},"address":{"street":"4321 North Meecham Rd","unit":"300","city":"Salinas"}}]}|OH      |
+-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+


scala> df.printSchema
root
 |-- dept: string (nullable = true)
 |-- emp_json: string (nullable = true)
 |-- location: string (nullable = true)

scala> val jsonstr = """{ "employee":[{"name":{"firstName":"John","lasteName":"Doe"},"address":{"street":"1234 West Broad St","unit":"8505","city":"Columbus"}},{"name":{"firstName":"Alex","lasteName":"Messi"},"address":{"street":"4321 North Meecham Rd","unit":"300","city":"Salinas"}}]}"""
jsonstr: String = { "employee":[{"name":{"firstName":"John","lasteName":"Doe"},"address":{"street":"1234 West Broad St","unit":"8505","city":"Columbus"}},{"name":{"firstName":"Alex","lasteName":"Messi"},"address":{"street":"4321 North Meecham Rd","unit":"300","city":"Salinas"}}]}

scala> val dfj = spark.read.json(Seq(jsonstr).toDS)
dfj: org.apache.spark.sql.DataFrame = [employee: array<struct<address:struct<city:string,street:string,unit:string>,name:struct<firstName:string,lasteName:string>>>]

scala> dfj.show(false)
+-------------------------------------------------------------------------------------------------------------+
|employee                                                                                                     |
+-------------------------------------------------------------------------------------------------------------+
|[[[Columbus, 1234 West Broad St, 8505], [John, Doe]], [[Salinas, 4321 North Meecham Rd, 300], [Alex, Messi]]]|
+-------------------------------------------------------------------------------------------------------------+


scala> dfj.schema
res51: org.apache.spark.sql.types.StructType = StructType(StructField(employee,ArrayType(StructType(StructField(address,StructType(StructField(city,StringType,true), StructField(street,StringType,true), StructField(unit,StringType,true)),true), StructField(name,StructType(StructField(firstName,StringType,true), StructField(lasteName,StringType,true)),true)),true),true))


scala> val sch_emp = dfj.schema
sch_emp: org.apache.spark.sql.types.StructType = StructType(StructField(employee,ArrayType(StructType(StructField(address,StructType(StructField(city,StringType,true), StructField(street,StringType,true), StructField(unit,StringType,true)),true), StructField(name,StructType(StructField(firstName,StringType,true), StructField(lasteName,StringType,true)),true)),true),true))

scala> val df2 = df.select(col("*"),from_json('emp_json,sch_emp).as("emp"))
df2: org.apache.spark.sql.DataFrame = [dept: string, emp_json: string ... 2 more fields]

scala> df2.select(explode($"emp.employee")).printSchema
root
 |-- col: struct (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- street: string (nullable = true)
 |    |    |-- unit: string (nullable = true)
 |    |-- name: struct (nullable = true)
 |    |    |-- firstName: string (nullable = true)
 |    |    |-- lasteName: string (nullable = true)


scala> df2.select(col("*"),explode($"emp.employee").as("emp2")).select('dept,concat($"emp2.name.firstName",lit(" "),$"emp2.name.lasteName").as("emp_name"),$"emp2.address.street" as "emp_address", $"emp2.address.city" as "emp_city", 'location).show(false)
+-------+----------+---------------------+--------+--------+
|dept   |emp_name  |emp_address          |emp_city|location|
+-------+----------+---------------------+--------+--------+
|finance|John Doe  |1234 West Broad St   |Columbus|OH      |
|finance|Alex Messi|4321 North Meecham Rd|Salinas |OH      |
+-------+----------+---------------------+--------+--------+


scala>
stack0114106
  • 8,534
  • 3
  • 13
  • 38