2

I'm reading a Hive table which has two columns, id and jsonString. I can easily transform the jsonString into a Spark Data Structure calling the spark.read.json function, but I have to add the column id as well.

val jsonStr1 = """{"fruits":[{"fruit":"banana"},{"fruid":"apple"},{"fruit":"pera"}],"bar":{"foo":"[\"daniel\",\"pedro\",\"thing\"]"},"daniel":"daniel data random","cars":["montana","bagulho"]}"""
val jsonStr2 = """{"fruits":[{"dt":"banana"},{"fruid":"apple"},{"fruit":"pera"}],"bar":{"foo":"[\"daniel\",\"pedro\",\"thing\"]"},"daniel":"daniel data random","cars":["montana","bagulho"]}"""
val jsonStr3 = """{"fruits":[{"a":"banana"},{"fruid":"apple"},{"fruit":"pera"}],"bar":{"foo":"[\"daniel\",\"pedro\",\"thing\"]"},"daniel":"daniel data random","cars":["montana","bagulho"]}"""


case class Foo(id: Integer, json: String)

val ds = Seq(new Foo(1,jsonStr1), new Foo(2,jsonStr2), new Foo(3,jsonStr3)).toDS
val jsonDF = spark.read.json(ds.select($"json").rdd.map(r => r.getAs[String](0)).toDS)

jsonDF.show()

jsonDF.show
+--------------------+------------------+------------------+--------------------+
|                 bar|              cars|            daniel|              fruits|
+--------------------+------------------+------------------+--------------------+
|[["daniel","pedro...|[montana, bagulho]|daniel data random|[[,,, banana], [,...|
|[["daniel","pedro...|[montana, bagulho]|daniel data random|[[, banana,,], [,...|
|[["daniel","pedro...|[montana, bagulho]|daniel data random|[[banana,,,], [,,...|
+--------------------+------------------+------------------+--------------------+

I would like to add the column id from the Hive table, like this:

+--------------------+------------------+------------------+--------------------+---------------
|                 bar|              cars|            daniel|              fruits|  id
+--------------------+------------------+------------------+--------------------+--------------
|[["daniel","pedro...|[montana, bagulho]|daniel data random|[[,,, banana], [,...|1
|[["daniel","pedro...|[montana, bagulho]|daniel data random|[[, banana,,], [,...|2
|[["daniel","pedro...|[montana, bagulho]|daniel data random|[[banana,,,], [,,...|3
+--------------------+------------------+------------------+--------------------+

I will not use regular expressions

I created a udf which take this two fields as argument and using a proper JSON library include the desired field(id) and return a new JSON string. It works like a charm but I hope Spark API offers a better way to do it. I'm using Apache Spark 2.3.0.

Matteo Guarnerio
  • 720
  • 2
  • 9
  • 26
Mantovani
  • 500
  • 2
  • 7
  • 18

2 Answers2

3

I already knew about the from_json function before, but in my case it would be "impossible" to manually infer the schema for each JSON. I was thinking that Spark would have an "idiomatic" interface.

This is my final solution:

ds.select($"id", from_json($"json", jsonDF.schema).alias("_json_path")).select($"_json_path.*", $"id").show

ds.select($"id", from_json($"json", jsonDF.schema).alias("_json_path")).select($"_json_path.*", $"id").show

+--------------------+------------------+------------------+--------------------+---+
|                 bar|              cars|            daniel|              fruits| id|
+--------------------+------------------+------------------+--------------------+---+
|[["daniel","pedro...|[montana, bagulho]|daniel data random|[[,,, banana], [,...|  1|
|[["daniel","pedro...|[montana, bagulho]|daniel data random|[[, banana,,], [,...|  2|
|[["daniel","pedro...|[montana, bagulho]|daniel data random|[[banana,,,], [,,...|  3|
+--------------------+------------------+------------------+--------------------+---+
Stornu2
  • 2,284
  • 3
  • 27
  • 47
Mantovani
  • 500
  • 2
  • 7
  • 18
2

One way would be to apply from_json to the JSON strings with the corresponding schema, as shown below:

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import spark.implicits._

case class Foo(id: Int, json: String)

val df = Seq(Foo(1, jsonStr1), Foo(2, jsonStr2), Foo(3, jsonStr3)).toDF

val schema = StructType(Seq(
  StructField("bar", StructType(Seq(
    StructField("foo", StringType, true)
    )), true),
  StructField("cars", ArrayType(StringType, true), true),
  StructField("daniel", StringType, true),
  StructField("fruits", ArrayType(StructType(Seq(
    StructField("a", StringType, true),
    StructField("dt", StringType, true),
    StructField("fruid", StringType, true),
    StructField("fruit", StringType, true)
  )), true), true)
))

df.
  withColumn("json_col", from_json($"json", schema)).
  select($"id", $"json_col.*").
  show
// +---+--------------------+------------------+------------------+--------------------+
// | id|                 bar|              cars|            daniel|              fruits|
// +---+--------------------+------------------+------------------+--------------------+
// |  1|[["daniel","pedro...|[montana, bagulho]|daniel data random|[[null,null,null,...|
// |  2|[["daniel","pedro...|[montana, bagulho]|daniel data random|[[null,banana,nul...|
// |  3|[["daniel","pedro...|[montana, bagulho]|daniel data random|[[banana,null,nul...|
// +---+--------------------+------------------+------------------+--------------------+
Leo C
  • 22,006
  • 3
  • 26
  • 39
  • I already knew this solution ,the JSON files are insanely huge, would be insane create the schema manually. I could use from_json if I Spark infer the schema automatic as it does with "spark.read.json". I was thinking in call spadk.read.json infer the schema and give as parameter to from_json. But I'm not sure it will be straightforward as sounds like, besides the override of the serialization. – Mantovani Mar 09 '19 at 15:52
  • ds.select($"id",from_json($"json",jsonDF.schema).alias("_json_path")).show – Mantovani Mar 09 '19 at 16:04
  • @Mantovani, you could certainly get the schema from your `jsonDF`, which itself would require additional transformations to generate. For a large dataset with complex JSON schema, perhaps it would be best to create a JSON file with one single row of the JSON data, perform a `spark.read.json` and apply `.schema` for its schema. – Leo C Mar 09 '19 at 16:14