
I am processing streaming events of different types and different schemas in Spark with Scala, and I need to parse them and save them in a format that's easy to process further in a generic way.

I have a dataframe of events that looks like this:

val df = Seq(
  ("{\"a\": 1, \"b\": 2, \"c\": 3 }", "One", "001"),
  ("{\"a\": 6, \"b\": 2, \"d\": 2, \"f\": 8 }", "Two", "089"),
  ("{\"a\": 3, \"b\": 4, \"c\": 6 }", "One", "123")
).toDF("col1", "col2", "col3")

which is this:

+------------------------------------+--------+------+
|   body                             |   type |   id |
+------------------------------------+--------+------+
|{"a": 1, "b": 2, "c": 3 }           |   "One"|   001|
|{"a": 6, "d": 2, "f": 8, "g": 10}   |   "Two"|   089|
|{"a": 3, "b": 4, "c": 6 }           | "Three"|   123|
+------------------------------------+--------+------+

and I would like to turn it into the one below. We can assume that all events of type "One" have the same schema, and that all event types share some common data, such as the entry "a", which I would like to surface into its own column:

+---+--------------------------------+--------+------+
| a |  data                          |   y    |   z  |
+---+--------------------------------+--------+------+
| 1 |{"b": 2, "c": 3 }               |   "One"|   001|
| 6 |{"d": 2, "f": 8, "g": 10}       |   "Two"|   089|
| 3 |{"b": 4, "c": 6 }               | "Three"|   123|
+---+--------------------------------+--------+------+
  • Have a look here: https://stackoverflow.com/questions/52817898/extract-json-data-from-a-spark-dataframe. It is for Python but works pretty much the exact same way in Scala – Richard Nemeth Jan 25 '20 at 08:59
  • Do you have schema for data in another column? Then you can dynamically parse – Salim Jan 25 '20 at 13:08
  • Does this answer your question? [How to query JSON data column using Spark DataFrames?](https://stackoverflow.com/questions/34069282/how-to-query-json-data-column-using-spark-dataframes) – user10938362 Jan 25 '20 at 13:09

2 Answers


One way to achieve that is to handle the json data as a Map as shown below:

import org.apache.spark.sql.types.{MapType, StringType, IntegerType}
import org.apache.spark.sql.functions.{from_json, expr}
import spark.implicits._ // needed for $"..." and toDF when not running in the spark-shell

val df = Seq(
  ("{\"a\": 1, \"b\": 2, \"c\": 3 }", "One", "001") ,
  ("{\"a\": 6, \"b\": 2, \"d\": 2, \"f\": 8 }", "Two", "089"), 
  ("{\"a\": 3, \"b\": 4, \"c\": 6 }", "One", "123")
).toDF("body", "type", "id")

val mapSchema = MapType(StringType, IntegerType)

df.withColumn("map", from_json($"body", mapSchema))
  .withColumn("data_keys", expr("filter(map_keys(map), k -> k != 'a')"))
  .withColumn("data_values", expr("transform(data_keys, k -> element_at(map,k))"))
  .withColumn("data", expr("to_json(map_from_arrays(data_keys, data_values))"))
  .withColumn("a", $"map".getItem("a"))
  .select($"a", $"data", $"type".as("y"), $"id".as("z"))
  .show(false)

// +---+-------------------+---+---+
// |a  |data               |y  |z  |
// +---+-------------------+---+---+
// |1  |{"b":2,"c":3}      |One|001|
// |6  |{"b":2,"d":2,"f":8}|Two|089|
// |3  |{"b":4,"c":6}      |One|123|
// +---+-------------------+---+---+

Analysis

  1. withColumn("map", from_json($"body", mapSchema)) : first generate a Map from the given json data.
  2. withColumn("data_keys", expr("filter(map_keys(map), k -> k != 'a')")) : retrieve the keys of the new map by filtering out the keys not equal to a. We use the filter function here which returns an array i.e {"a": 1, "b": 2, "c": 3 } -> [b, c].
  3. withColumn("data_values", expr("transform(data_keys, k -> element_at(map,k))")) : populate the values of the new map using the previous keys in combination with transform.
  4. withColumn("data", expr("to_json(map_from_arrays(data_keys, data_values))")) : generate the map from data_keys and data_values using map_from_arrays. Finally, call to_json for converting the map back to json.
– abiratsis
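
A possible simplification, assuming Spark 3.0 or later (this variant is not part of the original answer): the filter/transform/map_from_arrays steps can be collapsed into a single map_filter higher-order function. A minimal sketch reusing df and mapSchema from above:

// sketch assuming Spark 3.0+: map_filter drops the shared key "a" directly,
// and the remaining entries are serialized back to json in one step
df.withColumn("map", from_json($"body", mapSchema))
  .withColumn("data", expr("to_json(map_filter(map, (k, v) -> k != 'a'))"))
  .withColumn("a", $"map".getItem("a"))
  .select($"a", $"data", $"type".as("y"), $"id".as("z"))
  .show(false)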
  • This was what I was looking for. I generalized it a bit as follows: `val mapSchema = MapType(StringType, StringType) val ccol = Array("a") def funcgetMessage(commonCols: Array[String]) = udf((cmap: Map[String, String]) => { cmap.filter((t) => !(commonCols.contains(t._1))) } ) var dftt = df .withColumn("map", from_json($"body", mapSchema)) .withColumn("data_keys", funcgetMessage(ccol)($"map")) .withColumn("message", to_json($"data_keys")) .withColumn("a", $"map.a") .select($"a", $"message", $"type", $"id") display(dftt)` Now the set of common fields can be extended. – gmh Feb 01 '20 at 20:28
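
Expanded for readability, the generalization from that comment could look roughly like the sketch below. The idea (a StringType map plus a UDF that drops a configurable set of common keys) is taken from the comment; names such as dropCommon and stringMapSchema are only illustrative:

import org.apache.spark.sql.types.{MapType, StringType}
import org.apache.spark.sql.functions.{from_json, to_json, udf}

// keys shared by all event types that should become top-level columns
val commonCols = Array("a")

// UDF that drops the common keys, leaving only the event-specific payload
val dropCommon = udf((m: Map[String, String]) => m.filter { case (k, _) => !commonCols.contains(k) })

// parse every value as a string so the same schema works for all event types
val stringMapSchema = MapType(StringType, StringType)

df.withColumn("map", from_json($"body", stringMapSchema))
  .withColumn("message", to_json(dropCommon($"map")))
  .withColumn("a", $"map".getItem("a"))
  .select($"a", $"message", $"type", $"id")
  .show(false)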

First you need to define the json schema as follows:

val schema = spark.read.json(df.select("col1").as[String]).schema
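
For the sample data in the question, the inferred schema should look roughly like this (an illustration; spark.read.json unions the fields of all rows and infers the whole-number values as longs):

schema.printTreeString()
// root
//  |-- a: long (nullable = true)
//  |-- b: long (nullable = true)
//  |-- c: long (nullable = true)
//  |-- d: long (nullable = true)
//  |-- f: long (nullable = true)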

Then you can parse your column col1 as json (1st line) and then just select which elements of the json you want to extract (2nd line):

df.select(from_json($"col1", schema).as("data"), $"col2", $"col3")
.select($"data.a", $"data", $"col2", $"col3")

Output:

+---+-------------+----+----+
|  a|         data|col2|col3|
+---+-------------+----+----+
|  1|  [1, 2, 3,,]| One| 001|
|  6|[6, 2,, 2, 8]| Two| 089|
|  3|  [3, 4, 6,,]| One| 123|
+---+-------------+----+----+

I know it's not exactly the same as you want, but it will give you a clue.
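
To get closer to the requested shape, one option (a sketch that is not part of the original answer and hard-codes the non-shared field names b, c, d, f seen in the sample data) is to re-serialize the remaining struct fields with to_json, which omits null fields by default:

import org.apache.spark.sql.functions.{from_json, to_json, struct}

df.select(from_json($"col1", schema).as("data"), $"col2", $"col3")
  .select(
    $"data.a".as("a"),
    // to_json drops null fields, so keys absent from a given event disappear from its json
    to_json(struct($"data.b", $"data.c", $"data.d", $"data.f")).as("data"),
    $"col2", $"col3")
  .show(false)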

Another option, if you want to completely deconstruct your json, is to use data.*:

df.select(from_json($"col1", schema).as("data"), $"col2", $"col3")
  .select($"data.*", $"col2", $"col3")

+---+---+----+----+----+----+----+
|  a|  b|   c|   d|   f|col2|col3|
+---+---+----+----+----+----+----+
|  1|  2|   3|null|null| One| 001|
|  6|  2|null|   2|   8| Two| 089|
|  3|  4|   6|null|null| One| 123|
+---+---+----+----+----+----+----+
– SCouto