I have a requirement to parse JSON data into the form shown in the expected results below. Currently I can't work out how to include the signal names (ABS, ADA, ADW) in the SIGNAL column. Any help would be much appreciated.

I tried the following, which gives the results shown below, but I also need every signal name in a SIGNAL column, as shown in the expected results.

jsonDF.select(explode($"ABS") as "element").withColumn("stime", col("element.E")).withColumn("can_value", col("element.V")).drop(col("element")).show()

+-----------+-----------+
|stime      |can_value  |
+-----------+-----------+
|value of E |value of V |
+-----------+-----------+

df.printSchema

root
 |-- ABS: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- E: long (nullable = true)
 |    |    |-- V: long (nullable = true)
 |-- ADA: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- E: long (nullable = true)
 |    |    |-- V: long (nullable = true)
 |-- ADW: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- E: long (nullable = true)
 |    |    |-- V: long (nullable = true)
 |-- ALT: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- E: long (nullable = true)
 |    |    |-- V: double (nullable = true)
 |-- APP: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- E: long (nullable = true)
 |    |    |-- V: double (nullable = true)

I need output like the following:

+-------+-----------+-----------+
|SIGNAL |stime      |can_value  |
+-------+-----------+-----------+
|ABS    |value of E |value of V |
|ADA    |value of E |value of V |
|ADW    |value of E |value of V |
+-------+-----------+-----------+
Anil Kumar

1 Answer

To get the expected output and populate the SIGNAL column:

jsonDF.select(explode($"ABS") as "element")
    .withColumn("stime", col("element.E"))
    .withColumn("can_value", col("element.V"))
    .drop(col("element"))
    .withColumn("SIGNAL",lit("ABS"))
    .show()

And the generalized version of the above approach:

(Based on the output of df.printSchema, this assumes that the signal names are the column names, and that each column holds an array whose elements are structs of the form struct(E, V).)

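import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit}

// Every column name is treated as a signal name.
val columns: Array[String] = df.columns

var arrayOfDFs: Array[DataFrame] = Array()

for (col_name <- columns) {

  // Explode one signal's array and tag each row with the signal name.
  val temp = df.selectExpr("explode(" + col_name + ") as element")
    .select(
      lit(col_name).as("SIGNAL"),
      col("element.E").as("stime"),
      col("element.V").as("can_value"))

  arrayOfDFs = arrayOfDFs :+ temp
}

// Stack the per-signal DataFrames into a single result.
val jsonDF = arrayOfDFs.reduce(_ union _)
jsonDF.show(false)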
  • Hello @Arati, thanks for your help. I'm getting there, but I'm currently stuck on one issue: with the schema shown by df.printSchema, the final jsonDF repeatedly prints only the last signal name (APP) and the last E, V values of that signal. Is there a way to make sure jsonDF holds all the signal names and their corresponding values before it leaves the for loop? Any help would be much appreciated. – Anil Kumar Jul 28 '19 at 03:54
  • @Anil, I found the issue and have updated the answer; please give it a try. The previous answer applied '.withColumn("SIGNAL",when(col("SIGNAL").isNotNull, col("SIGNAL")).otherwise(lit(col_name)))' to jsonDF, which copied the last signal name into every row. – Arati Nagmal Jul 29 '19 at 06:38
  • Thanks for your time, I've accepted your answer. I need help with one more request. My df.printSchema output looks like this:
      root
       |-- APP: array (nullable = true)
       |    |-- element: struct (containsNull = true)
       |    |    |-- E: long (nullable = true)
       |    |    |-- V: double (nullable = true)
       |-- B1X: array (nullable = true)
       |    |-- element: struct (containsNull = true)
       |    |    |-- E: long (nullable = true)
       |    |    |-- V: long (nullable = true)
       |-- VIN: string (nullable = true)
    In my final jsonDF I need to include VIN as well: (SIGNAL, STIME, CAN_VALUE, VIN). – Anil Kumar Jul 29 '19 at 09:19
  • To add more columns to the final jsonDF, select them when creating the 'temp' DataFrame in the for loop, like this: 'df.selectExpr("explode("+col_name+") as element, VIN").select(.,., col("VIN"))' – Arati Nagmal Jul 29 '19 at 10:05
  • Hello @Arati, since VIN is not a map/array type, I cannot use it in the explode function. In the for(col_name <- columns){....} loop I need to exclude VIN, because explode throws the following error: org.apache.spark.sql.AnalysisException: cannot resolve 'explode(`VIN`)' due to data type mismatch: input to function explode should be array or map type, not string; But I still need VIN in my final jsonDF as (SIGNAL, stime, can_value, VIN). Any help would be much appreciated. – Anil Kumar Jul 29 '19 at 11:08
  • You have to use 'VIN' outside explode() in the .selectExpr() call. – Arati Nagmal Jul 29 '19 at 12:06
  • Hello @Arati, thank you so much for your time and effort :) Everything worked fine. – Anil Kumar Jul 29 '19 at 12:28
  • No problem @Anil! – Arati Nagmal Jul 30 '19 at 09:09
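
The VIN exchange in the comments above can be sketched end to end as follows. This is a minimal sketch under stated assumptions, not the code the commenters actually ran: the object name SignalsWithVin, the helper explodeSignals, the case classes, and the sample values (including the VIN string) are all hypothetical. It assumes that every array-typed column is a signal column, so VIN never reaches explode, and it casts V to double so that signals with long and double values line up in the union.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, explode, lit}
import org.apache.spark.sql.types.ArrayType

object SignalsWithVin {
  // Hypothetical row shapes mirroring the schema posted in the comments.
  case class Sample(E: Long, V: Double)
  case class LongSample(E: Long, V: Long)
  case class Record(APP: Seq[Sample], B1X: Seq[LongSample], VIN: String)

  // Explode every array column into (SIGNAL, stime, can_value) rows and
  // carry the scalar columns named in `keep` (e.g. VIN) along unchanged.
  // Assumes at least one array column exists; reduce fails on an empty list.
  def explodeSignals(df: DataFrame, keep: Seq[String]): DataFrame = {
    // Only array-typed columns are exploded, so string columns are skipped.
    val signalCols = df.schema.fields.collect {
      case f if f.dataType.isInstanceOf[ArrayType] => f.name
    }
    val keepCols = keep.map(c => col(c))

    val perSignal = signalCols.map { name =>
      // Kept columns sit outside explode(), as suggested in the comments.
      val inputCols = keepCols :+ explode(col(name)).as("element")
      val outputCols = Seq(
        lit(name).as("SIGNAL"),
        col("element.E").as("stime"),
        // Assumption of this sketch: cast V to double for a uniform column type.
        col("element.V").cast("double").as("can_value")
      ) ++ keepCols
      df.select(inputCols: _*).select(outputCols: _*)
    }
    perSignal.reduce(_ union _)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("signals-with-vin")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample data: two APP samples and one B1X sample for one VIN.
    val df = Seq(
      Record(Seq(Sample(1L, 0.5), Sample(2L, 0.7)), Seq(LongSample(3L, 1L)), "VIN123")
    ).toDF()

    explodeSignals(df, Seq("VIN")).show(false)
    spark.stop()
  }
}
```

With the sample data above, the result has the columns SIGNAL, stime, can_value and VIN, with one row per exploded array element. Filtering on the column type rather than removing VIN by name keeps the loop working if other scalar columns are added later; pass their names in keep if they should appear in the output.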